Skip to content
Snippets Groups Projects
Commit e499c1c3 authored by Shinya Maeda's avatar Shinya Maeda
Browse files

Replace reactive_cache with multiple Sidekiq workers

parent c30546f4
No related branches found
No related tags found
No related merge requests found
Showing
with 324 additions and 190 deletions
module GoogleApi
class AuthorizationsController < ApplicationController
def callback
session[GoogleApi::CloudPlatform::Client.token_in_session] =
GoogleApi::Authentication.new(nil, callback_google_api_authorizations_url)
.get_token(params[:code])
session[GoogleApi::CloudPlatform::Client.session_key_for_token] =
GoogleApi::CloudPlatform::Client.new(nil, callback_google_api_authorizations_url)
.get_token(params[:code])
 
if params[:state]
redirect_to params[:state]
Loading
Loading
class Projects::ClustersController < Projects::ApplicationController
before_action :cluster, except: [:login, :index, :new, :create]
before_action :authorize_google_api, except: [:login]
# before_action :cluster_creation_lock, only: [:update, :destroy]
# before_action :authorize_admin_clusters! # TODO: Authentication
 
def login
begin
@authorize_url = api_client.authorize_url
rescue GoogleApi::Authentication::ConfigMissingError
@authorize_url = GoogleApi::CloudPlatform::Client.new(
nil,
callback_google_api_authorizations_url,
state: namespace_project_clusters_url.to_s
).authorize_url
rescue GoogleApi::Auth::ConfigMissingError
# Show an alert message that gitlab.yml is not configured properly
end
end
Loading
Loading
@@ -20,28 +25,29 @@ class Projects::ClustersController < Projects::ApplicationController
end
 
def new
@cluster = project.clusters.new
end
 
def create
begin
Ci::CreateClusterService.new(project, current_user, params)
.create_cluster_on_gke(api_client)
rescue Ci::CreateClusterService::UnexpectedOperationError => e
# TODO: error
puts "#{self.class.name} - #{__callee__}: e: #{e}"
end
@cluster = Ci::CreateClusterService
.new(project, current_user, cluster_params)
.execute(token_in_session)
 
redirect_to project_clusters_path(project)
if @cluster.persisted?
ClusterCreationWorker.perform_async(@cluster.id)
redirect_to project_clusters_path(project)
else
render :new
end
end
 
##
# Return
# @status: The current status of the operation.
# @status_message: If an error has occurred, a textual description of the error.
def creation_status
def status
respond_to do |format|
format.json do
render json: cluster.creation_status(session[GoogleApi::CloudPlatform::Client.token_in_session])
render json: {
status: cluster.status, # The current status of the operation.
status_reason: cluster.status_reason # If an error has occurred, a textual description of the error.
}
end
end
end
Loading
Loading
@@ -50,24 +56,9 @@ class Projects::ClustersController < Projects::ApplicationController
end
 
def update
Ci::Cluster.transaction do
if params['enabled'] == 'true'
cluster.service.attributes = {
active: true,
api_url: cluster.endpoint,
ca_pem: cluster.ca_cert,
namespace: cluster.project_namespace,
token: cluster.token
}
cluster.service.save!
else
cluster.service.update(active: false)
end
cluster.update(enabled: params['enabled'])
end
Ci::UpdateClusterService
.new(project, current_user, cluster_params)
.execute(cluster)
 
render :edit
end
Loading
Loading
@@ -88,18 +79,27 @@ class Projects::ClustersController < Projects::ApplicationController
@cluster ||= project.clusters.find(params[:id])
end
 
def api_client
@api_client ||=
GoogleApi::CloudPlatform::Client.new(
session[GoogleApi::CloudPlatform::Client.token_in_session],
callback_google_api_authorizations_url,
state: namespace_project_clusters_url.to_s
)
def cluster_params
params.require(:cluster)
.permit(:gcp_project_id, :cluster_zone, :cluster_name, :cluster_size,
:machine_type, :project_namespace, :enabled)
end
 
def authorize_google_api
unless session[GoogleApi::CloudPlatform::Client.token_in_session]
unless token_in_session
redirect_to action: 'login'
end
end
def token_in_session
@token_in_session ||= session[GoogleApi::CloudPlatform::Client.session_key_for_token]
end
def cluster_creation_lock
if cluster.on_creation?
redirect_to edit_project_cluster_path(project, cluster),
status: :forbidden,
alert: _("You can not modify cluster during creation")
end
end
end
module Ci
class Cluster < ActiveRecord::Base
extend Gitlab::Ci::Model
include ReactiveCaching
self.reactive_cache_key = ->(cluster) { [cluster.class.model_name.singular, cluster.project_id, cluster.id] }
 
belongs_to :project
belongs_to :user
belongs_to :service
 
attr_encrypted :password,
mode: :per_attribute_iv_and_salt,
insecure_mode: true,
key: Gitlab::Application.secrets.db_key_base,
algorithm: 'aes-256-cbc'
# after_save :clear_reactive_cache!
def creation_status(access_token)
with_reactive_cache(access_token) do |operation|
{
status: operation[:status],
status_message: operation[:status_message]
}
end
end
def calculate_reactive_cache(access_token)
return { status: 'INTEGRATED' } if service # If it's already done, we don't need to continue the following process
api_client = GoogleApi::CloudPlatform::Client.new(access_token, nil)
operation = api_client.projects_zones_operations(gcp_project_id, cluster_zone, gcp_operation_id)
return { status_message: 'Failed to get a status' } unless operation
if operation.status == 'DONE'
# Get cluster details (end point, etc)
gke_cluster = api_client.projects_zones_clusters_get(
gcp_project_id, cluster_zone, cluster_name
)
return { status_message: 'Failed to get a cluster info on gke' } unless gke_cluster
# Get k8s token
token = ''
KubernetesService.new.tap do |ks|
ks.api_url = 'https://' + gke_cluster.endpoint
ks.ca_pem = Base64.decode64(gke_cluster.master_auth.cluster_ca_certificate)
ks.username = gke_cluster.master_auth.username
ks.password = gke_cluster.master_auth.password
secrets = ks.read_secrets
secrets.each do |secret|
name = secret.dig('metadata', 'name')
if /default-token/ =~ name
token_base64 = secret.dig('data', 'token')
token = Base64.decode64(token_base64)
break
end
end
end
return { status_message: 'Failed to get a default token on kubernetes' } unless token
# k8s endpoint, ca_cert
endpoint = 'https://' + gke_cluster.endpoint
cluster_ca_certificate = Base64.decode64(gke_cluster.master_auth.cluster_ca_certificate)
begin
Ci::Cluster.transaction do
# Update service
kubernetes_service.attributes = {
active: true,
api_url: endpoint,
ca_pem: cluster_ca_certificate,
namespace: project_namespace,
token: token
}
kubernetes_service.save!
# Save info in cluster record
update(
enabled: true,
service: kubernetes_service,
username: gke_cluster.master_auth.username,
password: gke_cluster.master_auth.password,
token: token,
ca_cert: cluster_ca_certificate,
endpoint: endpoint,
)
end
rescue ActiveRecord::RecordInvalid => exception
return { status_message: 'Failed to setup integration' }
end
end
operation.to_h
mode: :per_attribute_iv_and_salt,
insecure_mode: true,
key: Gitlab::Application.secrets.db_key_base,
algorithm: 'aes-256-cbc'
attr_encrypted :kubernetes_token,
mode: :per_attribute_iv_and_salt,
insecure_mode: true,
key: Gitlab::Application.secrets.db_key_base,
algorithm: 'aes-256-cbc'
attr_encrypted :gcp_token,
mode: :per_attribute_iv_and_salt,
insecure_mode: true,
key: Gitlab::Application.secrets.db_key_base,
algorithm: 'aes-256-cbc'
enum status: {
unknown: nil,
scheduled: 1,
creating: 2,
created: 3,
errored: 4
}
# Marks the cluster as errored and records why.
#
# In a single +update!+ call this sets +status+ to the +errored+ enum value,
# stores +reason+ in +status_reason+ and clears +gcp_token+.
#
# @param reason [String] textual description of the failure
# @raise whatever +update!+ raises when the record cannot be saved
def error!(reason)
  # The enum mapping is looked up on the class, the same way the other
  # callers in this file do it (`Ci::Cluster.statuses[...]`); a bare
  # `statuses` call is not available on an instance.
  update!(status: self.class.statuses[:errored],
          status_reason: reason,
          gcp_token: nil)
end
 
def kubernetes_service
@kubernetes_service ||= project.find_or_initialize_service('kubernetes')
def on_creation?
scheduled? || creating?
end
end
end
module Ci
class CreateClusterService < BaseService
UnexpectedOperationError = Class.new(StandardError)
def create_cluster_on_gke(api_client)
# Create a cluster on GKE
operation = api_client.projects_zones_clusters_create(
params['gcp_project_id'], params['cluster_zone'], params['cluster_name'],
cluster_size: params['cluster_size'], machine_type: params['machine_type']
)
if operation&.status != ('RUNNING' || 'PENDING')
raise UnexpectedOperationError.new(operation&.status_message)
end
api_client.parse_self_link(operation.self_link).tap do |project_id, zone, operation_id|
project.clusters.create(user: current_user,
gcp_project_id: params['gcp_project_id'],
cluster_zone: params['cluster_zone'],
cluster_name: params['cluster_name'],
project_namespace: params['project_namespace'],
gcp_operation_id: operation_id).tap do |cluster|
# Start status polling. When the operation finish, create KubernetesService.
cluster.creation_status(api_client.access_token)
end
end
def execute(access_token)
project.clusters.create(
params.merge(user: current_user,
status: Ci::Cluster.statuses[:scheduled],
gcp_token: access_token))
end
end
end
module Ci
  # Connects a cluster record to its project's 'kubernetes' service.
  class IntegrateClusterService
    # Activates the project's 'kubernetes' service with the given connection
    # details and stores the same details on the cluster, marking it created.
    # Both writes happen inside one Ci::Cluster transaction.
    #
    # When a write raises ActiveRecord::RecordInvalid, the failure is reported
    # through +cluster.error!+ instead of propagating.
    #
    # @param cluster  the cluster record being integrated
    # @param endpoint [String] API URL of the cluster
    # @param ca_cert  [String] CA certificate (PEM)
    # @param token    [String] kubernetes token
    # @param username [String] cluster username
    # @param password [String] cluster password
    def execute(cluster, endpoint, ca_cert, token, username, password)
      integration = cluster.project.find_or_initialize_service('kubernetes')

      Ci::Cluster.transaction do
        # Update service
        integration.attributes = service_attributes(cluster, endpoint, ca_cert, token)
        integration.save!

        # Save info in cluster record
        cluster.update!(
          cluster_attributes(integration, endpoint, ca_cert, token, username, password)
        )
      end
    rescue ActiveRecord::RecordInvalid => e
      cluster.error!("Failed to integrate cluster into kubernetes_service: #{e.message}")
    end

    private

    # Attributes assigned to the kubernetes service.
    def service_attributes(cluster, endpoint, ca_cert, token)
      {
        active: true,
        api_url: endpoint,
        ca_pem: ca_cert,
        namespace: cluster.project_namespace,
        token: token
      }
    end

    # Attributes written to the cluster record; gcp_token is cleared here.
    def cluster_attributes(integration, endpoint, ca_cert, token, username, password)
      {
        enabled: true,
        service: integration,
        username: username,
        password: password,
        kubernetes_token: token,
        ca_cert: ca_cert,
        endpoint: endpoint,
        gcp_token: nil,
        status: Ci::Cluster.statuses[:created]
      }
    end
  end
end
module Ci
  # Enables or disables a cluster together with its associated service.
  class UpdateClusterService < BaseService
    # Applies +params['enabled']+ to +cluster+ and its service inside one
    # Ci::Cluster transaction.
    #
    # * 'true'  - re-populates the service from the cluster's stored endpoint,
    #             CA certificate, namespace and kubernetes token, and saves it
    #             as active.
    # * other   - deactivates the service.
    #
    # In both cases the cluster's +enabled+ attribute is then set to the raw
    # parameter value. An ActiveRecord::RecordInvalid raised by any write is
    # added to +cluster.errors+ under :base rather than propagated.
    #
    # NOTE(review): +params+ is presumably provided by BaseService - confirm.
    #
    # @param cluster the cluster record to update
    def execute(cluster)
      Ci::Cluster.transaction do
        if params['enabled'] == 'true'
          cluster.service.attributes = {
            active: true,
            api_url: cluster.endpoint,
            ca_pem: cluster.ca_cert,
            namespace: cluster.project_namespace,
            token: cluster.kubernetes_token
          }
          cluster.service.save!
        else
          # Bang variant, like the enable branch: a failed deactivation must
          # raise so it reaches the rescue below instead of being silently
          # ignored while the cluster is still flagged as disabled.
          cluster.service.update!(active: false)
        end

        cluster.update!(enabled: params['enabled'])
      end
    rescue ActiveRecord::RecordInvalid => e
      cluster.errors.add(:base, e.message)
    end
  end
end
Loading
Loading
@@ -4,7 +4,7 @@
-link_to_help_page = link_to(s_('ClusterIntegration|help page'), help_page_path('TODO'), target: '_blank', rel: 'noopener noreferrer')
= s_('ClusterIntegration|Use our %{link_to_help_page} on cluster integration.').html_safe % { link_to_help_page: link_to_help_page}
 
= form_for 'cluster' do |field|
= form_for [@project.namespace.becomes(Namespace), @project, @cluster] do |field|
.form-group
= field.label :cluster_name
= field.text_field :cluster_name, class: 'form-control'
Loading
Loading
@@ -32,4 +32,5 @@
.form-group
= field.submit s_('ClusterIntegration|Create cluster'), class: 'btn btn-save'
 
= link_to "Create on Google Container Engine", namespace_project_clusters_path(@project.namespace, @project, cluster_name: "gke-test-creation#{Random.rand(100)}", gcp_project_id: 'gitlab-internal-153318', cluster_zone: 'us-central1-a', cluster_size: '1', project_namespace: 'aaa', machine_type: '???'), method: :post
\ No newline at end of file
-# TODO: Remove before merge
= link_to "Create on Google Container Engine", namespace_project_clusters_path(@project.namespace, @project, cluster: {cluster_name: "gke-test-creation#{Random.rand(100)}", gcp_project_id: 'gitlab-internal-153318', cluster_zone: 'us-central1-a', cluster_size: '1', project_namespace: 'aaa', machine_type: '???'}), method: :post
\ No newline at end of file
Loading
Loading
@@ -2,7 +2,15 @@ edit/show cluster
%br
= @cluster.inspect
= @cluster.service.inspect
= link_to "Enable", namespace_project_cluster_path(@project.namespace, @project, @cluster.id, enabled: 'true'), method: :put
= link_to "Disable", namespace_project_cluster_path(@project.namespace, @project, @cluster.id, enabled: 'false'), method: :put
= link_to "Soft-delete the cluster", namespace_project_cluster_path(@project.namespace, @project, @cluster.id), method: :delete
= link_to 'Check status', creation_status_namespace_project_cluster_path(@cluster.project.namespace, @cluster.project, @cluster.id), :remote => true
- unless @cluster.on_creation?
= link_to "Enable", namespace_project_cluster_path(@project.namespace, @project, @cluster.id, cluster: {enabled: 'true'}), method: :put
= link_to "Disable", namespace_project_cluster_path(@project.namespace, @project, @cluster.id, cluster: {enabled: 'false'}), method: :put
= link_to "Soft-delete the cluster", namespace_project_cluster_path(@project.namespace, @project, @cluster.id), method: :delete
-# status GET
-# status: The current status of the operation.
-# status_reason: If an error has occurred, a textual description of the error.
= link_to 'Check status', status_namespace_project_cluster_path(@cluster.project.namespace, @cluster.project, @cluster.id), :remote => true
-# Render the error reason, if one was recorded on the cluster.
- if @cluster.status_reason
  %p= @cluster.status_reason
-# Even if we got an error during the creation process, we don't delete cluster objects automatically, because we don't have a method to delete the cluster on gke. So users move to edit page from new page **regardless of validation errors**, and they have to delete the record manually. This is for giving users headsup that a new cluster might remain in gke. /cc @ayufan
# Sidekiq worker that requests creation of a cluster on Google Cloud Platform
# and, once the request is accepted, hands over to WaitForClusterCreationWorker
# to poll the resulting operation.
class ClusterCreationWorker
  include Sidekiq::Worker
  include DedicatedSidekiqQueue

  # Issues the cluster-creation request using the token stored on the cluster.
  #
  # Every failure after the cluster has been loaded is recorded through
  # +cluster.error!+; a missing cluster is only logged.
  #
  # @param cluster_id [Integer] id of the Ci::Cluster to create
  def perform(cluster_id)
    cluster = Ci::Cluster.find_by_id(cluster_id)

    unless cluster
      return Rails.logger.error "Cluster object is not found; #{cluster_id}"
    end

    api_client =
      GoogleApi::CloudPlatform::Client.new(cluster.gcp_token, nil)

    operation = api_client.projects_zones_clusters_create(
      cluster.gcp_project_id,
      cluster.cluster_zone,
      cluster.cluster_name,
      cluster.cluster_size,
      machine_type: cluster.machine_type
    )

    # The client hands API failures back as exception objects instead of
    # raising them, hence the type check.
    if operation.is_a?(StandardError)
      return cluster.error!("Failed to request to CloudPlatform; #{operation.message}")
    end

    unless operation.status == 'RUNNING' || operation.status == 'PENDING'
      return cluster.error!("Operation status is unexpected; #{operation.status_message}")
    end

    operation_id = api_client.parse_operation_id(operation.self_link)

    unless operation_id
      return cluster.error!('Can not find operation_id from self_link')
    end

    if cluster.update(status: Ci::Cluster.statuses[:creating],
                      gcp_operation_id: operation_id)
      WaitForClusterCreationWorker.perform_in(
        WaitForClusterCreationWorker::INITIAL_INTERVAL,
        cluster.id
      )
    else
      # Interpolate the readable messages; the errors object itself does not
      # stringify to anything a user can act on.
      cluster.error!("Failed to update cluster record; #{cluster.errors.full_messages.join(', ')}")
    end
  end
end
# Sidekiq worker that polls the Google Cloud Platform operation started by
# ClusterCreationWorker and, once it is done, integrates the new cluster.
class WaitForClusterCreationWorker
  include Sidekiq::Worker
  include DedicatedSidekiqQueue

  # Delay before the first poll (used by the scheduling caller).
  INITIAL_INTERVAL = 2.minutes
  # Delay between subsequent polls while the operation is still running.
  EAGER_INTERVAL = 10.seconds
  # Maximum time allowed since the operation's start_time.
  TIMEOUT = 20.minutes

  # Checks the creation operation once and reacts to its status:
  # 'DONE' integrates the cluster, 'RUNNING' re-enqueues this worker until
  # TIMEOUT is exceeded, anything else is recorded as an error on the cluster.
  #
  # @param cluster_id [Integer] id of the Ci::Cluster being created
  def perform(cluster_id)
    cluster = Ci::Cluster.find_by_id(cluster_id)

    unless cluster
      return Rails.logger.error "Cluster object is not found; #{cluster_id}"
    end

    api_client =
      GoogleApi::CloudPlatform::Client.new(cluster.gcp_token, nil)

    operation = api_client.projects_zones_operations(
      cluster.gcp_project_id,
      cluster.cluster_zone,
      cluster.gcp_operation_id
    )

    # The client hands API failures back as exception objects instead of
    # raising them, hence the type check.
    if operation.is_a?(StandardError)
      return cluster.error!("Failed to request to CloudPlatform; #{operation.message}")
    end

    # NOTE(review): ClusterCreationWorker schedules this worker for operations
    # that are either 'RUNNING' or 'PENDING', but 'PENDING' lands in the error
    # branch below - confirm whether it should be re-polled instead.
    case operation.status
    when 'DONE'
      integrate(api_client, cluster)
    when 'RUNNING'
      if Time.now < operation.start_time.to_time + TIMEOUT
        WaitForClusterCreationWorker.perform_in(EAGER_INTERVAL, cluster.id)
      else
        cluster.error!("Cluster creation time exceeds timeout; #{TIMEOUT}")
      end
    else
      cluster.error!("Unexpected operation status; #{operation.status_message}")
    end
  end

  # Fetches the finished cluster's connection details, reads its default
  # kubernetes token and passes everything to Ci::IntegrateClusterService.
  #
  # @param api_client [GoogleApi::CloudPlatform::Client]
  # @param cluster the Ci::Cluster record being created
  def integrate(api_client, cluster)
    # Get cluster details (end point, etc)
    gke_cluster = api_client.projects_zones_clusters_get(
      cluster.gcp_project_id,
      cluster.cluster_zone,
      cluster.cluster_name
    )

    if gke_cluster.is_a?(StandardError)
      return cluster.error!("Failed to request to CloudPlatform; #{gke_cluster.message}")
    end

    # k8s endpoint, ca_cert and credentials
    endpoint = 'https://' + gke_cluster.endpoint
    ca_cert = Base64.decode64(gke_cluster.master_auth.cluster_ca_certificate)
    username = gke_cluster.master_auth.username
    password = gke_cluster.master_auth.password

    # Get k8s token
    kubernetes_token = fetch_default_token(endpoint, ca_cert, username, password)

    # An empty string is truthy in Ruby, so the absence of a token has to be
    # checked explicitly; otherwise a cluster would be integrated with a
    # blank token.
    if kubernetes_token.nil? || kubernetes_token.empty?
      return cluster.error!('Failed to get a default token of kubernetes')
    end

    Ci::IntegrateClusterService.new.execute(cluster, endpoint, ca_cert, kubernetes_token, username, password)
  end

  private

  # Reads the cluster's secrets through a throw-away KubernetesService and
  # returns the decoded token of the first secret whose metadata name
  # contains "default-token", or nil when there is no such secret or it
  # carries no token.
  def fetch_default_token(endpoint, ca_cert, username, password)
    kubernetes = KubernetesService.new
    kubernetes.api_url = endpoint
    kubernetes.ca_pem = ca_cert
    kubernetes.username = username
    kubernetes.password = password

    default_secret = kubernetes.read_secrets.find do |secret|
      /default-token/ =~ secret.dig('metadata', 'name')
    end

    encoded_token = default_secret&.dig('data', 'token')
    encoded_token && Base64.decode64(encoded_token)
  end
end
Loading
Loading
@@ -189,7 +189,7 @@ constraints(ProjectUrlConstrainer.new) do
end
 
member do
get :creation_status, format: :json
get :status, format: :json
end
end
 
Loading
Loading
Loading
Loading
@@ -62,3 +62,5 @@
- [update_user_activity, 1]
- [propagate_service_template, 1]
- [background_migration, 1]
- [cluster_creation, 1]
- [wait_for_cluster_creation, 1]
Loading
Loading
@@ -9,6 +9,8 @@ class CreateCiClusters < ActiveRecord::Migration
 
# General
t.boolean :enabled, default: true
t.integer :status
t.string :status_reason
 
# k8s integration specific
t.string :project_namespace
Loading
Loading
@@ -16,9 +18,10 @@ class CreateCiClusters < ActiveRecord::Migration
# Cluster details
t.string :endpoint
t.text :ca_cert
t.string :token
t.string :encrypted_kubernetes_token
t.string :encrypted_kubernetes_token_salt
t.string :encrypted_kubernetes_token_iv
t.string :username
t.string :password
t.string :encrypted_password
t.string :encrypted_password_salt
t.string :encrypted_password_iv
Loading
Loading
@@ -27,7 +30,12 @@ class CreateCiClusters < ActiveRecord::Migration
t.string :gcp_project_id
t.string :cluster_zone
t.string :cluster_name
t.string :cluster_size
t.string :machine_type
t.string :gcp_operation_id
t.string :encrypted_gcp_token
t.string :encrypted_gcp_token_salt
t.string :encrypted_gcp_token_iv
 
t.datetime_with_timezone :created_at, null: false
t.datetime_with_timezone :updated_at, null: false
Loading
Loading
Loading
Loading
@@ -272,19 +272,27 @@ ActiveRecord::Schema.define(version: 20170924094327) do
t.integer "user_id", null: false
t.integer "service_id"
t.boolean "enabled", default: true
t.integer "status"
t.string "status_reason"
t.string "project_namespace"
t.string "endpoint"
t.text "ca_cert"
t.string "token"
t.string "encrypted_kubernetes_token"
t.string "encrypted_kubernetes_token_salt"
t.string "encrypted_kubernetes_token_iv"
t.string "username"
t.string "password"
t.string "encrypted_password"
t.string "encrypted_password_salt"
t.string "encrypted_password_iv"
t.string "gcp_project_id"
t.string "cluster_zone"
t.string "cluster_name"
t.string "cluster_size"
t.string "machine_type"
t.string "gcp_operation_id"
t.string "encrypted_gcp_token"
t.string "encrypted_gcp_token_salt"
t.string "encrypted_gcp_token_iv"
t.datetime "created_at", null: false
t.datetime "updated_at", null: false
end
Loading
Loading
module GoogleApi
class Authentication
class Auth
attr_reader :access_token, :redirect_uri, :state
 
ConfigMissingError = Class.new(StandardError)
Loading
Loading
Loading
Loading
@@ -2,9 +2,9 @@ require 'google/apis/container_v1'
 
module GoogleApi
module CloudPlatform
class Client < GoogleApi::Authentication
class Client < GoogleApi::Auth
class << self
def token_in_session
def session_key_for_token
:cloud_platform_access_token
end
end
Loading
Loading
@@ -13,20 +13,14 @@ module GoogleApi
'https://www.googleapis.com/auth/cloud-platform'
end
 
##
# Exception
# Google::Apis::ClientError:
# Google::Apis::AuthorizationError:
##
def projects_zones_clusters_get(project_id, zone, cluster_id)
service = Google::Apis::ContainerV1::ContainerService.new
service.authorization = access_token
 
begin
cluster = service.get_zone_cluster(project_id, zone, cluster_id)
rescue Google::Apis::ClientError, Google::Apis::AuthorizationError => e
return nil
rescue Google::Apis::ServerError, Google::Apis::ClientError, Google::Apis::AuthorizationError => e
return e
end
 
puts "#{self.class.name} - #{__callee__}: cluster: #{cluster.inspect}"
Loading
Loading
@@ -35,7 +29,7 @@ module GoogleApi
 
# Response example
# TODO: machine_type : Default 3.75 GB
def projects_zones_clusters_create(project_id, zone, cluster_name, cluster_size:, machine_type:)
def projects_zones_clusters_create(project_id, zone, cluster_name, cluster_size, machine_type:)
service = Google::Apis::ContainerV1::ContainerService.new
service.authorization = access_token
 
Loading
Loading
@@ -50,8 +44,8 @@ module GoogleApi
 
begin
operation = service.create_cluster(project_id, zone, request_body)
rescue Google::Apis::ClientError, Google::Apis::AuthorizationError => e
return nil
rescue Google::Apis::ServerError, Google::Apis::ClientError, Google::Apis::AuthorizationError => e
return e
end
 
puts "#{self.class.name} - #{__callee__}: operation: #{operation.inspect}"
Loading
Loading
@@ -65,17 +59,15 @@ module GoogleApi
begin
operation = service.get_zone_operation(project_id, zone, operation_id)
rescue Google::Apis::ClientError, Google::Apis::AuthorizationError => e
return nil
return e
end
 
puts "#{self.class.name} - #{__callee__}: operation: #{operation.inspect}"
operation
end
 
def parse_self_link(self_link)
ret = self_link.match(/projects\/(.*)\/zones\/(.*)\/operations\/(.*)/)
return ret[1], ret[2], ret[3] # project_id, zone, operation_id
# Extracts the operation id from an operation's self link.
#
# @param self_link [String, nil] URL containing
#   "projects/<project>/zones/<zone>/operations/<operation_id>"
# @return [String, nil] the text after "operations/", or nil when +self_link+
#   is nil or does not contain that path
def parse_operation_id(self_link)
  # String#[] with a capture index yields nil when nothing matches, so the
  # caller can test the result instead of hitting NoMethodError on a nil
  # MatchData.
  self_link.to_s[%r{projects/.*/zones/.*/operations/(.*)}, 1]
end
end
end
Loading
Loading
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment