Commit 0529fd43 authored by Stan Hu

Merge branch 'georgekoltsov/add-context-information-to-bulk-import-workers' into 'master'

Add structured payload to BulkImport workers

See merge request gitlab-org/gitlab!85208
parents e893d88a 48a89ee1
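
The change replaces the hand-rolled `worker: self.class.name` log fields with `structured_payload`, the helper the workers get from their shared concern, so every BulkImport log entry carries the same structured context. A minimal, self-contained sketch of the behaviour the diff relies on, assuming the helper stringifies keys and tags the payload with the worker class (the module and field names below are illustrative, not GitLab's implementation):

# Illustrative sketch only: approximates what a structured_payload-style
# helper does, so the diff below is easier to follow.
module StructuredPayloadSketch
  def structured_payload(payload = {})
    # Stringify the caller's keys and merge in worker context, replacing the
    # explicit `worker: self.class.name` fields the old log calls passed.
    payload.transform_keys(&:to_s).merge('class' => self.class.name)
  end
end

class ExampleWorker
  include StructuredPayloadSketch

  def stage_log_payload(entity_id, current_stage)
    structured_payload(entity_id: entity_id, current_stage: current_stage, message: 'Stage starting')
  end
end

ExampleWorker.new.stage_log_payload(1, nil)
# => {"entity_id"=>1, "current_stage"=>nil, "message"=>"Stage starting", "class"=>"ExampleWorker"}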
@@ -12,12 +12,24 @@ module BulkImports
     worker_has_external_dependencies!

     def perform(entity_id, current_stage = nil)
-      return if stage_running?(entity_id, current_stage)
+      if stage_running?(entity_id, current_stage)
+        logger.info(
+          structured_payload(
+            entity_id: entity_id,
+            current_stage: current_stage,
+            message: 'Stage running'
+          )
+        )
+
+        return
+      end

       logger.info(
-        worker: self.class.name,
-        entity_id: entity_id,
-        current_stage: current_stage
+        structured_payload(
+          entity_id: entity_id,
+          current_stage: current_stage,
+          message: 'Stage starting'
+        )
       )

       next_pipeline_trackers_for(entity_id).each do |pipeline_tracker|
@@ -29,10 +41,11 @@ module BulkImports
       end
     rescue StandardError => e
       logger.error(
-        worker: self.class.name,
-        entity_id: entity_id,
-        current_stage: current_stage,
-        error_message: e.message
+        structured_payload(
+          entity_id: entity_id,
+          current_stage: current_stage,
+          message: e.message
+        )
       )

       Gitlab::ErrorTracking.track_exception(e, entity_id: entity_id)
@@ -42,10 +42,12 @@ module BulkImports
         correlation_id_value: Labkit::Correlation::CorrelationId.current_or_new_id
       }

-      Gitlab::Import::Logger.warn(
-        attributes.merge(
-          bulk_import_id: entity.bulk_import.id,
-          bulk_import_entity_type: entity.source_type
+      Gitlab::Import::Logger.error(
+        structured_payload(
+          attributes.merge(
+            bulk_import_id: entity.bulk_import.id,
+            bulk_import_entity_type: entity.source_type
+          )
         )
       )

@@ -18,18 +18,20 @@ module BulkImports

       if pipeline_tracker.present?
         logger.info(
-          worker: self.class.name,
-          entity_id: pipeline_tracker.entity.id,
-          pipeline_name: pipeline_tracker.pipeline_name
+          structured_payload(
+            entity_id: pipeline_tracker.entity.id,
+            pipeline_name: pipeline_tracker.pipeline_name
+          )
         )

         run(pipeline_tracker)
       else
         logger.error(
-          worker: self.class.name,
-          entity_id: entity_id,
-          pipeline_tracker_id: pipeline_tracker_id,
-          message: 'Unstarted pipeline not found'
+          structured_payload(
+            entity_id: entity_id,
+            pipeline_tracker_id: pipeline_tracker_id,
+            message: 'Unstarted pipeline not found'
+          )
         )
       end

@@ -63,10 +65,11 @@ module BulkImports
     rescue BulkImports::NetworkError => e
       if e.retriable?(pipeline_tracker)
         logger.error(
-          worker: self.class.name,
-          entity_id: pipeline_tracker.entity.id,
-          pipeline_name: pipeline_tracker.pipeline_name,
-          message: "Retrying error: #{e.message}"
+          structured_payload(
+            entity_id: pipeline_tracker.entity.id,
+            pipeline_name: pipeline_tracker.pipeline_name,
+            message: "Retrying error: #{e.message}"
+          )
         )

         pipeline_tracker.update!(status_event: 'retry', jid: jid)
@@ -83,10 +86,11 @@ module BulkImports
       pipeline_tracker.update!(status_event: 'fail_op', jid: jid)

       logger.error(
-        worker: self.class.name,
-        entity_id: pipeline_tracker.entity.id,
-        pipeline_name: pipeline_tracker.pipeline_name,
-        message: exception.message
+        structured_payload(
+          entity_id: pipeline_tracker.entity.id,
+          pipeline_name: pipeline_tracker.pipeline_name,
+          message: exception.message
+        )
       )

       Gitlab::ErrorTracking.track_exception(
@@ -36,9 +36,11 @@ RSpec.describe BulkImports::EntityWorker do
       expect(logger)
         .to receive(:info).twice
         .with(
-          worker: described_class.name,
-          entity_id: entity.id,
-          current_stage: nil
+          hash_including(
+            'entity_id' => entity.id,
+            'current_stage' => nil,
+            'message' => 'Stage starting'
+          )
         )
     end

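The spec updates follow from the payload change: because `structured_payload` stringifies keys and merges extra context fields into the hash, exact symbol-keyed expectations would no longer pass, so the matchers switch to `hash_including` with string keys. A standalone illustration of why the partial matcher is needed (hypothetical example, not part of the spec files):

require 'rspec/autorun'

RSpec.describe 'matching a structured payload' do
  it 'hash_including matches even when extra keys are merged into the payload' do
    logger = double('logger')

    # Only the asserted keys are constrained; merged context keys such as
    # 'class' do not break the expectation.
    expect(logger).to receive(:info).with(hash_including('entity_id' => 1, 'message' => 'Stage starting'))

    logger.info('entity_id' => 1, 'message' => 'Stage starting', 'class' => 'BulkImports::EntityWorker')
  end
end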
@@ -58,24 +60,26 @@ RSpec.describe BulkImports::EntityWorker do

       expect(BulkImports::PipelineWorker)
         .to receive(:perform_async)
-        .and_raise(exception)
+        .and_raise(exception)

       expect_next_instance_of(Gitlab::Import::Logger) do |logger|
         expect(logger)
           .to receive(:info).twice
           .with(
-            worker: described_class.name,
-            entity_id: entity.id,
-            current_stage: nil
+            hash_including(
+              'entity_id' => entity.id,
+              'current_stage' => nil
+            )
           )

         expect(logger)
           .to receive(:error)
           .with(
-            worker: described_class.name,
-            entity_id: entity.id,
-            current_stage: nil,
-            error_message: 'Error!'
+            hash_including(
+              'entity_id' => entity.id,
+              'current_stage' => nil,
+              'message' => 'Error!'
+            )
           )
       end

@@ -90,6 +94,18 @@ RSpec.describe BulkImports::EntityWorker do
       let(:job_args) { [entity.id, 0] }

       it 'do not enqueue a new pipeline job if the current stage still running' do
+        expect_next_instance_of(Gitlab::Import::Logger) do |logger|
+          expect(logger)
+            .to receive(:info).twice
+            .with(
+              hash_including(
+                'entity_id' => entity.id,
+                'current_stage' => 0,
+                'message' => 'Stage running'
+              )
+            )
+        end
+
         expect(BulkImports::PipelineWorker)
           .not_to receive(:perform_async)

@@ -110,9 +126,10 @@ RSpec.describe BulkImports::EntityWorker do
         expect(logger)
           .to receive(:info).twice
           .with(
-            worker: described_class.name,
-            entity_id: entity.id,
-            current_stage: 0
+            hash_including(
+              'entity_id' => entity.id,
+              'current_stage' => 0
+            )
           )
       end

@@ -35,14 +35,16 @@ RSpec.describe BulkImports::ExportRequestWorker do
           expect(client).to receive(:post).and_raise(BulkImports::NetworkError, 'Export error').twice
         end

-        expect(Gitlab::Import::Logger).to receive(:warn).with(
-          bulk_import_entity_id: entity.id,
-          pipeline_class: 'ExportRequestWorker',
-          exception_class: 'BulkImports::NetworkError',
-          exception_message: 'Export error',
-          correlation_id_value: anything,
-          bulk_import_id: bulk_import.id,
-          bulk_import_entity_type: entity.source_type
+        expect(Gitlab::Import::Logger).to receive(:error).with(
+          hash_including(
+            'bulk_import_entity_id' => entity.id,
+            'pipeline_class' => 'ExportRequestWorker',
+            'exception_class' => 'BulkImports::NetworkError',
+            'exception_message' => 'Export error',
+            'correlation_id_value' => anything,
+            'bulk_import_id' => bulk_import.id,
+            'bulk_import_entity_type' => entity.source_type
+          )
         ).twice

         perform_multiple(job_args)
@@ -34,9 +34,10 @@ RSpec.describe BulkImports::PipelineWorker do
       expect(logger)
         .to receive(:info)
         .with(
-          worker: described_class.name,
-          pipeline_name: 'FakePipeline',
-          entity_id: entity.id
+          hash_including(
+            'pipeline_name' => 'FakePipeline',
+            'entity_id' => entity.id
+          )
         )
     end

@@ -44,7 +45,7 @@ RSpec.describe BulkImports::PipelineWorker do
       .to receive(:perform_async)
       .with(entity.id, pipeline_tracker.stage)

-    expect(subject).to receive(:jid).and_return('jid')
+    allow(subject).to receive(:jid).and_return('jid')

     subject.perform(pipeline_tracker.id, pipeline_tracker.stage, entity.id)

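The `jid` stubs also change from `expect(subject).to receive(:jid)` to `allow(subject).to receive(:jid)`. A plausible reason, stated here as an assumption rather than something the commit spells out, is that the structured payload may read `jid` itself, so pinning an exact call count would make these specs brittle; `allow` fixes the return value without constraining how often the method is called. A standalone illustration of the difference:

require 'rspec/autorun'

RSpec.describe 'allow vs expect for stubbing jid' do
  let(:worker) { double('worker') }

  it 'allow does not constrain how many times the stub is called' do
    allow(worker).to receive(:jid).and_return('jid')

    # Zero, one, or many calls all pass; only the return value is fixed.
    3.times { expect(worker.jid).to eq('jid') }
  end
end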
@@ -79,10 +80,11 @@ RSpec.describe BulkImports::PipelineWorker do
         expect(logger)
           .to receive(:error)
           .with(
-            worker: described_class.name,
-            pipeline_tracker_id: pipeline_tracker.id,
-            entity_id: entity.id,
-            message: 'Unstarted pipeline not found'
+            hash_including(
+              'pipeline_tracker_id' => pipeline_tracker.id,
+              'entity_id' => entity.id,
+              'message' => 'Unstarted pipeline not found'
+            )
           )
       end

@@ -107,10 +109,11 @@ RSpec.describe BulkImports::PipelineWorker do
         expect(logger)
           .to receive(:error)
           .with(
-            worker: described_class.name,
-            pipeline_name: 'InexistentPipeline',
-            entity_id: entity.id,
-            message: "'InexistentPipeline' is not a valid BulkImport Pipeline"
+            hash_including(
+              'pipeline_name' => 'InexistentPipeline',
+              'entity_id' => entity.id,
+              'message' => "'InexistentPipeline' is not a valid BulkImport Pipeline"
+            )
           )
       end

@@ -126,7 +129,7 @@ RSpec.describe BulkImports::PipelineWorker do
       .to receive(:perform_async)
       .with(entity.id, pipeline_tracker.stage)

-    expect(subject).to receive(:jid).and_return('jid')
+    allow(subject).to receive(:jid).and_return('jid')

     subject.perform(pipeline_tracker.id, pipeline_tracker.stage, entity.id)

@@ -151,10 +154,11 @@ RSpec.describe BulkImports::PipelineWorker do
         expect(logger)
           .to receive(:error)
           .with(
-            worker: described_class.name,
-            pipeline_name: 'Pipeline',
-            entity_id: entity.id,
-            message: 'Failed entity status'
+            hash_including(
+              'pipeline_name' => 'Pipeline',
+              'entity_id' => entity.id,
+              'message' => 'Failed entity status'
+            )
           )
       end

@@ -183,7 +187,7 @@ RSpec.describe BulkImports::PipelineWorker do
           .and_raise(exception)
       end

-      expect(subject).to receive(:jid).and_return('jid').twice
+      allow(subject).to receive(:jid).and_return('jid')

       expect_any_instance_of(BulkImports::Tracker) do |tracker|
         expect(tracker).to receive(:retry).and_call_original
@@ -193,9 +197,10 @@ RSpec.describe BulkImports::PipelineWorker do
         expect(logger)
           .to receive(:info)
           .with(
-            worker: described_class.name,
-            pipeline_name: 'FakePipeline',
-            entity_id: entity.id
+            hash_including(
+              'pipeline_name' => 'FakePipeline',
+              'entity_id' => entity.id
+            )
           )
       end

@@ -292,10 +297,11 @@ RSpec.describe BulkImports::PipelineWorker do
         expect(logger)
           .to receive(:error)
           .with(
-            worker: described_class.name,
-            pipeline_name: 'NdjsonPipeline',
-            entity_id: entity.id,
-            message: 'Pipeline timeout'
+            hash_including(
+              'pipeline_name' => 'NdjsonPipeline',
+              'entity_id' => entity.id,
+              'message' => 'Pipeline timeout'
+            )
           )
       end

@@ -318,10 +324,11 @@ RSpec.describe BulkImports::PipelineWorker do
         expect(logger)
           .to receive(:error)
           .with(
-            worker: described_class.name,
-            pipeline_name: 'NdjsonPipeline',
-            entity_id: entity.id,
-            message: 'Error!'
+            hash_including(
+              'pipeline_name' => 'NdjsonPipeline',
+              'entity_id' => entity.id,
+              'message' => 'Error!'
+            )
           )
       end
