Skip to content
Snippets Groups Projects
Commit e1d12ba9 authored by Kamil Trzcińśki's avatar Kamil Trzcińśki Committed by Shinya Maeda
Browse files

Refactor Clusters to consist of GcpProvider and KubernetesPlatform

parent c4cbf115
No related branches found
No related tags found
No related merge requests found
Showing
with 523 additions and 339 deletions
Loading
Loading
@@ -31,7 +31,7 @@ class Projects::ClustersController < Projects::ApplicationController
end
 
def create
@cluster = Ci::CreateClusterService
@cluster = Ci::CreateService
.new(project, current_user, create_params)
.execute(token_in_session)
 
Loading
Loading
@@ -88,19 +88,27 @@ class Projects::ClustersController < Projects::ApplicationController
 
# Strong parameters accepted when creating a cluster.
#
# Only the post-refactor attribute set is permitted: the cluster's own flags
# plus nested hashes for the Kubernetes platform and the GCP provider. The
# flat pre-refactor keys (gcp_project_id, project_namespace, ...) are gone.
#
# NOTE(review): the Cluster model declares accepts_nested_attributes_for, which
# conventionally reads `<association>_attributes` keys — confirm the form posts
# under the key names permitted here.
def create_params
  params.require(:cluster).permit(
    :enabled,
    :platform_type,
    :provider_type,
    kubernetes_platform: [
      :namespace
    ],
    gcp_provider: [
      :project_id,
      :cluster_zone,
      :cluster_name,
      :cluster_size,
      :machine_type
    ])
end
 
# Strong parameters accepted when updating a cluster: the enabled flag and the
# Kubernetes platform namespace. The pre-refactor :project_namespace key is no
# longer permitted.
#
# NOTE(review): see accepts_nested_attributes_for on the Cluster model — confirm
# whether the nested key should be `kubernetes_platform_attributes`.
def update_params
  params.require(:cluster).permit(
    :enabled,
    kubernetes_platform: [
      :namespace
    ])
end
 
def authorize_google_api
Loading
Loading
module Clusters
  # A cluster that combines a provisioning backend (the "provider") with a
  # deployment target (the "platform") and the projects attached to it.
  class Cluster < ActiveRecord::Base
    include Presentable

    belongs_to :user
    belongs_to :service

    # Rails enums take a hash argument (`enum name: { ... }`); the previous
    # `enum :name { ... }` form does not parse.
    enum platform_type: {
      kubernetes: 1
    }

    enum provider_type: {
      user: 0,
      gcp: 1
    }

    has_many :cluster_projects
    has_many :projects, through: :cluster_projects

    # The association names do not match the model class names, so the classes
    # must be spelled out explicitly.
    has_one :gcp_provider, class_name: 'Clusters::Providers::Gcp'
    has_one :kubernetes_platform, class_name: 'Clusters::Platforms::Kubernetes'

    accepts_nested_attributes_for :gcp_provider
    accepts_nested_attributes_for :kubernetes_platform

    validates :kubernetes_platform, presence: true, if: :kubernetes?
    validates :gcp_provider, presence: true, if: :gcp?
    validate :restrict_modification, on: :update

    delegate :status, to: :provider, allow_nil: true
    delegate :status_reason, to: :provider, allow_nil: true

    # Update-time validation: rejects changes while the provider reports that
    # it is still being created.
    #
    # Returns false after adding an error, true otherwise.
    def restrict_modification
      if provider&.on_creation?
        errors.add(:base, "cannot modify during creation")
        return false
      end

      true
    end

    # The provider record matching provider_type; nil when the type has no
    # dedicated provider record (only gcp has one here).
    def provider
      return gcp_provider if gcp?
    end

    # The platform record matching platform_type; nil when no type matches.
    def platform
      return kubernetes_platform if kubernetes?
    end

    # First associated project, memoized (including a nil result) for the
    # lifetime of this instance.
    def first_project
      return @first_project if defined?(@first_project)

      @first_project = projects.first
    end
  end
end
module Clusters
  # Join record linking one project to one cluster.
  class ClusterProject < ActiveRecord::Base
    belongs_to :project
    belongs_to :cluster
  end
end
module Clusters
  module Platforms
    # Kubernetes connection settings for a cluster: API URL, CA certificate,
    # credentials and namespace, plus helpers to talk to the Kubernetes API.
    class Kubernetes < ActiveRecord::Base
      include Gitlab::Kubernetes
      include ReactiveCaching

      TEMPLATE_PLACEHOLDER = 'Kubernetes namespace'.freeze

      # NOTE(review): the key reads `project_id`, but this model only declares
      # `belongs_to :cluster` in the code visible here — confirm the attribute
      # exists or key the cache on something this record owns.
      self.reactive_cache_key = ->(service) { [service.class.model_name.singular, service.project_id] }

      belongs_to :cluster

      attr_encrypted :password,
        mode: :per_attribute_iv,
        key: Gitlab::Application.secrets.db_key_base,
        algorithm: 'aes-256-cbc'

      attr_encrypted :token,
        mode: :per_attribute_iv,
        key: Gitlab::Application.secrets.db_key_base,
        algorithm: 'aes-256-cbc'

      validates :namespace,
        allow_blank: true,
        length: 1..63,
        format: {
          with: Gitlab::Regex.kubernetes_namespace_regex,
          message: Gitlab::Regex.kubernetes_namespace_regex_message
        }

      validates :api_url, url: true, presence: true
      validates :token, presence: true

      after_save :clear_reactive_cache!

      before_validation :enforce_namespace_to_lower_case

      # The configured namespace, or the project-derived default when none is
      # set.
      def actual_namespace
        if namespace.present?
          namespace
        else
          default_namespace
        end
      end

      # Builds the list of KUBE_* variable hashes (key, value, public and, for
      # file variables, file) derived from this platform's settings. The CA
      # entries are only added when ca_pem is present.
      def predefined_variables
        config = YAML.dump(kubeconfig)

        variables = [
          { key: 'KUBE_URL', value: api_url, public: true },
          { key: 'KUBE_TOKEN', value: token, public: false },
          { key: 'KUBE_NAMESPACE', value: actual_namespace, public: true },
          { key: 'KUBECONFIG', value: config, public: false, file: true }
        ]

        if ca_pem.present?
          variables << { key: 'KUBE_CA_PEM', value: ca_pem, public: true }
          variables << { key: 'KUBE_CA_PEM_FILE', value: ca_pem, public: true, file: true }
        end

        variables
      end

      # Constructs a list of terminals from the reactive cache
      #
      # Returns nil if the cache is empty, in which case you should try again a
      # short time later
      def terminals(environment)
        with_reactive_cache do |data|
          pods = filter_by_label(data[:pods], app: environment.slug)
          terminals = pods.flat_map { |pod| terminals_for_pod(api_url, actual_namespace, pod) }
          terminals.each { |terminal| add_terminal_auth(terminal, terminal_auth) }
        end
      end

      # Caches resources in the namespace so other calls don't need to block on
      # network access
      #
      # NOTE(review): `active?` and `project` are not defined in the code
      # visible here — confirm they are provided elsewhere for this model.
      def calculate_reactive_cache
        return unless active? && project && !project.pending_delete?

        # We may want to cache extra things in the future
        { pods: read_pods }
      end

      # Kubeconfig structure built from this platform's URL, namespace, token
      # and CA certificate.
      def kubeconfig
        to_kubeconfig(
          url: api_url,
          namespace: actual_namespace,
          token: token,
          ca_pem: ca_pem)
      end

      # Text to show as the namespace placeholder: the default namespace when
      # one can be derived, a generic label otherwise.
      def namespace_placeholder
        default_namespace || TEMPLATE_PLACEHOLDER
      end

      # "<project path>-<project id>" for the cluster's first project, or nil
      # when there is no cluster or no project yet.
      def default_namespace
        first_project = cluster&.first_project

        "#{first_project.path}-#{first_project.id}" if first_project
      end

      # Returns the secrets visible to the configured credentials as JSON-like
      # data; a 404 from the API is treated as "no secrets".
      def read_secrets
        kubeclient = build_kubeclient!

        kubeclient.get_secrets.as_json
      rescue KubeException => err
        raise err unless err.error_code == 404

        []
      end

      # Returns a hash of all pods in the namespace
      def read_pods
        kubeclient = build_kubeclient!

        kubeclient.get_pods(namespace: actual_namespace).as_json
      rescue KubeException => err
        raise err unless err.error_code == 404

        []
      end

      # SSL options for Kubeclient: always verify the peer and, when a CA
      # certificate is configured, trust exactly that certificate.
      def kubeclient_ssl_options
        opts = { verify_ssl: OpenSSL::SSL::VERIFY_PEER }

        if ca_pem.present?
          opts[:cert_store] = OpenSSL::X509::Store.new
          opts[:cert_store].add_cert(OpenSSL::X509::Certificate.new(ca_pem))
        end

        opts
      end

      private

      # Builds a Kubeclient for the given API path/version.
      #
      # Raises RuntimeError ("Incomplete settings") when the URL, namespace or
      # credentials are missing. Either a token or a username/password pair is
      # accepted, matching kubeclient_auth_options, so that the secrets can be
      # read with basic auth before a token has been fetched.
      def build_kubeclient!(api_path: 'api', api_version: 'v1')
        credentials_present = token || (username && password)
        raise "Incomplete settings" unless api_url && actual_namespace && credentials_present

        ::Kubeclient::Client.new(
          join_api_url(api_path),
          api_version,
          auth_options: kubeclient_auth_options,
          ssl_options: kubeclient_ssl_options,
          http_proxy_uri: ENV['http_proxy']
        )
      end

      # Basic auth when a username is set, bearer token otherwise; nil when
      # neither is available.
      def kubeclient_auth_options
        return { username: username, password: password } if username
        return { bearer_token: token } if token
      end

      # Appends api_path to the API URL's path, collapsing trailing slashes.
      def join_api_url(api_path)
        url = URI.parse(api_url)
        prefix = url.path.sub(%r{/+\z}, '')

        url.path = [prefix, api_path].join("/")

        url.to_s
      end

      # NOTE(review): `current_application_settings` is not defined in the code
      # visible here — confirm a module providing it is included.
      def terminal_auth
        {
          token: token,
          ca_pem: ca_pem,
          max_session_time: current_application_settings.terminal_max_session_time
        }
      end

      def enforce_namespace_to_lower_case
        self.namespace = self.namespace&.downcase
      end
    end
  end
end
module Clusters
  module Providers
    # Google Cloud Platform provisioning details for a cluster, including the
    # creation status state machine.
    class Gcp < ActiveRecord::Base
      belongs_to :cluster

      default_value_for :cluster_zone, 'us-central1-a'
      default_value_for :cluster_size, 3
      default_value_for :machine_type, 'n1-standard-4'

      attr_encrypted :access_token,
        mode: :per_attribute_iv,
        key: Gitlab::Application.secrets.db_key_base,
        algorithm: 'aes-256-cbc'

      validates :project_id,
        length: 1..63,
        format: {
          with: Gitlab::Regex.kubernetes_namespace_regex,
          message: Gitlab::Regex.kubernetes_namespace_regex_message
        }

      validates :cluster_name,
        length: 1..63,
        format: {
          with: Gitlab::Regex.kubernetes_namespace_regex,
          message: Gitlab::Regex.kubernetes_namespace_regex_message
        }

      validates :cluster_zone, presence: true

      validates :cluster_size,
        presence: true,
        numericality: {
          only_integer: true,
          greater_than: 0
        }

      state_machine :status, initial: :scheduled do
        state :scheduled, value: 1
        state :creating, value: 2
        state :created, value: 3
        state :errored, value: 4

        event :make_creating do
          transition any - [:creating] => :creating
        end

        event :make_created do
          transition any - [:created] => :created
        end

        event :make_errored do
          transition any - [:errored] => :errored
        end

        # Once creation has finished (either way) the credentials used for it
        # are dropped. The encrypted attribute is `access_token`; the record is
        # persisted by the transition itself, so no explicit save is issued
        # from inside the callback.
        before_transition any => [:errored, :created] do |provider|
          provider.access_token = nil
          provider.operation_id = nil
        end

        # The first argument given to make_errored, if any, is recorded as the
        # reason for the failure.
        before_transition any => [:errored] do |provider, transition|
          status_reason = transition.args.first
          provider.status_reason = status_reason if status_reason
        end
      end

      # True while the cluster is scheduled for, or undergoing, creation.
      def on_creation?
        scheduled? || creating?
      end

      # Memoized Google Cloud Platform client for this provider's access
      # token; nil when no token is stored.
      def api_client
        return unless access_token

        @api_client ||= GoogleApi::CloudPlatform::Client.new(access_token, nil)
      end
    end
  end
end
module Gcp
  # Legacy single-table cluster model: holds the GCP provisioning settings, the
  # Kubernetes credentials and the creation status for one project.
  class Cluster < ActiveRecord::Base
    extend Gitlab::Gcp::Model
    include Presentable

    belongs_to :project, inverse_of: :cluster
    belongs_to :user
    belongs_to :service

    scope :enabled, -> { where(enabled: true) }
    scope :disabled, -> { where(enabled: false) }

    default_value_for :gcp_cluster_zone, 'us-central1-a'
    default_value_for :gcp_cluster_size, 3
    default_value_for :gcp_machine_type, 'n1-standard-4'

    # All three secrets are encrypted with the same settings.
    %i[password kubernetes_token gcp_token].each do |secret_attribute|
      attr_encrypted secret_attribute,
        mode: :per_attribute_iv,
        key: Gitlab::Application.secrets.db_key_base,
        algorithm: 'aes-256-cbc'
    end

    # The GCP project id and cluster name share one length/format rule set.
    %i[gcp_project_id gcp_cluster_name].each do |name_attribute|
      validates name_attribute,
        length: 1..63,
        format: {
          with: Gitlab::Regex.kubernetes_namespace_regex,
          message: Gitlab::Regex.kubernetes_namespace_regex_message
        }
    end

    validates :gcp_cluster_zone, presence: true

    validates :gcp_cluster_size,
      presence: true,
      numericality: {
        only_integer: true,
        greater_than: 0
      }

    validates :project_namespace,
      allow_blank: true,
      length: 1..63,
      format: {
        with: Gitlab::Regex.kubernetes_namespace_regex,
        message: Gitlab::Regex.kubernetes_namespace_regex_message
      }

    # if we do not do status transition we prevent change
    validate :restrict_modification, on: :update, unless: :status_changed?

    state_machine :status, initial: :scheduled do
      state :scheduled, value: 1
      state :creating, value: 2
      state :created, value: 3
      state :errored, value: 4

      event(:make_creating) { transition any - [:creating] => :creating }
      event(:make_created) { transition any - [:created] => :created }
      event(:make_errored) { transition any - [:errored] => :errored }

      # The GCP token and operation id are cleared when creation ends.
      before_transition any => [:errored, :created] do |cluster|
        cluster.gcp_token = nil
        cluster.gcp_operation_id = nil
      end

      # The first make_errored argument, when given, becomes the reason.
      before_transition any => [:errored] do |cluster, transition|
        reason = transition.args.first
        cluster.status_reason = reason if reason
      end
    end

    # Placeholder namespace in the form "<project path>-<project id>".
    def project_namespace_placeholder
      "#{project.path}-#{project.id}"
    end

    # True while the cluster is scheduled for, or undergoing, creation.
    def on_creation?
      scheduled? || creating?
    end

    # HTTPS URL for the stored endpoint; nil when no endpoint is set.
    def api_url
      return unless endpoint

      'https://' + endpoint
    end

    # Update-time validation: adds an error and returns false while the
    # cluster is being created; returns true otherwise.
    def restrict_modification
      return true unless on_creation?

      errors.add(:base, "cannot modify during creation")
      false
    end
  end
end
Loading
Loading
@@ -177,7 +177,9 @@ class Project < ActiveRecord::Base
has_one :import_data, class_name: 'ProjectImportData', inverse_of: :project, autosave: true
has_one :project_feature, inverse_of: :project
has_one :statistics, class_name: 'ProjectStatistics'
has_one :cluster, class_name: 'Gcp::Cluster', inverse_of: :project
has_many :cluster_projects, class_name: 'Clusters::ClusterProject'
has_one :cluster, through: :cluster_projects
 
# Container repositories need to remove data from the container registry,
# which is not managed by the DB. Hence we're still using dependent: :destroy
Loading
Loading
module Ci
  # Fetches the GCP operation recorded on a cluster and hands it to the
  # caller's block.
  class FetchGcpOperationService
    # Yields the fetched operation when a block is given and returns the
    # block's value (nil without a block). Google API errors mark the cluster
    # as errored instead of propagating.
    def execute(cluster)
      client = GoogleApi::CloudPlatform::Client.new(cluster.gcp_token, nil)

      gcp_operation = client.projects_zones_operations(
        cluster.gcp_project_id,
        cluster.gcp_cluster_zone,
        cluster.gcp_operation_id)

      return unless block_given?

      yield(gcp_operation)
    rescue Google::Apis::ServerError, Google::Apis::ClientError, Google::Apis::AuthorizationError => e
      cluster.make_errored!("Failed to request to CloudPlatform; #{e.message}")
    end
  end
end
##
# TODO:
# Most of this class was copied from app/models/project_services/kubernetes_service.rb.
# The two should be DRYed up so the same code is not maintained twice, possibly
# through a dedicated facility (e.g. lib/kubernetes_api) that owns every
# Kubernetes API caller.
module Ci
  # Reads the default service-account token from a Kubernetes API using basic
  # auth credentials.
  class FetchKubernetesTokenService
    attr_reader :api_url, :ca_pem, :username, :password

    def initialize(api_url, ca_pem, username, password)
      @api_url, @ca_pem, @username, @password = api_url, ca_pem, username, password
    end

    # Returns the decoded token of the first secret whose name contains
    # "default-token" and which carries a token; nil when there is none.
    def execute
      read_secrets.each do |secret|
        next unless /default-token/ =~ secret.dig('metadata', 'name')

        encoded_token = secret.dig('data', 'token')
        return Base64.decode64(encoded_token) if encoded_token
      end

      nil
    end

    private

    # Secrets as JSON-like data; a 404 from the API counts as "no secrets".
    def read_secrets
      build_kubeclient!.get_secrets.as_json
    rescue KubeException => err
      return [] if err.error_code == 404

      raise err
    end

    # Raises RuntimeError ("Incomplete settings") unless the URL and both
    # credentials are set.
    def build_kubeclient!(api_path: 'api', api_version: 'v1')
      raise "Incomplete settings" unless api_url && username && password

      basic_auth = { username: username, password: password }

      ::Kubeclient::Client.new(
        join_api_url(api_path),
        api_version,
        auth_options: basic_auth,
        ssl_options: kubeclient_ssl_options,
        http_proxy_uri: ENV['http_proxy']
      )
    end

    # Appends api_path to the API URL's path, collapsing trailing slashes.
    def join_api_url(api_path)
      uri = URI.parse(api_url)
      uri.path = "#{uri.path.sub(%r{/+\z}, '')}/#{api_path}"
      uri.to_s
    end

    # Always verifies the peer; trusts the configured CA certificate when one
    # is present.
    def kubeclient_ssl_options
      { verify_ssl: OpenSSL::SSL::VERIFY_PEER }.tap do |ssl_options|
        next unless ca_pem.present?

        store = OpenSSL::X509::Store.new
        store.add_cert(OpenSSL::X509::Certificate.new(ca_pem))
        ssl_options[:cert_store] = store
      end
    end
  end
end
module Ci
  # After GKE reports the cluster as created: reads its connection details,
  # fetches a Kubernetes token and hands everything to the integration service.
  class FinalizeClusterCreationService
    # Marks the cluster as errored (and returns that call's result) when the
    # GKE lookup fails or no default token can be obtained.
    def execute(cluster)
      client = GoogleApi::CloudPlatform::Client.new(cluster.gcp_token, nil)

      begin
        gke_cluster = client.projects_zones_clusters_get(
          cluster.gcp_project_id,
          cluster.gcp_cluster_zone,
          cluster.gcp_cluster_name)
      rescue Google::Apis::ServerError, Google::Apis::ClientError, Google::Apis::AuthorizationError => e
        return cluster.make_errored!("Failed to request to CloudPlatform; #{e.message}")
      end

      endpoint = gke_cluster.endpoint
      api_url = 'https://' + endpoint
      ca_cert = Base64.decode64(gke_cluster.master_auth.cluster_ca_certificate)
      username = gke_cluster.master_auth.username
      password = gke_cluster.master_auth.password

      token_service = Ci::FetchKubernetesTokenService.new(api_url, ca_cert, username, password)
      kubernetes_token = token_service.execute

      if kubernetes_token
        Ci::IntegrateClusterService.new.execute(
          cluster, endpoint, ca_cert, kubernetes_token, username, password)
      else
        cluster.make_errored!('Failed to get a default token of kubernetes')
      end
    end
  end
end
module Ci
  # Stores the connection details of a freshly created cluster and activates
  # the project's kubernetes service with them, in one transaction.
  class IntegrateClusterService
    # A validation failure in either update marks the cluster as errored.
    def execute(cluster, endpoint, ca_cert, token, username, password)
      Gcp::Cluster.transaction do
        kubernetes_service = cluster.project.find_or_initialize_service('kubernetes')

        cluster.update!(
          enabled: true,
          endpoint: endpoint,
          ca_cert: ca_cert,
          kubernetes_token: token,
          username: username,
          password: password,
          service: kubernetes_service,
          status_event: :make_created)

        # Read back from the cluster so the service sees the values that were
        # just persisted.
        cluster.service.update!(
          active: true,
          api_url: cluster.api_url,
          ca_pem: ca_cert,
          namespace: cluster.project_namespace,
          token: token)
      end
    rescue ActiveRecord::RecordInvalid => invalid
      cluster.make_errored!("Failed to integrate cluster into kubernetes_service: #{invalid.message}")
    end
  end
end
module Ci
  # Asks GCP to create the cluster and schedules the worker that waits for the
  # creation to finish.
  class ProvisionClusterService
    # Every failure along the way marks the cluster as errored and returns the
    # result of that call.
    def execute(cluster)
      client = GoogleApi::CloudPlatform::Client.new(cluster.gcp_token, nil)

      begin
        operation = client.projects_zones_clusters_create(
          cluster.gcp_project_id,
          cluster.gcp_cluster_zone,
          cluster.gcp_cluster_name,
          cluster.gcp_cluster_size,
          machine_type: cluster.gcp_machine_type)
      rescue Google::Apis::ServerError, Google::Apis::ClientError, Google::Apis::AuthorizationError => e
        return cluster.make_errored!("Failed to request to CloudPlatform; #{e.message}")
      end

      unless %w[RUNNING PENDING].include?(operation.status)
        return cluster.make_errored!("Operation status is unexpected; #{operation.status_message}")
      end

      cluster.gcp_operation_id = client.parse_operation_id(operation.self_link)
      return cluster.make_errored!('Can not find operation_id from self_link') unless cluster.gcp_operation_id

      unless cluster.make_creating
        return cluster.make_errored!("Failed to update cluster record; #{cluster.errors}")
      end

      WaitForClusterCreationWorker.perform_in(
        WaitForClusterCreationWorker::INITIAL_INTERVAL, cluster.id)
    end
  end
end
module Ci
  # Applies the given params to a cluster and keeps its kubernetes service in
  # sync with the cluster's enabled state.
  class UpdateClusterService < BaseService
    # Validation failures are recorded on cluster.errors rather than raised.
    def execute(cluster)
      Gcp::Cluster.transaction do
        cluster.update!(params)

        kubernetes_service = cluster.service

        service_attributes =
          if params['enabled'] == 'true'
            {
              active: true,
              api_url: cluster.api_url,
              ca_pem: cluster.ca_cert,
              namespace: cluster.project_namespace,
              token: cluster.kubernetes_token
            }
          else
            { active: false }
          end

        kubernetes_service.update!(service_attributes)
      end
    rescue ActiveRecord::RecordInvalid => invalid
      cluster.errors.add(:base, invalid.message)
    end
  end
end
module Ci
class CreateClusterService < BaseService
module Clusters
class CreateService < BaseService
def execute(access_token)
params['gcp_machine_type'] ||= GoogleApi::CloudPlatform::Client::DEFAULT_MACHINE_TYPE
 
cluster_params =
params.merge(user: current_user,
gcp_token: access_token)
params.merge(user: current_user)
 
project.create_cluster(cluster_params).tap do |cluster|
ClusterProvisionWorker.perform_async(cluster.id) if cluster.persisted?
Loading
Loading
module Clusters
  module Gcp
    # Fetches the GCP operation recorded on a provider and hands it to the
    # caller's block.
    class FetchOperationService
      # Yields the fetched operation when a block is given and returns the
      # block's value (nil without a block). Google API errors mark the
      # provider as errored instead of propagating.
      def execute(provider)
        client = provider.api_client

        gcp_operation = client.projects_zones_operations(
          provider.project_id,
          provider.cluster_zone,
          provider.operation_id)

        return unless block_given?

        yield(gcp_operation)
      rescue Google::Apis::ServerError, Google::Apis::ClientError, Google::Apis::AuthorizationError => e
        provider.make_errored!("Failed to request to CloudPlatform; #{e.message}")
      end
    end
  end
end
module Clusters
  module Gcp
    # After GKE reports the cluster as created: copies its connection details
    # onto the provider and the Kubernetes platform, fetches the default
    # service-account token and marks the provider as created.
    class FinalizeCreationService
      attr_reader :provider

      # provider - the GCP provider record of the cluster being finalized.
      #
      # Google API errors and validation failures are recorded on the provider
      # (which owns the status state machine) via make_errored!.
      def execute(provider)
        @provider = provider

        configure_provider
        configure_kubernetes_platform
        request_kubernetes_platform_token

        ActiveRecord::Base.transaction do
          # The attributes were assigned above, so they only need persisting.
          kubernetes_platform.save!
          provider.make_created!
        end
      rescue Google::Apis::ServerError, Google::Apis::ClientError, Google::Apis::AuthorizationError => e
        provider.make_errored!("Failed to request to CloudPlatform; #{e.message}")
      rescue ActiveRecord::RecordInvalid => e
        provider.make_errored!("Failed to configure GKE Cluster: #{e.message}")
      end

      private

      def configure_provider
        provider.endpoint = gke_cluster.endpoint
      end

      # Copies the API URL, CA certificate and basic auth credentials reported
      # by GKE onto the platform record.
      def configure_kubernetes_platform
        master_auth = gke_cluster.master_auth

        kubernetes_platform.api_url = 'https://' + gke_cluster.endpoint
        kubernetes_platform.ca_pem = Base64.decode64(master_auth.cluster_ca_certificate)
        kubernetes_platform.username = master_auth.username
        kubernetes_platform.password = master_auth.password
      end

      # Stores the decoded token of the first secret whose name contains
      # "default-token" and which carries a token. The platform's token is
      # left untouched when no such secret exists.
      def request_kubernetes_platform_token
        kubernetes_platform.read_secrets.each do |secret|
          next unless /default-token/ =~ secret.dig('metadata', 'name')

          token_base64 = secret.dig('data', 'token')
          next unless token_base64

          kubernetes_platform.token = Base64.decode64(token_base64)
          break
        end
      end

      # Memoized GKE cluster description, looked up with the provider's own
      # project id, zone and cluster name.
      def gke_cluster
        @gke_cluster ||= provider.api_client.projects_zones_clusters_get(
          provider.project_id,
          provider.cluster_zone,
          provider.cluster_name)
      end

      def cluster
        provider.cluster
      end

      def kubernetes_platform
        cluster.kubernetes_platform
      end
    end
  end
end
module Clusters
  module Gcp
    # Asks GCP to create the cluster described by a provider record and
    # schedules the worker that waits for the creation to finish.
    class ProvisionService
      attr_reader :provider

      # provider - the GCP provider record to provision.
      #
      # Every failure along the way marks the provider as errored and returns
      # the result of that call.
      def execute(provider)
        @provider = provider
        # Never reuse an operation memoized by an earlier call on this
        # instance.
        @operation = nil

        unless operation.status == 'RUNNING' || operation.status == 'PENDING'
          return provider.make_errored!("Operation status is unexpected; #{operation.status_message}")
        end

        provider.operation_id = operation_id

        unless provider.operation_id
          return provider.make_errored!('Can not find operation_id from self_link')
        end

        if provider.make_creating
          # The workers look the record up by cluster id, so schedule with the
          # cluster's id rather than the provider's own.
          WaitForClusterCreationWorker.perform_in(
            WaitForClusterCreationWorker::INITIAL_INTERVAL, provider.cluster_id)
        else
          return provider.make_errored!("Failed to update provider record; #{provider.errors}")
        end
      rescue Google::Apis::ServerError, Google::Apis::ClientError, Google::Apis::AuthorizationError => e
        return provider.make_errored!("Failed to request to CloudPlatform; #{e.message}")
      end

      private

      def operation_id
        api_client.parse_operation_id(operation.self_link)
      end

      # Memoized cluster-creation request, built from the provider's zone,
      # name, size and machine type.
      def operation
        @operation ||= api_client.projects_zones_clusters_create(
          provider.project_id,
          provider.cluster_zone,
          provider.cluster_name,
          provider.cluster_size,
          machine_type: provider.machine_type)
      end

      def api_client
        provider.api_client
      end
    end
  end
end
module Clusters
  module Gcp
    # Checks on a running cluster-creation operation and either keeps waiting,
    # finalizes the creation or marks the provider as errored.
    class VerifyProvisionStatusService
      attr_reader :provider

      INITIAL_INTERVAL = 2.minutes
      EAGER_INTERVAL = 10.seconds
      TIMEOUT = 20.minutes

      # provider - the GCP provider record whose operation is being verified.
      #
      # NOTE(review): the provisioning service treats 'PENDING' as a healthy
      # starting status, while here it falls through to the error branch —
      # confirm whether 'PENDING' should keep waiting as well.
      def execute(provider)
        @provider = provider

        request_operation do |operation|
          case operation.status
          when 'RUNNING'
            continue_creation(operation)
          when 'DONE'
            finalize_creation
          else
            return provider.make_errored!("Unexpected operation status; #{operation.status} #{operation.status_message}")
          end
        end
      end

      private

      # Re-schedules the waiting worker unless the operation has been running
      # for longer than TIMEOUT, in which case the provider is marked errored.
      def continue_creation(operation)
        if TIMEOUT < Time.now.utc - operation.start_time.to_time.utc
          return provider.make_errored!("Cluster creation time exceeds timeout; #{TIMEOUT}")
        end

        WaitForClusterCreationWorker.perform_in(EAGER_INTERVAL, provider.cluster_id)
      end

      def finalize_creation
        Clusters::Gcp::FinalizeCreationService.new.execute(provider)
      end

      # Delegates to the provider-based fetch service that lives in this
      # namespace.
      def request_operation(&blk)
        Clusters::Gcp::FetchOperationService.new.execute(provider, &blk)
      end
    end
  end
end
module Clusters
  # Applies this service's params to an existing cluster.
  class UpdateService < BaseService
    # Returns whatever cluster.update returns for the given params.
    def execute(cluster)
      cluster.update(params)
    end
  end
end
Loading
Loading
@@ -3,8 +3,10 @@ class ClusterProvisionWorker
include ClusterQueue
 
# Provisions the GCP side of the cluster with the given id.
#
# Does nothing when the cluster cannot be found or has no GCP provider. The
# legacy Gcp::Cluster / Ci::ProvisionClusterService path is no longer used.
def perform(cluster_id)
  provider = Clusters::Cluster.find_by_id(cluster_id)&.gcp_provider
  return unless provider

  Clusters::Gcp::ProvisionService.new.execute(provider)
end
end
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment