From 36023174db108428751f21c3a115a019628d0689 Mon Sep 17 00:00:00 2001 From: Brian Bland Date: Wed, 13 Jan 2016 11:44:42 -0800 Subject: [PATCH] Adds functional options arguments to the Blobs Create method Removes the Mount operation and instead implements this behavior as part of Create a From option is provided, which in turn returns a rich ErrBlobMounted indicating that a blob upload session was not initiated, but instead the blob was mounted from another repository Signed-off-by: Brian Bland --- docs/client/repository.go | 79 ++++++++++--------------------- docs/client/repository_test.go | 29 ++++++++++-- docs/handlers/blobupload.go | 42 +++++++++------- docs/proxy/proxyblobstore.go | 2 +- docs/proxy/proxyblobstore_test.go | 12 +---- docs/storage/blob_test.go | 21 ++++++-- docs/storage/linkedblobstore.go | 54 ++++++++++++++++++++- 7 files changed, 146 insertions(+), 93 deletions(-) diff --git a/docs/client/repository.go b/docs/client/repository.go index 8f30b4f1..c2aca03f 100644 --- a/docs/client/repository.go +++ b/docs/client/repository.go @@ -10,7 +10,6 @@ import ( "net/http" "net/url" "strconv" - "sync" "time" "github.com/docker/distribution" @@ -19,6 +18,7 @@ import ( "github.com/docker/distribution/reference" "github.com/docker/distribution/registry/api/v2" "github.com/docker/distribution/registry/client/transport" + "github.com/docker/distribution/registry/storage" "github.com/docker/distribution/registry/storage/cache" "github.com/docker/distribution/registry/storage/cache/memory" ) @@ -500,9 +500,6 @@ type blobs struct { statter distribution.BlobDescriptorService distribution.BlobDeleter - - cacheLock sync.Mutex - cachedBlobUpload distribution.BlobWriter } func sanitizeLocation(location, base string) (string, error) { @@ -576,18 +573,23 @@ func (bs *blobs) Put(ctx context.Context, mediaType string, p []byte) (distribut return writer.Commit(ctx, desc) } -func (bs *blobs) Create(ctx context.Context) (distribution.BlobWriter, error) { - bs.cacheLock.Lock() - if bs.cachedBlobUpload != nil { - upload := bs.cachedBlobUpload - bs.cachedBlobUpload = nil - bs.cacheLock.Unlock() +func (bs *blobs) Create(ctx context.Context, options ...distribution.BlobCreateOption) (distribution.BlobWriter, error) { + var opts storage.CreateOptions - return upload, nil + for _, option := range options { + err := option.Apply(&opts) + if err != nil { + return nil, err + } } - bs.cacheLock.Unlock() - u, err := bs.ub.BuildBlobUploadURL(bs.name) + var values []url.Values + + if opts.Mount.ShouldMount { + values = append(values, url.Values{"from": {opts.Mount.From.Name()}, "mount": {opts.Mount.From.Digest().String()}}) + } + + u, err := bs.ub.BuildBlobUploadURL(bs.name, values...) if err != nil { return nil, err } @@ -598,7 +600,14 @@ func (bs *blobs) Create(ctx context.Context) (distribution.BlobWriter, error) { } defer resp.Body.Close() - if SuccessStatus(resp.StatusCode) { + switch resp.StatusCode { + case http.StatusCreated: + desc, err := bs.statter.Stat(ctx, opts.Mount.From.Digest()) + if err != nil { + return nil, err + } + return nil, distribution.ErrBlobMounted{From: opts.Mount.From, Descriptor: desc} + case http.StatusAccepted: // TODO(dmcgowan): Check for invalid UUID uuid := resp.Header.Get("Docker-Upload-UUID") location, err := sanitizeLocation(resp.Header.Get("Location"), u) @@ -613,53 +622,15 @@ func (bs *blobs) Create(ctx context.Context) (distribution.BlobWriter, error) { startedAt: time.Now(), location: location, }, nil + default: + return nil, HandleErrorResponse(resp) } - return nil, HandleErrorResponse(resp) } func (bs *blobs) Resume(ctx context.Context, id string) (distribution.BlobWriter, error) { panic("not implemented") } -func (bs *blobs) Mount(ctx context.Context, sourceRepo string, dgst digest.Digest) (distribution.Descriptor, error) { - u, err := bs.ub.BuildBlobUploadURL(bs.name, url.Values{"from": {sourceRepo}, "mount": {dgst.String()}}) - if err != nil { - return distribution.Descriptor{}, err - } - - resp, err := bs.client.Post(u, "", nil) - if err != nil { - return distribution.Descriptor{}, err - } - defer resp.Body.Close() - - switch resp.StatusCode { - case http.StatusCreated: - return bs.Stat(ctx, dgst) - case http.StatusAccepted: - // Triggered a blob upload (legacy behavior), so cache the creation info - uuid := resp.Header.Get("Docker-Upload-UUID") - location, err := sanitizeLocation(resp.Header.Get("Location"), u) - if err != nil { - return distribution.Descriptor{}, err - } - - bs.cacheLock.Lock() - bs.cachedBlobUpload = &httpBlobUpload{ - statter: bs.statter, - client: bs.client, - uuid: uuid, - startedAt: time.Now(), - location: location, - } - bs.cacheLock.Unlock() - - return distribution.Descriptor{}, HandleErrorResponse(resp) - default: - return distribution.Descriptor{}, HandleErrorResponse(resp) - } -} - func (bs *blobs) Delete(ctx context.Context, dgst digest.Digest) error { return bs.statter.Clear(ctx, dgst) } diff --git a/docs/client/repository_test.go b/docs/client/repository_test.go index 8a7fc1c9..811ab235 100644 --- a/docs/client/repository_test.go +++ b/docs/client/repository_test.go @@ -18,7 +18,9 @@ import ( "github.com/docker/distribution/digest" "github.com/docker/distribution/manifest" "github.com/docker/distribution/manifest/schema1" + "github.com/docker/distribution/reference" "github.com/docker/distribution/registry/api/errcode" + "github.com/docker/distribution/registry/storage" "github.com/docker/distribution/testutil" "github.com/docker/distribution/uuid" "github.com/docker/libtrust" @@ -471,6 +473,16 @@ func TestBlobMount(t *testing.T) { var m testutil.RequestResponseMap repo := "test.example.com/uploadrepo" sourceRepo := "test.example.com/sourcerepo" + + namedRef, err := reference.ParseNamed(sourceRepo) + if err != nil { + t.Fatal(err) + } + canonicalRef, err := reference.WithDigest(namedRef, dgst) + if err != nil { + t.Fatal(err) + } + m = append(m, testutil.RequestResponseMapping{ Request: testutil.Request{ Method: "POST", @@ -511,13 +523,20 @@ func TestBlobMount(t *testing.T) { l := r.Blobs(ctx) - stat, err := l.Mount(ctx, sourceRepo, dgst) - if err != nil { - t.Fatal(err) + bw, err := l.Create(ctx, storage.WithMountFrom(canonicalRef)) + if bw != nil { + t.Fatalf("Expected blob writer to be nil, was %v", bw) } - if stat.Digest != dgst { - t.Fatalf("Unexpected digest: %s, expected %s", stat.Digest, dgst) + if ebm, ok := err.(distribution.ErrBlobMounted); ok { + if ebm.From.Digest() != dgst { + t.Fatalf("Unexpected digest: %s, expected %s", ebm.From.Digest(), dgst) + } + if ebm.From.Name() != sourceRepo { + t.Fatalf("Unexpected from: %s, expected %s", ebm.From.Name(), sourceRepo) + } + } else { + t.Fatalf("Unexpected error: %v, expected an ErrBlobMounted", err) } } diff --git a/docs/handlers/blobupload.go b/docs/handlers/blobupload.go index c5638c83..0f325184 100644 --- a/docs/handlers/blobupload.go +++ b/docs/handlers/blobupload.go @@ -9,8 +9,10 @@ import ( "github.com/docker/distribution" ctxu "github.com/docker/distribution/context" "github.com/docker/distribution/digest" + "github.com/docker/distribution/reference" "github.com/docker/distribution/registry/api/errcode" "github.com/docker/distribution/registry/api/v2" + "github.com/docker/distribution/registry/storage" "github.com/gorilla/handlers" ) @@ -118,19 +120,27 @@ type blobUploadHandler struct { // StartBlobUpload begins the blob upload process and allocates a server-side // blob writer session, optionally mounting the blob from a separate repository. func (buh *blobUploadHandler) StartBlobUpload(w http.ResponseWriter, r *http.Request) { + var options []distribution.BlobCreateOption + fromRepo := r.FormValue("from") mountDigest := r.FormValue("mount") if mountDigest != "" && fromRepo != "" { - buh.mountBlob(w, fromRepo, mountDigest) - return + opt, err := buh.createBlobMountOption(fromRepo, mountDigest) + if err != nil { + options = append(options, opt) + } } blobs := buh.Repository.Blobs(buh) - upload, err := blobs.Create(buh) + upload, err := blobs.Create(buh, options...) if err != nil { - if err == distribution.ErrUnsupported { + if ebm, ok := err.(distribution.ErrBlobMounted); ok { + if err := buh.writeBlobCreatedHeaders(w, ebm.Descriptor); err != nil { + buh.Errors = append(buh.Errors, errcode.ErrorCodeUnknown.WithDetail(err)) + } + } else if err == distribution.ErrUnsupported { buh.Errors = append(buh.Errors, errcode.ErrorCodeUnsupported) } else { buh.Errors = append(buh.Errors, errcode.ErrorCodeUnknown.WithDetail(err)) @@ -339,27 +349,23 @@ func (buh *blobUploadHandler) blobUploadResponse(w http.ResponseWriter, r *http. // mountBlob attempts to mount a blob from another repository by its digest. If // successful, the blob is linked into the blob store and 201 Created is // returned with the canonical url of the blob. -func (buh *blobUploadHandler) mountBlob(w http.ResponseWriter, fromRepo, mountDigest string) { +func (buh *blobUploadHandler) createBlobMountOption(fromRepo, mountDigest string) (distribution.BlobCreateOption, error) { dgst, err := digest.ParseDigest(mountDigest) if err != nil { - buh.Errors = append(buh.Errors, v2.ErrorCodeDigestInvalid.WithDetail(err)) - return + return nil, err } - blobs := buh.Repository.Blobs(buh) - desc, err := blobs.Mount(buh, fromRepo, dgst) + ref, err := reference.ParseNamed(fromRepo) if err != nil { - if err == distribution.ErrBlobUnknown { - buh.Errors = append(buh.Errors, v2.ErrorCodeBlobUnknown.WithDetail(dgst)) - } else { - buh.Errors = append(buh.Errors, errcode.ErrorCodeUnknown.WithDetail(err)) - } - return + return nil, err } - if err := buh.writeBlobCreatedHeaders(w, desc); err != nil { - buh.Errors = append(buh.Errors, errcode.ErrorCodeUnknown.WithDetail(err)) - return + + canonical, err := reference.WithDigest(ref, dgst) + if err != nil { + return nil, err } + + return storage.WithMountFrom(canonical), nil } // writeBlobCreatedHeaders writes the standard headers describing a newly diff --git a/docs/proxy/proxyblobstore.go b/docs/proxy/proxyblobstore.go index ca39f9f8..41b76e8e 100644 --- a/docs/proxy/proxyblobstore.go +++ b/docs/proxy/proxyblobstore.go @@ -161,7 +161,7 @@ func (pbs *proxyBlobStore) Put(ctx context.Context, mediaType string, p []byte) return distribution.Descriptor{}, distribution.ErrUnsupported } -func (pbs *proxyBlobStore) Create(ctx context.Context) (distribution.BlobWriter, error) { +func (pbs *proxyBlobStore) Create(ctx context.Context, options ...distribution.BlobCreateOption) (distribution.BlobWriter, error) { return nil, distribution.ErrUnsupported } diff --git a/docs/proxy/proxyblobstore_test.go b/docs/proxy/proxyblobstore_test.go index 5c5015a0..7702771c 100644 --- a/docs/proxy/proxyblobstore_test.go +++ b/docs/proxy/proxyblobstore_test.go @@ -42,12 +42,12 @@ func (sbs statsBlobStore) Get(ctx context.Context, dgst digest.Digest) ([]byte, return sbs.blobs.Get(ctx, dgst) } -func (sbs statsBlobStore) Create(ctx context.Context) (distribution.BlobWriter, error) { +func (sbs statsBlobStore) Create(ctx context.Context, options ...distribution.BlobCreateOption) (distribution.BlobWriter, error) { sbsMu.Lock() sbs.stats["create"]++ sbsMu.Unlock() - return sbs.blobs.Create(ctx) + return sbs.blobs.Create(ctx, options...) } func (sbs statsBlobStore) Resume(ctx context.Context, id string) (distribution.BlobWriter, error) { @@ -58,14 +58,6 @@ func (sbs statsBlobStore) Resume(ctx context.Context, id string) (distribution.B return sbs.blobs.Resume(ctx, id) } -func (sbs statsBlobStore) Mount(ctx context.Context, sourceRepo string, dgst digest.Digest) (distribution.Descriptor, error) { - sbsMu.Lock() - sbs.stats["mount"]++ - sbsMu.Unlock() - - return sbs.blobs.Mount(ctx, sourceRepo, dgst) -} - func (sbs statsBlobStore) Open(ctx context.Context, dgst digest.Digest) (distribution.ReadSeekCloser, error) { sbsMu.Lock() sbs.stats["open"]++ diff --git a/docs/storage/blob_test.go b/docs/storage/blob_test.go index b89814c7..e1eacc00 100644 --- a/docs/storage/blob_test.go +++ b/docs/storage/blob_test.go @@ -12,6 +12,7 @@ import ( "github.com/docker/distribution" "github.com/docker/distribution/context" "github.com/docker/distribution/digest" + "github.com/docker/distribution/reference" "github.com/docker/distribution/registry/storage/cache/memory" "github.com/docker/distribution/registry/storage/driver/inmemory" "github.com/docker/distribution/testutil" @@ -377,13 +378,27 @@ func TestBlobMount(t *testing.T) { t.Fatalf("unexpected non-error stating unmounted blob: %v", desc) } - mountDesc, err := bs.Mount(ctx, sourceRepository.Name(), desc.Digest) + namedRef, err := reference.ParseNamed(sourceRepository.Name()) if err != nil { + t.Fatal(err) + } + canonicalRef, err := reference.WithDigest(namedRef, desc.Digest) + if err != nil { + t.Fatal(err) + } + + bw, err := bs.Create(ctx, WithMountFrom(canonicalRef)) + if bw != nil { + t.Fatal("unexpected blobwriter returned from Create call, should mount instead") + } + + ebm, ok := err.(distribution.ErrBlobMounted) + if !ok { t.Fatalf("unexpected error mounting layer: %v", err) } - if mountDesc != desc { - t.Fatalf("descriptors not equal: %v != %v", mountDesc, desc) + if ebm.Descriptor != desc { + t.Fatalf("descriptors not equal: %v != %v", ebm.Descriptor, desc) } // Test for existence. diff --git a/docs/storage/linkedblobstore.go b/docs/storage/linkedblobstore.go index 8b7f9f51..d7a9fd13 100644 --- a/docs/storage/linkedblobstore.go +++ b/docs/storage/linkedblobstore.go @@ -1,12 +1,14 @@ package storage import ( + "fmt" "net/http" "time" "github.com/docker/distribution" "github.com/docker/distribution/context" "github.com/docker/distribution/digest" + "github.com/docker/distribution/reference" "github.com/docker/distribution/registry/storage/driver" "github.com/docker/distribution/uuid" ) @@ -95,10 +97,58 @@ func (lbs *linkedBlobStore) Put(ctx context.Context, mediaType string, p []byte) return desc, lbs.linkBlob(ctx, desc) } +// CreateOptions is a collection of blob creation modifiers relevant to general +// blob storage intended to be configured by the BlobCreateOption.Apply method. +type CreateOptions struct { + Mount struct { + ShouldMount bool + From reference.Canonical + } +} + +type optionFunc func(interface{}) error + +func (f optionFunc) Apply(v interface{}) error { + return f(v) +} + +// WithMountFrom returns a BlobCreateOption which designates that the blob should be +// mounted from the given canonical reference. +func WithMountFrom(ref reference.Canonical) distribution.BlobCreateOption { + return optionFunc(func(v interface{}) error { + opts, ok := v.(*CreateOptions) + if !ok { + return fmt.Errorf("unexpected options type: %T", v) + } + + opts.Mount.ShouldMount = true + opts.Mount.From = ref + + return nil + }) +} + // Writer begins a blob write session, returning a handle. -func (lbs *linkedBlobStore) Create(ctx context.Context) (distribution.BlobWriter, error) { +func (lbs *linkedBlobStore) Create(ctx context.Context, options ...distribution.BlobCreateOption) (distribution.BlobWriter, error) { context.GetLogger(ctx).Debug("(*linkedBlobStore).Writer") + var opts CreateOptions + + for _, option := range options { + err := option.Apply(&opts) + if err != nil { + return nil, err + } + } + + if opts.Mount.ShouldMount { + desc, err := lbs.mount(ctx, opts.Mount.From.Name(), opts.Mount.From.Digest()) + if err == nil { + // Mount successful, no need to initiate an upload session + return nil, distribution.ErrBlobMounted{From: opts.Mount.From, Descriptor: desc} + } + } + uuid := uuid.Generate().String() startedAt := time.Now().UTC() @@ -186,7 +236,7 @@ func (lbs *linkedBlobStore) Delete(ctx context.Context, dgst digest.Digest) erro return nil } -func (lbs *linkedBlobStore) Mount(ctx context.Context, sourceRepo string, dgst digest.Digest) (distribution.Descriptor, error) { +func (lbs *linkedBlobStore) mount(ctx context.Context, sourceRepo string, dgst digest.Digest) (distribution.Descriptor, error) { repo, err := lbs.registry.Repository(ctx, sourceRepo) if err != nil { return distribution.Descriptor{}, err