Commit a179e057 authored by Douwe Maan, committed by Jose Ivan Vargas Lopez

Merge branch '3108-improve-handling-of-stuck-mirrors' into 'master'

Improves handling of stuck projects

Closes #3108

See merge request !2628
parent 61653a64
1 merge request: !2698 WIP: 9.5.0 RC5 EE
Showing with 197 additions and 150 deletions
@@ -154,8 +154,6 @@ end
 
# State machine
gem 'state_machines-activerecord', '~> 0.4.0'
# Run events after state machine commits
gem 'after_commit_queue', '~> 1.3.0'
 
# Issue tags
gem 'acts-as-taggable-on', '~> 4.0'
@@ -42,8 +42,6 @@ GEM
acts-as-taggable-on (4.0.0)
activerecord (>= 4.0)
addressable (2.3.8)
after_commit_queue (1.3.0)
activerecord (>= 3.0)
akismet (2.0.0)
allocations (1.0.5)
arel (6.0.4)
@@ -959,7 +957,6 @@ DEPENDENCIES
activerecord_sane_schema_dumper (= 0.2)
acts-as-taggable-on (~> 4.0)
addressable (~> 2.3.8)
after_commit_queue (~> 1.3.0)
akismet (~> 2.0)
allocations (~> 1.0)
asana (~> 0.6.0)
@@ -369,7 +369,10 @@ def self.with_feature_available_for_user(feature, user)
state :failed
 
after_transition [:none, :finished, :failed] => :scheduled do |project, _|
project.run_after_commit { add_import_job }
project.run_after_commit do
job_id = add_import_job
update(import_jid: job_id) if job_id
end
end
 
after_transition started: :finished do |project, _|
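Note on the hunk above: the scheduling block now captures the Sidekiq JID returned by add_import_job and writes it to import_jid once the transaction commits; the reworked StuckImportJobsWorker further down in this commit relies on that column to tell live imports from dead ones. A rough sketch of the observable effect (the JID value is invented for illustration):

project.import_schedule          # :none/:finished/:failed => :scheduled
# after COMMIT, the run_after_commit block enqueues the import worker
project.reload.import_jid        # => "9dbfe618f9feb8ca24d1fb48" (illustrative value)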
@@ -524,17 +527,26 @@ def saved?
def add_import_job
job_id =
if forked?
RepositoryForkWorker.perform_async(id, forked_from_project.repository_storage_path,
forked_from_project.full_path,
self.namespace.full_path)
RepositoryForkWorker.perform_async(id,
forked_from_project.repository_storage_path,
forked_from_project.full_path,
self.namespace.full_path)
else
RepositoryImportWorker.perform_async(self.id)
end
 
log_import_activity(job_id)
job_id
end
def log_import_activity(job_id, type: :import)
job_type = type.to_s.capitalize
if job_id
Rails.logger.info "Import job started for #{full_path} with job ID #{job_id}"
Rails.logger.info("#{job_type} job scheduled for #{full_path} with job ID #{job_id}.")
else
Rails.logger.error "Import job failed to start for #{full_path}"
Rails.logger.error("#{job_type} job failed to create for #{full_path}.")
end
end
 
@@ -543,6 +555,7 @@ def reset_cache_and_import_attrs
ProjectCacheWorker.perform_async(self.id)
end
 
update(import_error: nil)
remove_import_data
end
 
@@ -5,14 +5,17 @@ class RepositoryForkWorker
include Gitlab::ShellAdapter
include DedicatedSidekiqQueue
 
sidekiq_options status_expiration: StuckImportJobsWorker::IMPORT_JOBS_EXPIRATION
def perform(project_id, forked_from_repository_storage_path, source_path, target_path)
project = Project.find(project_id)
return unless start_fork(project)
Gitlab::Metrics.add_event(:fork_repository,
source_path: source_path,
target_path: target_path)
 
project = Project.find(project_id)
project.import_start
result = gitlab_shell.fork_repository(forked_from_repository_storage_path, source_path,
project.repository_storage_path, target_path)
raise ForkError, "Unable to fork project #{project_id} for repository #{source_path} -> #{target_path}" unless result
@@ -33,6 +36,13 @@ def perform(project_id, forked_from_repository_storage_path, source_path, target
 
private
 
def start_fork(project)
return true if project.import_start
Rails.logger.info("Project #{project.full_path} was in inconsistent state (#{project.import_status}) while forking.")
false
end
def fail_fork(project, message)
Rails.logger.error(message)
project.mark_import_as_failed(message)
@@ -4,23 +4,18 @@ class RepositoryImportWorker
include Sidekiq::Worker
include DedicatedSidekiqQueue
 
sidekiq_options status_expiration: StuckImportJobsWorker::IMPORT_EXPIRATION
attr_accessor :project, :current_user
sidekiq_options status_expiration: StuckImportJobsWorker::IMPORT_JOBS_EXPIRATION
 
def perform(project_id)
@project = Project.find(project_id)
@current_user = @project.creator
project = Project.find(project_id)
 
project.import_start
return unless start_import(project)
 
Gitlab::Metrics.add_event(:import_repository,
import_url: @project.import_url,
path: @project.full_path)
project.update_columns(import_jid: self.jid, import_error: nil)
import_url: project.import_url,
path: project.full_path)
 
result = Projects::ImportService.new(project, current_user).execute
result = Projects::ImportService.new(project, project.creator).execute
raise ImportError, result[:message] if result[:status] == :error
 
project.repository.after_import
@@ -41,6 +36,13 @@ def perform(project_id)
 
private
 
def start_import(project)
return true if project.import_start
Rails.logger.info("Project #{project.full_path} was in inconsistent state (#{project.import_status}) while importing.")
false
end
def fail_import(project, message)
project.mark_import_as_failed(message)
end
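The new start_import and start_fork guards (and start_mirror in the mirror worker below) lean on the state machine: an event method such as import_start returns false when the transition is not allowed from the project's current import_status, so a second worker picking up the same project logs a line and returns instead of raising. A hedged console sketch (identifiers illustrative; assumes the scheduled => started transition defined on Project's import state machine):

project = Project.find(project_id)   # project_id is a placeholder
project.import_status                # => "started" (another worker won the race)
project.import_start                 # => false, invalid transition, so the worker bails out early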
class RepositoryUpdateMirrorWorker
UpdateError = Class.new(StandardError)
UpdateAlreadyInProgressError = Class.new(StandardError)
 
include Sidekiq::Worker
include Gitlab::ShellAdapter
include DedicatedSidekiqQueue
 
# Retry not necessary. It will try again at the next update interval.
sidekiq_options retry: false
sidekiq_options retry: false, status_expiration: StuckImportJobsWorker::IMPORT_JOBS_EXPIRATION
 
attr_accessor :project, :repository, :current_user
 
def perform(project_id)
project = Project.find(project_id)
 
raise UpdateAlreadyInProgressError if project.import_started?
start_mirror(project)
return unless start_mirror(project)
 
@current_user = project.mirror_user || project.creator
 
@@ -23,8 +21,6 @@ def perform(project_id)
raise UpdateError, result[:message] if result[:status] == :error
 
finish_mirror(project)
rescue UpdateAlreadyInProgressError
raise
rescue UpdateError => ex
fail_mirror(project, ex.message)
raise
@@ -40,10 +36,15 @@ def perform(project_id)
private
 
def start_mirror(project)
project.import_start
Gitlab::Mirror.increment_metric(:mirrors_running, 'Mirrors running count')
Rails.logger.info("Mirror update for #{project.full_path} started. Waiting duration: #{project.mirror_waiting_duration}")
if project.import_start
Gitlab::Mirror.increment_metric(:mirrors_running, 'Mirrors running count')
Rails.logger.info("Mirror update for #{project.full_path} started. Waiting duration: #{project.mirror_waiting_duration}")
true
else
Rails.logger.info("Project #{project.full_path} was in inconsistent state: #{project.import_status}")
false
end
end
 
def fail_mirror(project, message)
@@ -2,36 +2,60 @@ class StuckImportJobsWorker
include Sidekiq::Worker
include CronjobQueue
 
IMPORT_EXPIRATION = 15.hours.to_i
IMPORT_JOBS_EXPIRATION = 15.hours.to_i
 
def perform
stuck_projects.find_in_batches(batch_size: 500) do |group|
projects_without_jid_count = mark_projects_without_jid_as_failed!
projects_with_jid_count = mark_projects_with_jid_as_failed!
Gitlab::Metrics.add_event(:stuck_import_jobs,
projects_without_jid_count: projects_without_jid_count,
projects_with_jid_count: projects_with_jid_count)
end
private
def mark_projects_without_jid_as_failed!
started_projects_without_jid.each do |project|
project.mark_import_as_failed(error_message)
end.count
end
def mark_projects_with_jid_as_failed!
completed_jids_count = 0
started_projects_with_jid.find_in_batches(batch_size: 500) do |group|
jids = group.map(&:import_jid)
 
# Find the jobs that aren't currently running or that exceeded the threshold.
completed_jids = Gitlab::SidekiqStatus.completed_jids(jids)
completed_jids = Gitlab::SidekiqStatus.completed_jids(jids).to_set
 
if completed_jids.any?
completed_ids = group.select { |project| completed_jids.include?(project.import_jid) }.map(&:id)
completed_jids_count += completed_jids.count
group.each do |project|
project.mark_import_as_failed(error_message) if completed_jids.include?(project.import_jid)
end
 
fail_batch!(completed_jids, completed_ids)
Rails.logger.info("Marked stuck import jobs as failed. JIDs: #{completed_jids.to_a.join(', ')}")
end
end
end
 
private
completed_jids_count
end
 
def stuck_projects
Project.select('id, import_jid').with_import_status(:started).where.not(import_jid: nil)
def started_projects
Project.with_import_status(:started)
end
 
def fail_batch!(completed_jids, completed_ids)
Project.where(id: completed_ids).update_all(import_status: 'failed', import_error: error_message)
def started_projects_with_jid
started_projects.where.not(import_jid: nil)
end
 
Rails.logger.info("Marked stuck import jobs as failed. JIDs: #{completed_jids.join(', ')}")
def started_projects_without_jid
started_projects.where(import_jid: nil)
end
 
def error_message
"Import timed out. Import took longer than #{IMPORT_EXPIRATION} seconds"
"Import timed out. Import took longer than #{IMPORT_JOBS_EXPIRATION} seconds"
end
end
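Because removed and added lines are interleaved in the hunk above, here is the worker as it reads after this commit, reassembled best-effort from those lines (indentation restored, content otherwise unchanged):

class StuckImportJobsWorker
  include Sidekiq::Worker
  include CronjobQueue

  IMPORT_JOBS_EXPIRATION = 15.hours.to_i

  def perform
    projects_without_jid_count = mark_projects_without_jid_as_failed!
    projects_with_jid_count = mark_projects_with_jid_as_failed!

    Gitlab::Metrics.add_event(:stuck_import_jobs,
                              projects_without_jid_count: projects_without_jid_count,
                              projects_with_jid_count: projects_with_jid_count)
  end

  private

  def mark_projects_without_jid_as_failed!
    started_projects_without_jid.each do |project|
      project.mark_import_as_failed(error_message)
    end.count
  end

  def mark_projects_with_jid_as_failed!
    completed_jids_count = 0

    started_projects_with_jid.find_in_batches(batch_size: 500) do |group|
      jids = group.map(&:import_jid)

      # Find the jobs that aren't currently running or that exceeded the threshold.
      completed_jids = Gitlab::SidekiqStatus.completed_jids(jids).to_set

      if completed_jids.any?
        completed_jids_count += completed_jids.count

        group.each do |project|
          project.mark_import_as_failed(error_message) if completed_jids.include?(project.import_jid)
        end

        Rails.logger.info("Marked stuck import jobs as failed. JIDs: #{completed_jids.to_a.join(', ')}")
      end
    end

    completed_jids_count
  end

  def started_projects
    Project.with_import_status(:started)
  end

  def started_projects_with_jid
    started_projects.where.not(import_jid: nil)
  end

  def started_projects_without_jid
    started_projects.where(import_jid: nil)
  end

  def error_message
    "Import timed out. Import took longer than #{IMPORT_JOBS_EXPIRATION} seconds"
  end
end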
@@ -9,19 +9,11 @@ def perform
lease_uuid = try_obtain_lease
return unless lease_uuid
 
fail_stuck_mirrors!
schedule_mirrors!
 
cancel_lease(lease_uuid)
end
 
def fail_stuck_mirrors!
Project.stuck_mirrors.find_each(batch_size: 50) do |project|
project.mark_import_as_failed('The mirror update took too long to complete.')
end
end
def schedule_mirrors!
capacity = batch_size = Gitlab::Mirror.available_capacity
 
---
title: Improves handling of stuck imports.
merge_request: 2628
author:
@@ -42,12 +42,6 @@ module Project
 
scope :with_shared_runners_limit_enabled, -> { with_shared_runners.non_public_only }
 
scope :stuck_mirrors, -> do
mirror.joins(:mirror_data)
.where("(import_status = 'started' AND project_mirror_data.last_update_started_at < :limit) OR (import_status = 'scheduled' AND project_mirror_data.last_update_scheduled_at < :limit)",
{ limit: 20.minutes.ago })
end
scope :mirror, -> { where(mirror: true) }
scope :with_remote_mirrors, -> { joins(:remote_mirrors).where(remote_mirrors: { enabled: true }).distinct }
scope :with_wiki_enabled, -> { with_feature_enabled(:wiki) }
@@ -214,9 +208,11 @@ def add_import_job
super
elsif mirror?
::Gitlab::Mirror.increment_metric(:mirrors_scheduled, 'Mirrors scheduled count')
Rails.logger.info("Mirror update for #{full_path} was scheduled.")
job_id = RepositoryUpdateMirrorWorker.perform_async(self.id)
log_import_activity(job_id, type: :mirror)
 
RepositoryUpdateMirrorWorker.perform_async(self.id)
job_id
end
end
 
module AfterCommitQueue
extend ActiveSupport::Concern
included do
after_commit :_run_after_commit_queue
after_rollback :_clear_after_commit_queue
end
def run_after_commit(method = nil, &block)
_after_commit_queue << proc { self.send(method) } if method
_after_commit_queue << block if block
true
end
protected
def _run_after_commit_queue
while action = _after_commit_queue.pop
self.instance_eval(&action)
end
end
def _after_commit_queue
@after_commit_queue ||= []
end
def _clear_after_commit_queue
_after_commit_queue.clear
end
end
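This concern replaces the after_commit_queue gem dropped from the Gemfile at the top of this commit; the spec further down exercises the same path. A minimal usage sketch (the logging block is illustrative):

project = Project.new
project.run_after_commit { Rails.logger.info("committed #{full_path}") }
project.save!   # assuming the record is valid: the block runs via instance_eval
                # (self is the project) once the INSERT commits; a rollback clears the queue instead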
@@ -90,9 +90,14 @@ def self.job_status(job_ids)
#
# Returns an array of completed JIDs
def self.completed_jids(job_ids)
Sidekiq.redis do |redis|
job_ids.reject { |jid| redis.exists(key_for(jid)) }
statuses = job_status(job_ids)
completed = []
job_ids.zip(statuses).each do |job_id, status|
completed << job_id unless status
end
completed
end
 
def self.key_for(jid)
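completed_jids now builds on job_status instead of hitting Redis directly: a JID counts as completed once its status key has been cleared or has expired. A hypothetical call for illustration (the JIDs are invented):

# 'abc123' still has a status key (job alive); 'def456' does not (finished or expired)
Gitlab::SidekiqStatus.completed_jids(%w[abc123 def456])
# => ["def456"]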
require 'spec_helper'
describe AfterCommitQueue do
it 'runs after transaction is committed' do
called = false
test_proc = proc { called = true }
project = build(:project)
project.run_after_commit(&test_proc)
project.save
expect(called).to be true
end
end
@@ -1813,8 +1813,7 @@ def create_pipeline
it 'imports a project' do
expect_any_instance_of(RepositoryImportWorker).to receive(:perform).and_call_original
 
project.import_schedule
expect { project.import_schedule }.to change { project.import_jid }
expect(project.reload.import_status).to eq('finished')
end
 
@@ -1826,7 +1825,7 @@ def create_pipeline
expect_any_instance_of(EE::Project).to receive(:force_import_job!)
expect_any_instance_of(RepositoryImportWorker).to receive(:perform).with(project.id).and_call_original
 
project.import_schedule
expect { project.import_schedule }.to change { project.import_jid }
end
end
end
@@ -1839,6 +1838,13 @@ def create_pipeline
allow(Projects::HousekeepingService).to receive(:new) { housekeeping_service }
end
 
it 'resets project import_error' do
error_message = 'Some error'
mirror = create(:project_empty_repo, :import_started, import_error: error_message)
expect { mirror.import_finish }.to change { mirror.import_error }.from(error_message).to(nil)
end
it 'performs housekeeping when an import of a fresh project is completed' do
project = create(:project_empty_repo, :import_started, import_type: :github)
 
@@ -2047,24 +2053,28 @@ def create_build(new_pipeline = pipeline, name = 'test')
end
 
describe '#add_import_job' do
let!(:import_jid) { '123' }
context 'forked' do
let(:forked_project_link) { create(:forked_project_link, :forked_to_empty_project) }
let(:forked_from_project) { forked_project_link.forked_from_project }
let(:project) { forked_project_link.forked_to_project }
 
it 'schedules a RepositoryForkWorker job' do
expect(RepositoryForkWorker).to receive(:perform_async)
.with(project.id, forked_from_project.repository_storage_path,
forked_from_project.disk_path, project.namespace.full_path)
expect(RepositoryForkWorker).to receive(:perform_async).with(
project.id,
forked_from_project.repository_storage_path,
forked_from_project.disk_path,
project.namespace.full_path).and_return(import_jid)
 
project.add_import_job
expect(project.add_import_job).to eq(import_jid)
end
 
context 'without mirror' do
it 'returns nil' do
project = create(:project)
 
expect(project.add_import_job).to be_nil
expect(project.add_import_job).to be nil
end
end
 
@@ -2072,9 +2082,8 @@ def create_build(new_pipeline = pipeline, name = 'test')
it 'schedules RepositoryImportWorker' do
project = create(:project, import_url: generate(:url))
 
expect(RepositoryImportWorker).to receive(:perform_async).with(project.id)
project.add_import_job
expect(RepositoryImportWorker).to receive(:perform_async).with(project.id).and_return(import_jid)
expect(project.add_import_job).to eq(import_jid)
end
end
 
@@ -2082,9 +2091,8 @@ def create_build(new_pipeline = pipeline, name = 'test')
it 'schedules RepositoryUpdateMirrorWorker' do
project = create(:project, :mirror, :repository)
 
expect(RepositoryUpdateMirrorWorker).to receive(:perform_async).with(project.id)
project.add_import_job
expect(RepositoryUpdateMirrorWorker).to receive(:perform_async).with(project.id).and_return(import_jid)
expect(project.add_import_job).to eq(import_jid)
end
end
end
@@ -2093,9 +2101,8 @@ def create_build(new_pipeline = pipeline, name = 'test')
it 'schedules a RepositoryImportWorker job' do
project = create(:project, import_url: generate(:url))
 
expect(RepositoryImportWorker).to receive(:perform_async).with(project.id)
project.add_import_job
expect(RepositoryImportWorker).to receive(:perform_async).with(project.id).and_return(import_jid)
expect(project.add_import_job).to eq(import_jid)
end
end
end
@@ -35,8 +35,8 @@
it 'hide the credentials that were used in the import URL' do
error = %q{remote: Not Found fatal: repository 'https://user:pass@test.com/root/repoC.git/' not found }
 
project.update_attributes(import_jid: '123')
expect_any_instance_of(Projects::ImportService).to receive(:execute).and_return({ status: :error, message: error })
allow(subject).to receive(:jid).and_return('123')
 
expect do
subject.perform(project.id)
@@ -26,12 +26,10 @@
end
 
context 'with another worker already running' do
it 'raises UpdateAlreadyInProgressError' do
it 'returns nil' do
mirror = create(:project, :repository, :mirror, :import_started)
 
expect do
subject.perform(mirror.id)
end.to raise_error(RepositoryUpdateMirrorWorker::UpdateAlreadyInProgressError)
expect(subject.perform(mirror.id)).to be nil
end
end
 
@@ -8,29 +8,29 @@
allow_any_instance_of(Gitlab::ExclusiveLease).to receive(:try_obtain).and_return(exclusive_lease_uuid)
end
 
describe 'long running import' do
let(:project) { create(:project, import_jid: '123', import_status: 'started') }
describe 'with started import_status' do
let(:project) { create(:project, :import_started, import_jid: '123') }
 
before do
allow(Gitlab::SidekiqStatus).to receive(:completed_jids).and_return(['123'])
end
describe 'long running import' do
it 'marks the project as failed' do
allow(Gitlab::SidekiqStatus).to receive(:completed_jids).and_return(['123'])
 
it 'marks the project as failed' do
expect { worker.perform }.to change { project.reload.import_status }.to('failed')
expect { worker.perform }.to change { project.reload.import_status }.to('failed')
end
end
end
 
describe 'running import' do
let(:project) { create(:project, import_jid: '123', import_status: 'started') }
before do
allow(Gitlab::SidekiqStatus).to receive(:completed_jids).and_return([])
end
describe 'running import' do
it 'does not mark the project as failed' do
allow(Gitlab::SidekiqStatus).to receive(:completed_jids).and_return([])
 
it 'does not mark the project as failed' do
worker.perform
expect { worker.perform }.not_to change { project.reload.import_status }
end
 
expect(project.reload.import_status).to eq('started')
describe 'import without import_jid' do
it 'marks the project as failed' do
expect { worker.perform }.to change { project.reload.import_status }.to('failed')
end
end
end
end
end
@@ -8,12 +8,6 @@
end
 
describe '#perform' do
it 'fails stuck mirrors' do
expect(worker).to receive(:fail_stuck_mirrors!)
worker.perform
end
it 'does not execute if cannot get the lease' do
create(:project, :mirror)
 
@@ -31,45 +25,6 @@
end
end
 
describe '#fail_stuck_mirrors!' do
delegate :fail_stuck_mirrors!, to: :worker
it 'ignores records that are not mirrors' do
create(:project, :import_started, mirror_last_update_at: 12.hours.ago)
expect_any_instance_of(Project).not_to receive(:import_fail)
fail_stuck_mirrors!
end
it 'ignores records without in-progress import' do
create(:project, :mirror, :import_finished, mirror_last_update_at: 12.hours.ago)
expect_any_instance_of(Project).not_to receive(:import_fail)
fail_stuck_mirrors!
end
it 'ignores records with recently updated mirrors' do
create(:project, :mirror, mirror_last_update_at: Time.now)
expect_any_instance_of(Project).not_to receive(:import_fail)
fail_stuck_mirrors!
end
it 'transitions stuck mirrors to a failed state and updates import_error message' do
project = create(:project, :mirror, :import_started)
project.mirror_data.update_attributes(last_update_started_at: 25.minutes.ago)
fail_stuck_mirrors!
project.reload
expect(project).to be_import_failed
expect(project.reload.import_error).to eq 'The mirror update took too long to complete.'
end
end
describe '#schedule_mirrors!' do
def schedule_mirrors!(capacity:)
allow(Gitlab::Mirror).to receive_messages(available_capacity: capacity)