Skip to content
Snippets Groups Projects
Unverified Commit 144a7cf3 authored by Kamil Trzcinski
Browse files

Support S3 storage

parent a5515382
No related branches found
No related tags found
No related merge requests found
Loading
Loading
@@ -28,10 +28,10 @@ func (b blobsData) mark(name string) error {
return nil
}
 
func (b blobsData) sweep() {
func (b blobsData) sweep(deletes deletesData) {
for _, blob := range b {
if blob.references == 0 {
scheduleDelete(blob.path(), blob.size)
deletes.schedule(blob.path(), blob.size)
}
}
}
Loading
Loading
Loading
Loading
@@ -4,19 +4,19 @@ import (
"path/filepath"
 
"github.com/Sirupsen/logrus"
"syscall"
)
 
var (
deletedLinks int
deletedBlobs int
deletedBlobSize int64
deletes []string
)
 
type deletesData []string
// linkFileSize is the size passed to schedule when a link file is queued for
// deletion: the length of the "sha256:" prefix plus 64 further bytes
// (presumably the hex-encoded digest — confirm against the link file format).
const linkFileSize = int64(len("sha256:") + 64)
 
func scheduleDelete(path string, size int64) {
func (d *deletesData) schedule(path string, size int64) {
logrus.Infoln("DELETE", path, size)
name := filepath.Base(path)
if name == "link" {
Loading
Loading
@@ -25,12 +25,19 @@ func scheduleDelete(path string, size int64) {
deletedBlobs++
}
deletedBlobSize += size
deletes = append(deletes, path)
*d = append(*d, path)
}
// info logs a warning-level summary of everything scheduled so far: the
// number of links, the number of blobs, and the accumulated size converted
// from bytes to whole megabytes. It reads the package-level counters and
// does not inspect the receiver's contents.
func (d *deletesData) info() {
	sizeInMB := deletedBlobSize / 1024 / 1024
	logrus.Warningln(
		"Deleted:", deletedLinks, "links,",
		deletedBlobs, "blobs,",
		sizeInMB, "in MB",
	)
}
 
func runDeletes() {
for _, path := range deletes {
err := syscall.Unlink(path)
func (d *deletesData) run() {
for _, path := range *d {
err := currentStorage.Delete(path)
if err != nil {
logrus.Fatalln(err)
}
Loading
Loading
package main
 
import (
"io/ioutil"
"path/filepath"
"syscall"
"flag"
"io/ioutil"
"os"
"path/filepath"
"strings"
"syscall"
)
 
type fsStorage struct {
Loading
Loading
@@ -26,16 +26,18 @@ func (f *fsStorage) Walk(rootDir string, fn walkFunc) error {
}
rootDir += "/"
 
return filepath.Walk(rootDir, func(path string, info os.FileInfo, err error) error {
return filepath.Walk(rootDir, func(fullPath string, info os.FileInfo, err error) error {
if info.IsDir() {
return nil
}
 
path := fullPath
if strings.HasPrefix(path, rootDir) {
path = path[len(rootDir):]
}
 
fi := fileInfo{path: path, size: info.Size()}
fi := fileInfo{fullPath: fullPath, size: info.Size()}
return fn(path, fi, err)
})
}
Loading
Loading
package main
 
import (
"crypto/md5"
"encoding/hex"
"errors"
"fmt"
"strings"
"crypto/md5"
"encoding/hex"
)
 
func analyzeLink(args []string) (string, error) {
Loading
Loading
@@ -49,19 +49,22 @@ func verifyLink(link string, info fileInfo) error {
content := "sha256:" + link
hash := md5.Sum([]byte(content))
hex := hex.EncodeToString(hash[:])
hex = "\"" + hex + "\""
if info.etag == hex {
return nil
}
}
 
readed, err := readLink(info.path)
if err != nil {
return err
}
return fmt.Errorf("etag for %s is not equal %s to %s", link, info.etag, hex)
} else {
readed, err := readLink(info.fullPath)
if err != nil {
return err
}
 
if readed != link {
return fmt.Errorf("readed link for %s is not equal %s", link, readed)
}
if readed != link {
return fmt.Errorf("readed link for %s is not equal %s", link, readed)
}
 
return nil
return nil
}
}
Loading
Loading
@@ -2,6 +2,7 @@ package main
 
import (
"flag"
"github.com/Sirupsen/logrus"
)
 
Loading
Loading
@@ -9,6 +10,7 @@ var (
debug = flag.Bool("debug", false, "Print debug messages")
verbose = flag.Bool("verbose", true, "Print verbose messages")
dryRun = flag.Bool("dry-run", true, "Dry run")
storage = flag.String("storage", "filesystem", "Storage type to use: filesystem or s3")
)
 
func main() {
Loading
Loading
@@ -24,39 +26,46 @@ func main() {
 
var err error
 
currentStorage = &fsStorage{}
switch *storage {
case "filesystem":
currentStorage = &fsStorage{}
case "s3":
currentStorage = newS3Storage()
default:
logrus.Fatalln("Unknown storage specified:", *storage)
}
 
blobs := make(blobsData)
repositories := make(repositoriesData)
deletes := make(deletesData, 0, 1000)
 
logrus.Infoln("Walking BLOBS...")
err = currentStorage.Walk("blobs", blobs.walk)
logrus.Infoln("Walking REPOSITORIES...")
err = currentStorage.Walk("repositories", repositories.walk)
if err != nil {
logrus.Fatalln(err)
}
 
logrus.Infoln("Walking REPOSITORIES...")
err = currentStorage.Walk("repositories", repositories.walk)
logrus.Infoln("Walking BLOBS...")
err = currentStorage.Walk("blobs", blobs.walk)
if err != nil {
logrus.Fatalln(err)
}
 
logrus.Infoln("Marking REPOSITORIES...")
err = repositories.mark(blobs)
err = repositories.mark(blobs, deletes)
if err != nil {
logrus.Fatalln(err)
}
 
logrus.Infoln("Sweeping BLOBS...")
blobs.sweep()
blobs.sweep(deletes)
 
logrus.Warningln("Deleted:", deletedLinks, "links,",
deletedBlobs, "blobs,",
deletedBlobSize/1024/1024, "in MB",
)
deletes.info()
 
if !*dryRun {
logrus.Infoln("Sweeping...")
runDeletes()
deletes.run()
}
}
Loading
Loading
@@ -2,9 +2,10 @@ package main
 
import (
"fmt"
"github.com/Sirupsen/logrus"
"path/filepath"
"strings"
"github.com/Sirupsen/logrus"
)
 
type repositoryData struct {
Loading
Loading
@@ -79,9 +80,9 @@ func (r *repositoryData) markLayer(blobs blobsData, name string) error {
return blobs.mark(name)
}
 
func (r *repositoryData) mark(blobs blobsData) error {
func (r *repositoryData) mark(blobs blobsData, deletes deletesData) error {
for _, t := range r.tags {
err := t.mark(blobs)
err := t.mark(blobs, deletes)
if err != nil {
return err
}
Loading
Loading
@@ -94,7 +95,7 @@ func (r *repositoryData) mark(blobs blobsData) error {
return err
}
} else {
scheduleDelete(r.manifestRevisionPath(name), linkFileSize)
deletes.schedule(r.manifestRevisionPath(name), linkFileSize)
}
}
 
Loading
Loading
@@ -105,7 +106,7 @@ func (r *repositoryData) mark(blobs blobsData) error {
return err
}
} else {
scheduleDelete(r.layerLinkPath(name), linkFileSize)
deletes.schedule(r.layerLinkPath(name), linkFileSize)
}
}
 
Loading
Loading
@@ -190,7 +191,8 @@ func (r *repositoryData) addManifest(args []string, info fileInfo) error {
func (r *repositoryData) addUpload(args []string, info fileInfo) error {
// /test/_uploads/579c7fc9b0d60a19706cd6c1573fec9a28fa758bfe1ece86a1e5c68ad6f4e9d1
if len(args) != 1 {
return fmt.Errorf("invalid args for uploads: %v", args)
logrus.Warningln("invalid args for uploads: %v", args)
return nil
}
 
r.uploads[args[0]] = 1
Loading
Loading
@@ -223,9 +225,9 @@ func (r repositoriesData) walk(path string, info fileInfo, err error) error {
return err
}
 
func (r repositoriesData) mark(blobs blobsData) error {
func (r repositoriesData) mark(blobs blobsData, deletes deletesData) error {
for _, repository := range r {
err := repository.mark(blobs)
err := repository.mark(blobs, deletes)
if err != nil {
return err
}
Loading
Loading
package main
import (
"errors"
"flag"
"io/ioutil"
"path/filepath"
"strings"
"github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/aws/session"
"github.com/aws/aws-sdk-go/service/s3"
)
// listMax is the MaxKeys value sent with every S3 ListObjects request, i.e.
// the largest number of objects requested per page while walking.
const listMax = 1000
// s3Storage is the storage backend that serves Walk, Read and Delete from an
// S3 bucket instead of the local filesystem.
type s3Storage struct {
// S3 is the AWS SDK client used for every request against the bucket.
S3 *s3.S3
}
// Command-line flags that configure the S3 storage backend.
var (
	// s3RootDir is prepended to every object key built by fullPath.
	s3RootDir = flag.String("s3-root-dir", "", "s3 root directory")
	// s3Bucket names the bucket used for list and get requests.
	s3Bucket = flag.String("s3-bucket", "", "s3 bucket")
	// s3Region is the AWS region the S3 client is configured with.
	s3Region = flag.String("s3-region", "us-east-1", "s3 region")
)
// newS3Storage builds an S3-backed storage using a fresh AWS session and the
// region given by the -s3-region flag. It panics if the session cannot be
// created.
func newS3Storage() *s3Storage {
	awsSession, err := session.NewSession()
	if err != nil {
		panic(err)
	}

	regionConfig := aws.NewConfig().WithRegion(*s3Region)
	client := s3.New(awsSession, regionConfig)
	return &s3Storage{S3: client}
}
// fullPath turns a registry-relative path into the object key used in the
// bucket: <s3-root-dir>/docker/registry/v2/<path>.
func (f *s3Storage) fullPath(path string) string {
	key := filepath.Join(*s3RootDir, "docker", "registry", "v2", path)
	// filepath.Join uses the OS path separator, but S3 keys are always
	// slash-separated; normalise so the key is identical on every platform.
	return filepath.ToSlash(key)
}
// Walk lists every object whose key starts with the given registry-relative
// path and calls fn once per object.
//
// fn receives the object's key with the walked prefix stripped, a fileInfo
// holding the full key, size, ETag and last-modified time, and a nil error.
// The first non-nil error returned by fn or by a list request aborts the
// walk and is returned to the caller.
func (f *s3Storage) Walk(path string, fn walkFunc) error {
	path = f.fullPath(path)
	if !strings.HasSuffix(path, "/") {
		path += "/"
	}

	// marker is the last key seen so far; it survives across pages so that a
	// page without contents can never rewind the listing to the beginning.
	marker := ""
	for {
		input := &s3.ListObjectsInput{
			Bucket:  s3Bucket,
			Prefix:  aws.String(path),
			MaxKeys: aws.Int64(listMax),
		}
		if marker != "" {
			input.Marker = aws.String(marker)
		}

		resp, err := f.S3.ListObjects(input)
		if err != nil {
			return err
		}

		for _, object := range resp.Contents {
			// The *Value helpers return zero values for nil pointers
			// instead of panicking on a partially filled response.
			fullKey := aws.StringValue(object.Key)
			marker = fullKey

			fi := fileInfo{
				fullPath:     fullKey,
				size:         aws.Int64Value(object.Size),
				etag:         aws.StringValue(object.ETag),
				lastModified: aws.TimeValue(object.LastModified),
			}
			if err := fn(strings.TrimPrefix(fullKey, path), fi, nil); err != nil {
				return err
			}
		}

		// Stop on the last page. Also stop if a page claims to be truncated
		// but carried no objects, since the marker could not advance and the
		// same request would be repeated forever.
		if !aws.BoolValue(resp.IsTruncated) || len(resp.Contents) == 0 {
			return nil
		}
	}
}
// Read downloads the object stored under the given registry-relative path
// and returns its complete contents. Any error from the get request or from
// reading the body is returned as-is.
func (f *s3Storage) Read(path string) ([]byte, error) {
	request := &s3.GetObjectInput{
		Bucket: s3Bucket,
		Key:    aws.String(f.fullPath(path)),
	}

	object, err := f.S3.GetObject(request)
	if err != nil {
		return nil, err
	}
	defer object.Body.Close()

	return ioutil.ReadAll(object.Body)
}
// Delete is not implemented for the S3 backend: it ignores path and always
// returns a "not supported" error.
func (f *s3Storage) Delete(path string) error {
	unsupported := errors.New("not supported")
	return unsupported
}
package main
 
import "time"
type fileInfo struct {
path string
size int64
etag string
fullPath string
size int64
etag string
lastModified time.Time
}
 
// walkFunc is the callback a storage Walk invokes for each entry it finds.
// It receives the entry's path, a fileInfo describing the entry, and an
// error value; a non-nil return value is handed back to Walk.
type walkFunc func(path string, info fileInfo, err error) error
 
type storage interface {
type storageObject interface {
Walk(path string, fn walkFunc) error
Read(path string) ([]byte, error)
Delete(path string) error
}
 
var currentStorage storage
var currentStorage storageObject
Loading
Loading
@@ -2,6 +2,8 @@ package main
 
import (
"path/filepath"
"github.com/Sirupsen/logrus"
)
 
type tag struct {
Loading
Loading
@@ -19,14 +21,14 @@ func (t *tag) versionLinkPath(version string) string {
return filepath.Join("repositories", t.repository.name, "_manifests", "tags", t.name, "index", "sha256", version, "link")
}
 
func (t *tag) mark(blobs blobsData) error {
func (t *tag) mark(blobs blobsData, deletes deletesData) error {
if t.current != "" {
t.repository.manifests[t.current]++
}
 
for _, version := range t.versions {
if version != t.current {
scheduleDelete(t.versionLinkPath(version), linkFileSize)
deletes.schedule(t.versionLinkPath(version), linkFileSize)
}
}
 
Loading
Loading
@@ -42,6 +44,7 @@ func (t *tag) setCurrent(info fileInfo) error {
}
 
t.current = readLink
logrus.Infoln("TAG:", t.repository.name, ":", t.name, ": is using:", t.current)
return nil
}
 
Loading
Loading
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment