Skip to content
Snippets Groups Projects
Commit 37389823 authored by Stan Hu's avatar Stan Hu
Browse files

Refactor handlePostRPC to be more clear

parent 232cd84c
No related branches found
No related tags found
1 merge request!110Fix stalled HTTP fetches with large payloads
Pipeline #
Loading
Loading
@@ -127,9 +127,6 @@ func handleGetInfoRefs(rw http.ResponseWriter, r *http.Request, a *api.Response)
}
 
func handlePostRPC(rw http.ResponseWriter, r *http.Request, a *api.Response) {
var err error
var body io.Reader
var isShallowClone bool
var writtenIn int64
 
w := NewGitHttpResponseWriter(rw)
Loading
Loading
@@ -144,110 +141,165 @@ func handlePostRPC(rw http.ResponseWriter, r *http.Request, a *api.Response) {
return
}
 
if action == "git-upload-pack" {
buffer := &bytes.Buffer{}
// Only sniff on the first 4096 bytes: we assume that if we find no
// 'deepen' message in the first 4096 bytes there won't be one later
// either.
_, err = io.Copy(buffer, io.LimitReader(r.Body, 4096))
if err != nil {
helper.Fail500(w, r, &copyError{fmt.Errorf("handlePostRPC: buffer git-upload-pack body: %v", err)})
return
}
if action == "git-receive-pack" {
writtenIn, _ = handleReceivePack(action, w, r, a)
} else {
writtenIn, _ = handleUploadPack(action, w, r, a)
}
}
 
isShallowClone = scanDeepen(bytes.NewReader(buffer.Bytes()))
body = io.MultiReader(buffer, r.Body)
func handleReceivePack(action string, w *GitHttpResponseWriter, r *http.Request, a *api.Response) (writtenIn int64, err error) {
body := r.Body
cmd, stdin, stdout, err := setupGitCommand(action, a, w, r)
 
// Read out the full HTTP request body so that we can reply
buf, err := ioutil.ReadAll(body)
if err != nil {
return
}
 
if err != nil {
helper.Fail500(w, r, &copyError{fmt.Errorf("handlePostRPC: full buffer git-upload-pack body: %v", err)})
return
}
defer stdout.Close()
defer stdin.Close()
defer helper.CleanUpProcessGroup(cmd) // Ensure brute force subprocess clean-up
 
body = ioutil.NopCloser(bytes.NewBuffer(buf))
r.Body.Close()
} else {
body = r.Body
// Write the client request body to Git's standard input
writtenIn, err = io.Copy(stdin, body)
if err != nil {
helper.Fail500(w, r, fmt.Errorf("handleReceivePack: write to %v: %v", cmd.Args, err))
return
}
// Signal to the Git subprocess that no more data is coming
stdin.Close()
// It may take a while before we return and the deferred closes happen
// so let's free up some resources already.
r.Body.Close()
writePostRPCHeader(w, action)
// This io.Copy may take a long time, both for Git push and pull.
_, err = io.Copy(w, stdout)
 
// Prepare our Git subprocess
cmd := gitCommand(a.GL_ID, "git", subCommand(action), "--stateless-rpc", a.RepoPath)
stdout, err := cmd.StdoutPipe()
if err != nil {
helper.Fail500(w, r, fmt.Errorf("handlePostRPC: stdout: %v", err))
helper.LogError(
r,
&copyError{fmt.Errorf("handleReceivePack: copy output of %v: %v", cmd.Args, err)},
)
return
}
defer stdout.Close()
stdin, err := cmd.StdinPipe()
err = cmd.Wait()
if err != nil {
helper.Fail500(w, r, fmt.Errorf("handlePostRPC: stdin: %v", err))
helper.LogError(r, fmt.Errorf("handleReceivePack: wait for %v: %v", cmd.Args, err))
return
}
defer stdin.Close()
if err := cmd.Start(); err != nil {
helper.Fail500(w, r, fmt.Errorf("handlePostRPC: start %v: %v", cmd.Args, err))
return writtenIn, nil
}
func handleUploadPack(action string, w *GitHttpResponseWriter, r *http.Request, a *api.Response) (writtenIn int64, err error) {
var isShallowClone bool
var body io.Reader
buffer := &bytes.Buffer{}
// Only sniff on the first 4096 bytes: we assume that if we find no
// 'deepen' message in the first 4096 bytes there won't be one later
// either.
_, err = io.Copy(buffer, io.LimitReader(r.Body, 4096))
if err != nil {
helper.Fail500(w, r, &copyError{fmt.Errorf("handleUploadPack: buffer git-upload-pack body: %v", err)})
return
}
defer helper.CleanUpProcessGroup(cmd) // Ensure brute force subprocess clean-up
 
stdoutError := make(chan error, 1)
var wg sync.WaitGroup
wg.Add(1)
isShallowClone = scanDeepen(bytes.NewReader(buffer.Bytes()))
body = io.MultiReader(buffer, r.Body)
 
if action == "git-upload-pack" {
// Start writing the response
writePostRPCHeader(w, action)
go func() {
defer wg.Done()
if _, err := io.Copy(w, stdout); err != nil {
helper.LogError(
r,
&copyError{fmt.Errorf("handlePostRPC: copy output of %v: %v", cmd.Args, err)},
)
stdoutError <- err
return
}
}()
// Read out the full HTTP request body so that we can reply
buf, err := ioutil.ReadAll(body)
if err != nil {
helper.Fail500(w, r, &copyError{fmt.Errorf("handleUploadPack: full buffer git-upload-pack body: %v", err)})
return
}
 
// Write the client request body to Git's standard input
if writtenIn, err = io.Copy(stdin, body); err != nil {
helper.Fail500(w, r, fmt.Errorf("handlePostRPC: write to %v: %v", cmd.Args, err))
body = ioutil.NopCloser(bytes.NewBuffer(buf))
r.Body.Close()
cmd, stdin, stdout, err := setupGitCommand(action, a, w, r)
if err != nil {
return
}
// Signal to the Git subprocess that no more data is coming
stdin.Close()
 
if action == "git-upload-pack" {
wg.Wait()
defer stdout.Close()
defer stdin.Close()
defer helper.CleanUpProcessGroup(cmd) // Ensure brute force subprocess clean-up
 
if len(stdoutError) > 0 {
return
}
} else {
// It may take a while before we return and the deferred closes happen
// so let's free up some resources already.
r.Body.Close()
stdoutError := make(chan error, 1)
var wg sync.WaitGroup
wg.Add(1)
// Start writing the response
writePostRPCHeader(w, action)
 
writePostRPCHeader(w, action)
go func() {
defer wg.Done()
 
// This io.Copy may take a long time, both for Git push and pull.
if _, err := io.Copy(w, stdout); err != nil {
helper.LogError(
r,
&copyError{fmt.Errorf("handlePostRPC: copy output of %v: %v", cmd.Args, err)},
&copyError{fmt.Errorf("handleUploadPack: copy output of %v: %v", cmd.Args, err)},
)
stdoutError <- err
return
}
}()
// Write the client request body to Git's standard input
if writtenIn, err = io.Copy(stdin, body); err != nil {
helper.Fail500(w, r, fmt.Errorf("handleUploadPack: write to %v: %v", cmd.Args, err))
return
}
// Signal to the Git subprocess that no more data is coming
stdin.Close()
wg.Wait()
if len(stdoutError) > 0 {
return
}
if err := cmd.Wait(); err != nil && !(isExitError(err) && isShallowClone) {
helper.LogError(r, fmt.Errorf("handlePostRPC: wait for %v: %v", cmd.Args, err))
err = cmd.Wait()
if err != nil && !(isExitError(err) && isShallowClone) {
helper.LogError(r, fmt.Errorf("handleUploadPack: wait for %v: %v", cmd.Args, err))
return
}
return writtenIn, nil
}
func setupGitCommand(action string, a *api.Response, w *GitHttpResponseWriter, r *http.Request) (cmd *exec.Cmd, stdin io.WriteCloser, stdout io.ReadCloser, err error) {
// Prepare our Git subprocess
cmd = gitCommand(a.GL_ID, "git", subCommand(action), "--stateless-rpc", a.RepoPath)
stdout, err = cmd.StdoutPipe()
if err != nil {
helper.Fail500(w, r, fmt.Errorf("setupGitCommand: stdout: %v", err))
return nil, nil, nil, err
}
stdin, err = cmd.StdinPipe()
if err != nil {
helper.Fail500(w, r, fmt.Errorf("setupGitCommand: stdin: %v", err))
return nil, nil, nil, err
}
if err = cmd.Start(); err != nil {
helper.Fail500(w, r, fmt.Errorf("setupGitCommand: start %v: %v", cmd.Args, err))
return nil, nil, nil, err
}
return cmd, stdin, stdout, nil
}
 
func writePostRPCHeader(w http.ResponseWriter, action string) {
Loading
Loading
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment