Skip to content
Snippets Groups Projects
Unverified Commit 49f28946 authored by Sylvester Chin's avatar Sylvester Chin Committed by GitLab
Browse files

Add helper class method in SidekiqSharding::Router for iterating

This allows devs to re-use the `.with_routed_client` method to iterate
over all shards when using Sidekiq API.
parent 41f2e3ef
No related branches found
No related tags found
No related merge requests found
Loading
Loading
@@ -83,16 +83,12 @@ def drain_non_geo_queues
 
def disable_non_geo_cron_jobs
# Apply Sidekiq Cron modification to all shards
# rubocop:disable Cop/RedisQueueUsage -- valid usage
Gitlab::Redis::Queues.instances.each_value do |inst|
Sidekiq::Client.via(inst.sidekiq_redis) do # scope all Sidekiq operations to use shard's redis pool
Sidekiq::Cron::Job.all.each(&:disable!) # rubocop:disable Rails/FindEach -- not an ActiveRecord::Relation
Gitlab::SidekiqSharding::Router.with_routed_client do
Sidekiq::Cron::Job.all.each(&:disable!) # rubocop:disable Rails/FindEach -- not an ActiveRecord::Relation
 
# Do not enable `geo_sidekiq_cron_config_worker`, due to https://gitlab.com/gitlab-org/gitlab/-/issues/37135
geo_primary_jobs.filter_map { |name| Sidekiq::Cron::Job.find(name) }.map(&:enable!)
end
# Do not enable `geo_sidekiq_cron_config_worker`, due to https://gitlab.com/gitlab-org/gitlab/-/issues/37135
geo_primary_jobs.filter_map { |name| Sidekiq::Cron::Job.find(name) }.map(&:enable!)
end
# rubocop:enable Cop/RedisQueueUsage
end
 
def wait_until_replicated_and_verified
Loading
Loading
@@ -181,22 +177,18 @@ def geo_primary_jobs
# true, poll its size until empty.
def poll_selected_queues_until_empty
# Watch Sidekiq Queues on all shards
# rubocop:disable Cop/RedisQueueUsage -- valid usage
Gitlab::Redis::Queues.instances.each_value do |inst|
Sidekiq::Client.via(inst.sidekiq_redis) do # scope all Sidekiq operations to use shard's redis pool
# rubocop:disable Cop/SidekiqApiUsage -- valid usage
selected_queues = Sidekiq::Queue.all.select { |queue| yield(queue) }
loop do
selected_queues = selected_queues.select { |queue| queue_has_jobs?(queue) }
break if selected_queues.empty?
sleep(1)
end
# rubocop:enable Cop/SidekiqApiUsage
Gitlab::SidekiqSharding::Router.with_routed_client do
# rubocop:disable Cop/SidekiqApiUsage -- valid usage
selected_queues = Sidekiq::Queue.all.select { |queue| yield(queue) }
loop do
selected_queues = selected_queues.select { |queue| queue_has_jobs?(queue) }
break if selected_queues.empty?
sleep(1)
end
# rubocop:enable Cop/SidekiqApiUsage
end
# rubocop:enable Cop/RedisQueueUsage
end
 
# It's possible to configure GitLab Sidekiq queues and their names. If there are no queues
Loading
Loading
@@ -206,17 +198,13 @@ def validate_geo_queues_exist!
any_geo_queues = false
 
# Watch Sidekiq Queues on all shards
# rubocop:disable Cop/RedisQueueUsage -- valid usage
Gitlab::Redis::Queues.instances.each_value do |inst|
Sidekiq::Client.via(inst.sidekiq_redis) do # scope all Sidekiq operations to use shard's redis pool
# rubocop:disable Cop/SidekiqApiUsage -- valid usage
any_geo_queues ||= Sidekiq::Queue.all.any? { |queue| geo_queue?(queue) }
# rubocop:enable Cop/SidekiqApiUsage
end
Gitlab::SidekiqSharding::Router.with_routed_client do
# rubocop:disable Cop/SidekiqApiUsage -- valid usage
any_geo_queues ||= Sidekiq::Queue.all.any? { |queue| geo_queue?(queue) }
# rubocop:enable Cop/SidekiqApiUsage
 
break if any_geo_queues
end
# rubocop:enable Cop/RedisQueueUsage
 
raise "No Geo queues detected. Unable to check if Geo or non-Geo jobs are drained" unless any_geo_queues
end
Loading
Loading
Loading
Loading
@@ -11,8 +11,8 @@ class SidekiqMetrics < ::API::Base
helpers do
def queue_metrics
hash = {}
Gitlab::Redis::Queues.instances.each_value do |v| # rubocop:disable Cop/RedisQueueUsage -- allow iteration over shard instances
queue_metrics_from_shard(v.sidekiq_redis).each do |queue_name, queue_details|
Gitlab::SidekiqSharding::Router.with_routed_client do
queue_metrics_from_shard.each do |queue_name, queue_details|
if hash[queue_name].nil?
hash[queue_name] = queue_details
else
Loading
Loading
@@ -24,38 +24,36 @@ def queue_metrics
hash
end
 
def queue_metrics_from_shard(pool)
Sidekiq::Client.via(pool) do
::Gitlab::SidekiqConfig.routing_queues.each_with_object({}) do |queue_name, hash|
queue = Sidekiq::Queue.new(queue_name)
hash[queue.name] = {
backlog: queue.size,
latency: queue.latency.to_i
}
end
def queue_metrics_from_shard
::Gitlab::SidekiqConfig.routing_queues.each_with_object({}) do |queue_name, hash|
queue = Sidekiq::Queue.new(queue_name)
hash[queue.name] = {
backlog: queue.size,
latency: queue.latency.to_i
}
end
end
 
def process_metrics
Gitlab::Redis::Queues.instances.values.flat_map do |v| # rubocop:disable Cop/RedisQueueUsage -- allow iteration over shard instances
process_metrics_from_shard(v.sidekiq_redis)
metrics = []
Gitlab::SidekiqSharding::Router.with_routed_client do
metrics << process_metrics_from_shard
end
metrics.flatten
end
 
def process_metrics_from_shard(pool)
Sidekiq::Client.via(pool) do
Sidekiq::ProcessSet.new(false).map do |process|
{
hostname: process['hostname'],
pid: process['pid'],
tag: process['tag'],
started_at: Time.at(process['started_at']),
queues: process['queues'],
labels: process['labels'],
concurrency: process['concurrency'],
busy: process['busy']
}
end
def process_metrics_from_shard
Sidekiq::ProcessSet.new(false).map do |process|
{
hostname: process['hostname'],
pid: process['pid'],
tag: process['tag'],
started_at: Time.at(process['started_at']),
queues: process['queues'],
labels: process['labels'],
concurrency: process['concurrency'],
busy: process['busy']
}
end
end
 
Loading
Loading
@@ -67,23 +65,21 @@ def job_stats
dead: 0
}
 
Gitlab::Redis::Queues.instances.each_value do |shard| # rubocop:disable Cop/RedisQueueUsage -- allow iteration over shard instances
job_stats_from_shard(shard.sidekiq_redis).each { |k, v| stats[k] += v }
Gitlab::SidekiqSharding::Router.with_routed_client do
job_stats_from_shard.each { |k, v| stats[k] += v }
end
 
stats
end
 
def job_stats_from_shard(pool)
Sidekiq::Client.via(pool) do
stats = Sidekiq::Stats.new
{
processed: stats.processed,
failed: stats.failed,
enqueued: stats.enqueued,
dead: stats.dead_size
}
end
def job_stats_from_shard
stats = Sidekiq::Stats.new
{
processed: stats.processed,
failed: stats.failed,
enqueued: stats.enqueued,
dead: stats.dead_size
}
end
end
 
Loading
Loading
Loading
Loading
@@ -54,11 +54,14 @@ def update_workers_cache(redis)
end
 
def workers_uncached
Gitlab::Redis::Queues.instances.values.flat_map do |instance| # rubocop:disable Cop/RedisQueueUsage -- iterating over instances is allowed as we pass the pool to Sidekiq
Sidekiq::Client.via(instance.sidekiq_redis) do
sidekiq_workers.map { |_process_id, _thread_id, work| ::Gitlab::Json.parse(work.payload)['class'] }
hash = []
Gitlab::SidekiqSharding::Router.with_routed_client do
workers = sidekiq_workers.map do |_process_id, _thread_id, work|
::Gitlab::Json.parse(work.payload)['class']
end
end.tally
hash.concat(workers)
end
hash.tally
end
 
def sidekiq_workers
Loading
Loading
Loading
Loading
@@ -39,6 +39,14 @@ def route(klass)
end
end
 
def with_routed_client
Gitlab::Redis::Queues.instances.each_value do |inst|
Sidekiq::Client.via(inst.sidekiq_redis) do
yield
end
end
end
def migrated_shards
@migrated_shards ||= Set.new(Gitlab::Json.parse(ENV.fetch('SIDEKIQ_MIGRATED_SHARDS', '[]')))
end
Loading
Loading
Loading
Loading
@@ -2,7 +2,7 @@
 
require 'spec_helper'
 
RSpec.describe API::SidekiqMetrics, :aggregate_failures, feature_category: :shared do
RSpec.describe API::SidekiqMetrics, :clean_gitlab_redis_queues, :aggregate_failures, feature_category: :shared do
let(:instance_count) { 1 }
let(:admin) { create(:user, :admin) }
 
Loading
Loading
@@ -15,6 +15,19 @@
end
 
shared_examples 'GET sidekiq metrics' do
before do
# ProcessSet looks up running processes in Redis.
# To ensure test coverage, stub some data so it actually performs some iteration.
Gitlab::SidekiqSharding::Validator.allow_unrouted_sidekiq_calls do
Sidekiq.redis do |r|
r.sadd('processes', 'teststub')
r.hset('teststub',
['info', Sidekiq.dump_json({ started_at: Time.now.to_i }), "busy", 1, "quiet", 1, "rss", 1, "rtt_us", 1]
)
end
end
end
it 'defines the `queue_metrics` endpoint' do
expect(Gitlab::SidekiqConfig).to receive(:routing_queues).exactly(instance_count).times.and_call_original
get api('/sidekiq/queue_metrics', admin, admin_mode: true)
Loading
Loading
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment