Unverified Commit 4b666c54 authored by Kamil Trzcinski

A lot of improvements

- Added S3 caching, based on ETags (sketched below)
- Added parallel repositories execution
- Added parallel manifest loading
- Added locks in a number of places
- Added S3 info: API calls, cache hit/miss
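
The new S3 caching keys local files by object path and validates them against the S3 ETag, which for simple (non-multipart) uploads is the quoted hex MD5 of the object body. A minimal sketch of that read-through check, assuming a path-keyed cache directory; apart from `compareEtag`, the helper names here are illustrative rather than the commit's API:

```
package main

import (
    "crypto/md5"
    "encoding/hex"
    "fmt"
    "io/ioutil"
    "os"
    "path/filepath"
)

// compareEtag mirrors the diff below: a simple S3 upload's ETag is the
// quoted hex MD5 of the body, so hash the data and add the quotes.
func compareEtag(data []byte, etag string) bool {
    sum := md5.Sum(data)
    return etag == "\""+hex.EncodeToString(sum[:])+"\""
}

// cachedRead (illustrative name) serves a local copy when it still matches
// the expected ETag and otherwise fetches and refreshes the cache.
func cachedRead(cachePath, etag string, fetch func() ([]byte, error)) ([]byte, error) {
    if etag != "" {
        if data, err := ioutil.ReadFile(cachePath); err == nil && compareEtag(data, etag) {
            return data, nil // cache hit: local bytes still match the remote ETag
        }
    }
    data, err := fetch()
    if err != nil {
        return nil, err
    }
    os.MkdirAll(filepath.Dir(cachePath), 0700) // best-effort cache refresh
    ioutil.WriteFile(cachePath, data, 0600)
    return data, nil
}

func main() {
    data, _ := cachedRead(filepath.Join("tmp-cache", "example"), "", func() ([]byte, error) {
        return []byte("sha256:0000"), nil
    })
    fmt.Println(string(data))
}
```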
parent fc89edaa
docker-distribution-pruner
examples/registry/
config-dev.yml
/docker-distribution-pruner
/examples/registry/
/config-dev.yml
/tmp-cache/
@@ -2,7 +2,9 @@
 
Go to Docker Distribution: https://github.com/docker/distribution/.
 
Simple Go-lang docker-distribution based application to clean all old revisions from Docker Distribution based registry (also GitLab Container Registry)
Highly efficient garbage collector to clean all old revisions from a Docker Distribution based registry (also the GitLab Container Registry).
It uses optimised file accesses and API calls to build the walk DAG.
 
**It is only for testing purposes for now. Do not use it on production data yet.**
 
@@ -27,10 +29,7 @@ docker-distribution-pruner -config /path/to/docker/distribution/config/file -dry
It is strongly advised not to change these options, as doing so can leave left-overs in the repository (see the example after the flag list).
 
```
-delete-versions=true - delete unreferenced versions for each found tag of the repository
-delete-manifests=true - delete unreferenced manifests for each found repository; this unlinks all previous revisions of tags
-delete-blobs=true - delete unreferenced blobs for each found repository; this unlinks all blobs referenced in the context of this repository
-delete-global-blobs=true - physically delete manifests and blobs that are no longer used, actually removing the data
-delete-old-tag-versions=true - delete old versions for each found tag of the repository
```
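
For illustration only — the config path is a placeholder, and the flags are those documented above or introduced by this commit (`-delete-old-tag-versions` in tags.go, `-jobs` in main.go) — a run that keeps old tag versions instead of deleting them could look like:

```
docker-distribution-pruner -config /path/to/docker/distribution/config/file -delete-old-tag-versions=false -jobs 100
```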
 
### License
@@ -5,12 +5,14 @@ import (
"github.com/Sirupsen/logrus"
"path/filepath"
"strings"
"sync"
)
 
type blobData struct {
name string
size int64
references int64
etag string
}
 
func (b *blobData) path() string {
@@ -19,7 +21,12 @@ func (b *blobData) path() string {
 
type blobsData map[string]*blobData
 
var blobsLock sync.Mutex
func (b blobsData) mark(name string) error {
blobsLock.Lock()
defer blobsLock.Unlock()
blob := b[name]
if blob == nil {
return fmt.Errorf("blob not found: %v", name)
@@ -28,6 +35,14 @@ func (b blobsData) mark(name string) error {
return nil
}
 
func (b blobsData) etag(name string) string {
blob := b[name]
if blob != nil {
return blob.etag
}
return ""
}
func (b blobsData) sweep(deletes deletesData) {
for _, blob := range b {
if blob.references == 0 {
@@ -61,13 +76,18 @@ func (b blobsData) addBlob(segments []string, info fileInfo) error {
blob := &blobData{
name: name,
size: info.size,
etag: info.etag,
}
b[name] = blob
return nil
}
 
func (b blobsData) walk(path string, info fileInfo, err error) error {
err = b.addBlob(strings.Split(path, "/"), info)
logrus.Infoln("BLOB:", path, ":", err)
func (b blobsData) walk() error {
logrus.Infoln("Walking BLOBS...")
err := currentStorage.Walk("blobs", func(path string, info fileInfo, err error) error {
err = b.addBlob(strings.Split(path, "/"), info)
logrus.Infoln("BLOB:", path, ":", err)
return err
})
return err
}
@@ -4,6 +4,7 @@ import (
"path/filepath"
 
"github.com/Sirupsen/logrus"
"sync"
)
 
var (
@@ -14,9 +15,14 @@ var (
 
type deletesData []string
 
var deletesLock sync.Mutex
const linkFileSize = int64(len("sha256:") + 64)
 
func (d *deletesData) schedule(path string, size int64) {
deletesLock.Lock()
defer deletesLock.Unlock()
logrus.Infoln("DELETE", path, size)
name := filepath.Base(path)
if name == "link" {
@@ -36,10 +42,18 @@ func (d *deletesData) info() {
}
 
func (d *deletesData) run() {
for _, path := range *d {
err := currentStorage.Delete(path)
if err != nil {
logrus.Fatalln(err)
}
jg := jobsRunner.group()
for _, path_ := range *d {
path := path_
jg.Dispatch(func() error {
err := currentStorage.Delete(path)
if err != nil {
logrus.Fatalln(err)
}
return nil
})
}
jg.Finish()
}
@@ -45,10 +45,13 @@ func (f *fsStorage) Walk(rootDir string, fn walkFunc) error {
})
}
 
func (f *fsStorage) Read(path string) ([]byte, error) {
func (f *fsStorage) Read(path string, etag string) ([]byte, error) {
return ioutil.ReadFile(f.fullPath(path))
}
 
func (f *fsStorage) Delete(path string) error {
return syscall.Unlink(f.fullPath(path))
}
func (f *fsStorage) Info() {
}
jobs.go 0 → 100644
package main

import "sync"

// jobsData is a channel of work items consumed by a fixed pool of worker goroutines.
type jobsData chan func()

var jobsRunner jobsData = make(jobsData)

func (ch jobsData) group() *jobsGroup {
    return &jobsGroup{ch: ch}
}

// run starts max workers that execute jobs until the channel is closed.
func (ch jobsData) run(max int) {
    for max > 0 {
        go func() {
            for job := range ch {
                job()
            }
        }()
        max--
    }
}

func (ch jobsData) close() {
    close(ch)
}

// jobsGroup tracks one batch of dispatched jobs and remembers the last error seen.
type jobsGroup struct {
    ch   jobsData
    wg   sync.WaitGroup
    err  error
    lock sync.Mutex
}

func (g *jobsGroup) Dispatch(fn func() error) {
    g.wg.Add(1)
    g.ch <- func() {
        var err error
        defer func() {
            if err != nil {
                g.lock.Lock()
                g.err = err
                g.lock.Unlock()
            }
        }()
        defer g.wg.Done()
        err = fn()
    }
}

// Finish waits for every dispatched job and returns the last recorded error, if any.
func (g *jobsGroup) Finish() error {
    g.wg.Wait()
    return g.err
}
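
A brief usage sketch of the pool above, assuming it lives in the same package; `process` and `exampleUsage` are hypothetical, while the `group`/`Dispatch`/`Finish` pattern mirrors `deletes.run` and `repositoriesData.mark` elsewhere in this commit:

```
// process is a hypothetical work item used only for this sketch.
func process(path string) error { return nil }

func exampleUsage() error {
    jobsRunner.run(100) // start the worker pool once, as main does with the -jobs flag
    jg := jobsRunner.group()
    for _, path := range []string{"a", "b", "c"} {
        path := path // capture the loop variable before handing it to the closure
        jg.Dispatch(func() error {
            return process(path)
        })
    }
    return jg.Finish() // wait for the whole group; the last recorded error wins
}
```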
@@ -24,8 +24,15 @@ func analyzeLink(args []string) (string, error) {
return args[1], nil
}
 
func readLink(path string) (string, error) {
data, err := currentStorage.Read(path)
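// S3 returns the ETag of a simple (non-multipart) upload as the quoted hex MD5 of the object body, hence the quotes added below.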
func compareEtag(data []byte, etag string) bool {
hash := md5.Sum(data)
hex := hex.EncodeToString(hash[:])
hex = "\"" + hex + "\""
return etag == hex
}
func readLink(path string, etag string) (string, error) {
data, err := currentStorage.Read(path, etag)
if err != nil {
return "", nil
}
@@ -47,16 +54,13 @@ func verifyLink(link string, info fileInfo) error {
// If we have an e-tag, verify the link against it
if info.etag != "" {
content := "sha256:" + link
hash := md5.Sum([]byte(content))
hex := hex.EncodeToString(hash[:])
hex = "\"" + hex + "\""
if info.etag == hex {
if compareEtag([]byte(content), info.etag) {
return nil
} else {
return fmt.Errorf("etag for %s is not equal %s", link, info.etag)
}
return fmt.Errorf("etag for %s is not equal %s to %s", link, info.etag, hex)
} else {
readed, err := readLink(info.fullPath)
readed, err := readLink(info.fullPath, info.etag)
if err != nil {
return err
}
@@ -4,6 +4,7 @@ import (
"flag"
 
"github.com/Sirupsen/logrus"
"sync"
)
 
var (
@@ -11,6 +12,7 @@ var (
verbose = flag.Bool("verbose", true, "Print verbose messages")
dryRun = flag.Bool("dry-run", true, "Dry run")
storage = flag.String("storage", "filesystem", "Storage type to use: filesystem or s3")
jobs = flag.Int("jobs", 100, "Number of concurrent jobs to execute")
)
 
func main() {
@@ -41,17 +43,30 @@ func main() {
repositories := make(repositoriesData)
deletes := make(deletesData, 0, 1000)
 
logrus.Infoln("Walking REPOSITORIES...")
err = currentStorage.Walk("repositories", repositories.walk)
if err != nil {
logrus.Fatalln(err)
}
jobsRunner.run(*jobs)
 
logrus.Infoln("Walking BLOBS...")
err = currentStorage.Walk("blobs", blobs.walk)
if err != nil {
logrus.Fatalln(err)
}
var wg sync.WaitGroup
wg.Add(2)
go func() {
defer wg.Done()
err = repositories.walk()
if err != nil {
logrus.Fatalln(err)
}
}()
go func() {
defer wg.Done()
err = blobs.walk()
if err != nil {
logrus.Fatalln(err)
}
}()
wg.Wait()
 
logrus.Infoln("Marking REPOSITORIES...")
err = repositories.mark(blobs, deletes)
@@ -68,4 +83,6 @@ func main() {
logrus.Infoln("Sweeping...")
deletes.run()
}
currentStorage.Info()
}
@@ -10,16 +10,23 @@ import (
"github.com/docker/distribution/manifest/manifestlist"
"github.com/docker/distribution/manifest/schema1"
"github.com/docker/distribution/manifest/schema2"
"sync"
"github.com/Sirupsen/logrus"
)
 
type manifestData struct {
name string
layers []string
name string
layers []string
loaded bool
loadErr error
loadLock sync.Mutex
}
 
type manifestsData map[string]*manifestData
 
var manifests manifestsData = make(map[string]*manifestData)
var manifestsLock sync.Mutex
 
func deserializeManifest(data []byte) (distribution.Manifest, error) {
var versioned manifest.Versioned
@@ -55,8 +62,10 @@ func (m *manifestData) path() string {
return filepath.Join("blobs", "sha256", m.name[0:2], m.name, "data")
}
 
func (m *manifestData) load() error {
data, err := currentStorage.Read(m.path())
func (m *manifestData) load(blobs blobsData) error {
logrus.Println("MANIFEST:", m.path(), ": loading...")
data, err := currentStorage.Read(m.path(), blobs.etag(m.name))
if err != nil {
return err
}
@@ -72,18 +81,25 @@ func (m *manifestData) load() error {
return nil
}
 
func (m manifestsData) get(name string) (*manifestData, error) {
func (m manifestsData) get(name string, blobs blobsData) (*manifestData, error) {
manifestsLock.Lock()
manifest := m[name]
if manifest == nil {
manifest = &manifestData{
name: name,
}
m[name] = manifest
}
manifestsLock.Unlock()
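// manifestsLock above only guards the map; loadLock plus the loaded flag below ensure each manifest body is read from storage at most once, even when many tags reference it.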
if !manifest.loaded {
manifest.loadLock.Lock()
defer manifest.loadLock.Unlock()
 
err := manifest.load()
if err != nil {
return nil, err
if !manifest.loaded {
manifest.loadErr = manifest.load(blobs)
manifest.loaded = true
}
m[name] = manifest
}
return manifest, nil
return manifest, manifest.loadErr
}
@@ -6,6 +6,7 @@ import (
"strings"
 
"github.com/Sirupsen/logrus"
"sync"
)
 
type repositoryData struct {
@@ -14,9 +15,11 @@ type repositoryData struct {
manifests map[string]int
tags map[string]*tag
uploads map[string]int
lock sync.Mutex
}
 
type repositoriesData map[string]*repositoryData
var repositoriesLock sync.Mutex
 
func newRepositoryData(name string) *repositoryData {
return &repositoryData{
@@ -41,6 +44,9 @@ func (r *repositoryData) uploadPath(upload string) string {
}
 
func (r *repositoryData) tag(name string) *tag {
r.lock.Lock()
defer r.lock.Unlock()
t := r.tags[name]
if t == nil {
t = &tag{
@@ -53,17 +59,28 @@ func (r *repositoryData) tag(name string) *tag {
return t
}
 
func (r *repositoryData) markManifest(blobs blobsData, name string) error {
func (r *repositoryData) markManifest(name string) error {
r.lock.Lock()
defer r.lock.Unlock()
r.manifests[name]++
return nil
}
func (r *repositoryData) markManifestLayers(blobs blobsData, name string) error {
err := blobs.mark(name)
if err != nil {
return err
}
 
manifest, err := manifests.get(name)
manifest, err := manifests.get(name, blobs)
if err != nil {
return err
}
 
r.lock.Lock()
defer r.lock.Unlock()
for _, layer := range manifest.layers {
_, ok := r.layers[layer]
if !ok {
@@ -88,9 +105,10 @@ func (r *repositoryData) mark(blobs blobsData, deletes deletesData) error {
}
}
 
for name, used := range r.manifests {
for name_, used := range r.manifests {
name := name_
if used > 0 {
err := r.markManifest(blobs, name)
err := r.markManifestLayers(blobs, name)
if err != nil {
return err
}
@@ -99,7 +117,8 @@ func (r *repositoryData) mark(blobs blobsData, deletes deletesData) error {
}
}
 
for name, used := range r.layers {
for name_, used := range r.layers {
name := name_
if used > 0 {
err := r.markLayer(blobs, name)
if err != nil {
@@ -116,6 +135,9 @@ func (r *repositoryData) mark(blobs blobsData, deletes deletesData) error {
func (r repositoriesData) get(path []string) *repositoryData {
repositoryName := strings.Join(path, "/")
 
repositoriesLock.Lock()
defer repositoriesLock.Unlock()
repository := r[repositoryName]
if repository == nil {
repository = newRepositoryData(repositoryName)
@@ -137,6 +159,9 @@ func (r *repositoryData) addLayer(args []string, info fileInfo) error {
return err
}
 
r.lock.Lock()
defer r.lock.Unlock()
r.layers[link] = 0
return nil
}
@@ -153,6 +178,9 @@ func (r *repositoryData) addManifestRevision(args []string, info fileInfo) error
return err
}
 
r.lock.Lock()
defer r.lock.Unlock()
r.manifests[link] = 0
return nil
}
@@ -219,18 +247,42 @@ func (r repositoriesData) process(segments []string, info fileInfo) error {
return fmt.Errorf("unparseable path: %v", segments)
}
 
func (r repositoriesData) walk(path string, info fileInfo, err error) error {
err = r.process(strings.Split(path, "/"), info)
logrus.Infoln("REPOSITORY:", path, ":", err)
return err
func (r repositoriesData) walk() error {
jg := jobsRunner.group()
logrus.Infoln("Walking REPOSITORIES...")
err := currentStorage.Walk("repositories", func(path string, info fileInfo, err error) error {
jg.Dispatch(func() error {
err = r.process(strings.Split(path, "/"), info)
logrus.Infoln("REPOSITORY:", path, ":", err)
return err
})
return nil
})
if err != nil {
return err
}
err = jg.Finish()
if err != nil {
return err
}
return nil
}
 
func (r repositoriesData) mark(blobs blobsData, deletes deletesData) error {
for _, repository := range r {
err := repository.mark(blobs, deletes)
if err != nil {
return err
}
jg := jobsRunner.group()
for _, repository_ := range r {
repository := repository_
jg.Dispatch(func() error {
return repository.mark(blobs, deletes)
})
}
err := jg.Finish()
if err != nil {
return err
}
return nil
}
@@ -6,21 +6,29 @@ import (
"io/ioutil"
"path/filepath"
"strings"
"os"
 
"github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/aws/session"
"github.com/aws/aws-sdk-go/service/s3"
"sync/atomic"
"github.com/Sirupsen/logrus"
)
 
const listMax = 1000
 
type s3Storage struct {
S3 *s3.S3
apiCalls int64
cacheHits int64
cacheError int64
cacheMiss int64
}
 
var s3RootDir = flag.String("s3-root-dir", "", "s3 root directory")
var s3Bucket = flag.String("s3-bucket", "", "s3 bucket")
var s3Region = flag.String("s3-region", "us-east-1", "s3 region")
var s3CacheStorage = flag.String("s3-storage-cache", "tmp-cache", "s3 cache")
 
func newS3Storage() storageObject {
sess, err := session.NewSession()
@@ -43,6 +51,7 @@ func (f *s3Storage) Walk(path string, fn walkFunc) error {
path = path + "/"
}
 
atomic.AddInt64(&f.apiCalls, 1)
resp, err := f.S3.ListObjects(&s3.ListObjectsInput{
Bucket: s3Bucket,
Prefix: aws.String(path),
@@ -76,6 +85,7 @@ func (f *s3Storage) Walk(path string, fn walkFunc) error {
}
 
if *resp.IsTruncated {
atomic.AddInt64(&f.apiCalls, 1)
resp, err = f.S3.ListObjects(&s3.ListObjectsInput{
Bucket: s3Bucket,
Prefix: aws.String(path),
@@ -93,7 +103,23 @@ func (f *s3Storage) Walk(path string, fn walkFunc) error {
return nil
}
 
func (f *s3Storage) Read(path string) ([]byte, error) {
func (f *s3Storage) Read(path string, etag string) ([]byte, error) {
cachePath := filepath.Join(*s3CacheStorage, path)
if etag != "" && *s3CacheStorage != "" {
file, err := ioutil.ReadFile(cachePath)
if err == nil {
if compareEtag(file, etag) {
atomic.AddInt64(&f.cacheHits, 1)
return file, nil
} else {
atomic.AddInt64(&f.cacheError, 1)
}
} else if os.IsNotExist(err) {
atomic.AddInt64(&f.cacheMiss, 1)
}
}
atomic.AddInt64(&f.apiCalls, 1)
resp, err := f.S3.GetObject(&s3.GetObjectInput{
Bucket: s3Bucket,
Key: aws.String(f.fullPath(path)),
@@ -104,9 +130,24 @@ func (f *s3Storage) Read(path string) ([]byte, error) {
}
defer resp.Body.Close()
 
return ioutil.ReadAll(resp.Body)
data, err := ioutil.ReadAll(resp.Body)
if err != nil {
return nil, err
}
if etag != "" && *s3CacheStorage != "" {
os.MkdirAll(filepath.Dir(cachePath), 0700)
ioutil.WriteFile(cachePath, data, 0600)
}
return data, nil
}
 
func (f *s3Storage) Delete(path string) error {
return errors.New("not supported")
}
func (f *s3Storage) Info() {
logrus.Infoln("S3: API calls:", f.apiCalls,
"Cache (hit/miss/error):", f.cacheHits, f.cacheMiss, f.cacheError)
}
@@ -13,8 +13,9 @@ type walkFunc func(path string, info fileInfo, err error) error
 
type storageObject interface {
Walk(path string, fn walkFunc) error
Read(path string) ([]byte, error)
Read(path string, etag string) ([]byte, error)
Delete(path string) error
Info()
}
 
var currentStorage storageObject
@@ -4,8 +4,11 @@ import (
"path/filepath"
 
"github.com/Sirupsen/logrus"
"flag"
)
 
var deleteOldTagVersions = flag.Bool("delete-old-tag-versions", true, "Delete old tag versions")
type tag struct {
repository *repositoryData
name string
@@ -23,12 +26,16 @@ func (t *tag) versionLinkPath(version string) string {
 
func (t *tag) mark(blobs blobsData, deletes deletesData) error {
if t.current != "" {
t.repository.manifests[t.current]++
t.repository.markManifest(t.current)
}
 
for _, version := range t.versions {
if version != t.current {
deletes.schedule(t.versionLinkPath(version), linkFileSize)
if *deleteOldTagVersions {
deletes.schedule(t.versionLinkPath(version), linkFileSize)
} else {
t.repository.markManifest(version)
}
}
}
 
@@ -38,7 +45,7 @@ func (t *tag) mark(blobs blobsData, deletes deletesData) error {
func (t *tag) setCurrent(info fileInfo) error {
//INFO[0000] /test2/_manifests/tags/latest/current/link
 
readLink, err := readLink(t.currentLinkPath())
readLink, err := readLink(t.currentLinkPath(), info.etag)
if err != nil {
return err
}