package storage import ( "net/http" "time" "github.com/docker/distribution" "github.com/docker/distribution/context" "github.com/docker/distribution/digest" "github.com/docker/distribution/registry/storage/driver" "github.com/docker/distribution/uuid" ) // linkPathFunc describes a function that can resolve a link based on the // repository name and digest. type linkPathFunc func(name string, dgst digest.Digest) (string, error) // linkedBlobStore provides a full BlobService that namespaces the blobs to a // given repository. Effectively, it manages the links in a given repository // that grant access to the global blob store. type linkedBlobStore struct { *blobStore blobServer distribution.BlobServer blobAccessController distribution.BlobDescriptorService repository distribution.Repository ctx context.Context // only to be used where context can't come through method args deleteEnabled bool resumableDigestEnabled bool // linkPathFns specifies one or more path functions allowing one to // control the repository blob link set to which the blob store // dispatches. This is required because manifest and layer blobs have not // yet been fully merged. At some point, this functionality should be // removed an the blob links folder should be merged. The first entry is // treated as the "canonical" link location and will be used for writes. linkPathFns []linkPathFunc } var _ distribution.BlobStore = &linkedBlobStore{} func (lbs *linkedBlobStore) Stat(ctx context.Context, dgst digest.Digest) (distribution.Descriptor, error) { return lbs.blobAccessController.Stat(ctx, dgst) } func (lbs *linkedBlobStore) Get(ctx context.Context, dgst digest.Digest) ([]byte, error) { canonical, err := lbs.Stat(ctx, dgst) // access check if err != nil { return nil, err } return lbs.blobStore.Get(ctx, canonical.Digest) } func (lbs *linkedBlobStore) Open(ctx context.Context, dgst digest.Digest) (distribution.ReadSeekCloser, error) { canonical, err := lbs.Stat(ctx, dgst) // access check if err != nil { return nil, err } return lbs.blobStore.Open(ctx, canonical.Digest) } func (lbs *linkedBlobStore) ServeBlob(ctx context.Context, w http.ResponseWriter, r *http.Request, dgst digest.Digest) error { canonical, err := lbs.Stat(ctx, dgst) // access check if err != nil { return err } if canonical.MediaType != "" { // Set the repository local content type. w.Header().Set("Content-Type", canonical.MediaType) } return lbs.blobServer.ServeBlob(ctx, w, r, canonical.Digest) } func (lbs *linkedBlobStore) Put(ctx context.Context, mediaType string, p []byte) (distribution.Descriptor, error) { dgst := digest.FromBytes(p) // Place the data in the blob store first. desc, err := lbs.blobStore.Put(ctx, mediaType, p) if err != nil { context.GetLogger(ctx).Errorf("error putting into main store: %v", err) return distribution.Descriptor{}, err } if err := lbs.blobAccessController.SetDescriptor(ctx, dgst, desc); err != nil { return distribution.Descriptor{}, err } // TODO(stevvooe): Write out mediatype if incoming differs from what is // returned by Put above. Note that we should allow updates for a given // repository. return desc, lbs.linkBlob(ctx, desc) } // Writer begins a blob write session, returning a handle. func (lbs *linkedBlobStore) Create(ctx context.Context) (distribution.BlobWriter, error) { context.GetLogger(ctx).Debug("(*linkedBlobStore).Writer") uuid := uuid.Generate().String() startedAt := time.Now().UTC() path, err := pathFor(uploadDataPathSpec{ name: lbs.repository.Name(), id: uuid, }) if err != nil { return nil, err } startedAtPath, err := pathFor(uploadStartedAtPathSpec{ name: lbs.repository.Name(), id: uuid, }) if err != nil { return nil, err } // Write a startedat file for this upload if err := lbs.blobStore.driver.PutContent(ctx, startedAtPath, []byte(startedAt.Format(time.RFC3339))); err != nil { return nil, err } return lbs.newBlobUpload(ctx, uuid, path, startedAt) } func (lbs *linkedBlobStore) Resume(ctx context.Context, id string) (distribution.BlobWriter, error) { context.GetLogger(ctx).Debug("(*linkedBlobStore).Resume") startedAtPath, err := pathFor(uploadStartedAtPathSpec{ name: lbs.repository.Name(), id: id, }) if err != nil { return nil, err } startedAtBytes, err := lbs.blobStore.driver.GetContent(ctx, startedAtPath) if err != nil { switch err := err.(type) { case driver.PathNotFoundError: return nil, distribution.ErrBlobUploadUnknown default: return nil, err } } startedAt, err := time.Parse(time.RFC3339, string(startedAtBytes)) if err != nil { return nil, err } path, err := pathFor(uploadDataPathSpec{ name: lbs.repository.Name(), id: id, }) if err != nil { return nil, err } return lbs.newBlobUpload(ctx, id, path, startedAt) } func (lbs *linkedBlobStore) Delete(ctx context.Context, dgst digest.Digest) error { if !lbs.deleteEnabled { return distribution.ErrUnsupported } // Ensure the blob is available for deletion _, err := lbs.blobAccessController.Stat(ctx, dgst) if err != nil { return err } err = lbs.blobAccessController.Clear(ctx, dgst) if err != nil { return err } return nil } // newBlobUpload allocates a new upload controller with the given state. func (lbs *linkedBlobStore) newBlobUpload(ctx context.Context, uuid, path string, startedAt time.Time) (distribution.BlobWriter, error) { fw, err := newFileWriter(ctx, lbs.driver, path) if err != nil { return nil, err } bw := &blobWriter{ blobStore: lbs, id: uuid, startedAt: startedAt, digester: digest.Canonical.New(), bufferedFileWriter: *fw, resumableDigestEnabled: lbs.resumableDigestEnabled, } return bw, nil } // linkBlob links a valid, written blob into the registry under the named // repository for the upload controller. func (lbs *linkedBlobStore) linkBlob(ctx context.Context, canonical distribution.Descriptor, aliases ...digest.Digest) error { dgsts := append([]digest.Digest{canonical.Digest}, aliases...) // TODO(stevvooe): Need to write out mediatype for only canonical hash // since we don't care about the aliases. They are generally unused except // for tarsum but those versions don't care about mediatype. // Don't make duplicate links. seenDigests := make(map[digest.Digest]struct{}, len(dgsts)) // only use the first link linkPathFn := lbs.linkPathFns[0] for _, dgst := range dgsts { if _, seen := seenDigests[dgst]; seen { continue } seenDigests[dgst] = struct{}{} blobLinkPath, err := linkPathFn(lbs.repository.Name(), dgst) if err != nil { return err } if err := lbs.blobStore.link(ctx, blobLinkPath, canonical.Digest); err != nil { return err } } return nil } type linkedBlobStatter struct { *blobStore repository distribution.Repository // linkPathFns specifies one or more path functions allowing one to // control the repository blob link set to which the blob store // dispatches. This is required because manifest and layer blobs have not // yet been fully merged. At some point, this functionality should be // removed an the blob links folder should be merged. The first entry is // treated as the "canonical" link location and will be used for writes. linkPathFns []linkPathFunc } var _ distribution.BlobDescriptorService = &linkedBlobStatter{} func (lbs *linkedBlobStatter) Stat(ctx context.Context, dgst digest.Digest) (distribution.Descriptor, error) { var ( resolveErr error target digest.Digest ) // try the many link path functions until we get success or an error that // is not PathNotFoundError. for _, linkPathFn := range lbs.linkPathFns { var err error target, err = lbs.resolveWithLinkFunc(ctx, dgst, linkPathFn) if err == nil { break // success! } switch err := err.(type) { case driver.PathNotFoundError: resolveErr = distribution.ErrBlobUnknown // move to the next linkPathFn, saving the error default: return distribution.Descriptor{}, err } } if resolveErr != nil { return distribution.Descriptor{}, resolveErr } if target != dgst { // Track when we are doing cross-digest domain lookups. ie, tarsum to sha256. context.GetLogger(ctx).Warnf("looking up blob with canonical target: %v -> %v", dgst, target) } // TODO(stevvooe): Look up repository local mediatype and replace that on // the returned descriptor. return lbs.blobStore.statter.Stat(ctx, target) } func (lbs *linkedBlobStatter) Clear(ctx context.Context, dgst digest.Digest) (err error) { // clear any possible existence of a link described in linkPathFns for _, linkPathFn := range lbs.linkPathFns { blobLinkPath, err := linkPathFn(lbs.repository.Name(), dgst) if err != nil { return err } err = lbs.blobStore.driver.Delete(ctx, blobLinkPath) if err != nil { switch err := err.(type) { case driver.PathNotFoundError: continue // just ignore this error and continue default: return err } } } return nil } // resolveTargetWithFunc allows us to read a link to a resource with different // linkPathFuncs to let us try a few different paths before returning not // found. func (lbs *linkedBlobStatter) resolveWithLinkFunc(ctx context.Context, dgst digest.Digest, linkPathFn linkPathFunc) (digest.Digest, error) { blobLinkPath, err := linkPathFn(lbs.repository.Name(), dgst) if err != nil { return "", err } return lbs.blobStore.readlink(ctx, blobLinkPath) } func (lbs *linkedBlobStatter) SetDescriptor(ctx context.Context, dgst digest.Digest, desc distribution.Descriptor) error { // The canonical descriptor for a blob is set at the commit phase of upload return nil } // blobLinkPath provides the path to the blob link, also known as layers. func blobLinkPath(name string, dgst digest.Digest) (string, error) { return pathFor(layerLinkPathSpec{name: name, digest: dgst}) } // manifestRevisionLinkPath provides the path to the manifest revision link. func manifestRevisionLinkPath(name string, dgst digest.Digest) (string, error) { return pathFor(manifestRevisionLinkPathSpec{name: name, revision: dgst}) }