Skip to content
Snippets Groups Projects
Commit 9b81c32a authored by Valery Sizov's avatar Valery Sizov
Browse files

Regularly reverify Packages on primary

Updates verification state records using batches

We need to regularly re-verify data on the primary
to protect against data corruption. In combination with
https://gitlab.com/gitlab-org/gitlab-ee/issues/13842,
this will ensure that packages are regularly re-verified
on all secondary nodes as well.
parent 6e42bbdd
No related branches found
No related tags found
No related merge requests found
Loading
Loading
@@ -7,11 +7,19 @@ module VerifiableReplicator
include Delay
 
DEFAULT_VERIFICATION_BATCH_SIZE = 10
DEFAULT_REVERIFICATION_BATCH_SIZE = 1000
 
class_methods do
extend Gitlab::Utils::Override
 
delegate :verification_pending_batch, :verification_failed_batch, :needs_verification_count, :fail_verification_timeouts, to: :verification_query_class
delegate :verification_pending_batch,
:verification_failed_batch,
:needs_verification_count,
:needs_reverification_count,
:fail_verification_timeouts,
:reverifiable_batch,
:reverify_batch,
to: :verification_query_class
 
# If replication is disabled, then so is verification.
override :verification_enabled?
Loading
Loading
@@ -31,6 +39,8 @@ def trigger_background_verification
::Geo::VerificationBatchWorker.perform_with_capacity(replicable_name)
 
::Geo::VerificationTimeoutWorker.perform_async(replicable_name)
::Geo::ReverificationBatchWorker.perform_async(replicable_name)
end
 
# Called by VerificationBatchWorker.
Loading
Loading
@@ -54,6 +64,18 @@ def remaining_verification_batch_count(max_batch_count:)
.ceil
end
 
# Called by ReverificationBatchWorker.
#
# - Asks the DB how many things still need to be reverified (with a limit)
# - Converts that to a number of batches
#
# @return [Integer] number of batches of reverification work remaining, up to the given maximum
def remaining_reverification_batch_count(max_batch_count:)
needs_reverification_count(limit: max_batch_count * reverification_batch_size)
.fdiv(reverification_batch_size)
.ceil
end
# @return [Array<Gitlab::Geo::Replicator>] batch of replicators which need to be verified
def replicator_batch_to_verify
model_record_id_batch_to_verify.map do |id|
Loading
Loading
@@ -74,6 +96,11 @@ def model_record_id_batch_to_verify
ids
end
 
# @return [Integer] number of records set to be re-verified
def reverify_batch!
reverify_batch(batch_size: reverification_batch_size)
end
# If primary, query the model table.
# If secondary, query the registry table.
def verification_query_class
Loading
Loading
@@ -85,6 +112,11 @@ def verification_batch_size
DEFAULT_VERIFICATION_BATCH_SIZE
end
 
# @return [Integer] number of records to reverify per batch job
def reverification_batch_size
DEFAULT_REVERIFICATION_BATCH_SIZE
end
def checksummed_count
# When verification is disabled, this returns nil.
# Bonus: This causes the progress bar to be hidden.
Loading
Loading
Loading
Loading
@@ -515,6 +515,14 @@
:weight: 1
:idempotent:
:tags: []
- :name: geo:geo_reverification_batch
:feature_category: :geo_replication
:has_external_dependencies:
:urgency: :low
:resource_boundary: :unknown
:weight: 1
:idempotent: true
:tags: []
- :name: geo:geo_scheduler_primary_scheduler
:feature_category: :geo_replication
:has_external_dependencies:
Loading
Loading
# frozen_string_literal: true
module Geo
class ReverificationBatchWorker
include ApplicationWorker
include GeoQueue
include LimitedCapacity::Worker
include ::Gitlab::Geo::LogHelpers
MAX_RUNNING_JOBS = 1
idempotent!
loggable_arguments 0
def perform_work(replicable_name)
replicator_class = replicator_class_for(replicable_name)
replicator_class.reverify_batch!
end
def remaining_work_count(replicable_name)
replicator_class = replicator_class_for(replicable_name)
@remaining_work_count ||= replicator_class
.remaining_reverification_batch_count(max_batch_count: remaining_capacity)
end
def max_running_jobs
MAX_RUNNING_JOBS
end
def replicator_class_for(replicable_name)
@replicator_class ||= ::Gitlab::Geo::Replicator.for_replicable_name(replicable_name)
end
end
end
---
title: Regularly reverify Packages on primary
merge_request: 53470
author:
type: changed
Loading
Loading
@@ -37,6 +37,7 @@ module VerificationState
scope :verification_timed_out, -> { verification_started.where("verification_started_at < ?", VERIFICATION_TIMEOUT.ago) }
scope :retry_due, -> { where(arel_table[:verification_retry_at].eq(nil).or(arel_table[:verification_retry_at].lt(Time.current))) }
scope :needs_verification, -> { with_verification_state(:verification_pending).or(with_verification_state(:verification_failed).retry_due) }
scope :needs_reverification, -> { verification_succeeded.where("verified_at < ?", ::Gitlab::Geo.current_node.minimum_reverification_interval.days.ago) }
# rubocop:enable CodeReuse/ActiveRecord
 
state_machine :verification_state, initial: :verification_pending do
Loading
Loading
@@ -142,6 +143,16 @@ def needs_verification_relation
needs_verification
end
 
# @return [Integer] number of records that need reverification
def needs_reverification_count(limit:)
needs_reverification_relation.limit(limit).count # rubocop:disable CodeReuse/ActiveRecord
end
# Overridden by Geo::VerifiableRegistry
def needs_reverification_relation
needs_reverification
end
# Atomically marks the records as verification_started, with a
# verification_started_at time, and returns the primary key of each
# updated row. This allows VerificationBatchWorker to concurrently get
Loading
Loading
@@ -213,6 +224,61 @@ def fail_verification_timeouts
relation.update_all(attrs)
end
end
# Returns IDs of records that needs to be reverified.
#
# Atomically marks those records "verification_pending" in the same DB
# query.
#
def reverify_batch(batch_size:)
relation = reverification_batch_relation(batch_size: batch_size)
reverificate_batch_for_relation(relation)
end
# Returns IDs of records that need re-verification.
#
# Atomically marks those records "verification_pending" in the same DB
# query.
#
# rubocop:disable CodeReuse/ActiveRecord
def reverification_batch_relation(batch_size:)
verification_succeeded
.where("verified_at < ?", ::Gitlab::Geo.current_node.minimum_reverification_interval.days.ago)
.order(:verified_at)
.limit(batch_size)
end
# rubocop:enable CodeReuse/ActiveRecord
# Atomically marks the records as verification_pending.
# Returns the number of records set to be referified.
#
# @param [ActiveRecord::Relation] relation with appropriate where, order, and limit defined
# @return [Integer] number of records
def reverificate_batch_for_relation(relation)
query = reverification_batch_query(relation)
self.connection.execute(query).cmd_tuples
end
# Returns a SQL statement which would update all the rows in the
# relation as verification_pending
# and returns the number of updated rows.
#
# @param [ActiveRecord::Relation] relation with appropriate where, order, and limit defined
# @return [String] SQL statement which would update all and return the number of rows
def reverification_batch_query(relation)
pending_enum_value = VERIFICATION_STATE_VALUES[:verification_pending]
<<~SQL.squish
UPDATE #{table_name}
SET "verification_state" = #{pending_enum_value},
"verification_retry_count" = 0,
"verification_retry_at" = NULL,
"verification_failure" = NULL
WHERE #{self.verification_state_model_key} IN (#{relation.select(self.verification_state_model_key).to_sql})
SQL
end
end
 
# Overridden by ReplicableRegistry
Loading
Loading
# frozen_string_literal: true
require 'spec_helper'
RSpec.describe Geo::ReverificationBatchWorker, :geo do
include EE::GeoHelpers
let(:replicable_name) { 'widget' }
let(:replicator_class) { double('widget_replicator_class') }
let(:node) { double('node') }
before do
stub_current_geo_node(node)
allow(::Gitlab::Geo::Replicator)
.to receive(:for_replicable_name).with(replicable_name).and_return(replicator_class)
end
subject(:job) { described_class.new }
it 'uses a Geo queue' do
expect(job.sidekiq_options_hash).to include(
'queue' => 'geo:geo_reverification_batch',
'queue_namespace' => :geo
)
end
describe '#perform' do
it 'calls reverify_batch!' do
allow(replicator_class).to receive(:remaining_reverification_batch_count).and_return(1)
expect(replicator_class).to receive(:reverify_batch!)
job.perform(replicable_name)
end
end
describe '#remaining_work_count' do
it 'returns remaining_reverification_batch_count' do
expected = 7
args = { max_batch_count: 95 }
allow(job).to receive(:remaining_capacity).and_return(args[:max_batch_count])
expect(replicator_class).to receive(:remaining_reverification_batch_count).with(args).and_return(expected)
expect(job.remaining_work_count(replicable_name)).to eq(expected)
end
end
end
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment