registry/storage/notifications/bridge.go

140 lines
3.7 KiB
Go
Raw Normal View History

Implement notification endpoint webhook dispatch This changeset implements webhook notification endpoints for dispatching registry events. Repository instances can be decorated by a listener that converts calls into context-aware events, using a bridge. Events generated in the bridge are written to a sink. Implementations of sink include a broadcast and endpoint sink which can be used to configure event dispatch. Endpoints represent a webhook notification target, with queueing and retries built in. They can be added to a Broadcaster, which is a simple sink that writes a block of events to several sinks, to provide a complete dispatch mechanism. The main caveat to the current approach is that all unsent notifications are inmemory. Best effort is made to ensure that notifications are not dropped, to the point where queues may back up on faulty endpoints. If the endpoint is fixed, the events will be retried and all messages will go through. Internally, this functionality is all made up of Sink objects. The queuing functionality is implemented with an eventQueue sink and retries are implemented with retryingSink. Replacing the inmemory queuing with something persistent should be as simple as replacing broadcaster with a remote queue and that sets up the sinks to be local workers listening to that remote queue. Metrics are kept for each endpoint and exported via expvar. This may not be a permanent appraoch but should provide enough information for troubleshooting notification problems. Signed-off-by: Stephen J Day <stephen.day@docker.com>
2015-01-28 07:27:46 +00:00
package notifications
import (
"time"
"github.com/docker/distribution/manifest"
"code.google.com/p/go-uuid/uuid"
"github.com/docker/distribution/digest"
"github.com/docker/distribution/storage"
)
type bridge struct {
ub URLBuilder
actor ActorRecord
source SourceRecord
sink Sink
}
var _ Listener = &bridge{}
// URLBuilder defines a subset of url builder to be used by the event listener.
type URLBuilder interface {
BuildManifestURL(name, tag string) (string, error)
BuildBlobURL(name string, dgst digest.Digest) (string, error)
}
// NewBridge returns a notification listener that writes records to sink,
// using the actor and source. Any urls populated in the events created by
// this bridge will be created using the URLBuilder.
func NewBridge(ub URLBuilder, source SourceRecord, actor ActorRecord, sink Sink) Listener {
return &bridge{
ub: ub,
actor: actor,
source: source,
sink: sink,
}
}
func (b *bridge) ManifestPushed(repo storage.Repository, sm *manifest.SignedManifest) error {
return b.createManifestEventAndWrite(EventActionPush, repo, sm)
}
func (b *bridge) ManifestPulled(repo storage.Repository, sm *manifest.SignedManifest) error {
return b.createManifestEventAndWrite(EventActionPull, repo, sm)
}
func (b *bridge) ManifestDeleted(repo storage.Repository, sm *manifest.SignedManifest) error {
return b.createManifestEventAndWrite(EventActionDelete, repo, sm)
}
func (b *bridge) LayerPushed(repo storage.Repository, layer storage.Layer) error {
return b.createLayerEventAndWrite(EventActionPush, repo, layer.Digest())
}
func (b *bridge) LayerPulled(repo storage.Repository, layer storage.Layer) error {
return b.createLayerEventAndWrite(EventActionPull, repo, layer.Digest())
}
func (b *bridge) LayerDeleted(repo storage.Repository, layer storage.Layer) error {
return b.createLayerEventAndWrite(EventActionDelete, repo, layer.Digest())
}
func (b *bridge) createManifestEventAndWrite(action string, repo storage.Repository, sm *manifest.SignedManifest) error {
event, err := b.createManifestEvent(action, repo, sm)
if err != nil {
return err
}
return b.sink.Write(*event)
}
func (b *bridge) createManifestEvent(action string, repo storage.Repository, sm *manifest.SignedManifest) (*Event, error) {
event := b.createEvent(action)
event.Target.Type = "manifest"
event.Target.Name = repo.Name()
event.Target.Tag = sm.Tag
p, err := sm.Payload()
if err != nil {
return nil, err
}
event.Target.Digest, err = digest.FromBytes(p)
if err != nil {
return nil, err
}
// TODO(stevvooe): Currently, the is the "tag" url: once the digest url is
// implemented, this should be replaced.
event.Target.URL, err = b.ub.BuildManifestURL(sm.Name, sm.Tag)
if err != nil {
return nil, err
}
return event, nil
}
func (b *bridge) createLayerEventAndWrite(action string, repo storage.Repository, dgst digest.Digest) error {
event, err := b.createLayerEvent(action, repo, dgst)
if err != nil {
return err
}
return b.sink.Write(*event)
}
func (b *bridge) createLayerEvent(action string, repo storage.Repository, dgst digest.Digest) (*Event, error) {
event := b.createEvent(action)
event.Target.Type = "layer"
event.Target.Name = repo.Name()
event.Target.Digest = dgst
var err error
event.Target.URL, err = b.ub.BuildBlobURL(repo.Name(), dgst)
if err != nil {
return nil, err
}
return event, nil
}
// createEvent creates an event with actor and source populated.
func (b *bridge) createEvent(action string) *Event {
event := createEvent(action)
event.Source = b.source
event.Actor = b.actor
return event
}
// createEvent returns a new event, timestamped, with the specified action.
func createEvent(action string) *Event {
return &Event{
ID: uuid.New(),
Timestamp: time.Now(),
Action: action,
}
}