diff --git a/lib/gitlab/background_migration.rb b/lib/gitlab/background_migration.rb index d95ecd7b291278fa887595eaf0f8a4dbab4f0189..b0741b1fba743594710f851114c2b5b54b608d4b 100644 --- a/lib/gitlab/background_migration.rb +++ b/lib/gitlab/background_migration.rb @@ -1,24 +1,45 @@ module Gitlab module BackgroundMigration + def self.queue + @queue ||= BackgroundMigrationWorker.sidekiq_options['queue'] + end + # Begins stealing jobs from the background migrations queue, blocking the # caller until all jobs have been completed. # + # When a migration raises a StandardError is is going to be retries up to + # three times, for example, to recover from a deadlock. + # + # When Exception is being raised, it enqueues the migration again, and + # re-raises the exception. + # # steal_class - The name of the class for which to steal jobs. def self.steal(steal_class) - queue = Sidekiq::Queue - .new(BackgroundMigrationWorker.sidekiq_options['queue']) + enqueued = Sidekiq::Queue.new(self.queue) + scheduled = Sidekiq::ScheduledSet.new - queue.each do |job| - migration_class, migration_args = job.args + [scheduled, enqueued].each do |queue| + queue.each do |job| + migration_class, migration_args = job.args - next unless migration_class == steal_class + next unless job.queue == self.queue + next unless migration_class == steal_class - perform(migration_class, migration_args) + begin + perform(migration_class, migration_args, retries: 3) if job.delete + rescue Exception # rubocop:disable Lint/RescueException + BackgroundMigrationWorker # enqueue this migration again + .perform_async(migration_class, migration_args) - job.delete + raise + end + end end end + ## + # Performs a background migration. + # # class_name - The name of the background migration class as defined in the # Gitlab::BackgroundMigration namespace. # diff --git a/spec/lib/gitlab/background_migration_spec.rb b/spec/lib/gitlab/background_migration_spec.rb index 64f82fe27b2adb11fbedd56936ad6a5552dbec6a..cfa59280139c718aaabf511314b7b8089aa231af 100644 --- a/spec/lib/gitlab/background_migration_spec.rb +++ b/spec/lib/gitlab/background_migration_spec.rb @@ -1,46 +1,120 @@ require 'spec_helper' describe Gitlab::BackgroundMigration do + describe '.queue' do + it 'returns background migration worker queue' do + expect(described_class.queue) + .to eq BackgroundMigrationWorker.sidekiq_options['queue'] + end + end + describe '.steal' do - it 'steals jobs from a queue' do - queue = [double(:job, args: ['Foo', [10, 20]])] + context 'when there are enqueued jobs present' do + let(:queue) do + [double(args: ['Foo', [10, 20]], queue: described_class.queue)] + end + + before do + allow(Sidekiq::Queue).to receive(:new) + .with(described_class.queue) + .and_return(queue) + end + + context 'when queue contains unprocessed jobs' do + it 'steals jobs from a queue' do + expect(queue[0]).to receive(:delete).and_return(true) + + expect(described_class).to receive(:perform) + .with('Foo', [10, 20], anything) + + described_class.steal('Foo') + end + + it 'does not steal job that has already been taken' do + expect(queue[0]).to receive(:delete).and_return(false) + + expect(described_class).not_to receive(:perform) + + described_class.steal('Foo') + end + + it 'does not steal jobs for a different migration' do + expect(described_class).not_to receive(:perform) - allow(Sidekiq::Queue).to receive(:new) - .with(BackgroundMigrationWorker.sidekiq_options['queue']) - .and_return(queue) + expect(queue[0]).not_to receive(:delete) - expect(queue[0]).to receive(:delete) + described_class.steal('Bar') + end + end - expect(described_class).to receive(:perform).with('Foo', [10, 20]) + context 'when one of the jobs raises an error' do + let(:migration) { spy(:migration) } - described_class.steal('Foo') + let(:queue) do + [double(args: ['Foo', [10, 20]], queue: described_class.queue), + double(args: ['Foo', [20, 30]], queue: described_class.queue)] + end + + before do + stub_const("#{described_class}::Foo", migration) + + allow(queue[0]).to receive(:delete).and_return(true) + allow(queue[1]).to receive(:delete).and_return(true) + end + + it 'enqueues the migration again and re-raises the error' do + allow(migration).to receive(:perform).with(10, 20) + .and_raise(Exception, 'Migration error').once + + expect(BackgroundMigrationWorker).to receive(:perform_async) + .with('Foo', [10, 20]).once + + expect { described_class.steal('Foo') }.to raise_error(Exception) + end + end end - it 'does not steal jobs for a different migration' do - queue = [double(:job, args: ['Foo', [10, 20]])] + context 'when there are scheduled jobs present', :sidekiq, :redis do + it 'steals all jobs from the scheduled sets' do + Sidekiq::Testing.disable! do + BackgroundMigrationWorker.perform_in(10.minutes, 'Object') - allow(Sidekiq::Queue).to receive(:new) - .with(BackgroundMigrationWorker.sidekiq_options['queue']) - .and_return(queue) + expect(Sidekiq::ScheduledSet.new).to be_one + expect(described_class).to receive(:perform).with('Object', any_args) - expect(described_class).not_to receive(:perform) + described_class.steal('Object') - expect(queue[0]).not_to receive(:delete) + expect(Sidekiq::ScheduledSet.new).to be_none + end + end + end - described_class.steal('Bar') + context 'when there are enqueued and scheduled jobs present', :sidekiq, :redis do + it 'steals from the scheduled sets queue first' do + Sidekiq::Testing.disable! do + expect(described_class).to receive(:perform) + .with('Object', [1], anything).ordered + expect(described_class).to receive(:perform) + .with('Object', [2], anything).ordered + + BackgroundMigrationWorker.perform_async('Object', [2]) + BackgroundMigrationWorker.perform_in(10.minutes, 'Object', [1]) + + described_class.steal('Object') + end + end end end describe '.perform' do - it 'performs a background migration' do - instance = double(:instance) - klass = double(:klass, new: instance) + let(:migration) { spy(:migration) } - expect(described_class).to receive(:const_get) - .with('Foo') - .and_return(klass) + before do + stub_const("#{described_class.name}::Foo", migration) + end - expect(instance).to receive(:perform).with(10, 20) + it 'performs a background migration' do + expect(migration).to receive(:perform).with(10, 20).once described_class.perform('Foo', [10, 20]) end diff --git a/spec/support/sidekiq.rb b/spec/support/sidekiq.rb index 5478fea4e64a64cab3408d218aa926a2f929f12e..d143014692da3ba6c15554263c788366fd1dad67 100644 --- a/spec/support/sidekiq.rb +++ b/spec/support/sidekiq.rb @@ -8,4 +8,8 @@ RSpec.configure do |config| config.after(:each, :sidekiq) do Sidekiq::Worker.clear_all end + + config.after(:each, :sidekiq, :redis) do + Sidekiq.redis { |redis| redis.flushdb } + end end