Implementation of the Manifest Service API refactor.

Add a generic Manifest interface to represent manifests in the registry and
remove references to schema specific manifests.

Add a ManifestBuilder to construct Manifest objects. Concrete manifest builders
will exist for each manifest type and implementations will contain manifest
specific data used to build a manifest.

Remove Signatures() from Repository interface.

Signatures are relevant only to schema1 manifests.  Move access to the signature
store inside the schema1 manifestStore.  Add some API tests to verify
signature roundtripping.

schema1
-------

Change the way data is stored in schema1.Manifest to enable Payload() to be used
to return complete Manifest JSON from the HTTP handler without knowledge of the
schema1 protocol.

tags
----

Move tag functionality to a seperate TagService and update ManifestService
to use the new interfaces.  Implement a driver based tagService to be backward
compatible with the current tag service.

Add a proxyTagService to enable the registry to get a digest for remote manifests
from a tag.

manifest store
--------------

Remove revision store and move all signing functionality into the signed manifeststore.

manifest registration
---------------------

Add a mechanism to register manifest media types and to allow different manifest
types to be Unmarshalled correctly.

client
------

Add ManifestServiceOptions to client functions to allow tags to be passed into Put and
Get for building correct registry URLs.  Change functional arguments to be an interface type
to allow passing data without mutating shared state.

Signed-off-by: Richard Scothern <richard.scothern@gmail.com>

Signed-off-by: Richard Scothern <richard.scothern@docker.com>
This commit is contained in:
Richard Scothern 2015-08-20 21:50:15 -07:00 committed by Richard Scothern
parent 0fef25389d
commit 8efb9ca329
18 changed files with 1161 additions and 656 deletions

View file

@ -6,8 +6,6 @@ import (
"github.com/docker/distribution"
"github.com/docker/distribution/context"
"github.com/docker/distribution/digest"
"github.com/docker/distribution/manifest/schema1"
"github.com/docker/distribution/registry/client"
"github.com/docker/distribution/registry/proxy/scheduler"
)
@ -24,8 +22,8 @@ type proxyManifestStore struct {
var _ distribution.ManifestService = &proxyManifestStore{}
func (pms proxyManifestStore) Exists(dgst digest.Digest) (bool, error) {
exists, err := pms.localManifests.Exists(dgst)
func (pms proxyManifestStore) Exists(ctx context.Context, dgst digest.Digest) (bool, error) {
exists, err := pms.localManifests.Exists(ctx, dgst)
if err != nil {
return false, err
}
@ -33,117 +31,56 @@ func (pms proxyManifestStore) Exists(dgst digest.Digest) (bool, error) {
return true, nil
}
return pms.remoteManifests.Exists(dgst)
return pms.remoteManifests.Exists(ctx, dgst)
}
func (pms proxyManifestStore) Get(dgst digest.Digest) (*schema1.SignedManifest, error) {
sm, err := pms.localManifests.Get(dgst)
if err == nil {
proxyMetrics.ManifestPush(uint64(len(sm.Raw)))
return sm, err
func (pms proxyManifestStore) Get(ctx context.Context, dgst digest.Digest, options ...distribution.ManifestServiceOption) (distribution.Manifest, error) {
// At this point `dgst` was either specified explicitly, or returned by the
// tagstore with the most recent association.
var fromRemote bool
manifest, err := pms.localManifests.Get(ctx, dgst, options...)
if err != nil {
manifest, err = pms.remoteManifests.Get(ctx, dgst, options...)
if err != nil {
return nil, err
}
fromRemote = true
}
sm, err = pms.remoteManifests.Get(dgst)
_, payload, err := manifest.Payload()
if err != nil {
return nil, err
}
proxyMetrics.ManifestPull(uint64(len(sm.Raw)))
err = pms.localManifests.Put(sm)
if err != nil {
return nil, err
proxyMetrics.ManifestPush(uint64(len(payload)))
if fromRemote {
proxyMetrics.ManifestPull(uint64(len(payload)))
_, err = pms.localManifests.Put(ctx, manifest)
if err != nil {
return nil, err
}
// Schedule the repo for removal
pms.scheduler.AddManifest(pms.repositoryName, repositoryTTL)
// Ensure the manifest blob is cleaned up
pms.scheduler.AddBlob(dgst.String(), repositoryTTL)
}
// Schedule the repo for removal
pms.scheduler.AddManifest(pms.repositoryName, repositoryTTL)
// Ensure the manifest blob is cleaned up
pms.scheduler.AddBlob(dgst.String(), repositoryTTL)
proxyMetrics.ManifestPush(uint64(len(sm.Raw)))
return sm, err
return manifest, err
}
func (pms proxyManifestStore) Tags() ([]string, error) {
return pms.localManifests.Tags()
func (pms proxyManifestStore) Put(ctx context.Context, manifest distribution.Manifest, options ...distribution.ManifestServiceOption) (digest.Digest, error) {
var d digest.Digest
return d, distribution.ErrUnsupported
}
func (pms proxyManifestStore) ExistsByTag(tag string) (bool, error) {
exists, err := pms.localManifests.ExistsByTag(tag)
if err != nil {
return false, err
}
if exists {
return true, nil
}
return pms.remoteManifests.ExistsByTag(tag)
}
func (pms proxyManifestStore) GetByTag(tag string, options ...distribution.ManifestServiceOption) (*schema1.SignedManifest, error) {
var localDigest digest.Digest
localManifest, err := pms.localManifests.GetByTag(tag, options...)
switch err.(type) {
case distribution.ErrManifestUnknown, distribution.ErrManifestUnknownRevision:
goto fromremote
case nil:
break
default:
return nil, err
}
localDigest, err = manifestDigest(localManifest)
if err != nil {
return nil, err
}
fromremote:
var sm *schema1.SignedManifest
sm, err = pms.remoteManifests.GetByTag(tag, client.AddEtagToTag(tag, localDigest.String()))
if err != nil && err != distribution.ErrManifestNotModified {
return nil, err
}
if err == distribution.ErrManifestNotModified {
context.GetLogger(pms.ctx).Debugf("Local manifest for %q is latest, dgst=%s", tag, localDigest.String())
return localManifest, nil
}
context.GetLogger(pms.ctx).Debugf("Updated manifest for %q, dgst=%s", tag, localDigest.String())
err = pms.localManifests.Put(sm)
if err != nil {
return nil, err
}
dgst, err := manifestDigest(sm)
if err != nil {
return nil, err
}
pms.scheduler.AddBlob(dgst.String(), repositoryTTL)
pms.scheduler.AddManifest(pms.repositoryName, repositoryTTL)
proxyMetrics.ManifestPull(uint64(len(sm.Raw)))
proxyMetrics.ManifestPush(uint64(len(sm.Raw)))
return sm, err
}
func manifestDigest(sm *schema1.SignedManifest) (digest.Digest, error) {
payload, err := sm.Payload()
if err != nil {
return "", err
}
return digest.FromBytes(payload), nil
}
func (pms proxyManifestStore) Put(manifest *schema1.SignedManifest) error {
func (pms proxyManifestStore) Delete(ctx context.Context, dgst digest.Digest) error {
return distribution.ErrUnsupported
}
func (pms proxyManifestStore) Delete(dgst digest.Digest) error {
return distribution.ErrUnsupported
/*func (pms proxyManifestStore) Enumerate(ctx context.Context, manifests []distribution.Manifest, last distribution.Manifest) (n int, err error) {
return 0, distribution.ErrUnsupported
}
*/

View file

@ -37,40 +37,31 @@ func (te manifestStoreTestEnv) RemoteStats() *map[string]int {
return &rs
}
func (sm statsManifest) Delete(dgst digest.Digest) error {
func (sm statsManifest) Delete(ctx context.Context, dgst digest.Digest) error {
sm.stats["delete"]++
return sm.manifests.Delete(dgst)
return sm.manifests.Delete(ctx, dgst)
}
func (sm statsManifest) Exists(dgst digest.Digest) (bool, error) {
func (sm statsManifest) Exists(ctx context.Context, dgst digest.Digest) (bool, error) {
sm.stats["exists"]++
return sm.manifests.Exists(dgst)
return sm.manifests.Exists(ctx, dgst)
}
func (sm statsManifest) ExistsByTag(tag string) (bool, error) {
sm.stats["existbytag"]++
return sm.manifests.ExistsByTag(tag)
}
func (sm statsManifest) Get(dgst digest.Digest) (*schema1.SignedManifest, error) {
func (sm statsManifest) Get(ctx context.Context, dgst digest.Digest, options ...distribution.ManifestServiceOption) (distribution.Manifest, error) {
sm.stats["get"]++
return sm.manifests.Get(dgst)
return sm.manifests.Get(ctx, dgst)
}
func (sm statsManifest) GetByTag(tag string, options ...distribution.ManifestServiceOption) (*schema1.SignedManifest, error) {
sm.stats["getbytag"]++
return sm.manifests.GetByTag(tag, options...)
}
func (sm statsManifest) Put(manifest *schema1.SignedManifest) error {
func (sm statsManifest) Put(ctx context.Context, manifest distribution.Manifest, options ...distribution.ManifestServiceOption) (digest.Digest, error) {
sm.stats["put"]++
return sm.manifests.Put(manifest)
return sm.manifests.Put(ctx, manifest)
}
func (sm statsManifest) Tags() ([]string, error) {
sm.stats["tags"]++
return sm.manifests.Tags()
/*func (sm statsManifest) Enumerate(ctx context.Context, manifests []distribution.Manifest, last distribution.Manifest) (n int, err error) {
sm.stats["enumerate"]++
return sm.manifests.Enumerate(ctx, manifests, last)
}
*/
func newManifestStoreTestEnv(t *testing.T, name, tag string) *manifestStoreTestEnv {
ctx := context.Background()
@ -169,15 +160,12 @@ func populateRepo(t *testing.T, ctx context.Context, repository distribution.Rep
if err != nil {
t.Fatalf(err.Error())
}
ms.Put(sm)
dgst, err := ms.Put(ctx, sm)
if err != nil {
t.Fatalf("unexpected errors putting manifest: %v", err)
}
pl, err := sm.Payload()
if err != nil {
t.Fatal(err)
}
return digest.FromBytes(pl), nil
return dgst, nil
}
// TestProxyManifests contains basic acceptance tests
@ -189,8 +177,9 @@ func TestProxyManifests(t *testing.T) {
localStats := env.LocalStats()
remoteStats := env.RemoteStats()
ctx := context.Background()
// Stat - must check local and remote
exists, err := env.manifests.ExistsByTag("latest")
exists, err := env.manifests.Exists(ctx, env.manifestDigest)
if err != nil {
t.Fatalf("Error checking existance")
}
@ -198,15 +187,16 @@ func TestProxyManifests(t *testing.T) {
t.Errorf("Unexpected non-existant manifest")
}
if (*localStats)["existbytag"] != 1 && (*remoteStats)["existbytag"] != 1 {
t.Errorf("Unexpected exists count")
if (*localStats)["exists"] != 1 && (*remoteStats)["exists"] != 1 {
t.Errorf("Unexpected exists count : \n%v \n%v", localStats, remoteStats)
}
// Get - should succeed and pull manifest into local
_, err = env.manifests.Get(env.manifestDigest)
_, err = env.manifests.Get(ctx, env.manifestDigest)
if err != nil {
t.Fatal(err)
}
if (*localStats)["get"] != 1 && (*remoteStats)["get"] != 1 {
t.Errorf("Unexpected get count")
}
@ -216,7 +206,7 @@ func TestProxyManifests(t *testing.T) {
}
// Stat - should only go to local
exists, err = env.manifests.ExistsByTag("latest")
exists, err = env.manifests.Exists(ctx, env.manifestDigest)
if err != nil {
t.Fatal(err)
}
@ -224,19 +214,21 @@ func TestProxyManifests(t *testing.T) {
t.Errorf("Unexpected non-existant manifest")
}
if (*localStats)["existbytag"] != 2 && (*remoteStats)["existbytag"] != 1 {
if (*localStats)["exists"] != 2 && (*remoteStats)["exists"] != 1 {
t.Errorf("Unexpected exists count")
}
// Get - should get from remote, to test freshness
_, err = env.manifests.Get(env.manifestDigest)
_, err = env.manifests.Get(ctx, env.manifestDigest)
if err != nil {
t.Fatal(err)
}
if (*remoteStats)["get"] != 2 && (*remoteStats)["existsbytag"] != 1 && (*localStats)["put"] != 1 {
if (*remoteStats)["get"] != 2 && (*remoteStats)["exists"] != 1 && (*localStats)["put"] != 1 {
t.Errorf("Unexpected get count")
}
}
func TestProxyTagService(t *testing.T) {
}

View file

@ -42,6 +42,7 @@ func NewRegistryPullThroughCache(ctx context.Context, registry distribution.Name
s.OnManifestExpire(func(repoName string) error {
return v.RemoveRepository(repoName)
})
err = s.Start()
if err != nil {
return nil, err
@ -78,7 +79,7 @@ func (pr *proxyingRegistry) Repository(ctx context.Context, name string) (distri
if err != nil {
return nil, err
}
localManifests, err := localRepo.Manifests(ctx, storage.SkipLayerVerification)
localManifests, err := localRepo.Manifests(ctx, storage.SkipLayerVerification())
if err != nil {
return nil, err
}
@ -106,8 +107,11 @@ func (pr *proxyingRegistry) Repository(ctx context.Context, name string) (distri
ctx: ctx,
scheduler: pr.scheduler,
},
name: name,
signatures: localRepo.Signatures(),
name: name,
tags: proxyTagService{
localTags: localRepo.Tags(ctx),
remoteTags: remoteRepo.Tags(ctx),
},
}, nil
}
@ -115,14 +119,13 @@ func (pr *proxyingRegistry) Repository(ctx context.Context, name string) (distri
// locally, or pulling it through from a remote and caching it locally if it doesn't
// already exist
type proxiedRepository struct {
blobStore distribution.BlobStore
manifests distribution.ManifestService
name string
signatures distribution.SignatureService
blobStore distribution.BlobStore
manifests distribution.ManifestService
name string
tags distribution.TagService
}
func (pr *proxiedRepository) Manifests(ctx context.Context, options ...distribution.ManifestServiceOption) (distribution.ManifestService, error) {
// options
return pr.manifests, nil
}
@ -134,6 +137,6 @@ func (pr *proxiedRepository) Name() string {
return pr.name
}
func (pr *proxiedRepository) Signatures() distribution.SignatureService {
return pr.signatures
func (pr *proxiedRepository) Tags(ctx context.Context) distribution.TagService {
return pr.tags
}

View file

@ -0,0 +1,58 @@
package proxy
import (
"github.com/docker/distribution"
"github.com/docker/distribution/context"
)
// proxyTagService supports local and remote lookup of tags.
type proxyTagService struct {
localTags distribution.TagService
remoteTags distribution.TagService
}
var _ distribution.TagService = proxyTagService{}
// Get attempts to get the most recent digest for the tag by checking the remote
// tag service first and then caching it locally. If the remote is unavailable
// the local association is returned
func (pt proxyTagService) Get(ctx context.Context, tag string) (distribution.Descriptor, error) {
desc, err := pt.remoteTags.Get(ctx, tag)
if err == nil {
err := pt.localTags.Tag(ctx, tag, desc)
if err != nil {
return distribution.Descriptor{}, err
}
return desc, nil
}
desc, err = pt.localTags.Get(ctx, tag)
if err != nil {
return distribution.Descriptor{}, err
}
return desc, nil
}
func (pt proxyTagService) Tag(ctx context.Context, tag string, desc distribution.Descriptor) error {
return distribution.ErrUnsupported
}
func (pt proxyTagService) Untag(ctx context.Context, tag string) error {
err := pt.localTags.Untag(ctx, tag)
if err != nil {
return err
}
return nil
}
func (pt proxyTagService) All(ctx context.Context) ([]string, error) {
tags, err := pt.remoteTags.All(ctx)
if err == nil {
return tags, err
}
return pt.localTags.All(ctx)
}
func (pt proxyTagService) Lookup(ctx context.Context, digest distribution.Descriptor) ([]string, error) {
return []string{}, distribution.ErrUnsupported
}

View file

@ -0,0 +1,164 @@
package proxy
import (
"sort"
"sync"
"testing"
"github.com/docker/distribution"
"github.com/docker/distribution/context"
)
type mockTagStore struct {
mapping map[string]distribution.Descriptor
sync.Mutex
}
var _ distribution.TagService = &mockTagStore{}
func (m *mockTagStore) Get(ctx context.Context, tag string) (distribution.Descriptor, error) {
m.Lock()
defer m.Unlock()
if d, ok := m.mapping[tag]; ok {
return d, nil
}
return distribution.Descriptor{}, distribution.ErrTagUnknown{}
}
func (m *mockTagStore) Tag(ctx context.Context, tag string, desc distribution.Descriptor) error {
m.Lock()
defer m.Unlock()
m.mapping[tag] = desc
return nil
}
func (m *mockTagStore) Untag(ctx context.Context, tag string) error {
m.Lock()
defer m.Unlock()
if _, ok := m.mapping[tag]; ok {
delete(m.mapping, tag)
return nil
}
return distribution.ErrTagUnknown{}
}
func (m *mockTagStore) All(ctx context.Context) ([]string, error) {
m.Lock()
defer m.Unlock()
var tags []string
for tag := range m.mapping {
tags = append(tags, tag)
}
return tags, nil
}
func (m *mockTagStore) Lookup(ctx context.Context, digest distribution.Descriptor) ([]string, error) {
panic("not implemented")
}
func testProxyTagService(local, remote map[string]distribution.Descriptor) *proxyTagService {
if local == nil {
local = make(map[string]distribution.Descriptor)
}
if remote == nil {
remote = make(map[string]distribution.Descriptor)
}
return &proxyTagService{
localTags: &mockTagStore{mapping: local},
remoteTags: &mockTagStore{mapping: remote},
}
}
func TestGet(t *testing.T) {
remoteDesc := distribution.Descriptor{Size: 42}
remoteTag := "remote"
proxyTags := testProxyTagService(map[string]distribution.Descriptor{remoteTag: remoteDesc}, nil)
ctx := context.Background()
// Get pre-loaded tag
d, err := proxyTags.Get(ctx, remoteTag)
if err != nil {
t.Fatal(err)
}
if d != remoteDesc {
t.Fatal("unable to get put tag")
}
local, err := proxyTags.localTags.Get(ctx, remoteTag)
if err != nil {
t.Fatal("remote tag not pulled into store")
}
if local != remoteDesc {
t.Fatalf("unexpected descriptor pulled through")
}
// Manually overwrite remote tag
newRemoteDesc := distribution.Descriptor{Size: 43}
err = proxyTags.remoteTags.Tag(ctx, remoteTag, newRemoteDesc)
if err != nil {
t.Fatal(err)
}
d, err = proxyTags.Get(ctx, remoteTag)
if err != nil {
t.Fatal(err)
}
if d != newRemoteDesc {
t.Fatal("unable to get put tag")
}
_, err = proxyTags.localTags.Get(ctx, remoteTag)
if err != nil {
t.Fatal("remote tag not pulled into store")
}
// untag, ensure it's removed locally, but present in remote
err = proxyTags.Untag(ctx, remoteTag)
if err != nil {
t.Fatal(err)
}
_, err = proxyTags.localTags.Get(ctx, remoteTag)
if err == nil {
t.Fatalf("Expected error getting Untag'd tag")
}
_, err = proxyTags.remoteTags.Get(ctx, remoteTag)
if err != nil {
t.Fatalf("remote tag should not be untagged with proxyTag.Untag")
}
_, err = proxyTags.Get(ctx, remoteTag)
if err != nil {
t.Fatal("untagged tag should be pulled through")
}
// Add another tag. Ensure both tags appear in enumerate
err = proxyTags.remoteTags.Tag(ctx, "funtag", distribution.Descriptor{Size: 42})
if err != nil {
t.Fatal(err)
}
all, err := proxyTags.All(ctx)
if err != nil {
t.Fatal(err)
}
if len(all) != 2 {
t.Fatalf("Unexpected tag length returned from All() : %d ", len(all))
}
sort.Strings(all)
if all[0] != "funtag" && all[1] != "remote" {
t.Fatalf("Unexpected tags returned from All() : %v ", all)
}
}