Unverified Commit deaaaec0 authored by Kamil Trzcinski

Added support for parallel walk

parent f285c96b
package main
 
import (
"flag"
"fmt"
"path/filepath"
"strings"
"sync"
"flag"
 
"github.com/Sirupsen/logrus"
"github.com/dustin/go-humanize"
)
 
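// Feature switches for blob scanning: -concurrent-blob-access toggles the
// concurrent blob access path, -parallel-blob-walk walks every
// blobs/sha256/<prefix> directory in a separate parallel walk job.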
var concurrentBlobAccess = flag.Bool("concurrent-blob-access", false, "Allow concurrent blob access")
var parallelBlobWalk = flag.Bool("parallel-blob-walk", true, "Allow parallel blob walking")
 
type blobData struct {
name string
@@ -78,6 +78,9 @@ func (b blobsData) addBlob(segments []string, info fileInfo) error {
return fmt.Errorf("path needs to be prefixed with %v: %v", name[0:2], segments)
}
 
blobsLock.Lock()
defer blobsLock.Unlock()
blob := &blobData{
name: name,
size: info.size,
@@ -87,45 +90,24 @@ func (b blobsData) addBlob(segments []string, info fileInfo) error {
return nil
}
 
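// walkPath walks a single subtree of the blobs directory and records each
// blob it finds via addBlob.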
func (b blobsData) walkPath(walkPath string) error {
logrus.Infoln("BLOBS DIR:", walkPath)
return currentStorage.Walk(walkPath, "blobs", func(path string, info fileInfo, err error) error {
err = b.addBlob(strings.Split(path, "/"), info)
logrus.Infoln("BLOB:", path, ":", err)
return err
})
}
func (b blobsData) walk() error {
logrus.Infoln("Walking BLOBS...")
 
if *parallelBlobWalk {
// Walk each blobs/sha256/<prefix> directory in its own parallel walk job.
listRootPath := filepath.Join("blobs", "sha256")
return parallelWalk(listRootPath, b.walkPath)
} else {
// Fall back to a single sequential walk of the whole blobs tree.
return b.walkPath("blobs")
}
}
 
func (b blobsData) info() {
@@ -46,6 +46,10 @@ func (f *fsStorage) Walk(rootDir string, baseDir string, fn walkFunc) error {
path = path[len(baseDir):]
}
 
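// Skip the walk root itself: its relative path is empty after trimming baseDir.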
if path == "" {
return nil
}
fi := fileInfo{fullPath: fullPath, size: info.Size()}
return fn(path, fi, err)
})
@@ -5,6 +5,7 @@ import "sync"
type jobsData chan func()
 
var jobsRunner jobsData = make(jobsData)
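// parallelWalkRunner is a separate worker pool, sized by -parallel-walk-jobs,
// that runs the per-directory walks dispatched by parallelWalk.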
var parallelWalkRunner jobsData = make(jobsData)
 
func (ch jobsData) group() *jobsGroup {
return &jobsGroup{ch: ch}
}
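The Dispatch and Finish bodies are collapsed in this view. Purely as a hedged sketch (assuming the workers started by run() execute queued closures, and that Finish waits for every dispatched closure and returns the first error seen), a jobs group compatible with the call sites in this commit could look like:

// Hypothetical sketch only: the real jobsGroup is defined elsewhere in this
// package and its implementation is not shown in this diff.
type jobsGroup struct {
    ch       jobsData
    wg       sync.WaitGroup
    errOnce  sync.Once
    firstErr error
}

// Dispatch queues fn on the shared jobs channel and tracks it in the group.
func (g *jobsGroup) Dispatch(fn func() error) {
    g.wg.Add(1)
    g.ch <- func() {
        defer g.wg.Done()
        if err := fn(); err != nil {
            g.errOnce.Do(func() { g.firstErr = err })
        }
    }
}

// Finish blocks until every dispatched job has run and returns the first error.
func (g *jobsGroup) Finish() error {
    g.wg.Wait()
    return g.firstErr
}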
@@ -2,17 +2,20 @@ package main
 
import (
"flag"
"os"
"os/signal"
"sync"
 
"github.com/Sirupsen/logrus"
)
 
var (
debug = flag.Bool("debug", false, "Print debug messages")
verbose = flag.Bool("verbose", true, "Print verbose messages")
dryRun = flag.Bool("dry-run", true, "Dry run")
storage = flag.String("storage", "filesystem", "Storage type to use: filesystem or s3")
jobs = flag.Int("jobs", 100, "Number of concurrent jobs to execute")
parallelWalkJobs = flag.Int("parallel-walk-jobs", 100, "Number of concurrent parallel walk jobs to execute")
)
 
func main() {
@@ -44,6 +47,17 @@ func main() {
deletes := make(deletesData, 0, 1000)
 
jobsRunner.run(*jobs)
parallelWalkRunner.run(*parallelWalkJobs)
// Dump storage statistics and abort when an interrupt (Ctrl-C) is received.
signals := make(chan os.Signal, 1)
signal.Notify(signals, os.Interrupt)
go func() {
for sig := range signals {
currentStorage.Info()
logrus.Fatalln("Signal received:", sig)
}
}()
 
var wg sync.WaitGroup
wg.Add(2)
package main
 
import (
"flag"
"fmt"
"path/filepath"
"strings"
@@ -24,6 +25,8 @@ type repositoriesData map[string]*repositoryData
 
var repositoriesLock sync.Mutex
 
var parallelRepositoryWalk = flag.Bool("parallel-repository-walk", true, "Allow parallel repository walking")
 
func newRepositoryData(name string) *repositoryData {
return &repositoryData{
name: name,
@@ -327,11 +330,9 @@ func (r repositoriesData) process(segments []string, info fileInfo) error {
return fmt.Errorf("unparseable path: %v", segments)
}
 
func (r repositoriesData) walkPath(walkPath string, jg *jobsGroup) error {
logrus.Infoln("REPOSITORIES DIR:", walkPath)
return currentStorage.Walk(walkPath, "repositories", func(path string, info fileInfo, err error) error {
jg.Dispatch(func() error {
err = r.process(strings.Split(path, "/"), info)
if err != nil {
@@ -342,15 +343,28 @@ func (r repositoriesData) walk() error {
})
return nil
})
}
 
func (r repositoriesData) walk() error {
logrus.Infoln("Walking REPOSITORIES...")
jg := jobsRunner.group()
 
if *parallelRepositoryWalk {
// Walk each top-level directory under repositories/ in its own parallel walk job.
err := parallelWalk("repositories", func(listPath string) error {
return r.walkPath(listPath, jg)
})
if err != nil {
return err
}
} else {
// Fall back to a single sequential walk of the whole repositories tree.
err := r.walkPath("repositories", jg)
if err != nil {
return err
}
}
return jg.Finish()
}
 
func (r repositoriesData) mark(blobs blobsData, deletes deletesData) error {
@@ -76,13 +76,21 @@ func (f *s3Storage) Walk(path string, baseDir string, fn walkFunc) error {
keyPath = keyPath[len(baseDir):]
}
 
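// Skip the prefix entry itself and any directory placeholder keys ("<dir>/").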
if keyPath == "" {
continue
}
if strings.HasSuffix(keyPath, "/") {
logrus.Debugln("S3 Walk:", keyPath, "for", baseDir)
continue
}
fi := fileInfo{
fullPath: *key.Key,
size: *key.Size,
etag: *key.ETag,
lastModified: *key.LastModified,
}
err = fn(keyPath, fi, err)
if err != nil {
return err
@@ -132,6 +140,10 @@ func (f *s3Storage) List(path string, fn walkFunc) error {
keyPath = keyPath[len(path):]
}
 
if keyPath == "" {
continue
}
fi := fileInfo{
fullPath: *key.Key,
size: *key.Size,
@@ -146,6 +158,27 @@ func (f *s3Storage) List(path string, fn walkFunc) error {
}
}
 
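// S3 reports sub-"directories" as common prefixes when listing with a
// delimiter; surface them to the callback as directory entries.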
for _, commonPrefix := range resp.CommonPrefixes {
prefixPath := *commonPrefix.Prefix
if strings.HasPrefix(prefixPath, path) {
prefixPath = prefixPath[len(path):]
}
if prefixPath == "" {
continue
}
fi := fileInfo{
fullPath: *commonPrefix.Prefix,
directory: true,
}
err = fn(prefixPath, fi, err)
if err != nil {
return err
}
}
if *resp.IsTruncated {
atomic.AddInt64(&f.apiCalls, 1)
resp, err = f.S3.ListObjects(&s3.ListObjectsInput{
@@ -179,6 +212,7 @@ func (f *s3Storage) Read(path string, etag string) ([]byte, error) {
}
} else if os.IsNotExist(err) {
atomic.AddInt64(&f.cacheMiss, 1)
logrus.Infoln("CACHE MISS: ", path)
}
}
 
package main
 
import "time"
import (
"path/filepath"
"time"
)
 
type fileInfo struct {
fullPath string
@@ -21,3 +24,24 @@ type storageObject interface {
}
 
var currentStorage storageObject
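 
// parallelWalk lists the immediate children of rootPath and dispatches fn for
// every sub-directory on the parallelWalkRunner pool, then waits for all of
// the dispatched walks to complete.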
func parallelWalk(rootPath string, fn func(string) error) error {
pwg := parallelWalkRunner.group()
err := currentStorage.List(rootPath, func(listPath string, info fileInfo, err error) error {
if !info.directory {
return nil
}
pwg.Dispatch(func() error {
walkPath := filepath.Join(rootPath, listPath)
return fn(walkPath)
})
return nil
})
if err != nil {
return err
}
return pwg.Finish()
}