Skip to content
Snippets Groups Projects
Commit 91d15f77 authored by Kamil Trzciński's avatar Kamil Trzciński Committed by Nick Thomas
Browse files

Add internal upload to external storage

parent eacd5b7a
No related branches found
No related tags found
No related merge requests found
Loading
Loading
@@ -66,6 +66,15 @@ func NewAPI(myURL *url.URL, version string, roundTripper *badgateway.RoundTrippe
 
type HandleFunc func(http.ResponseWriter, *http.Request, *Response)
 
type RemoteObjectStore struct {
// StoreURL is the temporary URL to which upload the first found file
StoreURL string
// ObjectID is a unique identifier of object storage upload
ObjectID string
// Timeout is a number that represents timeout in seconds for sending data to StoreURL
Timeout int
}
type Response struct {
// GL_ID is an environment variable used by gitlab-shell hooks during 'git
// push' and 'git pull'
Loading
Loading
@@ -83,6 +92,9 @@ type Response struct {
// TmpPath is the path where we should store temporary files
// This is set by authorization middleware
TempPath string
// ObjectStore is provided by the GitLab Rails application
// and defines a way to store object on remote storage
ObjectStore RemoteObjectStore
// Archive is the path where the artifacts archive is stored
Archive string `json:"archive"`
// Entry is a filename inside the archive point to file that needs to be extracted
Loading
Loading
package artifacts
import (
"fmt"
"mime/multipart"
"net/http"
"os"
"time"
"golang.org/x/net/context"
"golang.org/x/net/context/ctxhttp"
"github.com/prometheus/client_golang/prometheus"
)
var (
DefaultObjectStoreTimeoutSeconds = 360
)
var (
objectStorageUploadRequests = prometheus.NewCounterVec(
prometheus.CounterOpts{
Name: "gitlab_workhorse_object_storage_upload_requests",
Help: "How many object storage requests have been processed",
},
[]string{"status"},
)
objectStorageUploadsOpen = prometheus.NewGauge(
prometheus.GaugeOpts{
Name: "gitlab_workhorse_object_storage_upload_open",
Help: "Describes many object storage requests are open now",
},
)
objectStorageUploadBytes = prometheus.NewCounter(
prometheus.CounterOpts{
Name: "gitlab_workhorse_object_storage_upload_bytes",
Help: "How many bytes were sent to object storage",
},
)
objectStorageUploadTime = prometheus.NewHistogram(
prometheus.HistogramOpts{
Name: "gitlab_workhorse_object_storage_upload_time",
Help: "How long it took to upload objects",
Buckets: objectStorageUploadTimeBuckets,
})
objectStorageUploadRequestsFileFailed = objectStorageUploadRequests.WithLabelValues("file-failed")
objectStorageUploadRequestsRequestFailed = objectStorageUploadRequests.WithLabelValues("request-failed")
objectStorageUploadRequestsInvalidStatus = objectStorageUploadRequests.WithLabelValues("invalid-status")
objectStorageUploadRequestsSucceeded = objectStorageUploadRequests.WithLabelValues("succeeded")
objectStorageUploadRequestsMultipleUploads = objectStorageUploadRequests.WithLabelValues("multiple-uploads")
objectStorageUploadTimeBuckets = []float64{.1, .25, .5, 1, 2.5, 5, 10, 25, 50, 100}
)
func init() {
prometheus.MustRegister(
objectStorageUploadRequests,
objectStorageUploadsOpen,
objectStorageUploadBytes)
}
func (a *artifactsUploadProcessor) storeFile(formName, fileName string, writer *multipart.Writer) error {
if a.ObjectStore.StoreURL == "" {
return nil
}
if a.stored {
objectStorageUploadRequestsMultipleUploads.Inc()
return nil
}
started := time.Now()
defer func() {
objectStorageUploadTime.Observe(time.Since(started).Seconds())
}()
file, err := os.Open(fileName)
if err != nil {
objectStorageUploadRequestsFileFailed.Inc()
return err
}
defer file.Close()
fi, err := file.Stat()
if err != nil {
objectStorageUploadRequestsFileFailed.Inc()
return err
}
req, err := http.NewRequest("PUT", a.ObjectStore.StoreURL, file)
if err != nil {
objectStorageUploadRequestsRequestFailed.Inc()
return fmt.Errorf("PUT %q: %v", a.ObjectStore.StoreURL, err)
}
req.Header.Set("Content-Type", "application/octet-stream")
req.ContentLength = fi.Size()
objectStorageUploadsOpen.Inc()
defer objectStorageUploadsOpen.Dec()
timeout := DefaultObjectStoreTimeoutSeconds
if a.ObjectStore.Timeout != 0 {
timeout = a.ObjectStore.Timeout
}
ctx, cancelFn := context.WithTimeout(context.Background(), time.Duration(timeout)*time.Second)
defer cancelFn()
resp, err := ctxhttp.Do(ctx, http.DefaultClient, req)
if err != nil {
objectStorageUploadRequestsRequestFailed.Inc()
return fmt.Errorf("PUT request %q: %v", a.ObjectStore.StoreURL, err)
}
defer resp.Body.Close()
if resp.StatusCode != http.StatusOK {
objectStorageUploadRequestsInvalidStatus.Inc()
return fmt.Errorf("PUT request %v returned: %d %s", a.ObjectStore.StoreURL, resp.StatusCode, resp.Status)
}
writer.WriteField(formName+".store_url", a.ObjectStore.StoreURL)
writer.WriteField(formName+".object_id", a.ObjectStore.ObjectID)
objectStorageUploadRequestsSucceeded.Inc()
objectStorageUploadBytes.Add(float64(fi.Size()))
// Allow to upload only once using given credentials
a.stored = true
return nil
}
package artifacts
import (
"archive/zip"
"bytes"
"fmt"
"io/ioutil"
"mime/multipart"
"net/http"
"net/http/httptest"
"os"
"testing"
"time"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"gitlab.com/gitlab-org/gitlab-workhorse/internal/api"
"gitlab.com/gitlab-org/gitlab-workhorse/internal/testhelper"
)
func createTestZipArchive(t *testing.T) []byte {
var buffer bytes.Buffer
archive := zip.NewWriter(&buffer)
fileInArchive, err := archive.Create("test.file")
require.NoError(t, err)
fmt.Fprint(fileInArchive, "test")
archive.Close()
return buffer.Bytes()
}
func createTestMultipartForm(t *testing.T, data []byte) (bytes.Buffer, string) {
var buffer bytes.Buffer
writer := multipart.NewWriter(&buffer)
file, err := writer.CreateFormFile("file", "my.file")
require.NoError(t, err)
file.Write(data)
writer.Close()
return buffer, writer.FormDataContentType()
}
func TestUploadHandlerSendingToExternalStorage(t *testing.T) {
tempPath, err := ioutil.TempDir("", "uploads")
if err != nil {
t.Fatal(err)
}
defer os.RemoveAll(tempPath)
archiveData := createTestZipArchive(t)
contentBuffer, contentType := createTestMultipartForm(t, archiveData)
storeServerCalled := 0
storeServerMux := http.NewServeMux()
storeServerMux.HandleFunc("/url/put", func(w http.ResponseWriter, r *http.Request) {
assert.Equal(t, "PUT", r.Method)
receivedData, err := ioutil.ReadAll(r.Body)
require.NoError(t, err)
require.Equal(t, archiveData, receivedData)
storeServerCalled++
w.WriteHeader(200)
})
responseProcessorCalled := 0
responseProcessor := func(w http.ResponseWriter, r *http.Request) {
assert.Equal(t, "store-id", r.FormValue("file.object_id"))
assert.NotEmpty(t, r.FormValue("file.store_url"))
w.WriteHeader(200)
responseProcessorCalled++
}
storeServer := httptest.NewServer(storeServerMux)
defer storeServer.Close()
authResponse := api.Response{
TempPath: tempPath,
ObjectStore: api.RemoteObjectStore{
StoreURL: storeServer.URL + "/url/put",
ObjectID: "store-id",
},
}
ts := testArtifactsUploadServer(t, authResponse, responseProcessor)
defer ts.Close()
response := testUploadArtifacts(contentType, &contentBuffer, t, ts)
testhelper.AssertResponseCode(t, response, 200)
assert.Equal(t, 1, storeServerCalled, "store should be called only once")
assert.Equal(t, 1, responseProcessorCalled, "response processor should be called only once")
}
func TestUploadHandlerSendingToExternalStorageAndStorageServerUnreachable(t *testing.T) {
tempPath, err := ioutil.TempDir("", "uploads")
if err != nil {
t.Fatal(err)
}
defer os.RemoveAll(tempPath)
responseProcessor := func(w http.ResponseWriter, r *http.Request) {
t.Fatal("it should not be called")
}
authResponse := api.Response{
TempPath: tempPath,
ObjectStore: api.RemoteObjectStore{
StoreURL: "http://localhost:12323/invalid/url",
ObjectID: "store-id",
},
}
ts := testArtifactsUploadServer(t, authResponse, responseProcessor)
defer ts.Close()
archiveData := createTestZipArchive(t)
contentBuffer, contentType := createTestMultipartForm(t, archiveData)
response := testUploadArtifacts(contentType, &contentBuffer, t, ts)
testhelper.AssertResponseCode(t, response, 500)
}
func TestUploadHandlerSendingToExternalStorageAndInvalidURLIsUsed(t *testing.T) {
tempPath, err := ioutil.TempDir("", "uploads")
if err != nil {
t.Fatal(err)
}
defer os.RemoveAll(tempPath)
responseProcessor := func(w http.ResponseWriter, r *http.Request) {
t.Fatal("it should not be called")
}
authResponse := api.Response{
TempPath: tempPath,
ObjectStore: api.RemoteObjectStore{
StoreURL: "htt:////invalid-url",
ObjectID: "store-id",
},
}
ts := testArtifactsUploadServer(t, authResponse, responseProcessor)
defer ts.Close()
archiveData := createTestZipArchive(t)
contentBuffer, contentType := createTestMultipartForm(t, archiveData)
response := testUploadArtifacts(contentType, &contentBuffer, t, ts)
testhelper.AssertResponseCode(t, response, 500)
}
func TestUploadHandlerSendingToExternalStorageAndItReturnsAnError(t *testing.T) {
tempPath, err := ioutil.TempDir("", "uploads")
if err != nil {
t.Fatal(err)
}
defer os.RemoveAll(tempPath)
putCalledTimes := 0
storeServerMux := http.NewServeMux()
storeServerMux.HandleFunc("/url/put", func(w http.ResponseWriter, r *http.Request) {
putCalledTimes++
assert.Equal(t, "PUT", r.Method)
w.WriteHeader(510)
})
responseProcessor := func(w http.ResponseWriter, r *http.Request) {
t.Fatal("it should not be called")
}
storeServer := httptest.NewServer(storeServerMux)
defer storeServer.Close()
authResponse := api.Response{
TempPath: tempPath,
ObjectStore: api.RemoteObjectStore{
StoreURL: storeServer.URL + "/url/put",
ObjectID: "store-id",
},
}
ts := testArtifactsUploadServer(t, authResponse, responseProcessor)
defer ts.Close()
archiveData := createTestZipArchive(t)
contentBuffer, contentType := createTestMultipartForm(t, archiveData)
response := testUploadArtifacts(contentType, &contentBuffer, t, ts)
testhelper.AssertResponseCode(t, response, 500)
assert.Equal(t, 1, putCalledTimes, "upload should be called only once")
}
func TestUploadHandlerSendingToExternalStorageAndSupportRequestTimeout(t *testing.T) {
tempPath, err := ioutil.TempDir("", "uploads")
if err != nil {
t.Fatal(err)
}
defer os.RemoveAll(tempPath)
putCalledTimes := 0
storeServerMux := http.NewServeMux()
storeServerMux.HandleFunc("/url/put", func(w http.ResponseWriter, r *http.Request) {
putCalledTimes++
assert.Equal(t, "PUT", r.Method)
time.Sleep(10 * time.Second)
w.WriteHeader(510)
})
responseProcessor := func(w http.ResponseWriter, r *http.Request) {
t.Fatal("it should not be called")
}
storeServer := httptest.NewServer(storeServerMux)
defer storeServer.Close()
authResponse := api.Response{
TempPath: tempPath,
ObjectStore: api.RemoteObjectStore{
StoreURL: storeServer.URL + "/url/put",
ObjectID: "store-id",
Timeout: 1,
},
}
ts := testArtifactsUploadServer(t, authResponse, responseProcessor)
defer ts.Close()
archiveData := createTestZipArchive(t)
contentBuffer, contentType := createTestMultipartForm(t, archiveData)
response := testUploadArtifacts(contentType, &contentBuffer, t, ts)
testhelper.AssertResponseCode(t, response, 500)
assert.Equal(t, 1, putCalledTimes, "upload should be called only once")
}
Loading
Loading
@@ -2,6 +2,7 @@ package artifacts
 
import (
"fmt"
"io"
"io/ioutil"
"mime/multipart"
"net/http"
Loading
Loading
@@ -17,7 +18,30 @@ import (
 
type artifactsUploadProcessor struct {
TempPath string
ObjectStore api.RemoteObjectStore
metadataFile string
stored bool
}
func (a *artifactsUploadProcessor) generateMetadataFromZip(fileName string, metadataFile io.Writer) (bool, error) {
// Generate metadata and save to file
zipMd := exec.Command("gitlab-zip-metadata", fileName)
zipMd.Stderr = os.Stderr
zipMd.SysProcAttr = &syscall.SysProcAttr{Setpgid: true}
zipMd.Stdout = metadataFile
if err := zipMd.Start(); err != nil {
return false, err
}
defer helper.CleanUpProcessGroup(zipMd)
if err := zipMd.Wait(); err != nil {
if st, ok := helper.ExitStatus(err); ok && st == zipartifacts.StatusNotZip {
return false, nil
}
return false, err
}
return true, nil
}
 
func (a *artifactsUploadProcessor) ProcessFile(formName, fileName string, writer *multipart.Writer) error {
Loading
Loading
@@ -36,28 +60,23 @@ func (a *artifactsUploadProcessor) ProcessFile(formName, fileName string, writer
return err
}
defer tempFile.Close()
a.metadataFile = tempFile.Name()
 
// Generate metadata and save to file
zipMd := exec.Command("gitlab-zip-metadata", fileName)
zipMd.Stderr = os.Stderr
zipMd.SysProcAttr = &syscall.SysProcAttr{Setpgid: true}
zipMd.Stdout = tempFile
a.metadataFile = tempFile.Name()
 
if err := zipMd.Start(); err != nil {
return err
generatedMetadata, err := a.generateMetadataFromZip(fileName, tempFile)
if err != nil {
return fmt.Errorf("generateMetadataFromZip: %v", err)
}
defer helper.CleanUpProcessGroup(zipMd)
if err := zipMd.Wait(); err != nil {
if st, ok := helper.ExitStatus(err); ok && st == zipartifacts.StatusNotZip {
return nil
}
return err
if generatedMetadata {
// Pass metadata file path to Rails
writer.WriteField("metadata.path", a.metadataFile)
writer.WriteField("metadata.name", "metadata.gz")
}
 
// Pass metadata file path to Rails
writer.WriteField("metadata.path", a.metadataFile)
writer.WriteField("metadata.name", "metadata.gz")
if err := a.storeFile(formName, fileName, writer); err != nil {
return fmt.Errorf("storeFile: %v", err)
}
return nil
}
 
Loading
Loading
@@ -86,7 +105,10 @@ func UploadArtifacts(myAPI *api.API, h http.Handler) http.Handler {
return
}
 
mg := &artifactsUploadProcessor{TempPath: a.TempPath}
mg := &artifactsUploadProcessor{
TempPath: a.TempPath,
ObjectStore: a.ObjectStore,
}
defer mg.Cleanup()
 
upload.HandleFileUploads(w, r, h, a.TempPath, mg)
Loading
Loading
Loading
Loading
@@ -22,7 +22,7 @@ import (
"gitlab.com/gitlab-org/gitlab-workhorse/internal/zipartifacts"
)
 
func testArtifactsUploadServer(t *testing.T, tempPath string) *httptest.Server {
func testArtifactsUploadServer(t *testing.T, authResponse api.Response, bodyProcessor func(w http.ResponseWriter, r *http.Request)) *httptest.Server {
mux := http.NewServeMux()
mux.HandleFunc("/url/path/authorize", func(w http.ResponseWriter, r *http.Request) {
if r.Method != "POST" {
Loading
Loading
@@ -31,9 +31,7 @@ func testArtifactsUploadServer(t *testing.T, tempPath string) *httptest.Server {
 
w.Header().Set("Content-Type", api.ResponseContentType)
 
data, err := json.Marshal(&api.Response{
TempPath: tempPath,
})
data, err := json.Marshal(&authResponse)
if err != nil {
t.Fatal("Expected to marshal")
}
Loading
Loading
@@ -79,7 +77,11 @@ func testArtifactsUploadServer(t *testing.T, tempPath string) *httptest.Server {
return
}
 
w.WriteHeader(200)
if bodyProcessor != nil {
bodyProcessor(w, r)
} else {
w.WriteHeader(200)
}
})
return testhelper.TestServerWithHandler(nil, mux.ServeHTTP)
}
Loading
Loading
@@ -107,7 +109,7 @@ func TestUploadHandlerAddingMetadata(t *testing.T) {
}
defer os.RemoveAll(tempPath)
 
ts := testArtifactsUploadServer(t, tempPath)
ts := testArtifactsUploadServer(t, api.Response{TempPath: tempPath}, nil)
defer ts.Close()
 
var buffer bytes.Buffer
Loading
Loading
@@ -138,7 +140,7 @@ func TestUploadHandlerForUnsupportedArchive(t *testing.T) {
}
defer os.RemoveAll(tempPath)
 
ts := testArtifactsUploadServer(t, tempPath)
ts := testArtifactsUploadServer(t, api.Response{TempPath: tempPath}, nil)
defer ts.Close()
 
var buffer bytes.Buffer
Loading
Loading
@@ -162,7 +164,7 @@ func TestUploadFormProcessing(t *testing.T) {
}
defer os.RemoveAll(tempPath)
 
ts := testArtifactsUploadServer(t, tempPath)
ts := testArtifactsUploadServer(t, api.Response{TempPath: tempPath}, nil)
defer ts.Close()
 
var buffer bytes.Buffer
Loading
Loading
// Copyright 2016 The Go Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
// +build go1.7
// Package ctxhttp provides helper functions for performing context-aware HTTP requests.
package ctxhttp // import "golang.org/x/net/context/ctxhttp"
import (
"io"
"net/http"
"net/url"
"strings"
"golang.org/x/net/context"
)
// Do sends an HTTP request with the provided http.Client and returns
// an HTTP response.
//
// If the client is nil, http.DefaultClient is used.
//
// The provided ctx must be non-nil. If it is canceled or times out,
// ctx.Err() will be returned.
func Do(ctx context.Context, client *http.Client, req *http.Request) (*http.Response, error) {
if client == nil {
client = http.DefaultClient
}
resp, err := client.Do(req.WithContext(ctx))
// If we got an error, and the context has been canceled,
// the context's error is probably more useful.
if err != nil {
select {
case <-ctx.Done():
err = ctx.Err()
default:
}
}
return resp, err
}
// Get issues a GET request via the Do function.
func Get(ctx context.Context, client *http.Client, url string) (*http.Response, error) {
req, err := http.NewRequest("GET", url, nil)
if err != nil {
return nil, err
}
return Do(ctx, client, req)
}
// Head issues a HEAD request via the Do function.
func Head(ctx context.Context, client *http.Client, url string) (*http.Response, error) {
req, err := http.NewRequest("HEAD", url, nil)
if err != nil {
return nil, err
}
return Do(ctx, client, req)
}
// Post issues a POST request via the Do function.
func Post(ctx context.Context, client *http.Client, url string, bodyType string, body io.Reader) (*http.Response, error) {
req, err := http.NewRequest("POST", url, body)
if err != nil {
return nil, err
}
req.Header.Set("Content-Type", bodyType)
return Do(ctx, client, req)
}
// PostForm issues a POST request via the Do function.
func PostForm(ctx context.Context, client *http.Client, url string, data url.Values) (*http.Response, error) {
return Post(ctx, client, url, "application/x-www-form-urlencoded", strings.NewReader(data.Encode()))
}
// Copyright 2015 The Go Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
// +build !go1.7
package ctxhttp // import "golang.org/x/net/context/ctxhttp"
import (
"io"
"net/http"
"net/url"
"strings"
"golang.org/x/net/context"
)
func nop() {}
var (
testHookContextDoneBeforeHeaders = nop
testHookDoReturned = nop
testHookDidBodyClose = nop
)
// Do sends an HTTP request with the provided http.Client and returns an HTTP response.
// If the client is nil, http.DefaultClient is used.
// If the context is canceled or times out, ctx.Err() will be returned.
func Do(ctx context.Context, client *http.Client, req *http.Request) (*http.Response, error) {
if client == nil {
client = http.DefaultClient
}
// TODO(djd): Respect any existing value of req.Cancel.
cancel := make(chan struct{})
req.Cancel = cancel
type responseAndError struct {
resp *http.Response
err error
}
result := make(chan responseAndError, 1)
// Make local copies of test hooks closed over by goroutines below.
// Prevents data races in tests.
testHookDoReturned := testHookDoReturned
testHookDidBodyClose := testHookDidBodyClose
go func() {
resp, err := client.Do(req)
testHookDoReturned()
result <- responseAndError{resp, err}
}()
var resp *http.Response
select {
case <-ctx.Done():
testHookContextDoneBeforeHeaders()
close(cancel)
// Clean up after the goroutine calling client.Do:
go func() {
if r := <-result; r.resp != nil {
testHookDidBodyClose()
r.resp.Body.Close()
}
}()
return nil, ctx.Err()
case r := <-result:
var err error
resp, err = r.resp, r.err
if err != nil {
return resp, err
}
}
c := make(chan struct{})
go func() {
select {
case <-ctx.Done():
close(cancel)
case <-c:
// The response's Body is closed.
}
}()
resp.Body = &notifyingReader{resp.Body, c}
return resp, nil
}
// Get issues a GET request via the Do function.
func Get(ctx context.Context, client *http.Client, url string) (*http.Response, error) {
req, err := http.NewRequest("GET", url, nil)
if err != nil {
return nil, err
}
return Do(ctx, client, req)
}
// Head issues a HEAD request via the Do function.
func Head(ctx context.Context, client *http.Client, url string) (*http.Response, error) {
req, err := http.NewRequest("HEAD", url, nil)
if err != nil {
return nil, err
}
return Do(ctx, client, req)
}
// Post issues a POST request via the Do function.
func Post(ctx context.Context, client *http.Client, url string, bodyType string, body io.Reader) (*http.Response, error) {
req, err := http.NewRequest("POST", url, body)
if err != nil {
return nil, err
}
req.Header.Set("Content-Type", bodyType)
return Do(ctx, client, req)
}
// PostForm issues a POST request via the Do function.
func PostForm(ctx context.Context, client *http.Client, url string, data url.Values) (*http.Response, error) {
return Post(ctx, client, url, "application/x-www-form-urlencoded", strings.NewReader(data.Encode()))
}
// notifyingReader is an io.ReadCloser that closes the notify channel after
// Close is called or a Read fails on the underlying ReadCloser.
type notifyingReader struct {
io.ReadCloser
notify chan<- struct{}
}
func (r *notifyingReader) Read(p []byte) (int, error) {
n, err := r.ReadCloser.Read(p)
if err != nil && r.notify != nil {
close(r.notify)
r.notify = nil
}
return n, err
}
func (r *notifyingReader) Close() error {
err := r.ReadCloser.Close()
if r.notify != nil {
close(r.notify)
r.notify = nil
}
return err
}
Loading
Loading
@@ -148,6 +148,12 @@
"path": "golang.org/x/net/context",
"revision": "f2499483f923065a842d38eb4c7f1927e6fc6e6d"
},
{
"checksumSHA1": "WHc3uByvGaMcnSoI21fhzYgbOgg=",
"path": "golang.org/x/net/context/ctxhttp",
"revision": "a6577fac2d73be281a500b310739095313165611",
"revisionTime": "2017-03-08T20:54:49Z"
},
{
"checksumSHA1": "N1akwAdrHVfPPrsFOhG2ouP21VA=",
"path": "golang.org/x/net/http2",
Loading
Loading
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment