Skip to content
Snippets Groups Projects
Unverified Commit c6570d55 authored by Kamil Trzcinski's avatar Kamil Trzcinski
Browse files

Use digest object to reduce memory pressure

parent 215ff680
No related branches found
No related tags found
No related merge requests found
Loading
Loading
@@ -14,46 +14,46 @@ import (
var parallelBlobWalk = flag.Bool("parallel-blob-walk", true, "Allow to use parallel blob walker")
 
type blobData struct {
name string
name digest
size int64
references int64
etag string
}
 
func (b *blobData) path() string {
return filepath.Join("blobs", "sha256", b.name[0:2], b.name, "data")
return filepath.Join("blobs", b.name.scopedPath(), "data")
}
 
type blobsData map[string]*blobData
type blobsData map[digest]*blobData
 
var blobsLock sync.Mutex
 
func (b blobsData) mark(name string) error {
blobsLock.Lock()
defer blobsLock.Unlock()
func (b blobsData) mark(digest digest) error {
if *ignoreBlobs {
return nil
}
 
blob := b[name]
blobsLock.Lock()
defer blobsLock.Unlock()
blob := b[digest]
if blob == nil {
return fmt.Errorf("blob not found: %v", name)
return fmt.Errorf("blob not found: %v", digest)
}
blob.references++
return nil
}
 
func (b blobsData) etag(name string) string {
blob := b[name]
func (b blobsData) etag(digest digest) string {
blob := b[digest]
if blob != nil {
return blob.etag
}
return ""
}
 
func (b blobsData) size(name string) int64 {
blob := b[name]
func (b blobsData) size(digest digest) int64 {
blob := b[digest]
if blob != nil {
return blob.size
}
Loading
Loading
@@ -73,32 +73,28 @@ func (b blobsData) addBlob(segments []string, info fileInfo) error {
return fmt.Errorf("unparseable path: %v", segments)
}
 
if segments[0] != "sha256" {
return fmt.Errorf("path needs to start with sha256: %v", segments)
}
if segments[3] != "data" {
return fmt.Errorf("file needs to be data: %v", segments)
}
 
name := segments[2]
if len(name) != 64 {
return fmt.Errorf("blobs need to be sha256: %v", segments)
digest, err := newDigestFromScopedPath(segments[0:3])
if err != nil {
return err
}
 
if segments[1] != name[0:2] {
return fmt.Errorf("path needs to be prefixed with %v: %v", name[0:2], segments)
if segments[0] != "sha256" {
return fmt.Errorf("path needs to start with sha256: %v", segments)
}
 
blobsLock.Lock()
defer blobsLock.Unlock()
 
blob := &blobData{
name: name,
name: digest,
size: info.size,
etag: info.etag,
}
b[name] = blob
b[digest] = blob
return nil
}
 
Loading
Loading
Loading
Loading
@@ -19,8 +19,6 @@ type deletesData []string
 
var deletesLock sync.Mutex
 
const linkFileSize = int64(len("sha256:") + 64)
func (d *deletesData) schedule(path string, size int64) {
deletesLock.Lock()
defer deletesLock.Unlock()
Loading
Loading
digest.go 0 → 100644
package main
import (
"bytes"
"crypto/md5"
"crypto/sha256"
"encoding/hex"
"fmt"
"path/filepath"
)
const digestAlgorithm = "sha256"
const digestReferenceAlgorithm = "sha256:"
var digestEmpty [sha256.Size]byte
const digestReferenceSize = int64(len(digestReferenceAlgorithm) + sha256.Size*2)
type digest struct {
hash [sha256.Size]byte
}
func newDigestFromPath(components []string) (d digest, err error) {
if len(components) != 2 {
return digest{}, fmt.Errorf("digest components should contain exactly two items: %v", components)
}
if components[0] != digestAlgorithm {
return digest{}, fmt.Errorf("only %v is supported: %v", digestAlgorithm, components[0])
}
err = d.decode([]byte(components[1]))
return
}
func newDigestFromScopedPath(components []string) (d digest, err error) {
if len(components) != 3 {
return digest{}, fmt.Errorf("digest components should contain exactly three items: %v", components)
}
if components[0] != digestAlgorithm {
return digest{}, fmt.Errorf("only %v is supported: %v", digestAlgorithm, components[0])
}
if components[1] != components[2][0:2] {
return digest{}, fmt.Errorf("digest needs to be prefixed with %v: %v", components[2][0:2], components)
}
err = d.decode([]byte(components[2]))
if err != nil {
return
}
return
}
func newDigestFromReference(data []byte) (d digest, err error) {
if !bytes.HasPrefix(data, []byte(digestReferenceAlgorithm)) {
return digest{}, fmt.Errorf("digest reference should start with: %v, but was: %v", digestReferenceAlgorithm, data)
}
err = d.decode(data[len(digestReferenceAlgorithm):])
return
}
func (d *digest) decode(data []byte) error {
n, err := hex.Decode(d.hash[:], data)
if err != nil {
return err
}
if n != sha256.Size {
return fmt.Errorf("component should be valid %v, but was: %v", digestAlgorithm, data)
}
return nil
}
func (d *digest) hexHash() string {
return hex.EncodeToString(d.hash[:])
}
func (d *digest) path() string {
return filepath.Join(digestAlgorithm, d.hexHash())
}
func (d *digest) scopedPath() string {
hex := d.hexHash()
return filepath.Join(digestAlgorithm, hex[0:2], hex)
}
func (d *digest) reference() []byte {
return []byte(digestReferenceAlgorithm + d.hexHash())
}
func (d *digest) etag() string {
md5sum := md5.Sum(d.reference())
hex := hex.EncodeToString(md5sum[:])
return "\"" + hex + "\""
}
func (d digest) String() string {
return d.hexHash()
}
func (d *digest) valid() bool {
return !bytes.Equal(d.hash[:], digestEmpty[:])
}
Loading
Loading
@@ -3,47 +3,47 @@ package main
import (
"crypto/md5"
"encoding/hex"
"errors"
"fmt"
"strings"
)
 
func analyzeLink(args []string) (string, error) {
func analyzeLink(args []string) (digest, error) {
if len(args) != 3 {
return "", fmt.Errorf("invalid args for link: %v", args)
}
if args[0] != "sha256" {
return "", fmt.Errorf("only sha256 is supported: %v", args[0])
return digest{}, fmt.Errorf("invalid args for link: %v", args)
}
 
if args[2] != "link" {
return "", fmt.Errorf("expected link as path component: %v", args[2])
return digest{}, fmt.Errorf("expected link as path component: %v", args[2])
}
 
return args[1], nil
return newDigestFromPath(args[0:2])
}
 
func analyzeLinkSignature(args []string) (string, string, error) {
func analyzeLinkSignature(args []string) (digest, digest, error) {
// sha256/8d0c94a38dfa0db8827089a036d47482aa30550510d62f8fb2021548f49b1c84/signatures/sha256/6b659c9f4d1ff9c422f7bc517a0e896bc7fadb99a00e5db4c9921ddf8b5d402c/link
 
if len(args) != 6 {
return "", "", fmt.Errorf("invalid args for signature link: %v", args)
return digest{}, digest{}, fmt.Errorf("invalid args for signature link: %v", args)
}
 
if args[0] != "sha256" || args[3] != "sha256" {
return "", "", fmt.Errorf("only sha256 is supported: %v", args[0])
if args[5] != "link" {
return digest{}, digest{}, fmt.Errorf("expected link as path component: %v", args[2])
}
 
if args[2] != "signatures" {
return "", "", fmt.Errorf("expected signatures as path component: %v", args[2])
return digest{}, digest{}, fmt.Errorf("expected signatures as path component: %v", args[2])
}
 
if args[5] != "link" {
return "", "", fmt.Errorf("expected link as path component: %v", args[2])
link, err := newDigestFromPath(args[0:2])
if err != nil {
return digest{}, digest{}, err
}
 
return args[1], args[4], nil
signature, err := newDigestFromPath(args[3:5])
if err != nil {
return digest{}, digest{}, err
}
return link, signature, err
}
 
func compareEtag(data []byte, etag string) bool {
Loading
Loading
@@ -53,30 +53,24 @@ func compareEtag(data []byte, etag string) bool {
return etag == hex
}
 
func readLink(path string, etag string) (string, error) {
func readLink(path string, etag string) (digest, error) {
data, err := currentStorage.Read(path, etag)
if err != nil {
return "", err
return digest{}, err
}
 
link := string(data)
if !strings.HasPrefix(link, "sha256:") {
return "", errors.New("Link has to start with sha256")
}
link = link[len("sha256:"):]
if len(link) != 64 {
return "", fmt.Errorf("Link has to be exactly 256 bit: %v", link)
d, err := newDigestFromReference(data)
if err != nil {
return digest{}, err
}
 
return link, nil
return d, nil
}
 
func verifyLink(link string, path string, etag string) error {
func verifyLink(link digest, path string, etag string) error {
// If we have e-tag, let's verify e-tag
if etag != "" {
content := "sha256:" + link
if compareEtag([]byte(content), etag) {
if link.etag() == etag {
return nil
}
}
Loading
Loading
Loading
Loading
@@ -15,17 +15,17 @@ import (
)
 
type manifestData struct {
name string
layers []string
digest digest
layers []digest
loaded bool
loadErr error
 
loadLock sync.Mutex
}
 
type manifestsData map[string]*manifestData
type manifestsData map[digest]*manifestData
 
var manifests manifestsData = make(map[string]*manifestData)
var manifests manifestsData = make(map[digest]*manifestData)
var manifestsLock sync.Mutex
 
func deserializeManifest(data []byte) (distribution.Manifest, error) {
Loading
Loading
@@ -59,13 +59,13 @@ func deserializeManifest(data []byte) (distribution.Manifest, error) {
}
 
func (m *manifestData) path() string {
return filepath.Join("blobs", "sha256", m.name[0:2], m.name, "data")
return filepath.Join("blobs", m.digest.scopedPath(), "data")
}
 
func (m *manifestData) load(blobs blobsData) error {
logrus.Println("MANIFEST:", m.path(), ": loading...")
 
data, err := currentStorage.Read(m.path(), blobs.etag(m.name))
data, err := currentStorage.Read(m.path(), blobs.etag(m.digest))
if err != nil {
return err
}
Loading
Loading
@@ -76,19 +76,23 @@ func (m *manifestData) load(blobs blobsData) error {
}
 
for _, reference := range manifest.References() {
m.layers = append(m.layers, reference.Digest.Hex())
digest, err := newDigestFromReference([]byte(reference.Digest))
if err != nil {
return err
}
m.layers = append(m.layers, digest)
}
return nil
}
 
func (m manifestsData) get(name string, blobs blobsData) (*manifestData, error) {
func (m manifestsData) get(digest digest, blobs blobsData) (*manifestData, error) {
manifestsLock.Lock()
manifest := m[name]
manifest := m[digest]
if manifest == nil {
manifest = &manifestData{
name: name,
digest: digest,
}
m[name] = manifest
m[digest] = manifest
}
manifestsLock.Unlock()
 
Loading
Loading
Loading
Loading
@@ -15,9 +15,9 @@ import (
 
type repositoryData struct {
name string
layers map[string]int
manifests map[string]int
manifestSignatures map[string][]string
layers map[digest]int
manifests map[digest]int
manifestSignatures map[digest][]digest
tags map[string]*tag
uploads []string
lock sync.Mutex
Loading
Loading
@@ -33,23 +33,23 @@ var repositoryCsvOutput = flag.String("repository-csv-output", "repositories.csv
func newRepositoryData(name string) *repositoryData {
return &repositoryData{
name: name,
layers: make(map[string]int),
manifests: make(map[string]int),
manifestSignatures: make(map[string][]string),
layers: make(map[digest]int),
manifests: make(map[digest]int),
manifestSignatures: make(map[digest][]digest),
tags: make(map[string]*tag),
}
}
 
func (r *repositoryData) layerLinkPath(layer string) string {
return filepath.Join("repositories", r.name, "_layers", "sha256", layer, "link")
func (r *repositoryData) layerLinkPath(layer digest) string {
return filepath.Join("repositories", r.name, "_layers", layer.path(), "link")
}
 
func (r *repositoryData) manifestRevisionPath(revision string) string {
return filepath.Join("repositories", r.name, "_manifests", "revisions", "sha256", revision, "link")
func (r *repositoryData) manifestRevisionPath(revision digest) string {
return filepath.Join("repositories", r.name, "_manifests", "revisions", revision.path(), "link")
}
 
func (r *repositoryData) manifestRevisionSignaturePath(revision, signature string) string {
return filepath.Join("repositories", r.name, "_manifests", "revisions", "sha256", revision, "signatures", "sha256", signature, "link")
func (r *repositoryData) manifestRevisionSignaturePath(revision, signature digest) string {
return filepath.Join("repositories", r.name, "_manifests", "revisions", revision.path(), "signatures", signature.path(), "link")
}
 
func (r *repositoryData) uploadPath(upload string) string {
Loading
Loading
@@ -72,21 +72,21 @@ func (r *repositoryData) tag(name string) *tag {
return t
}
 
func (r *repositoryData) markManifest(name string) error {
func (r *repositoryData) markManifest(revision digest) error {
r.lock.Lock()
defer r.lock.Unlock()
 
r.manifests[name]++
r.manifests[revision]++
return nil
}
 
func (r *repositoryData) markManifestLayers(blobs blobsData, name string) error {
err := blobs.mark(name)
func (r *repositoryData) markManifestLayers(blobs blobsData, revision digest) error {
err := blobs.mark(revision)
if err != nil {
return err
}
 
manifest, err := manifests.get(name, blobs)
manifest, err := manifests.get(revision, blobs)
if err != nil {
return err
}
Loading
Loading
@@ -97,7 +97,7 @@ func (r *repositoryData) markManifestLayers(blobs blobsData, name string) error
for _, layer := range manifest.layers {
_, ok := r.layers[layer]
if !ok {
return fmt.Errorf("layer %s not found reference from manifest %s", layer, name)
return fmt.Errorf("layer %s not found reference from manifest %s", layer, revision)
}
 
r.layers[layer]++
Loading
Loading
@@ -106,21 +106,21 @@ func (r *repositoryData) markManifestLayers(blobs blobsData, name string) error
return nil
}
 
func (r *repositoryData) markManifestSignatures(deletes deletesData, blobs blobsData, name string, signatures []string) error {
if r.manifests[name] > 0 {
func (r *repositoryData) markManifestSignatures(deletes deletesData, blobs blobsData, revision digest, signatures []digest) error {
if r.manifests[revision] > 0 {
for _, signature := range signatures {
blobs.mark(signature)
}
} else {
for _, signature := range signatures {
deletes.schedule(r.manifestRevisionSignaturePath(name, signature), linkFileSize)
deletes.schedule(r.manifestRevisionSignaturePath(revision, signature), digestReferenceSize)
}
}
return nil
}
 
func (r *repositoryData) markLayer(blobs blobsData, name string) error {
return blobs.mark(name)
func (r *repositoryData) markLayer(blobs blobsData, revision digest) error {
return blobs.mark(revision)
}
 
func (r *repositoryData) mark(blobs blobsData, deletes deletesData) error {
Loading
Loading
@@ -135,44 +135,44 @@ func (r *repositoryData) mark(blobs blobsData, deletes deletesData) error {
}
}
 
for name, used := range r.manifests {
for revision, used := range r.manifests {
if used > 0 {
err := r.markManifestLayers(blobs, name)
err := r.markManifestLayers(blobs, revision)
if err != nil {
if *softErrors {
logrus.Errorln("MARK:", r.name, "MANIFEST:", name, "ERROR:", err)
logrus.Errorln("MARK:", r.name, "MANIFEST:", revision, "ERROR:", err)
continue
}
return err
}
} else {
deletes.schedule(r.manifestRevisionPath(name), linkFileSize)
deletes.schedule(r.manifestRevisionPath(revision), digestReferenceSize)
}
}
 
for name, signatures := range r.manifestSignatures {
err := r.markManifestSignatures(deletes, blobs, name, signatures)
for revision, signatures := range r.manifestSignatures {
err := r.markManifestSignatures(deletes, blobs, revision, signatures)
if err != nil {
if *softErrors {
logrus.Errorln("MARK:", r.name, "MANIFEST SIGNATURE:", name, "ERROR:", err)
logrus.Errorln("MARK:", r.name, "MANIFEST SIGNATURE:", revision, "ERROR:", err)
continue
}
return err
}
}
 
for name, used := range r.layers {
for digest, used := range r.layers {
if used > 0 {
err := r.markLayer(blobs, name)
err := r.markLayer(blobs, digest)
if err != nil {
if *softErrors {
logrus.Errorln("MARK:", r.name, "LAYER:", name, "ERROR:", err)
logrus.Errorln("MARK:", r.name, "LAYER:", digest, "ERROR:", err)
continue
}
return err
}
} else {
deletes.schedule(r.layerLinkPath(name), linkFileSize)
deletes.schedule(r.layerLinkPath(digest), digestReferenceSize)
}
}
 
Loading
Loading
@@ -296,13 +296,13 @@ func (r *repositoryData) info(blobs blobsData, stream io.WriteCloser) {
var tagsVersions int
var layersUsedSize, layersUnusedSize int64
 
for name, used := range r.layers {
for digest, used := range r.layers {
if used > 0 {
layersUsed++
layersUsedSize += blobs.size(name)
layersUsedSize += blobs.size(digest)
} else {
layersUnused++
layersUnusedSize += blobs.size(name)
layersUnusedSize += blobs.size(digest)
}
}
 
Loading
Loading
Loading
Loading
@@ -13,8 +13,8 @@ var deleteOldTagVersions = flag.Bool("delete-old-tag-versions", true, "Delete ol
type tag struct {
repository *repositoryData
name string
current string
versions []string
current digest
versions []digest
lock sync.Mutex
}
 
Loading
Loading
@@ -22,19 +22,21 @@ func (t *tag) currentLinkPath() string {
return filepath.Join("repositories", t.repository.name, "_manifests", "tags", t.name, "current", "link")
}
 
func (t *tag) versionLinkPath(version string) string {
return filepath.Join("repositories", t.repository.name, "_manifests", "tags", t.name, "index", "sha256", version, "link")
func (t *tag) versionLinkPath(version digest) string {
return filepath.Join("repositories", t.repository.name, "_manifests", "tags", t.name, "index", version.path(), "link")
}
 
func (t *tag) mark(blobs blobsData, deletes deletesData) error {
if t.current != "" {
if t.current.valid() {
t.repository.markManifest(t.current)
} else {
deletes.schedule(t.currentLinkPath(), digestReferenceSize)
}
 
for _, version := range t.versions {
if version != t.current {
if *deleteOldTagVersions {
deletes.schedule(t.versionLinkPath(version), linkFileSize)
deletes.schedule(t.versionLinkPath(version), digestReferenceSize)
} else {
t.repository.markManifest(version)
}
Loading
Loading
@@ -47,12 +49,12 @@ func (t *tag) mark(blobs blobsData, deletes deletesData) error {
func (t *tag) setCurrent(info fileInfo) error {
//INFO[0000] /test2/_manifests/tags/latest/current/link
 
readLink, err := readLink(t.currentLinkPath(), info.etag)
link, err := readLink(t.currentLinkPath(), info.etag)
if err != nil {
return err
}
 
t.current = readLink
t.current = link
logrus.Infoln("TAG:", t.repository.name, ":", t.name, ": is using:", t.current)
return nil
}
Loading
Loading
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment