Skip to content
Snippets Groups Projects
Verified Commit e1391ed5 authored by Yorick Peterse's avatar Yorick Peterse
Browse files

Use separate queues for all Sidekiq workers

Dumping too many jobs in the same queue (e.g. the "default" queue) is a
dangerous setup. Jobs that take a long time to process can effectively
block any other work from being performed given there are enough of
these jobs.

Furthermore it becomes harder to monitor the jobs as a single queue
could contain jobs for different workers. In such a setup the only
reliable way of getting counts per job is to iterate over all jobs in a
queue, which is a rather time consuming process.

Finally, using separate queues allows us to throttle and prioritize
processing of these queues in the future. For example, the jobs for the
MergeWorker are more important than for example a job used to update
some statistics. Putting all jobs in the same queue would not allow for
this.

This commit includes a Rails migration that moves Sidekiq jobs from the
old queues to the new ones. This migration also takes care of doing the
inverse if ever needed. This does require downtime as otherwise new jobs
could be scheduled in the old queues after this migration completes.

Fixes gitlab-org/gitlab-ce#23370
parent 33127207
No related branches found
No related tags found
No related merge requests found
Pipeline #
Showing
with 25 additions and 12 deletions
Loading
Loading
@@ -3,6 +3,7 @@ Please view this file on the master branch, on stable branches it's out of date.
## 8.13.0 (2016-10-22)
 
- Fix save button on project pipeline settings page. (!6955)
- All Sidekiq workers now use their own queue
- Avoid race condition when asynchronously removing expired artifacts. (!6881)
- Improve Merge When Build Succeeds triggers and execute on pipeline success. (!6675)
- Respond with 404 Not Found for non-existent tags (Linus Thiel)
Loading
Loading
class AdminEmailWorker
include Sidekiq::Worker
 
sidekiq_options retry: false # this job auto-repeats via sidekiq-cron
sidekiq_options retry: false, queue: :admin_email # this job auto-repeats via sidekiq-cron
 
def perform
repository_check_failed_count = Project.where(last_repository_check_failed: true).count
Loading
Loading
class BuildCoverageWorker
include Sidekiq::Worker
sidekiq_options queue: :default
sidekiq_options queue: :build_coverage
 
def perform(build_id)
Ci::Build.find_by(id: build_id)
Loading
Loading
class BuildEmailWorker
include Sidekiq::Worker
 
sidekiq_options queue: :build_emails
def perform(build_id, recipients, push_data)
recipients.each do |recipient|
begin
Loading
Loading
class BuildFinishedWorker
include Sidekiq::Worker
 
sidekiq_options queue: :finished_builds
def perform(build_id)
Ci::Build.find_by(id: build_id).try do |build|
BuildCoverageWorker.new.perform(build.id)
Loading
Loading
class BuildHooksWorker
include Sidekiq::Worker
sidekiq_options queue: :default
sidekiq_options queue: :build_hooks
 
def perform(build_id)
Ci::Build.find_by(id: build_id)
Loading
Loading
class BuildSuccessWorker
include Sidekiq::Worker
sidekiq_options queue: :default
sidekiq_options queue: :successful_builds
 
def perform(build_id)
Ci::Build.find_by(id: build_id).try do |build|
Loading
Loading
# This worker clears all cache fields in the database, working in batches.
class ClearDatabaseCacheWorker
include Sidekiq::Worker
sidekiq_options queue: :clear_database_cache
 
BATCH_SIZE = 1000
 
Loading
Loading
class DeleteUserWorker
include Sidekiq::Worker
sidekiq_options queue: :delete_user
 
def perform(current_user_id, delete_user_id, options = {})
delete_user = User.find(delete_user_id)
Loading
Loading
class ExpireBuildArtifactsWorker
include Sidekiq::Worker
 
sidekiq_options queue: :expire_build_artifacts
def perform
Rails.logger.info 'Scheduling removal of build artifacts'
 
Loading
Loading
class ExpireBuildInstanceArtifactsWorker
include Sidekiq::Worker
 
sidekiq_options queue: :expire_build_instance_artifacts
def perform(build_id)
build = Ci::Build
.with_expired_artifacts
Loading
Loading
Loading
Loading
@@ -2,7 +2,7 @@ class GitGarbageCollectWorker
include Sidekiq::Worker
include Gitlab::ShellAdapter
 
sidekiq_options queue: :gitlab_shell, retry: false
sidekiq_options queue: :git_garbage_collection, retry: false
 
def perform(project_id)
project = Project.find(project_id)
Loading
Loading
class GroupDestroyWorker
include Sidekiq::Worker
 
sidekiq_options queue: :default
sidekiq_options queue: :delete_group
 
def perform(group_id, user_id)
begin
Loading
Loading
class ImportExportProjectCleanupWorker
include Sidekiq::Worker
 
sidekiq_options queue: :default
sidekiq_options queue: :import_export_project_cleanup
 
def perform
ImportExportCleanUpService.new.execute
Loading
Loading
Loading
Loading
@@ -4,6 +4,8 @@ require 'socket'
class IrkerWorker
include Sidekiq::Worker
 
sidekiq_options queue: :irker
def perform(project_id, chans, colors, push_data, settings)
project = Project.find(project_id)
 
Loading
Loading
class MergeWorker
include Sidekiq::Worker
 
sidekiq_options queue: :default
sidekiq_options queue: :merges
 
def perform(merge_request_id, current_user_id, params)
params = params.with_indifferent_access
Loading
Loading
class NewNoteWorker
include Sidekiq::Worker
 
sidekiq_options queue: :default
sidekiq_options queue: :new_note
 
def perform(note_id, note_params)
note = Note.find(note_id)
Loading
Loading
class PipelineHooksWorker
include Sidekiq::Worker
sidekiq_options queue: :default
sidekiq_options queue: :pipeline_hooks
 
def perform(pipeline_id)
Ci::Pipeline.find_by(id: pipeline_id)
Loading
Loading
class PipelineMetricsWorker
include Sidekiq::Worker
 
sidekiq_options queue: :default
sidekiq_options queue: :pipeline_metrics
 
def perform(pipeline_id)
Ci::Pipeline.find_by(id: pipeline_id).try do |pipeline|
Loading
Loading
class PipelineProcessWorker
include Sidekiq::Worker
 
sidekiq_options queue: :default
sidekiq_options queue: :process_pipeline
 
def perform(pipeline_id)
Ci::Pipeline.find_by(id: pipeline_id)
Loading
Loading
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment