Skip to content
Snippets Groups Projects
Unverified Commit f285c96b authored by Kamil Trzcinski's avatar Kamil Trzcinski
Browse files

A number of improvements

- Support manifest signatures
- Support List command of FS
- Allow concurrent listing of blobs
parent b9e20cbc
No related branches found
No related tags found
No related merge requests found
Loading
Loading
@@ -5,11 +5,14 @@ import (
"path/filepath"
"strings"
"sync"
"flag"
 
"github.com/Sirupsen/logrus"
"github.com/dustin/go-humanize"
)
 
// concurrentBlobAccess is the -concurrent-blob-access command-line flag
// (default false). When set, the blob walk lists the directories under
// blobs/sha256 and dispatches a separate Walk job for each of them instead
// of walking the whole blobs tree in a single pass.
var concurrentBlobAccess = flag.Bool("concurrent-blob-access", false, "Allow to use concurrent blob access")
type blobData struct {
name string
size int64
Loading
Loading
@@ -86,12 +89,43 @@ func (b blobsData) addBlob(segments []string, info fileInfo) error {
 
func (b blobsData) walk() error {
logrus.Infoln("Walking BLOBS...")
err := currentStorage.Walk("blobs", func(path string, info fileInfo, err error) error {
err = b.addBlob(strings.Split(path, "/"), info)
logrus.Infoln("BLOB:", path, ":", err)
return err
})
return err
jg := jobsRunner.group()
if *concurrentBlobAccess {
listRootPath := filepath.Join("blobs", "sha256")
err := currentStorage.List(listRootPath, func(listPath string, info fileInfo, err error) error {
if !info.directory {
return nil
}
jg.Dispatch(func() error {
walkPath := filepath.Join(listRootPath, listPath)
logrus.Infoln("BLOB DIR:", walkPath)
return currentStorage.Walk(walkPath, "blobs", func(path string, info fileInfo, err error) error {
err = b.addBlob(strings.Split(path, "/"), info)
logrus.Infoln("BLOB:", path, ":", err)
return err
})
})
return nil
})
if err != nil {
return err
}
} else {
err := currentStorage.Walk("blobs", "blobs", func(path string, info fileInfo, err error) error {
if path == "" || info.directory {
return nil
}
err = b.addBlob(strings.Split(path, "/"), info)
logrus.Infoln("BLOB:", path, ":", err)
return err
})
if err != nil {
return err
}
}
return jg.Finish()
}
 
func (b blobsData) info() {
Loading
Loading
Loading
Loading
@@ -22,13 +22,19 @@ func (f *fsStorage) fullPath(path string) string {
return filepath.Join(*fsRootDir, "docker", "registry", "v2", path)
}
 
func (f *fsStorage) Walk(rootDir string, fn walkFunc) error {
func (f *fsStorage) Walk(rootDir string, baseDir string, fn walkFunc) error {
rootDir, err := filepath.Abs(f.fullPath(rootDir))
if err != nil {
return nil
}
rootDir += "/"
 
baseDir, err = filepath.Abs(f.fullPath(baseDir))
if err != nil {
return nil
}
baseDir += "/"
return filepath.Walk(rootDir, func(fullPath string, info os.FileInfo, err error) error {
if info.IsDir() {
return nil
Loading
Loading
@@ -36,8 +42,8 @@ func (f *fsStorage) Walk(rootDir string, fn walkFunc) error {
 
path := fullPath
 
if strings.HasPrefix(path, rootDir) {
path = path[len(rootDir):]
if strings.HasPrefix(path, baseDir) {
path = path[len(baseDir):]
}
 
fi := fileInfo{fullPath: fullPath, size: info.Size()}
Loading
Loading
@@ -45,6 +51,41 @@ func (f *fsStorage) Walk(rootDir string, fn walkFunc) error {
})
}
 
// List calls fn once for every direct child of rootDir (resolved through
// fullPath) and never descends into sub-directories.
//
// The path passed to fn is relative to rootDir; the listing root itself is
// not reported. Directories are reported with fileInfo.directory set.
// A non-nil error returned by fn stops the listing and is returned to the
// caller. A failure to resolve rootDir to an absolute path is returned as is.
func (f *fsStorage) List(rootDir string, fn walkFunc) error {
	rootDir, err := filepath.Abs(f.fullPath(rootDir))
	if err != nil {
		// Report the failure instead of pretending the directory is empty.
		return err
	}
	rootDir += "/"

	return filepath.Walk(rootDir, func(fullPath string, info os.FileInfo, err error) error {
		path := strings.TrimPrefix(fullPath, rootDir)
		if path == "" {
			// The listing root itself is never reported to fn.
			return nil
		}

		if info == nil {
			// filepath.Walk passes a nil info when it could not stat the
			// entry; forward the error without dereferencing info.
			return fn(path, fileInfo{fullPath: fullPath}, err)
		}

		fi := fileInfo{fullPath: fullPath, size: info.Size(), directory: info.IsDir()}
		if err := fn(path, fi, err); err != nil {
			return err
		}
		if fi.directory {
			// Only direct children are listed, so never walk into them.
			return filepath.SkipDir
		}
		return nil
	})
}
// Read returns the full contents of the file stored at path, resolved through
// fullPath. The etag argument is not used by this implementation.
func (f *fsStorage) Read(path string, etag string) ([]byte, error) {
	location := f.fullPath(path)
	return ioutil.ReadFile(location)
}
Loading
Loading
Loading
Loading
@@ -24,6 +24,28 @@ func analyzeLink(args []string) (string, error) {
return args[1], nil
}
 
// analyzeLinkSignature parses the path components of a manifest signature
// link, for example:
//
//	sha256/8d0c94a3.../signatures/sha256/6b659c9f.../link
//
// It returns the revision digest (args[1]) and the signature digest
// (args[4]). An error is returned when args does not have exactly six
// elements or when one of the fixed components differs from the layout
// above; the error names the component that failed the check.
func analyzeLinkSignature(args []string) (string, string, error) {
	if len(args) != 6 {
		return "", "", fmt.Errorf("invalid args for signature link: %v", args)
	}

	// Both the revision and the signature must be sha256 digests. The two
	// checks are separate so the message shows the offending component.
	if args[0] != "sha256" {
		return "", "", fmt.Errorf("only sha256 is supported: %v", args[0])
	}
	if args[3] != "sha256" {
		return "", "", fmt.Errorf("only sha256 is supported: %v", args[3])
	}

	if args[2] != "signatures" {
		return "", "", fmt.Errorf("expected signatures as path component: %v", args[2])
	}

	if args[5] != "link" {
		return "", "", fmt.Errorf("expected link as path component: %v", args[5])
	}

	return args[1], args[4], nil
}
func compareEtag(data []byte, etag string) bool {
hash := md5.Sum(data)
hex := hex.EncodeToString(hash[:])
Loading
Loading
@@ -34,7 +56,7 @@ func compareEtag(data []byte, etag string) bool {
func readLink(path string, etag string) (string, error) {
data, err := currentStorage.Read(path, etag)
if err != nil {
return "", nil
return "", err
}
 
link := string(data)
Loading
Loading
@@ -50,23 +72,23 @@ func readLink(path string, etag string) (string, error) {
return link, nil
}
 
func verifyLink(link string, info fileInfo) error {
func verifyLink(link string, path string, etag string) error {
// If we have e-tag, let's verify e-tag
if info.etag != "" {
if etag != "" {
content := "sha256:" + link
if compareEtag([]byte(content), info.etag) {
if compareEtag([]byte(content), etag) {
return nil
} else {
return fmt.Errorf("etag for %s is not equal %s", link, info.etag)
return fmt.Errorf("etag for %s is not equal %s", link, etag)
}
} else {
readed, err := readLink(info.fullPath, info.etag)
readed, err := readLink(path, etag)
if err != nil {
return err
}
 
if readed != link {
return fmt.Errorf("readed link for %s is not equal %s", link, readed)
return fmt.Errorf("%s: readed link for %s is not equal %s", path, link, readed)
}
 
return nil
Loading
Loading
Loading
Loading
@@ -11,12 +11,13 @@ import (
)
 
type repositoryData struct {
name string
layers map[string]int
manifests map[string]int
tags map[string]*tag
uploads map[string]int
lock sync.Mutex
name string
layers map[string]int
manifests map[string]int
manifestSignatures map[string][]string
tags map[string]*tag
uploads map[string]int
lock sync.Mutex
}
 
type repositoriesData map[string]*repositoryData
Loading
Loading
@@ -25,11 +26,12 @@ var repositoriesLock sync.Mutex
 
func newRepositoryData(name string) *repositoryData {
return &repositoryData{
name: name,
layers: make(map[string]int),
manifests: make(map[string]int),
tags: make(map[string]*tag),
uploads: make(map[string]int),
name: name,
layers: make(map[string]int),
manifests: make(map[string]int),
manifestSignatures: make(map[string][]string),
tags: make(map[string]*tag),
uploads: make(map[string]int),
}
}
 
Loading
Loading
@@ -41,6 +43,10 @@ func (r *repositoryData) manifestRevisionPath(revision string) string {
return filepath.Join("repositories", r.name, "_manifests", "revisions", "sha256", revision, "link")
}
 
// manifestRevisionSignaturePath builds the storage path of the link file for
// the given signature of the given manifest revision in this repository.
func (r *repositoryData) manifestRevisionSignaturePath(revision, signature string) string {
	segments := []string{
		"repositories", r.name,
		"_manifests", "revisions", "sha256", revision,
		"signatures", "sha256", signature,
		"link",
	}
	return filepath.Join(segments...)
}
// uploadPath builds the storage path of the link file for the given upload
// in this repository.
func (r *repositoryData) uploadPath(upload string) string {
	segments := []string{"repositories", r.name, "_uploads", upload, "link"}
	return filepath.Join(segments...)
}
Loading
Loading
@@ -95,6 +101,19 @@ func (r *repositoryData) markManifestLayers(blobs blobsData, name string) error
return nil
}
 
// markManifestSignatures processes the signatures recorded for the manifest
// revision name. When r.manifests[name] is positive, every signature is
// marked in blobs; otherwise the link path of every signature is scheduled in
// deletes with size linkFileSize.
//
// It always returns nil: the results of blobs.mark and deletes.schedule are
// not inspected.
func (r *repositoryData) markManifestSignatures(deletes deletesData, blobs blobsData, name string, signatures []string) error {
	inUse := r.manifests[name] > 0

	for _, sig := range signatures {
		if inUse {
			blobs.mark(sig)
			continue
		}
		deletes.schedule(r.manifestRevisionSignaturePath(name, sig), linkFileSize)
	}

	return nil
}
// markLayer marks the layer called name in blobs and returns the result of
// that call unchanged.
func (r *repositoryData) markLayer(blobs blobsData, name string) error {
	err := blobs.mark(name)
	return err
}
Loading
Loading
@@ -119,6 +138,13 @@ func (r *repositoryData) mark(blobs blobsData, deletes deletesData) error {
}
}
 
for name, signatures := range r.manifestSignatures {
err := r.markManifestSignatures(deletes, blobs, name, signatures)
if err != nil {
return err
}
}
for name_, used := range r.layers {
name := name_
if used > 0 {
Loading
Loading
@@ -156,7 +182,7 @@ func (r *repositoryData) addLayer(args []string, info fileInfo) error {
return err
}
 
err = verifyLink(link, info)
err = verifyLink(link, r.layerLinkPath(link), info.etag)
if err != nil {
return err
}
Loading
Loading
@@ -171,20 +197,33 @@ func (r *repositoryData) addLayer(args []string, info fileInfo) error {
func (r *repositoryData) addManifestRevision(args []string, info fileInfo) error {
// /test2/_manifests/revisions/sha256/708519982eae159899e908639f5fa22d23d247ad923f6e6ad6128894c5d497a0/link
link, err := analyzeLink(args)
if err != nil {
return err
}
if err == nil {
err = verifyLink(link, r.manifestRevisionPath(link), info.etag)
if err != nil {
return err
}
 
err = verifyLink(link, info)
if err != nil {
return err
r.lock.Lock()
defer r.lock.Unlock()
r.manifests[link] = 0
return nil
}
 
r.lock.Lock()
defer r.lock.Unlock()
link, signature, err := analyzeLinkSignature(args)
if err == nil {
err = verifyLink(link, r.manifestRevisionSignaturePath(link, signature), info.etag)
if err != nil {
return err
}
 
r.manifests[link] = 0
return nil
r.lock.Lock()
defer r.lock.Unlock()
r.manifestSignatures[link] = append(r.manifestSignatures[link], signature)
return nil
}
return err
}
 
func (r *repositoryData) addTag(args []string, info fileInfo) error {
Loading
Loading
@@ -221,7 +260,7 @@ func (r *repositoryData) addManifest(args []string, info fileInfo) error {
func (r *repositoryData) addUpload(args []string, info fileInfo) error {
// /test/_uploads/579c7fc9b0d60a19706cd6c1573fec9a28fa758bfe1ece86a1e5c68ad6f4e9d1
if len(args) != 1 {
logrus.Warningln("invalid args for uploads: %v", args)
// logrus.Warningln("invalid args for uploads: %v", args)
return nil
}
 
Loading
Loading
@@ -292,7 +331,7 @@ func (r repositoriesData) walk() error {
jg := jobsRunner.group()
 
logrus.Infoln("Walking REPOSITORIES...")
err := currentStorage.Walk("repositories", func(path string, info fileInfo, err error) error {
err := currentStorage.Walk("repositories", "repositories", func(path string, info fileInfo, err error) error {
jg.Dispatch(func() error {
err = r.process(strings.Split(path, "/"), info)
if err != nil {
Loading
Loading
Loading
Loading
@@ -45,12 +45,17 @@ func (f *s3Storage) fullPath(path string) string {
return filepath.Join(*s3RootDir, "docker", "registry", "v2", path)
}
 
func (f *s3Storage) Walk(path string, fn walkFunc) error {
func (f *s3Storage) Walk(path string, baseDir string, fn walkFunc) error {
path = f.fullPath(path)
if path != "/" && path[len(path)-1] != '/' {
path = path + "/"
}
 
baseDir = f.fullPath(baseDir)
if baseDir != "/" && baseDir[len(baseDir)-1] != '/' {
baseDir = baseDir + "/"
}
atomic.AddInt64(&f.apiCalls, 1)
resp, err := f.S3.ListObjects(&s3.ListObjectsInput{
Bucket: s3Bucket,
Loading
Loading
@@ -67,8 +72,8 @@ func (f *s3Storage) Walk(path string, fn walkFunc) error {
for _, key := range resp.Contents {
lastKey = *key.Key
keyPath := *key.Key
if strings.HasPrefix(keyPath, path) {
keyPath = keyPath[len(path):]
if strings.HasPrefix(keyPath, baseDir) {
keyPath = keyPath[len(baseDir):]
}
 
fi := fileInfo{
Loading
Loading
@@ -103,6 +108,64 @@ func (f *s3Storage) Walk(path string, fn walkFunc) error {
return nil
}
 
// List calls fn once for every direct child of path (resolved through
// fullPath) in the bucket, without descending into sub-"directories".
//
// The listing uses "/" as delimiter. Objects directly under the prefix are
// returned by S3 in Contents, while everything below a deeper "/" is grouped
// into CommonPrefixes; both are reported to fn. Common prefixes are reported
// as directories, with the trailing "/" stripped from the relative path. The
// path passed to fn is relative to the listing prefix, and the prefix itself
// is never reported.
//
// Every ListObjects request is counted in f.apiCalls. A non-nil error from
// S3 or from fn stops the listing and is returned to the caller.
func (f *s3Storage) List(path string, fn walkFunc) error {
	path = f.fullPath(path)
	if !strings.HasSuffix(path, "/") {
		path += "/"
	}

	input := &s3.ListObjectsInput{
		Bucket:    s3Bucket,
		Prefix:    aws.String(path),
		Delimiter: aws.String("/"),
		MaxKeys:   aws.Int64(listMax),
	}

	for {
		atomic.AddInt64(&f.apiCalls, 1)
		resp, err := f.S3.ListObjects(input)
		if err != nil {
			return err
		}

		// Objects stored directly under the prefix.
		for _, key := range resp.Contents {
			keyPath := strings.TrimPrefix(*key.Key, path)
			if keyPath == "" {
				// Placeholder object for the listing root itself.
				continue
			}

			fi := fileInfo{
				fullPath:     *key.Key,
				size:         *key.Size,
				etag:         *key.ETag,
				lastModified: *key.LastModified,
				directory:    strings.HasSuffix(*key.Key, "/"),
			}
			if err := fn(keyPath, fi, nil); err != nil {
				return err
			}
		}

		// With a delimiter set, S3 reports sub-"directories" here rather
		// than in Contents.
		for _, prefix := range resp.CommonPrefixes {
			dirPath := strings.TrimSuffix(strings.TrimPrefix(*prefix.Prefix, path), "/")
			if dirPath == "" {
				continue
			}

			fi := fileInfo{fullPath: *prefix.Prefix, directory: true}
			if err := fn(dirPath, fi, nil); err != nil {
				return err
			}
		}

		if !aws.BoolValue(resp.IsTruncated) {
			return nil
		}
		// NextMarker is populated by S3 because a delimiter is specified.
		input.Marker = resp.NextMarker
	}
}
func (f *s3Storage) Read(path string, etag string) ([]byte, error) {
cachePath := filepath.Join(*s3CacheStorage, path)
if etag != "" && *s3CacheStorage != "" {
Loading
Loading
Loading
Loading
@@ -7,12 +7,14 @@ type fileInfo struct {
size int64
etag string
lastModified time.Time
directory bool
}
 
// walkFunc is the callback signature accepted by the storage Walk and List
// methods. It receives the path of an entry as reported by the storage
// backend, the entry's fileInfo, and any error the backend forwards for that
// entry. Returning a non-nil error aborts the traversal.
type walkFunc func(path string, info fileInfo, err error) error
 
type storageObject interface {
Walk(path string, fn walkFunc) error
Walk(path string, basePath string, fn walkFunc) error
List(path string, fn walkFunc) error
Read(path string, etag string) ([]byte, error)
Delete(path string) error
Info()
Loading
Loading
Loading
Loading
@@ -65,7 +65,7 @@ func (t *tag) addVersion(args []string, info fileInfo) error {
return err
}
 
err = verifyLink(link, info)
err = verifyLink(link, t.versionLinkPath(link), info.etag)
if err != nil {
return err
}
Loading
Loading
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment