Commit bcfb4667 authored by Ravi Kumar

Add a migration to force reindex commits for some projects

A few commits are missing from the separate commits index but are still
present in the main index. This migration therefore force reindexes
commits for every project that still has commit-type documents in the
main index.

Changelog: other
MR: https://gitlab.com/gitlab-org/gitlab/-/merge_requests/130718
EE: true
---
name: ForceReindexCommitsFromMainIndex
version: '20230901120542'
description: Force reindex commits for all projects that still have commit-type documents in the main index
group: group::global search
milestone: '16.4'
introduced_by_url: https://gitlab.com/gitlab-org/gitlab/-/merge_requests/130718
obsolete: false
marked_obsolete_by_url:
marked_obsolete_in_milestone:

# frozen_string_literal: true
class ForceReindexCommitsFromMainIndex < Elastic::Migration
include Elastic::MigrationHelper
batched!
throttle_delay 3.minutes
batch_size 200
retry_on_failure
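
# ELASTIC_TIMEOUT bounds the update_by_query calls; SCHEMA_VERSION is the stamp
# written to commit documents once their project has been scheduled for reindexing.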
ELASTIC_TIMEOUT = '5m'
SCHEMA_VERSION = 23_09
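
# Enqueues a force reindex for each project in the current batch and stamps its
# commit documents so the same project is not picked up again on the next run.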
def migrate
if completed?
log 'Migration Completed', total_remaining: 0
return
end
batch_of_rids_to_reindex.each do |rid|
ElasticCommitIndexerWorker.perform_in(rand(throttle_delay), rid, false, force: true)
update_schema_version(rid)
end
end
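
# The migration is finished once no commit documents remain without a schema_version.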
def completed?
total_remaining = remaining_documents_count
set_migration_state(documents_remaining: total_remaining)
log('Checking if migration is finished', total_remaining: total_remaining)
total_remaining == 0
end
private
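
# Returns up to batch_size project ids (commit.rid) that still have unstamped
# commit documents, using a terms aggregation over the main index.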
def batch_of_rids_to_reindex
results = client.search(index: index_name, body: {
size: 0, query: query, aggs: { rids: { terms: { size: batch_size, field: 'commit.rid' } } }
})
rids_hist = results.dig('aggregations', 'rids', 'buckets') || []
rids_hist.pluck('key') # rubocop: disable CodeReuse/ActiveRecord
end
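
# Stamps every commit document of the given project with SCHEMA_VERSION via an
# asynchronous update_by_query, routed with the project's routing key.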
def update_schema_version(rid)
q = query
q[:bool][:filter] << { term: { 'commit.rid' => rid } }
client.update_by_query(index: index_name, routing: "project_#{rid}",
wait_for_completion: false, timeout: ELASTIC_TIMEOUT, conflicts: 'proceed',
body: { query: q, script: { source: "ctx._source.schema_version = #{SCHEMA_VERSION}" } }
)
end
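
# Counts commit documents in the main index that still lack a schema_version.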
def remaining_documents_count
helper.refresh_index(index_name: index_name)
client.count(index: index_name, body: { query: query })['count']
end
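
# Matches commit-type documents that have no schema_version field set.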
def query
{ bool: { filter: [{ term: { type: 'commit' } }], must_not: { exists: { field: 'schema_version' } } } }
end
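
# Name of the main index that commit documents are being migrated away from.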
def index_name
Elastic::Latest::Config.index_name
end
end
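
For reference, the batching search issued by batch_of_rids_to_reindex resolves to roughly the request body below (a sketch assembled from the migration above; 200 is the configured batch_size, and the target index is whatever Elastic::Latest::Config.index_name returns):

# Sketch of the aggregation body used to pick the next batch of projects.
# Only commit documents without a schema_version contribute to the buckets.
{
  size: 0,
  query: {
    bool: {
      filter: [{ term: { type: 'commit' } }],
      must_not: { exists: { field: 'schema_version' } }
    }
  },
  aggs: {
    rids: { terms: { size: 200, field: 'commit.rid' } }
  }
}

Each bucket key is a project id; the migration enqueues ElasticCommitIndexerWorker for that id and then stamps the matching documents with SCHEMA_VERSION so they drop out of this aggregation on the next run.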

# frozen_string_literal: true
require 'spec_helper'
require_relative 'migration_shared_examples'
require File.expand_path('ee/elastic/migrate/20230901120542_force_reindex_commits_from_main_index.rb')
RSpec.describe ForceReindexCommitsFromMainIndex, :elastic_delete_by_query, :sidekiq_inline, feature_category: :global_search do
let(:version) { 20230901120542 }
let(:migration) { described_class.new(version) }
let(:helper) { Gitlab::Elastic::Helper.new }
let(:client) { ::Gitlab::Search::Client.new }
let_it_be_with_reload(:projects) { create_list(:project, 3, :repository) }
before do
stub_ee_application_setting(elasticsearch_search: true, elasticsearch_indexing: true)
allow(migration).to receive(:helper).and_return(helper)
allow(migration).to receive(:client).and_return(client)
end
describe 'migration_options' do
it 'has migration options set', :aggregate_failures do
expect(migration).to be_batched
expect(migration.batch_size).to eq 200
expect(migration.throttle_delay).to eq(3.minutes)
expect(migration).to be_retry_on_failure
end
end
describe '.completed?' do
before do
Project.all.each { |p| populate_commits_in_main_index!(p) }
end
context 'when no commit documents are in the main index' do
before do
client.delete_by_query(index: helper.target_name, conflicts: 'proceed', refresh: true,
body: { query: { bool: { filter: { term: { type: 'commit' } } } } }
)
end
it 'returns true' do
expect(migration).to be_completed
end
end
context 'when commit documents exist in the main index' do
context 'and schema_version is set for all documents' do
before do
Project.all.each { |p| set_schema_version(p) }
end
it 'returns true' do
expect(migration).to be_completed
end
end
context 'and schema_version is not set for some documents' do
before do
set_schema_version(Project.first)
end
it 'returns false' do
expect(migration).not_to be_completed
end
end
end
end
describe '.migrate' do
let(:batch_size) { 1 }
before do
allow(migration).to receive(:batch_size).and_return(batch_size)
Project.all.each { |p| populate_commits_in_main_index!(p) }
end
context 'if migration is completed' do
before do
Project.all.each { |p| set_schema_version(p) }
end
it 'performs logging and does not call ElasticCommitIndexerWorker' do
expect(migration).to receive(:log).with("Setting migration_state to #{{ documents_remaining: 0 }.to_json}").once
expect(migration).to receive(:log).with('Checking if migration is finished', { total_remaining: 0 }).once
expect(migration).to receive(:log).with('Migration Completed', { total_remaining: 0 }).once
expect(ElasticCommitIndexerWorker).not_to receive(:perform_in)
migration.migrate
end
end
context 'if migration is not completed' do
it 'calls ElasticCommitIndexerWorker and performs force indexing' do
delay = a_value_between(0, migration.throttle_delay.seconds.to_i)
expect(indexed_documents_count).to be_zero
expect(ElasticCommitIndexerWorker).to receive(:perform_in).with(delay, anything, false, force: true)
expect(migration).not_to be_completed
migration.migrate
expect(indexed_documents_count).to eq batch_size
expect(migration).not_to be_completed
expect(ElasticCommitIndexerWorker).to receive(:perform_in).with(delay, anything, false, force: true).twice
10.times do
break if migration.completed?
migration.migrate
end
expect(indexed_documents_count).to eq Project.count
expect(migration).to be_completed
end
end
end
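
# Indexes a minimal commit document for the project directly into the main index,
# without a schema_version, to mimic commits left over from before the migration.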
def populate_commits_in_main_index!(project)
client.index(index: helper.target_name, routing: "project_#{project.id}", refresh: true,
body: { commit: { type: 'commit',
author: { name: 'F L', email: 't@t.com', time: Time.now.strftime('%Y%m%dT%H%M%S+0000') },
committer: { name: 'F L', email: 't@t.com', time: Time.now.strftime('%Y%m%dT%H%M%S+0000') },
rid: project.id, message: 'test' },
join_field: { name: 'commit', parent: "project_#{project.id}" },
repository_access_level: project.repository_access_level, type: 'commit',
visibility_level: project.visibility_level })
end
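
# Simulates the migration having already processed the project by stamping its
# commit documents with SCHEMA_VERSION.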
def set_schema_version(project)
query = { bool: { filter: [{ term: { type: 'commit' } }, { term: { 'commit.rid' => project.id.to_s } }],
must_not: { exists: { field: 'schema_version' } } } }
script = { source: "ctx._source.schema_version = #{described_class::SCHEMA_VERSION}" }
client.update_by_query(index: helper.target_name, routing: "project_#{project.id}", conflicts: 'proceed',
body: { query: query, script: script }, refresh: true
)
end
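
# Counts commit documents that have already been stamped with SCHEMA_VERSION.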
def indexed_documents_count
query = { bool: { filter: [{ term: { type: 'commit' } },
{ term: { schema_version: described_class::SCHEMA_VERSION } }] } }
refresh_index!
client.count(index: helper.target_name, body: { query: query })['count']
end
end