diff --git a/blobs.go b/blobs.go index bd5f0bc9..ce43ea2e 100644 --- a/blobs.go +++ b/blobs.go @@ -9,6 +9,7 @@ import ( "github.com/docker/distribution/context" "github.com/docker/distribution/digest" + "github.com/docker/distribution/reference" ) var ( @@ -40,6 +41,18 @@ func (err ErrBlobInvalidDigest) Error() string { err.Digest, err.Reason) } +// ErrBlobMounted returned when a blob is mounted from another repository +// instead of initiating an upload session. +type ErrBlobMounted struct { + From reference.Canonical + Descriptor Descriptor +} + +func (err ErrBlobMounted) Error() string { + return fmt.Sprintf("blob mounted from: %v to: %v", + err.From, err.Descriptor) +} + // Descriptor describes targeted content. Used in conjunction with a blob // store, a descriptor can be used to fetch, store and target any kind of // blob. The struct also describes the wire protocol format. Fields should @@ -151,14 +164,19 @@ type BlobIngester interface { // returned handle can be written to and later resumed using an opaque // identifier. With this approach, one can Close and Resume a BlobWriter // multiple times until the BlobWriter is committed or cancelled. - Create(ctx context.Context) (BlobWriter, error) + Create(ctx context.Context, options ...BlobCreateOption) (BlobWriter, error) // Resume attempts to resume a write to a blob, identified by an id. Resume(ctx context.Context, id string) (BlobWriter, error) +} - // Mount adds a blob to this service from another source repository, - // identified by a digest. - Mount(ctx context.Context, sourceRepo string, dgst digest.Digest) (Descriptor, error) +// BlobCreateOption is a general extensible function argument for blob creation +// methods. A BlobIngester may choose to honor any or none of the given +// BlobCreateOptions, which can be specific to the implementation of the +// BlobIngester receiving them. +// TODO (brianbland): unify this with ManifestServiceOption in the future +type BlobCreateOption interface { + Apply(interface{}) error } // BlobWriter provides a handle for inserting data into a blob store. diff --git a/docs/spec/api.md b/docs/spec/api.md index 042cbd8f..d46ed93c 100644 --- a/docs/spec/api.md +++ b/docs/spec/api.md @@ -735,6 +735,25 @@ the uploaded blob which may differ from the provided digest. Most clients may ignore the value but if it is used, the client should verify the value against the uploaded blob data. +If a mount fails due to invalid repository or digest arguments, the registry +will fall back to the standard upload behavior and return a `202 Accepted` with +the upload URL in the `Location` header: + +``` +202 Accepted +Location: /v2//blobs/uploads/ +Range: bytes=0- +Content-Length: 0 +Docker-Upload-UUID: +``` + +This behavior is consistent with older versions of the registry, which do not +recognize the repository mount query parameters. + +Note: a client may issue a HEAD request to check existence of a blob in a source +repository to distinguish between the registry not supporting blob mounts and +the blob not existing in the expected repository. + ##### Errors If an 502, 503 or 504 error is received, the client should assume that the diff --git a/docs/spec/api.md.tmpl b/docs/spec/api.md.tmpl index 375d50c1..0419666b 100644 --- a/docs/spec/api.md.tmpl +++ b/docs/spec/api.md.tmpl @@ -735,6 +735,25 @@ the uploaded blob which may differ from the provided digest. Most clients may ignore the value but if it is used, the client should verify the value against the uploaded blob data. +If a mount fails due to invalid repository or digest arguments, the registry +will fall back to the standard upload behavior and return a `202 Accepted` with +the upload URL in the `Location` header: + +``` +202 Accepted +Location: /v2//blobs/uploads/ +Range: bytes=0- +Content-Length: 0 +Docker-Upload-UUID: +``` + +This behavior is consistent with older versions of the registry, which do not +recognize the repository mount query parameters. + +Note: a client may issue a HEAD request to check existence of a blob in a source +repository to distinguish between the registry not supporting blob mounts and +the blob not existing in the expected repository. + ##### Errors If an 502, 503 or 504 error is received, the client should assume that the diff --git a/manifest/schema1/config_builder_test.go b/manifest/schema1/config_builder_test.go index 3e219141..f8d41c7c 100644 --- a/manifest/schema1/config_builder_test.go +++ b/manifest/schema1/config_builder_test.go @@ -42,7 +42,7 @@ func (bs *mockBlobService) Put(ctx context.Context, mediaType string, p []byte) return d, nil } -func (bs *mockBlobService) Create(ctx context.Context) (distribution.BlobWriter, error) { +func (bs *mockBlobService) Create(ctx context.Context, options ...distribution.BlobCreateOption) (distribution.BlobWriter, error) { panic("not implemented") } @@ -50,10 +50,6 @@ func (bs *mockBlobService) Resume(ctx context.Context, id string) (distribution. panic("not implemented") } -func (bs *mockBlobService) Mount(ctx context.Context, sourceRepo string, dgst digest.Digest) (distribution.Descriptor, error) { - panic("not implemented") -} - func TestEmptyTar(t *testing.T) { // Confirm that gzippedEmptyTar expands to 1024 NULL bytes. var decompressed [2048]byte diff --git a/manifest/schema2/builder_test.go b/manifest/schema2/builder_test.go index 53175e45..6806033d 100644 --- a/manifest/schema2/builder_test.go +++ b/manifest/schema2/builder_test.go @@ -38,7 +38,7 @@ func (bs *mockBlobService) Put(ctx context.Context, mediaType string, p []byte) return d, nil } -func (bs *mockBlobService) Create(ctx context.Context) (distribution.BlobWriter, error) { +func (bs *mockBlobService) Create(ctx context.Context, options ...distribution.BlobCreateOption) (distribution.BlobWriter, error) { panic("not implemented") } @@ -46,10 +46,6 @@ func (bs *mockBlobService) Resume(ctx context.Context, id string) (distribution. panic("not implemented") } -func (bs *mockBlobService) Mount(ctx context.Context, sourceRepo string, dgst digest.Digest) (distribution.Descriptor, error) { - panic("not implemented") -} - func TestBuilder(t *testing.T) { imgJSON := []byte(`{ "architecture": "amd64", diff --git a/notifications/listener.go b/notifications/listener.go index d4e1abe7..21857edf 100644 --- a/notifications/listener.go +++ b/notifications/listener.go @@ -160,8 +160,15 @@ func (bsl *blobServiceListener) Put(ctx context.Context, mediaType string, p []b return desc, err } -func (bsl *blobServiceListener) Create(ctx context.Context) (distribution.BlobWriter, error) { - wr, err := bsl.BlobStore.Create(ctx) +func (bsl *blobServiceListener) Create(ctx context.Context, options ...distribution.BlobCreateOption) (distribution.BlobWriter, error) { + wr, err := bsl.BlobStore.Create(ctx, options...) + switch err := err.(type) { + case distribution.ErrBlobMounted: + if err := bsl.parent.listener.BlobMounted(bsl.parent.Repository.Name(), err.Descriptor, err.From.Name()); err != nil { + context.GetLogger(ctx).Errorf("error dispatching blob mount to listener: %v", err) + } + return nil, err + } return bsl.decorateWriter(wr), err } @@ -170,17 +177,6 @@ func (bsl *blobServiceListener) Resume(ctx context.Context, id string) (distribu return bsl.decorateWriter(wr), err } -func (bsl *blobServiceListener) Mount(ctx context.Context, sourceRepo string, dgst digest.Digest) (distribution.Descriptor, error) { - desc, err := bsl.BlobStore.Mount(ctx, sourceRepo, dgst) - if err == nil { - if err := bsl.parent.listener.BlobMounted(bsl.parent.Repository.Name(), desc, sourceRepo); err != nil { - context.GetLogger(ctx).Errorf("error dispatching layer mount to listener: %v", err) - } - } - - return desc, err -} - func (bsl *blobServiceListener) decorateWriter(wr distribution.BlobWriter) distribution.BlobWriter { return &blobWriterListener{ BlobWriter: wr, diff --git a/registry/client/repository.go b/registry/client/repository.go index 8f30b4f1..c2aca03f 100644 --- a/registry/client/repository.go +++ b/registry/client/repository.go @@ -10,7 +10,6 @@ import ( "net/http" "net/url" "strconv" - "sync" "time" "github.com/docker/distribution" @@ -19,6 +18,7 @@ import ( "github.com/docker/distribution/reference" "github.com/docker/distribution/registry/api/v2" "github.com/docker/distribution/registry/client/transport" + "github.com/docker/distribution/registry/storage" "github.com/docker/distribution/registry/storage/cache" "github.com/docker/distribution/registry/storage/cache/memory" ) @@ -500,9 +500,6 @@ type blobs struct { statter distribution.BlobDescriptorService distribution.BlobDeleter - - cacheLock sync.Mutex - cachedBlobUpload distribution.BlobWriter } func sanitizeLocation(location, base string) (string, error) { @@ -576,18 +573,23 @@ func (bs *blobs) Put(ctx context.Context, mediaType string, p []byte) (distribut return writer.Commit(ctx, desc) } -func (bs *blobs) Create(ctx context.Context) (distribution.BlobWriter, error) { - bs.cacheLock.Lock() - if bs.cachedBlobUpload != nil { - upload := bs.cachedBlobUpload - bs.cachedBlobUpload = nil - bs.cacheLock.Unlock() +func (bs *blobs) Create(ctx context.Context, options ...distribution.BlobCreateOption) (distribution.BlobWriter, error) { + var opts storage.CreateOptions - return upload, nil + for _, option := range options { + err := option.Apply(&opts) + if err != nil { + return nil, err + } } - bs.cacheLock.Unlock() - u, err := bs.ub.BuildBlobUploadURL(bs.name) + var values []url.Values + + if opts.Mount.ShouldMount { + values = append(values, url.Values{"from": {opts.Mount.From.Name()}, "mount": {opts.Mount.From.Digest().String()}}) + } + + u, err := bs.ub.BuildBlobUploadURL(bs.name, values...) if err != nil { return nil, err } @@ -598,7 +600,14 @@ func (bs *blobs) Create(ctx context.Context) (distribution.BlobWriter, error) { } defer resp.Body.Close() - if SuccessStatus(resp.StatusCode) { + switch resp.StatusCode { + case http.StatusCreated: + desc, err := bs.statter.Stat(ctx, opts.Mount.From.Digest()) + if err != nil { + return nil, err + } + return nil, distribution.ErrBlobMounted{From: opts.Mount.From, Descriptor: desc} + case http.StatusAccepted: // TODO(dmcgowan): Check for invalid UUID uuid := resp.Header.Get("Docker-Upload-UUID") location, err := sanitizeLocation(resp.Header.Get("Location"), u) @@ -613,53 +622,15 @@ func (bs *blobs) Create(ctx context.Context) (distribution.BlobWriter, error) { startedAt: time.Now(), location: location, }, nil + default: + return nil, HandleErrorResponse(resp) } - return nil, HandleErrorResponse(resp) } func (bs *blobs) Resume(ctx context.Context, id string) (distribution.BlobWriter, error) { panic("not implemented") } -func (bs *blobs) Mount(ctx context.Context, sourceRepo string, dgst digest.Digest) (distribution.Descriptor, error) { - u, err := bs.ub.BuildBlobUploadURL(bs.name, url.Values{"from": {sourceRepo}, "mount": {dgst.String()}}) - if err != nil { - return distribution.Descriptor{}, err - } - - resp, err := bs.client.Post(u, "", nil) - if err != nil { - return distribution.Descriptor{}, err - } - defer resp.Body.Close() - - switch resp.StatusCode { - case http.StatusCreated: - return bs.Stat(ctx, dgst) - case http.StatusAccepted: - // Triggered a blob upload (legacy behavior), so cache the creation info - uuid := resp.Header.Get("Docker-Upload-UUID") - location, err := sanitizeLocation(resp.Header.Get("Location"), u) - if err != nil { - return distribution.Descriptor{}, err - } - - bs.cacheLock.Lock() - bs.cachedBlobUpload = &httpBlobUpload{ - statter: bs.statter, - client: bs.client, - uuid: uuid, - startedAt: time.Now(), - location: location, - } - bs.cacheLock.Unlock() - - return distribution.Descriptor{}, HandleErrorResponse(resp) - default: - return distribution.Descriptor{}, HandleErrorResponse(resp) - } -} - func (bs *blobs) Delete(ctx context.Context, dgst digest.Digest) error { return bs.statter.Clear(ctx, dgst) } diff --git a/registry/client/repository_test.go b/registry/client/repository_test.go index 8a7fc1c9..811ab235 100644 --- a/registry/client/repository_test.go +++ b/registry/client/repository_test.go @@ -18,7 +18,9 @@ import ( "github.com/docker/distribution/digest" "github.com/docker/distribution/manifest" "github.com/docker/distribution/manifest/schema1" + "github.com/docker/distribution/reference" "github.com/docker/distribution/registry/api/errcode" + "github.com/docker/distribution/registry/storage" "github.com/docker/distribution/testutil" "github.com/docker/distribution/uuid" "github.com/docker/libtrust" @@ -471,6 +473,16 @@ func TestBlobMount(t *testing.T) { var m testutil.RequestResponseMap repo := "test.example.com/uploadrepo" sourceRepo := "test.example.com/sourcerepo" + + namedRef, err := reference.ParseNamed(sourceRepo) + if err != nil { + t.Fatal(err) + } + canonicalRef, err := reference.WithDigest(namedRef, dgst) + if err != nil { + t.Fatal(err) + } + m = append(m, testutil.RequestResponseMapping{ Request: testutil.Request{ Method: "POST", @@ -511,13 +523,20 @@ func TestBlobMount(t *testing.T) { l := r.Blobs(ctx) - stat, err := l.Mount(ctx, sourceRepo, dgst) - if err != nil { - t.Fatal(err) + bw, err := l.Create(ctx, storage.WithMountFrom(canonicalRef)) + if bw != nil { + t.Fatalf("Expected blob writer to be nil, was %v", bw) } - if stat.Digest != dgst { - t.Fatalf("Unexpected digest: %s, expected %s", stat.Digest, dgst) + if ebm, ok := err.(distribution.ErrBlobMounted); ok { + if ebm.From.Digest() != dgst { + t.Fatalf("Unexpected digest: %s, expected %s", ebm.From.Digest(), dgst) + } + if ebm.From.Name() != sourceRepo { + t.Fatalf("Unexpected from: %s, expected %s", ebm.From.Name(), sourceRepo) + } + } else { + t.Fatalf("Unexpected error: %v, expected an ErrBlobMounted", err) } } diff --git a/registry/handlers/blobupload.go b/registry/handlers/blobupload.go index c5638c83..0f325184 100644 --- a/registry/handlers/blobupload.go +++ b/registry/handlers/blobupload.go @@ -9,8 +9,10 @@ import ( "github.com/docker/distribution" ctxu "github.com/docker/distribution/context" "github.com/docker/distribution/digest" + "github.com/docker/distribution/reference" "github.com/docker/distribution/registry/api/errcode" "github.com/docker/distribution/registry/api/v2" + "github.com/docker/distribution/registry/storage" "github.com/gorilla/handlers" ) @@ -118,19 +120,27 @@ type blobUploadHandler struct { // StartBlobUpload begins the blob upload process and allocates a server-side // blob writer session, optionally mounting the blob from a separate repository. func (buh *blobUploadHandler) StartBlobUpload(w http.ResponseWriter, r *http.Request) { + var options []distribution.BlobCreateOption + fromRepo := r.FormValue("from") mountDigest := r.FormValue("mount") if mountDigest != "" && fromRepo != "" { - buh.mountBlob(w, fromRepo, mountDigest) - return + opt, err := buh.createBlobMountOption(fromRepo, mountDigest) + if err != nil { + options = append(options, opt) + } } blobs := buh.Repository.Blobs(buh) - upload, err := blobs.Create(buh) + upload, err := blobs.Create(buh, options...) if err != nil { - if err == distribution.ErrUnsupported { + if ebm, ok := err.(distribution.ErrBlobMounted); ok { + if err := buh.writeBlobCreatedHeaders(w, ebm.Descriptor); err != nil { + buh.Errors = append(buh.Errors, errcode.ErrorCodeUnknown.WithDetail(err)) + } + } else if err == distribution.ErrUnsupported { buh.Errors = append(buh.Errors, errcode.ErrorCodeUnsupported) } else { buh.Errors = append(buh.Errors, errcode.ErrorCodeUnknown.WithDetail(err)) @@ -339,27 +349,23 @@ func (buh *blobUploadHandler) blobUploadResponse(w http.ResponseWriter, r *http. // mountBlob attempts to mount a blob from another repository by its digest. If // successful, the blob is linked into the blob store and 201 Created is // returned with the canonical url of the blob. -func (buh *blobUploadHandler) mountBlob(w http.ResponseWriter, fromRepo, mountDigest string) { +func (buh *blobUploadHandler) createBlobMountOption(fromRepo, mountDigest string) (distribution.BlobCreateOption, error) { dgst, err := digest.ParseDigest(mountDigest) if err != nil { - buh.Errors = append(buh.Errors, v2.ErrorCodeDigestInvalid.WithDetail(err)) - return + return nil, err } - blobs := buh.Repository.Blobs(buh) - desc, err := blobs.Mount(buh, fromRepo, dgst) + ref, err := reference.ParseNamed(fromRepo) if err != nil { - if err == distribution.ErrBlobUnknown { - buh.Errors = append(buh.Errors, v2.ErrorCodeBlobUnknown.WithDetail(dgst)) - } else { - buh.Errors = append(buh.Errors, errcode.ErrorCodeUnknown.WithDetail(err)) - } - return + return nil, err } - if err := buh.writeBlobCreatedHeaders(w, desc); err != nil { - buh.Errors = append(buh.Errors, errcode.ErrorCodeUnknown.WithDetail(err)) - return + + canonical, err := reference.WithDigest(ref, dgst) + if err != nil { + return nil, err } + + return storage.WithMountFrom(canonical), nil } // writeBlobCreatedHeaders writes the standard headers describing a newly diff --git a/registry/proxy/proxyblobstore.go b/registry/proxy/proxyblobstore.go index ca39f9f8..41b76e8e 100644 --- a/registry/proxy/proxyblobstore.go +++ b/registry/proxy/proxyblobstore.go @@ -161,7 +161,7 @@ func (pbs *proxyBlobStore) Put(ctx context.Context, mediaType string, p []byte) return distribution.Descriptor{}, distribution.ErrUnsupported } -func (pbs *proxyBlobStore) Create(ctx context.Context) (distribution.BlobWriter, error) { +func (pbs *proxyBlobStore) Create(ctx context.Context, options ...distribution.BlobCreateOption) (distribution.BlobWriter, error) { return nil, distribution.ErrUnsupported } diff --git a/registry/proxy/proxyblobstore_test.go b/registry/proxy/proxyblobstore_test.go index 5c5015a0..7702771c 100644 --- a/registry/proxy/proxyblobstore_test.go +++ b/registry/proxy/proxyblobstore_test.go @@ -42,12 +42,12 @@ func (sbs statsBlobStore) Get(ctx context.Context, dgst digest.Digest) ([]byte, return sbs.blobs.Get(ctx, dgst) } -func (sbs statsBlobStore) Create(ctx context.Context) (distribution.BlobWriter, error) { +func (sbs statsBlobStore) Create(ctx context.Context, options ...distribution.BlobCreateOption) (distribution.BlobWriter, error) { sbsMu.Lock() sbs.stats["create"]++ sbsMu.Unlock() - return sbs.blobs.Create(ctx) + return sbs.blobs.Create(ctx, options...) } func (sbs statsBlobStore) Resume(ctx context.Context, id string) (distribution.BlobWriter, error) { @@ -58,14 +58,6 @@ func (sbs statsBlobStore) Resume(ctx context.Context, id string) (distribution.B return sbs.blobs.Resume(ctx, id) } -func (sbs statsBlobStore) Mount(ctx context.Context, sourceRepo string, dgst digest.Digest) (distribution.Descriptor, error) { - sbsMu.Lock() - sbs.stats["mount"]++ - sbsMu.Unlock() - - return sbs.blobs.Mount(ctx, sourceRepo, dgst) -} - func (sbs statsBlobStore) Open(ctx context.Context, dgst digest.Digest) (distribution.ReadSeekCloser, error) { sbsMu.Lock() sbs.stats["open"]++ diff --git a/registry/storage/blob_test.go b/registry/storage/blob_test.go index b89814c7..e1eacc00 100644 --- a/registry/storage/blob_test.go +++ b/registry/storage/blob_test.go @@ -12,6 +12,7 @@ import ( "github.com/docker/distribution" "github.com/docker/distribution/context" "github.com/docker/distribution/digest" + "github.com/docker/distribution/reference" "github.com/docker/distribution/registry/storage/cache/memory" "github.com/docker/distribution/registry/storage/driver/inmemory" "github.com/docker/distribution/testutil" @@ -377,13 +378,27 @@ func TestBlobMount(t *testing.T) { t.Fatalf("unexpected non-error stating unmounted blob: %v", desc) } - mountDesc, err := bs.Mount(ctx, sourceRepository.Name(), desc.Digest) + namedRef, err := reference.ParseNamed(sourceRepository.Name()) if err != nil { + t.Fatal(err) + } + canonicalRef, err := reference.WithDigest(namedRef, desc.Digest) + if err != nil { + t.Fatal(err) + } + + bw, err := bs.Create(ctx, WithMountFrom(canonicalRef)) + if bw != nil { + t.Fatal("unexpected blobwriter returned from Create call, should mount instead") + } + + ebm, ok := err.(distribution.ErrBlobMounted) + if !ok { t.Fatalf("unexpected error mounting layer: %v", err) } - if mountDesc != desc { - t.Fatalf("descriptors not equal: %v != %v", mountDesc, desc) + if ebm.Descriptor != desc { + t.Fatalf("descriptors not equal: %v != %v", ebm.Descriptor, desc) } // Test for existence. diff --git a/registry/storage/linkedblobstore.go b/registry/storage/linkedblobstore.go index 8b7f9f51..d7a9fd13 100644 --- a/registry/storage/linkedblobstore.go +++ b/registry/storage/linkedblobstore.go @@ -1,12 +1,14 @@ package storage import ( + "fmt" "net/http" "time" "github.com/docker/distribution" "github.com/docker/distribution/context" "github.com/docker/distribution/digest" + "github.com/docker/distribution/reference" "github.com/docker/distribution/registry/storage/driver" "github.com/docker/distribution/uuid" ) @@ -95,10 +97,58 @@ func (lbs *linkedBlobStore) Put(ctx context.Context, mediaType string, p []byte) return desc, lbs.linkBlob(ctx, desc) } +// CreateOptions is a collection of blob creation modifiers relevant to general +// blob storage intended to be configured by the BlobCreateOption.Apply method. +type CreateOptions struct { + Mount struct { + ShouldMount bool + From reference.Canonical + } +} + +type optionFunc func(interface{}) error + +func (f optionFunc) Apply(v interface{}) error { + return f(v) +} + +// WithMountFrom returns a BlobCreateOption which designates that the blob should be +// mounted from the given canonical reference. +func WithMountFrom(ref reference.Canonical) distribution.BlobCreateOption { + return optionFunc(func(v interface{}) error { + opts, ok := v.(*CreateOptions) + if !ok { + return fmt.Errorf("unexpected options type: %T", v) + } + + opts.Mount.ShouldMount = true + opts.Mount.From = ref + + return nil + }) +} + // Writer begins a blob write session, returning a handle. -func (lbs *linkedBlobStore) Create(ctx context.Context) (distribution.BlobWriter, error) { +func (lbs *linkedBlobStore) Create(ctx context.Context, options ...distribution.BlobCreateOption) (distribution.BlobWriter, error) { context.GetLogger(ctx).Debug("(*linkedBlobStore).Writer") + var opts CreateOptions + + for _, option := range options { + err := option.Apply(&opts) + if err != nil { + return nil, err + } + } + + if opts.Mount.ShouldMount { + desc, err := lbs.mount(ctx, opts.Mount.From.Name(), opts.Mount.From.Digest()) + if err == nil { + // Mount successful, no need to initiate an upload session + return nil, distribution.ErrBlobMounted{From: opts.Mount.From, Descriptor: desc} + } + } + uuid := uuid.Generate().String() startedAt := time.Now().UTC() @@ -186,7 +236,7 @@ func (lbs *linkedBlobStore) Delete(ctx context.Context, dgst digest.Digest) erro return nil } -func (lbs *linkedBlobStore) Mount(ctx context.Context, sourceRepo string, dgst digest.Digest) (distribution.Descriptor, error) { +func (lbs *linkedBlobStore) mount(ctx context.Context, sourceRepo string, dgst digest.Digest) (distribution.Descriptor, error) { repo, err := lbs.registry.Repository(ctx, sourceRepo) if err != nil { return distribution.Descriptor{}, err