Skip to content
Snippets Groups Projects
Unverified Commit 5cb406d5 authored by Derek McGowan's avatar Derek McGowan Committed by GitHub
Browse files

Merge pull request #2455 from sargun/make-walk-faster

s3: improve walk performance
parents b5db8eee cbcbcb02
No related branches found
No related tags found
No related merge requests found
Showing
with 628 additions and 271 deletions
Loading
Loading
@@ -88,13 +88,12 @@ func (bs *blobStore) Put(ctx context.Context, mediaType string, p []byte) (distr
}
 
func (bs *blobStore) Enumerate(ctx context.Context, ingester func(dgst digest.Digest) error) error {
specPath, err := pathFor(blobsPathSpec{})
if err != nil {
return err
}
 
err = Walk(ctx, bs.driver, specPath, func(fileInfo driver.FileInfo) error {
return bs.driver.Walk(ctx, specPath, func(fileInfo driver.FileInfo) error {
// skip directories
if fileInfo.IsDir() {
return nil
Loading
Loading
@@ -114,7 +113,6 @@ func (bs *blobStore) Enumerate(ctx context.Context, ingester func(dgst digest.Di
 
return ingester(digest)
})
return err
}
 
// path returns the canonical path for the blob identified by digest. The blob
Loading
Loading
Loading
Loading
@@ -10,14 +10,11 @@ import (
"github.com/docker/distribution/registry/storage/driver"
)
 
// errFinishedWalk signals an early exit to the walk when the current query
// is satisfied.
var errFinishedWalk = errors.New("finished walk")
// Returns a list, or partial list, of repositories in the registry.
// Because it's a quite expensive operation, it should only be used when building up
// an initial set of repositories.
func (reg *registry) Repositories(ctx context.Context, repos []string, last string) (n int, err error) {
var finishedWalk bool
var foundRepos []string
 
if len(repos) == 0 {
Loading
Loading
@@ -29,7 +26,7 @@ func (reg *registry) Repositories(ctx context.Context, repos []string, last stri
return 0, err
}
 
err = Walk(ctx, reg.blobStore.driver, root, func(fileInfo driver.FileInfo) error {
err = reg.blobStore.driver.Walk(ctx, root, func(fileInfo driver.FileInfo) error {
err := handleRepository(fileInfo, root, last, func(repoPath string) error {
foundRepos = append(foundRepos, repoPath)
return nil
Loading
Loading
@@ -40,7 +37,8 @@ func (reg *registry) Repositories(ctx context.Context, repos []string, last stri
 
// if we've filled our array, no need to walk any further
if len(foundRepos) == len(repos) {
return errFinishedWalk
finishedWalk = true
return driver.ErrSkipDir
}
 
return nil
Loading
Loading
@@ -48,14 +46,11 @@ func (reg *registry) Repositories(ctx context.Context, repos []string, last stri
 
n = copy(repos, foundRepos)
 
switch err {
case nil:
// nil means that we completed walk and didn't fill buffer. No more
// records are available.
err = io.EOF
case errFinishedWalk:
// more records are available.
err = nil
if err != nil {
return n, err
} else if !finishedWalk {
// We didn't fill buffer. No more records are available.
return n, io.EOF
}
 
return n, err
Loading
Loading
@@ -68,7 +63,7 @@ func (reg *registry) Enumerate(ctx context.Context, ingester func(string) error)
return err
}
 
err = Walk(ctx, reg.blobStore.driver, root, func(fileInfo driver.FileInfo) error {
err = reg.blobStore.driver.Walk(ctx, root, func(fileInfo driver.FileInfo) error {
return handleRepository(fileInfo, root, "", ingester)
})
 
Loading
Loading
@@ -144,9 +139,9 @@ func handleRepository(fileInfo driver.FileInfo, root, last string, fn func(repoP
return err
}
}
return ErrSkipDir
return driver.ErrSkipDir
} else if strings.HasPrefix(file, "_") {
return ErrSkipDir
return driver.ErrSkipDir
}
 
return nil
Loading
Loading
Loading
Loading
@@ -336,6 +336,12 @@ func (d *driver) URLFor(ctx context.Context, path string, options map[string]int
return d.client.GetBlobSASURI(d.container, path, expiresTime, "r")
}
 
// Walk traverses a filesystem defined within driver, starting
// from the given path, calling f on each file
func (d *driver) Walk(ctx context.Context, path string, f storagedriver.WalkFn) error {
return storagedriver.WalkFallback(ctx, d, path, f)
}
// directDescendants will find direct descendants (blobs or virtual containers)
// of from list of blob paths and will return their full paths. Elements in blobs
// list must be prefixed with a "/" and
Loading
Loading
Loading
Loading
@@ -197,3 +197,15 @@ func (base *Base) URLFor(ctx context.Context, path string, options map[string]in
str, e := base.StorageDriver.URLFor(ctx, path, options)
return str, base.setDriverName(e)
}
// Walk wraps Walk of underlying storage driver.
func (base *Base) Walk(ctx context.Context, path string, f storagedriver.WalkFn) error {
ctx, done := dcontext.WithTrace(ctx)
defer done("%s.Walk(%q)", base.Name(), path)
if !storagedriver.PathRegexp.MatchString(path) && path != "/" {
return storagedriver.InvalidPathError{Path: path, DriverName: base.StorageDriver.Name()}
}
return base.setDriverName(base.StorageDriver.Walk(ctx, path, f))
}
Loading
Loading
@@ -315,6 +315,12 @@ func (d *driver) URLFor(ctx context.Context, path string, options map[string]int
return "", storagedriver.ErrUnsupportedMethod{}
}
 
// Walk traverses a filesystem defined within driver, starting
// from the given path, calling f on each file
func (d *driver) Walk(ctx context.Context, path string, f storagedriver.WalkFn) error {
return storagedriver.WalkFallback(ctx, d, path, f)
}
// fullPath returns the absolute path of a key within the Driver's storage.
func (d *driver) fullPath(subPath string) string {
return path.Join(d.rootDirectory, subPath)
Loading
Loading
Loading
Loading
@@ -779,6 +779,12 @@ func (d *driver) URLFor(context context.Context, path string, options map[string
return storage.SignedURL(d.bucket, name, opts)
}
 
// Walk traverses a filesystem defined within driver, starting
// from the given path, calling f on each file
func (d *driver) Walk(ctx context.Context, path string, f storagedriver.WalkFn) error {
return storagedriver.WalkFallback(ctx, d, path, f)
}
func startSession(client *http.Client, bucket string, name string) (uri string, err error) {
u := &url.URL{
Scheme: "https",
Loading
Loading
Loading
Loading
@@ -240,6 +240,12 @@ func (d *driver) URLFor(ctx context.Context, path string, options map[string]int
return "", storagedriver.ErrUnsupportedMethod{}
}
 
// Walk traverses a filesystem defined within driver, starting
// from the given path, calling f on each file
func (d *driver) Walk(ctx context.Context, path string, f storagedriver.WalkFn) error {
return storagedriver.WalkFallback(ctx, d, path, f)
}
type writer struct {
d *driver
f *file
Loading
Loading
Loading
Loading
@@ -479,6 +479,12 @@ func (d *driver) URLFor(ctx context.Context, path string, options map[string]int
return signedURL, nil
}
 
// Walk traverses a filesystem defined within driver, starting
// from the given path, calling f on each file
func (d *driver) Walk(ctx context.Context, path string, f storagedriver.WalkFn) error {
return storagedriver.WalkFallback(ctx, d, path, f)
}
func (d *driver) ossPath(path string) string {
return strings.TrimLeft(strings.TrimRight(d.RootDirectory, "/")+path, "/")
}
Loading
Loading
Loading
Loading
@@ -34,6 +34,7 @@ import (
"github.com/aws/aws-sdk-go/aws/session"
"github.com/aws/aws-sdk-go/service/s3"
 
dcontext "github.com/docker/distribution/context"
"github.com/docker/distribution/registry/client/transport"
storagedriver "github.com/docker/distribution/registry/storage/driver"
"github.com/docker/distribution/registry/storage/driver/base"
Loading
Loading
@@ -874,6 +875,136 @@ func (d *driver) URLFor(ctx context.Context, path string, options map[string]int
return req.Presign(expiresIn)
}
 
// Walk traverses a filesystem defined within driver, starting
// from the given path, calling f on each file
func (d *driver) Walk(ctx context.Context, from string, f storagedriver.WalkFn) error {
path := from
if !strings.HasSuffix(path, "/") {
path = path + "/"
}
prefix := ""
if d.s3Path("") == "" {
prefix = "/"
}
var objectCount int64
if err := d.doWalk(ctx, &objectCount, d.s3Path(path), prefix, f); err != nil {
return err
}
// S3 doesn't have the concept of empty directories, so it'll return path not found if there are no objects
if objectCount == 0 {
return storagedriver.PathNotFoundError{Path: from}
}
return nil
}
type walkInfoContainer struct {
storagedriver.FileInfoFields
prefix *string
}
// Path provides the full path of the target of this file info.
func (wi walkInfoContainer) Path() string {
return wi.FileInfoFields.Path
}
// Size returns current length in bytes of the file. The return value can
// be used to write to the end of the file at path. The value is
// meaningless if IsDir returns true.
func (wi walkInfoContainer) Size() int64 {
return wi.FileInfoFields.Size
}
// ModTime returns the modification time for the file. For backends that
// don't have a modification time, the creation time should be returned.
func (wi walkInfoContainer) ModTime() time.Time {
return wi.FileInfoFields.ModTime
}
// IsDir returns true if the path is a directory.
func (wi walkInfoContainer) IsDir() bool {
return wi.FileInfoFields.IsDir
}
func (d *driver) doWalk(parentCtx context.Context, objectCount *int64, path, prefix string, f storagedriver.WalkFn) error {
var retError error
listObjectsInput := &s3.ListObjectsV2Input{
Bucket: aws.String(d.Bucket),
Prefix: aws.String(path),
Delimiter: aws.String("/"),
MaxKeys: aws.Int64(listMax),
}
ctx, done := dcontext.WithTrace(parentCtx)
defer done("s3aws.ListObjectsV2Pages(%s)", path)
listObjectErr := d.S3.ListObjectsV2PagesWithContext(ctx, listObjectsInput, func(objects *s3.ListObjectsV2Output, lastPage bool) bool {
*objectCount += *objects.KeyCount
walkInfos := make([]walkInfoContainer, 0, *objects.KeyCount)
for _, dir := range objects.CommonPrefixes {
commonPrefix := *dir.Prefix
walkInfos = append(walkInfos, walkInfoContainer{
prefix: dir.Prefix,
FileInfoFields: storagedriver.FileInfoFields{
IsDir: true,
Path: strings.Replace(commonPrefix[:len(commonPrefix)-1], d.s3Path(""), prefix, 1),
},
})
}
for _, file := range objects.Contents {
walkInfos = append(walkInfos, walkInfoContainer{
FileInfoFields: storagedriver.FileInfoFields{
IsDir: false,
Size: *file.Size,
ModTime: *file.LastModified,
Path: strings.Replace(*file.Key, d.s3Path(""), prefix, 1),
},
})
}
sort.SliceStable(walkInfos, func(i, j int) bool { return walkInfos[i].FileInfoFields.Path < walkInfos[j].FileInfoFields.Path })
for _, walkInfo := range walkInfos {
err := f(walkInfo)
if err == storagedriver.ErrSkipDir {
if walkInfo.IsDir() {
continue
} else {
break
}
} else if err != nil {
retError = err
return false
}
if walkInfo.IsDir() {
if err := d.doWalk(ctx, objectCount, *walkInfo.prefix, prefix, f); err != nil {
retError = err
return false
}
}
}
return true
})
if retError != nil {
return retError
}
if listObjectErr != nil {
return listObjectErr
}
return nil
}
func (d *driver) s3Path(path string) string {
return strings.TrimLeft(strings.TrimRight(d.RootDirectory, "/")+path, "/")
}
Loading
Loading
Loading
Loading
@@ -546,6 +546,12 @@ func (d *Driver) S3BucketKey(path string) string {
return d.StorageDriver.(*driver).s3Path(path)
}
 
// Walk traverses a filesystem defined within driver, starting
// from the given path, calling f on each file
func (d *driver) Walk(ctx context.Context, path string, f storagedriver.WalkFn) error {
return storagedriver.WalkFallback(ctx, d, path, f)
}
func parseError(path string, err error) error {
if s3Err, ok := err.(*s3.Error); ok && s3Err.Code == "NoSuchKey" {
return storagedriver.PathNotFoundError{Path: path}
Loading
Loading
Loading
Loading
@@ -83,6 +83,13 @@ type StorageDriver interface {
// May return an ErrUnsupportedMethod in certain StorageDriver
// implementations.
URLFor(ctx context.Context, path string, options map[string]interface{}) (string, error)
// Walk traverses a filesystem defined within driver, starting
// from the given path, calling f on each file.
// If the returned error from the WalkFn is ErrSkipDir and fileInfo refers
// to a directory, the directory will not be entered and Walk
// will continue the traversal. If fileInfo refers to a normal file, processing stops
Walk(ctx context.Context, path string, f WalkFn) error
}
 
// FileWriter provides an abstraction for an opened writable file-like object in
Loading
Loading
Loading
Loading
@@ -644,6 +644,12 @@ func (d *driver) URLFor(ctx context.Context, path string, options map[string]int
return tempURL, nil
}
 
// Walk traverses a filesystem defined within driver, starting
// from the given path, calling f on each file
func (d *driver) Walk(ctx context.Context, path string, f storagedriver.WalkFn) error {
return storagedriver.WalkFallback(ctx, d, path, f)
}
func (d *driver) swiftPath(path string) string {
return strings.TrimLeft(strings.TrimRight(d.Prefix+"/files"+path, "/"), "/")
}
Loading
Loading
package storage
package driver
 
import (
"context"
"errors"
"fmt"
"sort"
storageDriver "github.com/docker/distribution/registry/storage/driver"
)
 
// ErrSkipDir is used as a return value from onFileFunc to indicate that
Loading
Loading
@@ -15,14 +12,14 @@ import (
var ErrSkipDir = errors.New("skip this directory")
 
// WalkFn is called once per file by Walk
// If the returned error is ErrSkipDir and fileInfo refers
// to a directory, the directory will not be entered and Walk
// will continue the traversal. Otherwise Walk will return
type WalkFn func(fileInfo storageDriver.FileInfo) error
type WalkFn func(fileInfo FileInfo) error
 
// Walk traverses a filesystem defined within driver, starting
// from the given path, calling f on each file
func Walk(ctx context.Context, driver storageDriver.StorageDriver, from string, f WalkFn) error {
// WalkFallback traverses a filesystem defined within driver, starting
// from the given path, calling f on each file. It uses the List method and Stat to drive itself.
// If the returned error from the WalkFn is ErrSkipDir and fileInfo refers
// to a directory, the directory will not be entered and Walk
// will continue the traversal. If fileInfo refers to a normal file, processing stops
func WalkFallback(ctx context.Context, driver StorageDriver, from string, f WalkFn) error {
children, err := driver.List(ctx, from)
if err != nil {
return err
Loading
Loading
@@ -38,22 +35,18 @@ func Walk(ctx context.Context, driver storageDriver.StorageDriver, from string,
return err
}
err = f(fileInfo)
skipDir := (err == ErrSkipDir)
if err != nil && !skipDir {
return err
}
if fileInfo.IsDir() && !skipDir {
if err := Walk(ctx, driver, child, f); err != nil {
if err == nil && fileInfo.IsDir() {
if err := WalkFallback(ctx, driver, child, f); err != nil {
return err
}
} else if err == ErrSkipDir {
// Stop iteration if it's a file, otherwise noop if it's a directory
if !fileInfo.IsDir() {
return nil
}
} else if err != nil {
return err
}
}
return nil
}
// pushError formats an error type given a path and an error
// and pushes it to a slice of errors
func pushError(errors []error, path string, err error) []error {
return append(errors, fmt.Errorf("%s: %s", path, err))
}
package storage
import "fmt"
// pushError formats an error type given a path and an error
// and pushes it to a slice of errors
func pushError(errors []error, path string, err error) []error {
return append(errors, fmt.Errorf("%s: %s", path, err))
}
Loading
Loading
@@ -237,7 +237,7 @@ func (lbs *linkedBlobStore) Enumerate(ctx context.Context, ingestor func(digest.
if err != nil {
return err
}
err = Walk(ctx, lbs.blobStore.driver, rootPath, func(fileInfo driver.FileInfo) error {
return lbs.driver.Walk(ctx, rootPath, func(fileInfo driver.FileInfo) error {
// exit early if directory...
if fileInfo.IsDir() {
return nil
Loading
Loading
@@ -273,12 +273,6 @@ func (lbs *linkedBlobStore) Enumerate(ctx context.Context, ingestor func(digest.
 
return nil
})
if err != nil {
return err
}
return nil
}
 
func (lbs *linkedBlobStore) mount(ctx context.Context, sourceRepo reference.Named, dgst digest.Digest, sourceStat *distribution.Descriptor) (distribution.Descriptor, error) {
Loading
Loading
Loading
Loading
@@ -67,7 +67,7 @@ func getOutstandingUploads(ctx context.Context, driver storageDriver.StorageDriv
return uploads, append(errors, err)
}
 
err = Walk(ctx, driver, root, func(fileInfo storageDriver.FileInfo) error {
err = driver.Walk(ctx, root, func(fileInfo storageDriver.FileInfo) error {
filePath := fileInfo.Path()
_, file := path.Split(filePath)
if file[0] == '_' {
Loading
Loading
@@ -75,7 +75,7 @@ func getOutstandingUploads(ctx context.Context, driver storageDriver.StorageDriv
inUploadDir = (file == "_uploads")
 
if fileInfo.IsDir() && !inUploadDir {
return ErrSkipDir
return storageDriver.ErrSkipDir
}
 
}
Loading
Loading
Loading
Loading
@@ -142,7 +142,7 @@ func TestPurgeMissingStartedAt(t *testing.T) {
oneHourAgo := time.Now().Add(-1 * time.Hour)
fs, ctx := testUploadFS(t, 1, "test-repo", oneHourAgo)
 
err := Walk(ctx, fs, "/", func(fileInfo driver.FileInfo) error {
err := fs.Walk(ctx, "/", func(fileInfo driver.FileInfo) error {
filePath := fileInfo.Path()
_, file := path.Split(filePath)
 
Loading
Loading
package storage
import (
"context"
"fmt"
"sort"
"testing"
"github.com/docker/distribution/registry/storage/driver"
"github.com/docker/distribution/registry/storage/driver/inmemory"
)
func testFS(t *testing.T) (driver.StorageDriver, map[string]string, context.Context) {
d := inmemory.New()
ctx := context.Background()
expected := map[string]string{
"/a": "dir",
"/a/b": "dir",
"/a/b/c": "dir",
"/a/b/c/d": "file",
"/a/b/c/e": "file",
"/a/b/f": "dir",
"/a/b/f/g": "file",
"/a/b/f/h": "file",
"/a/b/f/i": "file",
"/z": "dir",
"/z/y": "file",
}
for p, typ := range expected {
if typ != "file" {
continue
}
if err := d.PutContent(ctx, p, []byte(p)); err != nil {
t.Fatalf("unable to put content into fixture: %v", err)
}
}
return d, expected, ctx
}
func TestWalkErrors(t *testing.T) {
d, expected, ctx := testFS(t)
fileCount := len(expected)
err := Walk(ctx, d, "", func(fileInfo driver.FileInfo) error {
return nil
})
if err == nil {
t.Error("Expected invalid root err")
}
errEarlyExpected := fmt.Errorf("Early termination")
err = Walk(ctx, d, "/", func(fileInfo driver.FileInfo) error {
// error on the 2nd file
if fileInfo.Path() == "/a/b" {
return errEarlyExpected
}
delete(expected, fileInfo.Path())
return nil
})
if len(expected) != fileCount-1 {
t.Error("Walk failed to terminate with error")
}
if err != errEarlyExpected {
if err == nil {
t.Fatalf("expected an error due to early termination")
} else {
t.Error(err.Error())
}
}
err = Walk(ctx, d, "/nonexistent", func(fileInfo driver.FileInfo) error {
return nil
})
if err == nil {
t.Errorf("Expected missing file err")
}
}
func TestWalk(t *testing.T) {
d, expected, ctx := testFS(t)
var traversed []string
err := Walk(ctx, d, "/", func(fileInfo driver.FileInfo) error {
filePath := fileInfo.Path()
filetype, ok := expected[filePath]
if !ok {
t.Fatalf("Unexpected file in walk: %q", filePath)
}
if fileInfo.IsDir() {
if filetype != "dir" {
t.Errorf("Unexpected file type: %q", filePath)
}
} else {
if filetype != "file" {
t.Errorf("Unexpected file type: %q", filePath)
}
// each file has its own path as the contents. If the length
// doesn't match the path length, fail.
if fileInfo.Size() != int64(len(fileInfo.Path())) {
t.Fatalf("unexpected size for %q: %v != %v",
fileInfo.Path(), fileInfo.Size(), len(fileInfo.Path()))
}
}
delete(expected, filePath)
traversed = append(traversed, filePath)
return nil
})
if len(expected) > 0 {
t.Errorf("Missed files in walk: %q", expected)
}
if !sort.StringsAreSorted(traversed) {
t.Errorf("result should be sorted: %v", traversed)
}
if err != nil {
t.Fatalf(err.Error())
}
}
func TestWalkSkipDir(t *testing.T) {
d, expected, ctx := testFS(t)
err := Walk(ctx, d, "/", func(fileInfo driver.FileInfo) error {
filePath := fileInfo.Path()
if filePath == "/a/b" {
// skip processing /a/b/c and /a/b/c/d
return ErrSkipDir
}
delete(expected, filePath)
return nil
})
if err != nil {
t.Fatalf(err.Error())
}
if _, ok := expected["/a/b/c"]; !ok {
t.Errorf("/a/b/c not skipped")
}
if _, ok := expected["/a/b/c/d"]; !ok {
t.Errorf("/a/b/c/d not skipped")
}
if _, ok := expected["/a/b/c/e"]; !ok {
t.Errorf("/a/b/c/e not skipped")
}
}
github.com/Azure/azure-sdk-for-go 088007b3b08cc02b27f2eadfdcd870958460ce7e
github.com/Azure/go-autorest ec5f4903f77ed9927ac95b19ab8e44ada64c1356
github.com/sirupsen/logrus 3d4380f53a34dcdc95f0c1db702615992b38d9a4
github.com/aws/aws-sdk-go c6fc52983ea2375810aa38ddb5370e9cdf611716
github.com/aws/aws-sdk-go 5bcc0a238d880469f949fc7cd24e35f32ab80cbd
github.com/bshuster-repo/logrus-logstash-hook d2c0ecc1836d91814e15e23bb5dc309c3ef51f4a
github.com/bugsnag/bugsnag-go b1d153021fcd90ca3f080db36bec96dc690fb274
github.com/bugsnag/osext 0dd3f918b21bec95ace9dc86c7e70266cfc5c702
Loading
Loading
This diff is collapsed.
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment