Skip to content
Snippets Groups Projects
Unverified Commit 9437ada7 authored by Kamil Trzcinski's avatar Kamil Trzcinski
Browse files

Write proper sweep

parent 52cfc5da
No related branches found
No related tags found
No related merge requests found
Loading
Loading
@@ -60,12 +60,25 @@ func (b blobsData) size(digest digest) int64 {
return 0
}
 
func (b blobsData) sweep(deletes *deletesData) {
for _, blob := range b {
if blob.references == 0 {
deletes.schedule(blob.path(), blob.size)
}
func (b blobsData) sweep() error {
jg := jobsRunner.group()
for _, blob_ := range b {
blob := blob_
jg.Dispatch(func() error {
if blob.references > 0 {
return nil
}
err := deleteFile(blob.path(), blob.size)
if err != nil {
return err
}
return nil
})
}
return jg.Finish()
}
 
func (b blobsData) addBlob(segments []string, info fileInfo) error {
Loading
Loading
package main
 
import (
"flag"
"path/filepath"
"sync"
"sync/atomic"
 
"github.com/Sirupsen/logrus"
"github.com/dustin/go-humanize"
)
 
var (
deletedLinks int
deletedBlobs int
deletedOther int
deletedLinks int32
deletedBlobs int32
deletedOther int32
deletedBlobSize int64
)
 
type deletesData struct {
files []string
}
var deletesLock sync.Mutex
func (d *deletesData) schedule(path string, size int64) {
deletesLock.Lock()
defer deletesLock.Unlock()
var (
delete = flag.Bool("delete", false, "Delete data, instead of dry run")
softDelete = flag.Bool("soft-delete", true, "When deleting, do not remove, but move to backup/ folder")
)
 
func deleteFile(path string, size int64) error {
logrus.Infoln("DELETE", path, size)
name := filepath.Base(path)
if name == "link" {
deletedLinks++
atomic.AddInt32(&deletedLinks, 1)
} else if name == "data" {
deletedBlobs++
atomic.AddInt32(&deletedBlobs, 1)
} else {
atomic.AddInt32(&deletedOther, 1)
}
atomic.AddInt64(&deletedBlobSize, size)
if !*delete {
// Do not delete, only write
return nil
}
if *softDelete {
return currentStorage.Move(path, filepath.Join("backup", path))
} else {
deletedOther++
return currentStorage.Delete(path)
}
deletedBlobSize += size
d.files = append(d.files, path)
}
 
func (d *deletesData) info() {
func deletesInfo() {
logrus.Warningln("DELETEABLE INFO:", deletedLinks, "links,",
deletedBlobs, "blobs,",
deletedOther, "other,",
humanize.Bytes(uint64(deletedBlobSize)),
)
}
func (d *deletesData) run(softDelete bool) {
jg := jobsRunner.group()
for _, path_ := range d.files {
path := path_
jg.Dispatch(func() error {
if softDelete {
err := currentStorage.Move(path, filepath.Join("backup", path))
if err != nil {
logrus.Fatalln(err)
}
} else {
err := currentStorage.Delete(path)
if err != nil {
logrus.Fatalln(err)
}
}
return nil
})
}
jg.Finish()
}
Loading
Loading
@@ -12,8 +12,6 @@ import (
var (
debug = flag.Bool("debug", false, "Print debug messages")
verbose = flag.Bool("verbose", true, "Print verbose messages")
delete = flag.Bool("delete", false, "Delete data, instead of dry run")
softDelete = flag.Bool("soft-delete", true, "When deleting, do not remove, but move to backup/ folder")
storage = flag.String("storage", "", "Storage type to use: filesystem or s3")
jobs = flag.Int("jobs", 10, "Number of concurrent jobs to execute")
parallelWalkJobs = flag.Int("parallel-walk-jobs", 10, "Number of concurrent parallel walk jobs to execute")
Loading
Loading
@@ -57,7 +55,6 @@ func main() {
 
blobs := make(blobsData)
repositories := make(repositoriesData)
deletes := new(deletesData)
 
jobsRunner.run(*jobs)
parallelWalkRunner.run(*parallelWalkJobs)
Loading
Loading
@@ -100,22 +97,26 @@ func main() {
wg.Wait()
 
logrus.Infoln("Marking REPOSITORIES...")
err = repositories.mark(blobs, deletes)
err = repositories.mark(blobs)
if err != nil {
logErrorln(err)
}
 
logrus.Infoln("Sweeping BLOBS...")
blobs.sweep(deletes)
logrus.Infoln("Sweeping REPOSITORIES...")
err = repositories.sweep()
if err != nil {
logErrorln(err)
}
 
if *delete {
logrus.Infoln("Deleting...")
deletes.run(*softDelete)
logrus.Infoln("Sweeping BLOBS...")
err = blobs.sweep()
if err != nil {
logErrorln(err)
}
 
logrus.Infoln("Summary...")
repositories.info(blobs)
blobs.info()
deletes.info()
deletesInfo()
currentStorage.Info()
}
Loading
Loading
@@ -106,14 +106,26 @@ func (r *repositoryData) markManifestLayers(blobs blobsData, revision digest) er
return nil
}
 
func (r *repositoryData) markManifestSignatures(deletes *deletesData, blobs blobsData, revision digest, signatures []digest) error {
func (r *repositoryData) markManifestSignatures(blobs blobsData, revision digest, signatures []digest) error {
if r.manifests[revision] == 0 {
return nil
}
for _, signature := range signatures {
blobs.mark(signature)
}
return nil
}
func (r *repositoryData) sweepManifestSignatures(revision digest, signatures []digest) error {
if r.manifests[revision] > 0 {
for _, signature := range signatures {
blobs.mark(signature)
}
} else {
for _, signature := range signatures {
deletes.schedule(r.manifestRevisionSignaturePath(revision, signature), digestReferenceSize)
return nil
}
for _, signature := range signatures {
err := deleteFile(r.manifestRevisionSignaturePath(revision, signature), digestReferenceSize)
if err != nil {
return err
}
}
return nil
Loading
Loading
@@ -123,9 +135,9 @@ func (r *repositoryData) markLayer(blobs blobsData, revision digest) error {
return blobs.mark(revision)
}
 
func (r *repositoryData) mark(blobs blobsData, deletes *deletesData) error {
func (r *repositoryData) mark(blobs blobsData) error {
for name, t := range r.tags {
err := t.mark(blobs, deletes)
err := t.mark(blobs)
if err != nil {
if *softErrors {
logrus.Errorln("MARK:", r.name, "TAG:", name, "ERROR:", err)
Loading
Loading
@@ -136,22 +148,22 @@ func (r *repositoryData) mark(blobs blobsData, deletes *deletesData) error {
}
 
for revision, used := range r.manifests {
if used > 0 {
err := r.markManifestLayers(blobs, revision)
if err != nil {
if *softErrors {
logrus.Errorln("MARK:", r.name, "MANIFEST:", revision, "ERROR:", err)
continue
}
return err
if used == 0 {
continue
}
err := r.markManifestLayers(blobs, revision)
if err != nil {
if *softErrors {
logrus.Errorln("MARK:", r.name, "MANIFEST:", revision, "ERROR:", err)
continue
}
} else {
deletes.schedule(r.manifestRevisionPath(revision), digestReferenceSize)
return err
}
}
 
for revision, signatures := range r.manifestSignatures {
err := r.markManifestSignatures(deletes, blobs, revision, signatures)
err := r.markManifestSignatures(blobs, revision, signatures)
if err != nil {
if *softErrors {
logrus.Errorln("MARK:", r.name, "MANIFEST SIGNATURE:", revision, "ERROR:", err)
Loading
Loading
@@ -162,17 +174,72 @@ func (r *repositoryData) mark(blobs blobsData, deletes *deletesData) error {
}
 
for digest, used := range r.layers {
if used == 0 {
continue
}
err := r.markLayer(blobs, digest)
if err != nil {
if *softErrors {
logrus.Errorln("MARK:", r.name, "LAYER:", digest, "ERROR:", err)
continue
}
return err
}
}
return nil
}
func (r *repositoryData) sweep() error {
for name, t := range r.tags {
err := t.sweep()
if err != nil {
if *softErrors {
logrus.Errorln("SWEEP:", r.name, "TAG:", name, "ERROR:", err)
continue
}
return err
}
}
for revision, used := range r.manifests {
if used > 0 {
err := r.markLayer(blobs, digest)
if err != nil {
if *softErrors {
logrus.Errorln("MARK:", r.name, "LAYER:", digest, "ERROR:", err)
continue
}
return err
continue
}
err := deleteFile(r.manifestRevisionPath(revision), digestReferenceSize)
if err != nil {
if *softErrors {
logrus.Errorln("SWEEP:", r.name, "MANIFEST:", revision, "ERROR:", err)
continue
}
} else {
deletes.schedule(r.layerLinkPath(digest), digestReferenceSize)
return err
}
}
for revision, signatures := range r.manifestSignatures {
err := r.sweepManifestSignatures(revision, signatures)
if err != nil {
if *softErrors {
logrus.Errorln("MARK:", r.name, "MANIFEST SIGNATURES:", revision, "ERROR:", err)
continue
}
return err
}
}
for digest, used := range r.layers {
if used > 0 {
continue
}
err := deleteFile(r.layerLinkPath(digest), digestReferenceSize)
if err != nil {
if *softErrors {
logrus.Errorln("MARK:", r.name, "LAYER:", digest, "ERROR:", err)
continue
}
return err
}
}
 
Loading
Loading
@@ -398,13 +465,30 @@ func (r repositoriesData) walk() error {
return jg.Finish()
}
 
func (r repositoriesData) mark(blobs blobsData, deletes *deletesData) error {
func (r repositoriesData) mark(blobs blobsData) error {
jg := jobsRunner.group()
for _, repository_ := range r {
repository := repository_
jg.Dispatch(func() error {
return repository.mark(blobs)
})
}
err := jg.Finish()
if err != nil {
return err
}
return nil
}
func (r repositoriesData) sweep() error {
jg := jobsRunner.group()
 
for _, repository_ := range r {
repository := repository_
jg.Dispatch(func() error {
return repository.mark(blobs, deletes)
return repository.sweep()
})
}
 
Loading
Loading
Loading
Loading
@@ -26,23 +26,46 @@ func (t *tagData) versionLinkPath(version digest) string {
return filepath.Join("repositories", t.repository.name, "_manifests", "tags", t.name, "index", version.path(), "link")
}
 
func (t *tagData) mark(blobs blobsData, deletes *deletesData) error {
func (t *tagData) mark(blobs blobsData) error {
if t.current.valid() {
t.repository.markManifest(t.current)
} else {
deletes.schedule(t.currentLinkPath(), digestReferenceSize)
}
 
for _, version := range t.versions {
if version != t.current {
if *deleteOldTagVersions {
deletes.schedule(t.versionLinkPath(version), digestReferenceSize)
} else {
t.repository.markManifest(version)
}
if version == t.current {
continue
}
if *deleteOldTagVersions {
continue
}
t.repository.markManifest(version)
}
return nil
}
func (t *tagData) sweep() error {
if !t.current.valid() {
err := deleteFile(t.currentLinkPath(), digestReferenceSize)
if err != nil {
return err
}
}
 
for _, version := range t.versions {
if version == t.current {
continue
}
if *deleteOldTagVersions {
err := deleteFile(t.versionLinkPath(version), digestReferenceSize)
if err != nil {
return err
}
}
}
return nil
}
 
Loading
Loading
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment