diff --git a/cmd/registry/main.go b/cmd/registry/main.go index 79f45ab0..fa5305eb 100644 --- a/cmd/registry/main.go +++ b/cmd/registry/main.go @@ -17,6 +17,7 @@ import ( "github.com/docker/distribution/registry/handlers" _ "github.com/docker/distribution/registry/storage/driver/filesystem" _ "github.com/docker/distribution/registry/storage/driver/inmemory" + _ "github.com/docker/distribution/registry/storage/driver/middleware/cloudfront" _ "github.com/docker/distribution/registry/storage/driver/s3" "github.com/docker/distribution/version" gorhandlers "github.com/gorilla/handlers" diff --git a/configuration/configuration.go b/configuration/configuration.go index 8ba401c6..6c03b27f 100644 --- a/configuration/configuration.go +++ b/configuration/configuration.go @@ -26,8 +26,8 @@ type Configuration struct { // used to gate requests. Auth Auth `yaml:"auth,omitempty"` - // LayerHandler specifies a middleware for serving image layers. - LayerHandler LayerHandler `yaml:"layerhandler,omitempty"` + // Middleware lists all middlewares to be used by the registry. + Middleware map[string][]Middleware `yaml:"middleware,omitempty"` // Reporting is the configuration for error reporting Reporting Reporting `yaml:"reporting,omitempty"` @@ -295,60 +295,14 @@ type NewRelicReporting struct { Name string `yaml:"name,omitempty"` } -// LayerHandler defines the configuration for middleware layer serving -type LayerHandler map[string]Parameters - -// Type returns the layerhandler type -func (layerHandler LayerHandler) Type() string { - // Return only key in this map - for k := range layerHandler { - return k - } - return "" -} - -// Parameters returns the Parameters map for a LayerHandler configuration -func (layerHandler LayerHandler) Parameters() Parameters { - return layerHandler[layerHandler.Type()] -} - -// UnmarshalYAML implements the yaml.Unmarshaler interface -// Unmarshals a single item map into a Storage or a string into a Storage type with no parameters -func (layerHandler *LayerHandler) UnmarshalYAML(unmarshal func(interface{}) error) error { - var storageMap map[string]Parameters - err := unmarshal(&storageMap) - if err == nil { - if len(storageMap) > 1 { - types := make([]string, 0, len(storageMap)) - for k := range storageMap { - types = append(types, k) - } - return fmt.Errorf("Must provide exactly one layerhandler type. Provided: %v", types) - } - *layerHandler = storageMap - return nil - } - - var storageType string - err = unmarshal(&storageType) - if err == nil { - *layerHandler = LayerHandler{storageType: Parameters{}} - return nil - } - - return err -} - -// MarshalYAML implements the yaml.Marshaler interface -func (layerHandler LayerHandler) MarshalYAML() (interface{}, error) { - if layerHandler.Parameters() == nil { - t := layerHandler.Type() - if t == "" { - return nil, nil - } - return t, nil - } - return map[string]Parameters(layerHandler), nil +// Middleware configures named middlewares to be applied at injection points. +type Middleware struct { + // Name the middleware registers itself as + Name string `yaml:"name"` + // Flag to disable middleware easily + Disabled bool `yaml:"disabled,omitempty"` + // Map of parameters that will be passed to the middleware's initialization function + Options Parameters `yaml:"options"` } // Parse parses an input configuration yaml document into a Configuration struct diff --git a/registry.go b/registry.go index c8ac9c10..359c80ac 100644 --- a/registry.go +++ b/registry.go @@ -2,6 +2,7 @@ package distribution import ( "io" + "net/http" "time" "github.com/docker/distribution/digest" @@ -106,6 +107,11 @@ type Layer interface { // CreatedAt returns the time this layer was created. CreatedAt() time.Time + + // ServeHTTP allows a layer to serve itself, whether by providing + // a redirect directly to the content, or by serving the content + // itself + ServeHTTP(w http.ResponseWriter, r *http.Request) } // LayerUpload provides a handle for working with in-progress uploads. diff --git a/registry/auth/auth.go b/registry/auth/auth.go index cd6ee096..a8499342 100644 --- a/registry/auth/auth.go +++ b/registry/auth/auth.go @@ -110,7 +110,7 @@ func (uic userInfoContext) Value(key interface{}) interface{} { } // InitFunc is the type of an AccessController factory function and is used -// to register the contsructor for different AccesController backends. +// to register the constructor for different AccesController backends. type InitFunc func(options map[string]interface{}) (AccessController, error) var accessControllers map[string]InitFunc diff --git a/registry/handlers/app.go b/registry/handlers/app.go index 4d860cc4..1b5effbc 100644 --- a/registry/handlers/app.go +++ b/registry/handlers/app.go @@ -13,9 +13,12 @@ import ( "github.com/docker/distribution/notifications" "github.com/docker/distribution/registry/api/v2" "github.com/docker/distribution/registry/auth" + registrymiddleware "github.com/docker/distribution/registry/middleware/registry" + repositorymiddleware "github.com/docker/distribution/registry/middleware/repository" "github.com/docker/distribution/registry/storage" storagedriver "github.com/docker/distribution/registry/storage/driver" "github.com/docker/distribution/registry/storage/driver/factory" + storagemiddleware "github.com/docker/distribution/registry/storage/driver/middleware" "github.com/gorilla/mux" "golang.org/x/net/context" ) @@ -41,8 +44,6 @@ type App struct { sink notifications.Sink source notifications.SourceRecord } - - layerHandler storage.LayerHandler // allows dispatch of layer serving to external provider } // Value intercepts calls context.Context.Value, returning the current app id, @@ -88,9 +89,19 @@ func NewApp(ctx context.Context, configuration configuration.Configuration) *App // a health check. panic(err) } + app.driver, err = applyStorageMiddleware(app.driver, configuration.Middleware["storage"]) + if err != nil { + panic(err) + } app.configureEvents(&configuration) + app.registry = storage.NewRegistryWithDriver(app.driver) + app.registry, err = applyRegistryMiddleware(app.registry, configuration.Middleware["registry"]) + if err != nil { + panic(err) + } + authType := configuration.Auth.Type() if authType != "" { @@ -101,16 +112,6 @@ func NewApp(ctx context.Context, configuration configuration.Configuration) *App app.accessController = accessController } - layerHandlerType := configuration.LayerHandler.Type() - - if layerHandlerType != "" { - lh, err := storage.GetLayerHandler(layerHandlerType, configuration.LayerHandler.Parameters(), app.driver) - if err != nil { - panic(fmt.Sprintf("unable to configure layer handler (%s): %v", layerHandlerType, err)) - } - app.layerHandler = lh - } - return app } @@ -249,6 +250,15 @@ func (app *App) dispatcher(dispatch dispatchFunc) http.Handler { context.Repository = notifications.Listen( repository, app.eventBridge(context, r)) + + context.Repository, err = applyRepoMiddleware(context.Repository, app.Config.Middleware["repository"]) + if err != nil { + ctxu.GetLogger(context).Errorf("error initializing repository middleware: %v", err) + context.Errors.Push(v2.ErrorCodeUnknown, err) + w.WriteHeader(http.StatusInternalServerError) + serveJSON(w, context.Errors) + return + } } handler := dispatch(context, r) @@ -417,3 +427,40 @@ func appendAccessRecords(records []auth.Access, method string, repo string) []au } return records } + +// applyRegistryMiddleware wraps a registry instance with the configured middlewares +func applyRegistryMiddleware(registry distribution.Registry, middlewares []configuration.Middleware) (distribution.Registry, error) { + for _, mw := range middlewares { + rmw, err := registrymiddleware.Get(mw.Name, mw.Options, registry) + if err != nil { + return nil, fmt.Errorf("unable to configure registry middleware (%s): %s", mw.Name, err) + } + registry = rmw + } + return registry, nil + +} + +// applyRepoMiddleware wraps a repository with the configured middlewares +func applyRepoMiddleware(repository distribution.Repository, middlewares []configuration.Middleware) (distribution.Repository, error) { + for _, mw := range middlewares { + rmw, err := repositorymiddleware.Get(mw.Name, mw.Options, repository) + if err != nil { + return nil, err + } + repository = rmw + } + return repository, nil +} + +// applyStorageMiddleware wraps a storage driver with the configured middlewares +func applyStorageMiddleware(driver storagedriver.StorageDriver, middlewares []configuration.Middleware) (storagedriver.StorageDriver, error) { + for _, mw := range middlewares { + smw, err := storagemiddleware.Get(mw.Name, mw.Options, driver) + if err != nil { + return nil, fmt.Errorf("unable to configure storage middleware (%s): %v", mw.Name, err) + } + driver = smw + } + return driver, nil +} diff --git a/registry/handlers/layer.go b/registry/handlers/layer.go index 913002e0..ae73aee0 100644 --- a/registry/handlers/layer.go +++ b/registry/handlers/layer.go @@ -62,17 +62,6 @@ func (lh *layerHandler) GetLayer(w http.ResponseWriter, r *http.Request) { } return } - defer layer.Close() - w.Header().Set("Docker-Content-Digest", lh.Digest.String()) - - if lh.layerHandler != nil { - handler, _ := lh.layerHandler.Resolve(layer) - if handler != nil { - handler.ServeHTTP(w, r) - return - } - } - - http.ServeContent(w, r, layer.Digest().String(), layer.CreatedAt(), layer) + layer.ServeHTTP(w, r) } diff --git a/registry/middleware/registry/middleware.go b/registry/middleware/registry/middleware.go new file mode 100644 index 00000000..d3e88810 --- /dev/null +++ b/registry/middleware/registry/middleware.go @@ -0,0 +1,39 @@ +package middleware + +import ( + "fmt" + + "github.com/docker/distribution" +) + +// InitFunc is the type of a RegistryMiddleware factory function and is +// used to register the constructor for different RegistryMiddleware backends. +type InitFunc func(registry distribution.Registry, options map[string]interface{}) (distribution.Registry, error) + +var middlewares map[string]InitFunc + +// Register is used to register an InitFunc for +// a RegistryMiddleware backend with the given name. +func Register(name string, initFunc InitFunc) error { + if middlewares == nil { + middlewares = make(map[string]InitFunc) + } + if _, exists := middlewares[name]; exists { + return fmt.Errorf("name already registered: %s", name) + } + + middlewares[name] = initFunc + + return nil +} + +// Get constructs a RegistryMiddleware with the given options using the named backend. +func Get(name string, options map[string]interface{}, registry distribution.Registry) (distribution.Registry, error) { + if middlewares != nil { + if initFunc, exists := middlewares[name]; exists { + return initFunc(registry, options) + } + } + + return nil, fmt.Errorf("no registry middleware registered with name: %s", name) +} diff --git a/registry/middleware/repository/middleware.go b/registry/middleware/repository/middleware.go new file mode 100644 index 00000000..d6330fc4 --- /dev/null +++ b/registry/middleware/repository/middleware.go @@ -0,0 +1,39 @@ +package middleware + +import ( + "fmt" + + "github.com/docker/distribution" +) + +// InitFunc is the type of a RepositoryMiddleware factory function and is +// used to register the constructor for different RepositoryMiddleware backends. +type InitFunc func(repository distribution.Repository, options map[string]interface{}) (distribution.Repository, error) + +var middlewares map[string]InitFunc + +// Register is used to register an InitFunc for +// a RepositoryMiddleware backend with the given name. +func Register(name string, initFunc InitFunc) error { + if middlewares == nil { + middlewares = make(map[string]InitFunc) + } + if _, exists := middlewares[name]; exists { + return fmt.Errorf("name already registered: %s", name) + } + + middlewares[name] = initFunc + + return nil +} + +// Get constructs a RepositoryMiddleware with the given options using the named backend. +func Get(name string, options map[string]interface{}, repository distribution.Repository) (distribution.Repository, error) { + if middlewares != nil { + if initFunc, exists := middlewares[name]; exists { + return initFunc(repository, options) + } + } + + return nil, fmt.Errorf("no repository middleware registered with name: %s", name) +} diff --git a/registry/storage/delegatelayerhandler.go b/registry/storage/delegatelayerhandler.go deleted file mode 100644 index 62b08b22..00000000 --- a/registry/storage/delegatelayerhandler.go +++ /dev/null @@ -1,95 +0,0 @@ -package storage - -import ( - "fmt" - "net/http" - "time" - - "github.com/docker/distribution" - storagedriver "github.com/docker/distribution/registry/storage/driver" -) - -// delegateLayerHandler provides a simple implementation of layerHandler that -// simply issues HTTP Temporary Redirects to the URL provided by the -// storagedriver for a given Layer. -type delegateLayerHandler struct { - storageDriver storagedriver.StorageDriver - pathMapper *pathMapper - duration time.Duration -} - -var _ LayerHandler = &delegateLayerHandler{} - -func newDelegateLayerHandler(storageDriver storagedriver.StorageDriver, options map[string]interface{}) (LayerHandler, error) { - duration := 20 * time.Minute - d, ok := options["duration"] - if ok { - switch d := d.(type) { - case time.Duration: - duration = d - case string: - dur, err := time.ParseDuration(d) - if err != nil { - return nil, fmt.Errorf("Invalid duration: %s", err) - } - duration = dur - } - } - - return &delegateLayerHandler{storageDriver: storageDriver, pathMapper: defaultPathMapper, duration: duration}, nil -} - -// Resolve returns an http.Handler which can serve the contents of the given -// Layer, or an error if not supported by the storagedriver. -func (lh *delegateLayerHandler) Resolve(layer distribution.Layer) (http.Handler, error) { - // TODO(bbland): This is just a sanity check to ensure that the - // storagedriver supports url generation. It would be nice if we didn't have - // to do this twice for non-GET requests. - layerURL, err := lh.urlFor(layer, map[string]interface{}{"method": "GET"}) - if err != nil { - return nil, err - } - - return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { - if r.Method != "GET" { - layerURL, err = lh.urlFor(layer, map[string]interface{}{"method": r.Method}) - if err != nil { - http.Error(w, err.Error(), http.StatusInternalServerError) - return - } - } - - http.Redirect(w, r, layerURL, http.StatusTemporaryRedirect) - }), nil -} - -// urlFor returns a download URL for the given layer, or the empty string if -// unsupported. -func (lh *delegateLayerHandler) urlFor(layer distribution.Layer, options map[string]interface{}) (string, error) { - // Crack open the layer to get at the layerStore - layerRd, ok := layer.(*layerReader) - if !ok { - // TODO(stevvooe): We probably want to find a better way to get at the - // underlying filesystem path for a given layer. Perhaps, the layer - // handler should have its own layer store but right now, it is not - // request scoped. - return "", fmt.Errorf("unsupported layer type: cannot resolve blob path: %v", layer) - } - - if options == nil { - options = make(map[string]interface{}) - } - options["expiry"] = time.Now().Add(lh.duration) - - layerURL, err := lh.storageDriver.URLFor(layerRd.path, options) - if err != nil { - return "", err - } - - return layerURL, nil -} - -// init registers the delegate layerHandler backend. -func init() { - RegisterLayerHandler("delegate", LayerHandlerInitFunc(newDelegateLayerHandler)) -} diff --git a/registry/storage/cloudfrontlayerhandler.go b/registry/storage/driver/middleware/cloudfront/middleware.go similarity index 64% rename from registry/storage/cloudfrontlayerhandler.go rename to registry/storage/driver/middleware/cloudfront/middleware.go index 82bc313d..2d155312 100644 --- a/registry/storage/cloudfrontlayerhandler.go +++ b/registry/storage/driver/middleware/cloudfront/middleware.go @@ -1,34 +1,36 @@ -package storage +// Package middleware - cloudfront wrapper for storage libs +// N.B. currently only works with S3, not arbitrary sites +// +package middleware import ( "crypto/x509" "encoding/pem" "fmt" "io/ioutil" - "net/http" "net/url" "time" "github.com/AdRoll/goamz/cloudfront" - "github.com/docker/distribution" storagedriver "github.com/docker/distribution/registry/storage/driver" + storagemiddleware "github.com/docker/distribution/registry/storage/driver/middleware" ) -// cloudFrontLayerHandler provides an simple implementation of layerHandler that +// cloudFrontStorageMiddleware provides an simple implementation of layerHandler that // constructs temporary signed CloudFront URLs from the storagedriver layer URL, // then issues HTTP Temporary Redirects to this CloudFront content URL. -type cloudFrontLayerHandler struct { - cloudfront *cloudfront.CloudFront - delegateLayerHandler *delegateLayerHandler - duration time.Duration +type cloudFrontStorageMiddleware struct { + storagedriver.StorageDriver + cloudfront *cloudfront.CloudFront + duration time.Duration } -var _ LayerHandler = &cloudFrontLayerHandler{} +var _ storagedriver.StorageDriver = &cloudFrontStorageMiddleware{} // newCloudFrontLayerHandler constructs and returns a new CloudFront // LayerHandler implementation. // Required options: baseurl, privatekey, keypairid -func newCloudFrontLayerHandler(storageDriver storagedriver.StorageDriver, options map[string]interface{}) (LayerHandler, error) { +func newCloudFrontStorageMiddleware(storageDriver storagedriver.StorageDriver, options map[string]interface{}) (storagedriver.StorageDriver, error) { base, ok := options["baseurl"] if !ok { return nil, fmt.Errorf("No baseurl provided") @@ -68,12 +70,6 @@ func newCloudFrontLayerHandler(storageDriver storagedriver.StorageDriver, option return nil, err } - lh, err := newDelegateLayerHandler(storageDriver, options) - if err != nil { - return nil, err - } - dlh := lh.(*delegateLayerHandler) - cf := cloudfront.New(baseURL, privateKey, keypairID) duration := 20 * time.Minute @@ -91,33 +87,33 @@ func newCloudFrontLayerHandler(storageDriver storagedriver.StorageDriver, option } } - return &cloudFrontLayerHandler{cloudfront: cf, delegateLayerHandler: dlh, duration: duration}, nil + return &cloudFrontStorageMiddleware{StorageDriver: storageDriver, cloudfront: cf, duration: duration}, nil } // Resolve returns an http.Handler which can serve the contents of the given // Layer, or an error if not supported by the storagedriver. -func (lh *cloudFrontLayerHandler) Resolve(layer distribution.Layer) (http.Handler, error) { - layerURLStr, err := lh.delegateLayerHandler.urlFor(layer, nil) +func (lh *cloudFrontStorageMiddleware) URLFor(path string, options map[string]interface{}) (string, error) { + // TODO(endophage): currently only supports S3 + options["expiry"] = time.Now().Add(lh.duration) + + layerURLStr, err := lh.StorageDriver.URLFor(path, options) if err != nil { - return nil, err + return "", err } layerURL, err := url.Parse(layerURLStr) if err != nil { - return nil, err + return "", err } cfURL, err := lh.cloudfront.CannedSignedURL(layerURL.Path, "", time.Now().Add(lh.duration)) if err != nil { - return nil, err + return "", err } - - return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { - http.Redirect(w, r, cfURL, http.StatusTemporaryRedirect) - }), nil + return cfURL, nil } // init registers the cloudfront layerHandler backend. func init() { - RegisterLayerHandler("cloudfront", LayerHandlerInitFunc(newCloudFrontLayerHandler)) + storagemiddleware.Register("cloudfront", storagemiddleware.InitFunc(newCloudFrontStorageMiddleware)) } diff --git a/registry/storage/driver/middleware/storagemiddleware.go b/registry/storage/driver/middleware/storagemiddleware.go new file mode 100644 index 00000000..7e40a8dd --- /dev/null +++ b/registry/storage/driver/middleware/storagemiddleware.go @@ -0,0 +1,39 @@ +package storagemiddleware + +import ( + "fmt" + + storagedriver "github.com/docker/distribution/registry/storage/driver" +) + +// InitFunc is the type of a StorageMiddleware factory function and is +// used to register the constructor for different StorageMiddleware backends. +type InitFunc func(storageDriver storagedriver.StorageDriver, options map[string]interface{}) (storagedriver.StorageDriver, error) + +var storageMiddlewares map[string]InitFunc + +// Register is used to register an InitFunc for +// a StorageMiddleware backend with the given name. +func Register(name string, initFunc InitFunc) error { + if storageMiddlewares == nil { + storageMiddlewares = make(map[string]InitFunc) + } + if _, exists := storageMiddlewares[name]; exists { + return fmt.Errorf("name already registered: %s", name) + } + + storageMiddlewares[name] = initFunc + + return nil +} + +// Get constructs a StorageMiddleware with the given options using the named backend. +func Get(name string, options map[string]interface{}, storageDriver storagedriver.StorageDriver) (storagedriver.StorageDriver, error) { + if storageMiddlewares != nil { + if initFunc, exists := storageMiddlewares[name]; exists { + return initFunc(storageDriver, options) + } + } + + return nil, fmt.Errorf("no storage middleware registered with name: %s", name) +} diff --git a/registry/storage/layerhandler.go b/registry/storage/layerhandler.go deleted file mode 100644 index b03bc250..00000000 --- a/registry/storage/layerhandler.go +++ /dev/null @@ -1,51 +0,0 @@ -package storage - -import ( - "fmt" - "net/http" - - "github.com/docker/distribution" - storagedriver "github.com/docker/distribution/registry/storage/driver" -) - -// LayerHandler provides middleware for serving the contents of a Layer. -type LayerHandler interface { - // Resolve returns an http.Handler which can serve the contents of a given - // Layer if possible, or nil and an error when unsupported. This may - // directly serve the contents of the layer or issue a redirect to another - // URL hosting the content. - Resolve(layer distribution.Layer) (http.Handler, error) -} - -// LayerHandlerInitFunc is the type of a LayerHandler factory function and is -// used to register the contsructor for different LayerHandler backends. -type LayerHandlerInitFunc func(storageDriver storagedriver.StorageDriver, options map[string]interface{}) (LayerHandler, error) - -var layerHandlers map[string]LayerHandlerInitFunc - -// RegisterLayerHandler is used to register an LayerHandlerInitFunc for -// a LayerHandler backend with the given name. -func RegisterLayerHandler(name string, initFunc LayerHandlerInitFunc) error { - if layerHandlers == nil { - layerHandlers = make(map[string]LayerHandlerInitFunc) - } - if _, exists := layerHandlers[name]; exists { - return fmt.Errorf("name already registered: %s", name) - } - - layerHandlers[name] = initFunc - - return nil -} - -// GetLayerHandler constructs a LayerHandler -// with the given options using the named backend. -func GetLayerHandler(name string, options map[string]interface{}, storageDriver storagedriver.StorageDriver) (LayerHandler, error) { - if layerHandlers != nil { - if initFunc, exists := layerHandlers[name]; exists { - return initFunc(storageDriver, options) - } - } - - return nil, fmt.Errorf("no layer handler registered with name: %s", name) -} diff --git a/registry/storage/layerreader.go b/registry/storage/layerreader.go index 1de98e50..1129eb9e 100644 --- a/registry/storage/layerreader.go +++ b/registry/storage/layerreader.go @@ -1,13 +1,14 @@ package storage import ( + "net/http" "time" "github.com/docker/distribution" "github.com/docker/distribution/digest" ) -// layerReadSeeker implements Layer and provides facilities for reading and +// layerReader implements Layer and provides facilities for reading and // seeking. type layerReader struct { fileReader @@ -17,19 +18,28 @@ type layerReader struct { var _ distribution.Layer = &layerReader{} -func (lrs *layerReader) Digest() digest.Digest { - return lrs.digest +func (lr *layerReader) Digest() digest.Digest { + return lr.digest } -func (lrs *layerReader) Length() int64 { - return lrs.size +func (lr *layerReader) Length() int64 { + return lr.size } -func (lrs *layerReader) CreatedAt() time.Time { - return lrs.modtime +func (lr *layerReader) CreatedAt() time.Time { + return lr.modtime } // Close the layer. Should be called when the resource is no longer needed. -func (lrs *layerReader) Close() error { - return lrs.closeWithErr(distribution.ErrLayerClosed) +func (lr *layerReader) Close() error { + return lr.closeWithErr(distribution.ErrLayerClosed) +} + +func (lr *layerReader) ServeHTTP(w http.ResponseWriter, r *http.Request) { + w.Header().Set("Docker-Content-Digest", lr.digest.String()) + + if url, err := lr.fileReader.driver.URLFor(lr.path, map[string]interface{}{}); err == nil { + http.Redirect(w, r, url, http.StatusTemporaryRedirect) + } + http.ServeContent(w, r, lr.digest.String(), lr.CreatedAt(), lr) } diff --git a/registry/storage/layerstore.go b/registry/storage/layerstore.go index f546529e..05881749 100644 --- a/registry/storage/layerstore.go +++ b/registry/storage/layerstore.go @@ -138,7 +138,7 @@ func (ls *layerStore) newLayerUpload(uuid, path string, startedAt time.Time) (di return nil, err } - return &layerUploadController{ + return &layerWriter{ layerStore: ls, uuid: uuid, startedAt: startedAt, diff --git a/registry/storage/layerupload.go b/registry/storage/layerwriter.go similarity index 69% rename from registry/storage/layerupload.go rename to registry/storage/layerwriter.go index fdb00e93..27bbade1 100644 --- a/registry/storage/layerupload.go +++ b/registry/storage/layerwriter.go @@ -13,9 +13,11 @@ import ( storagedriver "github.com/docker/distribution/registry/storage/driver" ) -// layerUploadController is used to control the various aspects of resumable +var _ distribution.LayerUpload = &layerWriter{} + +// layerWriter is used to control the various aspects of resumable // layer upload. It implements the LayerUpload interface. -type layerUploadController struct { +type layerWriter struct { layerStore *layerStore uuid string @@ -26,65 +28,64 @@ type layerUploadController struct { bufferedFileWriter } -var _ distribution.LayerUpload = &layerUploadController{} +var _ distribution.LayerUpload = &layerWriter{} // UUID returns the identifier for this upload. -func (luc *layerUploadController) UUID() string { - return luc.uuid +func (lw *layerWriter) UUID() string { + return lw.uuid } -func (luc *layerUploadController) StartedAt() time.Time { - return luc.startedAt +func (lw *layerWriter) StartedAt() time.Time { + return lw.startedAt } // Finish marks the upload as completed, returning a valid handle to the // uploaded layer. The final size and checksum are validated against the // contents of the uploaded layer. The checksum should be provided in the // format :. -func (luc *layerUploadController) Finish(digest digest.Digest) (distribution.Layer, error) { - ctxu.GetLogger(luc.layerStore.repository.ctx).Debug("(*layerUploadController).Finish") +func (lw *layerWriter) Finish(digest digest.Digest) (distribution.Layer, error) { + ctxu.GetLogger(lw.layerStore.repository.ctx).Debug("(*layerWriter).Finish") - err := luc.bufferedFileWriter.Close() + if err := lw.bufferedFileWriter.Close(); err != nil { + return nil, err + } + + canonical, err := lw.validateLayer(digest) if err != nil { return nil, err } - canonical, err := luc.validateLayer(digest) - if err != nil { - return nil, err - } - - if err := luc.moveLayer(canonical); err != nil { + if err := lw.moveLayer(canonical); err != nil { // TODO(stevvooe): Cleanup? return nil, err } // Link the layer blob into the repository. - if err := luc.linkLayer(canonical, digest); err != nil { + if err := lw.linkLayer(canonical, digest); err != nil { return nil, err } - if err := luc.removeResources(); err != nil { + if err := lw.removeResources(); err != nil { return nil, err } - return luc.layerStore.Fetch(canonical) + return lw.layerStore.Fetch(canonical) } // Cancel the layer upload process. -func (luc *layerUploadController) Cancel() error { - ctxu.GetLogger(luc.layerStore.repository.ctx).Debug("(*layerUploadController).Cancel") - if err := luc.removeResources(); err != nil { +func (lw *layerWriter) Cancel() error { + ctxu.GetLogger(lw.layerStore.repository.ctx).Debug("(*layerWriter).Cancel") + if err := lw.removeResources(); err != nil { return err } - luc.Close() + lw.Close() return nil } // validateLayer checks the layer data against the digest, returning an error // if it does not match. The canonical digest is returned. -func (luc *layerUploadController) validateLayer(dgst digest.Digest) (digest.Digest, error) { +func (lw *layerWriter) validateLayer(dgst digest.Digest) (digest.Digest, error) { digestVerifier, err := digest.NewDigestVerifier(dgst) if err != nil { return "", err @@ -96,7 +97,7 @@ func (luc *layerUploadController) validateLayer(dgst digest.Digest) (digest.Dige // then only have to fetch the difference. // Read the file from the backend driver and validate it. - fr, err := newFileReader(luc.bufferedFileWriter.driver, luc.path) + fr, err := newFileReader(lw.bufferedFileWriter.driver, lw.path) if err != nil { return "", err } @@ -125,8 +126,8 @@ func (luc *layerUploadController) validateLayer(dgst digest.Digest) (digest.Dige // moveLayer moves the data into its final, hash-qualified destination, // identified by dgst. The layer should be validated before commencing the // move. -func (luc *layerUploadController) moveLayer(dgst digest.Digest) error { - blobPath, err := luc.layerStore.repository.registry.pm.path(blobDataPathSpec{ +func (lw *layerWriter) moveLayer(dgst digest.Digest) error { + blobPath, err := lw.layerStore.repository.registry.pm.path(blobDataPathSpec{ digest: dgst, }) @@ -135,7 +136,7 @@ func (luc *layerUploadController) moveLayer(dgst digest.Digest) error { } // Check for existence - if _, err := luc.driver.Stat(blobPath); err != nil { + if _, err := lw.driver.Stat(blobPath); err != nil { switch err := err.(type) { case storagedriver.PathNotFoundError: break // ensure that it doesn't exist. @@ -154,7 +155,7 @@ func (luc *layerUploadController) moveLayer(dgst digest.Digest) error { // the size here and write a zero-length file to blobPath if this is the // case. For the most part, this should only ever happen with zero-length // tars. - if _, err := luc.driver.Stat(luc.path); err != nil { + if _, err := lw.driver.Stat(lw.path); err != nil { switch err := err.(type) { case storagedriver.PathNotFoundError: // HACK(stevvooe): This is slightly dangerous: if we verify above, @@ -163,24 +164,24 @@ func (luc *layerUploadController) moveLayer(dgst digest.Digest) error { // prevent this horrid thing, we employ the hack of only allowing // to this happen for the zero tarsum. if dgst == digest.DigestSha256EmptyTar { - return luc.driver.PutContent(blobPath, []byte{}) + return lw.driver.PutContent(blobPath, []byte{}) } // We let this fail during the move below. logrus. - WithField("upload.uuid", luc.UUID()). + WithField("upload.uuid", lw.UUID()). WithField("digest", dgst).Warnf("attempted to move zero-length content with non-zero digest") default: return err // unrelated error } } - return luc.driver.Move(luc.path, blobPath) + return lw.driver.Move(lw.path, blobPath) } // linkLayer links a valid, written layer blob into the registry under the // named repository for the upload controller. -func (luc *layerUploadController) linkLayer(canonical digest.Digest, aliases ...digest.Digest) error { +func (lw *layerWriter) linkLayer(canonical digest.Digest, aliases ...digest.Digest) error { dgsts := append([]digest.Digest{canonical}, aliases...) // Don't make duplicate links. @@ -192,8 +193,8 @@ func (luc *layerUploadController) linkLayer(canonical digest.Digest, aliases ... } seenDigests[dgst] = struct{}{} - layerLinkPath, err := luc.layerStore.repository.registry.pm.path(layerLinkPathSpec{ - name: luc.layerStore.repository.Name(), + layerLinkPath, err := lw.layerStore.repository.registry.pm.path(layerLinkPathSpec{ + name: lw.layerStore.repository.Name(), digest: dgst, }) @@ -201,7 +202,7 @@ func (luc *layerUploadController) linkLayer(canonical digest.Digest, aliases ... return err } - if err := luc.layerStore.repository.registry.driver.PutContent(layerLinkPath, []byte(canonical)); err != nil { + if err := lw.layerStore.repository.registry.driver.PutContent(layerLinkPath, []byte(canonical)); err != nil { return err } } @@ -212,10 +213,10 @@ func (luc *layerUploadController) linkLayer(canonical digest.Digest, aliases ... // removeResources should clean up all resources associated with the upload // instance. An error will be returned if the clean up cannot proceed. If the // resources are already not present, no error will be returned. -func (luc *layerUploadController) removeResources() error { - dataPath, err := luc.layerStore.repository.registry.pm.path(uploadDataPathSpec{ - name: luc.layerStore.repository.Name(), - uuid: luc.uuid, +func (lw *layerWriter) removeResources() error { + dataPath, err := lw.layerStore.repository.registry.pm.path(uploadDataPathSpec{ + name: lw.layerStore.repository.Name(), + uuid: lw.uuid, }) if err != nil { @@ -226,7 +227,7 @@ func (luc *layerUploadController) removeResources() error { // upload related files. dirPath := path.Dir(dataPath) - if err := luc.driver.Delete(dirPath); err != nil { + if err := lw.driver.Delete(dirPath); err != nil { switch err := err.(type) { case storagedriver.PathNotFoundError: break // already gone!