Implements garbage collection subcommand

- Includes a change in the command to run the registry. The registry
  server itself is now started up as a subcommand.
- Includes changes to the high level interfaces to support enumeration
  of various registry objects.

Signed-off-by: Andrew T Nguyen <andrew.nguyen@docker.com>
This commit is contained in:
Andrew T Nguyen 2016-01-19 14:26:15 -08:00
parent bb4d128523
commit b7d3424103
14 changed files with 796 additions and 29 deletions

View file

@ -1,6 +1,8 @@
package storage
import (
"path"
"github.com/docker/distribution"
"github.com/docker/distribution/context"
"github.com/docker/distribution/digest"
@ -85,6 +87,36 @@ func (bs *blobStore) Put(ctx context.Context, mediaType string, p []byte) (distr
}, bs.driver.PutContent(ctx, bp, p)
}
func (bs *blobStore) Enumerate(ctx context.Context, ingester func(dgst digest.Digest) error) error {
specPath, err := pathFor(blobsPathSpec{})
if err != nil {
return err
}
err = Walk(ctx, bs.driver, specPath, func(fileInfo driver.FileInfo) error {
// skip directories
if fileInfo.IsDir() {
return nil
}
currentPath := fileInfo.Path()
// we only want to parse paths that end with /data
_, fileName := path.Split(currentPath)
if fileName != "data" {
return nil
}
digest, err := digestFromPath(currentPath)
if err != nil {
return err
}
return ingester(digest)
})
return err
}
// path returns the canonical path for the blob identified by digest. The blob
// may or may not exist.
func (bs *blobStore) path(dgst digest.Digest) (string, error) {

View file

@ -64,3 +64,34 @@ func (reg *registry) Repositories(ctx context.Context, repos []string, last stri
return n, errVal
}
// Enumerate applies ingester to each repository
func (reg *registry) Enumerate(ctx context.Context, ingester func(string) error) error {
repoNameBuffer := make([]string, 100)
var last string
for {
n, err := reg.Repositories(ctx, repoNameBuffer, last)
if err != nil && err != io.EOF {
return err
}
if n == 0 {
break
}
last = repoNameBuffer[n-1]
for i := 0; i < n; i++ {
repoName := repoNameBuffer[i]
err = ingester(repoName)
if err != nil {
return err
}
}
if err == io.EOF {
break
}
}
return nil
}

View file

@ -3,6 +3,7 @@ package storage
import (
"fmt"
"net/http"
"path"
"time"
"github.com/docker/distribution"
@ -37,6 +38,9 @@ type linkedBlobStore struct {
// removed an the blob links folder should be merged. The first entry is
// treated as the "canonical" link location and will be used for writes.
linkPathFns []linkPathFunc
// linkDirectoryPathSpec locates the root directories in which one might find links
linkDirectoryPathSpec pathSpec
}
var _ distribution.BlobStore = &linkedBlobStore{}
@ -236,6 +240,55 @@ func (lbs *linkedBlobStore) Delete(ctx context.Context, dgst digest.Digest) erro
return nil
}
func (lbs *linkedBlobStore) Enumerate(ctx context.Context, ingestor func(digest.Digest) error) error {
rootPath, err := pathFor(lbs.linkDirectoryPathSpec)
if err != nil {
return err
}
err = Walk(ctx, lbs.blobStore.driver, rootPath, func(fileInfo driver.FileInfo) error {
// exit early if directory...
if fileInfo.IsDir() {
return nil
}
filePath := fileInfo.Path()
// check if it's a link
_, fileName := path.Split(filePath)
if fileName != "link" {
return nil
}
// read the digest found in link
digest, err := lbs.blobStore.readlink(ctx, filePath)
if err != nil {
return err
}
// ensure this conforms to the linkPathFns
_, err = lbs.Stat(ctx, digest)
if err != nil {
// we expect this error to occur so we move on
if err == distribution.ErrBlobUnknown {
return nil
}
return err
}
err = ingestor(digest)
if err != nil {
return err
}
return nil
})
if err != nil {
return err
}
return nil
}
func (lbs *linkedBlobStore) mount(ctx context.Context, sourceRepo reference.Named, dgst digest.Digest) (distribution.Descriptor, error) {
repo, err := lbs.registry.Repository(ctx, sourceRepo)
if err != nil {

View file

@ -2,6 +2,7 @@ package storage
import (
"fmt"
"path"
"encoding/json"
"github.com/docker/distribution"
@ -129,6 +130,52 @@ func (ms *manifestStore) Delete(ctx context.Context, dgst digest.Digest) error {
return ms.blobStore.Delete(ctx, dgst)
}
func (ms *manifestStore) Enumerate(ctx context.Context, manifests []distribution.Manifest, last distribution.Manifest) (n int, err error) {
return 0, distribution.ErrUnsupported
func (ms *manifestStore) Enumerate(ctx context.Context, ingester func(digest.Digest) error) error {
err := ms.blobStore.Enumerate(ctx, func(dgst digest.Digest) error {
err := ingester(dgst)
if err != nil {
return err
}
return nil
})
return err
}
// Only valid for schema1 signed manifests
func (ms *manifestStore) GetSignatures(ctx context.Context, manifestDigest digest.Digest) ([]digest.Digest, error) {
// sanity check that digest refers to a schema1 digest
manifest, err := ms.Get(ctx, manifestDigest)
if err != nil {
return nil, err
}
if _, ok := manifest.(*schema1.SignedManifest); !ok {
return nil, fmt.Errorf("digest %v is not for schema1 manifest", manifestDigest)
}
signaturesPath, err := pathFor(manifestSignaturesPathSpec{
name: ms.repository.Named().Name(),
revision: manifestDigest,
})
if err != nil {
return nil, err
}
signaturesPath = path.Join(signaturesPath, "sha256")
signaturePaths, err := ms.blobStore.driver.List(ctx, signaturesPath)
if err != nil {
return nil, err
}
var digests []digest.Digest
for _, sigPath := range signaturePaths {
sigdigest, err := digest.ParseDigest("sha256:" + path.Base(sigPath))
if err != nil {
// merely found not a digest
continue
}
digests = append(digests, sigdigest)
}
return digests, nil
}

View file

@ -74,6 +74,7 @@ const (
//
// Manifests:
//
// manifestRevisionsPathSpec: <root>/v2/repositories/<name>/_manifests/revisions/
// manifestRevisionPathSpec: <root>/v2/repositories/<name>/_manifests/revisions/<algorithm>/<hex digest>/
// manifestRevisionLinkPathSpec: <root>/v2/repositories/<name>/_manifests/revisions/<algorithm>/<hex digest>/link
// manifestSignaturesPathSpec: <root>/v2/repositories/<name>/_manifests/revisions/<algorithm>/<hex digest>/signatures/
@ -100,6 +101,7 @@ const (
//
// Blob Store:
//
// blobsPathSpec: <root>/v2/blobs/
// blobPathSpec: <root>/v2/blobs/<algorithm>/<first two hex bytes of digest>/<hex digest>
// blobDataPathSpec: <root>/v2/blobs/<algorithm>/<first two hex bytes of digest>/<hex digest>/data
// blobMediaTypePathSpec: <root>/v2/blobs/<algorithm>/<first two hex bytes of digest>/<hex digest>/data
@ -125,6 +127,9 @@ func pathFor(spec pathSpec) (string, error) {
switch v := spec.(type) {
case manifestRevisionsPathSpec:
return path.Join(append(repoPrefix, v.name, "_manifests", "revisions")...), nil
case manifestRevisionPathSpec:
components, err := digestPathComponents(v.revision, false)
if err != nil {
@ -246,6 +251,17 @@ func pathFor(spec pathSpec) (string, error) {
blobLinkPathComponents := append(repoPrefix, v.name, "_layers")
return path.Join(path.Join(append(blobLinkPathComponents, components...)...), "link"), nil
case blobsPathSpec:
blobsPathPrefix := append(rootPrefix, "blobs")
return path.Join(blobsPathPrefix...), nil
case blobPathSpec:
components, err := digestPathComponents(v.digest, true)
if err != nil {
return "", err
}
blobPathPrefix := append(rootPrefix, "blobs")
return path.Join(append(blobPathPrefix, components...)...), nil
case blobDataPathSpec:
components, err := digestPathComponents(v.digest, true)
if err != nil {
@ -281,6 +297,14 @@ type pathSpec interface {
pathSpec()
}
// manifestRevisionsPathSpec describes the directory path for
// a manifest revision.
type manifestRevisionsPathSpec struct {
name string
}
func (manifestRevisionsPathSpec) pathSpec() {}
// manifestRevisionPathSpec describes the components of the directory path for
// a manifest revision.
type manifestRevisionPathSpec struct {
@ -404,12 +428,17 @@ var blobAlgorithmReplacer = strings.NewReplacer(
";", "/",
)
// // blobPathSpec contains the path for the registry global blob store.
// type blobPathSpec struct {
// digest digest.Digest
// }
// blobsPathSpec contains the path for the blobs directory
type blobsPathSpec struct{}
// func (blobPathSpec) pathSpec() {}
func (blobsPathSpec) pathSpec() {}
// blobPathSpec contains the path for the registry global blob store.
type blobPathSpec struct {
digest digest.Digest
}
func (blobPathSpec) pathSpec() {}
// blobDataPathSpec contains the path for the registry global blob store. For
// now, this contains layer data, exclusively.
@ -491,3 +520,23 @@ func digestPathComponents(dgst digest.Digest, multilevel bool) ([]string, error)
return append(prefix, suffix...), nil
}
// Reconstructs a digest from a path
func digestFromPath(digestPath string) (digest.Digest, error) {
digestPath = strings.TrimSuffix(digestPath, "/data")
dir, hex := path.Split(digestPath)
dir = path.Dir(dir)
dir, next := path.Split(dir)
// next is either the algorithm OR the first two characters in the hex string
var algo string
if next == hex[:2] {
algo = path.Base(dir)
} else {
algo = next
}
dgst := digest.NewDigestFromHex(algo, hex)
return dgst, dgst.Validate()
}

View file

@ -2,6 +2,8 @@ package storage
import (
"testing"
"github.com/docker/distribution/digest"
)
func TestPathMapper(t *testing.T) {
@ -120,3 +122,29 @@ func TestPathMapper(t *testing.T) {
}
}
func TestDigestFromPath(t *testing.T) {
for _, testcase := range []struct {
path string
expected digest.Digest
multilevel bool
err error
}{
{
path: "/docker/registry/v2/blobs/sha256/99/9943fffae777400c0344c58869c4c2619c329ca3ad4df540feda74d291dd7c86/data",
multilevel: true,
expected: "sha256:9943fffae777400c0344c58869c4c2619c329ca3ad4df540feda74d291dd7c86",
err: nil,
},
} {
result, err := digestFromPath(testcase.path)
if err != testcase.err {
t.Fatalf("Unexpected error value %v when we wanted %v", err, testcase.err)
}
if result != testcase.expected {
t.Fatalf("Unexpected result value %v when we wanted %v", result, testcase.expected)
}
}
}

View file

@ -147,6 +147,14 @@ func (reg *registry) Repository(ctx context.Context, canonicalName reference.Nam
}, nil
}
func (reg *registry) Blobs() distribution.BlobEnumerator {
return reg.blobStore
}
func (reg *registry) BlobStatter() distribution.BlobStatter {
return reg.statter
}
// repository provides name-scoped access to various services.
type repository struct {
*registry
@ -180,6 +188,8 @@ func (repo *repository) Manifests(ctx context.Context, options ...distribution.M
blobLinkPath,
}
manifestDirectoryPathSpec := manifestRevisionsPathSpec{name: repo.name.Name()}
blobStore := &linkedBlobStore{
ctx: ctx,
blobStore: repo.blobStore,
@ -193,7 +203,8 @@ func (repo *repository) Manifests(ctx context.Context, options ...distribution.M
// TODO(stevvooe): linkPath limits this blob store to only
// manifests. This instance cannot be used for blob checks.
linkPathFns: manifestLinkPathFns,
linkPathFns: manifestLinkPathFns,
linkDirectoryPathSpec: manifestDirectoryPathSpec,
}
ms := &manifestStore{

View file

@ -34,11 +34,13 @@ func (v Vacuum) RemoveBlob(dgst string) error {
return err
}
blobPath, err := pathFor(blobDataPathSpec{digest: d})
blobPath, err := pathFor(blobPathSpec{digest: d})
if err != nil {
return err
}
context.GetLogger(v.ctx).Infof("Deleting blob: %s", blobPath)
err = v.driver.Delete(v.ctx, blobPath)
if err != nil {
return err