Skip to content
Snippets Groups Projects
Unverified Commit 669c9b06 authored by John Mason's avatar John Mason
Browse files

Merge branch '419781-force-reindex-commits-for-leftover-projects' into 'master'

parents 2d5a2e97 bcfb4667
No related branches found
No related tags found
No related merge requests found
---
name: ForceReindexCommitsFromMainIndex
version: '20230901120542'
description: This will force a reindex of commits for all the projects that still have
  commit-type documents in the main index
group: group::global search
milestone: '16.4'
introduced_by_url: https://gitlab.com/gitlab-org/gitlab/-/merge_requests/130718
obsolete: false
marked_obsolete_by_url:
marked_obsolete_in_milestone:
# frozen_string_literal: true
# Batched migration that schedules a forced commit reindex for every project
# that still has commit documents without a `schema_version` field in the
# main index.
#
# Each run selects up to `batch_size` project ids, enqueues an
# ElasticCommitIndexerWorker job for each one at a random delay below
# `throttle_delay`, and stamps that project's matching commit documents with
# SCHEMA_VERSION so the selection query no longer matches them.
class ForceReindexCommitsFromMainIndex < Elastic::Migration
  include Elastic::MigrationHelper

  batched!
  throttle_delay 3.minutes
  batch_size 200
  retry_on_failure

  ELASTIC_TIMEOUT = '5m'
  SCHEMA_VERSION = 23_09

  # Processes one batch of projects, or only logs when nothing is left.
  def migrate
    if completed?
      log 'Migration Completed', total_remaining: 0
      return
    end

    batch_of_rids_to_reindex.each do |project_rid|
      ElasticCommitIndexerWorker.perform_in(rand(throttle_delay), project_rid, false, force: true)
      update_schema_version(project_rid)
    end
  end

  # Records and logs how many documents remain.
  #
  # @return [Boolean] true when no commit document lacks a schema_version
  def completed?
    remaining = remaining_documents_count
    set_migration_state(documents_remaining: remaining)
    log('Checking if migration is finished', total_remaining: remaining)

    remaining == 0
  end

  private

  # Returns the `commit.rid` bucket keys (at most `batch_size` of them) of the
  # documents matched by #query.
  def batch_of_rids_to_reindex
    rid_terms = { rids: { terms: { size: batch_size, field: 'commit.rid' } } }
    response = client.search(index: index_name, body: { size: 0, query: query, aggs: rid_terms })

    buckets = response.dig('aggregations', 'rids', 'buckets') || []
    buckets.map { |bucket| bucket['key'] }
  end

  # Starts an update-by-query (not awaited) that sets schema_version on the
  # still-unversioned commit documents belonging to +rid+.
  def update_schema_version(rid)
    scoped_query = query
    scoped_query[:bool][:filter].push({ term: { 'commit.rid' => rid } })
    stamp_script = { source: "ctx._source.schema_version = #{SCHEMA_VERSION}" }

    client.update_by_query(
      index: index_name,
      routing: "project_#{rid}",
      wait_for_completion: false,
      timeout: ELASTIC_TIMEOUT,
      conflicts: 'proceed',
      body: { query: scoped_query, script: stamp_script }
    )
  end

  # Refreshes the index first so the count reflects earlier updates.
  def remaining_documents_count
    helper.refresh_index(index_name: index_name)

    response = client.count(index: index_name, body: { query: query })
    response['count']
  end

  # Builds a fresh query hash on every call (callers append to its filter list):
  # commit-type documents that have no schema_version field.
  def query
    commit_type = [{ term: { type: 'commit' } }]
    schema_version_present = { exists: { field: 'schema_version' } }

    { bool: { filter: commit_type, must_not: schema_version_present } }
  end

  def index_name
    Elastic::Latest::Config.index_name
  end
end
# frozen_string_literal: true
require 'spec_helper'
require_relative 'migration_shared_examples'
require File.expand_path('ee/elastic/migrate/20230901120542_force_reindex_commits_from_main_index.rb')
RSpec.describe ForceReindexCommitsFromMainIndex, :elastic_delete_by_query, :sidekiq_inline, feature_category: :global_search do
let(:version) { 20230901120542 }
let(:migration) { described_class.new(version) }
let(:helper) { Gitlab::Elastic::Helper.new }
let(:client) { ::Gitlab::Search::Client.new }

let_it_be_with_reload(:projects) { create_list(:project, 3, :repository) }

before do
  stub_ee_application_setting(elasticsearch_search: true, elasticsearch_indexing: true)

  # Make the migration use the same helper and client instances as the spec.
  allow(migration).to receive_messages(helper: helper, client: client)
end
describe 'migration_options' do
  # Pins the class-level migration settings.
  it 'has migration options set', :aggregate_failures do
    expect(migration).to be_batched
    expect(migration).to be_retry_on_failure
    expect(migration).to have_attributes(batch_size: 200, throttle_delay: 3.minutes)
  end
end
# `completed?` is an instance method, so the group is named with the `#` prefix
# (the `.` prefix denotes class methods).
describe '#completed?' do
  before do
    # Every project starts with one commit document that has no schema_version.
    Project.all.each { |project| populate_commits_in_main_index!(project) }
  end

  context 'when no commits documents are in the main index' do
    before do
      # Delete every commit-type document so nothing is left to migrate.
      client.delete_by_query(
        index: helper.target_name,
        conflicts: 'proceed',
        refresh: true,
        body: { query: { bool: { filter: { term: { type: 'commit' } } } } }
      )
    end

    it 'returns true' do
      expect(migration).to be_completed
    end
  end

  context 'when commits documents exists in the main index' do
    context 'and schema_version is set for all documents' do
      before do
        Project.all.each { |project| set_schema_version(project) }
      end

      it 'returns true' do
        expect(migration).to be_completed
      end
    end

    context 'and schema_version is not set for some documents' do
      before do
        # Only one project is stamped; the others keep unversioned documents.
        set_schema_version(Project.first)
      end

      it 'returns false' do
        expect(migration).not_to be_completed
      end
    end
  end
end
# `migrate` is an instance method, so the group is named with the `#` prefix
# (the `.` prefix denotes class methods).
describe '#migrate' do
  let(:batch_size) { 1 }

  before do
    # One project per batch, so several migrate runs are needed to finish.
    allow(migration).to receive(:batch_size).and_return(batch_size)
    Project.all.each { |project| populate_commits_in_main_index!(project) }
  end

  context 'if migration is completed' do
    before do
      Project.all.each { |project| set_schema_version(project) }
    end

    it 'performs logging and does not call ElasticCommitIndexerWorker' do
      expect(migration).to receive(:log).with("Setting migration_state to #{{ documents_remaining: 0 }.to_json}").once
      expect(migration).to receive(:log).with('Checking if migration is finished', { total_remaining: 0 }).once
      expect(migration).to receive(:log).with('Migration Completed', { total_remaining: 0 }).once
      expect(ElasticCommitIndexerWorker).not_to receive(:perform_in)

      migration.migrate
    end
  end

  context 'if migration is not completed' do
    it 'calls ElasticCommitIndexerWorker and performs force indexing' do
      # throttle_delay is already a duration; to_i yields its length in seconds.
      delay = a_value_between(0, migration.throttle_delay.to_i)

      expect(indexed_documents_count).to be_zero
      expect(ElasticCommitIndexerWorker).to receive(:perform_in).with(delay, anything, false, force: true)
      expect(migration).not_to be_completed

      # First run handles exactly one batch.
      migration.migrate

      expect(indexed_documents_count).to eq batch_size
      expect(migration).not_to be_completed
      expect(ElasticCommitIndexerWorker).to receive(:perform_in).with(delay, anything, false, force: true).twice

      # Keep running until done; the cap keeps a regression from looping forever.
      10.times do
        break if migration.completed?

        migration.migrate
      end

      expect(indexed_documents_count).to eq Project.count
      expect(migration).to be_completed
    end
  end
end
# Indexes one commit document for +project+ into the main index, using
# project-based routing and `refresh: true`. The document has no
# schema_version field.
#
# @param project [Project] supplies the id, repository access level and
#   visibility level written into the document
def populate_commits_in_main_index!(project)
  routing_key = "project_#{project.id}"
  # The format string hard-codes a +0000 offset, so the clock has to be read
  # in UTC; formatting local time would mislabel it on non-UTC hosts.
  timestamp = Time.now.utc.strftime('%Y%m%dT%H%M%S+0000')
  signature = { name: 'F L', email: 't@t.com', time: timestamp }

  client.index(
    index: helper.target_name,
    routing: routing_key,
    refresh: true,
    body: {
      commit: {
        type: 'commit',
        author: signature,
        committer: signature.dup,
        rid: project.id,
        message: 'test'
      },
      join_field: { name: 'commit', parent: routing_key },
      repository_access_level: project.repository_access_level,
      type: 'commit',
      visibility_level: project.visibility_level
    }
  )
end
# Stamps the migration's SCHEMA_VERSION onto +project+'s commit documents in
# the main index that do not have a schema_version yet, refreshing afterwards.
#
# @param project [Project] project whose commit documents are updated
def set_schema_version(project)
  unversioned_commits = {
    bool: {
      filter: [
        { term: { type: 'commit' } },
        { term: { 'commit.rid' => project.id.to_s } }
      ],
      must_not: { exists: { field: 'schema_version' } }
    }
  }
  stamp_script = { source: "ctx._source.schema_version = #{described_class::SCHEMA_VERSION}" }

  client.update_by_query(
    index: helper.target_name,
    routing: "project_#{project.id}",
    conflicts: 'proceed',
    refresh: true,
    body: { query: unversioned_commits, script: stamp_script }
  )
end
# Counts commit documents in the main index whose schema_version equals the
# migration's SCHEMA_VERSION. The index is refreshed before counting.
#
# @return [Integer] the 'count' value of the client's response
def indexed_documents_count
  versioned_commits = {
    bool: {
      filter: [
        { term: { type: 'commit' } },
        { term: { schema_version: described_class::SCHEMA_VERSION } }
      ]
    }
  }

  refresh_index!

  response = client.count(index: helper.target_name, body: { query: versioned_commits })
  response['count']
end
end
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment