Verified Commit 4dfe26cd authored by Yorick Peterse

Rewrite the GitHub importer from scratch

Prior to this MR there were two GitHub-related importers:

* Github::Import: the main importer used for GitHub projects
* Gitlab::GithubImport: importer that's somewhat confusingly used for
  importing Gitea projects (apparently they have a compatible API)

This MR renames the Gitea importer to Gitlab::LegacyGithubImport and
introduces a new GitHub importer in the Gitlab::GithubImport namespace.
This new GitHub importer uses Sidekiq for importing multiple resources
in parallel, though it also has the ability to import data sequentially
should this be necessary.

The new code is spread across the following directories:

* lib/gitlab/github_import: this directory contains most of the importer
  code such as the classes used for importing resources.
* app/workers/gitlab/github_import: this directory contains the Sidekiq
  workers, most of which simply use the code from the directory above.
* app/workers/concerns/gitlab/github_import: this directory provides a
  few modules that are included in every GitHub importer worker.

== Stages

The import work is divided into separate stages, with each stage
importing a specific set of data. A stage schedules the work that needs
to be performed and then schedules a job for the "AdvanceStageWorker"
worker. This worker periodically checks whether all scheduled work has
completed and schedules the next stage if so; if work is still pending
the worker reschedules itself.

Using this approach we don't have to block threads by calling `sleep()`,
as doing so for large projects could block the thread from doing any
work for many hours.
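
In simplified form the stage/advance loop added by this MR boils down
to the following (a sketch; the full implementation is the
AdvanceStageWorker class further down in this diff):

    # Re-enqueue ourselves until all jobs scheduled by the current stage
    # have finished, then kick off the next stage.
    def perform(project_id, waiters, next_stage)
      remaining = wait_for_jobs(waiters)

      if remaining.empty?
        STAGES.fetch(next_stage.to_sym).perform_async(project_id)
      else
        self.class.perform_in(INTERVAL, project_id, remaining, next_stage)
      end
    end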

== Retrying Work

Workers will reschedule themselves whenever necessary. For example,
hitting the GitHub API's rate limit will result in jobs rescheduling
themselves. These jobs are not processed until the rate limit has been
reset.
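
In simplified form (the full version is the ReschedulingMethods concern
added by this MR):

    if try_import(project, client, hash)
      notify_waiter(notify_key)
    else
      # We hit the rate limit: run this job again once the limit resets.
      self.class.perform_in(client.rate_limit_resets_in, project.id, hash, notify_key)
    end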

== User Lookups

Part of the importing process involves looking up user details in the
GitHub API so we can map them to GitLab users. The old importer used
an in-memory cache, but this obviously doesn't work when the work is
spread across different threads.

The new importer uses a Redis cache and makes sure we only perform
API/database calls when absolutely necessary. Frequently used keys are
refreshed, and lookup misses are also cached, removing the need to
perform API/database calls when we already know we don't have the data
we're looking for.
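
As an illustration only (the names below are hypothetical, not the
exact classes used by this MR), the lookup strategy boils down to
caching hits as well as misses in Redis:

    # Hypothetical sketch: look up the GitLab user ID for a GitHub user
    # ID, caching both hits and misses so repeated lookups can skip the
    # API/database entirely.
    def gitlab_user_id_for(github_id)
      key = "github-import/user-id/#{github_id}"
      cached = Gitlab::Redis::Cache.with { |redis| redis.get(key) }

      return nil if cached == ''   # a previously cached miss
      return cached.to_i if cached # a previously cached hit

      id = User.for_github_id(github_id).pluck(:id).first

      Gitlab::Redis::Cache.with do |redis|
        # Cache misses as an empty string so we don't look them up again.
        redis.set(key, id || '', ex: 24.hours.to_i)
      end

      id
    end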

== Performance & Models

In various places the new importer uses raw INSERT statements (as
generated by `Gitlab::Database.bulk_insert`) instead of going through
Rails models. This allows us to bypass validations and callbacks,
drastically reducing the number of SQL queries and Gitaly RPC calls
necessary to import projects.

To ensure the code produces valid data, the corresponding tests check
that the produced rows are valid according to the model validation
rules.
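
For example, importing a set of labels can then be reduced to building
a list of row hashes and issuing a single INSERT (an illustrative
sketch; the row attributes shown here are an assumption, not the exact
columns used by the importer):

    rows = labels.map do |label|
      {
        title: label.name,
        color: "##{label.color}",
        project_id: project.id,
        type: 'ProjectLabel',
        created_at: Time.zone.now,
        updated_at: Time.zone.now
      }
    end

    # One INSERT for all rows; no model callbacks, validations or
    # per-row Gitaly calls.
    Gitlab::Database.bulk_insert('labels', rows)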
parent 90be53c5

Showing 586 additions and 3 deletions
@@ -43,7 +43,7 @@ class Import::GithubController < Import::BaseController
@target_namespace = find_or_create_namespace(namespace_path, current_user.namespace_path)
 
if can?(current_user, :create_projects, @target_namespace)
- @project = Gitlab::GithubImport::ProjectCreator.new(repo, @project_name, @target_namespace, current_user, access_params, type: provider).execute
+ @project = Gitlab::LegacyGithubImport::ProjectCreator.new(repo, @project_name, @target_namespace, current_user, access_params, type: provider).execute
else
render 'unauthorized'
end
@@ -52,7 +52,7 @@ class Import::GithubController < Import::BaseController
private
 
def client
- @client ||= Gitlab::GithubImport::Client.new(session[access_token_key], client_options)
+ @client ||= Gitlab::LegacyGithubImport::Client.new(session[access_token_key], client_options)
end
 
def verify_import_enabled
@@ -365,6 +365,7 @@ class Project < ActiveRecord::Base
scope :abandoned, -> { where('projects.last_activity_at < ?', 6.months.ago) }
 
scope :excluding_project, ->(project) { where.not(id: project) }
scope :import_started, -> { where(import_status: 'started') }
 
state_machine :import_status, initial: :none do
event :import_schedule do
@@ -1190,6 +1191,10 @@ class Project < ActiveRecord::Base
!!repository.exists?
end
 
def wiki_repository_exists?
  wiki.repository_exists?
end

# update visibility_level of forks
def update_forks_visibility_level
  return unless visibility_level < visibility_level_was
@@ -1433,6 +1438,31 @@ class Project < ActiveRecord::Base
reload_repository!
end
 
def after_import
  repository.after_import
  import_finish
  remove_import_jid
  update_project_counter_caches
end

def update_project_counter_caches
  classes = [
    Projects::OpenIssuesCountService,
    Projects::OpenMergeRequestsCountService
  ]
  classes.each do |klass|
    klass.new(self).refresh_cache
  end
end

def remove_import_jid
  return unless import_jid
  Gitlab::SidekiqStatus.unset(import_jid)
  update_column(:import_jid, nil)
end

def running_or_pending_build_count(force: false)
  Rails.cache.fetch(['projects', id, 'running_or_pending_build_count'], force: force) do
    builds.running_or_pending.count(:all)
@@ -1690,6 +1720,17 @@ class Project < ActiveRecord::Base
Gitlab::ReferenceCounter.new(gl_repository(is_wiki: wiki))
end
 
# Refreshes the expiration time of the associated import job ID.
#
# This method can be used by asynchronous importers to refresh the status,
# preventing the StuckImportJobsWorker from marking the import as failed.
def refresh_import_jid_expiration
  return unless import_jid
  Gitlab::SidekiqStatus
    .set(import_jid, StuckImportJobsWorker::IMPORT_JOBS_EXPIRATION)
end

private

def storage
@@ -973,6 +973,10 @@ class Repository
raw_repository.fetch_source_branch!(source_repository.raw_repository, source_branch, local_ref)
end
 
def remote_exists?(name)
  raw_repository.remote_exists?(name)
end

def compare_source_branch(target_branch_name, source_repository, source_branch_name, straight:)
  raw_repository.compare_source_branch(target_branch_name, source_repository.raw_repository, source_branch_name, straight: straight)
end
@@ -267,6 +267,11 @@ class User < ActiveRecord::Base
end
end
 
def for_github_id(id)
  joins(:identities)
    .where(identities: { provider: :github, extern_uid: id.to_s })
end

# Find a User by their primary email or any associated secondary email
def find_by_any_email(email)
  by_any_email(email).take
@@ -4,6 +4,18 @@ module Projects
 
Error = Class.new(StandardError)
 
# Returns true if this importer is supposed to perform its work in the
# background.
#
# This method will only return `true` if async importing is explicitly
# supported by an importer class (`Gitlab::GithubImport::ParallelImporter`
# for example).
def async?
  return false unless has_importer?
  !!importer_class.try(:async?)
end

def execute
  add_repository_to_project unless project.gitlab_project_import?
 
@@ -80,7 +92,11 @@ module Projects
end
 
def importer
- Gitlab::ImportSources.importer(project.import_type).new(project)
+ importer_class.new(project)
end

def importer_class
  Gitlab::ImportSources.importer(project.import_type)
end
 
def unknown_url?
# frozen_string_literal: true

module Gitlab
  module GithubImport
    # NotifyUponDeath can be included into a GitHub worker class if it should
    # notify any JobWaiter instances upon being moved to the Sidekiq dead queue.
    #
    # Note that this will only notify the waiter upon graceful termination; a
    # SIGKILL will still result in the waiter _not_ being notified.
    #
    # Workers including this module must have jobs passed where the last
    # argument is the key to notify, as a String.
    module NotifyUponDeath
      extend ActiveSupport::Concern

      included do
        # If a job is being exhausted we still want to notify the
        # AdvanceStageWorker. This prevents the entire import from getting stuck
        # just because 1 job threw too many errors.
        sidekiq_retries_exhausted do |job|
          args = job['args']
          jid = job['jid']

          if args.length == 3 && (key = args.last) && key.is_a?(String)
            JobWaiter.notify(key, jid)
          end
        end
      end
    end
  end
end
# frozen_string_literal: true

module Gitlab
  module GithubImport
    # ObjectImporter defines the base behaviour for every Sidekiq worker that
    # imports a single resource such as a note or pull request.
    module ObjectImporter
      extend ActiveSupport::Concern

      included do
        include Sidekiq::Worker
        include GithubImport::Queue
        include ReschedulingMethods
        include NotifyUponDeath
      end

      # project - An instance of `Project` to import the data into.
      # client - An instance of `Gitlab::GithubImport::Client`
      # hash - A Hash containing the details of the object to import.
      def import(project, client, hash)
        object = representation_class.from_json_hash(hash)
        importer_class.new(object, project, client).execute
        counter.increment(project: project.path_with_namespace)
      end

      def counter
        @counter ||= Gitlab::Metrics.counter(counter_name, counter_description)
      end

      # Returns the representation class to use for the object. This class must
      # define the class method `from_json_hash`.
      def representation_class
        raise NotImplementedError
      end

      # Returns the class to use for importing the object.
      def importer_class
        raise NotImplementedError
      end

      # Returns the name (as a Symbol) of the Prometheus counter.
      def counter_name
        raise NotImplementedError
      end

      # Returns the description (as a String) of the Prometheus counter.
      def counter_description
        raise NotImplementedError
      end
    end
  end
end
module Gitlab
  module GithubImport
    module Queue
      extend ActiveSupport::Concern

      included do
        # If a job produces an error it may block a stage from advancing
        # forever. To prevent this from happening we prevent jobs from going to
        # the dead queue. This does mean some resources may not be imported, but
        # this is better than a project being stuck in the "import" state
        # forever.
        sidekiq_options queue: 'github_importer', dead: false, retry: 5
      end
    end
  end
end
# frozen_string_literal: true

module Gitlab
  module GithubImport
    # Module that provides methods shared by the various workers used for
    # importing GitHub projects.
    module ReschedulingMethods
      # project_id - The ID of the GitLab project to import the note into.
      # hash - A Hash containing the details of the GitHub object to import.
      # notify_key - The Redis key to notify upon completion, if any.
      def perform(project_id, hash, notify_key = nil)
        project = Project.find_by(id: project_id)

        return notify_waiter(notify_key) unless project

        client = GithubImport.new_client_for(project, parallel: true)

        if try_import(project, client, hash)
          notify_waiter(notify_key)
        else
          # In the event of hitting the rate limit we want to reschedule the job
          # so it's retried after our rate limit has been reset.
          self.class
            .perform_in(client.rate_limit_resets_in, project.id, hash, notify_key)
        end
      end

      def try_import(*args)
        import(*args)
        true
      rescue RateLimitError
        false
      end

      def notify_waiter(key = nil)
        JobWaiter.notify(key, jid) if key
      end
    end
  end
end
# frozen_string_literal: true

module Gitlab
  module GithubImport
    module StageMethods
      # project_id - The ID of the GitLab project to import the data into.
      def perform(project_id)
        return unless (project = find_project(project_id))

        client = GithubImport.new_client_for(project)
        try_import(client, project)
      end

      # client - An instance of Gitlab::GithubImport::Client.
      # project - An instance of Project.
      def try_import(client, project)
        import(client, project)
      rescue RateLimitError
        self.class.perform_in(client.rate_limit_resets_in, project.id)
      end

      def find_project(id)
        # If the project has been marked as failed we want to bail out
        # automatically.
        Project.import_started.find_by(id: id)
      end
    end
  end
end
# frozen_string_literal: true

module Gitlab
  module GithubImport
    # AdvanceStageWorker is a worker used by the GitHub importer to wait for a
    # number of jobs to complete, without blocking a thread. Once all jobs have
    # been completed this worker will advance the import process to the next
    # stage.
    class AdvanceStageWorker
      include Sidekiq::Worker

      sidekiq_options queue: 'github_importer_advance_stage', dead: false

      INTERVAL = 30.seconds.to_i

      # The number of seconds to wait (while blocking the thread) before
      # continuing to the next waiter.
      BLOCKING_WAIT_TIME = 5

      # The known importer stages and their corresponding Sidekiq workers.
      STAGES = {
        issues_and_diff_notes: Stage::ImportIssuesAndDiffNotesWorker,
        notes: Stage::ImportNotesWorker,
        finish: Stage::FinishImportWorker
      }.freeze

      # project_id - The ID of the project being imported.
      # waiters - A Hash mapping Gitlab::JobWaiter keys to the number of
      #           remaining jobs.
      # next_stage - The name of the next stage to start when all jobs have been
      #              completed.
      def perform(project_id, waiters, next_stage)
        return unless (project = find_project(project_id))

        new_waiters = wait_for_jobs(waiters)

        if new_waiters.empty?
          # We refresh the import JID here so workers importing individual
          # resources (e.g. notes) don't have to do this all the time, reducing
          # the pressure on Redis. We _only_ do this once all jobs are done so
          # we don't get stuck forever if one or more jobs failed to notify the
          # JobWaiter.
          project.refresh_import_jid_expiration
          STAGES.fetch(next_stage.to_sym).perform_async(project_id)
        else
          self.class.perform_in(INTERVAL, project_id, new_waiters, next_stage)
        end
      end

      def wait_for_jobs(waiters)
        waiters.each_with_object({}) do |(key, remaining), new_waiters|
          waiter = JobWaiter.new(remaining, key)

          # We wait for a brief moment of time so we don't reschedule if we can
          # complete the work fast enough.
          waiter.wait(BLOCKING_WAIT_TIME)

          next unless waiter.jobs_remaining.positive?

          new_waiters[waiter.key] = waiter.jobs_remaining
        end
      end

      def find_project(id)
        # We only care about the import JID so we can refresh it. We also only
        # want the project if it hasn't been marked as failed yet. It's possible
        # the import gets marked as stuck when jobs of the current stage failed
        # somehow.
        Project.select(:import_jid).import_started.find_by(id: id)
      end
    end
  end
end
# frozen_string_literal: true

module Gitlab
  module GithubImport
    class ImportDiffNoteWorker
      include ObjectImporter

      def representation_class
        Representation::DiffNote
      end

      def importer_class
        Importer::DiffNoteImporter
      end

      def counter_name
        :github_importer_imported_diff_notes
      end

      def counter_description
        'The number of imported GitHub pull request review comments'
      end
    end
  end
end
# frozen_string_literal: true

module Gitlab
  module GithubImport
    class ImportIssueWorker
      include ObjectImporter

      def representation_class
        Representation::Issue
      end

      def importer_class
        Importer::IssueAndLabelLinksImporter
      end

      def counter_name
        :github_importer_imported_issues
      end

      def counter_description
        'The number of imported GitHub issues'
      end
    end
  end
end
# frozen_string_literal: true

module Gitlab
  module GithubImport
    class ImportNoteWorker
      include ObjectImporter

      def representation_class
        Representation::Note
      end

      def importer_class
        Importer::NoteImporter
      end

      def counter_name
        :github_importer_imported_notes
      end

      def counter_description
        'The number of imported GitHub comments'
      end
    end
  end
end
# frozen_string_literal: true

module Gitlab
  module GithubImport
    class ImportPullRequestWorker
      include ObjectImporter

      def representation_class
        Representation::PullRequest
      end

      def importer_class
        Importer::PullRequestImporter
      end

      def counter_name
        :github_importer_imported_pull_requests
      end

      def counter_description
        'The number of imported GitHub pull requests'
      end
    end
  end
end
# frozen_string_literal: true

module Gitlab
  module GithubImport
    class RefreshImportJidWorker
      include Sidekiq::Worker
      include GithubImport::Queue

      # The interval to schedule new instances of this job at.
      INTERVAL = 1.minute.to_i

      def self.perform_in_the_future(*args)
        perform_in(INTERVAL, *args)
      end

      # project_id - The ID of the project that is being imported.
      # check_job_id - The ID of the job for which to check the status.
      def perform(project_id, check_job_id)
        return unless (project = find_project(project_id))

        if SidekiqStatus.running?(check_job_id)
          # As long as the repository is being cloned we want to keep refreshing
          # the import JID status.
          project.refresh_import_jid_expiration
          self.class.perform_in_the_future(project_id, check_job_id)
        end

        # If the job is no longer running there's nothing else we need to do. If
        # the clone job completed successfully it will have scheduled the next
        # stage; if it died there's nothing we can do anyway.
      end

      def find_project(id)
        Project.select(:import_jid).import_started.find_by(id: id)
      end
    end
  end
end
# frozen_string_literal: true

module Gitlab
  module GithubImport
    module Stage
      class FinishImportWorker
        include Sidekiq::Worker
        include GithubImport::Queue
        include StageMethods

        # project - An instance of Project.
        def import(_, project)
          project.after_import
          report_import_time(project)
        end

        def report_import_time(project)
          duration = Time.zone.now - project.created_at
          path = project.path_with_namespace

          histogram.observe({ project: path }, duration)
          counter.increment

          logger.info("GitHub importer finished for #{path} in #{duration.round(2)} seconds")
        end

        def histogram
          @histogram ||= Gitlab::Metrics.histogram(
            :github_importer_total_duration_seconds,
            'Total time spent importing GitHub projects, in seconds'
          )
        end

        def counter
          @counter ||= Gitlab::Metrics.counter(
            :github_importer_imported_projects,
            'The number of imported GitHub projects'
          )
        end
      end
    end
  end
end
# frozen_string_literal: true

module Gitlab
  module GithubImport
    module Stage
      class ImportBaseDataWorker
        include Sidekiq::Worker
        include GithubImport::Queue
        include StageMethods

        # These importers are fast enough that we can just run them in the same
        # thread.
        IMPORTERS = [
          Importer::LabelsImporter,
          Importer::MilestonesImporter,
          Importer::ReleasesImporter
        ].freeze

        # client - An instance of Gitlab::GithubImport::Client.
        # project - An instance of Project.
        def import(client, project)
          IMPORTERS.each do |klass|
            klass.new(project, client).execute
          end

          project.refresh_import_jid_expiration
          ImportPullRequestsWorker.perform_async(project.id)
        end
      end
    end
  end
end
# frozen_string_literal: true

module Gitlab
  module GithubImport
    module Stage
      class ImportIssuesAndDiffNotesWorker
        include Sidekiq::Worker
        include GithubImport::Queue
        include StageMethods

        # The importers to run in this stage. Issues can't be imported earlier
        # on as we also use these to enrich pull requests with assigned labels.
        IMPORTERS = [
          Importer::IssuesImporter,
          Importer::DiffNotesImporter
        ].freeze

        # client - An instance of Gitlab::GithubImport::Client.
        # project - An instance of Project.
        def import(client, project)
          waiters = IMPORTERS.each_with_object({}) do |klass, hash|
            waiter = klass.new(project, client).execute
            hash[waiter.key] = waiter.jobs_remaining
          end

          AdvanceStageWorker.perform_async(project.id, waiters, :notes)
        end
      end
    end
  end
end
# frozen_string_literal: true

module Gitlab
  module GithubImport
    module Stage
      class ImportNotesWorker
        include Sidekiq::Worker
        include GithubImport::Queue
        include StageMethods

        # client - An instance of Gitlab::GithubImport::Client.
        # project - An instance of Project.
        def import(client, project)
          waiter = Importer::NotesImporter
            .new(project, client)
            .execute

          AdvanceStageWorker.perform_async(
            project.id,
            { waiter.key => waiter.jobs_remaining },
            :finish
          )
        end
      end
    end
  end
end