Skip to content
Snippets Groups Projects
Commit 37eff29d authored by GitLab Bot's avatar GitLab Bot
Browse files

Add latest changes from gitlab-org/gitlab@master

parent 9411a664
No related branches found
No related tags found
No related merge requests found
Showing
with 407 additions and 43 deletions
Loading
Loading
@@ -460,6 +460,7 @@ img.emoji {
.w-8em { width: 8em; }
.w-3rem { width: 3rem; }
.w-15p { width: 15%; }
.w-30p { width: 30%; }
.w-70p { width: 70%; }
 
.h-12em { height: 12em; }
Loading
Loading
Loading
Loading
@@ -11,6 +11,8 @@ class Board < ApplicationRecord
validates :group, presence: true, unless: :project
 
scope :with_associations, -> { preload(:destroyable_lists) }
scope :order_by_name_asc, -> { order(arel_table[:name].lower.asc) }
scope :first_board, -> { where(id: self.order_by_name_asc.limit(1).select(:id)) }
 
def project_needed?
!group
Loading
Loading
Loading
Loading
@@ -447,10 +447,6 @@ module Ci
options_retry_when.include?('always')
end
 
def latest?
!retried?
end
def any_unmet_prerequisites?
prerequisites.present?
end
Loading
Loading
Loading
Loading
@@ -515,7 +515,9 @@ module Ci
# rubocop: enable CodeReuse/ServiceClass
 
def mark_as_processable_after_stage(stage_idx)
builds.skipped.after_stage(stage_idx).find_each(&:process)
builds.skipped.after_stage(stage_idx).find_each do |build|
Gitlab::OptimisticLocking.retry_lock(build, &:process)
end
end
 
def latest?
Loading
Loading
@@ -554,6 +556,13 @@ module Ci
end
end
 
def needs_processing?
statuses
.where(processed: [false, nil])
.latest
.exists?
end
# TODO: this logic is duplicate with Pipeline::Chain::Config::Content
# we should persist this is `ci_pipelines.config_path`
def config_path
Loading
Loading
@@ -583,9 +592,8 @@ module Ci
project.notes.for_commit_id(sha)
end
 
def update_status
def set_status(new_status)
retry_optimistic_lock(self) do
new_status = latest_builds_status.to_s
case new_status
when 'created' then nil
when 'waiting_for_resource' then request_resource
Loading
Loading
@@ -605,6 +613,10 @@ module Ci
end
end
 
def update_legacy_status
set_status(latest_builds_status.to_s)
end
def protected_ref?
strong_memoize(:protected_ref) { project.protected_for?(git_ref) }
end
Loading
Loading
Loading
Loading
@@ -8,8 +8,26 @@ module Ci
 
scope :preload_needs, -> { preload(:needs) }
 
def self.select_with_aggregated_needs(project)
return all unless Feature.enabled?(:ci_dag_support, project, default_enabled: true)
aggregated_needs_names = Ci::BuildNeed
.scoped_build
.select("ARRAY_AGG(name)")
.to_sql
all.select(
'*',
"(#{aggregated_needs_names}) as aggregated_needs_names"
)
end
validates :type, presence: true
 
def aggregated_needs_names
read_attribute(:aggregated_needs_names)
end
def schedulable?
raise NotImplementedError
end
Loading
Loading
Loading
Loading
@@ -13,9 +13,12 @@ module Ci
belongs_to :pipeline
 
has_many :statuses, class_name: 'CommitStatus', foreign_key: :stage_id
has_many :processables, class_name: 'Ci::Processable', foreign_key: :stage_id
has_many :builds, foreign_key: :stage_id
has_many :bridges, foreign_key: :stage_id
 
scope :ordered, -> { order(position: :asc) }
with_options unless: :importing? do
validates :project, presence: true
validates :pipeline, presence: true
Loading
Loading
@@ -80,9 +83,8 @@ module Ci
end
end
 
def update_status
def set_status(new_status)
retry_optimistic_lock(self) do
new_status = latest_stage_status.to_s
case new_status
when 'created' then nil
when 'waiting_for_resource' then request_resource
Loading
Loading
@@ -102,6 +104,10 @@ module Ci
end
end
 
def update_legacy_status
set_status(latest_stage_status.to_s)
end
def groups
@groups ||= Ci::Group.fabricate(self)
end
Loading
Loading
Loading
Loading
@@ -40,6 +40,7 @@ class CommitStatus < ApplicationRecord
scope :latest, -> { where(retried: [false, nil]) }
scope :retried, -> { where(retried: true) }
scope :ordered, -> { order(:name) }
scope :ordered_by_stage, -> { order(stage_idx: :asc) }
scope :latest_ordered, -> { latest.ordered.includes(project: :namespace) }
scope :retried_ordered, -> { retried.ordered.includes(project: :namespace) }
scope :before_stage, -> (index) { where('stage_idx < ?', index) }
Loading
Loading
@@ -57,6 +58,10 @@ class CommitStatus < ApplicationRecord
preload(:project, :user)
end
 
scope :with_project_preload, -> do
preload(project: :namespace)
end
scope :with_needs, -> (names = nil) do
needs = Ci::BuildNeed.scoped_build.select(1)
needs = needs.where(name: names) if names
Loading
Loading
@@ -69,6 +74,15 @@ class CommitStatus < ApplicationRecord
where('NOT EXISTS (?)', needs)
end
 
scope :match_id_and_lock_version, -> (slice) do
# it expects that items are an array of attributes to match
# each hash needs to have `id` and `lock_version`
slice.inject(self) do |relation, item|
match = CommitStatus.where(item.slice(:id, :lock_version))
relation.or(match)
end
end
# We use `CommitStatusEnums.failure_reasons` here so that EE can more easily
# extend this `Hash` with new values.
enum_with_nil failure_reason: ::CommitStatusEnums.failure_reasons
Loading
Loading
@@ -86,6 +100,16 @@ class CommitStatus < ApplicationRecord
# rubocop: enable CodeReuse/ServiceClass
end
 
before_save if: :status_changed?, unless: :importing? do
if Feature.disabled?(:ci_atomic_processing, project)
self.processed = nil
elsif latest?
self.processed = false # force refresh of all dependent ones
elsif retried?
self.processed = true # retried are considered to be already processed
end
end
state_machine :status do
event :process do
transition [:skipped, :manual] => :created
Loading
Loading
@@ -136,19 +160,13 @@ class CommitStatus < ApplicationRecord
end
 
after_transition do |commit_status, transition|
next unless commit_status.project
next if transition.loopback?
next if commit_status.processed?
next unless commit_status.project
 
commit_status.run_after_commit do
if pipeline_id
if complete? || manual?
PipelineProcessWorker.perform_async(pipeline_id, [id])
else
PipelineUpdateWorker.perform_async(pipeline_id)
end
end
StageUpdateWorker.perform_async(stage_id)
schedule_stage_and_pipeline_update
ExpireJobCacheWorker.perform_async(id)
end
end
Loading
Loading
@@ -177,6 +195,11 @@ class CommitStatus < ApplicationRecord
where(name: names).latest.slow_composite_status || 'success'
end
 
def self.update_as_processed!
# Marks items as processed, and increases `lock_version` (Optimisitc Locking)
update_all('processed=TRUE, lock_version=COALESCE(lock_version,0)+1')
end
def locking_enabled?
will_save_change_to_status?
end
Loading
Loading
@@ -193,6 +216,10 @@ class CommitStatus < ApplicationRecord
calculate_duration
end
 
def latest?
!retried?
end
def playable?
false
end
Loading
Loading
@@ -244,4 +271,21 @@ class CommitStatus < ApplicationRecord
v =~ /\d+/ ? v.to_i : v
end
end
private
def schedule_stage_and_pipeline_update
if Feature.enabled?(:ci_atomic_processing, project)
# Atomic Processing requires only single Worker
PipelineProcessWorker.perform_async(pipeline_id, [id])
else
if complete? || manual?
PipelineProcessWorker.perform_async(pipeline_id, [id])
else
PipelineUpdateWorker.perform_async(pipeline_id)
end
StageUpdateWorker.perform_async(stage_id)
end
end
end
Loading
Loading
@@ -4,6 +4,8 @@ require 'slack-notifier'
 
module ChatMessage
class BaseMessage
RELATIVE_LINK_REGEX = /!\[[^\]]*\]\((\/uploads\/[^\)]*)\)/.freeze
attr_reader :markdown
attr_reader :user_full_name
attr_reader :user_name
Loading
Loading
@@ -59,7 +61,11 @@ module ChatMessage
end
 
def format(string)
Slack::Notifier::LinkFormatter.format(string)
Slack::Notifier::LinkFormatter.format(format_relative_links(string))
end
def format_relative_links(string)
string.gsub(RELATIVE_LINK_REGEX, "#{project_url}\\1")
end
 
def attachment_color
Loading
Loading
Loading
Loading
@@ -4,13 +4,24 @@ module Boards
class ListService < Boards::BaseService
def execute
create_board! if parent.boards.empty?
boards
if parent.multiple_issue_boards_available?
boards
else
# When multiple issue boards are not available
# a user is only allowed to view the default shown board
first_board
end
end
 
private
 
def boards
parent.boards
parent.boards.order_by_name_asc
end
def first_board
parent.boards.first_board
end
 
def create_board!
Loading
Loading
@@ -18,5 +29,3 @@ module Boards
end
end
end
Boards::ListService.prepend_if_ee('EE::Boards::ListService')
# frozen_string_literal: true
module Ci
module PipelineProcessing
class AtomicProcessingService
include Gitlab::Utils::StrongMemoize
include ExclusiveLeaseGuard
attr_reader :pipeline
DEFAULT_LEASE_TIMEOUT = 1.minute
BATCH_SIZE = 20
def initialize(pipeline)
@pipeline = pipeline
@collection = AtomicProcessingService::StatusCollection.new(pipeline)
end
def execute
return unless pipeline.needs_processing?
success = try_obtain_lease { process! }
# re-schedule if we need further processing
if success && pipeline.needs_processing?
PipelineProcessWorker.perform_async(pipeline.id)
end
success
end
private
def process!
update_stages!
update_pipeline!
update_statuses_processed!
true
end
def update_stages!
pipeline.stages.ordered.each(&method(:update_stage!))
end
def update_stage!(stage)
# Update processables for a given stage in bulk/slices
ids = @collection.created_processable_ids_for_stage_position(stage.position)
ids.in_groups_of(BATCH_SIZE, false, &method(:update_processables!))
status = @collection.status_for_stage_position(stage.position)
stage.set_status(status)
end
def update_processables!(ids)
created_processables = pipeline.processables.for_ids(ids)
.with_project_preload
.created
.latest
.ordered_by_stage
.select_with_aggregated_needs(project)
created_processables.each(&method(:update_processable!))
end
def update_pipeline!
pipeline.set_status(@collection.status_of_all)
end
def update_statuses_processed!
processing = @collection.processing_processables
processing.each_slice(BATCH_SIZE) do |slice|
pipeline.statuses.match_id_and_lock_version(slice)
.update_as_processed!
end
end
def update_processable!(processable)
status = processable_status(processable)
return unless HasStatus::COMPLETED_STATUSES.include?(status)
# transition status if possible
Gitlab::OptimisticLocking.retry_lock(processable) do |subject|
Ci::ProcessBuildService.new(project, subject.user)
.execute(subject, status)
# update internal representation of status
# to make the status change of processable
# to be taken into account during further processing
@collection.set_processable_status(
processable.id, processable.status, processable.lock_version)
end
end
def processable_status(processable)
if needs_names = processable.aggregated_needs_names
# Processable uses DAG, get status of all dependent needs
@collection.status_for_names(needs_names)
else
# Processable uses Stages, get status of prior stage
@collection.status_for_prior_stage_position(processable.stage_idx.to_i)
end
end
def project
pipeline.project
end
def lease_key
"#{super}::pipeline_id:#{pipeline.id}"
end
def lease_timeout
DEFAULT_LEASE_TIMEOUT
end
end
end
end
# frozen_string_literal: true
module Ci
module PipelineProcessing
class AtomicProcessingService
class StatusCollection
include Gitlab::Utils::StrongMemoize
attr_reader :pipeline
# We use these columns to perform an efficient
# calculation of a status
STATUSES_COLUMNS = [
:id, :name, :status, :allow_failure,
:stage_idx, :processed, :lock_version
].freeze
def initialize(pipeline)
@pipeline = pipeline
@stage_statuses = {}
@prior_stage_statuses = {}
end
# This method updates internal status for given ID
def set_processable_status(id, status, lock_version)
processable = all_statuses_by_id[id]
return unless processable
processable[:status] = status
processable[:lock_version] = lock_version
end
# This methods gets composite status of all processables
def status_of_all
status_for_array(all_statuses)
end
# This methods gets composite status for processables with given names
def status_for_names(names)
name_statuses = all_statuses_by_name.slice(*names)
status_for_array(name_statuses.values)
end
# This methods gets composite status for processables before given stage
def status_for_prior_stage_position(position)
strong_memoize("status_for_prior_stage_position_#{position}") do
stage_statuses = all_statuses_grouped_by_stage_position
.select { |stage_position, _| stage_position < position }
status_for_array(stage_statuses.values.flatten)
end
end
# This methods gets a list of processables for a given stage
def created_processable_ids_for_stage_position(current_position)
all_statuses_grouped_by_stage_position[current_position]
.to_a
.select { |processable| processable[:status] == 'created' }
.map { |processable| processable[:id] }
end
# This methods gets composite status for processables at a given stage
def status_for_stage_position(current_position)
strong_memoize("status_for_stage_position_#{current_position}") do
stage_statuses = all_statuses_grouped_by_stage_position[current_position].to_a
status_for_array(stage_statuses.flatten)
end
end
# This method returns a list of all processable, that are to be processed
def processing_processables
all_statuses.lazy.reject { |status| status[:processed] }
end
private
def status_for_array(statuses)
result = Gitlab::Ci::Status::Composite
.new(statuses)
.status
result || 'success'
end
def all_statuses_grouped_by_stage_position
strong_memoize(:all_statuses_by_order) do
all_statuses.group_by { |status| status[:stage_idx].to_i }
end
end
def all_statuses_by_id
strong_memoize(:all_statuses_by_id) do
all_statuses.map do |row|
[row[:id], row]
end.to_h
end
end
def all_statuses_by_name
strong_memoize(:statuses_by_name) do
all_statuses.map do |row|
[row[:name], row]
end.to_h
end
end
# rubocop: disable CodeReuse/ActiveRecord
def all_statuses
# We fetch all relevant data in one go.
#
# This is more efficient than relying
# on PostgreSQL to calculate composite status
# for us
#
# Since we need to reprocess everything
# we can fetch all of them and do processing
# ourselves.
strong_memoize(:all_statuses) do
raw_statuses = pipeline
.statuses
.latest
.ordered_by_stage
.pluck(*STATUSES_COLUMNS)
raw_statuses.map do |row|
STATUSES_COLUMNS.zip(row).to_h
end
end
end
# rubocop: enable CodeReuse/ActiveRecord
end
end
end
end
Loading
Loading
@@ -18,7 +18,7 @@ module Ci
# only when the another job has finished
success = process_builds_with_needs(trigger_build_ids) || success
 
@pipeline.update_status
@pipeline.update_legacy_status
 
success
end
Loading
Loading
Loading
Loading
@@ -11,9 +11,15 @@ module Ci
def execute(trigger_build_ids = nil)
update_retried
 
Ci::PipelineProcessing::LegacyProcessingService
.new(pipeline)
.execute(trigger_build_ids)
if Feature.enabled?(:ci_atomic_processing, pipeline.project)
Ci::PipelineProcessing::AtomicProcessingService
.new(pipeline)
.execute
else
Ci::PipelineProcessing::LegacyProcessingService
.new(pipeline)
.execute(trigger_build_ids)
end
end
 
private
Loading
Loading
Loading
Loading
@@ -11,7 +11,7 @@ module Ci
reprocess!(build).tap do |new_build|
build.pipeline.mark_as_processable_after_stage(build.stage_idx)
 
new_build.enqueue!
Gitlab::OptimisticLocking.retry_lock(new_build, &:enqueue)
 
MergeRequests::AddTodoWhenBuildFailsService
.new(project, current_user)
Loading
Loading
@@ -31,15 +31,17 @@ module Ci
 
attributes.push([:user, current_user])
 
build.retried = true
Ci::Build.transaction do
# mark all other builds of that name as retried
build.pipeline.builds.latest
.where(name: build.name)
.update_all(retried: true)
.update_all(retried: true, processed: true)
 
create_build!(attributes)
create_build!(attributes).tap do
# mark existing object as retried/processed without a reload
build.retried = true
build.processed = true
end
end
end
# rubocop: enable CodeReuse/ActiveRecord
Loading
Loading
@@ -49,6 +51,7 @@ module Ci
def create_build!(attributes)
build = project.builds.new(Hash[attributes])
build.deployment = ::Gitlab::Ci::Pipeline::Seed::Deployment.new(build).to_resource
build.retried = false
build.save!
build
end
Loading
Loading
Loading
Loading
@@ -7,10 +7,7 @@ class PipelineUpdateWorker
queue_namespace :pipeline_processing
latency_sensitive_worker!
 
# rubocop: disable CodeReuse/ActiveRecord
def perform(pipeline_id)
Ci::Pipeline.find_by(id: pipeline_id)
.try(:update_status)
Ci::Pipeline.find_by_id(pipeline_id)&.update_legacy_status
end
# rubocop: enable CodeReuse/ActiveRecord
end
Loading
Loading
@@ -7,11 +7,7 @@ class StageUpdateWorker
queue_namespace :pipeline_processing
latency_sensitive_worker!
 
# rubocop: disable CodeReuse/ActiveRecord
def perform(stage_id)
Ci::Stage.find_by(id: stage_id).try do |stage|
stage.update_status
end
Ci::Stage.find_by_id(stage_id)&.update_legacy_status
end
# rubocop: enable CodeReuse/ActiveRecord
end
---
title: Fix relative links in Slack message
merge_request: 22608
author:
type: fixed
---
title: Project issue board names now sorted correctly in FOSS
merge_request: 22807
author:
type: fixed
---
title: Implement Atomic Processing that updates status of builds, stages and pipelines in one go
merge_request: 20229
author:
type: performance
Loading
Loading
@@ -57,7 +57,7 @@ class Gitlab::Seeder::Pipelines
BUILDS.each { |opts| build_create!(pipeline, opts) }
EXTERNAL_JOBS.each { |opts| commit_status_create!(pipeline, opts) }
pipeline.update_duration
pipeline.update_status
pipeline.update_legacy_status
end
end
 
Loading
Loading
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment