Merge pull request #244 from endophage/cloudfront_refactor

registry/middleware, registry/storage, configuration: refactoring cloudfront + generic middlewares
This commit is contained in:
Stephen Day 2015-03-11 12:10:49 -07:00
commit 10881152ac
12 changed files with 263 additions and 249 deletions

View file

@ -110,7 +110,7 @@ func (uic userInfoContext) Value(key interface{}) interface{} {
} }
// InitFunc is the type of an AccessController factory function and is used // InitFunc is the type of an AccessController factory function and is used
// to register the contsructor for different AccesController backends. // to register the constructor for different AccesController backends.
type InitFunc func(options map[string]interface{}) (AccessController, error) type InitFunc func(options map[string]interface{}) (AccessController, error)
var accessControllers map[string]InitFunc var accessControllers map[string]InitFunc

View file

@ -13,9 +13,12 @@ import (
"github.com/docker/distribution/notifications" "github.com/docker/distribution/notifications"
"github.com/docker/distribution/registry/api/v2" "github.com/docker/distribution/registry/api/v2"
"github.com/docker/distribution/registry/auth" "github.com/docker/distribution/registry/auth"
registrymiddleware "github.com/docker/distribution/registry/middleware/registry"
repositorymiddleware "github.com/docker/distribution/registry/middleware/repository"
"github.com/docker/distribution/registry/storage" "github.com/docker/distribution/registry/storage"
storagedriver "github.com/docker/distribution/registry/storage/driver" storagedriver "github.com/docker/distribution/registry/storage/driver"
"github.com/docker/distribution/registry/storage/driver/factory" "github.com/docker/distribution/registry/storage/driver/factory"
storagemiddleware "github.com/docker/distribution/registry/storage/driver/middleware"
"github.com/gorilla/mux" "github.com/gorilla/mux"
"golang.org/x/net/context" "golang.org/x/net/context"
) )
@ -41,8 +44,6 @@ type App struct {
sink notifications.Sink sink notifications.Sink
source notifications.SourceRecord source notifications.SourceRecord
} }
layerHandler storage.LayerHandler // allows dispatch of layer serving to external provider
} }
// Value intercepts calls context.Context.Value, returning the current app id, // Value intercepts calls context.Context.Value, returning the current app id,
@ -88,9 +89,19 @@ func NewApp(ctx context.Context, configuration configuration.Configuration) *App
// a health check. // a health check.
panic(err) panic(err)
} }
app.driver, err = applyStorageMiddleware(app.driver, configuration.Middleware["storage"])
if err != nil {
panic(err)
}
app.configureEvents(&configuration) app.configureEvents(&configuration)
app.registry = storage.NewRegistryWithDriver(app.driver) app.registry = storage.NewRegistryWithDriver(app.driver)
app.registry, err = applyRegistryMiddleware(app.registry, configuration.Middleware["registry"])
if err != nil {
panic(err)
}
authType := configuration.Auth.Type() authType := configuration.Auth.Type()
if authType != "" { if authType != "" {
@ -101,16 +112,6 @@ func NewApp(ctx context.Context, configuration configuration.Configuration) *App
app.accessController = accessController app.accessController = accessController
} }
layerHandlerType := configuration.LayerHandler.Type()
if layerHandlerType != "" {
lh, err := storage.GetLayerHandler(layerHandlerType, configuration.LayerHandler.Parameters(), app.driver)
if err != nil {
panic(fmt.Sprintf("unable to configure layer handler (%s): %v", layerHandlerType, err))
}
app.layerHandler = lh
}
return app return app
} }
@ -249,6 +250,15 @@ func (app *App) dispatcher(dispatch dispatchFunc) http.Handler {
context.Repository = notifications.Listen( context.Repository = notifications.Listen(
repository, repository,
app.eventBridge(context, r)) app.eventBridge(context, r))
context.Repository, err = applyRepoMiddleware(context.Repository, app.Config.Middleware["repository"])
if err != nil {
ctxu.GetLogger(context).Errorf("error initializing repository middleware: %v", err)
context.Errors.Push(v2.ErrorCodeUnknown, err)
w.WriteHeader(http.StatusInternalServerError)
serveJSON(w, context.Errors)
return
}
} }
handler := dispatch(context, r) handler := dispatch(context, r)
@ -417,3 +427,40 @@ func appendAccessRecords(records []auth.Access, method string, repo string) []au
} }
return records return records
} }
// applyRegistryMiddleware wraps a registry instance with the configured middlewares
func applyRegistryMiddleware(registry distribution.Registry, middlewares []configuration.Middleware) (distribution.Registry, error) {
for _, mw := range middlewares {
rmw, err := registrymiddleware.Get(mw.Name, mw.Options, registry)
if err != nil {
return nil, fmt.Errorf("unable to configure registry middleware (%s): %s", mw.Name, err)
}
registry = rmw
}
return registry, nil
}
// applyRepoMiddleware wraps a repository with the configured middlewares
func applyRepoMiddleware(repository distribution.Repository, middlewares []configuration.Middleware) (distribution.Repository, error) {
for _, mw := range middlewares {
rmw, err := repositorymiddleware.Get(mw.Name, mw.Options, repository)
if err != nil {
return nil, err
}
repository = rmw
}
return repository, nil
}
// applyStorageMiddleware wraps a storage driver with the configured middlewares
func applyStorageMiddleware(driver storagedriver.StorageDriver, middlewares []configuration.Middleware) (storagedriver.StorageDriver, error) {
for _, mw := range middlewares {
smw, err := storagemiddleware.Get(mw.Name, mw.Options, driver)
if err != nil {
return nil, fmt.Errorf("unable to configure storage middleware (%s): %v", mw.Name, err)
}
driver = smw
}
return driver, nil
}

View file

@ -62,17 +62,6 @@ func (lh *layerHandler) GetLayer(w http.ResponseWriter, r *http.Request) {
} }
return return
} }
defer layer.Close()
w.Header().Set("Docker-Content-Digest", lh.Digest.String()) layer.ServeHTTP(w, r)
if lh.layerHandler != nil {
handler, _ := lh.layerHandler.Resolve(layer)
if handler != nil {
handler.ServeHTTP(w, r)
return
}
}
http.ServeContent(w, r, layer.Digest().String(), layer.CreatedAt(), layer)
} }

View file

@ -0,0 +1,39 @@
package middleware
import (
"fmt"
"github.com/docker/distribution"
)
// InitFunc is the type of a RegistryMiddleware factory function and is
// used to register the constructor for different RegistryMiddleware backends.
type InitFunc func(registry distribution.Registry, options map[string]interface{}) (distribution.Registry, error)
var middlewares map[string]InitFunc
// Register is used to register an InitFunc for
// a RegistryMiddleware backend with the given name.
func Register(name string, initFunc InitFunc) error {
if middlewares == nil {
middlewares = make(map[string]InitFunc)
}
if _, exists := middlewares[name]; exists {
return fmt.Errorf("name already registered: %s", name)
}
middlewares[name] = initFunc
return nil
}
// Get constructs a RegistryMiddleware with the given options using the named backend.
func Get(name string, options map[string]interface{}, registry distribution.Registry) (distribution.Registry, error) {
if middlewares != nil {
if initFunc, exists := middlewares[name]; exists {
return initFunc(registry, options)
}
}
return nil, fmt.Errorf("no registry middleware registered with name: %s", name)
}

View file

@ -0,0 +1,39 @@
package middleware
import (
"fmt"
"github.com/docker/distribution"
)
// InitFunc is the type of a RepositoryMiddleware factory function and is
// used to register the constructor for different RepositoryMiddleware backends.
type InitFunc func(repository distribution.Repository, options map[string]interface{}) (distribution.Repository, error)
var middlewares map[string]InitFunc
// Register is used to register an InitFunc for
// a RepositoryMiddleware backend with the given name.
func Register(name string, initFunc InitFunc) error {
if middlewares == nil {
middlewares = make(map[string]InitFunc)
}
if _, exists := middlewares[name]; exists {
return fmt.Errorf("name already registered: %s", name)
}
middlewares[name] = initFunc
return nil
}
// Get constructs a RepositoryMiddleware with the given options using the named backend.
func Get(name string, options map[string]interface{}, repository distribution.Repository) (distribution.Repository, error) {
if middlewares != nil {
if initFunc, exists := middlewares[name]; exists {
return initFunc(repository, options)
}
}
return nil, fmt.Errorf("no repository middleware registered with name: %s", name)
}

View file

@ -1,95 +0,0 @@
package storage
import (
"fmt"
"net/http"
"time"
"github.com/docker/distribution"
storagedriver "github.com/docker/distribution/registry/storage/driver"
)
// delegateLayerHandler provides a simple implementation of layerHandler that
// simply issues HTTP Temporary Redirects to the URL provided by the
// storagedriver for a given Layer.
type delegateLayerHandler struct {
storageDriver storagedriver.StorageDriver
pathMapper *pathMapper
duration time.Duration
}
var _ LayerHandler = &delegateLayerHandler{}
func newDelegateLayerHandler(storageDriver storagedriver.StorageDriver, options map[string]interface{}) (LayerHandler, error) {
duration := 20 * time.Minute
d, ok := options["duration"]
if ok {
switch d := d.(type) {
case time.Duration:
duration = d
case string:
dur, err := time.ParseDuration(d)
if err != nil {
return nil, fmt.Errorf("Invalid duration: %s", err)
}
duration = dur
}
}
return &delegateLayerHandler{storageDriver: storageDriver, pathMapper: defaultPathMapper, duration: duration}, nil
}
// Resolve returns an http.Handler which can serve the contents of the given
// Layer, or an error if not supported by the storagedriver.
func (lh *delegateLayerHandler) Resolve(layer distribution.Layer) (http.Handler, error) {
// TODO(bbland): This is just a sanity check to ensure that the
// storagedriver supports url generation. It would be nice if we didn't have
// to do this twice for non-GET requests.
layerURL, err := lh.urlFor(layer, map[string]interface{}{"method": "GET"})
if err != nil {
return nil, err
}
return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
if r.Method != "GET" {
layerURL, err = lh.urlFor(layer, map[string]interface{}{"method": r.Method})
if err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}
}
http.Redirect(w, r, layerURL, http.StatusTemporaryRedirect)
}), nil
}
// urlFor returns a download URL for the given layer, or the empty string if
// unsupported.
func (lh *delegateLayerHandler) urlFor(layer distribution.Layer, options map[string]interface{}) (string, error) {
// Crack open the layer to get at the layerStore
layerRd, ok := layer.(*layerReader)
if !ok {
// TODO(stevvooe): We probably want to find a better way to get at the
// underlying filesystem path for a given layer. Perhaps, the layer
// handler should have its own layer store but right now, it is not
// request scoped.
return "", fmt.Errorf("unsupported layer type: cannot resolve blob path: %v", layer)
}
if options == nil {
options = make(map[string]interface{})
}
options["expiry"] = time.Now().Add(lh.duration)
layerURL, err := lh.storageDriver.URLFor(layerRd.path, options)
if err != nil {
return "", err
}
return layerURL, nil
}
// init registers the delegate layerHandler backend.
func init() {
RegisterLayerHandler("delegate", LayerHandlerInitFunc(newDelegateLayerHandler))
}

View file

@ -1,34 +1,36 @@
package storage // Package middleware - cloudfront wrapper for storage libs
// N.B. currently only works with S3, not arbitrary sites
//
package middleware
import ( import (
"crypto/x509" "crypto/x509"
"encoding/pem" "encoding/pem"
"fmt" "fmt"
"io/ioutil" "io/ioutil"
"net/http"
"net/url" "net/url"
"time" "time"
"github.com/AdRoll/goamz/cloudfront" "github.com/AdRoll/goamz/cloudfront"
"github.com/docker/distribution"
storagedriver "github.com/docker/distribution/registry/storage/driver" storagedriver "github.com/docker/distribution/registry/storage/driver"
storagemiddleware "github.com/docker/distribution/registry/storage/driver/middleware"
) )
// cloudFrontLayerHandler provides an simple implementation of layerHandler that // cloudFrontStorageMiddleware provides an simple implementation of layerHandler that
// constructs temporary signed CloudFront URLs from the storagedriver layer URL, // constructs temporary signed CloudFront URLs from the storagedriver layer URL,
// then issues HTTP Temporary Redirects to this CloudFront content URL. // then issues HTTP Temporary Redirects to this CloudFront content URL.
type cloudFrontLayerHandler struct { type cloudFrontStorageMiddleware struct {
cloudfront *cloudfront.CloudFront storagedriver.StorageDriver
delegateLayerHandler *delegateLayerHandler cloudfront *cloudfront.CloudFront
duration time.Duration duration time.Duration
} }
var _ LayerHandler = &cloudFrontLayerHandler{} var _ storagedriver.StorageDriver = &cloudFrontStorageMiddleware{}
// newCloudFrontLayerHandler constructs and returns a new CloudFront // newCloudFrontLayerHandler constructs and returns a new CloudFront
// LayerHandler implementation. // LayerHandler implementation.
// Required options: baseurl, privatekey, keypairid // Required options: baseurl, privatekey, keypairid
func newCloudFrontLayerHandler(storageDriver storagedriver.StorageDriver, options map[string]interface{}) (LayerHandler, error) { func newCloudFrontStorageMiddleware(storageDriver storagedriver.StorageDriver, options map[string]interface{}) (storagedriver.StorageDriver, error) {
base, ok := options["baseurl"] base, ok := options["baseurl"]
if !ok { if !ok {
return nil, fmt.Errorf("No baseurl provided") return nil, fmt.Errorf("No baseurl provided")
@ -68,12 +70,6 @@ func newCloudFrontLayerHandler(storageDriver storagedriver.StorageDriver, option
return nil, err return nil, err
} }
lh, err := newDelegateLayerHandler(storageDriver, options)
if err != nil {
return nil, err
}
dlh := lh.(*delegateLayerHandler)
cf := cloudfront.New(baseURL, privateKey, keypairID) cf := cloudfront.New(baseURL, privateKey, keypairID)
duration := 20 * time.Minute duration := 20 * time.Minute
@ -91,33 +87,33 @@ func newCloudFrontLayerHandler(storageDriver storagedriver.StorageDriver, option
} }
} }
return &cloudFrontLayerHandler{cloudfront: cf, delegateLayerHandler: dlh, duration: duration}, nil return &cloudFrontStorageMiddleware{StorageDriver: storageDriver, cloudfront: cf, duration: duration}, nil
} }
// Resolve returns an http.Handler which can serve the contents of the given // Resolve returns an http.Handler which can serve the contents of the given
// Layer, or an error if not supported by the storagedriver. // Layer, or an error if not supported by the storagedriver.
func (lh *cloudFrontLayerHandler) Resolve(layer distribution.Layer) (http.Handler, error) { func (lh *cloudFrontStorageMiddleware) URLFor(path string, options map[string]interface{}) (string, error) {
layerURLStr, err := lh.delegateLayerHandler.urlFor(layer, nil) // TODO(endophage): currently only supports S3
options["expiry"] = time.Now().Add(lh.duration)
layerURLStr, err := lh.StorageDriver.URLFor(path, options)
if err != nil { if err != nil {
return nil, err return "", err
} }
layerURL, err := url.Parse(layerURLStr) layerURL, err := url.Parse(layerURLStr)
if err != nil { if err != nil {
return nil, err return "", err
} }
cfURL, err := lh.cloudfront.CannedSignedURL(layerURL.Path, "", time.Now().Add(lh.duration)) cfURL, err := lh.cloudfront.CannedSignedURL(layerURL.Path, "", time.Now().Add(lh.duration))
if err != nil { if err != nil {
return nil, err return "", err
} }
return cfURL, nil
return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
http.Redirect(w, r, cfURL, http.StatusTemporaryRedirect)
}), nil
} }
// init registers the cloudfront layerHandler backend. // init registers the cloudfront layerHandler backend.
func init() { func init() {
RegisterLayerHandler("cloudfront", LayerHandlerInitFunc(newCloudFrontLayerHandler)) storagemiddleware.Register("cloudfront", storagemiddleware.InitFunc(newCloudFrontStorageMiddleware))
} }

View file

@ -0,0 +1,39 @@
package storagemiddleware
import (
"fmt"
storagedriver "github.com/docker/distribution/registry/storage/driver"
)
// InitFunc is the type of a StorageMiddleware factory function and is
// used to register the constructor for different StorageMiddleware backends.
type InitFunc func(storageDriver storagedriver.StorageDriver, options map[string]interface{}) (storagedriver.StorageDriver, error)
var storageMiddlewares map[string]InitFunc
// Register is used to register an InitFunc for
// a StorageMiddleware backend with the given name.
func Register(name string, initFunc InitFunc) error {
if storageMiddlewares == nil {
storageMiddlewares = make(map[string]InitFunc)
}
if _, exists := storageMiddlewares[name]; exists {
return fmt.Errorf("name already registered: %s", name)
}
storageMiddlewares[name] = initFunc
return nil
}
// Get constructs a StorageMiddleware with the given options using the named backend.
func Get(name string, options map[string]interface{}, storageDriver storagedriver.StorageDriver) (storagedriver.StorageDriver, error) {
if storageMiddlewares != nil {
if initFunc, exists := storageMiddlewares[name]; exists {
return initFunc(storageDriver, options)
}
}
return nil, fmt.Errorf("no storage middleware registered with name: %s", name)
}

View file

@ -1,51 +0,0 @@
package storage
import (
"fmt"
"net/http"
"github.com/docker/distribution"
storagedriver "github.com/docker/distribution/registry/storage/driver"
)
// LayerHandler provides middleware for serving the contents of a Layer.
type LayerHandler interface {
// Resolve returns an http.Handler which can serve the contents of a given
// Layer if possible, or nil and an error when unsupported. This may
// directly serve the contents of the layer or issue a redirect to another
// URL hosting the content.
Resolve(layer distribution.Layer) (http.Handler, error)
}
// LayerHandlerInitFunc is the type of a LayerHandler factory function and is
// used to register the contsructor for different LayerHandler backends.
type LayerHandlerInitFunc func(storageDriver storagedriver.StorageDriver, options map[string]interface{}) (LayerHandler, error)
var layerHandlers map[string]LayerHandlerInitFunc
// RegisterLayerHandler is used to register an LayerHandlerInitFunc for
// a LayerHandler backend with the given name.
func RegisterLayerHandler(name string, initFunc LayerHandlerInitFunc) error {
if layerHandlers == nil {
layerHandlers = make(map[string]LayerHandlerInitFunc)
}
if _, exists := layerHandlers[name]; exists {
return fmt.Errorf("name already registered: %s", name)
}
layerHandlers[name] = initFunc
return nil
}
// GetLayerHandler constructs a LayerHandler
// with the given options using the named backend.
func GetLayerHandler(name string, options map[string]interface{}, storageDriver storagedriver.StorageDriver) (LayerHandler, error) {
if layerHandlers != nil {
if initFunc, exists := layerHandlers[name]; exists {
return initFunc(storageDriver, options)
}
}
return nil, fmt.Errorf("no layer handler registered with name: %s", name)
}

View file

@ -1,13 +1,14 @@
package storage package storage
import ( import (
"net/http"
"time" "time"
"github.com/docker/distribution" "github.com/docker/distribution"
"github.com/docker/distribution/digest" "github.com/docker/distribution/digest"
) )
// layerReadSeeker implements Layer and provides facilities for reading and // layerReader implements Layer and provides facilities for reading and
// seeking. // seeking.
type layerReader struct { type layerReader struct {
fileReader fileReader
@ -17,19 +18,28 @@ type layerReader struct {
var _ distribution.Layer = &layerReader{} var _ distribution.Layer = &layerReader{}
func (lrs *layerReader) Digest() digest.Digest { func (lr *layerReader) Digest() digest.Digest {
return lrs.digest return lr.digest
} }
func (lrs *layerReader) Length() int64 { func (lr *layerReader) Length() int64 {
return lrs.size return lr.size
} }
func (lrs *layerReader) CreatedAt() time.Time { func (lr *layerReader) CreatedAt() time.Time {
return lrs.modtime return lr.modtime
} }
// Close the layer. Should be called when the resource is no longer needed. // Close the layer. Should be called when the resource is no longer needed.
func (lrs *layerReader) Close() error { func (lr *layerReader) Close() error {
return lrs.closeWithErr(distribution.ErrLayerClosed) return lr.closeWithErr(distribution.ErrLayerClosed)
}
func (lr *layerReader) ServeHTTP(w http.ResponseWriter, r *http.Request) {
w.Header().Set("Docker-Content-Digest", lr.digest.String())
if url, err := lr.fileReader.driver.URLFor(lr.path, map[string]interface{}{}); err == nil {
http.Redirect(w, r, url, http.StatusTemporaryRedirect)
}
http.ServeContent(w, r, lr.digest.String(), lr.CreatedAt(), lr)
} }

View file

@ -138,7 +138,7 @@ func (ls *layerStore) newLayerUpload(uuid, path string, startedAt time.Time) (di
return nil, err return nil, err
} }
return &layerUploadController{ return &layerWriter{
layerStore: ls, layerStore: ls,
uuid: uuid, uuid: uuid,
startedAt: startedAt, startedAt: startedAt,

View file

@ -13,9 +13,11 @@ import (
storagedriver "github.com/docker/distribution/registry/storage/driver" storagedriver "github.com/docker/distribution/registry/storage/driver"
) )
// layerUploadController is used to control the various aspects of resumable var _ distribution.LayerUpload = &layerWriter{}
// layerWriter is used to control the various aspects of resumable
// layer upload. It implements the LayerUpload interface. // layer upload. It implements the LayerUpload interface.
type layerUploadController struct { type layerWriter struct {
layerStore *layerStore layerStore *layerStore
uuid string uuid string
@ -26,65 +28,64 @@ type layerUploadController struct {
bufferedFileWriter bufferedFileWriter
} }
var _ distribution.LayerUpload = &layerUploadController{} var _ distribution.LayerUpload = &layerWriter{}
// UUID returns the identifier for this upload. // UUID returns the identifier for this upload.
func (luc *layerUploadController) UUID() string { func (lw *layerWriter) UUID() string {
return luc.uuid return lw.uuid
} }
func (luc *layerUploadController) StartedAt() time.Time { func (lw *layerWriter) StartedAt() time.Time {
return luc.startedAt return lw.startedAt
} }
// Finish marks the upload as completed, returning a valid handle to the // Finish marks the upload as completed, returning a valid handle to the
// uploaded layer. The final size and checksum are validated against the // uploaded layer. The final size and checksum are validated against the
// contents of the uploaded layer. The checksum should be provided in the // contents of the uploaded layer. The checksum should be provided in the
// format <algorithm>:<hex digest>. // format <algorithm>:<hex digest>.
func (luc *layerUploadController) Finish(digest digest.Digest) (distribution.Layer, error) { func (lw *layerWriter) Finish(digest digest.Digest) (distribution.Layer, error) {
ctxu.GetLogger(luc.layerStore.repository.ctx).Debug("(*layerUploadController).Finish") ctxu.GetLogger(lw.layerStore.repository.ctx).Debug("(*layerWriter).Finish")
err := luc.bufferedFileWriter.Close() if err := lw.bufferedFileWriter.Close(); err != nil {
return nil, err
}
canonical, err := lw.validateLayer(digest)
if err != nil { if err != nil {
return nil, err return nil, err
} }
canonical, err := luc.validateLayer(digest) if err := lw.moveLayer(canonical); err != nil {
if err != nil {
return nil, err
}
if err := luc.moveLayer(canonical); err != nil {
// TODO(stevvooe): Cleanup? // TODO(stevvooe): Cleanup?
return nil, err return nil, err
} }
// Link the layer blob into the repository. // Link the layer blob into the repository.
if err := luc.linkLayer(canonical, digest); err != nil { if err := lw.linkLayer(canonical, digest); err != nil {
return nil, err return nil, err
} }
if err := luc.removeResources(); err != nil { if err := lw.removeResources(); err != nil {
return nil, err return nil, err
} }
return luc.layerStore.Fetch(canonical) return lw.layerStore.Fetch(canonical)
} }
// Cancel the layer upload process. // Cancel the layer upload process.
func (luc *layerUploadController) Cancel() error { func (lw *layerWriter) Cancel() error {
ctxu.GetLogger(luc.layerStore.repository.ctx).Debug("(*layerUploadController).Cancel") ctxu.GetLogger(lw.layerStore.repository.ctx).Debug("(*layerWriter).Cancel")
if err := luc.removeResources(); err != nil { if err := lw.removeResources(); err != nil {
return err return err
} }
luc.Close() lw.Close()
return nil return nil
} }
// validateLayer checks the layer data against the digest, returning an error // validateLayer checks the layer data against the digest, returning an error
// if it does not match. The canonical digest is returned. // if it does not match. The canonical digest is returned.
func (luc *layerUploadController) validateLayer(dgst digest.Digest) (digest.Digest, error) { func (lw *layerWriter) validateLayer(dgst digest.Digest) (digest.Digest, error) {
digestVerifier, err := digest.NewDigestVerifier(dgst) digestVerifier, err := digest.NewDigestVerifier(dgst)
if err != nil { if err != nil {
return "", err return "", err
@ -96,7 +97,7 @@ func (luc *layerUploadController) validateLayer(dgst digest.Digest) (digest.Dige
// then only have to fetch the difference. // then only have to fetch the difference.
// Read the file from the backend driver and validate it. // Read the file from the backend driver and validate it.
fr, err := newFileReader(luc.bufferedFileWriter.driver, luc.path) fr, err := newFileReader(lw.bufferedFileWriter.driver, lw.path)
if err != nil { if err != nil {
return "", err return "", err
} }
@ -125,8 +126,8 @@ func (luc *layerUploadController) validateLayer(dgst digest.Digest) (digest.Dige
// moveLayer moves the data into its final, hash-qualified destination, // moveLayer moves the data into its final, hash-qualified destination,
// identified by dgst. The layer should be validated before commencing the // identified by dgst. The layer should be validated before commencing the
// move. // move.
func (luc *layerUploadController) moveLayer(dgst digest.Digest) error { func (lw *layerWriter) moveLayer(dgst digest.Digest) error {
blobPath, err := luc.layerStore.repository.registry.pm.path(blobDataPathSpec{ blobPath, err := lw.layerStore.repository.registry.pm.path(blobDataPathSpec{
digest: dgst, digest: dgst,
}) })
@ -135,7 +136,7 @@ func (luc *layerUploadController) moveLayer(dgst digest.Digest) error {
} }
// Check for existence // Check for existence
if _, err := luc.driver.Stat(blobPath); err != nil { if _, err := lw.driver.Stat(blobPath); err != nil {
switch err := err.(type) { switch err := err.(type) {
case storagedriver.PathNotFoundError: case storagedriver.PathNotFoundError:
break // ensure that it doesn't exist. break // ensure that it doesn't exist.
@ -154,7 +155,7 @@ func (luc *layerUploadController) moveLayer(dgst digest.Digest) error {
// the size here and write a zero-length file to blobPath if this is the // the size here and write a zero-length file to blobPath if this is the
// case. For the most part, this should only ever happen with zero-length // case. For the most part, this should only ever happen with zero-length
// tars. // tars.
if _, err := luc.driver.Stat(luc.path); err != nil { if _, err := lw.driver.Stat(lw.path); err != nil {
switch err := err.(type) { switch err := err.(type) {
case storagedriver.PathNotFoundError: case storagedriver.PathNotFoundError:
// HACK(stevvooe): This is slightly dangerous: if we verify above, // HACK(stevvooe): This is slightly dangerous: if we verify above,
@ -163,24 +164,24 @@ func (luc *layerUploadController) moveLayer(dgst digest.Digest) error {
// prevent this horrid thing, we employ the hack of only allowing // prevent this horrid thing, we employ the hack of only allowing
// to this happen for the zero tarsum. // to this happen for the zero tarsum.
if dgst == digest.DigestSha256EmptyTar { if dgst == digest.DigestSha256EmptyTar {
return luc.driver.PutContent(blobPath, []byte{}) return lw.driver.PutContent(blobPath, []byte{})
} }
// We let this fail during the move below. // We let this fail during the move below.
logrus. logrus.
WithField("upload.uuid", luc.UUID()). WithField("upload.uuid", lw.UUID()).
WithField("digest", dgst).Warnf("attempted to move zero-length content with non-zero digest") WithField("digest", dgst).Warnf("attempted to move zero-length content with non-zero digest")
default: default:
return err // unrelated error return err // unrelated error
} }
} }
return luc.driver.Move(luc.path, blobPath) return lw.driver.Move(lw.path, blobPath)
} }
// linkLayer links a valid, written layer blob into the registry under the // linkLayer links a valid, written layer blob into the registry under the
// named repository for the upload controller. // named repository for the upload controller.
func (luc *layerUploadController) linkLayer(canonical digest.Digest, aliases ...digest.Digest) error { func (lw *layerWriter) linkLayer(canonical digest.Digest, aliases ...digest.Digest) error {
dgsts := append([]digest.Digest{canonical}, aliases...) dgsts := append([]digest.Digest{canonical}, aliases...)
// Don't make duplicate links. // Don't make duplicate links.
@ -192,8 +193,8 @@ func (luc *layerUploadController) linkLayer(canonical digest.Digest, aliases ...
} }
seenDigests[dgst] = struct{}{} seenDigests[dgst] = struct{}{}
layerLinkPath, err := luc.layerStore.repository.registry.pm.path(layerLinkPathSpec{ layerLinkPath, err := lw.layerStore.repository.registry.pm.path(layerLinkPathSpec{
name: luc.layerStore.repository.Name(), name: lw.layerStore.repository.Name(),
digest: dgst, digest: dgst,
}) })
@ -201,7 +202,7 @@ func (luc *layerUploadController) linkLayer(canonical digest.Digest, aliases ...
return err return err
} }
if err := luc.layerStore.repository.registry.driver.PutContent(layerLinkPath, []byte(canonical)); err != nil { if err := lw.layerStore.repository.registry.driver.PutContent(layerLinkPath, []byte(canonical)); err != nil {
return err return err
} }
} }
@ -212,10 +213,10 @@ func (luc *layerUploadController) linkLayer(canonical digest.Digest, aliases ...
// removeResources should clean up all resources associated with the upload // removeResources should clean up all resources associated with the upload
// instance. An error will be returned if the clean up cannot proceed. If the // instance. An error will be returned if the clean up cannot proceed. If the
// resources are already not present, no error will be returned. // resources are already not present, no error will be returned.
func (luc *layerUploadController) removeResources() error { func (lw *layerWriter) removeResources() error {
dataPath, err := luc.layerStore.repository.registry.pm.path(uploadDataPathSpec{ dataPath, err := lw.layerStore.repository.registry.pm.path(uploadDataPathSpec{
name: luc.layerStore.repository.Name(), name: lw.layerStore.repository.Name(),
uuid: luc.uuid, uuid: lw.uuid,
}) })
if err != nil { if err != nil {
@ -226,7 +227,7 @@ func (luc *layerUploadController) removeResources() error {
// upload related files. // upload related files.
dirPath := path.Dir(dataPath) dirPath := path.Dir(dataPath)
if err := luc.driver.Delete(dirPath); err != nil { if err := lw.driver.Delete(dirPath); err != nil {
switch err := err.(type) { switch err := err.(type) {
case storagedriver.PathNotFoundError: case storagedriver.PathNotFoundError:
break // already gone! break // already gone!