Unverified Commit 965f218e authored by Kamil Trzcinski

Introduce soft deletes (enabled by default)

parent 453fae60
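
With this change the dry-run decision moves out of deletesData and into main: sweeping always records candidate paths, and storage is only touched when -delete is passed, either by moving blobs into a backup tree (-soft-delete, on by default) or by removing them outright. A condensed sketch of that control flow, with the storage calls stubbed out and names invented for the example, not code from this commit:

    package main

    import (
        "fmt"
        "path/filepath"
    )

    // sweepAndDelete mirrors the decision this commit introduces in main and
    // deletesData.run: without -delete it is a dry run that only reports, with
    // -delete and -soft-delete (the default) blobs are moved under backup/, and
    // only -delete together with -soft-delete=false removes them outright.
    func sweepAndDelete(doDelete, softDelete bool, candidates []string) {
        for _, path := range candidates {
            switch {
            case !doDelete:
                fmt.Println("dry run, would delete:", path)
            case softDelete:
                fmt.Println("move:", path, "->", filepath.Join("backup", path))
            default:
                fmt.Println("delete:", path)
            }
        }
    }

    func main() {
        // Example invocation; the path is illustrative only.
        sweepAndDelete(true, true, []string{"blobs/sha256/ab/abcd/data"})
    }
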
@@ -60,7 +60,7 @@ func (b blobsData) size(digest digest) int64 {
     return 0
 }
 
-func (b blobsData) sweep(deletes deletesData) {
+func (b blobsData) sweep(deletes *deletesData) {
     for _, blob := range b {
         if blob.references == 0 {
             deletes.schedule(blob.path(), blob.size)
@@ -15,7 +15,9 @@ var (
     deletedBlobSize int64
 )
 
-type deletesData []string
+type deletesData struct {
+    files []string
+}
 
 var deletesLock sync.Mutex
 
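
deletesData changes from a named slice to a struct holding the slice, and the mark/sweep helpers now take *deletesData. That makes appends performed inside schedule visible to every caller; a named slice passed by value only grows a private copy of its header. A minimal illustration of the difference, with made-up types and helpers:

    package main

    import "fmt"

    type sliceList []string

    type structList struct{ files []string }

    // appendTo grows a named slice passed by value; the caller's slice header is
    // copied, so the caller never sees the appended element.
    func appendTo(l sliceList) { l = append(l, "x") }

    // schedule grows the slice inside a struct passed by pointer; the caller
    // shares the same struct, so the growth is visible.
    func schedule(l *structList) { l.files = append(l.files, "x") }

    func main() {
        s := make(sliceList, 0, 1)
        appendTo(s)
        fmt.Println(len(s)) // 0: the append was lost

        var t structList
        schedule(&t)
        fmt.Println(len(t.files)) // 1: the append is shared
    }
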
@@ -33,10 +35,7 @@ func (d *deletesData) schedule(path string, size int64) {
         deletedOther++
     }
     deletedBlobSize += size
-    if *dryRun {
-        return
-    }
-    *d = append(*d, path)
+    d.files = append(d.files, path)
 }
 
 func (d *deletesData) info() {
@@ -47,19 +46,22 @@ func (d *deletesData) info() {
     )
 }
 
-func (d *deletesData) run() {
-    if *dryRun {
-        return
-    }
+func (d *deletesData) run(softDelete bool) {
     jg := jobsRunner.group()
 
-    for _, path_ := range *d {
+    for _, path_ := range d.files {
         path := path_
         jg.Dispatch(func() error {
-            err := currentStorage.Delete(path)
-            if err != nil {
-                logrus.Fatalln(err)
+            if softDelete {
+                err := currentStorage.Move(path, filepath.Join("backup", path))
+                if err != nil {
+                    logrus.Fatalln(err)
+                }
+            } else {
+                err := currentStorage.Delete(path)
+                if err != nil {
+                    logrus.Fatalln(err)
+                }
             }
             return nil
         })
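
The `path := path_` re-binding ahead of jg.Dispatch is what lets each dispatched closure keep its own path: before Go 1.22 a range loop reuses a single loop variable, so closures that run later would all see its last value. A small self-contained illustration of the same pattern, with the job runner replaced by a plain sync.WaitGroup:

    package main

    import (
        "fmt"
        "sync"
    )

    func main() {
        files := []string{"a", "b", "c"}
        var wg sync.WaitGroup
        for _, path_ := range files {
            path := path_ // re-bind: each goroutine captures this iteration's value
            wg.Add(1)
            go func() {
                defer wg.Done()
                // Without the re-bind (pre-Go 1.22), every goroutine could print "c".
                fmt.Println("processing", path)
            }()
        }
        wg.Wait()
    }
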
@@ -6,7 +6,6 @@ import (
     "os"
     "path/filepath"
     "strings"
-    "syscall"
 )
 
 type fsStorage struct {
@@ -22,6 +21,10 @@ func (f *fsStorage) fullPath(path string) string {
     return filepath.Join(*fsRootDir, "docker", "registry", "v2", path)
 }
 
+func (f *fsStorage) backupPath(path string) string {
+    return filepath.Join(*fsRootDir, "docker_backup", "registry", "v2", path)
+}
+
 func (f *fsStorage) Walk(rootDir string, baseDir string, fn walkFunc) error {
     rootDir, err := filepath.Abs(f.fullPath(rootDir))
     if err != nil {
@@ -95,7 +98,14 @@ func (f *fsStorage) Read(path string, etag string) ([]byte, error) {
 }
 
 func (f *fsStorage) Delete(path string) error {
-    return syscall.Unlink(f.fullPath(path))
+    return os.Remove(f.fullPath(path))
+}
+
+func (f *fsStorage) Move(path, newPath string) error {
+    path = f.fullPath(path)
+    newPath = f.backupPath(newPath)
+    os.MkdirAll(filepath.Dir(newPath), 0700)
+    return os.Rename(path, newPath)
 }
 
 func (f *fsStorage) Info() {
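
On the filesystem backend a soft delete is just a rename into the docker_backup tree, which is cheap but only works while the registry root and the backup directory live on the same filesystem; os.Rename fails with a cross-device error otherwise, and the os.MkdirAll result above is discarded. A hedged variant that surfaces both failure modes, with an illustrative helper name and layout rather than the code in this commit:

    package main

    import (
        "fmt"
        "os"
        "path/filepath"
    )

    // moveToBackup renames src into backupRoot, creating parent directories first
    // and reporting any failure, including cross-filesystem renames.
    func moveToBackup(src, backupRoot string) error {
        dst := filepath.Join(backupRoot, src)
        if err := os.MkdirAll(filepath.Dir(dst), 0700); err != nil {
            return fmt.Errorf("create backup dir: %w", err)
        }
        if err := os.Rename(src, dst); err != nil {
            return fmt.Errorf("rename %s -> %s: %w", src, dst, err)
        }
        return nil
    }

    func main() {
        if err := moveToBackup("data/blob", "backup"); err != nil { // example paths
            fmt.Println(err)
        }
    }
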
@@ -12,8 +12,9 @@ import (
 var (
     debug            = flag.Bool("debug", false, "Print debug messages")
     verbose          = flag.Bool("verbose", true, "Print verbose messages")
-    dryRun           = flag.Bool("dry-run", true, "Dry run")
-    storage          = flag.String("storage", "filesystem", "Storage type to use: filesystem or s3")
+    delete           = flag.Bool("delete", false, "Delete data, instead of dry run")
+    softDelete       = flag.Bool("soft-delete", true, "When deleting, do not remove, but move to backup/ folder")
+    storage          = flag.String("storage", "", "Storage type to use: filesystem or s3")
     jobs             = flag.Int("jobs", 10, "Number of concurrent jobs to execute")
     parallelWalkJobs = flag.Int("parallel-walk-jobs", 10, "Number of concurrent parallel walk jobs to execute")
     ignoreBlobs      = flag.Bool("ignore-blobs", false, "Ignore blobs processing and recycling")
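
The defaults stay on the safe side: the old -dry-run=true becomes -delete=false, so nothing is touched unless -delete is passed, and even then -soft-delete=true keeps blobs recoverable under the backup tree. -storage also loses its "filesystem" default, so a backend has to be chosen explicitly; one hypothetical way to enforce that after flag.Parse (the commit does not show how main treats the empty value):

    package main

    import (
        "flag"
        "log"
    )

    var storage = flag.String("storage", "", "Storage type to use: filesystem or s3")

    func main() {
        flag.Parse()
        // With no default, an unset -storage can be rejected up front instead of
        // silently assuming the filesystem backend.
        switch *storage {
        case "filesystem", "s3":
            log.Println("using storage:", *storage)
        default:
            log.Fatalln("-storage must be 'filesystem' or 's3'")
        }
    }
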
@@ -56,7 +57,7 @@ func main() {
 
     blobs := make(blobsData)
     repositories := make(repositoriesData)
-    deletes := make(deletesData, 0, 1000)
+    deletes := new(deletesData)
 
     jobsRunner.run(*jobs)
     parallelWalkRunner.run(*parallelWalkJobs)
@@ -107,14 +108,14 @@ func main() {
     logrus.Infoln("Sweeping BLOBS...")
     blobs.sweep(deletes)
 
-    deletes.info()
-    if !*dryRun {
-        logrus.Infoln("Sweeping...")
-        deletes.run()
+    if *delete {
+        logrus.Infoln("Deleting...")
+        deletes.run(*softDelete)
     }
 
     logrus.Infoln("Summary...")
     repositories.info(blobs)
     blobs.info()
+    deletes.info()
     currentStorage.Info()
 }
@@ -106,7 +106,7 @@ func (r *repositoryData) markManifestLayers(blobs blobsData, revision digest) er
     return nil
 }
 
-func (r *repositoryData) markManifestSignatures(deletes deletesData, blobs blobsData, revision digest, signatures []digest) error {
+func (r *repositoryData) markManifestSignatures(deletes *deletesData, blobs blobsData, revision digest, signatures []digest) error {
     if r.manifests[revision] > 0 {
         for _, signature := range signatures {
             blobs.mark(signature)
@@ -123,7 +123,7 @@ func (r *repositoryData) markLayer(blobs blobsData, revision digest) error {
     return blobs.mark(revision)
 }
 
-func (r *repositoryData) mark(blobs blobsData, deletes deletesData) error {
+func (r *repositoryData) mark(blobs blobsData, deletes *deletesData) error {
     for name, t := range r.tags {
         err := t.mark(blobs, deletes)
         if err != nil {
@@ -398,7 +398,7 @@ func (r repositoriesData) walk() error {
     return jg.Finish()
 }
 
-func (r repositoriesData) mark(blobs blobsData, deletes deletesData) error {
+func (r repositoriesData) mark(blobs blobsData, deletes *deletesData) error {
     jg := jobsRunner.group()
 
     for _, repository_ := range r {
 package main
 
 import (
-    "errors"
     "flag"
     "io/ioutil"
     "os"
@@ -18,11 +17,12 @@ import (
 const listMax = 1000
 
 type s3Storage struct {
-    S3         *s3.S3
-    apiCalls   int64
-    cacheHits  int64
-    cacheError int64
-    cacheMiss  int64
+    S3                *s3.S3
+    apiCalls          int64
+    expensiveApiCalls int64
+    cacheHits         int64
+    cacheError        int64
+    cacheMiss         int64
 }
 
 var s3RootDir = flag.String("s3-root-dir", "", "s3 root directory")
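
The new expensiveApiCalls field appears to count the mutating S3 requests (CopyObject/DeleteObject) separately from the existing apiCalls counter. Because the jobs runner increments it from many goroutines, it goes through sync/atomic; a minimal sketch of that counter pattern, with an invented worker loop:

    package main

    import (
        "fmt"
        "sync"
        "sync/atomic"
    )

    type stats struct {
        apiCalls          int64
        expensiveApiCalls int64
    }

    func main() {
        var s stats
        var wg sync.WaitGroup
        for i := 0; i < 50; i++ {
            wg.Add(1)
            go func() {
                defer wg.Done()
                atomic.AddInt64(&s.apiCalls, 1)
                atomic.AddInt64(&s.expensiveApiCalls, 1) // e.g. a DeleteObject or CopyObject call
            }()
        }
        wg.Wait()
        // Read with atomic loads so the report itself is race-free.
        fmt.Println("API calls/expensive:", atomic.LoadInt64(&s.apiCalls), atomic.LoadInt64(&s.expensiveApiCalls))
    }
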
@@ -45,6 +45,10 @@ func (f *s3Storage) fullPath(path string) string {
     return filepath.Join(*s3RootDir, "docker", "registry", "v2", path)
 }
 
+func (f *s3Storage) backupPath(path string) string {
+    return filepath.Join(*s3RootDir, "docker-backup", "registry", "v2", path)
+}
+
 func (f *s3Storage) Walk(path string, baseDir string, fn walkFunc) error {
     path = f.fullPath(path)
     if path != "/" && path[len(path)-1] != '/' {
@@ -241,10 +245,28 @@ func (f *s3Storage) Read(path string, etag string) ([]byte, error) {
 }
 
 func (f *s3Storage) Delete(path string) error {
-    return errors.New("not supported")
+    atomic.AddInt64(&f.expensiveApiCalls, 1)
+    _, err := f.S3.DeleteObject(&s3.DeleteObjectInput{
+        Bucket: s3Bucket,
+        Key:    aws.String(f.fullPath(path)),
+    })
+    return err
+}
+
+func (f *s3Storage) Move(path, newPath string) error {
+    atomic.AddInt64(&f.expensiveApiCalls, 1)
+    _, err := f.S3.CopyObject(&s3.CopyObjectInput{
+        CopySource: aws.String("/" + *s3Bucket + "/" + f.fullPath(path)),
+        Bucket:     s3Bucket,
+        Key:        aws.String(f.backupPath(newPath)),
+    })
+    if err != nil {
+        return err
+    }
+    return f.Delete(path)
 }
 
 func (f *s3Storage) Info() {
-    logrus.Infoln("S3 INFO: API calls:", f.apiCalls,
+    logrus.Infoln("S3 INFO: API calls/expensive:", f.apiCalls, f.expensiveApiCalls,
         "Cache (hit/miss/error):", f.cacheHits, f.cacheMiss, f.cacheError)
 }
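
S3 offers no rename, so Move is a server-side CopyObject into the docker-backup prefix followed by a Delete of the original, and the delete only happens after the copy succeeds. One caveat: a single CopyObject request handles objects up to 5 GB, so larger blobs would need a multipart copy. A hypothetical size pre-check using the same aws-sdk-go client, not part of this commit:

    package main

    import (
        "fmt"

        "github.com/aws/aws-sdk-go/aws"
        "github.com/aws/aws-sdk-go/aws/session"
        "github.com/aws/aws-sdk-go/service/s3"
    )

    const copyObjectLimit = 5 * 1024 * 1024 * 1024 // 5 GB single-request CopyObject limit

    // canCopyInOneRequest reports whether a key is small enough for a plain CopyObject.
    func canCopyInOneRequest(client *s3.S3, bucket, key string) (bool, error) {
        head, err := client.HeadObject(&s3.HeadObjectInput{
            Bucket: aws.String(bucket),
            Key:    aws.String(key),
        })
        if err != nil {
            return false, err
        }
        return aws.Int64Value(head.ContentLength) <= copyObjectLimit, nil
    }

    func main() {
        sess := session.Must(session.NewSession())
        // Example bucket and key, for illustration only.
        ok, err := canCopyInOneRequest(s3.New(sess), "my-registry-bucket", "docker/registry/v2/blobs/sha256/ab/abcd/data")
        fmt.Println(ok, err)
    }
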
@@ -20,6 +20,7 @@ type storageObject interface {
     List(path string, fn walkFunc) error
     Read(path string, etag string) ([]byte, error)
     Delete(path string) error
+    Move(path, newPath string) error
     Info()
 }
 
@@ -26,7 +26,7 @@ func (t *tagData) versionLinkPath(version digest) string {
     return filepath.Join("repositories", t.repository.name, "_manifests", "tags", t.name, "index", version.path(), "link")
 }
 
-func (t *tagData) mark(blobs blobsData, deletes deletesData) error {
+func (t *tagData) mark(blobs blobsData, deletes *deletesData) error {
     if t.current.valid() {
         t.repository.markManifest(t.current)
     } else {