Skip to content
Snippets Groups Projects
Unverified Commit 5e7f54b8 authored by Stan Hu's avatar Stan Hu Committed by GitLab
Browse files

Merge branch 'sc1-optimize-concurrency-tracking' into 'master'

Track worker concurrency using Redis hash - 2nd try

See merge request https://gitlab.com/gitlab-org/gitlab/-/merge_requests/169561



Merged-by: default avatarStan Hu <stanhu@gmail.com>
Approved-by: default avatarDmitry Gruzd <dgruzd@gitlab.com>
Approved-by: default avatarStan Hu <stanhu@gmail.com>
Reviewed-by: default avatarDmitry Gruzd <dgruzd@gitlab.com>
Reviewed-by: default avatarSylvester Chin <schin@gitlab.com>
Co-authored-by: default avatarSylvester Chin <schin@gitlab.com>
parents c074d05e aab064e9
No related branches found
No related tags found
No related merge requests found
Showing
with 833 additions and 239 deletions
Loading
@@ -7,7 +7,7 @@ class ResumeWorker
Loading
@@ -7,7 +7,7 @@ class ResumeWorker
include CronjobQueue # rubocop:disable Scalability/CronWorkerContext -- There is no onward scheduling and this cron handles work from across the include CronjobQueue # rubocop:disable Scalability/CronWorkerContext -- There is no onward scheduling and this cron handles work from across the
# application, so there's no useful context to add. # application, so there's no useful context to add.
   
DEFAULT_LIMIT = 1_000 BATCH_SIZE = 1_000
RESCHEDULE_DELAY = 1.second RESCHEDULE_DELAY = 1.second
   
data_consistency :sticky data_consistency :sticky
Loading
@@ -18,24 +18,26 @@ def perform
Loading
@@ -18,24 +18,26 @@ def perform
reschedule_job = false reschedule_job = false
   
workers.each do |worker| workers.each do |worker|
limit = ::Gitlab::SidekiqMiddleware::ConcurrencyLimit::WorkersMap.limit_for(worker: worker)&.call limit = ::Gitlab::SidekiqMiddleware::ConcurrencyLimit::WorkersMap.limit_for(worker: worker)
queue_size = queue_size(worker) queue_size = queue_size(worker)
report_prometheus_metrics(worker, queue_size, limit) report_prometheus_metrics(worker, queue_size, limit)
   
next unless queue_size > 0 next unless queue_size > 0
next if limit < 0 # do not re-queue jobs if circuit-broken
   
reschedule_job = true reschedule_job = true
   
processing_limit = if limit current = current_concurrency(worker: worker)
current = current_concurrency(worker: worker) processing_limit = if limit > 0
limit - current limit - current
else else
DEFAULT_LIMIT BATCH_SIZE
end end
   
next unless processing_limit > 0 next unless processing_limit > 0
   
resume_processing!(worker, limit: processing_limit) resume_processing!(worker, limit: processing_limit)
cleanup_stale_trackers(worker)
end end
   
self.class.perform_in(RESCHEDULE_DELAY) if reschedule_job self.class.perform_in(RESCHEDULE_DELAY) if reschedule_job
Loading
@@ -44,6 +46,10 @@ def perform
Loading
@@ -44,6 +46,10 @@ def perform
private private
   
def current_concurrency(worker:) def current_concurrency(worker:)
if ::Feature.enabled?(:sidekiq_concurrency_limit_optimized_count, Feature.current_request)
return concurrent_worker_count(worker)
end
@current_concurrency ||= ::Gitlab::SidekiqMiddleware::ConcurrencyLimit::WorkersConcurrency.workers( @current_concurrency ||= ::Gitlab::SidekiqMiddleware::ConcurrencyLimit::WorkersConcurrency.workers(
skip_cache: true skip_cache: true
) )
Loading
@@ -51,10 +57,18 @@ def current_concurrency(worker:)
Loading
@@ -51,10 +57,18 @@ def current_concurrency(worker:)
@current_concurrency[worker.name].to_i @current_concurrency[worker.name].to_i
end end
   
def concurrent_worker_count(worker)
Gitlab::SidekiqMiddleware::ConcurrencyLimit::ConcurrencyLimitService.concurrent_worker_count(worker.name)
end
def queue_size(worker) def queue_size(worker)
Gitlab::SidekiqMiddleware::ConcurrencyLimit::ConcurrencyLimitService.queue_size(worker.name) Gitlab::SidekiqMiddleware::ConcurrencyLimit::ConcurrencyLimitService.queue_size(worker.name)
end end
   
def cleanup_stale_trackers(worker)
Gitlab::SidekiqMiddleware::ConcurrencyLimit::ConcurrencyLimitService.cleanup_stale_trackers(worker.name)
end
def resume_processing!(worker, limit:) def resume_processing!(worker, limit:)
Gitlab::SidekiqMiddleware::ConcurrencyLimit::ConcurrencyLimitService.resume_processing!(worker.name, limit: limit) Gitlab::SidekiqMiddleware::ConcurrencyLimit::ConcurrencyLimitService.resume_processing!(worker.name, limit: limit)
end end
Loading
@@ -73,7 +87,7 @@ def report_prometheus_metrics(worker, queue_size, limit)
Loading
@@ -73,7 +87,7 @@ def report_prometheus_metrics(worker, queue_size, limit)
limit_metric = Gitlab::Metrics.gauge(:sidekiq_concurrency_limit_max_concurrent_jobs, limit_metric = Gitlab::Metrics.gauge(:sidekiq_concurrency_limit_max_concurrent_jobs,
'Max number of concurrent running jobs.', 'Max number of concurrent running jobs.',
{}) {})
limit_metric.set({ worker: worker.name }, limit || DEFAULT_LIMIT) limit_metric.set({ worker: worker.name }, limit || BATCH_SIZE)
end end
end end
end end
---
name: sidekiq_concurrency_limit_optimized_count
feature_issue_url: https://gitlab.com/gitlab-org/gitlab/-/issues/490936
introduced_by_url: https://gitlab.com/gitlab-org/gitlab/-/merge_requests/169561
rollout_issue_url: https://gitlab.com/gitlab-org/gitlab/-/issues/496335
milestone: '17.6'
group: group::scalability
type: gitlab_com_derisk
default_enabled: false
Loading
@@ -12,28 +12,16 @@
Loading
@@ -12,28 +12,16 @@
allow(Gitlab::SidekiqMiddleware::ConcurrencyLimit::ConcurrencyLimitService).to receive(:resume_processing!) allow(Gitlab::SidekiqMiddleware::ConcurrencyLimit::ConcurrencyLimitService).to receive(:resume_processing!)
end end
   
context 'when there are no jobs in the queue' do shared_examples 'report prometheus metrics' do |limit = described_class::BATCH_SIZE, queue_size = 100|
before do it do
allow(Gitlab::SidekiqMiddleware::ConcurrencyLimit::ConcurrencyLimitService).to receive(:has_jobs_in_queue?)
.and_return(0)
end
it 'does nothing' do
expect(Gitlab::SidekiqMiddleware::ConcurrencyLimit::ConcurrencyLimitService)
.not_to receive(:resume_processing!)
worker.perform
end
it 'reports prometheus metrics' do
stub_application_setting(elasticsearch_max_code_indexing_concurrency: 30)
queue_size_gauge_double = instance_double(Prometheus::Client::Gauge) queue_size_gauge_double = instance_double(Prometheus::Client::Gauge)
expect(Gitlab::Metrics).to receive(:gauge).at_least(:once) expect(Gitlab::Metrics).to receive(:gauge).at_least(:once)
.with(:sidekiq_concurrency_limit_queue_jobs, anything, {}, :max) .with(:sidekiq_concurrency_limit_queue_jobs, anything, {}, :max)
.and_return(queue_size_gauge_double) .and_return(queue_size_gauge_double)
   
allow(queue_size_gauge_double).to receive(:set).with({ worker: anything }, 0) allow(queue_size_gauge_double).to receive(:set).with({ worker: anything }, anything)
expect(queue_size_gauge_double).to receive(:set).with({ worker: worker_with_concurrency_limit.name }, 0) expect(queue_size_gauge_double).to receive(:set).with({ worker: worker_with_concurrency_limit.name },
queue_size)
   
limit_gauge_double = instance_double(Prometheus::Client::Gauge) limit_gauge_double = instance_double(Prometheus::Client::Gauge)
expect(Gitlab::Metrics).to receive(:gauge).at_least(:once) expect(Gitlab::Metrics).to receive(:gauge).at_least(:once)
Loading
@@ -41,104 +29,123 @@
Loading
@@ -41,104 +29,123 @@
.and_return(limit_gauge_double) .and_return(limit_gauge_double)
   
allow(limit_gauge_double).to receive(:set).with({ worker: anything }, anything) allow(limit_gauge_double).to receive(:set).with({ worker: anything }, anything)
expect(limit_gauge_double).to receive(:set).with({ worker: worker_with_concurrency_limit.name }, 30) expect(limit_gauge_double).to receive(:set)
.with({ worker: worker_with_concurrency_limit.name }, limit)
worker.perform
end
end
shared_examples 'no jobs in the queue' do
before do
allow(Gitlab::SidekiqMiddleware::ConcurrencyLimit::WorkersMap).to receive(:limit_for)
.and_return(10)
allow(Gitlab::SidekiqMiddleware::ConcurrencyLimit::ConcurrencyLimitService).to receive(:queue_size)
.and_return(0)
end
it 'does nothing' do
expect(Gitlab::SidekiqMiddleware::ConcurrencyLimit::ConcurrencyLimitService)
.not_to receive(:resume_processing!)
   
worker.perform worker.perform
end end
it_behaves_like 'report prometheus metrics', 10, 0
end end
   
context 'when there are jobs in the queue' do shared_examples 'jobs in the queue' do
before do before do
allow(Gitlab::SidekiqMiddleware::ConcurrencyLimit::ConcurrencyLimitService).to receive(:queue_size) allow(Gitlab::SidekiqMiddleware::ConcurrencyLimit::ConcurrencyLimitService).to receive(:queue_size)
.and_return(100) .and_return(0)
allow(Gitlab::SidekiqMiddleware::ConcurrencyLimit::ConcurrencyLimitService).to receive(:queue_size)
.with(worker_with_concurrency_limit.name).and_return(100)
stub_application_setting(elasticsearch_max_code_indexing_concurrency: 60)
end end
   
it 'resumes processing' do it 'resumes processing' do
stub_application_setting(elasticsearch_max_code_indexing_concurrency: 35) stub_application_setting(elasticsearch_max_code_indexing_concurrency: 35)
expect(Gitlab::SidekiqMiddleware::ConcurrencyLimit::ConcurrencyLimitService) expect(Gitlab::SidekiqMiddleware::ConcurrencyLimit::ConcurrencyLimitService)
.to receive(:resume_processing!) .to receive(:resume_processing!)
.with(worker_with_concurrency_limit.name, limit: 35) .with(worker_with_concurrency_limit.name, limit: 35 - concurrent_workers)
   
worker.perform worker.perform
end end
   
it 'resumes processing if there are other jobs' do it 'resumes processing if there are other jobs' do
stub_application_setting(elasticsearch_max_code_indexing_concurrency: 60) stub_application_setting(elasticsearch_max_code_indexing_concurrency: 60)
allow(::Gitlab::SidekiqMiddleware::ConcurrencyLimit::WorkersConcurrency).to receive(:workers)
.and_return(worker_with_concurrency_limit.name => 15)
expect(Gitlab::SidekiqMiddleware::ConcurrencyLimit::ConcurrencyLimitService) expect(Gitlab::SidekiqMiddleware::ConcurrencyLimit::ConcurrencyLimitService)
.to receive(:resume_processing!) .to receive(:resume_processing!)
.with(worker_with_concurrency_limit.name, limit: 45) .with(worker_with_concurrency_limit.name, limit: 60 - concurrent_workers)
   
worker.perform worker.perform
end end
   
it 'reports prometheus metrics' do it_behaves_like 'report prometheus metrics', 60
stub_application_setting(elasticsearch_max_code_indexing_concurrency: 60)
allow(::Gitlab::SidekiqMiddleware::ConcurrencyLimit::WorkersConcurrency).to receive(:workers)
.and_return(worker_with_concurrency_limit.name => 15)
queue_size_gauge_double = instance_double(Prometheus::Client::Gauge)
expect(Gitlab::Metrics).to receive(:gauge).at_least(:once)
.with(:sidekiq_concurrency_limit_queue_jobs, anything, {}, :max)
.and_return(queue_size_gauge_double)
   
allow(queue_size_gauge_double).to receive(:set).with({ worker: anything }, anything) context 'when limit is negative' do
expect(queue_size_gauge_double).to receive(:set).with({ worker: worker_with_concurrency_limit.name }, 100) before do
allow(::Gitlab::SidekiqMiddleware::ConcurrencyLimit::WorkersMap).to receive(:limit_for).and_return(0)
allow(::Gitlab::SidekiqMiddleware::ConcurrencyLimit::WorkersMap).to receive(:limit_for)
.with(worker: worker_with_concurrency_limit)
.and_return(-1)
end
   
limit_gauge_double = instance_double(Prometheus::Client::Gauge) it 'does not schedule any workers' do
expect(Gitlab::Metrics).to receive(:gauge).at_least(:once) expect(Gitlab::SidekiqMiddleware::ConcurrencyLimit::ConcurrencyLimitService)
.with(:sidekiq_concurrency_limit_max_concurrent_jobs, anything, {}) .not_to receive(:resume_processing!)
.and_return(limit_gauge_double) expect(described_class).not_to receive(:perform_in)
   
allow(limit_gauge_double).to receive(:set).with({ worker: anything }, anything) worker.perform
expect(limit_gauge_double).to receive(:set).with({ worker: worker_with_concurrency_limit.name }, 60) end
   
worker.perform it_behaves_like 'report prometheus metrics', -1
end end
   
context 'when limit is not set' do context 'when limit is not set' do
before do before do
allow(Gitlab::SidekiqMiddleware::ConcurrencyLimit::WorkersMap).to receive(:limit_for) allow(::Gitlab::SidekiqMiddleware::ConcurrencyLimit::WorkersMap).to receive(:limit_for).and_return(0)
nil_proc = -> { nil }
allow(::Gitlab::SidekiqMiddleware::ConcurrencyLimit::WorkersMap).to receive(:limit_for) allow(::Gitlab::SidekiqMiddleware::ConcurrencyLimit::WorkersMap).to receive(:limit_for)
.with(worker: worker_with_concurrency_limit) .with(worker: worker_with_concurrency_limit)
.and_return(nil_proc) .and_return(0)
end end
   
it 'resumes processing using the DEFAULT_LIMIT' do it 'resumes processing using the BATCH_SIZE' do
expect(Gitlab::SidekiqMiddleware::ConcurrencyLimit::ConcurrencyLimitService) expect(Gitlab::SidekiqMiddleware::ConcurrencyLimit::ConcurrencyLimitService)
.to receive(:resume_processing!) .to receive(:resume_processing!)
.with(worker_with_concurrency_limit.name, limit: described_class::DEFAULT_LIMIT) .with(worker_with_concurrency_limit.name, limit: described_class::BATCH_SIZE)
expect(described_class).to receive(:perform_in) expect(described_class).to receive(:perform_in)
   
worker.perform worker.perform
end end
   
it 'reports limit as DEFAULT_LIMIT' do it_behaves_like 'report prometheus metrics', 0
allow(::Gitlab::SidekiqMiddleware::ConcurrencyLimit::WorkersConcurrency).to receive(:workers) end
.and_return(worker_with_concurrency_limit.name => 15) end
   
queue_size_gauge_double = instance_double(Prometheus::Client::Gauge) context 'when sidekiq_concurrency_limit_optimized_count feature flag is disabled' do
expect(Gitlab::Metrics).to receive(:gauge).at_least(:once) let(:concurrent_workers) { 6 }
.with(:sidekiq_concurrency_limit_queue_jobs, anything, {}, :max)
.and_return(queue_size_gauge_double)
   
allow(queue_size_gauge_double).to receive(:set).with({ worker: anything }, anything) before do
expect(queue_size_gauge_double).to receive(:set).with({ worker: worker_with_concurrency_limit.name }, 100) stub_feature_flags(sidekiq_concurrency_limit_optimized_count: false)
allow(::Gitlab::SidekiqMiddleware::ConcurrencyLimit::WorkersConcurrency).to receive(:workers)
.and_return(worker_with_concurrency_limit.name => concurrent_workers)
end
   
limit_gauge_double = instance_double(Prometheus::Client::Gauge) it_behaves_like 'no jobs in the queue'
expect(Gitlab::Metrics).to receive(:gauge).at_least(:once) it_behaves_like 'jobs in the queue'
.with(:sidekiq_concurrency_limit_max_concurrent_jobs, anything, {}) end
.and_return(limit_gauge_double)
   
allow(limit_gauge_double).to receive(:set).with({ worker: anything }, anything) context 'when sidekiq_concurrency_limit_optimized_count feature flag is enabled' do
expect(limit_gauge_double).to receive(:set) let(:concurrent_workers) { 5 }
.with({ worker: worker_with_concurrency_limit.name }, described_class::DEFAULT_LIMIT)
   
worker.perform before do
end allow(Gitlab::SidekiqMiddleware::ConcurrencyLimit::ConcurrencyLimitService)
.to receive(:concurrent_worker_count).and_return(concurrent_workers)
end end
it_behaves_like 'no jobs in the queue'
it_behaves_like 'jobs in the queue'
end end
end end
end end
Loading
@@ -256,7 +256,7 @@
Loading
@@ -256,7 +256,7 @@
it 'registers worker to limit concurrency' do it 'registers worker to limit concurrency' do
stub_application_setting(elasticsearch_max_code_indexing_concurrency: 35) stub_application_setting(elasticsearch_max_code_indexing_concurrency: 35)
   
max_jobs = ::Gitlab::SidekiqMiddleware::ConcurrencyLimit::WorkersMap.limit_for(worker: described_class).call max_jobs = ::Gitlab::SidekiqMiddleware::ConcurrencyLimit::WorkersMap.limit_for(worker: described_class)
expect(max_jobs).to eq(35) expect(max_jobs).to eq(35)
end end
end end
Loading
@@ -110,9 +110,7 @@ def log_job_done(job, started_time, payload, job_exception = nil)
Loading
@@ -110,9 +110,7 @@ def log_job_done(job, started_time, payload, job_exception = nil)
end end
   
def add_thread_identity(payload) def add_thread_identity(payload)
# Similar to what Sidekiq does ith it's out-of-the-box logger: payload['sidekiq_tid'] = Gitlab::SidekiqProcess.tid
# https://github.com/sidekiq/sidekiq/blob/2451d70080db95cb5f69effcbd74381cf3b3f727/lib/sidekiq/logger.rb#L80
payload['sidekiq_tid'] = (Thread.current.object_id ^ ::Process.pid).to_s(36)
payload['sidekiq_thread_name'] = Thread.current.name if Thread.current.name payload['sidekiq_thread_name'] = Thread.current.name if Thread.current.name
end end
   
Loading
Loading
Loading
@@ -4,11 +4,17 @@ module Gitlab
Loading
@@ -4,11 +4,17 @@ module Gitlab
module SidekiqMiddleware module SidekiqMiddleware
module ConcurrencyLimit module ConcurrencyLimit
class ConcurrencyLimitService class ConcurrencyLimitService
# Class for managing queues for deferred workers REDIS_KEY_PREFIX = 'sidekiq:concurrency_limit'
delegate :add_to_queue!, :queue_size, :has_jobs_in_queue?, :resume_processing!, to: :@queue_manager
delegate :track_execution_start, :track_execution_end, :cleanup_stale_trackers,
:concurrent_worker_count, to: :@worker_execution_tracker
   
def initialize(worker_name) def initialize(worker_name)
@worker_name = worker_name @worker_name = worker_name
@redis_key = "sidekiq:concurrency_limit:throttled_jobs:{#{worker_name.underscore}}" @queue_manager = QueueManager.new(worker_name: worker_name, prefix: REDIS_KEY_PREFIX)
@worker_execution_tracker = WorkerExecutionTracker.new(worker_name: worker_name, prefix: REDIS_KEY_PREFIX)
end end
   
class << self class << self
Loading
@@ -27,84 +33,22 @@ def resume_processing!(worker_name, limit:)
Loading
@@ -27,84 +33,22 @@ def resume_processing!(worker_name, limit:)
def queue_size(worker_name) def queue_size(worker_name)
new(worker_name).queue_size new(worker_name).queue_size
end end
end
   
def add_to_queue!(args, context) def cleanup_stale_trackers(worker_name)
with_redis do |redis| new(worker_name).cleanup_stale_trackers
redis.rpush(redis_key, serialize(args, context))
end end
   
deferred_job_counter.increment({ worker: worker_name }) def track_execution_start(worker_name)
end new(worker_name).track_execution_start
def queue_size
with_redis { |redis| redis.llen(redis_key) }
end
def has_jobs_in_queue?
queue_size != 0
end
def resume_processing!(limit:)
with_redis do |redis|
jobs = next_batch_from_queue(redis, limit: limit)
break if jobs.empty?
jobs.each { |j| send_to_processing_queue(deserialize(j)) }
remove_processed_jobs(redis, limit: jobs.length)
jobs.length
end end
end
private
attr_reader :worker_name, :redis_key
def with_redis(&blk)
Gitlab::Redis::SharedState.with(&blk) # rubocop:disable CodeReuse/ActiveRecord -- Not active record
end
def serialize(args, context)
{
args: args,
context: context
}.to_json
end
def deserialize(json)
Gitlab::Json.parse(json)
end
def send_to_processing_queue(job)
context = job['context'] || {}
Gitlab::ApplicationContext.with_raw_context(context) do
args = job['args']
Gitlab::SidekiqLogging::ConcurrencyLimitLogger.instance.resumed_log(worker_name, args)
   
worker_klass = worker_name.safe_constantize def track_execution_end(worker_name)
next if worker_klass.nil? new(worker_name).track_execution_end
worker_klass.concurrency_limit_resume.perform_async(*args)
end end
end
def next_batch_from_queue(redis, limit:)
return [] unless limit > 0
redis.lrange(redis_key, 0, limit - 1)
end
def remove_processed_jobs(redis, limit:)
redis.ltrim(redis_key, limit, -1)
end
   
def deferred_job_counter def concurrent_worker_count(worker_name)
@deferred_job_count ||= ::Gitlab::Metrics.counter(:sidekiq_concurrency_limit_deferred_jobs_total, new(worker_name).concurrent_worker_count
'Count of jobs deferred by the concurrency limit middleware.') end
end end
end end
end end
Loading
Loading
Loading
@@ -7,6 +7,9 @@ class Middleware
Loading
@@ -7,6 +7,9 @@ class Middleware
def initialize(worker, job) def initialize(worker, job)
@worker = worker @worker = worker
@job = job @job = job
worker_class = worker.is_a?(Class) ? worker : worker.class
@worker_class = worker_class.name
end end
   
# This will continue the middleware chain if the job should be scheduled # This will continue the middleware chain if the job should be scheduled
Loading
@@ -29,17 +32,21 @@ def perform
Loading
@@ -29,17 +32,21 @@ def perform
return return
end end
   
track_execution_start
yield yield
ensure
track_execution_end
end end
   
private private
   
attr_reader :job, :worker attr_reader :job, :worker, :worker_class
   
def should_defer_schedule? def should_defer_schedule?
return false if Feature.disabled?(:sidekiq_concurrency_limit_middleware, Feature.current_request, type: :ops) return false if Feature.disabled?(:sidekiq_concurrency_limit_middleware, Feature.current_request, type: :ops)
return false if resumed? return false if resumed?
return false unless ::Gitlab::SidekiqMiddleware::ConcurrencyLimit::WorkersMap.limit_for(worker: worker) return false if worker_limit == 0
   
has_jobs_in_queue? has_jobs_in_queue?
end end
Loading
@@ -57,13 +64,30 @@ def concurrency_service
Loading
@@ -57,13 +64,30 @@ def concurrency_service
::Gitlab::SidekiqMiddleware::ConcurrencyLimit::ConcurrencyLimitService ::Gitlab::SidekiqMiddleware::ConcurrencyLimit::ConcurrencyLimitService
end end
   
def track_execution_start
return if Feature.disabled?(:sidekiq_concurrency_limit_middleware, Feature.current_request, type: :ops)
return unless worker_limit > 0
concurrency_service.track_execution_start(worker_class)
end
def track_execution_end
return if Feature.disabled?(:sidekiq_concurrency_limit_middleware, Feature.current_request, type: :ops)
return unless worker_limit > 0
concurrency_service.track_execution_end(worker_class)
end
def worker_limit
@worker_limit ||= ::Gitlab::SidekiqMiddleware::ConcurrencyLimit::WorkersMap.limit_for(worker: worker)
end
def resumed? def resumed?
job['concurrency_limit_resume'] == true job['concurrency_limit_resume'] == true
end end
   
def has_jobs_in_queue? def has_jobs_in_queue?
worker_class = worker.is_a?(Class) ? worker : worker.class concurrency_service.has_jobs_in_queue?(worker_class)
concurrency_service.has_jobs_in_queue?(worker_class.name)
end end
   
def defer_job! def defer_job!
Loading
Loading
# frozen_string_literal: true
module Gitlab
module SidekiqMiddleware
module ConcurrencyLimit
class QueueManager
attr_reader :redis_key
def initialize(worker_name:, prefix:)
@worker_name = worker_name
@redis_key = "#{prefix}:throttled_jobs:{#{worker_name.underscore}}"
end
def add_to_queue!(args, context)
with_redis do |redis|
redis.rpush(@redis_key, serialize(args, context))
end
deferred_job_counter.increment({ worker: @worker_name })
end
def queue_size
with_redis { |redis| redis.llen(@redis_key) }
end
def has_jobs_in_queue?
queue_size != 0
end
def resume_processing!(limit:)
with_redis do |redis|
jobs = next_batch_from_queue(redis, limit: limit)
break if jobs.empty?
jobs.each { |job| send_to_processing_queue(deserialize(job)) }
remove_processed_jobs(redis, limit: jobs.length)
jobs.length
end
end
private
def with_redis(&)
Gitlab::Redis::SharedState.with(&) # rubocop:disable CodeReuse/ActiveRecord -- Not active record
end
def serialize(args, context)
{ args: args, context: context }.to_json
end
def deserialize(json)
Gitlab::Json.parse(json)
end
def send_to_processing_queue(job)
context = job['context'] || {}
Gitlab::ApplicationContext.with_raw_context(context) do
args = job['args']
Gitlab::SidekiqLogging::ConcurrencyLimitLogger.instance.resumed_log(@worker_name, args)
worker_klass = @worker_name.safe_constantize
next if worker_klass.nil?
worker_klass.concurrency_limit_resume.perform_async(*args)
end
end
def next_batch_from_queue(redis, limit:)
return [] unless limit > 0
redis.lrange(@redis_key, 0, limit - 1)
end
def remove_processed_jobs(redis, limit:)
redis.ltrim(@redis_key, limit, -1)
end
def deferred_job_counter
@deferred_job_counter ||= ::Gitlab::Metrics.counter(:sidekiq_concurrency_limit_deferred_jobs_total,
'Count of jobs deferred by the concurrency limit middleware.')
end
end
end
end
end
# frozen_string_literal: true
module Gitlab
module SidekiqMiddleware
module ConcurrencyLimit
class WorkerExecutionTracker
TRACKING_KEY_TTL = 600.seconds
def initialize(worker_name:, prefix:)
@worker_name = worker_name
@prefix = prefix
end
def track_execution_start
return if sidekiq_pid.nil?
process_thread_id = process_thread_id_key(sidekiq_pid, sidekiq_tid)
with_redis do |r|
r.hset(worker_executing_hash_key, process_thread_id, Time.now.utc.tv_sec)
end
end
def track_execution_end
return if sidekiq_pid.nil?
process_thread_id = process_thread_id_key(sidekiq_pid, sidekiq_tid)
with_redis do |r|
r.hdel(worker_executing_hash_key, process_thread_id)
end
end
def cleanup_stale_trackers
executing_threads_hash = with_redis { |r| r.hgetall(worker_executing_hash_key) }
return if executing_threads_hash.empty?
dangling = executing_threads_hash.filter { |k, v| !still_executing?(k, v) }
return if dangling.empty?
with_redis do |r|
r.hdel(worker_executing_hash_key, dangling)
end
end
def concurrent_worker_count
with_redis { |r| r.hlen(worker_executing_hash_key).to_i }
end
private
attr_reader :worker_name
def with_redis(&)
Redis::QueuesMetadata.with(&) # rubocop:disable CodeReuse/ActiveRecord -- Not active record
end
def worker_executing_hash_key
"#{@prefix}:{#{worker_name.underscore}}:executing"
end
def process_thread_id_key(pid, tid)
"#{pid}:tid:#{tid}"
end
def sidekiq_pid
Gitlab::SidekiqProcess.pid
end
def sidekiq_tid
Gitlab::SidekiqProcess.tid
end
def still_executing?(ptid, started_at)
return true unless started_at.to_i < TRACKING_KEY_TTL.ago.utc.to_i
pid, tid = ptid.split(":tid:")
return false unless pid && tid
job_hash = fetch_sidekiq_process_work_hash(pid, tid)
return false if job_hash.empty?
job_hash['class'] == worker_name
end
def fetch_sidekiq_process_work_hash(pid, tid)
job_hash = {}
Gitlab::SidekiqSharding::Router.route(worker_name.safe_constantize) do
hash = Sidekiq.redis { |r| r.hget("#{pid}:work", tid) } # rubocop:disable Cop/SidekiqRedisCall -- checking process work hash
next if hash.nil?
# There are 2 layers of JSON encoding
# 1. when a job is pushed into the queue -- https://github.com/sidekiq/sidekiq/blob/v7.2.4/lib/sidekiq/client.rb#L261
# 2. When the workstate is written into the pid:work hash -- https://github.com/sidekiq/sidekiq/blob/v7.2.4/lib/sidekiq/launcher.rb#L148
job_hash = ::Gitlab::Json.parse(::Gitlab::Json.parse(hash)&.dig('payload'))
end
job_hash
rescue JSON::ParserError => e
Gitlab::ErrorTracking.track_exception(e, worker_class: worker_name)
{}
end
end
end
end
end
Loading
@@ -15,6 +15,11 @@ def current_for(worker:, skip_cache: false)
Loading
@@ -15,6 +15,11 @@ def current_for(worker:, skip_cache: false)
worker_class = worker.is_a?(Class) ? worker : worker.class worker_class = worker.is_a?(Class) ? worker : worker.class
worker_name = worker_class.name worker_name = worker_class.name
   
if ::Feature.enabled?(:sidekiq_concurrency_limit_optimized_count, Feature.current_request,
type: :gitlab_com_derisk)
return ConcurrencyLimitService.concurrent_worker_count(worker_name)
end
workers(skip_cache: skip_cache)[worker_name].to_i workers(skip_cache: skip_cache)[worker_name].to_i
end end
   
Loading
Loading
Loading
@@ -12,18 +12,20 @@ def set_limit_for(worker:, max_jobs:)
Loading
@@ -12,18 +12,20 @@ def set_limit_for(worker:, max_jobs:)
@data[worker] = max_jobs @data[worker] = max_jobs
end end
   
# Returns an integer value where:
# - positive value is returned to enforce a valid concurrency limit
# - 0 value is returned for workers without concurrency limits
# - negative value is returned for paused workers
def limit_for(worker:) def limit_for(worker:)
return unless data return 0 unless data
return if Feature.disabled?(:sidekiq_concurrency_limit_middleware, Feature.current_request, type: :ops) return 0 if Feature.disabled?(:sidekiq_concurrency_limit_middleware, Feature.current_request, type: :ops)
   
worker_class = worker.is_a?(Class) ? worker : worker.class worker_class = worker.is_a?(Class) ? worker : worker.class
data[worker_class] data[worker_class]&.call.to_i
end end
   
def over_the_limit?(worker:) def over_the_limit?(worker:)
limit_proc = limit_for(worker: worker) limit = limit_for(worker: worker)
limit = limit_proc&.call.to_i
return false if limit == 0 return false if limit == 0
return true if limit < 0 return true if limit < 0
   
Loading
Loading
# frozen_string_literal: true
module Gitlab
module SidekiqProcess
class << self
def pid
# The sidekiq thread-local capsule is set in the Processor.
# https://github.com/sidekiq/sidekiq/blob/v7.2.4/lib/sidekiq/processor.rb#L70
Thread.current[:sidekiq_capsule]&.identity
end
def tid
Thread.current[:sidekiq_capsule]&.tid
end
end
end
end
Loading
@@ -7,7 +7,14 @@
Loading
@@ -7,7 +7,14 @@
# We disable a memory instrumentation feature # We disable a memory instrumentation feature
# as this requires a special patched Ruby # as this requires a special patched Ruby
allow(Gitlab::Memory::Instrumentation).to receive(:available?) { false } allow(Gitlab::Memory::Instrumentation).to receive(:available?) { false }
# We disable Thread.current.name there could be state leak from other specs.
allow(Thread.current).to receive(:name).and_return(nil) allow(Thread.current).to receive(:name).and_return(nil)
Thread.current[:sidekiq_capsule] = Sidekiq::Capsule.new('test', Sidekiq.default_configuration)
end
after do
Thread.current[:sidekiq_capsule] = nil
end end
   
describe '#call', :request_store do describe '#call', :request_store do
Loading
Loading
Loading
@@ -2,7 +2,8 @@
Loading
@@ -2,7 +2,8 @@
   
require 'spec_helper' require 'spec_helper'
   
RSpec.describe Gitlab::SidekiqMiddleware::ConcurrencyLimit::ConcurrencyLimitService, :clean_gitlab_redis_shared_state, feature_category: :global_search do RSpec.describe Gitlab::SidekiqMiddleware::ConcurrencyLimit::ConcurrencyLimitService,
:clean_gitlab_redis_shared_state, :clean_gitlab_redis_queues_metadata, feature_category: :global_search do
let(:worker_class) do let(:worker_class) do
Class.new do Class.new do
def self.name def self.name
Loading
@@ -90,78 +91,51 @@ def self.name
Loading
@@ -90,78 +91,51 @@ def self.name
end end
end end
   
describe '#add_to_queue!' do describe '.track_execution_start' do
subject(:add_to_queue!) { service.add_to_queue!(worker_args, worker_context) } subject(:track_execution_start) { described_class.track_execution_start(worker_class_name) }
   
it 'adds a job to the set' do it 'calls an instance method' do
expect { add_to_queue! } expect_next_instance_of(described_class) do |instance|
.to change { service.queue_size } expect(instance).to receive(:track_execution_start)
.from(0).to(1) end
end
   
it 'adds only one unique job to the set' do track_execution_start
expect do
2.times { add_to_queue! }
end.to change { service.queue_size }.from(0).to(1)
end end
end
   
it 'stores context information' do describe '.track_execution_end' do
add_to_queue! subject(:track_execution_end) { described_class.track_execution_end(worker_class_name) }
service.send(:with_redis) do |r|
set_key = service.send(:redis_key)
stored_job = service.send(:deserialize, r.lrange(set_key, 0, -1).first)
   
expect(stored_job['context']).to eq(stored_context) it 'calls an instance method' do
expect_next_instance_of(described_class) do |instance|
expect(instance).to receive(:track_execution_end)
end end
end
end
   
describe '#has_jobs_in_queue?' do track_execution_end
it 'uses queue_size' do
expect { service.add_to_queue!(worker_args, worker_context) }
.to change { service.has_jobs_in_queue? }
.from(false).to(true)
end end
end end
   
describe '#resume_processing!' do describe '.concurrent_worker_count' do
let(:jobs) { [[1], [2], [3]] } subject(:concurrent_worker_count) { described_class.concurrent_worker_count(worker_class_name) }
let(:setter) { instance_double('Sidekiq::Job::Setter') }
   
it 'puts jobs back into the queue and respects order' do it 'calls an instance method' do
jobs.each do |j| expect_next_instance_of(described_class) do |instance|
service.add_to_queue!(j, worker_context) expect(instance).to receive(:concurrent_worker_count)
end end
   
expect(worker_class).to receive(:concurrency_limit_resume).twice.and_return(setter) concurrent_worker_count
expect(setter).to receive(:perform_async).with(1).ordered
expect(setter).to receive(:perform_async).with(2).ordered
expect(setter).not_to receive(:perform_async).with(3).ordered
expect(Gitlab::SidekiqLogging::ConcurrencyLimitLogger.instance)
.to receive(:resumed_log)
.with(worker_class_name, [1])
expect(Gitlab::SidekiqLogging::ConcurrencyLimitLogger.instance)
.to receive(:resumed_log)
.with(worker_class_name, [2])
service.resume_processing!(limit: 2)
end end
end
   
it 'drops a set after execution' do describe '.cleanup_stale_trackers' do
jobs.each do |j| subject(:cleanup_stale_trackers) { described_class.cleanup_stale_trackers(worker_class_name) }
service.add_to_queue!(j, worker_context)
end
   
expect(Gitlab::ApplicationContext).to receive(:with_raw_context) it 'calls an instance method' do
.with(stored_context) expect_next_instance_of(described_class) do |instance|
.exactly(jobs.count).times.and_call_original expect(instance).to receive(:cleanup_stale_trackers)
expect(worker_class).to receive(:concurrency_limit_resume).exactly(3).times.and_return(setter) end
expect(setter).to receive(:perform_async).exactly(jobs.count).times
   
expect { service.resume_processing!(limit: jobs.count) } cleanup_stale_trackers
.to change { service.has_jobs_in_queue? }.from(true).to(false)
end end
end end
   
Loading
Loading
# frozen_string_literal: true
require 'spec_helper'
RSpec.describe Gitlab::SidekiqMiddleware::ConcurrencyLimit::QueueManager,
:clean_gitlab_redis_shared_state, feature_category: :global_search do
let(:worker_class) do
Class.new do
def self.name
'DummyWorker'
end
include ApplicationWorker
end
end
let(:worker_class_name) { worker_class.name }
let(:worker_context) do
{ 'correlation_id' => 'context_correlation_id',
'meta.project' => 'gitlab-org/gitlab' }
end
let(:stored_context) do
{
"#{Gitlab::ApplicationContext::LOG_KEY}.project" => 'gitlab-org/gitlab',
"correlation_id" => 'context_correlation_id'
}
end
let(:worker_args) { [1, 2] }
subject(:service) { described_class.new(worker_name: worker_class_name, prefix: 'some_prefix') }
before do
stub_const(worker_class_name, worker_class)
end
describe '#add_to_queue!' do
subject(:add_to_queue!) { service.add_to_queue!(worker_args, worker_context) }
it 'adds a job to the set' do
expect { add_to_queue! }
.to change { service.queue_size }
.from(0).to(1)
end
it 'adds only one unique job to the set' do
expect do
2.times { add_to_queue! }
end.to change { service.queue_size }.from(0).to(1)
end
it 'stores context information' do
add_to_queue!
Gitlab::Redis::SharedState.with do |r|
set_key = service.redis_key
stored_job = service.send(:deserialize, r.lrange(set_key, 0, -1).first)
expect(stored_job['context']).to eq(stored_context)
end
end
end
describe '#has_jobs_in_queue?' do
it 'uses queue_size' do
expect { service.add_to_queue!(worker_args, worker_context) }
.to change { service.has_jobs_in_queue? }
.from(false).to(true)
end
end
describe '#resume_processing!' do
let(:jobs) { [[1], [2], [3]] }
let(:setter) { instance_double('Sidekiq::Job::Setter') }
it 'puts jobs back into the queue and respects order' do
jobs.each do |j|
service.add_to_queue!(j, worker_context)
end
expect(worker_class).to receive(:concurrency_limit_resume).twice.and_return(setter)
expect(setter).to receive(:perform_async).with(1).ordered
expect(setter).to receive(:perform_async).with(2).ordered
expect(setter).not_to receive(:perform_async).with(3).ordered
expect(Gitlab::SidekiqLogging::ConcurrencyLimitLogger.instance)
.to receive(:resumed_log)
.with(worker_class_name, [1])
expect(Gitlab::SidekiqLogging::ConcurrencyLimitLogger.instance)
.to receive(:resumed_log)
.with(worker_class_name, [2])
service.resume_processing!(limit: 2)
end
it 'drops a set after execution' do
jobs.each do |j|
service.add_to_queue!(j, worker_context)
end
expect(Gitlab::ApplicationContext).to receive(:with_raw_context)
.with(stored_context)
.exactly(jobs.count).times.and_call_original
expect(worker_class).to receive(:concurrency_limit_resume).exactly(3).times.and_return(setter)
expect(setter).to receive(:perform_async).exactly(jobs.count).times
expect { service.resume_processing!(limit: jobs.count) }
.to change { service.has_jobs_in_queue? }.from(true).to(false)
end
end
end
Loading
@@ -22,9 +22,14 @@ def self.work; end
Loading
@@ -22,9 +22,14 @@ def self.work; end
end end
   
before do before do
Thread.current[:sidekiq_capsule] = Sidekiq::Capsule.new('test', Sidekiq.default_configuration)
stub_const('TestConcurrencyLimitWorker', worker_class) stub_const('TestConcurrencyLimitWorker', worker_class)
end end
   
after do
Thread.current[:sidekiq_capsule] = nil
end
around do |example| around do |example|
with_sidekiq_server_middleware do |chain| with_sidekiq_server_middleware do |chain|
chain.add described_class chain.add described_class
Loading
@@ -32,6 +37,17 @@ def self.work; end
Loading
@@ -32,6 +37,17 @@ def self.work; end
end end
end end
   
shared_examples 'skip execution tracking' do
it do
expect(Gitlab::SidekiqMiddleware::ConcurrencyLimit::ConcurrencyLimitService)
.not_to receive(:track_execution_start)
expect(Gitlab::SidekiqMiddleware::ConcurrencyLimit::ConcurrencyLimitService)
.not_to receive(:track_execution_end)
TestConcurrencyLimitWorker.perform_async('foo')
end
end
describe '#call' do describe '#call' do
context 'when feature flag is disabled' do context 'when feature flag is disabled' do
before do before do
Loading
@@ -45,6 +61,8 @@ def self.work; end
Loading
@@ -45,6 +61,8 @@ def self.work; end
   
TestConcurrencyLimitWorker.perform_async('foo') TestConcurrencyLimitWorker.perform_async('foo')
end end
it_behaves_like 'skip execution tracking'
end end
   
context 'when there are jobs in the queue' do context 'when there are jobs in the queue' do
Loading
@@ -116,6 +134,30 @@ def self.work; end
Loading
@@ -116,6 +134,30 @@ def self.work; end
   
TestConcurrencyLimitWorker.perform_async('foo') TestConcurrencyLimitWorker.perform_async('foo')
end end
it 'tracks execution concurrency' do
expect(Gitlab::SidekiqMiddleware::ConcurrencyLimit::ConcurrencyLimitService)
.to receive(:track_execution_start)
expect(Gitlab::SidekiqMiddleware::ConcurrencyLimit::ConcurrencyLimitService).to receive(:track_execution_end)
TestConcurrencyLimitWorker.perform_async('foo')
end
context 'when limit is set to zero' do
before do
allow(::Gitlab::SidekiqMiddleware::ConcurrencyLimit::WorkersMap).to receive(:limit_for).and_return(0)
end
it_behaves_like 'skip execution tracking'
end
context 'when limit is not defined' do
before do
::Gitlab::SidekiqMiddleware::ConcurrencyLimit::WorkersMap.remove_instance_variable(:@data)
end
it_behaves_like 'skip execution tracking'
end
end end
   
context 'when over the limit' do context 'when over the limit' do
Loading
Loading
# frozen_string_literal: true
require 'spec_helper'
RSpec.describe Gitlab::SidekiqMiddleware::ConcurrencyLimit::WorkerExecutionTracker,
:clean_gitlab_redis_queues_metadata, feature_category: :global_search do
let(:worker_class) do
Class.new do
def self.name
'DummyWorker'
end
include ApplicationWorker
end
end
let(:worker_class_name) { worker_class.name }
let(:redis_key_prefix) { 'random_prefix' }
let(:sidekiq_pid) { 'proc-abc' }
let(:sidekiq_tid) { 'proc-abc' }
let(:tracking_hash) { "#{redis_key_prefix}:{#{worker_class_name.underscore}}:executing" }
let(:tracking_elem) { "#{sidekiq_pid}:tid:#{sidekiq_tid}" }
subject(:service) { described_class.new(worker_name: worker_class_name, prefix: redis_key_prefix) }
before do
stub_const(worker_class_name, worker_class)
Thread.current[:sidekiq_capsule] = Sidekiq::Capsule.new('test', Sidekiq.default_configuration)
allow(Thread.current[:sidekiq_capsule]).to receive(:identity).and_return(sidekiq_pid)
allow(service).to receive(:sidekiq_tid).and_return(sidekiq_tid)
end
describe '#track_execution_start' do
subject(:track_execution_start) { service.track_execution_start }
it 'writes to Redis hash and string' do
track_execution_start
Gitlab::Redis::QueuesMetadata.with do |c|
expect(c.hexists(tracking_hash, tracking_elem)).to eq(true)
end
end
context 'when Thread.current[:sidekiq_capsule] is missing' do
before do
Thread.current[:sidekiq_capsule] = nil
end
it 'exits early without writing to redis' do
track_execution_start
Gitlab::Redis::QueuesMetadata.with do |c|
expect(c.hexists(tracking_hash, tracking_elem)).to eq(false)
end
end
end
end
describe '#track_execution_end' do
subject(:track_execution_end) { service.track_execution_end }
before do
service.track_execution_start
end
it 'clears to Redis hash and string' do
Gitlab::Redis::QueuesMetadata.with do |c|
expect { track_execution_end }
.to change { c.hexists(tracking_hash, tracking_elem) }.from(true).to(false)
end
end
context 'when Thread.current[:sidekiq_capsule] is missing' do
before do
Thread.current[:sidekiq_capsule] = nil
end
it 'exits early without writing to redis' do
Gitlab::Redis::QueuesMetadata.with do |c|
expect(c.hexists(tracking_hash, tracking_elem)).to eq(true)
track_execution_end
expect(c.hexists(tracking_hash, tracking_elem)).to eq(true)
end
end
end
end
describe '#concurrent_worker_count' do
let(:size) { 10 }
subject(:concurrent_worker_count) { service.concurrent_worker_count }
before do
Gitlab::Redis::QueuesMetadata.with do |c|
c.hset(tracking_hash, (1..size).flat_map { |i| [i, i] })
end
end
it 'returns hash size' do
expect(concurrent_worker_count).to eq(size)
end
context 'with empty hash' do
before do
Gitlab::Redis::QueuesMetadata.with { |c| c.del(tracking_hash) }
end
it 'returns 0' do
expect(concurrent_worker_count).to eq(0)
end
end
end
describe '#cleanup_stale_trackers' do
let(:dangling_tid) { 4567 }
let(:long_running_tid) { 5678 }
let(:invalid_process_thread_id) { 'proc-abc::4567' }
let(:dangling_process_thread_id) { 'proc-abc:tid:4567' }
let(:long_running_process_thread_id) { 'proc-abc:tid:5678' }
# Format from https://github.com/sidekiq/sidekiq/blob/v7.2.4/lib/sidekiq/api.rb#L1180
# The tid field in the `{pid}:work` hash contains a hash of 'payload' -> job hash.
def generate_sidekiq_hash(worker)
job_hash = { 'payload' => ::Gitlab::Json.dump({
'class' => worker,
'created_at' => Time.now.to_f - described_class::TRACKING_KEY_TTL
}) }
Sidekiq.dump_json(job_hash)
end
subject(:cleanup_stale_trackers) { service.cleanup_stale_trackers }
context 'when hash is valid' do
before do
Gitlab::Redis::QueuesMetadata.with do |r|
# element should not be deleted since it is within the ttl
r.hset(tracking_hash, tracking_elem, Time.now.utc.tv_sec - (0.1 * described_class::TRACKING_KEY_TTL.to_i))
# element should not be deleted since it is a long running process
r.hset(tracking_hash, long_running_process_thread_id,
Time.now.utc.tv_sec - (2 * described_class::TRACKING_KEY_TTL.to_i))
# element should be deleted since hash value is invalid
r.hset(tracking_hash, invalid_process_thread_id,
Time.now.utc.tv_sec - (2 * described_class::TRACKING_KEY_TTL.to_i))
# element should be deleted since it is a long running process
# but stale as the thread is executing another worker now
r.hset(tracking_hash, dangling_process_thread_id,
Time.now.utc.tv_sec - (2 * described_class::TRACKING_KEY_TTL.to_i))
end
Gitlab::SidekiqSharding::Validator.allow_unrouted_sidekiq_calls do
Sidekiq.redis do |r|
r.hset("proc-abc:work", long_running_tid, generate_sidekiq_hash(worker_class_name))
r.hset("proc-abc:work", dangling_process_thread_id, generate_sidekiq_hash('otherworker'))
end
end
end
it 'only cleans up dangling keys' do
expect { cleanup_stale_trackers }.to change { service.concurrent_worker_count }.from(4).to(2)
end
end
context 'when hash is invalid' do
let(:invalid_hash) { 'invalid' }
before do
Gitlab::Redis::QueuesMetadata.with do |r|
r.hset(tracking_hash, long_running_process_thread_id,
Time.now.utc.tv_sec - (2 * described_class::TRACKING_KEY_TTL.to_i))
end
Gitlab::SidekiqSharding::Validator.allow_unrouted_sidekiq_calls do
Sidekiq.redis do |r|
r.hset("proc-abc:work", long_running_tid, invalid_hash)
end
end
end
it 'tracks exception' do
expect(Gitlab::ErrorTracking).to receive(:track_exception).with(instance_of(JSON::ParserError),
worker_class: 'DummyWorker')
expect { cleanup_stale_trackers }.to change { service.concurrent_worker_count }.from(1).to(0)
end
end
end
end
Loading
@@ -30,9 +30,17 @@ def perform(*); end
Loading
@@ -30,9 +30,17 @@ def perform(*); end
] ]
end end
   
let(:concurrency_tracking_hash_content) do
(1..current_concurrency).flat_map { |i| [i, i] }
end
before do before do
stub_const('TestConcurrencyLimitWorker', worker_class) stub_const('TestConcurrencyLimitWorker', worker_class)
allow(described_class).to receive(:sidekiq_workers).and_return([sidekiq_worker] * current_concurrency) allow(described_class).to receive(:sidekiq_workers).and_return([sidekiq_worker] * current_concurrency)
Gitlab::Redis::QueuesMetadata.with do |c|
prefix = Gitlab::SidekiqMiddleware::ConcurrencyLimit::ConcurrencyLimitService::REDIS_KEY_PREFIX
c.hset("#{prefix}:{#{worker_class.name.underscore}}:executing", *concurrency_tracking_hash_content)
end
end end
   
describe '.current_for' do describe '.current_for' do
Loading
@@ -41,10 +49,21 @@ def perform(*); end
Loading
@@ -41,10 +49,21 @@ def perform(*); end
context 'without cache' do context 'without cache' do
let(:skip_cache) { true } let(:skip_cache) { true }
   
it 'returns the current concurrency', quarantine: 'https://gitlab.com/gitlab-org/gitlab/-/issues/451677' do it 'looks up current concurrency from hash' do
expect(described_class).to receive(:workers_uncached).and_call_original expect(described_class).not_to receive(:workers_uncached)
expect(current_for).to eq(current_concurrency) expect(current_for).to eq(current_concurrency)
end end
context 'when sidekiq_concurrency_limit_optimized_count feature flag is disabled' do
before do
stub_feature_flags(sidekiq_concurrency_limit_optimized_count: false)
end
it 'returns the current concurrency', quarantine: 'https://gitlab.com/gitlab-org/gitlab/-/issues/451677' do
expect(described_class).to receive(:workers_uncached).and_call_original
expect(current_for).to eq(current_concurrency)
end
end
end end
   
context 'with cache', :clean_gitlab_redis_cache do context 'with cache', :clean_gitlab_redis_cache do
Loading
@@ -55,10 +74,21 @@ def perform(*); end
Loading
@@ -55,10 +74,21 @@ def perform(*); end
cache_setup!(tally: cached_value, lease: true) cache_setup!(tally: cached_value, lease: true)
end end
   
it 'returns cached current_for' do it 'looks up current concurrency from hash' do
expect(described_class).not_to receive(:workers_uncached) expect(described_class).not_to receive(:workers)
expect(current_for).to eq(current_concurrency)
end
   
expect(current_for).to eq(20) context 'when sidekiq_concurrency_limit_optimized_count feature flag is disabled' do
before do
stub_feature_flags(sidekiq_concurrency_limit_optimized_count: false)
end
it 'returns cached current_for' do
expect(described_class).not_to receive(:workers_uncached)
expect(current_for).to eq(20)
end
end end
end end
end end
Loading
Loading
Loading
@@ -27,21 +27,21 @@ def perform(*); end
Loading
@@ -27,21 +27,21 @@ def perform(*); end
let(:expected_limit) { 60 } let(:expected_limit) { 60 }
   
it 'accepts worker instance' do it 'accepts worker instance' do
expect(described_class.limit_for(worker: worker_class.new).call).to eq(expected_limit) expect(described_class.limit_for(worker: worker_class.new)).to eq(expected_limit)
end end
   
it 'accepts worker class' do it 'accepts worker class' do
expect(described_class.limit_for(worker: worker_class).call).to eq(expected_limit) expect(described_class.limit_for(worker: worker_class)).to eq(expected_limit)
end end
   
it 'returns nil for unknown worker' do it 'returns 0 for unknown worker' do
expect(described_class.limit_for(worker: described_class)).to be_nil expect(described_class.limit_for(worker: described_class)).to eq(0)
end end
   
it 'returns nil if the feature flag is disabled' do it 'returns 0 if the feature flag is disabled' do
stub_feature_flags(sidekiq_concurrency_limit_middleware: false) stub_feature_flags(sidekiq_concurrency_limit_middleware: false)
   
expect(described_class.limit_for(worker: worker_class)).to be_nil expect(described_class.limit_for(worker: worker_class)).to eq(0)
end end
end end
   
Loading
@@ -49,18 +49,14 @@ def perform(*); end
Loading
@@ -49,18 +49,14 @@ def perform(*); end
subject(:over_the_limit?) { described_class.over_the_limit?(worker: worker_class) } subject(:over_the_limit?) { described_class.over_the_limit?(worker: worker_class) }
   
where(:limit, :current, :result) do where(:limit, :current, :result) do
nil | 0 | false 0 | 0 | false
nil | 5 | false 0 | 10 | false
-> { nil } | 0 | false 5 | 10 | true
-> { nil } | 5 | false 10 | 0 | false
-> { 0 } | 0 | false 10 | 5 | false
-> { 0 } | 10 | false -1 | 0 | true
-> { 5 } | 10 | true -1 | 1 | true
-> { 10 } | 0 | false -10 | 10 | true
-> { 10 } | 5 | false
-> { -1 } | 0 | true
-> { -1 } | 1 | true
-> { -10 } | 10 | true
end end
   
with_them do with_them do
Loading
Loading
# frozen_string_literal: true
require 'spec_helper'
RSpec.describe Gitlab::SidekiqProcess, feature_category: :scalability do
let(:cap) { Sidekiq::Capsule.new('test', Sidekiq.default_configuration) }
before do
Thread.current[:sidekiq_capsule] = cap
end
after do
Thread.current[:sidekiq_capsule] = nil
end
describe '#tid' do
it 'matches sidekiq internals' do
expect(described_class.tid).to eq(cap.tid)
end
end
describe '#pid' do
it 'matches sidekiq internals' do
expect(described_class.pid).to eq(cap.identity)
end
end
end
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment