diff --git a/cmd/registry/config.yml b/cmd/registry/config.yml index 4919e6db..bb3ade11 100644 --- a/cmd/registry/config.yml +++ b/cmd/registry/config.yml @@ -8,3 +8,19 @@ http: secret: asecretforlocaldevelopment debug: addr: localhost:5001 +notifications: + endpoints: + - name: local-8082 + url: http://localhost:5003/callback + headers: + Authorization: [Bearer ] + timeout: 1s + threshold: 10 + backoff: 1s + disabled: true + - name: local-8083 + url: http://localhost:8083/callback + timeout: 1s + threshold: 10 + backoff: 1s + disabled: true diff --git a/configuration/configuration.go b/configuration/configuration.go index 0cf5bc02..ed086ba5 100644 --- a/configuration/configuration.go +++ b/configuration/configuration.go @@ -4,8 +4,10 @@ import ( "fmt" "io" "io/ioutil" + "net/http" "reflect" "strings" + "time" ) // Configuration is a versioned registry configuration, intended to be provided by a yaml file, and @@ -63,6 +65,10 @@ type Configuration struct { Addr string `yaml:"addr"` } `yaml:"debug"` } `yaml:"http"` + + // Notifications specifies configuration about various endpoint to which + // registry events are dispatched. + Notifications Notifications `yaml:"notifications"` } // v0_1Configuration is a Version 0.1 Configuration struct @@ -240,6 +246,26 @@ func (auth Auth) MarshalYAML() (interface{}, error) { return map[string]Parameters(auth), nil } +// Notifications configures multiple http endpoints. +type Notifications struct { + // Endpoints is a list of http configurations for endpoints that + // respond to webhook notifications. In the future, we may allow other + // kinds of endpoints, such as external queues. + Endpoints []Endpoint `yaml:"endpoints"` +} + +// Endpoint describes the configuration of an http webhook notification +// endpoint. +type Endpoint struct { + Name string `yaml:"name"` // identifies the endpoint in the registry instance. + Disabled bool `yaml:"disabled"` // disables the endpoint + URL string `yaml:"url"` // post url for the endpoint. + Headers http.Header `yaml:"headers"` // static headers that should be added to all requests + Timeout time.Duration `yaml:"timeout"` // HTTP timeout + Threshold int `yaml:"threshold"` // circuit breaker threshold before backing off on failure + Backoff time.Duration `yaml:"backoff"` // backoff duration +} + // Reporting defines error reporting methods. type Reporting struct { // Bugsnag configures error reporting for Bugsnag (bugsnag.com). diff --git a/configuration/configuration_test.go b/configuration/configuration_test.go index ad845a41..5a6abf90 100644 --- a/configuration/configuration_test.go +++ b/configuration/configuration_test.go @@ -2,6 +2,7 @@ package configuration import ( "bytes" + "net/http" "os" "testing" @@ -40,6 +41,17 @@ var configStruct = Configuration{ APIKey: "BugsnagApiKey", }, }, + Notifications: Notifications{ + Endpoints: []Endpoint{ + { + Name: "endpoint-1", + URL: "http://example.com", + Headers: http.Header{ + "Authorization": []string{"Bearer "}, + }, + }, + }, + }, } // configYamlV0_1 is a Version 0.1 yaml document representing configStruct @@ -61,6 +73,12 @@ auth: silly: realm: silly service: silly +notifications: + endpoints: + - name: endpoint-1 + url: http://example.com + headers: + Authorization: [Bearer ] reporting: bugsnag: apikey: BugsnagApiKey @@ -76,6 +94,12 @@ auth: silly: realm: silly service: silly +notifications: + endpoints: + - name: endpoint-1 + url: http://example.com + headers: + Authorization: [Bearer ] ` type ConfigSuite struct { @@ -129,6 +153,7 @@ func (suite *ConfigSuite) TestParseIncomplete(c *C) { suite.expectedConfig.Storage = Storage{"filesystem": Parameters{"rootdirectory": "/tmp/testroot"}} suite.expectedConfig.Auth = Auth{"silly": Parameters{"realm": "silly"}} suite.expectedConfig.Reporting = Reporting{} + suite.expectedConfig.Notifications = Notifications{} os.Setenv("REGISTRY_STORAGE", "filesystem") os.Setenv("REGISTRY_STORAGE_FILESYSTEM_ROOTDIRECTORY", "/tmp/testroot") @@ -292,5 +317,10 @@ func copyConfig(config Configuration) *Configuration { configCopy.Auth.setParameter(k, v) } + configCopy.Notifications = Notifications{Endpoints: []Endpoint{}} + for _, v := range config.Notifications.Endpoints { + configCopy.Notifications.Endpoints = append(configCopy.Notifications.Endpoints, v) + } + return configCopy } diff --git a/registry/app.go b/registry/app.go index b5cb6776..e7c96b74 100644 --- a/registry/app.go +++ b/registry/app.go @@ -2,16 +2,19 @@ package registry import ( "fmt" + "net" "net/http" + "os" + "code.google.com/p/go-uuid/uuid" + log "github.com/Sirupsen/logrus" "github.com/docker/distribution/api/v2" "github.com/docker/distribution/auth" "github.com/docker/distribution/configuration" "github.com/docker/distribution/storage" + "github.com/docker/distribution/storage/notifications" "github.com/docker/distribution/storagedriver" "github.com/docker/distribution/storagedriver/factory" - - log "github.com/Sirupsen/logrus" "github.com/gorilla/mux" ) @@ -21,17 +24,22 @@ import ( type App struct { Config configuration.Configuration - router *mux.Router + // InstanceID is a unique id assigned to the application on each creation. + // Provides information in the logs and context to identify restarts. + InstanceID string - // driver maintains the app global storage driver instance. - driver storagedriver.StorageDriver + router *mux.Router // main application router, configured with dispatchers + driver storagedriver.StorageDriver // driver maintains the app global storage driver instance. + registry storage.Registry // registry is the primary registry backend for the app instance. + accessController auth.AccessController // main access controller for application - // registry is the primary registry backend for the app instance. - registry storage.Registry + // events contains notification related configuration. + events struct { + sink notifications.Sink + source notifications.SourceRecord + } - layerHandler storage.LayerHandler - - accessController auth.AccessController + layerHandler storage.LayerHandler // allows dispatch of layer serving to external provider } // NewApp takes a configuration and returns a configured app, ready to serve @@ -39,8 +47,9 @@ type App struct { // handlers accordingly. func NewApp(configuration configuration.Configuration) *App { app := &App{ - Config: configuration, - router: v2.Router(), + Config: configuration, + InstanceID: uuid.New(), + router: v2.Router(), } // Register the handler dispatchers. @@ -53,7 +62,8 @@ func NewApp(configuration configuration.Configuration) *App { app.register(v2.RouteNameBlobUpload, layerUploadDispatcher) app.register(v2.RouteNameBlobUploadChunk, layerUploadDispatcher) - driver, err := factory.Create(configuration.Storage.Type(), configuration.Storage.Parameters()) + var err error + app.driver, err = factory.Create(configuration.Storage.Type(), configuration.Storage.Parameters()) if err != nil { // TODO(stevvooe): Move the creation of a service into a protected @@ -62,7 +72,7 @@ func NewApp(configuration configuration.Configuration) *App { panic(err) } - app.driver = driver + app.configureEvents(&configuration) app.registry = storage.NewRegistryWithDriver(app.driver) authType := configuration.Auth.Type() @@ -77,7 +87,7 @@ func NewApp(configuration configuration.Configuration) *App { layerHandlerType := configuration.LayerHandler.Type() if layerHandlerType != "" { - lh, err := storage.GetLayerHandler(layerHandlerType, configuration.LayerHandler.Parameters(), driver) + lh, err := storage.GetLayerHandler(layerHandlerType, configuration.LayerHandler.Parameters(), app.driver) if err != nil { panic(fmt.Sprintf("unable to configure layer handler (%s): %v", layerHandlerType, err)) } @@ -87,12 +97,6 @@ func NewApp(configuration configuration.Configuration) *App { return app } -func (app *App) ServeHTTP(w http.ResponseWriter, r *http.Request) { - // Set a header with the Docker Distribution API Version for all responses. - w.Header().Add("Docker-Distribution-API-Version", "registry/2.0") - app.router.ServeHTTP(w, r) -} - // register a handler with the application, by route name. The handler will be // passed through the application filters and context will be constructed at // request time. @@ -107,6 +111,59 @@ func (app *App) register(routeName string, dispatch dispatchFunc) { app.router.GetRoute(routeName).Handler(app.dispatcher(dispatch)) } +// configureEvents prepares the event sink for action. +func (app *App) configureEvents(configuration *configuration.Configuration) { + // Configure all of the endpoint sinks. + var sinks []notifications.Sink + for _, endpoint := range configuration.Notifications.Endpoints { + if endpoint.Disabled { + log.Infof("endpoint %s disabled, skipping", endpoint.Name) + continue + } + + log.Infof("configuring endpoint %v (%v), timeout=%s, headers=%v", endpoint.Name, endpoint.URL, endpoint.Timeout, endpoint.Headers) + endpoint := notifications.NewEndpoint(endpoint.Name, endpoint.URL, notifications.EndpointConfig{ + Timeout: endpoint.Timeout, + Threshold: endpoint.Threshold, + Backoff: endpoint.Backoff, + Headers: endpoint.Headers, + }) + + sinks = append(sinks, endpoint) + } + + // NOTE(stevvooe): Moving to a new queueing implementation is as easy as + // replacing broadcaster with a rabbitmq implementation. It's recommended + // that the registry instances also act as the workers to keep deployment + // simple. + app.events.sink = notifications.NewBroadcaster(sinks...) + + // Populate registry event source + hostname, err := os.Hostname() + if err != nil { + hostname = configuration.HTTP.Addr + } else { + // try to pick the port off the config + _, port, err := net.SplitHostPort(configuration.HTTP.Addr) + if err == nil { + hostname = net.JoinHostPort(hostname, port) + } + } + + app.events.source = notifications.SourceRecord{ + Addr: hostname, + InstanceID: app.InstanceID, + } +} + +func (app *App) ServeHTTP(w http.ResponseWriter, r *http.Request) { + defer r.Body.Close() // ensure that request body is always closed. + + // Set a header with the Docker Distribution API Version for all responses. + w.Header().Add("Docker-Distribution-API-Version", "registry/2.0") + app.router.ServeHTTP(w, r) +} + // dispatchFunc takes a context and request and returns a constructed handler // for the route. The dispatcher will use this to dynamically create request // specific handlers for each endpoint without creating a new router for each @@ -142,11 +199,14 @@ func (app *App) dispatcher(dispatch dispatchFunc) http.Handler { return } + // decorate the authorized repository with an event bridge. + context.Repository = notifications.Listen( + context.Repository, app.eventBridge(context, r)) + context.log = log.WithField("name", context.Repository.Name()) handler := dispatch(context, r) ssrw := &singleStatusResponseWriter{ResponseWriter: w} - context.log.Infoln("handler", resolveHandlerName(r.Method, handler)) handler.ServeHTTP(ssrw, r) // Automated error response handling here. Handlers may return their @@ -167,6 +227,7 @@ func (app *App) context(r *http.Request) *Context { vars := mux.Vars(r) context := &Context{ App: app, + RequestID: uuid.New(), urlBuilder: v2.NewURLBuilderFromRequest(r), } @@ -268,6 +329,22 @@ func (app *App) authorized(w http.ResponseWriter, r *http.Request, context *Cont return nil } +// eventBridge returns a bridge for the current request, configured with the +// correct actor and source. +func (app *App) eventBridge(ctx *Context, r *http.Request) notifications.Listener { + // TODO(stevvooe): Need to extract user data from request context using + // auth system. Would prefer to do this during logging refactor and + // addition of user and google context type. + actor := notifications.ActorRecord{ + Name: "--todo--", + Addr: r.RemoteAddr, + Host: r.Host, + RequestID: ctx.RequestID, + } + + return notifications.NewBridge(ctx.urlBuilder, app.events.source, actor, app.events.sink) +} + // apiBase implements a simple yes-man for doing overall checks against the // api. This can support auth roundtrips to support docker login. func apiBase(w http.ResponseWriter, r *http.Request) { diff --git a/registry/context.go b/registry/context.go index 8e8d0fed..eaa603a8 100644 --- a/registry/context.go +++ b/registry/context.go @@ -13,6 +13,9 @@ type Context struct { // App points to the application structure that created this context. *App + // RequestID is the unique id of the request. + RequestID string + // Repository is the repository for the current request. All requests // should be scoped to a single repository. This field may be nil. Repository storage.Repository diff --git a/registry/util.go b/registry/util.go deleted file mode 100644 index 976ddf31..00000000 --- a/registry/util.go +++ /dev/null @@ -1,27 +0,0 @@ -package registry - -import ( - "net/http" - "reflect" - "runtime" - - "github.com/gorilla/handlers" -) - -// functionName returns the name of the function fn. -func functionName(fn interface{}) string { - return runtime.FuncForPC(reflect.ValueOf(fn).Pointer()).Name() -} - -// resolveHandlerName attempts to resolve a nice, pretty name for the passed -// in handler. -func resolveHandlerName(method string, handler http.Handler) string { - switch v := handler.(type) { - case handlers.MethodHandler: - return functionName(v[method]) - case http.HandlerFunc: - return functionName(v) - default: - return functionName(handler.ServeHTTP) - } -} diff --git a/storage/notifications/bridge.go b/storage/notifications/bridge.go index 2ff0dff6..28326cce 100644 --- a/storage/notifications/bridge.go +++ b/storage/notifications/bridge.go @@ -72,7 +72,7 @@ func (b *bridge) createManifestEventAndWrite(action string, repo storage.Reposit func (b *bridge) createManifestEvent(action string, repo storage.Repository, sm *manifest.SignedManifest) (*Event, error) { event := b.createEvent(action) - event.Target.Type = "manifest" + event.Target.Type = EventTargetTypeManifest event.Target.Name = repo.Name() event.Target.Tag = sm.Tag @@ -107,7 +107,7 @@ func (b *bridge) createLayerEventAndWrite(action string, repo storage.Repository func (b *bridge) createLayerEvent(action string, repo storage.Repository, dgst digest.Digest) (*Event, error) { event := b.createEvent(action) - event.Target.Type = "layer" + event.Target.Type = EventTargetTypeBlob event.Target.Name = repo.Name() event.Target.Digest = dgst diff --git a/storage/notifications/event.go b/storage/notifications/event.go index f03920ca..fb2094d7 100644 --- a/storage/notifications/event.go +++ b/storage/notifications/event.go @@ -12,7 +12,12 @@ const ( EventActionPull = "pull" EventActionPush = "push" EventActionDelete = "delete" - EventActionPing = "ping" +) + +// EventTargetType constants used in Target section of Event. +const ( + EventTargetTypeManifest = "manifest" + EventTargetTypeBlob = "blob" ) // EventsMediaType is the mediatype for the json event envelope. If the Event, diff --git a/storage/notifications/event_test.go b/storage/notifications/event_test.go index 77bd19f9..7bb9fa01 100644 --- a/storage/notifications/event_test.go +++ b/storage/notifications/event_test.go @@ -97,7 +97,7 @@ func TestEventEnvelopeJSONFormat(t *testing.T) { manifestPush = prototype manifestPush.ID = "asdf-asdf-asdf-asdf-0" manifestPush.Target.Digest = "sha256:0123456789abcdef0" - manifestPush.Target.Type = "manifest" + manifestPush.Target.Type = EventTargetTypeManifest manifestPush.Target.Name = "library/test" manifestPush.Target.Tag = "latest" manifestPush.Target.URL = "http://example.com/v2/library/test/manifests/latest" @@ -106,7 +106,7 @@ func TestEventEnvelopeJSONFormat(t *testing.T) { layerPush0 = prototype layerPush0.ID = "asdf-asdf-asdf-asdf-1" layerPush0.Target.Digest = "tarsum.v2+sha256:0123456789abcdef1" - layerPush0.Target.Type = "blob" + layerPush0.Target.Type = EventTargetTypeBlob layerPush0.Target.Name = "library/test" layerPush0.Target.URL = "http://example.com/v2/library/test/manifests/latest" @@ -114,7 +114,7 @@ func TestEventEnvelopeJSONFormat(t *testing.T) { layerPush1 = prototype layerPush1.ID = "asdf-asdf-asdf-asdf-2" layerPush1.Target.Digest = "tarsum.v2+sha256:0123456789abcdef2" - layerPush1.Target.Type = "blob" + layerPush1.Target.Type = EventTargetTypeBlob layerPush1.Target.Name = "library/test" layerPush1.Target.URL = "http://example.com/v2/library/test/manifests/latest"