Commit 71976ff2 authored by Patrick Bajao

Merge branch 'jv-workhorse-mime-lazy' into 'master'

Workhorse: prepare for lazy upload preauth

See merge request gitlab-org/gitlab!87800
parents 3e14a276 fdc6f90b
Showing changed files with 139 additions and 189 deletions
@@ -43,16 +43,12 @@ type artifactsUploadProcessor struct {
 // Artifacts is like a Multipart but specific for artifacts upload.
 func Artifacts(myAPI *api.API, h http.Handler, p Preparer) http.Handler {
 	return myAPI.PreAuthorizeHandler(func(w http.ResponseWriter, r *http.Request, a *api.Response) {
-		opts, err := p.Prepare(a)
-		if err != nil {
-			helper.Fail500(w, r, fmt.Errorf("UploadArtifacts: error preparing file storage options"))
-			return
-		}
-
 		format := r.URL.Query().Get(ArtifactFormatKey)
 
-		mg := &artifactsUploadProcessor{format: format, SavedFileTracker: SavedFileTracker{Request: r}}
-		interceptMultipartFiles(w, r, h, a, mg, opts)
+		mg := &artifactsUploadProcessor{
+			format:           format,
+			SavedFileTracker: SavedFileTracker{Request: r},
+		}
+		interceptMultipartFiles(w, r, h, mg, &eagerAuthorizer{a}, p)
 	}, "/authorize")
 }
@@ -61,8 +57,7 @@ func (a *artifactsUploadProcessor) generateMetadataFromZip(ctx context.Context,
 	defer metaWriter.Close()
 
 	metaOpts := &destination.UploadOpts{
 		LocalTempPath: os.TempDir(),
-		TempFilePrefix: "metadata.gz",
 	}
 
 	fileName := file.LocalPath
@@ -87,7 +82,7 @@ func (a *artifactsUploadProcessor) generateMetadataFromZip(ctx context.Context,
 	done := make(chan saveResult)
 	go func() {
 		var result saveResult
-		result.FileHandler, result.error = destination.Upload(ctx, metaReader, -1, metaOpts)
+		result.FileHandler, result.error = destination.Upload(ctx, metaReader, -1, "metadata.gz", metaOpts)
 
 		done <- result
 	}()
@@ -23,7 +23,7 @@ func RequestBody(rails PreAuthorizer, h http.Handler, p Preparer) http.Handler {
 			return
 		}
 
-		fh, err := destination.Upload(r.Context(), r.Body, r.ContentLength, opts)
+		fh, err := destination.Upload(r.Context(), r.Body, r.ContentLength, "upload", opts)
 		if err != nil {
 			helper.Fail500(w, r, fmt.Errorf("RequestBody: upload failed: %v", err))
 			return
@@ -113,9 +113,9 @@ type consumer interface {
 
 // Upload persists the provided reader content to all the location specified in opts. A cleanup will be performed once ctx is Done
 // Make sure the provided context will not expire before finalizing upload with GitLab Rails.
-func Upload(ctx context.Context, reader io.Reader, size int64, opts *UploadOpts) (*FileHandler, error) {
+func Upload(ctx context.Context, reader io.Reader, size int64, name string, opts *UploadOpts) (*FileHandler, error) {
 	fh := &FileHandler{
-		Name:      opts.TempFilePrefix,
+		Name:      name,
 		RemoteID:  opts.RemoteID,
 		RemoteURL: opts.RemoteURL,
 	}
@@ -199,13 +199,13 @@ func Upload(ctx context.Context, reader io.Reader, size int64, opts *UploadOpts)
 	}
 
 	logger := log.WithContextFields(ctx, log.Fields{
 		"copied_bytes": fh.Size,
 		"is_local":     opts.IsLocalTempFile(),
 		"is_multipart": opts.IsMultipart(),
 		"is_remote":    !opts.IsLocalTempFile(),
 		"remote_id":    opts.RemoteID,
-		"temp_file_prefix": opts.TempFilePrefix,
-		"client_mode":      clientMode,
+		"client_mode":  clientMode,
+		"filename":     fh.Name,
 	})
 
 	if opts.IsLocalTempFile() {
@@ -226,7 +226,7 @@ func (fh *FileHandler) newLocalFile(ctx context.Context, opts *UploadOpts) (cons
 		return nil, fmt.Errorf("newLocalFile: mkdir %q: %v", opts.LocalTempPath, err)
 	}
 
-	file, err := ioutil.TempFile(opts.LocalTempPath, opts.TempFilePrefix)
+	file, err := ioutil.TempFile(opts.LocalTempPath, "gitlab-workhorse-upload")
 	if err != nil {
 		return nil, fmt.Errorf("newLocalFile: create file: %v", err)
 	}
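The hunks above are the core API shift of this merge request: destination.Upload now receives the upload's name as an explicit argument, and UploadOpts loses TempFilePrefix. A minimal sketch of the new calling convention, illustrative only and not part of the diff (the package name, helper name, and "report.json" are invented; the destination import is the workhorse-internal package and is only importable from inside the workhorse module):

package example // illustrative sketch, not part of this merge request

import (
	"context"
	"io"
	"os"

	"gitlab.com/gitlab-org/gitlab/workhorse/internal/upload/destination"
)

// saveToTempDir shows the new calling convention: the upload name is an
// explicit argument and UploadOpts no longer carries a TempFilePrefix field.
func saveToTempDir(ctx context.Context, r io.Reader) (*destination.FileHandler, error) {
	opts := &destination.UploadOpts{LocalTempPath: os.TempDir()}
	// A size of -1 means the size is not known up front, as in the
	// artifacts metadata upload earlier in this diff.
	return destination.Upload(ctx, r, -1, "report.json", opts)
}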
@@ -47,8 +47,8 @@ func TestUploadWrongSize(t *testing.T) {
 	require.NoError(t, err)
 	defer os.RemoveAll(tmpFolder)
 
-	opts := &destination.UploadOpts{LocalTempPath: tmpFolder, TempFilePrefix: "test-file"}
-	fh, err := destination.Upload(ctx, strings.NewReader(test.ObjectContent), test.ObjectSize+1, opts)
+	opts := &destination.UploadOpts{LocalTempPath: tmpFolder}
+	fh, err := destination.Upload(ctx, strings.NewReader(test.ObjectContent), test.ObjectSize+1, "upload", opts)
 	require.Error(t, err)
 	_, isSizeError := err.(destination.SizeError)
 	require.True(t, isSizeError, "Should fail with SizeError")
@@ -63,8 +63,8 @@ func TestUploadWithKnownSizeExceedLimit(t *testing.T) {
 	require.NoError(t, err)
 	defer os.RemoveAll(tmpFolder)
 
-	opts := &destination.UploadOpts{LocalTempPath: tmpFolder, TempFilePrefix: "test-file", MaximumSize: test.ObjectSize - 1}
-	fh, err := destination.Upload(ctx, strings.NewReader(test.ObjectContent), test.ObjectSize, opts)
+	opts := &destination.UploadOpts{LocalTempPath: tmpFolder, MaximumSize: test.ObjectSize - 1}
+	fh, err := destination.Upload(ctx, strings.NewReader(test.ObjectContent), test.ObjectSize, "upload", opts)
 	require.Error(t, err)
 	_, isSizeError := err.(destination.SizeError)
 	require.True(t, isSizeError, "Should fail with SizeError")
@@ -79,8 +79,8 @@ func TestUploadWithUnknownSizeExceedLimit(t *testing.T) {
 	require.NoError(t, err)
 	defer os.RemoveAll(tmpFolder)
 
-	opts := &destination.UploadOpts{LocalTempPath: tmpFolder, TempFilePrefix: "test-file", MaximumSize: test.ObjectSize - 1}
-	fh, err := destination.Upload(ctx, strings.NewReader(test.ObjectContent), -1, opts)
+	opts := &destination.UploadOpts{LocalTempPath: tmpFolder, MaximumSize: test.ObjectSize - 1}
+	fh, err := destination.Upload(ctx, strings.NewReader(test.ObjectContent), -1, "upload", opts)
 	require.Equal(t, err, destination.ErrEntityTooLarge)
 	require.Nil(t, fh)
 }
@@ -117,7 +117,7 @@ func TestUploadWrongETag(t *testing.T) {
 		osStub.InitiateMultipartUpload(test.ObjectPath)
 	}
 	ctx, cancel := context.WithCancel(context.Background())
-	fh, err := destination.Upload(ctx, strings.NewReader(test.ObjectContent), test.ObjectSize, opts)
+	fh, err := destination.Upload(ctx, strings.NewReader(test.ObjectContent), test.ObjectSize, "upload", opts)
 	require.Nil(t, fh)
 	require.Error(t, err)
 	require.Equal(t, 1, osStub.PutsCnt(), "File not uploaded")
@@ -191,13 +191,12 @@ func TestUpload(t *testing.T) {
 
 			if spec.local {
 				opts.LocalTempPath = tmpFolder
-				opts.TempFilePrefix = "test-file"
 			}
 
 			ctx, cancel := context.WithCancel(context.Background())
 			defer cancel()
 
-			fh, err := destination.Upload(ctx, strings.NewReader(test.ObjectContent), test.ObjectSize, &opts)
+			fh, err := destination.Upload(ctx, strings.NewReader(test.ObjectContent), test.ObjectSize, "upload", &opts)
 			require.NoError(t, err)
 			require.NotNil(t, fh)
 
@@ -211,9 +210,6 @@ func TestUpload(t *testing.T) {
 
 				dir := path.Dir(fh.LocalPath)
 				require.Equal(t, opts.LocalTempPath, dir)
-				filename := path.Base(fh.LocalPath)
-				beginsWithPrefix := strings.HasPrefix(filename, opts.TempFilePrefix)
-				require.True(t, beginsWithPrefix, fmt.Sprintf("LocalPath filename %q do not begin with TempFilePrefix %q", filename, opts.TempFilePrefix))
 			} else {
 				require.Empty(t, fh.LocalPath, "LocalPath must be empty for non local uploads")
 			}
@@ -291,7 +287,7 @@ func TestUploadWithS3WorkhorseClient(t *testing.T) {
 				MaximumSize: tc.maxSize,
 			}
 
-			_, err := destination.Upload(ctx, strings.NewReader(test.ObjectContent), tc.objectSize, &opts)
+			_, err := destination.Upload(ctx, strings.NewReader(test.ObjectContent), tc.objectSize, "upload", &opts)
 
 			if tc.expectedErr == nil {
 				require.NoError(t, err)
@@ -324,7 +320,7 @@ func TestUploadWithAzureWorkhorseClient(t *testing.T) {
 		},
 	}
 
-	_, err := destination.Upload(ctx, strings.NewReader(test.ObjectContent), test.ObjectSize, &opts)
+	_, err := destination.Upload(ctx, strings.NewReader(test.ObjectContent), test.ObjectSize, "upload", &opts)
 	require.NoError(t, err)
 
 	test.GoCloudObjectExists(t, bucketDir, remoteObject)
@@ -349,7 +345,7 @@ func TestUploadWithUnknownGoCloudScheme(t *testing.T) {
 		},
 	}
 
-	_, err := destination.Upload(ctx, strings.NewReader(test.ObjectContent), test.ObjectSize, &opts)
+	_, err := destination.Upload(ctx, strings.NewReader(test.ObjectContent), test.ObjectSize, "upload", &opts)
 	require.Error(t, err)
 }
@@ -375,7 +371,7 @@ func TestUploadMultipartInBodyFailure(t *testing.T) {
 	ctx, cancel := context.WithCancel(context.Background())
 	defer cancel()
 
-	fh, err := destination.Upload(ctx, strings.NewReader(test.ObjectContent), test.ObjectSize, &opts)
+	fh, err := destination.Upload(ctx, strings.NewReader(test.ObjectContent), test.ObjectSize, "upload", &opts)
 	require.Nil(t, fh)
 	require.Error(t, err)
 	require.EqualError(t, err, test.MultipartUploadInternalError().Error())
@@ -468,7 +464,7 @@ func TestUploadRemoteFileWithLimit(t *testing.T) {
 	ctx, cancel := context.WithCancel(context.Background())
 	defer cancel()
 
-	fh, err := destination.Upload(ctx, strings.NewReader(tc.testData), tc.objectSize, &opts)
+	fh, err := destination.Upload(ctx, strings.NewReader(tc.testData), tc.objectSize, "upload", &opts)
 
 	if tc.expectedErr == nil {
 		require.NoError(t, err)
@@ -22,7 +22,8 @@ type partsEtagMap map[int]string
 // Instead of storing objects it will just save md5sum.
 type ObjectstoreStub struct {
 	// bucket contains md5sum of uploaded objects
 	bucket map[string]string
+	contents map[string][]byte
 	// overwriteMD5 contains overwrites for md5sum that should be return instead of the regular hash
 	overwriteMD5 map[string]string
 	// multipart is a map of MultipartUploads
@@ -48,6 +49,7 @@ func StartObjectStoreWithCustomMD5(md5Hashes map[string]string) (*ObjectstoreStu
 		multipart:    make(map[string]partsEtagMap),
 		overwriteMD5: make(map[string]string),
 		headers:      make(map[string]*http.Header),
+		contents:     make(map[string][]byte),
 	}
 
 	for k, v := range md5Hashes {
@@ -82,6 +84,15 @@ func (o *ObjectstoreStub) GetObjectMD5(path string) string {
 	return o.bucket[path]
 }
 
+// GetObject returns the contents of the uploaded object. The caller must
+// not modify the byte slice.
+func (o *ObjectstoreStub) GetObject(path string) []byte {
+	o.m.Lock()
+	defer o.m.Unlock()
+
+	return o.contents[path]
+}
+
 // GetHeader returns a given HTTP header of the object uploaded to the path
 func (o *ObjectstoreStub) GetHeader(path, key string) string {
 	o.m.Lock()
@@ -154,11 +165,11 @@ func (o *ObjectstoreStub) putObject(w http.ResponseWriter, r *http.Request) {
 
 	etag, overwritten := o.overwriteMD5[objectPath]
 	if !overwritten {
+		buf, _ := io.ReadAll(r.Body)
+		o.contents[objectPath] = buf
 		hasher := md5.New()
-		io.Copy(hasher, r.Body)
-
-		checksum := hasher.Sum(nil)
-		etag = hex.EncodeToString(checksum)
+		hasher.Write(buf)
+		etag = hex.EncodeToString(hasher.Sum(nil))
 	}
 
 	o.headers[objectPath] = &r.Header
@@ -29,8 +29,6 @@ type ObjectStorageConfig struct {
 
 // UploadOpts represents all the options available for saving a file to object store
 type UploadOpts struct {
-	// TempFilePrefix is the prefix used to create temporary local file
-	TempFilePrefix string
 	// LocalTempPath is the directory where to write a local copy of the file
 	LocalTempPath string
 	// RemoteID is the remote ObjectID provided by GitLab
 package upload
 
 import (
-	"fmt"
 	"net/http"
 
 	"gitlab.com/gitlab-org/gitlab/workhorse/internal/api"
-	"gitlab.com/gitlab-org/gitlab/workhorse/internal/helper"
 )
 
 // Multipart is a request middleware. If the request has a MIME multipart
@@ -17,12 +15,19 @@ func Multipart(rails PreAuthorizer, h http.Handler, p Preparer) http.Handler {
 	return rails.PreAuthorizeHandler(func(w http.ResponseWriter, r *http.Request, a *api.Response) {
 		s := &SavedFileTracker{Request: r}
 
-		opts, err := p.Prepare(a)
-		if err != nil {
-			helper.Fail500(w, r, fmt.Errorf("Multipart: error preparing file storage options"))
-			return
-		}
-
-		interceptMultipartFiles(w, r, h, a, s, opts)
+		interceptMultipartFiles(w, r, h, s, &eagerAuthorizer{a}, p)
 	}, "/authorize")
 }
+
+// SkipRailsPreAuthMultipart behaves like Multipart except it does not
+// pre-authorize with Rails. It is intended for use on catch-all routes
+// where we cannot pre-authorize both because we don't know which Rails
+// endpoint to call, and because eagerly pre-authorizing would add too
+// much overhead.
+func SkipRailsPreAuthMultipart(tempPath string, h http.Handler, p Preparer) http.Handler {
+	return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
+		s := &SavedFileTracker{Request: r}
+		fa := &eagerAuthorizer{&api.Response{TempPath: tempPath}}
+		interceptMultipartFiles(w, r, h, s, fa, p)
+	})
+}
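For context, the catch-all wiring that SkipRailsPreAuthMultipart enables is exactly the routes.go change at the bottom of this diff. A hedged sketch, with api, signingProxy, proxy, preparer and uploadPath standing in for the values already in scope there:

// Routes that know their Rails endpoint keep eager pre-authorization.
mimeMultipartUploader := upload.Multipart(api, signingProxy, preparer)

// The catch-all proxy route skips Rails pre-authorization entirely and
// buffers any uploaded files under uploadPath.
tempfileMultipartProxy := upload.SkipRailsPreAuthMultipart(uploadPath, proxy, preparer)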
@@ -62,13 +62,14 @@ var (
 )
 
 type rewriter struct {
 	writer *multipart.Writer
-	preauth *api.Response
+	fileAuthorizer
+	Preparer
 	filter          MultipartFormProcessor
 	finalizedFields map[string]bool
 }
 
-func rewriteFormFilesFromMultipart(r *http.Request, writer *multipart.Writer, preauth *api.Response, filter MultipartFormProcessor, opts *destination.UploadOpts) error {
+func rewriteFormFilesFromMultipart(r *http.Request, writer *multipart.Writer, filter MultipartFormProcessor, fa fileAuthorizer, preparer Preparer) error {
 	// Create multipart reader
 	reader, err := r.MultipartReader()
 	if err != nil {
@@ -83,7 +84,8 @@ func rewriteFormFilesFromMultipart(r *http.Request, writer *multipart.Writer, pr
 
 	rew := &rewriter{
 		writer:          writer,
-		preauth:         preauth,
+		fileAuthorizer:  fa,
+		Preparer:        preparer,
 		filter:          filter,
 		finalizedFields: make(map[string]bool),
 	}
@@ -108,7 +110,7 @@ func rewriteFormFilesFromMultipart(r *http.Request, writer *multipart.Writer, pr
 		}
 
 		if filename != "" {
-			err = rew.handleFilePart(r.Context(), name, p, opts)
+			err = rew.handleFilePart(r, name, p)
 		} else {
 			err = rew.copyPart(r.Context(), name, p)
 		}
@@ -128,7 +130,7 @@ func parseAndNormalizeContentDisposition(header textproto.MIMEHeader) (string, s
 	return params["name"], params["filename"]
 }
 
-func (rew *rewriter) handleFilePart(ctx context.Context, name string, p *multipart.Part, opts *destination.UploadOpts) error {
+func (rew *rewriter) handleFilePart(r *http.Request, name string, p *multipart.Part) error {
 	if rew.filter.Count() >= maxFilesAllowed {
 		return ErrTooManyFilesUploaded
 	}
@@ -141,30 +143,34 @@ func (rew *rewriter) handleFilePart(ctx context.Context, name string, p *multipa
 		return fmt.Errorf("illegal filename: %q", filename)
 	}
 
-	opts.TempFilePrefix = filename
+	apiResponse, err := rew.AuthorizeFile(r)
+	if err != nil {
+		return err
+	}
+	opts, err := rew.Prepare(apiResponse)
+	if err != nil {
+		return err
+	}
 
 	var inputReader io.ReadCloser
-	var err error
-
-	imageType := exif.FileTypeFromSuffix(filename)
-	switch {
-	case imageType != exif.TypeUnknown:
+	ctx := r.Context()
+	if imageType := exif.FileTypeFromSuffix(filename); imageType != exif.TypeUnknown {
 		inputReader, err = handleExifUpload(ctx, p, filename, imageType)
 		if err != nil {
 			return err
 		}
-	case rew.preauth.ProcessLsif:
-		inputReader, err = handleLsifUpload(ctx, p, opts.LocalTempPath, filename, rew.preauth)
+	} else if apiResponse.ProcessLsif {
+		inputReader, err = handleLsifUpload(ctx, p, opts.LocalTempPath, filename)
 		if err != nil {
 			return err
 		}
-	default:
+	} else {
 		inputReader = ioutil.NopCloser(p)
 	}
 
 	defer inputReader.Close()
 
-	fh, err := destination.Upload(ctx, inputReader, -1, opts)
+	fh, err := destination.Upload(ctx, inputReader, -1, filename, opts)
 	if err != nil {
 		switch err {
 		case destination.ErrEntityTooLarge, exif.ErrRemovingExif:
@@ -267,7 +273,7 @@ func isJPEG(r io.Reader) bool {
 	return http.DetectContentType(buf) == "image/jpeg"
 }
 
-func handleLsifUpload(ctx context.Context, reader io.Reader, tempPath, filename string, preauth *api.Response) (io.ReadCloser, error) {
+func handleLsifUpload(ctx context.Context, reader io.Reader, tempPath, filename string) (io.ReadCloser, error) {
 	parserConfig := parser.Config{
 		TempPath: tempPath,
 	}
@@ -291,3 +297,15 @@ func (rew *rewriter) copyPart(ctx context.Context, name string, p *multipart.Par
 
 	return nil
 }
+
+type fileAuthorizer interface {
+	AuthorizeFile(*http.Request) (*api.Response, error)
+}
+
+type eagerAuthorizer struct{ response *api.Response }
+
+func (ea *eagerAuthorizer) AuthorizeFile(r *http.Request) (*api.Response, error) {
+	return ea.response, nil
+}
+
+var _ fileAuthorizer = &eagerAuthorizer{}
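The fileAuthorizer seam added above is what the commit title alludes to: it leaves room for an implementation that authorizes with Rails only once a file part is actually encountered. A purely hypothetical sketch of such a lazy implementation follows; nothing below exists in this merge request, and the authorize callback stands in for whatever Rails call a follow-up would plug in:

// lazyAuthorizer is NOT part of this merge request; it only illustrates the
// kind of implementation the fileAuthorizer interface prepares for.
type lazyAuthorizer struct {
	// authorize would wrap the Rails pre-authorization call in a follow-up change.
	authorize func(*http.Request) (*api.Response, error)
}

// AuthorizeFile defers authorization until a file part is seen, instead of
// pre-authorizing every request eagerly.
func (la *lazyAuthorizer) AuthorizeFile(r *http.Request) (*api.Response, error) {
	return la.authorize(r)
}

var _ fileAuthorizer = &lazyAuthorizer{}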
package upload

import (
	"net/http"

	"gitlab.com/gitlab-org/gitlab/workhorse/internal/api"
)

// SkipRailsAuthorizer implements a fake PreAuthorizer that does not call
// the gitlab-rails API. It must be fast because it gets called on each
// request proxied to Rails.
type SkipRailsAuthorizer struct {
	// TempPath is a directory where workhorse can store files that can later
	// be accessed by gitlab-rails.
	TempPath string
}

func (l *SkipRailsAuthorizer) PreAuthorizeHandler(next api.HandleFunc, _ string) http.Handler {
	return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
		next(w, r, &api.Response{TempPath: l.TempPath})
	})
}
@@ -40,13 +40,13 @@ type MultipartFormProcessor interface {
 
 // interceptMultipartFiles is the core of the implementation of
 // Multipart.
-func interceptMultipartFiles(w http.ResponseWriter, r *http.Request, h http.Handler, preauth *api.Response, filter MultipartFormProcessor, opts *destination.UploadOpts) {
+func interceptMultipartFiles(w http.ResponseWriter, r *http.Request, h http.Handler, filter MultipartFormProcessor, fa fileAuthorizer, p Preparer) {
 	var body bytes.Buffer
 	writer := multipart.NewWriter(&body)
 	defer writer.Close()
 
 	// Rewrite multipart form data
-	err := rewriteFormFilesFromMultipart(r, writer, preauth, filter, opts)
+	err := rewriteFormFilesFromMultipart(r, writer, filter, fa, p)
 	if err != nil {
 		switch err {
 		case ErrInjectedClientParam:
@@ -12,6 +12,7 @@ import (
 	"net/http/httptest"
 	"net/textproto"
 	"os"
+	"path"
 	"regexp"
 	"strconv"
 	"strings"
@@ -66,19 +67,14 @@ func TestUploadHandlerForwardingRawData(t *testing.T) {
 	httpRequest, err := http.NewRequest("PATCH", ts.URL+"/url/path", bytes.NewBufferString("REQUEST"))
 	require.NoError(t, err)
 
-	tempPath, err := ioutil.TempDir("", "uploads")
-	require.NoError(t, err)
-	defer os.RemoveAll(tempPath)
-
+	tempPath := t.TempDir()
 	response := httptest.NewRecorder()
 
 	handler := newProxy(ts.URL)
-	apiResponse := &api.Response{TempPath: tempPath}
+	fa := &eagerAuthorizer{&api.Response{TempPath: tempPath}}
 	preparer := &DefaultPreparer{}
-	opts, err := preparer.Prepare(apiResponse)
-	require.NoError(t, err)
 
-	interceptMultipartFiles(response, httpRequest, handler, apiResponse, nil, opts)
+	interceptMultipartFiles(response, httpRequest, handler, nil, fa, preparer)
 
 	require.Equal(t, 202, response.Code)
 	require.Equal(t, "RESPONSE", response.Body.String(), "response body")
@@ -86,10 +82,7 @@ func TestUploadHandlerForwardingRawData(t *testing.T) {
 
 func TestUploadHandlerRewritingMultiPartData(t *testing.T) {
 	var filePath string
-
-	tempPath, err := ioutil.TempDir("", "uploads")
-	require.NoError(t, err)
-	defer os.RemoveAll(tempPath)
+	tempPath := t.TempDir()
 
 	ts := testhelper.TestServerWithHandler(regexp.MustCompile(`/url/path\z`), func(w http.ResponseWriter, r *http.Request) {
 		require.Equal(t, "PUT", r.Method, "method")
@@ -144,12 +137,10 @@ func TestUploadHandlerRewritingMultiPartData(t *testing.T) {
 
 	handler := newProxy(ts.URL)
 
-	apiResponse := &api.Response{TempPath: tempPath}
+	fa := &eagerAuthorizer{&api.Response{TempPath: tempPath}}
 	preparer := &DefaultPreparer{}
-	opts, err := preparer.Prepare(apiResponse)
-	require.NoError(t, err)
 
-	interceptMultipartFiles(response, httpRequest, handler, apiResponse, &testFormProcessor{}, opts)
+	interceptMultipartFiles(response, httpRequest, handler, &testFormProcessor{}, fa, preparer)
 	require.Equal(t, 202, response.Code)
 
 	cancel() // this will trigger an async cleanup
@@ -159,10 +150,6 @@ func TestUploadHandlerRewritingMultiPartData(t *testing.T) {
 func TestUploadHandlerDetectingInjectedMultiPartData(t *testing.T) {
 	var filePath string
 
-	tempPath, err := ioutil.TempDir("", "uploads")
-	require.NoError(t, err)
-	defer os.RemoveAll(tempPath)
-
 	tests := []struct {
 		name  string
 		field string
@@ -213,12 +200,8 @@ func TestUploadHandlerDetectingInjectedMultiPartData(t *testing.T) {
 			response := httptest.NewRecorder()
 
 			handler := newProxy(ts.URL)
-			apiResponse := &api.Response{TempPath: tempPath}
-			preparer := &DefaultPreparer{}
-			opts, err := preparer.Prepare(apiResponse)
-			require.NoError(t, err)
 
-			interceptMultipartFiles(response, httpRequest, handler, apiResponse, &testFormProcessor{}, opts)
+			testInterceptMultipartFiles(t, response, httpRequest, handler, &testFormProcessor{})
 			require.Equal(t, test.response, response.Code)
 
 			cancel() // this will trigger an async cleanup
@@ -228,10 +211,6 @@ func TestUploadHandlerDetectingInjectedMultiPartData(t *testing.T) {
 }
 
 func TestUploadProcessingField(t *testing.T) {
-	tempPath, err := ioutil.TempDir("", "uploads")
-	require.NoError(t, err)
-	defer os.RemoveAll(tempPath)
-
 	var buffer bytes.Buffer
 
 	writer := multipart.NewWriter(&buffer)
@@ -243,12 +222,8 @@ func TestUploadProcessingField(t *testing.T) {
 	httpRequest.Header.Set("Content-Type", writer.FormDataContentType())
 
 	response := httptest.NewRecorder()
-	apiResponse := &api.Response{TempPath: tempPath}
-	preparer := &DefaultPreparer{}
-	opts, err := preparer.Prepare(apiResponse)
-	require.NoError(t, err)
 
-	interceptMultipartFiles(response, httpRequest, nilHandler, apiResponse, &testFormProcessor{}, opts)
+	testInterceptMultipartFiles(t, response, httpRequest, nilHandler, &testFormProcessor{})
 
 	require.Equal(t, 500, response.Code)
 }
@@ -256,15 +231,11 @@ func TestUploadProcessingField(t *testing.T) {
 func TestUploadingMultipleFiles(t *testing.T) {
 	testhelper.ConfigureSecret()
 
-	tempPath, err := ioutil.TempDir("", "uploads")
-	require.NoError(t, err)
-	defer os.RemoveAll(tempPath)
-
 	var buffer bytes.Buffer
 
 	writer := multipart.NewWriter(&buffer)
 	for i := 0; i < 11; i++ {
-		_, err = writer.CreateFormFile(fmt.Sprintf("file %v", i), "my.file")
+		_, err := writer.CreateFormFile(fmt.Sprintf("file %v", i), "my.file")
 		require.NoError(t, err)
 	}
 	require.NoError(t, writer.Close())
@@ -274,23 +245,18 @@ func TestUploadingMultipleFiles(t *testing.T) {
 	httpRequest.Header.Set("Content-Type", writer.FormDataContentType())
 
 	response := httptest.NewRecorder()
-	apiResponse := &api.Response{TempPath: tempPath}
-	preparer := &DefaultPreparer{}
-	opts, err := preparer.Prepare(apiResponse)
-	require.NoError(t, err)
 
-	interceptMultipartFiles(response, httpRequest, nilHandler, apiResponse, &testFormProcessor{}, opts)
+	testInterceptMultipartFiles(t, response, httpRequest, nilHandler, &testFormProcessor{})
 
 	require.Equal(t, 400, response.Code)
 	require.Equal(t, "upload request contains more than 10 files\n", response.Body.String())
 }
 
 func TestUploadProcessingFile(t *testing.T) {
-	tempPath, err := ioutil.TempDir("", "uploads")
-	require.NoError(t, err)
-	defer os.RemoveAll(tempPath)
+	testhelper.ConfigureSecret()
+	tempPath := t.TempDir()
 
-	_, testServer := test.StartObjectStore()
+	objectStore, testServer := test.StartObjectStore()
 	defer testServer.Close()
 
 	storeUrl := testServer.URL + test.ObjectPath
@@ -298,21 +264,24 @@ func TestUploadProcessingFile(t *testing.T) {
 	tests := []struct {
 		name    string
 		preauth *api.Response
+		content func(t *testing.T) []byte
 	}{
 		{
 			name:    "FileStore Upload",
 			preauth: &api.Response{TempPath: tempPath},
+			content: func(t *testing.T) []byte {
+				entries, err := os.ReadDir(tempPath)
+				require.NoError(t, err)
+				require.Len(t, entries, 1)
+				content, err := os.ReadFile(path.Join(tempPath, entries[0].Name()))
+				require.NoError(t, err)
+				return content
+			},
 		},
 		{
 			name:    "ObjectStore Upload",
-			preauth: &api.Response{RemoteObject: api.RemoteObject{StoreURL: storeUrl}},
-		},
-		{
-			name: "ObjectStore and FileStore Upload",
-			preauth: &api.Response{
-				TempPath:     tempPath,
-				RemoteObject: api.RemoteObject{StoreURL: storeUrl},
-			},
+			preauth: &api.Response{RemoteObject: api.RemoteObject{StoreURL: storeUrl, ID: "123"}},
+			content: func(*testing.T) []byte { return objectStore.GetObject(test.ObjectPath) },
 		},
 	}
 
@@ -330,26 +299,20 @@ func TestUploadProcessingFile(t *testing.T) {
 			httpRequest.Header.Set("Content-Type", writer.FormDataContentType())
 
 			response := httptest.NewRecorder()
-			apiResponse := &api.Response{TempPath: tempPath}
+			fa := &eagerAuthorizer{test.preauth}
 			preparer := &DefaultPreparer{}
-			opts, err := preparer.Prepare(apiResponse)
-			require.NoError(t, err)
 
-			interceptMultipartFiles(response, httpRequest, nilHandler, apiResponse, &testFormProcessor{}, opts)
+			interceptMultipartFiles(response, httpRequest, nilHandler, &testFormProcessor{}, fa, preparer)
 
 			require.Equal(t, 200, response.Code)
+			require.Equal(t, "test", string(test.content(t)))
 		})
 	}
 }
 
 func TestInvalidFileNames(t *testing.T) {
 	testhelper.ConfigureSecret()
 
-	tempPath, err := ioutil.TempDir("", "uploads")
-	require.NoError(t, err)
-	defer os.RemoveAll(tempPath)
-
 	for _, testCase := range []struct {
 		filename string
 		code     int
@@ -376,24 +339,14 @@ func TestInvalidFileNames(t *testing.T) {
 		httpRequest.Header.Set("Content-Type", writer.FormDataContentType())
 
 		response := httptest.NewRecorder()
-		apiResponse := &api.Response{TempPath: tempPath}
-		preparer := &DefaultPreparer{}
-		opts, err := preparer.Prepare(apiResponse)
-		require.NoError(t, err)
-
-		interceptMultipartFiles(response, httpRequest, nilHandler, apiResponse, &SavedFileTracker{Request: httpRequest}, opts)
+		testInterceptMultipartFiles(t, response, httpRequest, nilHandler, &SavedFileTracker{Request: httpRequest})
 
 		require.Equal(t, testCase.code, response.Code)
-		require.Equal(t, testCase.expectedPrefix, opts.TempFilePrefix)
 	}
 }
 
 func TestContentDispositionRewrite(t *testing.T) {
 	testhelper.ConfigureSecret()
 
-	tempPath, err := ioutil.TempDir("", "uploads")
-	require.NoError(t, err)
-	defer os.RemoveAll(tempPath)
-
 	tests := []struct {
 		desc   string
 		header string
@@ -442,12 +395,7 @@ func TestContentDispositionRewrite(t *testing.T) {
 			})
 
 			response := httptest.NewRecorder()
-			apiResponse := &api.Response{TempPath: tempPath}
-			preparer := &DefaultPreparer{}
-			opts, err := preparer.Prepare(apiResponse)
-			require.NoError(t, err)
-
-			interceptMultipartFiles(response, httpRequest, customHandler, apiResponse, &SavedFileTracker{Request: httpRequest}, opts)
+			testInterceptMultipartFiles(t, response, httpRequest, customHandler, &SavedFileTracker{Request: httpRequest})
 
 			upstreamRequest, err := http.ReadRequest(bufio.NewReader(&upstreamRequestBuffer))
 			require.NoError(t, err)
@@ -534,10 +482,6 @@ func TestUploadHandlerRemovingExifCorruptedFile(t *testing.T) {
 }
 
 func runUploadTest(t *testing.T, image []byte, filename string, httpCode int, tsHandler func(http.ResponseWriter, *http.Request)) {
-	tempPath, err := ioutil.TempDir("", "uploads")
-	require.NoError(t, err)
-	defer os.RemoveAll(tempPath)
-
 	var buffer bytes.Buffer
 
 	writer := multipart.NewWriter(&buffer)
@@ -565,12 +509,8 @@ func runUploadTest(t *testing.T, image []byte, filename string, httpCode int, ts
 	response := httptest.NewRecorder()
 
 	handler := newProxy(ts.URL)
-	apiResponse := &api.Response{TempPath: tempPath}
-	preparer := &DefaultPreparer{}
-	opts, err := preparer.Prepare(apiResponse)
-	require.NoError(t, err)
 
-	interceptMultipartFiles(response, httpRequest, handler, apiResponse, &testFormProcessor{}, opts)
+	testInterceptMultipartFiles(t, response, httpRequest, handler, &testFormProcessor{})
 	require.Equal(t, httpCode, response.Code)
 }
@@ -587,3 +527,12 @@ func waitUntilDeleted(t *testing.T, path string) {
 	}, 10*time.Second, 10*time.Millisecond)
 	require.True(t, os.IsNotExist(err), "expected the file to be deleted")
 }
+
+func testInterceptMultipartFiles(t *testing.T, w http.ResponseWriter, r *http.Request, h http.Handler, filter MultipartFormProcessor) {
+	t.Helper()
+
+	fa := &eagerAuthorizer{&api.Response{TempPath: t.TempDir()}}
+	preparer := &DefaultPreparer{}
+
+	interceptMultipartFiles(w, r, h, filter, fa, preparer)
+}
@@ -223,7 +223,7 @@ func configureRoutes(u *upstream) {
 	mimeMultipartUploader := upload.Multipart(api, signingProxy, preparer)
 
 	uploadPath := path.Join(u.DocumentRoot, "uploads/tmp")
-	tempfileMultipartProxy := upload.Multipart(&upload.SkipRailsAuthorizer{TempPath: uploadPath}, proxy, preparer)
+	tempfileMultipartProxy := upload.SkipRailsPreAuthMultipart(uploadPath, proxy, preparer)
 	ciAPIProxyQueue := queueing.QueueRequests("ci_api_job_requests", tempfileMultipartProxy, u.APILimit, u.APIQueueLimit, u.APIQueueTimeout)
 	ciAPILongPolling := builds.RegisterHandler(ciAPIProxyQueue, redis.WatchKey, u.APICILongPollingDuration)