Skip to content
Snippets Groups Projects

Compare revisions

Changes are shown as if the source revision was being merged into the target revision. Learn more about comparing revisions.

Source

Select target project
No results found

Target

Select target project
  • gitlab-org/build/omnibus-mirror/distribution
1 result
Show changes
Commits on Source (6)
Showing
with 628 additions and 271 deletions
Loading
Loading
@@ -88,13 +88,12 @@ func (bs *blobStore) Put(ctx context.Context, mediaType string, p []byte) (distr
}
 
func (bs *blobStore) Enumerate(ctx context.Context, ingester func(dgst digest.Digest) error) error {
specPath, err := pathFor(blobsPathSpec{})
if err != nil {
return err
}
 
err = Walk(ctx, bs.driver, specPath, func(fileInfo driver.FileInfo) error {
return bs.driver.Walk(ctx, specPath, func(fileInfo driver.FileInfo) error {
// skip directories
if fileInfo.IsDir() {
return nil
Loading
Loading
@@ -114,7 +113,6 @@ func (bs *blobStore) Enumerate(ctx context.Context, ingester func(dgst digest.Di
 
return ingester(digest)
})
return err
}
 
// path returns the canonical path for the blob identified by digest. The blob
Loading
Loading
Loading
Loading
@@ -10,14 +10,11 @@ import (
"github.com/docker/distribution/registry/storage/driver"
)
 
// errFinishedWalk signals an early exit to the walk when the current query
// is satisfied.
var errFinishedWalk = errors.New("finished walk")
// Returns a list, or partial list, of repositories in the registry.
// Because it's a quite expensive operation, it should only be used when building up
// an initial set of repositories.
func (reg *registry) Repositories(ctx context.Context, repos []string, last string) (n int, err error) {
var finishedWalk bool
var foundRepos []string
 
if len(repos) == 0 {
Loading
Loading
@@ -29,7 +26,7 @@ func (reg *registry) Repositories(ctx context.Context, repos []string, last stri
return 0, err
}
 
err = Walk(ctx, reg.blobStore.driver, root, func(fileInfo driver.FileInfo) error {
err = reg.blobStore.driver.Walk(ctx, root, func(fileInfo driver.FileInfo) error {
err := handleRepository(fileInfo, root, last, func(repoPath string) error {
foundRepos = append(foundRepos, repoPath)
return nil
Loading
Loading
@@ -40,7 +37,8 @@ func (reg *registry) Repositories(ctx context.Context, repos []string, last stri
 
// if we've filled our array, no need to walk any further
if len(foundRepos) == len(repos) {
return errFinishedWalk
finishedWalk = true
return driver.ErrSkipDir
}
 
return nil
Loading
Loading
@@ -48,14 +46,11 @@ func (reg *registry) Repositories(ctx context.Context, repos []string, last stri
 
n = copy(repos, foundRepos)
 
switch err {
case nil:
// nil means that we completed walk and didn't fill buffer. No more
// records are available.
err = io.EOF
case errFinishedWalk:
// more records are available.
err = nil
if err != nil {
return n, err
} else if !finishedWalk {
// We didn't fill buffer. No more records are available.
return n, io.EOF
}
 
return n, err
Loading
Loading
@@ -68,7 +63,7 @@ func (reg *registry) Enumerate(ctx context.Context, ingester func(string) error)
return err
}
 
err = Walk(ctx, reg.blobStore.driver, root, func(fileInfo driver.FileInfo) error {
err = reg.blobStore.driver.Walk(ctx, root, func(fileInfo driver.FileInfo) error {
return handleRepository(fileInfo, root, "", ingester)
})
 
Loading
Loading
@@ -144,9 +139,9 @@ func handleRepository(fileInfo driver.FileInfo, root, last string, fn func(repoP
return err
}
}
return ErrSkipDir
return driver.ErrSkipDir
} else if strings.HasPrefix(file, "_") {
return ErrSkipDir
return driver.ErrSkipDir
}
 
return nil
Loading
Loading
Loading
Loading
@@ -336,6 +336,12 @@ func (d *driver) URLFor(ctx context.Context, path string, options map[string]int
return d.client.GetBlobSASURI(d.container, path, expiresTime, "r")
}
 
// Walk traverses a filesystem defined within driver, starting
// from the given path, calling f on each file
func (d *driver) Walk(ctx context.Context, path string, f storagedriver.WalkFn) error {
return storagedriver.WalkFallback(ctx, d, path, f)
}
// directDescendants will find direct descendants (blobs or virtual containers)
// of from list of blob paths and will return their full paths. Elements in blobs
// list must be prefixed with a "/" and
Loading
Loading
Loading
Loading
@@ -197,3 +197,15 @@ func (base *Base) URLFor(ctx context.Context, path string, options map[string]in
str, e := base.StorageDriver.URLFor(ctx, path, options)
return str, base.setDriverName(e)
}
// Walk wraps Walk of underlying storage driver.
func (base *Base) Walk(ctx context.Context, path string, f storagedriver.WalkFn) error {
ctx, done := dcontext.WithTrace(ctx)
defer done("%s.Walk(%q)", base.Name(), path)
if !storagedriver.PathRegexp.MatchString(path) && path != "/" {
return storagedriver.InvalidPathError{Path: path, DriverName: base.StorageDriver.Name()}
}
return base.setDriverName(base.StorageDriver.Walk(ctx, path, f))
}
Loading
Loading
@@ -315,6 +315,12 @@ func (d *driver) URLFor(ctx context.Context, path string, options map[string]int
return "", storagedriver.ErrUnsupportedMethod{}
}
 
// Walk traverses a filesystem defined within driver, starting
// from the given path, calling f on each file
func (d *driver) Walk(ctx context.Context, path string, f storagedriver.WalkFn) error {
return storagedriver.WalkFallback(ctx, d, path, f)
}
// fullPath returns the absolute path of a key within the Driver's storage.
func (d *driver) fullPath(subPath string) string {
return path.Join(d.rootDirectory, subPath)
Loading
Loading
Loading
Loading
@@ -779,6 +779,12 @@ func (d *driver) URLFor(context context.Context, path string, options map[string
return storage.SignedURL(d.bucket, name, opts)
}
 
// Walk traverses a filesystem defined within driver, starting
// from the given path, calling f on each file
func (d *driver) Walk(ctx context.Context, path string, f storagedriver.WalkFn) error {
return storagedriver.WalkFallback(ctx, d, path, f)
}
func startSession(client *http.Client, bucket string, name string) (uri string, err error) {
u := &url.URL{
Scheme: "https",
Loading
Loading
Loading
Loading
@@ -240,6 +240,12 @@ func (d *driver) URLFor(ctx context.Context, path string, options map[string]int
return "", storagedriver.ErrUnsupportedMethod{}
}
 
// Walk traverses a filesystem defined within driver, starting
// from the given path, calling f on each file
func (d *driver) Walk(ctx context.Context, path string, f storagedriver.WalkFn) error {
return storagedriver.WalkFallback(ctx, d, path, f)
}
type writer struct {
d *driver
f *file
Loading
Loading
Loading
Loading
@@ -479,6 +479,12 @@ func (d *driver) URLFor(ctx context.Context, path string, options map[string]int
return signedURL, nil
}
 
// Walk traverses a filesystem defined within driver, starting
// from the given path, calling f on each file
func (d *driver) Walk(ctx context.Context, path string, f storagedriver.WalkFn) error {
return storagedriver.WalkFallback(ctx, d, path, f)
}
func (d *driver) ossPath(path string) string {
return strings.TrimLeft(strings.TrimRight(d.RootDirectory, "/")+path, "/")
}
Loading
Loading
Loading
Loading
@@ -34,6 +34,7 @@ import (
"github.com/aws/aws-sdk-go/aws/session"
"github.com/aws/aws-sdk-go/service/s3"
 
dcontext "github.com/docker/distribution/context"
"github.com/docker/distribution/registry/client/transport"
storagedriver "github.com/docker/distribution/registry/storage/driver"
"github.com/docker/distribution/registry/storage/driver/base"
Loading
Loading
@@ -874,6 +875,136 @@ func (d *driver) URLFor(ctx context.Context, path string, options map[string]int
return req.Presign(expiresIn)
}
 
// Walk traverses a filesystem defined within driver, starting
// from the given path, calling f on each file
func (d *driver) Walk(ctx context.Context, from string, f storagedriver.WalkFn) error {
path := from
if !strings.HasSuffix(path, "/") {
path = path + "/"
}
prefix := ""
if d.s3Path("") == "" {
prefix = "/"
}
var objectCount int64
if err := d.doWalk(ctx, &objectCount, d.s3Path(path), prefix, f); err != nil {
return err
}
// S3 doesn't have the concept of empty directories, so it'll return path not found if there are no objects
if objectCount == 0 {
return storagedriver.PathNotFoundError{Path: from}
}
return nil
}
type walkInfoContainer struct {
storagedriver.FileInfoFields
prefix *string
}
// Path provides the full path of the target of this file info.
func (wi walkInfoContainer) Path() string {
return wi.FileInfoFields.Path
}
// Size returns current length in bytes of the file. The return value can
// be used to write to the end of the file at path. The value is
// meaningless if IsDir returns true.
func (wi walkInfoContainer) Size() int64 {
return wi.FileInfoFields.Size
}
// ModTime returns the modification time for the file. For backends that
// don't have a modification time, the creation time should be returned.
func (wi walkInfoContainer) ModTime() time.Time {
return wi.FileInfoFields.ModTime
}
// IsDir returns true if the path is a directory.
func (wi walkInfoContainer) IsDir() bool {
return wi.FileInfoFields.IsDir
}
func (d *driver) doWalk(parentCtx context.Context, objectCount *int64, path, prefix string, f storagedriver.WalkFn) error {
var retError error
listObjectsInput := &s3.ListObjectsV2Input{
Bucket: aws.String(d.Bucket),
Prefix: aws.String(path),
Delimiter: aws.String("/"),
MaxKeys: aws.Int64(listMax),
}
ctx, done := dcontext.WithTrace(parentCtx)
defer done("s3aws.ListObjectsV2Pages(%s)", path)
listObjectErr := d.S3.ListObjectsV2PagesWithContext(ctx, listObjectsInput, func(objects *s3.ListObjectsV2Output, lastPage bool) bool {
*objectCount += *objects.KeyCount
walkInfos := make([]walkInfoContainer, 0, *objects.KeyCount)
for _, dir := range objects.CommonPrefixes {
commonPrefix := *dir.Prefix
walkInfos = append(walkInfos, walkInfoContainer{
prefix: dir.Prefix,
FileInfoFields: storagedriver.FileInfoFields{
IsDir: true,
Path: strings.Replace(commonPrefix[:len(commonPrefix)-1], d.s3Path(""), prefix, 1),
},
})
}
for _, file := range objects.Contents {
walkInfos = append(walkInfos, walkInfoContainer{
FileInfoFields: storagedriver.FileInfoFields{
IsDir: false,
Size: *file.Size,
ModTime: *file.LastModified,
Path: strings.Replace(*file.Key, d.s3Path(""), prefix, 1),
},
})
}
sort.SliceStable(walkInfos, func(i, j int) bool { return walkInfos[i].FileInfoFields.Path < walkInfos[j].FileInfoFields.Path })
for _, walkInfo := range walkInfos {
err := f(walkInfo)
if err == storagedriver.ErrSkipDir {
if walkInfo.IsDir() {
continue
} else {
break
}
} else if err != nil {
retError = err
return false
}
if walkInfo.IsDir() {
if err := d.doWalk(ctx, objectCount, *walkInfo.prefix, prefix, f); err != nil {
retError = err
return false
}
}
}
return true
})
if retError != nil {
return retError
}
if listObjectErr != nil {
return listObjectErr
}
return nil
}
func (d *driver) s3Path(path string) string {
return strings.TrimLeft(strings.TrimRight(d.RootDirectory, "/")+path, "/")
}
Loading
Loading
Loading
Loading
@@ -546,6 +546,12 @@ func (d *Driver) S3BucketKey(path string) string {
return d.StorageDriver.(*driver).s3Path(path)
}
 
// Walk traverses a filesystem defined within driver, starting
// from the given path, calling f on each file
func (d *driver) Walk(ctx context.Context, path string, f storagedriver.WalkFn) error {
return storagedriver.WalkFallback(ctx, d, path, f)
}
func parseError(path string, err error) error {
if s3Err, ok := err.(*s3.Error); ok && s3Err.Code == "NoSuchKey" {
return storagedriver.PathNotFoundError{Path: path}
Loading
Loading
Loading
Loading
@@ -83,6 +83,13 @@ type StorageDriver interface {
// May return an ErrUnsupportedMethod in certain StorageDriver
// implementations.
URLFor(ctx context.Context, path string, options map[string]interface{}) (string, error)
// Walk traverses a filesystem defined within driver, starting
// from the given path, calling f on each file.
// If the returned error from the WalkFn is ErrSkipDir and fileInfo refers
// to a directory, the directory will not be entered and Walk
// will continue the traversal. If fileInfo refers to a normal file, processing stops
Walk(ctx context.Context, path string, f WalkFn) error
}
 
// FileWriter provides an abstraction for an opened writable file-like object in
Loading
Loading
Loading
Loading
@@ -644,6 +644,12 @@ func (d *driver) URLFor(ctx context.Context, path string, options map[string]int
return tempURL, nil
}
 
// Walk traverses a filesystem defined within driver, starting
// from the given path, calling f on each file
func (d *driver) Walk(ctx context.Context, path string, f storagedriver.WalkFn) error {
return storagedriver.WalkFallback(ctx, d, path, f)
}
func (d *driver) swiftPath(path string) string {
return strings.TrimLeft(strings.TrimRight(d.Prefix+"/files"+path, "/"), "/")
}
Loading
Loading
package storage
package driver
 
import (
"context"
"errors"
"fmt"
"sort"
storageDriver "github.com/docker/distribution/registry/storage/driver"
)
 
// ErrSkipDir is used as a return value from onFileFunc to indicate that
Loading
Loading
@@ -15,14 +12,14 @@ import (
var ErrSkipDir = errors.New("skip this directory")
 
// WalkFn is called once per file by Walk
// If the returned error is ErrSkipDir and fileInfo refers
// to a directory, the directory will not be entered and Walk
// will continue the traversal. Otherwise Walk will return
type WalkFn func(fileInfo storageDriver.FileInfo) error
type WalkFn func(fileInfo FileInfo) error
 
// Walk traverses a filesystem defined within driver, starting
// from the given path, calling f on each file
func Walk(ctx context.Context, driver storageDriver.StorageDriver, from string, f WalkFn) error {
// WalkFallback traverses a filesystem defined within driver, starting
// from the given path, calling f on each file. It uses the List method and Stat to drive itself.
// If the returned error from the WalkFn is ErrSkipDir and fileInfo refers
// to a directory, the directory will not be entered and Walk
// will continue the traversal. If fileInfo refers to a normal file, processing stops
func WalkFallback(ctx context.Context, driver StorageDriver, from string, f WalkFn) error {
children, err := driver.List(ctx, from)
if err != nil {
return err
Loading
Loading
@@ -38,22 +35,18 @@ func Walk(ctx context.Context, driver storageDriver.StorageDriver, from string,
return err
}
err = f(fileInfo)
skipDir := (err == ErrSkipDir)
if err != nil && !skipDir {
return err
}
if fileInfo.IsDir() && !skipDir {
if err := Walk(ctx, driver, child, f); err != nil {
if err == nil && fileInfo.IsDir() {
if err := WalkFallback(ctx, driver, child, f); err != nil {
return err
}
} else if err == ErrSkipDir {
// Stop iteration if it's a file, otherwise noop if it's a directory
if !fileInfo.IsDir() {
return nil
}
} else if err != nil {
return err
}
}
return nil
}
// pushError formats an error type given a path and an error
// and pushes it to a slice of errors
func pushError(errors []error, path string, err error) []error {
return append(errors, fmt.Errorf("%s: %s", path, err))
}
package storage
import "fmt"
// pushError formats an error type given a path and an error
// and pushes it to a slice of errors
func pushError(errors []error, path string, err error) []error {
return append(errors, fmt.Errorf("%s: %s", path, err))
}
Loading
Loading
@@ -237,7 +237,7 @@ func (lbs *linkedBlobStore) Enumerate(ctx context.Context, ingestor func(digest.
if err != nil {
return err
}
err = Walk(ctx, lbs.blobStore.driver, rootPath, func(fileInfo driver.FileInfo) error {
return lbs.driver.Walk(ctx, rootPath, func(fileInfo driver.FileInfo) error {
// exit early if directory...
if fileInfo.IsDir() {
return nil
Loading
Loading
@@ -273,12 +273,6 @@ func (lbs *linkedBlobStore) Enumerate(ctx context.Context, ingestor func(digest.
 
return nil
})
if err != nil {
return err
}
return nil
}
 
func (lbs *linkedBlobStore) mount(ctx context.Context, sourceRepo reference.Named, dgst digest.Digest, sourceStat *distribution.Descriptor) (distribution.Descriptor, error) {
Loading
Loading
Loading
Loading
@@ -67,7 +67,7 @@ func getOutstandingUploads(ctx context.Context, driver storageDriver.StorageDriv
return uploads, append(errors, err)
}
 
err = Walk(ctx, driver, root, func(fileInfo storageDriver.FileInfo) error {
err = driver.Walk(ctx, root, func(fileInfo storageDriver.FileInfo) error {
filePath := fileInfo.Path()
_, file := path.Split(filePath)
if file[0] == '_' {
Loading
Loading
@@ -75,7 +75,7 @@ func getOutstandingUploads(ctx context.Context, driver storageDriver.StorageDriv
inUploadDir = (file == "_uploads")
 
if fileInfo.IsDir() && !inUploadDir {
return ErrSkipDir
return storageDriver.ErrSkipDir
}
 
}
Loading
Loading
Loading
Loading
@@ -142,7 +142,7 @@ func TestPurgeMissingStartedAt(t *testing.T) {
oneHourAgo := time.Now().Add(-1 * time.Hour)
fs, ctx := testUploadFS(t, 1, "test-repo", oneHourAgo)
 
err := Walk(ctx, fs, "/", func(fileInfo driver.FileInfo) error {
err := fs.Walk(ctx, "/", func(fileInfo driver.FileInfo) error {
filePath := fileInfo.Path()
_, file := path.Split(filePath)
 
Loading
Loading
package storage
import (
"context"
"fmt"
"sort"
"testing"
"github.com/docker/distribution/registry/storage/driver"
"github.com/docker/distribution/registry/storage/driver/inmemory"
)
func testFS(t *testing.T) (driver.StorageDriver, map[string]string, context.Context) {
d := inmemory.New()
ctx := context.Background()
expected := map[string]string{
"/a": "dir",
"/a/b": "dir",
"/a/b/c": "dir",
"/a/b/c/d": "file",
"/a/b/c/e": "file",
"/a/b/f": "dir",
"/a/b/f/g": "file",
"/a/b/f/h": "file",
"/a/b/f/i": "file",
"/z": "dir",
"/z/y": "file",
}
for p, typ := range expected {
if typ != "file" {
continue
}
if err := d.PutContent(ctx, p, []byte(p)); err != nil {
t.Fatalf("unable to put content into fixture: %v", err)
}
}
return d, expected, ctx
}
func TestWalkErrors(t *testing.T) {
d, expected, ctx := testFS(t)
fileCount := len(expected)
err := Walk(ctx, d, "", func(fileInfo driver.FileInfo) error {
return nil
})
if err == nil {
t.Error("Expected invalid root err")
}
errEarlyExpected := fmt.Errorf("Early termination")
err = Walk(ctx, d, "/", func(fileInfo driver.FileInfo) error {
// error on the 2nd file
if fileInfo.Path() == "/a/b" {
return errEarlyExpected
}
delete(expected, fileInfo.Path())
return nil
})
if len(expected) != fileCount-1 {
t.Error("Walk failed to terminate with error")
}
if err != errEarlyExpected {
if err == nil {
t.Fatalf("expected an error due to early termination")
} else {
t.Error(err.Error())
}
}
err = Walk(ctx, d, "/nonexistent", func(fileInfo driver.FileInfo) error {
return nil
})
if err == nil {
t.Errorf("Expected missing file err")
}
}
func TestWalk(t *testing.T) {
d, expected, ctx := testFS(t)
var traversed []string
err := Walk(ctx, d, "/", func(fileInfo driver.FileInfo) error {
filePath := fileInfo.Path()
filetype, ok := expected[filePath]
if !ok {
t.Fatalf("Unexpected file in walk: %q", filePath)
}
if fileInfo.IsDir() {
if filetype != "dir" {
t.Errorf("Unexpected file type: %q", filePath)
}
} else {
if filetype != "file" {
t.Errorf("Unexpected file type: %q", filePath)
}
// each file has its own path as the contents. If the length
// doesn't match the path length, fail.
if fileInfo.Size() != int64(len(fileInfo.Path())) {
t.Fatalf("unexpected size for %q: %v != %v",
fileInfo.Path(), fileInfo.Size(), len(fileInfo.Path()))
}
}
delete(expected, filePath)
traversed = append(traversed, filePath)
return nil
})
if len(expected) > 0 {
t.Errorf("Missed files in walk: %q", expected)
}
if !sort.StringsAreSorted(traversed) {
t.Errorf("result should be sorted: %v", traversed)
}
if err != nil {
t.Fatalf(err.Error())
}
}
func TestWalkSkipDir(t *testing.T) {
d, expected, ctx := testFS(t)
err := Walk(ctx, d, "/", func(fileInfo driver.FileInfo) error {
filePath := fileInfo.Path()
if filePath == "/a/b" {
// skip processing /a/b/c and /a/b/c/d
return ErrSkipDir
}
delete(expected, filePath)
return nil
})
if err != nil {
t.Fatalf(err.Error())
}
if _, ok := expected["/a/b/c"]; !ok {
t.Errorf("/a/b/c not skipped")
}
if _, ok := expected["/a/b/c/d"]; !ok {
t.Errorf("/a/b/c/d not skipped")
}
if _, ok := expected["/a/b/c/e"]; !ok {
t.Errorf("/a/b/c/e not skipped")
}
}
github.com/Azure/azure-sdk-for-go 088007b3b08cc02b27f2eadfdcd870958460ce7e
github.com/Azure/go-autorest ec5f4903f77ed9927ac95b19ab8e44ada64c1356
github.com/sirupsen/logrus 3d4380f53a34dcdc95f0c1db702615992b38d9a4
github.com/aws/aws-sdk-go c6fc52983ea2375810aa38ddb5370e9cdf611716
github.com/aws/aws-sdk-go 5bcc0a238d880469f949fc7cd24e35f32ab80cbd
github.com/bshuster-repo/logrus-logstash-hook d2c0ecc1836d91814e15e23bb5dc309c3ef51f4a
github.com/bugsnag/bugsnag-go b1d153021fcd90ca3f080db36bec96dc690fb274
github.com/bugsnag/osext 0dd3f918b21bec95ace9dc86c7e70266cfc5c702
Loading
Loading