From af7eb42793258a3772183776bc82e82be28a3844 Mon Sep 17 00:00:00 2001 From: Stephen J Day Date: Tue, 27 Jan 2015 19:37:16 -0800 Subject: [PATCH 1/7] Event notification message definition This commit defines the message format used to notify external parties of activity within a registry instance. The event includes information about which action was taken on which registry object, including what user created the action and which instance generated the event. Message instances can be sent throughout an application or transmitted externally. An envelope format along with a custom media type is defined along with tests to detect changes to the wire format. Signed-off-by: Stephen J Day --- storage/notifications/event.go | 122 ++++++++++++++++++++++++++ storage/notifications/event_test.go | 131 ++++++++++++++++++++++++++++ 2 files changed, 253 insertions(+) create mode 100644 storage/notifications/event.go create mode 100644 storage/notifications/event_test.go diff --git a/storage/notifications/event.go b/storage/notifications/event.go new file mode 100644 index 00000000..3c000dc2 --- /dev/null +++ b/storage/notifications/event.go @@ -0,0 +1,122 @@ +package notifications + +import ( + "fmt" + "time" + + "github.com/docker/distribution/digest" +) + +// EventAction constants used in action field of Event. +const ( + EventActionPull = "pull" + EventActionPush = "push" + EventActionDelete = "delete" + EventActionPing = "ping" +) + +// EventsMediaType is the mediatype for the json event envelope. If the Event, +// ActorRecord, SourceRecord or Envelope structs change, the version number +// should be incremented. +const EventsMediaType = "application/vnd.docker.distribution.events.v1+json" + +// Envelope defines the fields of a json event envelope message that can hold +// one or more events. +type Envelope struct { + // Events make up the contents of the envelope. Events present in a single + // envelope are not necessarily related. + Events []Event `json:"events,omitempty"` +} + +// TODO(stevvooe): The event type should be separate from the json format. It +// should be defined as an interface. Leaving as is for now since we don't +// need that at this time. If we make this change, the struct below would be +// called "EventRecord". + +// Event provides the fields required to describe a registry event. +type Event struct { + // ID provides a unique identifier for the event. + ID string `json:"uuid,omitempty"` + + // Timestamp is the time at which the event occurred. + Timestamp time.Time `json:"timestamp,omitempty"` + + // Action indicates what action encompasses the provided event. + Action string `json:"action,omitempty"` + + // Target uniquely describes the target of the event. + Target struct { + // Type should be "manifest" or "blob" + Type string `json:"type,omitempty"` + + // Name identifies the named repository. + Name string `json:"name,omitempty"` + + // Digest should identify the object in the repository. + Digest digest.Digest `json:"digest,omitempty"` + + // Tag is present if the operation involved a tagged manifest. + Tag string `json:"tag,omitempty"` + + // URL provides a link to the content on the relevant repository instance. + URL string `json:"url,omitempty"` + } `json:"target,omitempty"` + + // Actor specifies the agent that initiated the event. For most + // situations, this could be from the authorizaton context of the request. + Actor ActorRecord `json:"actor,omitempty"` + + // Source identifies the registry node that generated the event. Put + // differently, while the actor "initiates" the event, the source + // "generates" it. + Source SourceRecord `json:"source,omitempty"` +} + +// ActorRecord specifies the agent that initiated the event. For most +// situations, this could be from the authorizaton context of the request. +type ActorRecord struct { + // Name corresponds to the subject or username associated with the + // request context that generated the event. + Name string `json:"name,omitempty"` + + // Addr contains the ip or hostname and possibly port of the client + // connection that initiated the event. + Addr string `json:"addr,omitempty"` +} + +// SourceRecord identifies the registry node that generated the event. Put +// differently, while the actor "initiates" the event, the source "generates" +// it. +type SourceRecord struct { + // Addr contains the ip or hostname and the port of the registry node + // that generated the event. Generally, this will be resolved by + // os.Hostname() along with the running port. + Addr string `json:"addr,omitempty"` + + // Host is the dns name of the registry cluster, as configured. + Host string `json:"host,omitempty"` + + // RequestID uniquely identifies the registry request that generated the + // event. + RequestID string `json:"request_id,omitempty"` +} + +var ( + // ErrSinkClosed is returned if a write is issued to a sink that has been + // closed. If encountered, the error should be considered terminal and + // retries will not be successful. + ErrSinkClosed = fmt.Errorf("sink: closed") +) + +// Sink accepts and sends events. +type Sink interface { + // Write writes one or more events to the sink. If no error is returned, + // the caller will assume that all events have been committed and will not + // try to send them again. If an error is received, the caller may retry + // sending the event. The caller should cede the slice of memory to the + // sink and not modify it after calling this method. + Write(events ...Event) error + + // Close the sink, possibly waiting for pending events to flush. + Close() error +} diff --git a/storage/notifications/event_test.go b/storage/notifications/event_test.go new file mode 100644 index 00000000..37e43c66 --- /dev/null +++ b/storage/notifications/event_test.go @@ -0,0 +1,131 @@ +package notifications + +import ( + "encoding/json" + "strings" + "testing" + "time" +) + +// TestEventJSONFormat provides silly test to detect if the event format or +// envelope has changed. If this code fails, the revision of the protocol may +// need to be incremented. +func TestEventEnvelopeJSONFormat(t *testing.T) { + var expected = strings.TrimSpace(` +{ + "events": [ + { + "uuid": "asdf-asdf-asdf-asdf-0", + "timestamp": "2006-01-02T15:04:05Z", + "action": "push", + "target": { + "type": "manifest", + "name": "library/test", + "digest": "sha256:0123456789abcdef0", + "tag": "latest", + "url": "http://example.com/v2/library/test/manifests/latest" + }, + "actor": { + "name": "test-actor", + "addr": "hostname.local" + }, + "source": { + "addr": "hostname.local", + "host": "registrycluster.local", + "request_id": "asdfasdf" + } + }, + { + "uuid": "asdf-asdf-asdf-asdf-1", + "timestamp": "2006-01-02T15:04:05Z", + "action": "push", + "target": { + "type": "blob", + "name": "library/test", + "digest": "tarsum.v2+sha256:0123456789abcdef1", + "url": "http://example.com/v2/library/test/manifests/latest" + }, + "actor": { + "name": "test-actor", + "addr": "hostname.local" + }, + "source": { + "addr": "hostname.local", + "host": "registrycluster.local", + "request_id": "asdfasdf" + } + }, + { + "uuid": "asdf-asdf-asdf-asdf-2", + "timestamp": "2006-01-02T15:04:05Z", + "action": "push", + "target": { + "type": "blob", + "name": "library/test", + "digest": "tarsum.v2+sha256:0123456789abcdef2", + "url": "http://example.com/v2/library/test/manifests/latest" + }, + "actor": { + "name": "test-actor", + "addr": "hostname.local" + }, + "source": { + "addr": "hostname.local", + "host": "registrycluster.local", + "request_id": "asdfasdf" + } + } + ] +} + `) + + tm, err := time.Parse(time.RFC3339, time.RFC3339[:len(time.RFC3339)-5]) + if err != nil { + t.Fatalf("error creating time: %v", err) + } + + var prototype Event + prototype.Action = "push" + prototype.Timestamp = tm + prototype.Actor.Addr = "hostname.local" + prototype.Actor.Name = "test-actor" + prototype.Source.Addr = "hostname.local" + prototype.Source.Host = "registrycluster.local" + prototype.Source.RequestID = "asdfasdf" + + var manifestPush Event + manifestPush = prototype + manifestPush.ID = "asdf-asdf-asdf-asdf-0" + manifestPush.Target.Digest = "sha256:0123456789abcdef0" + manifestPush.Target.Type = "manifest" + manifestPush.Target.Name = "library/test" + manifestPush.Target.Tag = "latest" + manifestPush.Target.URL = "http://example.com/v2/library/test/manifests/latest" + + var layerPush0 Event + layerPush0 = prototype + layerPush0.ID = "asdf-asdf-asdf-asdf-1" + layerPush0.Target.Digest = "tarsum.v2+sha256:0123456789abcdef1" + layerPush0.Target.Type = "blob" + layerPush0.Target.Name = "library/test" + layerPush0.Target.URL = "http://example.com/v2/library/test/manifests/latest" + + var layerPush1 Event + layerPush1 = prototype + layerPush1.ID = "asdf-asdf-asdf-asdf-2" + layerPush1.Target.Digest = "tarsum.v2+sha256:0123456789abcdef2" + layerPush1.Target.Type = "blob" + layerPush1.Target.Name = "library/test" + layerPush1.Target.URL = "http://example.com/v2/library/test/manifests/latest" + + var envelope Envelope + envelope.Events = append(envelope.Events, manifestPush, layerPush0, layerPush1) + + p, err := json.MarshalIndent(envelope, "", " ") + if err != nil { + t.Fatalf("unexpected error marshaling envelope: %v", err) + } + if string(p) != expected { + t.Fatalf("format has changed\n%s\n != \n%s", string(p), expected) + } +} From 14fb80d6c3e3dcc80db00816cab173acd70b6ab1 Mon Sep 17 00:00:00 2001 From: Stephen J Day Date: Wed, 28 Jan 2015 14:54:09 -0800 Subject: [PATCH 2/7] Add payload and signatures method to SignedManifest To provide easier access to digestible content, the paylaod has been made accessible on the signed manifest type. This hides the specifics of the interaction with libtrust with the caveat that signatures may be parsed twice. We'll have to have a future look at the interface for manifest as we may be making problematic architectural decisions. We'll visit this after the initial release. Signed-off-by: Stephen J Day --- manifest/manifest.go | 26 ++++++++++++++++++++++++++ storage/manifeststore.go | 5 +++++ storage/revisionstore.go | 9 ++------- 3 files changed, 33 insertions(+), 7 deletions(-) diff --git a/manifest/manifest.go b/manifest/manifest.go index ad3ecd00..8ea4d14d 100644 --- a/manifest/manifest.go +++ b/manifest/manifest.go @@ -4,6 +4,7 @@ import ( "encoding/json" "github.com/docker/distribution/digest" + "github.com/docker/libtrust" ) // Versioned provides a struct with just the manifest schemaVersion. Incoming @@ -62,6 +63,31 @@ func (sm *SignedManifest) UnmarshalJSON(b []byte) error { return nil } +// Payload returns the raw, signed content of the signed manifest. The +// contents can be used to calculate the content identifier. +func (sm *SignedManifest) Payload() ([]byte, error) { + jsig, err := libtrust.ParsePrettySignature(sm.Raw, "signatures") + if err != nil { + return nil, err + } + + // Resolve the payload in the manifest. + return jsig.Payload() +} + +// Signatures returns the signatures as provided by +// (*libtrust.JSONSignature).Signatures. The byte slices are opaque jws +// signatures. +func (sm *SignedManifest) Signatures() ([][]byte, error) { + jsig, err := libtrust.ParsePrettySignature(sm.Raw, "signatures") + if err != nil { + return nil, err + } + + // Resolve the payload in the manifest. + return jsig.Signatures() +} + // MarshalJSON returns the contents of raw. If Raw is nil, marshals the inner // contents. Applications requiring a marshaled signed manifest should simply // use Raw directly, since the the content produced by json.Marshal will be diff --git a/storage/manifeststore.go b/storage/manifeststore.go index bc28f3b8..f84a0208 100644 --- a/storage/manifeststore.go +++ b/storage/manifeststore.go @@ -94,6 +94,11 @@ func (ms *manifestStore) Get(tag string) (*manifest.SignedManifest, error) { } func (ms *manifestStore) Put(tag string, manifest *manifest.SignedManifest) error { + // TODO(stevvooe): Add check here to see if the revision is already + // present in the repository. If it is, we should merge the signatures, do + // a shallow verify (or a full one, doesn't matter) and return an error + // indicating what happened. + // Verify the manifest. if err := ms.verifyManifest(tag, manifest); err != nil { return err diff --git a/storage/revisionstore.go b/storage/revisionstore.go index a88ca8c7..b3ecd711 100644 --- a/storage/revisionstore.go +++ b/storage/revisionstore.go @@ -79,13 +79,8 @@ func (rs *revisionStore) get(revision digest.Digest) (*manifest.SignedManifest, // put stores the manifest in the repository, if not already present. Any // updated signatures will be stored, as well. func (rs *revisionStore) put(sm *manifest.SignedManifest) (digest.Digest, error) { - jsig, err := libtrust.ParsePrettySignature(sm.Raw, "signatures") - if err != nil { - return "", err - } - // Resolve the payload in the manifest. - payload, err := jsig.Payload() + payload, err := sm.Payload() if err != nil { return "", err } @@ -103,7 +98,7 @@ func (rs *revisionStore) put(sm *manifest.SignedManifest) (digest.Digest, error) } // Grab each json signature and store them. - signatures, err := jsig.Signatures() + signatures, err := sm.Signatures() if err != nil { return "", err } From 9f0c8d6616a99a13bdb106c743835ea314227151 Mon Sep 17 00:00:00 2001 From: Stephen J Day Date: Tue, 27 Jan 2015 23:27:46 -0800 Subject: [PATCH 3/7] Implement notification endpoint webhook dispatch This changeset implements webhook notification endpoints for dispatching registry events. Repository instances can be decorated by a listener that converts calls into context-aware events, using a bridge. Events generated in the bridge are written to a sink. Implementations of sink include a broadcast and endpoint sink which can be used to configure event dispatch. Endpoints represent a webhook notification target, with queueing and retries built in. They can be added to a Broadcaster, which is a simple sink that writes a block of events to several sinks, to provide a complete dispatch mechanism. The main caveat to the current approach is that all unsent notifications are inmemory. Best effort is made to ensure that notifications are not dropped, to the point where queues may back up on faulty endpoints. If the endpoint is fixed, the events will be retried and all messages will go through. Internally, this functionality is all made up of Sink objects. The queuing functionality is implemented with an eventQueue sink and retries are implemented with retryingSink. Replacing the inmemory queuing with something persistent should be as simple as replacing broadcaster with a remote queue and that sets up the sinks to be local workers listening to that remote queue. Metrics are kept for each endpoint and exported via expvar. This may not be a permanent appraoch but should provide enough information for troubleshooting notification problems. Signed-off-by: Stephen J Day --- storage/notifications/bridge.go | 139 ++++++++++ storage/notifications/endpoint.go | 86 +++++++ storage/notifications/event.go | 29 ++- storage/notifications/event_test.go | 38 +-- storage/notifications/http.go | 145 +++++++++++ storage/notifications/http_test.go | 155 ++++++++++++ storage/notifications/listener.go | 140 ++++++++++ storage/notifications/listener_test.go | 151 +++++++++++ storage/notifications/metrics.go | 152 +++++++++++ storage/notifications/sinks.go | 337 +++++++++++++++++++++++++ storage/notifications/sinks_test.go | 223 ++++++++++++++++ 11 files changed, 1569 insertions(+), 26 deletions(-) create mode 100644 storage/notifications/bridge.go create mode 100644 storage/notifications/endpoint.go create mode 100644 storage/notifications/http.go create mode 100644 storage/notifications/http_test.go create mode 100644 storage/notifications/listener.go create mode 100644 storage/notifications/listener_test.go create mode 100644 storage/notifications/metrics.go create mode 100644 storage/notifications/sinks.go create mode 100644 storage/notifications/sinks_test.go diff --git a/storage/notifications/bridge.go b/storage/notifications/bridge.go new file mode 100644 index 00000000..2ff0dff6 --- /dev/null +++ b/storage/notifications/bridge.go @@ -0,0 +1,139 @@ +package notifications + +import ( + "time" + + "github.com/docker/distribution/manifest" + + "code.google.com/p/go-uuid/uuid" + "github.com/docker/distribution/digest" + "github.com/docker/distribution/storage" +) + +type bridge struct { + ub URLBuilder + actor ActorRecord + source SourceRecord + sink Sink +} + +var _ Listener = &bridge{} + +// URLBuilder defines a subset of url builder to be used by the event listener. +type URLBuilder interface { + BuildManifestURL(name, tag string) (string, error) + BuildBlobURL(name string, dgst digest.Digest) (string, error) +} + +// NewBridge returns a notification listener that writes records to sink, +// using the actor and source. Any urls populated in the events created by +// this bridge will be created using the URLBuilder. +func NewBridge(ub URLBuilder, source SourceRecord, actor ActorRecord, sink Sink) Listener { + return &bridge{ + ub: ub, + actor: actor, + source: source, + sink: sink, + } +} + +func (b *bridge) ManifestPushed(repo storage.Repository, sm *manifest.SignedManifest) error { + return b.createManifestEventAndWrite(EventActionPush, repo, sm) +} + +func (b *bridge) ManifestPulled(repo storage.Repository, sm *manifest.SignedManifest) error { + return b.createManifestEventAndWrite(EventActionPull, repo, sm) +} + +func (b *bridge) ManifestDeleted(repo storage.Repository, sm *manifest.SignedManifest) error { + return b.createManifestEventAndWrite(EventActionDelete, repo, sm) +} + +func (b *bridge) LayerPushed(repo storage.Repository, layer storage.Layer) error { + return b.createLayerEventAndWrite(EventActionPush, repo, layer.Digest()) +} + +func (b *bridge) LayerPulled(repo storage.Repository, layer storage.Layer) error { + return b.createLayerEventAndWrite(EventActionPull, repo, layer.Digest()) +} + +func (b *bridge) LayerDeleted(repo storage.Repository, layer storage.Layer) error { + return b.createLayerEventAndWrite(EventActionDelete, repo, layer.Digest()) +} + +func (b *bridge) createManifestEventAndWrite(action string, repo storage.Repository, sm *manifest.SignedManifest) error { + event, err := b.createManifestEvent(action, repo, sm) + if err != nil { + return err + } + + return b.sink.Write(*event) +} + +func (b *bridge) createManifestEvent(action string, repo storage.Repository, sm *manifest.SignedManifest) (*Event, error) { + event := b.createEvent(action) + event.Target.Type = "manifest" + event.Target.Name = repo.Name() + event.Target.Tag = sm.Tag + + p, err := sm.Payload() + if err != nil { + return nil, err + } + + event.Target.Digest, err = digest.FromBytes(p) + if err != nil { + return nil, err + } + + // TODO(stevvooe): Currently, the is the "tag" url: once the digest url is + // implemented, this should be replaced. + event.Target.URL, err = b.ub.BuildManifestURL(sm.Name, sm.Tag) + if err != nil { + return nil, err + } + + return event, nil +} + +func (b *bridge) createLayerEventAndWrite(action string, repo storage.Repository, dgst digest.Digest) error { + event, err := b.createLayerEvent(action, repo, dgst) + if err != nil { + return err + } + + return b.sink.Write(*event) +} + +func (b *bridge) createLayerEvent(action string, repo storage.Repository, dgst digest.Digest) (*Event, error) { + event := b.createEvent(action) + event.Target.Type = "layer" + event.Target.Name = repo.Name() + event.Target.Digest = dgst + + var err error + event.Target.URL, err = b.ub.BuildBlobURL(repo.Name(), dgst) + if err != nil { + return nil, err + } + + return event, nil +} + +// createEvent creates an event with actor and source populated. +func (b *bridge) createEvent(action string) *Event { + event := createEvent(action) + event.Source = b.source + event.Actor = b.actor + + return event +} + +// createEvent returns a new event, timestamped, with the specified action. +func createEvent(action string) *Event { + return &Event{ + ID: uuid.New(), + Timestamp: time.Now(), + Action: action, + } +} diff --git a/storage/notifications/endpoint.go b/storage/notifications/endpoint.go new file mode 100644 index 00000000..dfdb111c --- /dev/null +++ b/storage/notifications/endpoint.go @@ -0,0 +1,86 @@ +package notifications + +import ( + "net/http" + "time" +) + +// EndpointConfig covers the optional configuration parameters for an active +// endpoint. +type EndpointConfig struct { + Headers http.Header + Timeout time.Duration + Threshold int + Backoff time.Duration +} + +// defaults set any zero-valued fields to a reasonable default. +func (ec *EndpointConfig) defaults() { + if ec.Timeout <= 0 { + ec.Timeout = time.Second + } + + if ec.Threshold <= 0 { + ec.Threshold = 10 + } + + if ec.Backoff <= 0 { + ec.Backoff = time.Second + } +} + +// Endpoint is a reliable, queued, thread-safe sink that notify external http +// services when events are written. Writes are non-blocking and always +// succeed for callers but events may be queued internally. +type Endpoint struct { + Sink + url string + name string + + EndpointConfig + + metrics *safeMetrics +} + +// NewEndpoint returns a running endpoint, ready to receive events. +func NewEndpoint(name, url string, config EndpointConfig) *Endpoint { + var endpoint Endpoint + endpoint.name = name + endpoint.url = url + endpoint.EndpointConfig = config + endpoint.defaults() + endpoint.metrics = newSafeMetrics() + + // Configures the inmemory queue, retry, http pipeline. + endpoint.Sink = newHTTPSink( + endpoint.url, endpoint.Timeout, endpoint.Headers, + endpoint.metrics.httpStatusListener()) + endpoint.Sink = newRetryingSink(endpoint.Sink, endpoint.Threshold, endpoint.Backoff) + endpoint.Sink = newEventQueue(endpoint.Sink, endpoint.metrics.eventQueueListener()) + + register(&endpoint) + return &endpoint +} + +// Name returns the name of the endpoint, generally used for debugging. +func (e *Endpoint) Name() string { + return e.name +} + +// URL returns the url of the endpoint. +func (e *Endpoint) URL() string { + return e.url +} + +// ReadMetrics populates em with metrics from the endpoint. +func (e *Endpoint) ReadMetrics(em *EndpointMetrics) { + e.metrics.Lock() + defer e.metrics.Unlock() + + *em = e.metrics.EndpointMetrics + // Map still need to copied in a threadsafe manner. + em.Statuses = make(map[string]int) + for k, v := range e.metrics.Statuses { + em.Statuses[k] = v + } +} diff --git a/storage/notifications/event.go b/storage/notifications/event.go index 3c000dc2..f03920ca 100644 --- a/storage/notifications/event.go +++ b/storage/notifications/event.go @@ -36,7 +36,7 @@ type Envelope struct { // Event provides the fields required to describe a registry event. type Event struct { // ID provides a unique identifier for the event. - ID string `json:"uuid,omitempty"` + ID string `json:"id,omitempty"` // Timestamp is the time at which the event occurred. Timestamp time.Time `json:"timestamp,omitempty"` @@ -74,6 +74,8 @@ type Event struct { // ActorRecord specifies the agent that initiated the event. For most // situations, this could be from the authorizaton context of the request. +// Data in this record can refer to both the initiating client and the +// generating request. type ActorRecord struct { // Name corresponds to the subject or username associated with the // request context that generated the event. @@ -82,6 +84,22 @@ type ActorRecord struct { // Addr contains the ip or hostname and possibly port of the client // connection that initiated the event. Addr string `json:"addr,omitempty"` + + // Host is the externally accessible host name of the registry instance, + // as specified by the http host header on incoming requests. + Host string `json:"host,omitempty"` + + // RequestID uniquely identifies the registry request that generated the + // event. + RequestID string `json:"requestID,omitempty"` + + // TODO(stevvooe): Look into setting a session cookie to get this + // without docker daemon. + // SessionID + + // TODO(stevvooe): Push the "Docker-Command" header to replace cookie and + // get the actual command. + // Command } // SourceRecord identifies the registry node that generated the event. Put @@ -93,12 +111,9 @@ type SourceRecord struct { // os.Hostname() along with the running port. Addr string `json:"addr,omitempty"` - // Host is the dns name of the registry cluster, as configured. - Host string `json:"host,omitempty"` - - // RequestID uniquely identifies the registry request that generated the - // event. - RequestID string `json:"request_id,omitempty"` + // InstanceID identifies a running instance of an application. Changes + // after each restart. + InstanceID string `json:"instanceID,omitempty"` } var ( diff --git a/storage/notifications/event_test.go b/storage/notifications/event_test.go index 37e43c66..77bd19f9 100644 --- a/storage/notifications/event_test.go +++ b/storage/notifications/event_test.go @@ -15,7 +15,7 @@ func TestEventEnvelopeJSONFormat(t *testing.T) { { "events": [ { - "uuid": "asdf-asdf-asdf-asdf-0", + "id": "asdf-asdf-asdf-asdf-0", "timestamp": "2006-01-02T15:04:05Z", "action": "push", "target": { @@ -27,16 +27,16 @@ func TestEventEnvelopeJSONFormat(t *testing.T) { }, "actor": { "name": "test-actor", - "addr": "hostname.local" + "addr": "client.local", + "host": "registrycluster.local", + "requestID": "asdfasdf" }, "source": { - "addr": "hostname.local", - "host": "registrycluster.local", - "request_id": "asdfasdf" + "addr": "hostname.local:port" } }, { - "uuid": "asdf-asdf-asdf-asdf-1", + "id": "asdf-asdf-asdf-asdf-1", "timestamp": "2006-01-02T15:04:05Z", "action": "push", "target": { @@ -47,16 +47,16 @@ func TestEventEnvelopeJSONFormat(t *testing.T) { }, "actor": { "name": "test-actor", - "addr": "hostname.local" + "addr": "client.local", + "host": "registrycluster.local", + "requestID": "asdfasdf" }, "source": { - "addr": "hostname.local", - "host": "registrycluster.local", - "request_id": "asdfasdf" + "addr": "hostname.local:port" } }, { - "uuid": "asdf-asdf-asdf-asdf-2", + "id": "asdf-asdf-asdf-asdf-2", "timestamp": "2006-01-02T15:04:05Z", "action": "push", "target": { @@ -67,12 +67,12 @@ func TestEventEnvelopeJSONFormat(t *testing.T) { }, "actor": { "name": "test-actor", - "addr": "hostname.local" + "addr": "client.local", + "host": "registrycluster.local", + "requestID": "asdfasdf" }, "source": { - "addr": "hostname.local", - "host": "registrycluster.local", - "request_id": "asdfasdf" + "addr": "hostname.local:port" } } ] @@ -87,11 +87,11 @@ func TestEventEnvelopeJSONFormat(t *testing.T) { var prototype Event prototype.Action = "push" prototype.Timestamp = tm - prototype.Actor.Addr = "hostname.local" + prototype.Actor.Addr = "client.local" prototype.Actor.Name = "test-actor" - prototype.Source.Addr = "hostname.local" - prototype.Source.Host = "registrycluster.local" - prototype.Source.RequestID = "asdfasdf" + prototype.Actor.RequestID = "asdfasdf" + prototype.Actor.Host = "registrycluster.local" + prototype.Source.Addr = "hostname.local:port" var manifestPush Event manifestPush = prototype diff --git a/storage/notifications/http.go b/storage/notifications/http.go new file mode 100644 index 00000000..15b3574c --- /dev/null +++ b/storage/notifications/http.go @@ -0,0 +1,145 @@ +package notifications + +import ( + "bytes" + "encoding/json" + "fmt" + "net/http" + "sync" + "time" +) + +// httpSink implements a single-flight, http notification endpoint. This is +// very lightweight in that it only makes an attempt at an http request. +// Reliability should be provided by the caller. +type httpSink struct { + url string + + mu sync.Mutex + closed bool + client *http.Client + listeners []httpStatusListener + + // TODO(stevvooe): Allow one to configure the media type accepted by this + // sink and choose the serialization based on that. +} + +// newHTTPSink returns an unreliable, single-flight http sink. Wrap in other +// sinks for increased reliability. +func newHTTPSink(u string, timeout time.Duration, headers http.Header, listeners ...httpStatusListener) *httpSink { + return &httpSink{ + url: u, + listeners: listeners, + client: &http.Client{ + Transport: &headerRoundTripper{ + Transport: http.DefaultTransport.(*http.Transport), + headers: headers, + }, + Timeout: timeout, + }, + } +} + +// httpStatusListener is called on various outcomes of sending notifications. +type httpStatusListener interface { + success(status int, events ...Event) + failure(status int, events ...Event) + err(err error, events ...Event) +} + +// Accept makes an attempt to notify the endpoint, returning an error if it +// fails. It is the caller's responsibility to retry on error. The events are +// accepted or rejected as a group. +func (hs *httpSink) Write(events ...Event) error { + hs.mu.Lock() + defer hs.mu.Unlock() + + if hs.closed { + return ErrSinkClosed + } + + envelope := Envelope{ + Events: events, + } + + // TODO(stevvooe): It is not ideal to keep re-encoding the request body on + // retry but we are going to do it to keep the code simple. It is likely + // we could change the event struct to manage its own buffer. + + p, err := json.MarshalIndent(envelope, "", " ") + if err != nil { + for _, listener := range hs.listeners { + listener.err(err, events...) + } + return fmt.Errorf("%v: error marshaling event envelope: %v", hs, err) + } + + body := bytes.NewReader(p) + resp, err := hs.client.Post(hs.url, EventsMediaType, body) + if err != nil { + for _, listener := range hs.listeners { + listener.err(err, events...) + } + + return fmt.Errorf("%v: error posting: %v", hs, err) + } + + // The notifier will treat any 2xx or 3xx response as accepted by the + // endpoint. + switch { + case resp.StatusCode >= 200 && resp.StatusCode < 400: + for _, listener := range hs.listeners { + listener.success(resp.StatusCode, events...) + } + + // TODO(stevvooe): This is a little accepting: we may want to support + // unsupported media type responses with retries using the correct + // media type. There may also be cases that will never work. + + return nil + default: + for _, listener := range hs.listeners { + listener.failure(resp.StatusCode, events...) + } + return fmt.Errorf("%v: response status %v unaccepted", hs, resp.Status) + } +} + +// Close the endpoint +func (hs *httpSink) Close() error { + hs.mu.Lock() + defer hs.mu.Unlock() + + if hs.closed { + return fmt.Errorf("httpsink: already closed") + } + + hs.closed = true + return nil +} + +func (hs *httpSink) String() string { + return fmt.Sprintf("httpSink{%s}", hs.url) +} + +type headerRoundTripper struct { + *http.Transport // must be transport to support CancelRequest + headers http.Header +} + +func (hrt *headerRoundTripper) RoundTrip(req *http.Request) (*http.Response, error) { + var nreq http.Request + nreq = *req + nreq.Header = make(http.Header) + + merge := func(headers http.Header) { + for k, v := range headers { + nreq.Header[k] = append(nreq.Header[k], v...) + } + } + + merge(req.Header) + merge(hrt.headers) + + return hrt.Transport.RoundTrip(&nreq) +} diff --git a/storage/notifications/http_test.go b/storage/notifications/http_test.go new file mode 100644 index 00000000..c2cfbc02 --- /dev/null +++ b/storage/notifications/http_test.go @@ -0,0 +1,155 @@ +package notifications + +import ( + "encoding/json" + "fmt" + "mime" + "net/http" + "net/http/httptest" + "reflect" + "strconv" + "testing" +) + +// TestHTTPSink mocks out an http endpoint and notifies it under a couple of +// conditions, ensuring correct behavior. +func TestHTTPSink(t *testing.T) { + server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + defer r.Body.Close() + if r.Method != "POST" { + w.WriteHeader(http.StatusMethodNotAllowed) + t.Fatalf("unexpected request method: %v", r.Method) + return + } + + // Extract the content type and make sure it matches + contentType := r.Header.Get("Content-Type") + mediaType, _, err := mime.ParseMediaType(contentType) + if err != nil { + w.WriteHeader(http.StatusBadRequest) + t.Fatalf("error parsing media type: %v, contenttype=%q", err, contentType) + return + } + + if mediaType != EventsMediaType { + w.WriteHeader(http.StatusUnsupportedMediaType) + t.Fatalf("incorrect media type: %q != %q", mediaType, EventsMediaType) + return + } + + var envelope Envelope + dec := json.NewDecoder(r.Body) + if err := dec.Decode(&envelope); err != nil { + w.WriteHeader(http.StatusBadRequest) + t.Fatalf("error decoding request body: %v", err) + return + } + + // Let caller choose the status + status, err := strconv.Atoi(r.FormValue("status")) + if err != nil { + t.Logf("error parsing status: %v", err) + + // May just be empty, set status to 200 + status = http.StatusOK + } + + w.WriteHeader(status) + })) + + metrics := newSafeMetrics() + sink := newHTTPSink(server.URL, 0, nil, + &endpointMetricsHTTPStatusListener{safeMetrics: metrics}) + + var expectedMetrics EndpointMetrics + expectedMetrics.Statuses = make(map[string]int) + + for _, tc := range []struct { + events []Event // events to send + url string + failure bool // true if there should be a failure. + statusCode int // if not set, no status code should be incremented. + }{ + { + statusCode: http.StatusOK, + events: []Event{ + createTestEvent("push", "library/test", "manifest")}, + }, + { + statusCode: http.StatusOK, + events: []Event{ + createTestEvent("push", "library/test", "manifest"), + createTestEvent("push", "library/test", "layer"), + createTestEvent("push", "library/test", "layer"), + }, + }, + { + statusCode: http.StatusTemporaryRedirect, + }, + { + statusCode: http.StatusBadRequest, + failure: true, + }, + { + // Case where connection never goes through. + url: "http://shoudlntresolve/", + failure: true, + }, + } { + + if tc.failure { + expectedMetrics.Failures += len(tc.events) + } else { + expectedMetrics.Successes += len(tc.events) + } + + if tc.statusCode > 0 { + expectedMetrics.Statuses[fmt.Sprintf("%d %s", tc.statusCode, http.StatusText(tc.statusCode))] += len(tc.events) + } + + url := tc.url + if url == "" { + url = server.URL + "/" + } + // setup endpoint to respond with expected status code. + url += fmt.Sprintf("?status=%v", tc.statusCode) + sink.url = url + + t.Logf("testcase: %v, fail=%v", url, tc.failure) + // Try a simple event emission. + err := sink.Write(tc.events...) + + if !tc.failure { + if err != nil { + t.Fatalf("unexpected error send event: %v", err) + } + } else { + if err == nil { + t.Fatalf("the endpoint should have rejected the request") + } + } + + if !reflect.DeepEqual(metrics.EndpointMetrics, expectedMetrics) { + t.Fatalf("metrics not as expected: %#v != %#v", metrics.EndpointMetrics, expectedMetrics) + } + } + + if err := sink.Close(); err != nil { + t.Fatalf("unexpected error closing http sink: %v", err) + } + + // double close returns error + if err := sink.Close(); err == nil { + t.Fatalf("second close should have returned error: %v", err) + } + +} + +func createTestEvent(action, repo, typ string) Event { + event := createEvent(action) + + event.Target.Type = typ + event.Target.Name = repo + + return *event +} diff --git a/storage/notifications/listener.go b/storage/notifications/listener.go new file mode 100644 index 00000000..2d7bb112 --- /dev/null +++ b/storage/notifications/listener.go @@ -0,0 +1,140 @@ +package notifications + +import ( + "github.com/Sirupsen/logrus" + "github.com/docker/distribution/digest" + "github.com/docker/distribution/manifest" + "github.com/docker/distribution/storage" +) + +// ManifestListener describes a set of methods for listening to events related to manifests. +type ManifestListener interface { + ManifestPushed(repo storage.Repository, sm *manifest.SignedManifest) error + ManifestPulled(repo storage.Repository, sm *manifest.SignedManifest) error + + // TODO(stevvooe): Please note that delete support is still a little shaky + // and we'll need to propagate these in the future. + + ManifestDeleted(repo storage.Repository, sm *manifest.SignedManifest) error +} + +// LayerListener describes a listener that can respond to layer related events. +type LayerListener interface { + LayerPushed(repo storage.Repository, layer storage.Layer) error + LayerPulled(repo storage.Repository, layer storage.Layer) error + + // TODO(stevvooe): Please note that delete support is still a little shaky + // and we'll need to propagate these in the future. + + LayerDeleted(repo storage.Repository, layer storage.Layer) error +} + +// Listener combines all repository events into a single interface. +type Listener interface { + ManifestListener + LayerListener +} + +type repositoryListener struct { + storage.Repository + listener Listener +} + +// Listen dispatches events on the repository to the listener. +func Listen(repo storage.Repository, listener Listener) storage.Repository { + return &repositoryListener{ + Repository: repo, + listener: listener, + } +} + +func (rl *repositoryListener) Manifests() storage.ManifestService { + return &manifestServiceListener{ + ManifestService: rl.Repository.Manifests(), + parent: rl, + } +} + +func (rl *repositoryListener) Layers() storage.LayerService { + return &layerServiceListener{ + LayerService: rl.Repository.Layers(), + parent: rl, + } +} + +type manifestServiceListener struct { + storage.ManifestService + parent *repositoryListener +} + +func (msl *manifestServiceListener) Get(tag string) (*manifest.SignedManifest, error) { + sm, err := msl.ManifestService.Get(tag) + if err == nil { + if err := msl.parent.listener.ManifestPulled(msl.parent.Repository, sm); err != nil { + logrus.Errorf("error dispatching manifest pull to listener: %v", err) + } + } + + return sm, err +} + +func (msl *manifestServiceListener) Put(tag string, sm *manifest.SignedManifest) error { + err := msl.ManifestService.Put(tag, sm) + + if err == nil { + if err := msl.parent.listener.ManifestPushed(msl.parent.Repository, sm); err != nil { + logrus.Errorf("error dispatching manifest push to listener: %v", err) + } + } + + return err +} + +type layerServiceListener struct { + storage.LayerService + parent *repositoryListener +} + +func (lsl *layerServiceListener) Fetch(dgst digest.Digest) (storage.Layer, error) { + layer, err := lsl.LayerService.Fetch(dgst) + if err == nil { + if err := lsl.parent.listener.LayerPulled(lsl.parent.Repository, layer); err != nil { + logrus.Errorf("error dispatching layer pull to listener: %v", err) + } + } + + return layer, err +} + +func (lsl *layerServiceListener) Upload() (storage.LayerUpload, error) { + lu, err := lsl.LayerService.Upload() + return lsl.decorateUpload(lu), err +} + +func (lsl *layerServiceListener) Resume(uuid string) (storage.LayerUpload, error) { + lu, err := lsl.LayerService.Resume(uuid) + return lsl.decorateUpload(lu), err +} + +func (lsl *layerServiceListener) decorateUpload(lu storage.LayerUpload) storage.LayerUpload { + return &layerUploadListener{ + LayerUpload: lu, + parent: lsl, + } +} + +type layerUploadListener struct { + storage.LayerUpload + parent *layerServiceListener +} + +func (lul *layerUploadListener) Finish(dgst digest.Digest) (storage.Layer, error) { + layer, err := lul.LayerUpload.Finish(dgst) + if err == nil { + if err := lul.parent.parent.listener.LayerPushed(lul.parent.parent.Repository, layer); err != nil { + logrus.Errorf("error dispatching layer push to listener: %v", err) + } + } + + return layer, err +} diff --git a/storage/notifications/listener_test.go b/storage/notifications/listener_test.go new file mode 100644 index 00000000..17af8b15 --- /dev/null +++ b/storage/notifications/listener_test.go @@ -0,0 +1,151 @@ +package notifications + +import ( + "io" + "reflect" + "testing" + + "github.com/docker/distribution/digest" + "github.com/docker/distribution/manifest" + "github.com/docker/distribution/storage" + "github.com/docker/distribution/storagedriver/inmemory" + "github.com/docker/distribution/testutil" + "github.com/docker/libtrust" +) + +func TestListener(t *testing.T) { + registry := storage.NewRegistryWithDriver(inmemory.New()) + tl := &testListener{ + ops: make(map[string]int), + } + repository := Listen(registry.Repository("foo/bar"), tl) + + // Now take the registry through a number of operations + checkExerciseRepository(t, repository) + + expectedOps := map[string]int{ + "manifest:push": 1, + "manifest:pull": 1, + // "manifest:delete": 0, // deletes not supported for now + "layer:push": 2, + "layer:pull": 2, + // "layer:delete": 0, // deletes not supported for now + } + + if !reflect.DeepEqual(tl.ops, expectedOps) { + t.Fatalf("counts do not match:\n%v\n !=\n%v", tl.ops, expectedOps) + } + +} + +type testListener struct { + ops map[string]int +} + +func (tl *testListener) ManifestPushed(repo storage.Repository, sm *manifest.SignedManifest) error { + tl.ops["manifest:push"]++ + + return nil +} + +func (tl *testListener) ManifestPulled(repo storage.Repository, sm *manifest.SignedManifest) error { + tl.ops["manifest:pull"]++ + return nil +} + +func (tl *testListener) ManifestDeleted(repo storage.Repository, sm *manifest.SignedManifest) error { + tl.ops["manifest:delete"]++ + return nil +} + +func (tl *testListener) LayerPushed(repo storage.Repository, layer storage.Layer) error { + tl.ops["layer:push"]++ + return nil +} + +func (tl *testListener) LayerPulled(repo storage.Repository, layer storage.Layer) error { + tl.ops["layer:pull"]++ + return nil +} + +func (tl *testListener) LayerDeleted(repo storage.Repository, layer storage.Layer) error { + tl.ops["layer:delete"]++ + return nil +} + +// checkExerciseRegistry takes the registry through all of its operations, +// carrying out generic checks. +func checkExerciseRepository(t *testing.T, repository storage.Repository) { + // TODO(stevvooe): This would be a nice testutil function. Basically, it + // takes the registry through a common set of operations. This could be + // used to make cross-cutting updates by changing internals that affect + // update counts. Basically, it would make writing tests a lot easier. + + tag := "thetag" + m := manifest.Manifest{ + Versioned: manifest.Versioned{ + SchemaVersion: 1, + }, + Name: repository.Name(), + Tag: tag, + } + + layers := repository.Layers() + for i := 0; i < 2; i++ { + rs, ds, err := testutil.CreateRandomTarFile() + if err != nil { + t.Fatalf("error creating test layer: %v", err) + } + dgst := digest.Digest(ds) + upload, err := layers.Upload() + if err != nil { + t.Fatalf("error creating layer upload: %v", err) + } + + // Use the resumes, as well! + upload, err = layers.Resume(upload.UUID()) + if err != nil { + t.Fatalf("error resuming layer upload: %v", err) + } + + io.Copy(upload, rs) + + if _, err := upload.Finish(dgst); err != nil { + t.Fatalf("unexpected error finishing upload: %v", err) + } + + m.FSLayers = append(m.FSLayers, manifest.FSLayer{ + BlobSum: dgst, + }) + + // Then fetch the layers + if _, err := layers.Fetch(dgst); err != nil { + t.Fatalf("error fetching layer: %v", err) + } + } + + pk, err := libtrust.GenerateECP256PrivateKey() + if err != nil { + t.Fatalf("unexpected error generating key: %v", err) + } + + sm, err := manifest.Sign(&m, pk) + if err != nil { + t.Fatalf("unexpected error signing manifest: %v", err) + } + + manifests := repository.Manifests() + + if err := manifests.Put(tag, sm); err != nil { + t.Fatalf("unexpected error putting the manifest: %v", err) + } + + fetched, err := manifests.Get(tag) + if err != nil { + t.Fatalf("unexpected error fetching manifest: %v", err) + } + + if fetched.Tag != fetched.Tag { + t.Fatalf("retrieved unexpected manifest: %v", err) + } +} diff --git a/storage/notifications/metrics.go b/storage/notifications/metrics.go new file mode 100644 index 00000000..2a8ffcbd --- /dev/null +++ b/storage/notifications/metrics.go @@ -0,0 +1,152 @@ +package notifications + +import ( + "expvar" + "fmt" + "net/http" + "sync" +) + +// EndpointMetrics track various actions taken by the endpoint, typically by +// number of events. The goal of this to export it via expvar but we may find +// some other future solution to be better. +type EndpointMetrics struct { + Pending int // events pending in queue + Events int // total events incoming + Successes int // total events written successfully + Failures int // total events failed + Errors int // total events errored + Statuses map[string]int // status code histogram, per call event +} + +// safeMetrics guards the metrics implementation with a lock and provides a +// safe update function. +type safeMetrics struct { + EndpointMetrics + sync.Mutex // protects statuses map +} + +// newSafeMetrics returns safeMetrics with map allocated. +func newSafeMetrics() *safeMetrics { + var sm safeMetrics + sm.Statuses = make(map[string]int) + return &sm +} + +// httpStatusListener returns the listener for the http sink that updates the +// relevent counters. +func (sm *safeMetrics) httpStatusListener() httpStatusListener { + return &endpointMetricsHTTPStatusListener{ + safeMetrics: sm, + } +} + +// eventQueueListener returns a listener that maintains queue related counters. +func (sm *safeMetrics) eventQueueListener() eventQueueListener { + return &endpointMetricsEventQueueListener{ + safeMetrics: sm, + } +} + +// endpointMetricsHTTPStatusListener increments counters related to http sinks +// for the relevent events. +type endpointMetricsHTTPStatusListener struct { + *safeMetrics +} + +var _ httpStatusListener = &endpointMetricsHTTPStatusListener{} + +func (emsl *endpointMetricsHTTPStatusListener) success(status int, events ...Event) { + emsl.safeMetrics.Lock() + defer emsl.safeMetrics.Unlock() + emsl.Statuses[fmt.Sprintf("%d %s", status, http.StatusText(status))] += len(events) + emsl.Successes += len(events) +} + +func (emsl *endpointMetricsHTTPStatusListener) failure(status int, events ...Event) { + emsl.safeMetrics.Lock() + defer emsl.safeMetrics.Unlock() + emsl.Statuses[fmt.Sprintf("%d %s", status, http.StatusText(status))] += len(events) + emsl.Failures += len(events) +} + +func (emsl *endpointMetricsHTTPStatusListener) err(err error, events ...Event) { + emsl.safeMetrics.Lock() + defer emsl.safeMetrics.Unlock() + emsl.Errors += len(events) +} + +// endpointMetricsEventQueueListener maintains the incoming events counter and +// the queues pending count. +type endpointMetricsEventQueueListener struct { + *safeMetrics +} + +func (eqc *endpointMetricsEventQueueListener) ingress(events ...Event) { + eqc.Lock() + defer eqc.Unlock() + eqc.Events += len(events) + eqc.Pending += len(events) +} + +func (eqc *endpointMetricsEventQueueListener) egress(events ...Event) { + eqc.Lock() + defer eqc.Unlock() + eqc.Pending -= len(events) +} + +// endpoints is global registry of endpoints used to report metrics to expvar +var endpoints struct { + registered []*Endpoint + mu sync.Mutex +} + +// register places the endpoint into expvar so that stats are tracked. +func register(e *Endpoint) { + endpoints.mu.Lock() + defer endpoints.mu.Unlock() + + endpoints.registered = append(endpoints.registered, e) +} + +func init() { + // NOTE(stevvooe): Setup registry metrics structure to report to expvar. + // Ideally, we do more metrics through logging but we need some nice + // realtime metrics for queue state for now. + + registry := expvar.Get("registry") + + if registry == nil { + registry = expvar.NewMap("registry") + } + + var notifications expvar.Map + notifications.Init() + notifications.Set("endpoints", expvar.Func(func() interface{} { + endpoints.mu.Lock() + defer endpoints.mu.Unlock() + + var names []interface{} + for _, v := range endpoints.registered { + var epjson struct { + Name string `json:"name"` + URL string `json:"url"` + EndpointConfig + + Metrics EndpointMetrics + } + + epjson.Name = v.Name() + epjson.URL = v.URL() + epjson.EndpointConfig = v.EndpointConfig + + v.ReadMetrics(&epjson.Metrics) + + names = append(names, epjson) + } + + return names + })) + + registry.(*expvar.Map).Set("notifications", ¬ifications) +} diff --git a/storage/notifications/sinks.go b/storage/notifications/sinks.go new file mode 100644 index 00000000..2bf63e2d --- /dev/null +++ b/storage/notifications/sinks.go @@ -0,0 +1,337 @@ +package notifications + +import ( + "container/list" + "fmt" + "sync" + "time" + + "github.com/Sirupsen/logrus" +) + +// NOTE(stevvooe): This file contains definitions for several utility sinks. +// Typically, the broadcaster is the only sink that should be required +// externally, but others are suitable for export if the need arises. Albeit, +// the tight integration with endpoint metrics should be removed. + +// Broadcaster sends events to multiple, reliable Sinks. The goal of this +// component is to dispatch events to configured endpoints. Reliability can be +// provided by wrapping incoming sinks. +type Broadcaster struct { + sinks []Sink + events chan []Event + closed chan chan struct{} +} + +// NewBroadcaster ... +// Add appends one or more sinks to the list of sinks. The broadcaster +// behavior will be affected by the properties of the sink. Generally, the +// sink should accept all messages and deal with reliability on its own. Use +// of EventQueue and RetryingSink should be used here. +func NewBroadcaster(sinks ...Sink) *Broadcaster { + b := Broadcaster{ + sinks: sinks, + events: make(chan []Event), + closed: make(chan chan struct{}), + } + + // Start the broadcaster + go b.run() + + return &b +} + +// Write accepts a block of events to be dispatched to all sinks. This method +// will never fail and should never block (hopefully!). The caller cedes the +// slice memory to the broadcaster and should not modify it after calling +// write. +func (b *Broadcaster) Write(events ...Event) error { + select { + case b.events <- events: + case <-b.closed: + return ErrSinkClosed + } + return nil +} + +// Close the broadcaster, ensuring that all messages are flushed to the +// underlying sink before returning. +func (b *Broadcaster) Close() error { + logrus.Infof("broadcaster: closing") + select { + case <-b.closed: + // already closed + return fmt.Errorf("broadcaster: already closed") + default: + // do a little chan handoff dance to synchronize closing + closed := make(chan struct{}) + b.closed <- closed + close(b.closed) + <-closed + return nil + } +} + +// run is the main broadcast loop, started when the broadcaster is created. +// Under normal conditions, it waits for events on the event channel. After +// Close is called, this goroutine will exit. +func (b *Broadcaster) run() { + for { + select { + case block := <-b.events: + for _, sink := range b.sinks { + if err := sink.Write(block...); err != nil { + logrus.Errorf("broadcaster: error writing events to %v, these events will be lost: %v", sink, err) + } + } + case closing := <-b.closed: + + // close all the underlying sinks + for _, sink := range b.sinks { + if err := sink.Close(); err != nil { + logrus.Errorf("broadcaster: error closing sink %v: %v", sink, err) + } + } + closing <- struct{}{} + + logrus.Debugf("broadcaster: closed") + return + } + } +} + +// eventQueue accepts all messages into a queue for asynchronous consumption +// by a sink. It is unbounded and thread safe but the sink must be reliable or +// events will be dropped. +type eventQueue struct { + sink Sink + events *list.List + listeners []eventQueueListener + cond *sync.Cond + mu sync.Mutex + closed bool +} + +// eventQueueListener is called when various events happen on the queue. +type eventQueueListener interface { + ingress(events ...Event) + egress(events ...Event) +} + +// newEventQueue returns a queue to the provided sink. If the updater is non- +// nil, it will be called to update pending metrics on ingress and egress. +func newEventQueue(sink Sink, listeners ...eventQueueListener) *eventQueue { + eq := eventQueue{ + sink: sink, + events: list.New(), + listeners: listeners, + } + + eq.cond = sync.NewCond(&eq.mu) + go eq.run() + return &eq +} + +// Write accepts the events into the queue, only failing if the queue has +// beend closed. +func (eq *eventQueue) Write(events ...Event) error { + eq.mu.Lock() + defer eq.mu.Unlock() + + if eq.closed { + return ErrSinkClosed + } + + for _, listener := range eq.listeners { + listener.ingress(events...) + } + eq.events.PushBack(events) + eq.cond.Signal() // signal waiters + + return nil +} + +// Close shutsdown the event queue, flushing +func (eq *eventQueue) Close() error { + eq.mu.Lock() + defer eq.mu.Unlock() + + if eq.closed { + return fmt.Errorf("eventqueue: already closed") + } + + // set closed flag + eq.closed = true + eq.cond.Signal() // signal flushes queue + eq.cond.Wait() // wait for signal from last flush + + return eq.sink.Close() +} + +// run is the main goroutine to flush events to the target sink. +func (eq *eventQueue) run() { + for { + block := eq.next() + + if block == nil { + return // nil block means event queue is closed. + } + + if err := eq.sink.Write(block...); err != nil { + logrus.Warnf("eventqueue: error writing events to %v, these events will be lost: %v", eq.sink, err) + } + + for _, listener := range eq.listeners { + listener.egress(block...) + } + } +} + +// next encompasses the critical section of the run loop. When the queue is +// empty, it will block on the condition. If new data arrives, it will wake +// and return a block. When closed, a nil slice will be returned. +func (eq *eventQueue) next() []Event { + eq.mu.Lock() + defer eq.mu.Unlock() + + for eq.events.Len() < 1 { + if eq.closed { + eq.cond.Broadcast() + return nil + } + + eq.cond.Wait() + } + + front := eq.events.Front() + block := front.Value.([]Event) + eq.events.Remove(front) + + return block +} + +// retryingSink retries the write until success or an ErrSinkClosed is +// returned. Underlying sink must have p > 0 of succeeding or the sink will +// block. Internally, it is a circuit breaker retries to manage reset. +// Concurrent calls to a retrying sink are serialized through the sink, +// meaning that if one is in-flight, another will not proceed. +type retryingSink struct { + mu sync.Mutex + sink Sink + closed bool + + // circuit breaker hueristics + failures struct { + threshold int + recent int + last time.Time + backoff time.Duration // time after which we retry after failure. + } +} + +type retryingSinkListener interface { + active(events ...Event) + retry(events ...Event) +} + +// TODO(stevvooe): We are using circuit break here, which actually doesn't +// make a whole lot of sense for this use case, since we always retry. Move +// this to use bounded exponential backoff. + +// newRetryingSink returns a sink that will retry writes to a sink, backing +// off on failure. Parameters threshold and backoff adjust the behavior of the +// circuit breaker. +func newRetryingSink(sink Sink, threshold int, backoff time.Duration) *retryingSink { + rs := &retryingSink{ + sink: sink, + } + rs.failures.threshold = threshold + rs.failures.backoff = backoff + + return rs +} + +// Write attempts to flush the events to the downstream sink until it succeeds +// or the sink is closed. +func (rs *retryingSink) Write(events ...Event) error { + rs.mu.Lock() + defer rs.mu.Unlock() + +retry: + + if rs.closed { + return ErrSinkClosed + } + + if !rs.proceed() { + logrus.Warnf("%v encountered too many errors, backing off", rs.sink) + rs.wait(rs.failures.backoff) + goto retry + } + + if err := rs.write(events...); err != nil { + if err == ErrSinkClosed { + // terminal! + return err + } + + logrus.Errorf("retryingsink: error writing events: %v, retrying", err) + goto retry + } + + return nil +} + +// Close closes the sink and the underlying sink. +func (rs *retryingSink) Close() error { + rs.mu.Lock() + defer rs.mu.Unlock() + + if rs.closed { + return fmt.Errorf("retryingsink: already closed") + } + + rs.closed = true + return rs.sink.Close() +} + +// write provides a helper that dispatches failure and success properly. Used +// by write as the single-flight write call. +func (rs *retryingSink) write(events ...Event) error { + if err := rs.sink.Write(events...); err != nil { + rs.failure() + return err + } + + rs.reset() + return nil +} + +// wait backoff time against the sink, unlocking so others can proceed. Should +// only be called by methods that currently have the mutex. +func (rs *retryingSink) wait(backoff time.Duration) { + rs.mu.Unlock() + defer rs.mu.Lock() + + // backoff here + time.Sleep(backoff) +} + +// reset marks a succesful call. +func (rs *retryingSink) reset() { + rs.failures.recent = 0 + rs.failures.last = time.Time{} +} + +// failure records a failure. +func (rs *retryingSink) failure() { + rs.failures.recent++ + rs.failures.last = time.Now().UTC() +} + +// proceed returns true if the call should proceed based on circuit breaker +// hueristics. +func (rs *retryingSink) proceed() bool { + return rs.failures.recent < rs.failures.threshold || + time.Now().UTC().After(rs.failures.last.Add(rs.failures.backoff)) +} diff --git a/storage/notifications/sinks_test.go b/storage/notifications/sinks_test.go new file mode 100644 index 00000000..89756a99 --- /dev/null +++ b/storage/notifications/sinks_test.go @@ -0,0 +1,223 @@ +package notifications + +import ( + "fmt" + "math/rand" + "sync" + "time" + + "github.com/Sirupsen/logrus" + + "testing" +) + +func TestBroadcaster(t *testing.T) { + const nEvents = 1000 + var sinks []Sink + + for i := 0; i < 10; i++ { + sinks = append(sinks, &testSink{}) + } + + b := NewBroadcaster(sinks...) + + var block []Event + var wg sync.WaitGroup + for i := 1; i <= nEvents; i++ { + block = append(block, createTestEvent("push", "library/test", "blob")) + + if i%10 == 0 && i > 0 { + wg.Add(1) + go func(block ...Event) { + if err := b.Write(block...); err != nil { + t.Fatalf("error writing block of length %d: %v", len(block), err) + } + wg.Done() + }(block...) + + block = nil + } + } + + wg.Wait() // Wait until writes complete + checkClose(t, b) + + // Iterate through the sinks and check that they all have the expected length. + for _, sink := range sinks { + ts := sink.(*testSink) + ts.mu.Lock() + defer ts.mu.Unlock() + + if len(ts.events) != nEvents { + t.Fatalf("not all events ended up in testsink: len(testSink) == %d, not %d", len(ts.events), nEvents) + } + + if !ts.closed { + t.Fatalf("sink should have been closed") + } + } + +} + +func TestEventQueue(t *testing.T) { + const nevents = 1000 + var ts testSink + metrics := newSafeMetrics() + eq := newEventQueue( + // delayed sync simulates destination slower than channel comms + &delayedSink{ + Sink: &ts, + delay: time.Millisecond * 1, + }, metrics.eventQueueListener()) + + var wg sync.WaitGroup + var block []Event + for i := 1; i <= nevents; i++ { + block = append(block, createTestEvent("push", "library/test", "blob")) + if i%10 == 0 && i > 0 { + wg.Add(1) + go func(block ...Event) { + if err := eq.Write(block...); err != nil { + t.Fatalf("error writing event block: %v", err) + } + wg.Done() + }(block...) + + block = nil + } + } + + wg.Wait() + checkClose(t, eq) + + ts.mu.Lock() + defer ts.mu.Unlock() + metrics.Lock() + defer metrics.Unlock() + + if len(ts.events) != nevents { + t.Fatalf("events did not make it to the sink: %d != %d", len(ts.events), 1000) + } + + if !ts.closed { + t.Fatalf("sink should have been closed") + } + + if metrics.Events != nevents { + t.Fatalf("unexpected ingress count: %d != %d", metrics.Events, nevents) + } + + if metrics.Pending != 0 { + t.Fatalf("unexpected egress count: %d != %d", metrics.Pending, 0) + } +} + +func TestRetryingSink(t *testing.T) { + + // Make a sync that fails most of the time, ensuring that all the events + // make it through. + var ts testSink + flaky := &flakySink{ + rate: 1.0, // start out always failing. + Sink: &ts, + } + s := newRetryingSink(flaky, 3, 10*time.Millisecond) + + var wg sync.WaitGroup + var block []Event + for i := 1; i <= 100; i++ { + block = append(block, createTestEvent("push", "library/test", "blob")) + + // Above 50, set the failure rate lower + if i > 50 { + s.mu.Lock() + flaky.rate = 0.90 + s.mu.Unlock() + } + + if i%10 == 0 && i > 0 { + wg.Add(1) + go func(block ...Event) { + defer wg.Done() + if err := s.Write(block...); err != nil { + t.Fatalf("error writing event block: %v", err) + } + }(block...) + + block = nil + } + } + + wg.Wait() + checkClose(t, s) + + ts.mu.Lock() + defer ts.mu.Unlock() + + if len(ts.events) != 100 { + t.Fatalf("events not propagated: %d != %d", len(ts.events), 100) + } +} + +type testSink struct { + events []Event + mu sync.Mutex + closed bool +} + +func (ts *testSink) Write(events ...Event) error { + ts.mu.Lock() + defer ts.mu.Unlock() + ts.events = append(ts.events, events...) + return nil +} + +func (ts *testSink) Close() error { + ts.mu.Lock() + defer ts.mu.Unlock() + ts.closed = true + + logrus.Infof("closing testSink") + return nil +} + +type delayedSink struct { + Sink + delay time.Duration +} + +func (ds *delayedSink) Write(events ...Event) error { + time.Sleep(ds.delay) + return ds.Sink.Write(events...) +} + +type flakySink struct { + Sink + rate float64 +} + +func (fs *flakySink) Write(events ...Event) error { + if rand.Float64() < fs.rate { + return fmt.Errorf("error writing %d events", len(events)) + } + + return fs.Sink.Write(events...) +} + +func checkClose(t *testing.T, sink Sink) { + if err := sink.Close(); err != nil { + t.Fatalf("unexpected error closing: %v", err) + } + + // second close should not crash but should return an error. + if err := sink.Close(); err == nil { + t.Fatalf("no error on double close") + } + + // Write after closed should be an error + if err := sink.Write([]Event{}...); err == nil { + t.Fatalf("write after closed did not have an error") + } else if err != ErrSinkClosed { + t.Fatalf("error should be ErrSinkClosed") + } +} From e5de2594ad4a15db602967c40e0121982e64d157 Mon Sep 17 00:00:00 2001 From: Stephen J Day Date: Tue, 27 Jan 2015 23:46:50 -0800 Subject: [PATCH 4/7] Remove decorator package After implementing notifications end to end, it was found that decorating repositories was more straightforward that previously thought. It's unfortunate to can this package, but it led to the techniques employed in storage/notifications/listeners.go. The ultimate result turned out much better. --- storage/decorator/decorator.go | 185 ---------------------------- storage/decorator/decorator_test.go | 138 --------------------- 2 files changed, 323 deletions(-) delete mode 100644 storage/decorator/decorator.go delete mode 100644 storage/decorator/decorator_test.go diff --git a/storage/decorator/decorator.go b/storage/decorator/decorator.go deleted file mode 100644 index 2ebcc94c..00000000 --- a/storage/decorator/decorator.go +++ /dev/null @@ -1,185 +0,0 @@ -package decorator - -import ( - "github.com/docker/distribution/digest" - "github.com/docker/distribution/storage" -) - -// Decorator provides an interface for intercepting object creation within a -// registry. The single method accepts an registry storage object, such as a -// Layer, optionally replacing it upon with an alternative object or a -// wrapper. -// -// For example, if one wants to intercept the instantiation of a layer, an -// implementation might be as follows: -// -// func (md *DecoratorImplementation) Decorate(v interface{}) interface{} { -// switch v := v.(type) { -// case Layer: -// return wrapLayer(v) -// } -// -// // Make sure to return the object or nil if the decorator doesn't require -// // replacement. -// return v -// } -// -// Such a decorator can be used to intercept calls to support implementing -// complex features outside of the storage package. -type Decorator interface { - Decorate(v interface{}) interface{} -} - -// Func provides a shortcut handler for decorators that only need a -// function. Use is similar to http.HandlerFunc. -type Func func(v interface{}) interface{} - -// Decorate allows DecoratorFunc to implement the Decorator interface. -func (df Func) Decorate(v interface{}) interface{} { - return df(v) -} - -// DecorateRegistry the provided registry with decorator. Registries may be -// decorated multiple times. -func DecorateRegistry(registry storage.Registry, decorator Decorator) storage.Registry { - return ®istryDecorator{ - Registry: registry, - decorator: decorator, - } -} - -// registryDecorator intercepts registry object creation with a decorator. -type registryDecorator struct { - storage.Registry - decorator Decorator -} - -// Repository overrides the method of the same name on the Registry, replacing -// the returned instance with a decorator. -func (rd *registryDecorator) Repository(name string) storage.Repository { - delegate := rd.Registry.Repository(name) - decorated := rd.decorator.Decorate(delegate) - if decorated != nil { - repository, ok := decorated.(storage.Repository) - - if ok { - delegate = repository - } - } - - return &repositoryDecorator{ - Repository: delegate, - decorator: rd.decorator, - } -} - -// repositoryDecorator decorates a repository, intercepting calls to Layers -// and Manifests with injected variants. -type repositoryDecorator struct { - storage.Repository - decorator Decorator -} - -// Layers overrides the Layers method of Repository. -func (rd *repositoryDecorator) Layers() storage.LayerService { - delegate := rd.Repository.Layers() - decorated := rd.decorator.Decorate(delegate) - - if decorated != nil { - layers, ok := decorated.(storage.LayerService) - - if ok { - delegate = layers - } - } - - return &layerServiceDecorator{ - LayerService: delegate, - decorator: rd.decorator, - } -} - -// Manifests overrides the Manifests method of Repository. -func (rd *repositoryDecorator) Manifests() storage.ManifestService { - delegate := rd.Repository.Manifests() - decorated := rd.decorator.Decorate(delegate) - - if decorated != nil { - manifests, ok := decorated.(storage.ManifestService) - - if ok { - delegate = manifests - } - } - - // NOTE(stevvooe): We do not have to intercept delegate calls to the - // manifest service since it doesn't produce any interfaces for which - // interception is supported. - return delegate -} - -// layerServiceDecorator intercepts calls that generate Layer and LayerUpload -// instances, replacing them with instances from the decorator. -type layerServiceDecorator struct { - storage.LayerService - decorator Decorator -} - -// Fetch overrides the Fetch method of LayerService. -func (lsd *layerServiceDecorator) Fetch(digest digest.Digest) (storage.Layer, error) { - delegate, err := lsd.LayerService.Fetch(digest) - return decorateLayer(lsd.decorator, delegate), err -} - -// Upload overrides the Upload method of LayerService. -func (lsd *layerServiceDecorator) Upload() (storage.LayerUpload, error) { - delegate, err := lsd.LayerService.Upload() - return decorateLayerUpload(lsd.decorator, delegate), err -} - -// Resume overrides the Resume method of LayerService. -func (lsd *layerServiceDecorator) Resume(uuid string) (storage.LayerUpload, error) { - delegate, err := lsd.LayerService.Resume(uuid) - return decorateLayerUpload(lsd.decorator, delegate), err -} - -// layerUploadDecorator intercepts calls that generate Layer instances, -// replacing them with instances from the decorator. -type layerUploadDecorator struct { - storage.LayerUpload - decorator Decorator -} - -func (lud *layerUploadDecorator) Finish(dgst digest.Digest) (storage.Layer, error) { - delegate, err := lud.LayerUpload.Finish(dgst) - return decorateLayer(lud.decorator, delegate), err -} - -// decorateLayer guarantees that a layer gets correctly decorated. -func decorateLayer(decorator Decorator, delegate storage.Layer) storage.Layer { - decorated := decorator.Decorate(delegate) - if decorated != nil { - layer, ok := decorated.(storage.Layer) - if ok { - delegate = layer - } - } - - return delegate -} - -// decorateLayerUpload guarantees that an upload gets correctly decorated. -func decorateLayerUpload(decorator Decorator, delegate storage.LayerUpload) storage.LayerUpload { - decorated := decorator.Decorate(delegate) - if decorated != nil { - layerUpload, ok := decorated.(storage.LayerUpload) - if ok { - delegate = layerUpload - } - } - - return &layerUploadDecorator{ - LayerUpload: delegate, - decorator: decorator, - } -} diff --git a/storage/decorator/decorator_test.go b/storage/decorator/decorator_test.go deleted file mode 100644 index 213cf755..00000000 --- a/storage/decorator/decorator_test.go +++ /dev/null @@ -1,138 +0,0 @@ -package decorator - -import ( - "io" - "testing" - - "github.com/docker/libtrust" - - "github.com/docker/distribution/digest" - "github.com/docker/distribution/manifest" - "github.com/docker/distribution/storage" - "github.com/docker/distribution/storagedriver/inmemory" - "github.com/docker/distribution/testutil" -) - -func TestRegistryDecorator(t *testing.T) { - // Initialize the expected decorations. Call counting is a horrible way to - // test this but should keep this code from being atrocious. - expected := map[string]int{ - "repository": 1, - "manifestservice": 1, - "layerservice": 1, - "layer": 4, - "layerupload": 4, - } - decorated := map[string]int{} - - decorator := Func(func(v interface{}) interface{} { - switch v := v.(type) { - case storage.Repository: - t.Logf("decorate repository: %T", v) - decorated["repository"]++ - case storage.ManifestService: - t.Logf("decorate manifestservice: %T", v) - decorated["manifestservice"]++ - case storage.LayerService: - t.Logf("decorate layerservice: %T", v) - decorated["layerservice"]++ - case storage.Layer: - t.Logf("decorate layer: %T", v) - decorated["layer"]++ - case storage.LayerUpload: - t.Logf("decorate layerupload: %T", v) - decorated["layerupload"]++ - default: - t.Fatalf("unexpected object decorated: %v", v) - } - - return v - }) - - registry := storage.NewRegistryWithDriver(inmemory.New()) - registry = DecorateRegistry(registry, decorator) - - // Now take the registry through a number of operations - checkExerciseRegistry(t, registry) - - for component, calls := range expected { - if decorated[component] != calls { - t.Fatalf("%v was not decorated expected number of times: %d != %d", component, decorated[component], calls) - } - } - -} - -// checkExerciseRegistry takes the registry through all of its operations, -// carrying out generic checks. -func checkExerciseRegistry(t *testing.T, registry storage.Registry) { - name := "foo/bar" - tag := "thetag" - repository := registry.Repository(name) - m := manifest.Manifest{ - Versioned: manifest.Versioned{ - SchemaVersion: 1, - }, - Name: name, - Tag: tag, - } - - layers := repository.Layers() - for i := 0; i < 2; i++ { - rs, ds, err := testutil.CreateRandomTarFile() - if err != nil { - t.Fatalf("error creating test layer: %v", err) - } - dgst := digest.Digest(ds) - upload, err := layers.Upload() - if err != nil { - t.Fatalf("error creating layer upload: %v", err) - } - - // Use the resumes, as well! - upload, err = layers.Resume(upload.UUID()) - if err != nil { - t.Fatalf("error resuming layer upload: %v", err) - } - - io.Copy(upload, rs) - - if _, err := upload.Finish(dgst); err != nil { - t.Fatalf("unexpected error finishing upload: %v", err) - } - - m.FSLayers = append(m.FSLayers, manifest.FSLayer{ - BlobSum: dgst, - }) - - // Then fetch the layers - if _, err := layers.Fetch(dgst); err != nil { - t.Fatalf("error fetching layer: %v", err) - } - } - - pk, err := libtrust.GenerateECP256PrivateKey() - if err != nil { - t.Fatalf("unexpected error generating key: %v", err) - } - - sm, err := manifest.Sign(&m, pk) - if err != nil { - t.Fatalf("unexpected error signing manifest: %v", err) - } - - manifests := repository.Manifests() - - if err := manifests.Put(tag, sm); err != nil { - t.Fatalf("unexpected error putting the manifest: %v", err) - } - - fetched, err := manifests.Get(tag) - if err != nil { - t.Fatalf("unexpected error fetching manifest: %v", err) - } - - if fetched.Tag != fetched.Tag { - t.Fatalf("retrieved unexpected manifest: %v", err) - } -} From 499382dd0b9e1b2603c9d61af34dbf14365371a8 Mon Sep 17 00:00:00 2001 From: Stephen J Day Date: Wed, 28 Jan 2015 15:45:25 -0800 Subject: [PATCH 5/7] Add debug server to support pprof and expvar If configured, a debug http server will be started to serve default registered endpoints, such as pprof and expvar. The endpoint should be secured carefully and not available to external traffic. It is disabled by default but the development config has been modified to make it available on localhost. Signed-off-by: Stephen J Day --- cmd/registry/config.yml | 3 +++ cmd/registry/main.go | 15 +++++++++++++++ configuration/configuration.go | 8 ++++++++ 3 files changed, 26 insertions(+) diff --git a/cmd/registry/config.yml b/cmd/registry/config.yml index bf79ca8f..4919e6db 100644 --- a/cmd/registry/config.yml +++ b/cmd/registry/config.yml @@ -5,3 +5,6 @@ storage: rootdirectory: /tmp/registry-dev http: addr: :5000 + secret: asecretforlocaldevelopment + debug: + addr: localhost:5001 diff --git a/cmd/registry/main.go b/cmd/registry/main.go index 73c85ae1..98f3d5bd 100644 --- a/cmd/registry/main.go +++ b/cmd/registry/main.go @@ -1,6 +1,7 @@ package main import ( + _ "expvar" "flag" "fmt" "net/http" @@ -47,6 +48,10 @@ func main() { handler = handlers.CombinedLoggingHandler(os.Stdout, handler) log.SetLevel(logLevel(config.Loglevel)) + if config.HTTP.Debug.Addr != "" { + go debugServer(config.HTTP.Debug.Addr) + } + if config.HTTP.TLS.Certificate == "" { log.Infof("listening on %v", config.HTTP.Addr) if err := http.ListenAndServe(config.HTTP.Addr, handler); err != nil { @@ -142,3 +147,13 @@ func configureReporting(app *registry.App) http.Handler { return handler } + +// debugServer starts the debug server with pprof, expvar among other +// endpoints. The addr should not be exposed externally. For most of these to +// work, tls cannot be enabled on the endpoint, so it is generally separate. +func debugServer(addr string) { + log.Infof("debug server listening %v", addr) + if err := http.ListenAndServe(addr, nil); err != nil { + log.Fatalf("error listening on debug interface: %v", err) + } +} diff --git a/configuration/configuration.go b/configuration/configuration.go index 8dd71e78..0cf5bc02 100644 --- a/configuration/configuration.go +++ b/configuration/configuration.go @@ -54,6 +54,14 @@ type Configuration struct { // Certificate. Key string `yaml:"key"` } `yaml:"tls"` + + // Debug configures the http debug interface, if specified. This can + // include services such as pprof, expvar and other data that should + // not be exposed externally. Left disabled by default. + Debug struct { + // Addr specifies the bind address for the debug server. + Addr string `yaml:"addr"` + } `yaml:"debug"` } `yaml:"http"` } From 0a29b59e1495b36e78874cb04ac4ce93d23b4e87 Mon Sep 17 00:00:00 2001 From: Stephen J Day Date: Wed, 28 Jan 2015 15:55:18 -0800 Subject: [PATCH 6/7] Webhook notification support in registry webapp Endpoints are now created at applications startup time, using notification configuration. The instances are then added to a Broadcaster instance, which becomes the main event sink for the application. At request time, an event bridge is configured to listen to repository method calls. The actor and source of the eventBridge are created from the requeest context and application, respectively. The result is notifications are dispatched with calls to the context's Repository instance and are queued to each endpoint via the broadcaster. This commit also adds the concept of a RequestID and App.InstanceID. The request id uniquely identifies each request and the InstanceID uniquely identifies a run of the registry. These identifiers can be used in the future to correlate log messages with generated events to support rich debugging. The fields of the app were slightly reorganized for clarity and a few horrid util functions have been removed. Signed-off-by: Stephen J Day --- cmd/registry/config.yml | 16 ++++ configuration/configuration.go | 26 ++++++ configuration/configuration_test.go | 30 +++++++ registry/app.go | 121 +++++++++++++++++++++++----- registry/context.go | 3 + registry/util.go | 27 ------- storage/notifications/bridge.go | 4 +- storage/notifications/event.go | 7 +- storage/notifications/event_test.go | 6 +- 9 files changed, 185 insertions(+), 55 deletions(-) delete mode 100644 registry/util.go diff --git a/cmd/registry/config.yml b/cmd/registry/config.yml index 4919e6db..bb3ade11 100644 --- a/cmd/registry/config.yml +++ b/cmd/registry/config.yml @@ -8,3 +8,19 @@ http: secret: asecretforlocaldevelopment debug: addr: localhost:5001 +notifications: + endpoints: + - name: local-8082 + url: http://localhost:5003/callback + headers: + Authorization: [Bearer ] + timeout: 1s + threshold: 10 + backoff: 1s + disabled: true + - name: local-8083 + url: http://localhost:8083/callback + timeout: 1s + threshold: 10 + backoff: 1s + disabled: true diff --git a/configuration/configuration.go b/configuration/configuration.go index 0cf5bc02..ed086ba5 100644 --- a/configuration/configuration.go +++ b/configuration/configuration.go @@ -4,8 +4,10 @@ import ( "fmt" "io" "io/ioutil" + "net/http" "reflect" "strings" + "time" ) // Configuration is a versioned registry configuration, intended to be provided by a yaml file, and @@ -63,6 +65,10 @@ type Configuration struct { Addr string `yaml:"addr"` } `yaml:"debug"` } `yaml:"http"` + + // Notifications specifies configuration about various endpoint to which + // registry events are dispatched. + Notifications Notifications `yaml:"notifications"` } // v0_1Configuration is a Version 0.1 Configuration struct @@ -240,6 +246,26 @@ func (auth Auth) MarshalYAML() (interface{}, error) { return map[string]Parameters(auth), nil } +// Notifications configures multiple http endpoints. +type Notifications struct { + // Endpoints is a list of http configurations for endpoints that + // respond to webhook notifications. In the future, we may allow other + // kinds of endpoints, such as external queues. + Endpoints []Endpoint `yaml:"endpoints"` +} + +// Endpoint describes the configuration of an http webhook notification +// endpoint. +type Endpoint struct { + Name string `yaml:"name"` // identifies the endpoint in the registry instance. + Disabled bool `yaml:"disabled"` // disables the endpoint + URL string `yaml:"url"` // post url for the endpoint. + Headers http.Header `yaml:"headers"` // static headers that should be added to all requests + Timeout time.Duration `yaml:"timeout"` // HTTP timeout + Threshold int `yaml:"threshold"` // circuit breaker threshold before backing off on failure + Backoff time.Duration `yaml:"backoff"` // backoff duration +} + // Reporting defines error reporting methods. type Reporting struct { // Bugsnag configures error reporting for Bugsnag (bugsnag.com). diff --git a/configuration/configuration_test.go b/configuration/configuration_test.go index ad845a41..5a6abf90 100644 --- a/configuration/configuration_test.go +++ b/configuration/configuration_test.go @@ -2,6 +2,7 @@ package configuration import ( "bytes" + "net/http" "os" "testing" @@ -40,6 +41,17 @@ var configStruct = Configuration{ APIKey: "BugsnagApiKey", }, }, + Notifications: Notifications{ + Endpoints: []Endpoint{ + { + Name: "endpoint-1", + URL: "http://example.com", + Headers: http.Header{ + "Authorization": []string{"Bearer "}, + }, + }, + }, + }, } // configYamlV0_1 is a Version 0.1 yaml document representing configStruct @@ -61,6 +73,12 @@ auth: silly: realm: silly service: silly +notifications: + endpoints: + - name: endpoint-1 + url: http://example.com + headers: + Authorization: [Bearer ] reporting: bugsnag: apikey: BugsnagApiKey @@ -76,6 +94,12 @@ auth: silly: realm: silly service: silly +notifications: + endpoints: + - name: endpoint-1 + url: http://example.com + headers: + Authorization: [Bearer ] ` type ConfigSuite struct { @@ -129,6 +153,7 @@ func (suite *ConfigSuite) TestParseIncomplete(c *C) { suite.expectedConfig.Storage = Storage{"filesystem": Parameters{"rootdirectory": "/tmp/testroot"}} suite.expectedConfig.Auth = Auth{"silly": Parameters{"realm": "silly"}} suite.expectedConfig.Reporting = Reporting{} + suite.expectedConfig.Notifications = Notifications{} os.Setenv("REGISTRY_STORAGE", "filesystem") os.Setenv("REGISTRY_STORAGE_FILESYSTEM_ROOTDIRECTORY", "/tmp/testroot") @@ -292,5 +317,10 @@ func copyConfig(config Configuration) *Configuration { configCopy.Auth.setParameter(k, v) } + configCopy.Notifications = Notifications{Endpoints: []Endpoint{}} + for _, v := range config.Notifications.Endpoints { + configCopy.Notifications.Endpoints = append(configCopy.Notifications.Endpoints, v) + } + return configCopy } diff --git a/registry/app.go b/registry/app.go index b5cb6776..e7c96b74 100644 --- a/registry/app.go +++ b/registry/app.go @@ -2,16 +2,19 @@ package registry import ( "fmt" + "net" "net/http" + "os" + "code.google.com/p/go-uuid/uuid" + log "github.com/Sirupsen/logrus" "github.com/docker/distribution/api/v2" "github.com/docker/distribution/auth" "github.com/docker/distribution/configuration" "github.com/docker/distribution/storage" + "github.com/docker/distribution/storage/notifications" "github.com/docker/distribution/storagedriver" "github.com/docker/distribution/storagedriver/factory" - - log "github.com/Sirupsen/logrus" "github.com/gorilla/mux" ) @@ -21,17 +24,22 @@ import ( type App struct { Config configuration.Configuration - router *mux.Router + // InstanceID is a unique id assigned to the application on each creation. + // Provides information in the logs and context to identify restarts. + InstanceID string - // driver maintains the app global storage driver instance. - driver storagedriver.StorageDriver + router *mux.Router // main application router, configured with dispatchers + driver storagedriver.StorageDriver // driver maintains the app global storage driver instance. + registry storage.Registry // registry is the primary registry backend for the app instance. + accessController auth.AccessController // main access controller for application - // registry is the primary registry backend for the app instance. - registry storage.Registry + // events contains notification related configuration. + events struct { + sink notifications.Sink + source notifications.SourceRecord + } - layerHandler storage.LayerHandler - - accessController auth.AccessController + layerHandler storage.LayerHandler // allows dispatch of layer serving to external provider } // NewApp takes a configuration and returns a configured app, ready to serve @@ -39,8 +47,9 @@ type App struct { // handlers accordingly. func NewApp(configuration configuration.Configuration) *App { app := &App{ - Config: configuration, - router: v2.Router(), + Config: configuration, + InstanceID: uuid.New(), + router: v2.Router(), } // Register the handler dispatchers. @@ -53,7 +62,8 @@ func NewApp(configuration configuration.Configuration) *App { app.register(v2.RouteNameBlobUpload, layerUploadDispatcher) app.register(v2.RouteNameBlobUploadChunk, layerUploadDispatcher) - driver, err := factory.Create(configuration.Storage.Type(), configuration.Storage.Parameters()) + var err error + app.driver, err = factory.Create(configuration.Storage.Type(), configuration.Storage.Parameters()) if err != nil { // TODO(stevvooe): Move the creation of a service into a protected @@ -62,7 +72,7 @@ func NewApp(configuration configuration.Configuration) *App { panic(err) } - app.driver = driver + app.configureEvents(&configuration) app.registry = storage.NewRegistryWithDriver(app.driver) authType := configuration.Auth.Type() @@ -77,7 +87,7 @@ func NewApp(configuration configuration.Configuration) *App { layerHandlerType := configuration.LayerHandler.Type() if layerHandlerType != "" { - lh, err := storage.GetLayerHandler(layerHandlerType, configuration.LayerHandler.Parameters(), driver) + lh, err := storage.GetLayerHandler(layerHandlerType, configuration.LayerHandler.Parameters(), app.driver) if err != nil { panic(fmt.Sprintf("unable to configure layer handler (%s): %v", layerHandlerType, err)) } @@ -87,12 +97,6 @@ func NewApp(configuration configuration.Configuration) *App { return app } -func (app *App) ServeHTTP(w http.ResponseWriter, r *http.Request) { - // Set a header with the Docker Distribution API Version for all responses. - w.Header().Add("Docker-Distribution-API-Version", "registry/2.0") - app.router.ServeHTTP(w, r) -} - // register a handler with the application, by route name. The handler will be // passed through the application filters and context will be constructed at // request time. @@ -107,6 +111,59 @@ func (app *App) register(routeName string, dispatch dispatchFunc) { app.router.GetRoute(routeName).Handler(app.dispatcher(dispatch)) } +// configureEvents prepares the event sink for action. +func (app *App) configureEvents(configuration *configuration.Configuration) { + // Configure all of the endpoint sinks. + var sinks []notifications.Sink + for _, endpoint := range configuration.Notifications.Endpoints { + if endpoint.Disabled { + log.Infof("endpoint %s disabled, skipping", endpoint.Name) + continue + } + + log.Infof("configuring endpoint %v (%v), timeout=%s, headers=%v", endpoint.Name, endpoint.URL, endpoint.Timeout, endpoint.Headers) + endpoint := notifications.NewEndpoint(endpoint.Name, endpoint.URL, notifications.EndpointConfig{ + Timeout: endpoint.Timeout, + Threshold: endpoint.Threshold, + Backoff: endpoint.Backoff, + Headers: endpoint.Headers, + }) + + sinks = append(sinks, endpoint) + } + + // NOTE(stevvooe): Moving to a new queueing implementation is as easy as + // replacing broadcaster with a rabbitmq implementation. It's recommended + // that the registry instances also act as the workers to keep deployment + // simple. + app.events.sink = notifications.NewBroadcaster(sinks...) + + // Populate registry event source + hostname, err := os.Hostname() + if err != nil { + hostname = configuration.HTTP.Addr + } else { + // try to pick the port off the config + _, port, err := net.SplitHostPort(configuration.HTTP.Addr) + if err == nil { + hostname = net.JoinHostPort(hostname, port) + } + } + + app.events.source = notifications.SourceRecord{ + Addr: hostname, + InstanceID: app.InstanceID, + } +} + +func (app *App) ServeHTTP(w http.ResponseWriter, r *http.Request) { + defer r.Body.Close() // ensure that request body is always closed. + + // Set a header with the Docker Distribution API Version for all responses. + w.Header().Add("Docker-Distribution-API-Version", "registry/2.0") + app.router.ServeHTTP(w, r) +} + // dispatchFunc takes a context and request and returns a constructed handler // for the route. The dispatcher will use this to dynamically create request // specific handlers for each endpoint without creating a new router for each @@ -142,11 +199,14 @@ func (app *App) dispatcher(dispatch dispatchFunc) http.Handler { return } + // decorate the authorized repository with an event bridge. + context.Repository = notifications.Listen( + context.Repository, app.eventBridge(context, r)) + context.log = log.WithField("name", context.Repository.Name()) handler := dispatch(context, r) ssrw := &singleStatusResponseWriter{ResponseWriter: w} - context.log.Infoln("handler", resolveHandlerName(r.Method, handler)) handler.ServeHTTP(ssrw, r) // Automated error response handling here. Handlers may return their @@ -167,6 +227,7 @@ func (app *App) context(r *http.Request) *Context { vars := mux.Vars(r) context := &Context{ App: app, + RequestID: uuid.New(), urlBuilder: v2.NewURLBuilderFromRequest(r), } @@ -268,6 +329,22 @@ func (app *App) authorized(w http.ResponseWriter, r *http.Request, context *Cont return nil } +// eventBridge returns a bridge for the current request, configured with the +// correct actor and source. +func (app *App) eventBridge(ctx *Context, r *http.Request) notifications.Listener { + // TODO(stevvooe): Need to extract user data from request context using + // auth system. Would prefer to do this during logging refactor and + // addition of user and google context type. + actor := notifications.ActorRecord{ + Name: "--todo--", + Addr: r.RemoteAddr, + Host: r.Host, + RequestID: ctx.RequestID, + } + + return notifications.NewBridge(ctx.urlBuilder, app.events.source, actor, app.events.sink) +} + // apiBase implements a simple yes-man for doing overall checks against the // api. This can support auth roundtrips to support docker login. func apiBase(w http.ResponseWriter, r *http.Request) { diff --git a/registry/context.go b/registry/context.go index 8e8d0fed..eaa603a8 100644 --- a/registry/context.go +++ b/registry/context.go @@ -13,6 +13,9 @@ type Context struct { // App points to the application structure that created this context. *App + // RequestID is the unique id of the request. + RequestID string + // Repository is the repository for the current request. All requests // should be scoped to a single repository. This field may be nil. Repository storage.Repository diff --git a/registry/util.go b/registry/util.go deleted file mode 100644 index 976ddf31..00000000 --- a/registry/util.go +++ /dev/null @@ -1,27 +0,0 @@ -package registry - -import ( - "net/http" - "reflect" - "runtime" - - "github.com/gorilla/handlers" -) - -// functionName returns the name of the function fn. -func functionName(fn interface{}) string { - return runtime.FuncForPC(reflect.ValueOf(fn).Pointer()).Name() -} - -// resolveHandlerName attempts to resolve a nice, pretty name for the passed -// in handler. -func resolveHandlerName(method string, handler http.Handler) string { - switch v := handler.(type) { - case handlers.MethodHandler: - return functionName(v[method]) - case http.HandlerFunc: - return functionName(v) - default: - return functionName(handler.ServeHTTP) - } -} diff --git a/storage/notifications/bridge.go b/storage/notifications/bridge.go index 2ff0dff6..28326cce 100644 --- a/storage/notifications/bridge.go +++ b/storage/notifications/bridge.go @@ -72,7 +72,7 @@ func (b *bridge) createManifestEventAndWrite(action string, repo storage.Reposit func (b *bridge) createManifestEvent(action string, repo storage.Repository, sm *manifest.SignedManifest) (*Event, error) { event := b.createEvent(action) - event.Target.Type = "manifest" + event.Target.Type = EventTargetTypeManifest event.Target.Name = repo.Name() event.Target.Tag = sm.Tag @@ -107,7 +107,7 @@ func (b *bridge) createLayerEventAndWrite(action string, repo storage.Repository func (b *bridge) createLayerEvent(action string, repo storage.Repository, dgst digest.Digest) (*Event, error) { event := b.createEvent(action) - event.Target.Type = "layer" + event.Target.Type = EventTargetTypeBlob event.Target.Name = repo.Name() event.Target.Digest = dgst diff --git a/storage/notifications/event.go b/storage/notifications/event.go index f03920ca..fb2094d7 100644 --- a/storage/notifications/event.go +++ b/storage/notifications/event.go @@ -12,7 +12,12 @@ const ( EventActionPull = "pull" EventActionPush = "push" EventActionDelete = "delete" - EventActionPing = "ping" +) + +// EventTargetType constants used in Target section of Event. +const ( + EventTargetTypeManifest = "manifest" + EventTargetTypeBlob = "blob" ) // EventsMediaType is the mediatype for the json event envelope. If the Event, diff --git a/storage/notifications/event_test.go b/storage/notifications/event_test.go index 77bd19f9..7bb9fa01 100644 --- a/storage/notifications/event_test.go +++ b/storage/notifications/event_test.go @@ -97,7 +97,7 @@ func TestEventEnvelopeJSONFormat(t *testing.T) { manifestPush = prototype manifestPush.ID = "asdf-asdf-asdf-asdf-0" manifestPush.Target.Digest = "sha256:0123456789abcdef0" - manifestPush.Target.Type = "manifest" + manifestPush.Target.Type = EventTargetTypeManifest manifestPush.Target.Name = "library/test" manifestPush.Target.Tag = "latest" manifestPush.Target.URL = "http://example.com/v2/library/test/manifests/latest" @@ -106,7 +106,7 @@ func TestEventEnvelopeJSONFormat(t *testing.T) { layerPush0 = prototype layerPush0.ID = "asdf-asdf-asdf-asdf-1" layerPush0.Target.Digest = "tarsum.v2+sha256:0123456789abcdef1" - layerPush0.Target.Type = "blob" + layerPush0.Target.Type = EventTargetTypeBlob layerPush0.Target.Name = "library/test" layerPush0.Target.URL = "http://example.com/v2/library/test/manifests/latest" @@ -114,7 +114,7 @@ func TestEventEnvelopeJSONFormat(t *testing.T) { layerPush1 = prototype layerPush1.ID = "asdf-asdf-asdf-asdf-2" layerPush1.Target.Digest = "tarsum.v2+sha256:0123456789abcdef2" - layerPush1.Target.Type = "blob" + layerPush1.Target.Type = EventTargetTypeBlob layerPush1.Target.Name = "library/test" layerPush1.Target.URL = "http://example.com/v2/library/test/manifests/latest" From 080e329cb1dfa559a758b78ab18942243a1d3924 Mon Sep 17 00:00:00 2001 From: Stephen J Day Date: Tue, 3 Feb 2015 13:28:10 -0800 Subject: [PATCH 7/7] Separate request data from actor in Event To clarify the role of actor, the request data that initiates an event has been separated. The ActorRecord is pared down to just the username. This eliminates confusion about where event related data should be added. Signed-off-by: Stephen J Day --- registry/app.go | 8 +++--- storage/notifications/bridge.go | 35 +++++++++++++++++++------- storage/notifications/event.go | 36 ++++++++++++++++++--------- storage/notifications/event_test.go | 38 ++++++++++++++++++++--------- 4 files changed, 79 insertions(+), 38 deletions(-) diff --git a/registry/app.go b/registry/app.go index e7c96b74..53759a1e 100644 --- a/registry/app.go +++ b/registry/app.go @@ -336,13 +336,11 @@ func (app *App) eventBridge(ctx *Context, r *http.Request) notifications.Listene // auth system. Would prefer to do this during logging refactor and // addition of user and google context type. actor := notifications.ActorRecord{ - Name: "--todo--", - Addr: r.RemoteAddr, - Host: r.Host, - RequestID: ctx.RequestID, + Name: "--todo--", } + request := notifications.NewRequestRecord(ctx.RequestID, r) - return notifications.NewBridge(ctx.urlBuilder, app.events.source, actor, app.events.sink) + return notifications.NewBridge(ctx.urlBuilder, app.events.source, actor, request, app.events.sink) } // apiBase implements a simple yes-man for doing overall checks against the diff --git a/storage/notifications/bridge.go b/storage/notifications/bridge.go index 28326cce..d6f41ba0 100644 --- a/storage/notifications/bridge.go +++ b/storage/notifications/bridge.go @@ -1,6 +1,7 @@ package notifications import ( + "net/http" "time" "github.com/docker/distribution/manifest" @@ -11,10 +12,11 @@ import ( ) type bridge struct { - ub URLBuilder - actor ActorRecord - source SourceRecord - sink Sink + ub URLBuilder + actor ActorRecord + source SourceRecord + request RequestRecord + sink Sink } var _ Listener = &bridge{} @@ -28,12 +30,26 @@ type URLBuilder interface { // NewBridge returns a notification listener that writes records to sink, // using the actor and source. Any urls populated in the events created by // this bridge will be created using the URLBuilder. -func NewBridge(ub URLBuilder, source SourceRecord, actor ActorRecord, sink Sink) Listener { +// TODO(stevvooe): Update this to simply take a context.Context object. +func NewBridge(ub URLBuilder, source SourceRecord, actor ActorRecord, request RequestRecord, sink Sink) Listener { return &bridge{ - ub: ub, - actor: actor, - source: source, - sink: sink, + ub: ub, + actor: actor, + source: source, + request: request, + sink: sink, + } +} + +// NewRequestRecord builds a RequestRecord for use in NewBridge from an +// http.Request, associating it with a request id. +func NewRequestRecord(id string, r *http.Request) RequestRecord { + return RequestRecord{ + ID: id, + Addr: r.RemoteAddr, + Host: r.Host, + Method: r.Method, + UserAgent: r.UserAgent(), } } @@ -125,6 +141,7 @@ func (b *bridge) createEvent(action string) *Event { event := createEvent(action) event.Source = b.source event.Actor = b.actor + event.Request = b.request return event } diff --git a/storage/notifications/event.go b/storage/notifications/event.go index fb2094d7..c23766fa 100644 --- a/storage/notifications/event.go +++ b/storage/notifications/event.go @@ -67,6 +67,9 @@ type Event struct { URL string `json:"url,omitempty"` } `json:"target,omitempty"` + // Request covers the request that generated the event. + Request RequestRecord `json:"request,omitempty"` + // Actor specifies the agent that initiated the event. For most // situations, this could be from the authorizaton context of the request. Actor ActorRecord `json:"actor,omitempty"` @@ -86,18 +89,6 @@ type ActorRecord struct { // request context that generated the event. Name string `json:"name,omitempty"` - // Addr contains the ip or hostname and possibly port of the client - // connection that initiated the event. - Addr string `json:"addr,omitempty"` - - // Host is the externally accessible host name of the registry instance, - // as specified by the http host header on incoming requests. - Host string `json:"host,omitempty"` - - // RequestID uniquely identifies the registry request that generated the - // event. - RequestID string `json:"requestID,omitempty"` - // TODO(stevvooe): Look into setting a session cookie to get this // without docker daemon. // SessionID @@ -107,6 +98,27 @@ type ActorRecord struct { // Command } +// RequestRecord covers the request that generated the event. +type RequestRecord struct { + // ID uniquely identifies the request that initiated the event. + ID string `json:"id"` + + // Addr contains the ip or hostname and possibly port of the client + // connection that initiated the event. This is the RemoteAddr from + // the standard http request. + Addr string `json:"addr,omitempty"` + + // Host is the externally accessible host name of the registry instance, + // as specified by the http host header on incoming requests. + Host string `json:"host,omitempty"` + + // Method has the request method that generated the event. + Method string `json:"method"` + + // UserAgent contains the user agent header of the request. + UserAgent string `json:"useragent"` +} + // SourceRecord identifies the registry node that generated the event. Put // differently, while the actor "initiates" the event, the source "generates" // it. diff --git a/storage/notifications/event_test.go b/storage/notifications/event_test.go index 7bb9fa01..cc2180ac 100644 --- a/storage/notifications/event_test.go +++ b/storage/notifications/event_test.go @@ -25,11 +25,15 @@ func TestEventEnvelopeJSONFormat(t *testing.T) { "tag": "latest", "url": "http://example.com/v2/library/test/manifests/latest" }, - "actor": { - "name": "test-actor", + "request": { + "id": "asdfasdf", "addr": "client.local", "host": "registrycluster.local", - "requestID": "asdfasdf" + "method": "PUT", + "useragent": "test/0.1" + }, + "actor": { + "name": "test-actor" }, "source": { "addr": "hostname.local:port" @@ -45,11 +49,15 @@ func TestEventEnvelopeJSONFormat(t *testing.T) { "digest": "tarsum.v2+sha256:0123456789abcdef1", "url": "http://example.com/v2/library/test/manifests/latest" }, - "actor": { - "name": "test-actor", + "request": { + "id": "asdfasdf", "addr": "client.local", "host": "registrycluster.local", - "requestID": "asdfasdf" + "method": "PUT", + "useragent": "test/0.1" + }, + "actor": { + "name": "test-actor" }, "source": { "addr": "hostname.local:port" @@ -65,11 +73,15 @@ func TestEventEnvelopeJSONFormat(t *testing.T) { "digest": "tarsum.v2+sha256:0123456789abcdef2", "url": "http://example.com/v2/library/test/manifests/latest" }, - "actor": { - "name": "test-actor", + "request": { + "id": "asdfasdf", "addr": "client.local", "host": "registrycluster.local", - "requestID": "asdfasdf" + "method": "PUT", + "useragent": "test/0.1" + }, + "actor": { + "name": "test-actor" }, "source": { "addr": "hostname.local:port" @@ -87,10 +99,12 @@ func TestEventEnvelopeJSONFormat(t *testing.T) { var prototype Event prototype.Action = "push" prototype.Timestamp = tm - prototype.Actor.Addr = "client.local" prototype.Actor.Name = "test-actor" - prototype.Actor.RequestID = "asdfasdf" - prototype.Actor.Host = "registrycluster.local" + prototype.Request.ID = "asdfasdf" + prototype.Request.Addr = "client.local" + prototype.Request.Host = "registrycluster.local" + prototype.Request.Method = "PUT" + prototype.Request.UserAgent = "test/0.1" prototype.Source.Addr = "hostname.local:port" var manifestPush Event