Webhook notification support in registry webapp

Endpoints are now created at applications startup time, using notification
configuration. The instances are then added to a Broadcaster instance, which
becomes the main event sink for the application. At request time, an event
bridge is configured to listen to repository method calls. The actor and source
of the eventBridge are created from the requeest context and application,
respectively. The result is notifications are dispatched with calls to the
context's Repository instance and are queued to each endpoint via the
broadcaster.

This commit also adds the concept of a RequestID and App.InstanceID. The
request id uniquely identifies each request and the InstanceID uniquely
identifies a run of the registry. These identifiers can be used in the future
to correlate log messages with generated events to support rich debugging.

The fields of the app were slightly reorganized for clarity and a few horrid
util functions have been removed.

Signed-off-by: Stephen J Day <stephen.day@docker.com>
This commit is contained in:
Stephen J Day 2015-01-28 15:55:18 -08:00
parent 499382dd0b
commit 0a29b59e14
9 changed files with 185 additions and 55 deletions

View file

@ -8,3 +8,19 @@ http:
secret: asecretforlocaldevelopment secret: asecretforlocaldevelopment
debug: debug:
addr: localhost:5001 addr: localhost:5001
notifications:
endpoints:
- name: local-8082
url: http://localhost:5003/callback
headers:
Authorization: [Bearer <an example token>]
timeout: 1s
threshold: 10
backoff: 1s
disabled: true
- name: local-8083
url: http://localhost:8083/callback
timeout: 1s
threshold: 10
backoff: 1s
disabled: true

View file

@ -4,8 +4,10 @@ import (
"fmt" "fmt"
"io" "io"
"io/ioutil" "io/ioutil"
"net/http"
"reflect" "reflect"
"strings" "strings"
"time"
) )
// Configuration is a versioned registry configuration, intended to be provided by a yaml file, and // Configuration is a versioned registry configuration, intended to be provided by a yaml file, and
@ -63,6 +65,10 @@ type Configuration struct {
Addr string `yaml:"addr"` Addr string `yaml:"addr"`
} `yaml:"debug"` } `yaml:"debug"`
} `yaml:"http"` } `yaml:"http"`
// Notifications specifies configuration about various endpoint to which
// registry events are dispatched.
Notifications Notifications `yaml:"notifications"`
} }
// v0_1Configuration is a Version 0.1 Configuration struct // v0_1Configuration is a Version 0.1 Configuration struct
@ -240,6 +246,26 @@ func (auth Auth) MarshalYAML() (interface{}, error) {
return map[string]Parameters(auth), nil return map[string]Parameters(auth), nil
} }
// Notifications configures multiple http endpoints.
type Notifications struct {
// Endpoints is a list of http configurations for endpoints that
// respond to webhook notifications. In the future, we may allow other
// kinds of endpoints, such as external queues.
Endpoints []Endpoint `yaml:"endpoints"`
}
// Endpoint describes the configuration of an http webhook notification
// endpoint.
type Endpoint struct {
Name string `yaml:"name"` // identifies the endpoint in the registry instance.
Disabled bool `yaml:"disabled"` // disables the endpoint
URL string `yaml:"url"` // post url for the endpoint.
Headers http.Header `yaml:"headers"` // static headers that should be added to all requests
Timeout time.Duration `yaml:"timeout"` // HTTP timeout
Threshold int `yaml:"threshold"` // circuit breaker threshold before backing off on failure
Backoff time.Duration `yaml:"backoff"` // backoff duration
}
// Reporting defines error reporting methods. // Reporting defines error reporting methods.
type Reporting struct { type Reporting struct {
// Bugsnag configures error reporting for Bugsnag (bugsnag.com). // Bugsnag configures error reporting for Bugsnag (bugsnag.com).

View file

@ -2,6 +2,7 @@ package configuration
import ( import (
"bytes" "bytes"
"net/http"
"os" "os"
"testing" "testing"
@ -40,6 +41,17 @@ var configStruct = Configuration{
APIKey: "BugsnagApiKey", APIKey: "BugsnagApiKey",
}, },
}, },
Notifications: Notifications{
Endpoints: []Endpoint{
{
Name: "endpoint-1",
URL: "http://example.com",
Headers: http.Header{
"Authorization": []string{"Bearer <example>"},
},
},
},
},
} }
// configYamlV0_1 is a Version 0.1 yaml document representing configStruct // configYamlV0_1 is a Version 0.1 yaml document representing configStruct
@ -61,6 +73,12 @@ auth:
silly: silly:
realm: silly realm: silly
service: silly service: silly
notifications:
endpoints:
- name: endpoint-1
url: http://example.com
headers:
Authorization: [Bearer <example>]
reporting: reporting:
bugsnag: bugsnag:
apikey: BugsnagApiKey apikey: BugsnagApiKey
@ -76,6 +94,12 @@ auth:
silly: silly:
realm: silly realm: silly
service: silly service: silly
notifications:
endpoints:
- name: endpoint-1
url: http://example.com
headers:
Authorization: [Bearer <example>]
` `
type ConfigSuite struct { type ConfigSuite struct {
@ -129,6 +153,7 @@ func (suite *ConfigSuite) TestParseIncomplete(c *C) {
suite.expectedConfig.Storage = Storage{"filesystem": Parameters{"rootdirectory": "/tmp/testroot"}} suite.expectedConfig.Storage = Storage{"filesystem": Parameters{"rootdirectory": "/tmp/testroot"}}
suite.expectedConfig.Auth = Auth{"silly": Parameters{"realm": "silly"}} suite.expectedConfig.Auth = Auth{"silly": Parameters{"realm": "silly"}}
suite.expectedConfig.Reporting = Reporting{} suite.expectedConfig.Reporting = Reporting{}
suite.expectedConfig.Notifications = Notifications{}
os.Setenv("REGISTRY_STORAGE", "filesystem") os.Setenv("REGISTRY_STORAGE", "filesystem")
os.Setenv("REGISTRY_STORAGE_FILESYSTEM_ROOTDIRECTORY", "/tmp/testroot") os.Setenv("REGISTRY_STORAGE_FILESYSTEM_ROOTDIRECTORY", "/tmp/testroot")
@ -292,5 +317,10 @@ func copyConfig(config Configuration) *Configuration {
configCopy.Auth.setParameter(k, v) configCopy.Auth.setParameter(k, v)
} }
configCopy.Notifications = Notifications{Endpoints: []Endpoint{}}
for _, v := range config.Notifications.Endpoints {
configCopy.Notifications.Endpoints = append(configCopy.Notifications.Endpoints, v)
}
return configCopy return configCopy
} }

View file

@ -2,16 +2,19 @@ package registry
import ( import (
"fmt" "fmt"
"net"
"net/http" "net/http"
"os"
"code.google.com/p/go-uuid/uuid"
log "github.com/Sirupsen/logrus"
"github.com/docker/distribution/api/v2" "github.com/docker/distribution/api/v2"
"github.com/docker/distribution/auth" "github.com/docker/distribution/auth"
"github.com/docker/distribution/configuration" "github.com/docker/distribution/configuration"
"github.com/docker/distribution/storage" "github.com/docker/distribution/storage"
"github.com/docker/distribution/storage/notifications"
"github.com/docker/distribution/storagedriver" "github.com/docker/distribution/storagedriver"
"github.com/docker/distribution/storagedriver/factory" "github.com/docker/distribution/storagedriver/factory"
log "github.com/Sirupsen/logrus"
"github.com/gorilla/mux" "github.com/gorilla/mux"
) )
@ -21,17 +24,22 @@ import (
type App struct { type App struct {
Config configuration.Configuration Config configuration.Configuration
router *mux.Router // InstanceID is a unique id assigned to the application on each creation.
// Provides information in the logs and context to identify restarts.
InstanceID string
// driver maintains the app global storage driver instance. router *mux.Router // main application router, configured with dispatchers
driver storagedriver.StorageDriver driver storagedriver.StorageDriver // driver maintains the app global storage driver instance.
registry storage.Registry // registry is the primary registry backend for the app instance.
accessController auth.AccessController // main access controller for application
// registry is the primary registry backend for the app instance. // events contains notification related configuration.
registry storage.Registry events struct {
sink notifications.Sink
source notifications.SourceRecord
}
layerHandler storage.LayerHandler layerHandler storage.LayerHandler // allows dispatch of layer serving to external provider
accessController auth.AccessController
} }
// NewApp takes a configuration and returns a configured app, ready to serve // NewApp takes a configuration and returns a configured app, ready to serve
@ -40,6 +48,7 @@ type App struct {
func NewApp(configuration configuration.Configuration) *App { func NewApp(configuration configuration.Configuration) *App {
app := &App{ app := &App{
Config: configuration, Config: configuration,
InstanceID: uuid.New(),
router: v2.Router(), router: v2.Router(),
} }
@ -53,7 +62,8 @@ func NewApp(configuration configuration.Configuration) *App {
app.register(v2.RouteNameBlobUpload, layerUploadDispatcher) app.register(v2.RouteNameBlobUpload, layerUploadDispatcher)
app.register(v2.RouteNameBlobUploadChunk, layerUploadDispatcher) app.register(v2.RouteNameBlobUploadChunk, layerUploadDispatcher)
driver, err := factory.Create(configuration.Storage.Type(), configuration.Storage.Parameters()) var err error
app.driver, err = factory.Create(configuration.Storage.Type(), configuration.Storage.Parameters())
if err != nil { if err != nil {
// TODO(stevvooe): Move the creation of a service into a protected // TODO(stevvooe): Move the creation of a service into a protected
@ -62,7 +72,7 @@ func NewApp(configuration configuration.Configuration) *App {
panic(err) panic(err)
} }
app.driver = driver app.configureEvents(&configuration)
app.registry = storage.NewRegistryWithDriver(app.driver) app.registry = storage.NewRegistryWithDriver(app.driver)
authType := configuration.Auth.Type() authType := configuration.Auth.Type()
@ -77,7 +87,7 @@ func NewApp(configuration configuration.Configuration) *App {
layerHandlerType := configuration.LayerHandler.Type() layerHandlerType := configuration.LayerHandler.Type()
if layerHandlerType != "" { if layerHandlerType != "" {
lh, err := storage.GetLayerHandler(layerHandlerType, configuration.LayerHandler.Parameters(), driver) lh, err := storage.GetLayerHandler(layerHandlerType, configuration.LayerHandler.Parameters(), app.driver)
if err != nil { if err != nil {
panic(fmt.Sprintf("unable to configure layer handler (%s): %v", layerHandlerType, err)) panic(fmt.Sprintf("unable to configure layer handler (%s): %v", layerHandlerType, err))
} }
@ -87,12 +97,6 @@ func NewApp(configuration configuration.Configuration) *App {
return app return app
} }
func (app *App) ServeHTTP(w http.ResponseWriter, r *http.Request) {
// Set a header with the Docker Distribution API Version for all responses.
w.Header().Add("Docker-Distribution-API-Version", "registry/2.0")
app.router.ServeHTTP(w, r)
}
// register a handler with the application, by route name. The handler will be // register a handler with the application, by route name. The handler will be
// passed through the application filters and context will be constructed at // passed through the application filters and context will be constructed at
// request time. // request time.
@ -107,6 +111,59 @@ func (app *App) register(routeName string, dispatch dispatchFunc) {
app.router.GetRoute(routeName).Handler(app.dispatcher(dispatch)) app.router.GetRoute(routeName).Handler(app.dispatcher(dispatch))
} }
// configureEvents prepares the event sink for action.
func (app *App) configureEvents(configuration *configuration.Configuration) {
// Configure all of the endpoint sinks.
var sinks []notifications.Sink
for _, endpoint := range configuration.Notifications.Endpoints {
if endpoint.Disabled {
log.Infof("endpoint %s disabled, skipping", endpoint.Name)
continue
}
log.Infof("configuring endpoint %v (%v), timeout=%s, headers=%v", endpoint.Name, endpoint.URL, endpoint.Timeout, endpoint.Headers)
endpoint := notifications.NewEndpoint(endpoint.Name, endpoint.URL, notifications.EndpointConfig{
Timeout: endpoint.Timeout,
Threshold: endpoint.Threshold,
Backoff: endpoint.Backoff,
Headers: endpoint.Headers,
})
sinks = append(sinks, endpoint)
}
// NOTE(stevvooe): Moving to a new queueing implementation is as easy as
// replacing broadcaster with a rabbitmq implementation. It's recommended
// that the registry instances also act as the workers to keep deployment
// simple.
app.events.sink = notifications.NewBroadcaster(sinks...)
// Populate registry event source
hostname, err := os.Hostname()
if err != nil {
hostname = configuration.HTTP.Addr
} else {
// try to pick the port off the config
_, port, err := net.SplitHostPort(configuration.HTTP.Addr)
if err == nil {
hostname = net.JoinHostPort(hostname, port)
}
}
app.events.source = notifications.SourceRecord{
Addr: hostname,
InstanceID: app.InstanceID,
}
}
func (app *App) ServeHTTP(w http.ResponseWriter, r *http.Request) {
defer r.Body.Close() // ensure that request body is always closed.
// Set a header with the Docker Distribution API Version for all responses.
w.Header().Add("Docker-Distribution-API-Version", "registry/2.0")
app.router.ServeHTTP(w, r)
}
// dispatchFunc takes a context and request and returns a constructed handler // dispatchFunc takes a context and request and returns a constructed handler
// for the route. The dispatcher will use this to dynamically create request // for the route. The dispatcher will use this to dynamically create request
// specific handlers for each endpoint without creating a new router for each // specific handlers for each endpoint without creating a new router for each
@ -142,11 +199,14 @@ func (app *App) dispatcher(dispatch dispatchFunc) http.Handler {
return return
} }
// decorate the authorized repository with an event bridge.
context.Repository = notifications.Listen(
context.Repository, app.eventBridge(context, r))
context.log = log.WithField("name", context.Repository.Name()) context.log = log.WithField("name", context.Repository.Name())
handler := dispatch(context, r) handler := dispatch(context, r)
ssrw := &singleStatusResponseWriter{ResponseWriter: w} ssrw := &singleStatusResponseWriter{ResponseWriter: w}
context.log.Infoln("handler", resolveHandlerName(r.Method, handler))
handler.ServeHTTP(ssrw, r) handler.ServeHTTP(ssrw, r)
// Automated error response handling here. Handlers may return their // Automated error response handling here. Handlers may return their
@ -167,6 +227,7 @@ func (app *App) context(r *http.Request) *Context {
vars := mux.Vars(r) vars := mux.Vars(r)
context := &Context{ context := &Context{
App: app, App: app,
RequestID: uuid.New(),
urlBuilder: v2.NewURLBuilderFromRequest(r), urlBuilder: v2.NewURLBuilderFromRequest(r),
} }
@ -268,6 +329,22 @@ func (app *App) authorized(w http.ResponseWriter, r *http.Request, context *Cont
return nil return nil
} }
// eventBridge returns a bridge for the current request, configured with the
// correct actor and source.
func (app *App) eventBridge(ctx *Context, r *http.Request) notifications.Listener {
// TODO(stevvooe): Need to extract user data from request context using
// auth system. Would prefer to do this during logging refactor and
// addition of user and google context type.
actor := notifications.ActorRecord{
Name: "--todo--",
Addr: r.RemoteAddr,
Host: r.Host,
RequestID: ctx.RequestID,
}
return notifications.NewBridge(ctx.urlBuilder, app.events.source, actor, app.events.sink)
}
// apiBase implements a simple yes-man for doing overall checks against the // apiBase implements a simple yes-man for doing overall checks against the
// api. This can support auth roundtrips to support docker login. // api. This can support auth roundtrips to support docker login.
func apiBase(w http.ResponseWriter, r *http.Request) { func apiBase(w http.ResponseWriter, r *http.Request) {

View file

@ -13,6 +13,9 @@ type Context struct {
// App points to the application structure that created this context. // App points to the application structure that created this context.
*App *App
// RequestID is the unique id of the request.
RequestID string
// Repository is the repository for the current request. All requests // Repository is the repository for the current request. All requests
// should be scoped to a single repository. This field may be nil. // should be scoped to a single repository. This field may be nil.
Repository storage.Repository Repository storage.Repository

View file

@ -1,27 +0,0 @@
package registry
import (
"net/http"
"reflect"
"runtime"
"github.com/gorilla/handlers"
)
// functionName returns the name of the function fn.
func functionName(fn interface{}) string {
return runtime.FuncForPC(reflect.ValueOf(fn).Pointer()).Name()
}
// resolveHandlerName attempts to resolve a nice, pretty name for the passed
// in handler.
func resolveHandlerName(method string, handler http.Handler) string {
switch v := handler.(type) {
case handlers.MethodHandler:
return functionName(v[method])
case http.HandlerFunc:
return functionName(v)
default:
return functionName(handler.ServeHTTP)
}
}

View file

@ -72,7 +72,7 @@ func (b *bridge) createManifestEventAndWrite(action string, repo storage.Reposit
func (b *bridge) createManifestEvent(action string, repo storage.Repository, sm *manifest.SignedManifest) (*Event, error) { func (b *bridge) createManifestEvent(action string, repo storage.Repository, sm *manifest.SignedManifest) (*Event, error) {
event := b.createEvent(action) event := b.createEvent(action)
event.Target.Type = "manifest" event.Target.Type = EventTargetTypeManifest
event.Target.Name = repo.Name() event.Target.Name = repo.Name()
event.Target.Tag = sm.Tag event.Target.Tag = sm.Tag
@ -107,7 +107,7 @@ func (b *bridge) createLayerEventAndWrite(action string, repo storage.Repository
func (b *bridge) createLayerEvent(action string, repo storage.Repository, dgst digest.Digest) (*Event, error) { func (b *bridge) createLayerEvent(action string, repo storage.Repository, dgst digest.Digest) (*Event, error) {
event := b.createEvent(action) event := b.createEvent(action)
event.Target.Type = "layer" event.Target.Type = EventTargetTypeBlob
event.Target.Name = repo.Name() event.Target.Name = repo.Name()
event.Target.Digest = dgst event.Target.Digest = dgst

View file

@ -12,7 +12,12 @@ const (
EventActionPull = "pull" EventActionPull = "pull"
EventActionPush = "push" EventActionPush = "push"
EventActionDelete = "delete" EventActionDelete = "delete"
EventActionPing = "ping" )
// EventTargetType constants used in Target section of Event.
const (
EventTargetTypeManifest = "manifest"
EventTargetTypeBlob = "blob"
) )
// EventsMediaType is the mediatype for the json event envelope. If the Event, // EventsMediaType is the mediatype for the json event envelope. If the Event,

View file

@ -97,7 +97,7 @@ func TestEventEnvelopeJSONFormat(t *testing.T) {
manifestPush = prototype manifestPush = prototype
manifestPush.ID = "asdf-asdf-asdf-asdf-0" manifestPush.ID = "asdf-asdf-asdf-asdf-0"
manifestPush.Target.Digest = "sha256:0123456789abcdef0" manifestPush.Target.Digest = "sha256:0123456789abcdef0"
manifestPush.Target.Type = "manifest" manifestPush.Target.Type = EventTargetTypeManifest
manifestPush.Target.Name = "library/test" manifestPush.Target.Name = "library/test"
manifestPush.Target.Tag = "latest" manifestPush.Target.Tag = "latest"
manifestPush.Target.URL = "http://example.com/v2/library/test/manifests/latest" manifestPush.Target.URL = "http://example.com/v2/library/test/manifests/latest"
@ -106,7 +106,7 @@ func TestEventEnvelopeJSONFormat(t *testing.T) {
layerPush0 = prototype layerPush0 = prototype
layerPush0.ID = "asdf-asdf-asdf-asdf-1" layerPush0.ID = "asdf-asdf-asdf-asdf-1"
layerPush0.Target.Digest = "tarsum.v2+sha256:0123456789abcdef1" layerPush0.Target.Digest = "tarsum.v2+sha256:0123456789abcdef1"
layerPush0.Target.Type = "blob" layerPush0.Target.Type = EventTargetTypeBlob
layerPush0.Target.Name = "library/test" layerPush0.Target.Name = "library/test"
layerPush0.Target.URL = "http://example.com/v2/library/test/manifests/latest" layerPush0.Target.URL = "http://example.com/v2/library/test/manifests/latest"
@ -114,7 +114,7 @@ func TestEventEnvelopeJSONFormat(t *testing.T) {
layerPush1 = prototype layerPush1 = prototype
layerPush1.ID = "asdf-asdf-asdf-asdf-2" layerPush1.ID = "asdf-asdf-asdf-asdf-2"
layerPush1.Target.Digest = "tarsum.v2+sha256:0123456789abcdef2" layerPush1.Target.Digest = "tarsum.v2+sha256:0123456789abcdef2"
layerPush1.Target.Type = "blob" layerPush1.Target.Type = EventTargetTypeBlob
layerPush1.Target.Name = "library/test" layerPush1.Target.Name = "library/test"
layerPush1.Target.URL = "http://example.com/v2/library/test/manifests/latest" layerPush1.Target.URL = "http://example.com/v2/library/test/manifests/latest"