diff --git a/go.mod b/go.mod index 39fd4a71..545b8f0c 100644 --- a/go.mod +++ b/go.mod @@ -16,6 +16,7 @@ require ( github.com/denverdino/aliyungo v0.0.0-20190125010748-a747050bb1ba github.com/dgrijalva/jwt-go v0.0.0-20170104182250-a601269ab70c // indirect github.com/dnaeon/go-vcr v1.0.1 // indirect + github.com/docker/go-events v0.0.0-20190806004212-e31b211e4f1c github.com/docker/go-metrics v0.0.1 github.com/docker/libtrust v0.0.0-20150114040149-fa567046d9b1 github.com/garyburd/redigo v0.0.0-20150301180006-535138d7bcd7 diff --git a/go.sum b/go.sum index 5a8000e8..d5f3fb83 100644 --- a/go.sum +++ b/go.sum @@ -35,6 +35,8 @@ github.com/dgrijalva/jwt-go v0.0.0-20170104182250-a601269ab70c h1:KJAnOBuY9cTKVq github.com/dgrijalva/jwt-go v0.0.0-20170104182250-a601269ab70c/go.mod h1:E3ru+11k8xSBh+hMPgOLZmtrrCbhqsmaPHjLKYnJCaQ= github.com/dnaeon/go-vcr v1.0.1 h1:r8L/HqC0Hje5AXMu1ooW8oyQyOFv4GxqpL0nRP7SLLY= github.com/dnaeon/go-vcr v1.0.1/go.mod h1:aBB1+wY4s93YsC3HHjMBMrwTj2R9FHDzUr9KyGc8n1E= +github.com/docker/go-events v0.0.0-20190806004212-e31b211e4f1c h1:+pKlWGMw7gf6bQ+oDZB4KHQFypsfjYlq/C4rfL7D3g8= +github.com/docker/go-events v0.0.0-20190806004212-e31b211e4f1c/go.mod h1:Uw6UezgYA44ePAFQYUehOuCzmy5zmg/+nl2ZfMWGkpA= github.com/docker/go-metrics v0.0.1 h1:AgB/0SvBxihN0X8OR4SjsblXkbMvalQ8cjmtKQ2rQV8= github.com/docker/go-metrics v0.0.1/go.mod h1:cG1hvH2utMXtqgqqYE9plW6lDxS3/5ayHzueweSI3Vw= github.com/docker/libtrust v0.0.0-20150114040149-fa567046d9b1 h1:ZClxb8laGDf5arXfYcAtECDFgAgHklGI8CxgjHnXKJ4= diff --git a/notifications/bridge.go b/notifications/bridge.go index 86af43f3..bf62806d 100644 --- a/notifications/bridge.go +++ b/notifications/bridge.go @@ -8,6 +8,7 @@ import ( "github.com/docker/distribution/context" "github.com/docker/distribution/reference" "github.com/docker/distribution/uuid" + events "github.com/docker/go-events" "github.com/opencontainers/go-digest" ) @@ -17,7 +18,7 @@ type bridge struct { actor ActorRecord source SourceRecord request RequestRecord - sink Sink + sink events.Sink } var _ Listener = &bridge{} @@ -32,7 +33,7 @@ type URLBuilder interface { // using the actor and source. Any urls populated in the events created by // this bridge will be created using the URLBuilder. // TODO(stevvooe): Update this to simply take a context.Context object. -func NewBridge(ub URLBuilder, source SourceRecord, actor ActorRecord, request RequestRecord, sink Sink, includeReferences bool) Listener { +func NewBridge(ub URLBuilder, source SourceRecord, actor ActorRecord, request RequestRecord, sink events.Sink, includeReferences bool) Listener { return &bridge{ ub: ub, includeReferences: includeReferences, diff --git a/notifications/bridge_test.go b/notifications/bridge_test.go index 7fab4271..a09be657 100644 --- a/notifications/bridge_test.go +++ b/notifications/bridge_test.go @@ -8,6 +8,7 @@ import ( "github.com/docker/distribution/reference" v2 "github.com/docker/distribution/registry/api/v2" "github.com/docker/distribution/uuid" + events "github.com/docker/go-events" "github.com/docker/libtrust" "github.com/opencontainers/go-digest" ) @@ -46,8 +47,8 @@ var ( ) func TestEventBridgeManifestPulled(t *testing.T) { - l := createTestEnv(t, testSinkFn(func(events ...Event) error { - checkCommonManifest(t, EventActionPull, events...) + l := createTestEnv(t, testSinkFn(func(event events.Event) error { + checkCommonManifest(t, EventActionPull, event) return nil })) @@ -59,8 +60,8 @@ func TestEventBridgeManifestPulled(t *testing.T) { } func TestEventBridgeManifestPushed(t *testing.T) { - l := createTestEnv(t, testSinkFn(func(events ...Event) error { - checkCommonManifest(t, EventActionPush, events...) + l := createTestEnv(t, testSinkFn(func(event events.Event) error { + checkCommonManifest(t, EventActionPush, event) return nil })) @@ -72,10 +73,10 @@ func TestEventBridgeManifestPushed(t *testing.T) { } func TestEventBridgeManifestPushedWithTag(t *testing.T) { - l := createTestEnv(t, testSinkFn(func(events ...Event) error { - checkCommonManifest(t, EventActionPush, events...) - if events[0].Target.Tag != "latest" { - t.Fatalf("missing or unexpected tag: %#v", events[0].Target) + l := createTestEnv(t, testSinkFn(func(event events.Event) error { + checkCommonManifest(t, EventActionPush, event) + if event.(Event).Target.Tag != "latest" { + t.Fatalf("missing or unexpected tag: %#v", event.(Event).Target) } return nil @@ -88,10 +89,10 @@ func TestEventBridgeManifestPushedWithTag(t *testing.T) { } func TestEventBridgeManifestPulledWithTag(t *testing.T) { - l := createTestEnv(t, testSinkFn(func(events ...Event) error { - checkCommonManifest(t, EventActionPull, events...) - if events[0].Target.Tag != "latest" { - t.Fatalf("missing or unexpected tag: %#v", events[0].Target) + l := createTestEnv(t, testSinkFn(func(event events.Event) error { + checkCommonManifest(t, EventActionPull, event) + if event.(Event).Target.Tag != "latest" { + t.Fatalf("missing or unexpected tag: %#v", event.(Event).Target) } return nil @@ -104,10 +105,10 @@ func TestEventBridgeManifestPulledWithTag(t *testing.T) { } func TestEventBridgeManifestDeleted(t *testing.T) { - l := createTestEnv(t, testSinkFn(func(events ...Event) error { - checkDeleted(t, EventActionDelete, events...) - if events[0].Target.Digest != dgst { - t.Fatalf("unexpected digest on event target: %q != %q", events[0].Target.Digest, dgst) + l := createTestEnv(t, testSinkFn(func(event events.Event) error { + checkDeleted(t, EventActionDelete, event) + if event.(Event).Target.Digest != dgst { + t.Fatalf("unexpected digest on event target: %q != %q", event.(Event).Target.Digest, dgst) } return nil })) @@ -119,10 +120,10 @@ func TestEventBridgeManifestDeleted(t *testing.T) { } func TestEventBridgeTagDeleted(t *testing.T) { - l := createTestEnv(t, testSinkFn(func(events ...Event) error { - checkDeleted(t, EventActionDelete, events...) - if events[0].Target.Tag != m.Tag { - t.Fatalf("unexpected tag on event target: %q != %q", events[0].Target.Tag, m.Tag) + l := createTestEnv(t, testSinkFn(func(event events.Event) error { + checkDeleted(t, EventActionDelete, event) + if event.(Event).Target.Tag != m.Tag { + t.Fatalf("unexpected tag on event target: %q != %q", event.(Event).Target.Tag, m.Tag) } return nil })) @@ -134,8 +135,8 @@ func TestEventBridgeTagDeleted(t *testing.T) { } func TestEventBridgeRepoDeleted(t *testing.T) { - l := createTestEnv(t, testSinkFn(func(events ...Event) error { - checkDeleted(t, EventActionDelete, events...) + l := createTestEnv(t, testSinkFn(func(event events.Event) error { + checkDeleted(t, EventActionDelete, event) return nil })) @@ -162,36 +163,29 @@ func createTestEnv(t *testing.T, fn testSinkFn) Listener { return NewBridge(ub, source, actor, request, fn, true) } -func checkDeleted(t *testing.T, action string, events ...Event) { - if len(events) != 1 { - t.Fatalf("unexpected number of events: %v != 1", len(events)) +func checkDeleted(t *testing.T, action string, event events.Event) { + if event.(Event).Source != source { + t.Fatalf("source not equal: %#v != %#v", event.(Event).Source, source) } - event := events[0] - - if event.Source != source { - t.Fatalf("source not equal: %#v != %#v", event.Source, source) + if event.(Event).Request != request { + t.Fatalf("request not equal: %#v != %#v", event.(Event).Request, request) } - if event.Request != request { - t.Fatalf("request not equal: %#v != %#v", event.Request, request) + if event.(Event).Actor != actor { + t.Fatalf("request not equal: %#v != %#v", event.(Event).Actor, actor) } - if event.Actor != actor { - t.Fatalf("request not equal: %#v != %#v", event.Actor, actor) - } - - if event.Target.Repository != repo { - t.Fatalf("unexpected repository: %q != %q", event.Target.Repository, repo) + if event.(Event).Target.Repository != repo { + t.Fatalf("unexpected repository: %q != %q", event.(Event).Target.Repository, repo) } } -func checkCommonManifest(t *testing.T, action string, events ...Event) { - checkCommon(t, events...) +func checkCommonManifest(t *testing.T, action string, event events.Event) { + checkCommon(t, event) - event := events[0] - if event.Action != action { - t.Fatalf("unexpected event action: %q != %q", event.Action, action) + if event.(Event).Action != action { + t.Fatalf("unexpected event action: %q != %q", event.(Event).Action, action) } repoRef, _ := reference.WithName(repo) @@ -201,57 +195,51 @@ func checkCommonManifest(t *testing.T, action string, events ...Event) { t.Fatalf("error building expected url: %v", err) } - if event.Target.URL != u { - t.Fatalf("incorrect url passed: \n%q != \n%q", event.Target.URL, u) + if event.(Event).Target.URL != u { + t.Fatalf("incorrect url passed: \n%q != \n%q", event.(Event).Target.URL, u) } - if len(event.Target.References) != len(layers) { - t.Fatalf("unexpected number of references %v != %v", len(event.Target.References), len(layers)) + if len(event.(Event).Target.References) != len(layers) { + t.Fatalf("unexpected number of references %v != %v", len(event.(Event).Target.References), len(layers)) } - for i, targetReference := range event.Target.References { + for i, targetReference := range event.(Event).Target.References { if targetReference.Digest != layers[i].BlobSum { t.Fatalf("unexpected reference: %q != %q", targetReference.Digest, layers[i].BlobSum) } } } -func checkCommon(t *testing.T, events ...Event) { - if len(events) != 1 { - t.Fatalf("unexpected number of events: %v != 1", len(events)) +func checkCommon(t *testing.T, event events.Event) { + if event.(Event).Source != source { + t.Fatalf("source not equal: %#v != %#v", event.(Event).Source, source) } - event := events[0] - - if event.Source != source { - t.Fatalf("source not equal: %#v != %#v", event.Source, source) + if event.(Event).Request != request { + t.Fatalf("request not equal: %#v != %#v", event.(Event).Request, request) } - if event.Request != request { - t.Fatalf("request not equal: %#v != %#v", event.Request, request) + if event.(Event).Actor != actor { + t.Fatalf("request not equal: %#v != %#v", event.(Event).Actor, actor) } - if event.Actor != actor { - t.Fatalf("request not equal: %#v != %#v", event.Actor, actor) + if event.(Event).Target.Digest != dgst { + t.Fatalf("unexpected digest on event target: %q != %q", event.(Event).Target.Digest, dgst) } - if event.Target.Digest != dgst { - t.Fatalf("unexpected digest on event target: %q != %q", event.Target.Digest, dgst) + if event.(Event).Target.Length != int64(len(payload)) { + t.Fatalf("unexpected target length: %v != %v", event.(Event).Target.Length, len(payload)) } - if event.Target.Length != int64(len(payload)) { - t.Fatalf("unexpected target length: %v != %v", event.Target.Length, len(payload)) - } - - if event.Target.Repository != repo { - t.Fatalf("unexpected repository: %q != %q", event.Target.Repository, repo) + if event.(Event).Target.Repository != repo { + t.Fatalf("unexpected repository: %q != %q", event.(Event).Target.Repository, repo) } } -type testSinkFn func(events ...Event) error +type testSinkFn func(event events.Event) error -func (tsf testSinkFn) Write(events ...Event) error { - return tsf(events...) +func (tsf testSinkFn) Write(event events.Event) error { + return tsf(event) } func (tsf testSinkFn) Close() error { return nil } diff --git a/notifications/endpoint.go b/notifications/endpoint.go index 854f1dd6..da6b9622 100644 --- a/notifications/endpoint.go +++ b/notifications/endpoint.go @@ -5,6 +5,7 @@ import ( "time" "github.com/docker/distribution/configuration" + events "github.com/docker/go-events" ) // EndpointConfig covers the optional configuration parameters for an active @@ -42,7 +43,7 @@ func (ec *EndpointConfig) defaults() { // services when events are written. Writes are non-blocking and always // succeed for callers but events may be queued internally. type Endpoint struct { - Sink + events.Sink url string name string @@ -64,7 +65,7 @@ func NewEndpoint(name, url string, config EndpointConfig) *Endpoint { endpoint.Sink = newHTTPSink( endpoint.url, endpoint.Timeout, endpoint.Headers, endpoint.Transport, endpoint.metrics.httpStatusListener()) - endpoint.Sink = newRetryingSink(endpoint.Sink, endpoint.Threshold, endpoint.Backoff) + endpoint.Sink = events.NewRetryingSink(endpoint.Sink, events.NewBreaker(endpoint.Threshold, endpoint.Backoff)) endpoint.Sink = newEventQueue(endpoint.Sink, endpoint.metrics.eventQueueListener()) mediaTypes := append(config.Ignore.MediaTypes, config.IgnoredMediaTypes...) endpoint.Sink = newIgnoredSink(endpoint.Sink, mediaTypes, config.Ignore.Actions) diff --git a/notifications/event.go b/notifications/event.go index 54940a46..864783e1 100644 --- a/notifications/event.go +++ b/notifications/event.go @@ -5,6 +5,7 @@ import ( "time" "github.com/docker/distribution" + events "github.com/docker/go-events" ) // EventAction constants used in action field of Event. @@ -30,7 +31,7 @@ const ( type Envelope struct { // Events make up the contents of the envelope. Events present in a single // envelope are not necessarily related. - Events []Event `json:"events,omitempty"` + Events []events.Event `json:"events,omitempty"` } // TODO(stevvooe): The event type should be separate from the json format. It @@ -148,16 +149,3 @@ var ( // retries will not be successful. ErrSinkClosed = fmt.Errorf("sink: closed") ) - -// Sink accepts and sends events. -type Sink interface { - // Write writes one or more events to the sink. If no error is returned, - // the caller will assume that all events have been committed and will not - // try to send them again. If an error is received, the caller may retry - // sending the event. The caller should cede the slice of memory to the - // sink and not modify it after calling this method. - Write(events ...Event) error - - // Close the sink, possibly waiting for pending events to flush. - Close() error -} diff --git a/notifications/http.go b/notifications/http.go index 46f47af2..5d625fd4 100644 --- a/notifications/http.go +++ b/notifications/http.go @@ -7,6 +7,8 @@ import ( "net/http" "sync" "time" + + events "github.com/docker/go-events" ) // httpSink implements a single-flight, http notification endpoint. This is @@ -45,15 +47,15 @@ func newHTTPSink(u string, timeout time.Duration, headers http.Header, transport // httpStatusListener is called on various outcomes of sending notifications. type httpStatusListener interface { - success(status int, events ...Event) - failure(status int, events ...Event) - err(err error, events ...Event) + success(status int, event events.Event) + failure(status int, events events.Event) + err(err error, events events.Event) } // Accept makes an attempt to notify the endpoint, returning an error if it // fails. It is the caller's responsibility to retry on error. The events are // accepted or rejected as a group. -func (hs *httpSink) Write(events ...Event) error { +func (hs *httpSink) Write(event events.Event) error { hs.mu.Lock() defer hs.mu.Unlock() defer hs.client.Transport.(*headerRoundTripper).CloseIdleConnections() @@ -63,7 +65,7 @@ func (hs *httpSink) Write(events ...Event) error { } envelope := Envelope{ - Events: events, + Events: []events.Event{event}, } // TODO(stevvooe): It is not ideal to keep re-encoding the request body on @@ -73,7 +75,7 @@ func (hs *httpSink) Write(events ...Event) error { p, err := json.MarshalIndent(envelope, "", " ") if err != nil { for _, listener := range hs.listeners { - listener.err(err, events...) + listener.err(err, event) } return fmt.Errorf("%v: error marshaling event envelope: %v", hs, err) } @@ -82,7 +84,7 @@ func (hs *httpSink) Write(events ...Event) error { resp, err := hs.client.Post(hs.url, EventsMediaType, body) if err != nil { for _, listener := range hs.listeners { - listener.err(err, events...) + listener.err(err, event) } return fmt.Errorf("%v: error posting: %v", hs, err) @@ -94,7 +96,7 @@ func (hs *httpSink) Write(events ...Event) error { switch { case resp.StatusCode >= 200 && resp.StatusCode < 400: for _, listener := range hs.listeners { - listener.success(resp.StatusCode, events...) + listener.success(resp.StatusCode, event) } // TODO(stevvooe): This is a little accepting: we may want to support @@ -104,7 +106,7 @@ func (hs *httpSink) Write(events ...Event) error { return nil default: for _, listener := range hs.listeners { - listener.failure(resp.StatusCode, events...) + listener.failure(resp.StatusCode, event) } return fmt.Errorf("%v: response status %v unaccepted", hs, resp.Status) } diff --git a/notifications/http_test.go b/notifications/http_test.go index 5bf5d5d6..e878dcb0 100644 --- a/notifications/http_test.go +++ b/notifications/http_test.go @@ -14,6 +14,7 @@ import ( "testing" "github.com/docker/distribution/manifest/schema1" + events "github.com/docker/go-events" ) // TestHTTPSink mocks out an http endpoint and notifies it under a couple of @@ -68,8 +69,8 @@ func TestHTTPSink(t *testing.T) { &endpointMetricsHTTPStatusListener{safeMetrics: metrics}) // first make sure that the default transport gives x509 untrusted cert error - events := []Event{} - err := sink.Write(events...) + event := Event{} + err := sink.Write(event) if !strings.Contains(err.Error(), "x509") && !strings.Contains(err.Error(), "unknown ca") { t.Fatal("TLS server with default transport should give unknown CA error") } @@ -83,12 +84,13 @@ func TestHTTPSink(t *testing.T) { } sink = newHTTPSink(server.URL, 0, nil, tr, &endpointMetricsHTTPStatusListener{safeMetrics: metrics}) - err = sink.Write(events...) + err = sink.Write(event) if err != nil { - t.Fatalf("unexpected error writing events: %v", err) + t.Fatalf("unexpected error writing event: %v", err) } // reset server to standard http server and sink to a basic sink + metrics = newSafeMetrics("") server = httptest.NewServer(serverHandler) sink = newHTTPSink(server.URL, 0, nil, nil, &endpointMetricsHTTPStatusListener{safeMetrics: metrics}) @@ -111,46 +113,52 @@ func TestHTTPSink(t *testing.T) { }() for _, tc := range []struct { - events []Event // events to send + event events.Event // events to send url string - failure bool // true if there should be a failure. + isFailure bool // true if there should be a failure. + isError bool // true if the request returns an error statusCode int // if not set, no status code should be incremented. }{ { statusCode: http.StatusOK, - events: []Event{ - createTestEvent("push", "library/test", schema1.MediaTypeSignedManifest)}, + event: createTestEvent("push", "library/test", schema1.MediaTypeSignedManifest), }, { statusCode: http.StatusOK, - events: []Event{ - createTestEvent("push", "library/test", schema1.MediaTypeSignedManifest), - createTestEvent("push", "library/test", layerMediaType), - createTestEvent("push", "library/test", layerMediaType), - }, + event: createTestEvent("push", "library/test", schema1.MediaTypeSignedManifest), + }, + { + statusCode: http.StatusOK, + event: createTestEvent("push", "library/test", layerMediaType), + }, + { + statusCode: http.StatusOK, + event: createTestEvent("push", "library/test", layerMediaType), }, { statusCode: http.StatusTemporaryRedirect, }, { statusCode: http.StatusBadRequest, - failure: true, + isFailure: true, }, { // Case where connection is immediately closed - url: closeL.Addr().String(), - failure: true, + url: "http://" + closeL.Addr().String(), + isError: true, }, } { - if tc.failure { - expectedMetrics.Failures += len(tc.events) + if tc.isFailure { + expectedMetrics.Failures++ + } else if tc.isError { + expectedMetrics.Errors++ } else { - expectedMetrics.Successes += len(tc.events) + expectedMetrics.Successes++ } if tc.statusCode > 0 { - expectedMetrics.Statuses[fmt.Sprintf("%d %s", tc.statusCode, http.StatusText(tc.statusCode))] += len(tc.events) + expectedMetrics.Statuses[fmt.Sprintf("%d %s", tc.statusCode, http.StatusText(tc.statusCode))]++ } url := tc.url @@ -161,11 +169,11 @@ func TestHTTPSink(t *testing.T) { url += fmt.Sprintf("?status=%v", tc.statusCode) sink.url = url - t.Logf("testcase: %v, fail=%v", url, tc.failure) + t.Logf("testcase: %v, fail=%v, error=%v", url, tc.isFailure, tc.isError) // Try a simple event emission. - err := sink.Write(tc.events...) + err := sink.Write(tc.event) - if !tc.failure { + if !tc.isFailure && !tc.isError { if err != nil { t.Fatalf("unexpected error send event: %v", err) } @@ -173,6 +181,7 @@ func TestHTTPSink(t *testing.T) { if err == nil { t.Fatalf("the endpoint should have rejected the request") } + t.Logf("write error: %v", err) } if !reflect.DeepEqual(metrics.EndpointMetrics, expectedMetrics) { diff --git a/notifications/metrics.go b/notifications/metrics.go index 657b2aa0..d9432f0a 100644 --- a/notifications/metrics.go +++ b/notifications/metrics.go @@ -7,6 +7,7 @@ import ( "sync" prometheus "github.com/docker/distribution/metrics" + events "github.com/docker/go-events" "github.com/docker/go-metrics" ) @@ -70,32 +71,32 @@ type endpointMetricsHTTPStatusListener struct { var _ httpStatusListener = &endpointMetricsHTTPStatusListener{} -func (emsl *endpointMetricsHTTPStatusListener) success(status int, events ...Event) { +func (emsl *endpointMetricsHTTPStatusListener) success(status int, event events.Event) { emsl.safeMetrics.Lock() defer emsl.safeMetrics.Unlock() - emsl.Statuses[fmt.Sprintf("%d %s", status, http.StatusText(status))] += len(events) - emsl.Successes += len(events) + emsl.Statuses[fmt.Sprintf("%d %s", status, http.StatusText(status))]++ + emsl.Successes++ statusCounter.WithValues(fmt.Sprintf("%d %s", status, http.StatusText(status)), emsl.EndpointName).Inc(1) - eventsCounter.WithValues("Successes", emsl.EndpointName).Inc(float64(len(events))) + eventsCounter.WithValues("Successes", emsl.EndpointName).Inc(1) } -func (emsl *endpointMetricsHTTPStatusListener) failure(status int, events ...Event) { +func (emsl *endpointMetricsHTTPStatusListener) failure(status int, event events.Event) { emsl.safeMetrics.Lock() defer emsl.safeMetrics.Unlock() - emsl.Statuses[fmt.Sprintf("%d %s", status, http.StatusText(status))] += len(events) - emsl.Failures += len(events) + emsl.Statuses[fmt.Sprintf("%d %s", status, http.StatusText(status))]++ + emsl.Failures++ statusCounter.WithValues(fmt.Sprintf("%d %s", status, http.StatusText(status)), emsl.EndpointName).Inc(1) - eventsCounter.WithValues("Failures", emsl.EndpointName).Inc(float64(len(events))) + eventsCounter.WithValues("Failures", emsl.EndpointName).Inc(1) } -func (emsl *endpointMetricsHTTPStatusListener) err(err error, events ...Event) { +func (emsl *endpointMetricsHTTPStatusListener) err(err error, event events.Event) { emsl.safeMetrics.Lock() defer emsl.safeMetrics.Unlock() - emsl.Errors += len(events) + emsl.Errors++ - eventsCounter.WithValues("Errors", emsl.EndpointName).Inc(float64(len(events))) + eventsCounter.WithValues("Errors", emsl.EndpointName).Inc(1) } // endpointMetricsEventQueueListener maintains the incoming events counter and @@ -104,20 +105,20 @@ type endpointMetricsEventQueueListener struct { *safeMetrics } -func (eqc *endpointMetricsEventQueueListener) ingress(events ...Event) { +func (eqc *endpointMetricsEventQueueListener) ingress(event events.Event) { eqc.Lock() defer eqc.Unlock() - eqc.Events += len(events) - eqc.Pending += len(events) + eqc.Events++ + eqc.Pending++ eventsCounter.WithValues("Events", eqc.EndpointName).Inc() - pendingGauge.WithValues(eqc.EndpointName).Inc(float64(len(events))) + pendingGauge.WithValues(eqc.EndpointName).Inc(1) } -func (eqc *endpointMetricsEventQueueListener) egress(events ...Event) { +func (eqc *endpointMetricsEventQueueListener) egress(event events.Event) { eqc.Lock() defer eqc.Unlock() - eqc.Pending -= len(events) + eqc.Pending-- pendingGauge.WithValues(eqc.EndpointName).Dec(1) } diff --git a/notifications/sinks.go b/notifications/sinks.go index 816f75d7..9334a087 100644 --- a/notifications/sinks.go +++ b/notifications/sinks.go @@ -4,107 +4,16 @@ import ( "container/list" "fmt" "sync" - "time" + events "github.com/docker/go-events" "github.com/sirupsen/logrus" ) -// NOTE(stevvooe): This file contains definitions for several utility sinks. -// Typically, the broadcaster is the only sink that should be required -// externally, but others are suitable for export if the need arises. Albeit, -// the tight integration with endpoint metrics should be removed. - -// Broadcaster sends events to multiple, reliable Sinks. The goal of this -// component is to dispatch events to configured endpoints. Reliability can be -// provided by wrapping incoming sinks. -type Broadcaster struct { - sinks []Sink - events chan []Event - closed chan chan struct{} -} - -// NewBroadcaster ... -// Add appends one or more sinks to the list of sinks. The broadcaster -// behavior will be affected by the properties of the sink. Generally, the -// sink should accept all messages and deal with reliability on its own. Use -// of EventQueue and RetryingSink should be used here. -func NewBroadcaster(sinks ...Sink) *Broadcaster { - b := Broadcaster{ - sinks: sinks, - events: make(chan []Event), - closed: make(chan chan struct{}), - } - - // Start the broadcaster - go b.run() - - return &b -} - -// Write accepts a block of events to be dispatched to all sinks. This method -// will never fail and should never block (hopefully!). The caller cedes the -// slice memory to the broadcaster and should not modify it after calling -// write. -func (b *Broadcaster) Write(events ...Event) error { - select { - case b.events <- events: - case <-b.closed: - return ErrSinkClosed - } - return nil -} - -// Close the broadcaster, ensuring that all messages are flushed to the -// underlying sink before returning. -func (b *Broadcaster) Close() error { - logrus.Infof("broadcaster: closing") - select { - case <-b.closed: - // already closed - return fmt.Errorf("broadcaster: already closed") - default: - // do a little chan handoff dance to synchronize closing - closed := make(chan struct{}) - b.closed <- closed - close(b.closed) - <-closed - return nil - } -} - -// run is the main broadcast loop, started when the broadcaster is created. -// Under normal conditions, it waits for events on the event channel. After -// Close is called, this goroutine will exit. -func (b *Broadcaster) run() { - for { - select { - case block := <-b.events: - for _, sink := range b.sinks { - if err := sink.Write(block...); err != nil { - logrus.Errorf("broadcaster: error writing events to %v, these events will be lost: %v", sink, err) - } - } - case closing := <-b.closed: - - // close all the underlying sinks - for _, sink := range b.sinks { - if err := sink.Close(); err != nil { - logrus.Errorf("broadcaster: error closing sink %v: %v", sink, err) - } - } - closing <- struct{}{} - - logrus.Debugf("broadcaster: closed") - return - } - } -} - // eventQueue accepts all messages into a queue for asynchronous consumption // by a sink. It is unbounded and thread safe but the sink must be reliable or // events will be dropped. type eventQueue struct { - sink Sink + sink events.Sink events *list.List listeners []eventQueueListener cond *sync.Cond @@ -114,13 +23,13 @@ type eventQueue struct { // eventQueueListener is called when various events happen on the queue. type eventQueueListener interface { - ingress(events ...Event) - egress(events ...Event) + ingress(event events.Event) + egress(event events.Event) } // newEventQueue returns a queue to the provided sink. If the updater is non- // nil, it will be called to update pending metrics on ingress and egress. -func newEventQueue(sink Sink, listeners ...eventQueueListener) *eventQueue { +func newEventQueue(sink events.Sink, listeners ...eventQueueListener) *eventQueue { eq := eventQueue{ sink: sink, events: list.New(), @@ -134,7 +43,7 @@ func newEventQueue(sink Sink, listeners ...eventQueueListener) *eventQueue { // Write accepts the events into the queue, only failing if the queue has // beend closed. -func (eq *eventQueue) Write(events ...Event) error { +func (eq *eventQueue) Write(event events.Event) error { eq.mu.Lock() defer eq.mu.Unlock() @@ -143,9 +52,9 @@ func (eq *eventQueue) Write(events ...Event) error { } for _, listener := range eq.listeners { - listener.ingress(events...) + listener.ingress(event) } - eq.events.PushBack(events) + eq.events.PushBack(event) eq.cond.Signal() // signal waiters return nil @@ -171,18 +80,18 @@ func (eq *eventQueue) Close() error { // run is the main goroutine to flush events to the target sink. func (eq *eventQueue) run() { for { - block := eq.next() + event := eq.next() - if block == nil { + if event == nil { return // nil block means event queue is closed. } - if err := eq.sink.Write(block...); err != nil { + if err := eq.sink.Write(event); err != nil { logrus.Warnf("eventqueue: error writing events to %v, these events will be lost: %v", eq.sink, err) } for _, listener := range eq.listeners { - listener.egress(block...) + listener.egress(event) } } } @@ -190,7 +99,7 @@ func (eq *eventQueue) run() { // next encompasses the critical section of the run loop. When the queue is // empty, it will block on the condition. If new data arrives, it will wake // and return a block. When closed, a nil slice will be returned. -func (eq *eventQueue) next() []Event { +func (eq *eventQueue) next() events.Event { eq.mu.Lock() defer eq.mu.Unlock() @@ -204,7 +113,7 @@ func (eq *eventQueue) next() []Event { } front := eq.events.Front() - block := front.Value.([]Event) + block := front.Value.(events.Event) eq.events.Remove(front) return block @@ -213,12 +122,12 @@ func (eq *eventQueue) next() []Event { // ignoredSink discards events with ignored target media types and actions. // passes the rest along. type ignoredSink struct { - Sink + events.Sink ignoreMediaTypes map[string]bool ignoreActions map[string]bool } -func newIgnoredSink(sink Sink, ignored []string, ignoreActions []string) Sink { +func newIgnoredSink(sink events.Sink, ignored []string, ignoreActions []string) events.Sink { if len(ignored) == 0 { return sink } @@ -242,146 +151,14 @@ func newIgnoredSink(sink Sink, ignored []string, ignoreActions []string) Sink { // Write discards events with ignored target media types and passes the rest // along. -func (imts *ignoredSink) Write(events ...Event) error { - var kept []Event - for _, e := range events { - if !imts.ignoreMediaTypes[e.Target.MediaType] { - kept = append(kept, e) - } - } - if len(kept) == 0 { +func (imts *ignoredSink) Write(event events.Event) error { + if imts.ignoreMediaTypes[event.(Event).Target.MediaType] || imts.ignoreActions[event.(Event).Action] { return nil } - var results []Event - for _, e := range kept { - if !imts.ignoreActions[e.Action] { - results = append(results, e) - } - } - if len(results) == 0 { - return nil - } - return imts.Sink.Write(results...) + return imts.Sink.Write(event) } -// retryingSink retries the write until success or an ErrSinkClosed is -// returned. Underlying sink must have p > 0 of succeeding or the sink will -// block. Internally, it is a circuit breaker retries to manage reset. -// Concurrent calls to a retrying sink are serialized through the sink, -// meaning that if one is in-flight, another will not proceed. -type retryingSink struct { - mu sync.Mutex - sink Sink - closed bool - - // circuit breaker heuristics - failures struct { - threshold int - recent int - last time.Time - backoff time.Duration // time after which we retry after failure. - } -} - -// TODO(stevvooe): We are using circuit break here, which actually doesn't -// make a whole lot of sense for this use case, since we always retry. Move -// this to use bounded exponential backoff. - -// newRetryingSink returns a sink that will retry writes to a sink, backing -// off on failure. Parameters threshold and backoff adjust the behavior of the -// circuit breaker. -func newRetryingSink(sink Sink, threshold int, backoff time.Duration) *retryingSink { - rs := &retryingSink{ - sink: sink, - } - rs.failures.threshold = threshold - rs.failures.backoff = backoff - - return rs -} - -// Write attempts to flush the events to the downstream sink until it succeeds -// or the sink is closed. -func (rs *retryingSink) Write(events ...Event) error { - rs.mu.Lock() - defer rs.mu.Unlock() - -retry: - - if rs.closed { - return ErrSinkClosed - } - - if !rs.proceed() { - logrus.Warnf("%v encountered too many errors, backing off", rs.sink) - rs.wait(rs.failures.backoff) - goto retry - } - - if err := rs.write(events...); err != nil { - if err == ErrSinkClosed { - // terminal! - return err - } - - logrus.Errorf("retryingsink: error writing events: %v, retrying", err) - goto retry - } - +func (imts *ignoredSink) Close() error { return nil } - -// Close closes the sink and the underlying sink. -func (rs *retryingSink) Close() error { - rs.mu.Lock() - defer rs.mu.Unlock() - - if rs.closed { - return fmt.Errorf("retryingsink: already closed") - } - - rs.closed = true - return rs.sink.Close() -} - -// write provides a helper that dispatches failure and success properly. Used -// by write as the single-flight write call. -func (rs *retryingSink) write(events ...Event) error { - if err := rs.sink.Write(events...); err != nil { - rs.failure() - return err - } - - rs.reset() - return nil -} - -// wait backoff time against the sink, unlocking so others can proceed. Should -// only be called by methods that currently have the mutex. -func (rs *retryingSink) wait(backoff time.Duration) { - rs.mu.Unlock() - defer rs.mu.Lock() - - // backoff here - time.Sleep(backoff) -} - -// reset marks a successful call. -func (rs *retryingSink) reset() { - rs.failures.recent = 0 - rs.failures.last = time.Time{} -} - -// failure records a failure. -func (rs *retryingSink) failure() { - rs.failures.recent++ - rs.failures.last = time.Now().UTC() -} - -// proceed returns true if the call should proceed based on circuit breaker -// heuristics. -func (rs *retryingSink) proceed() bool { - return rs.failures.recent < rs.failures.threshold || - time.Now().UTC().After(rs.failures.last.Add(rs.failures.backoff)) -} diff --git a/notifications/sinks_test.go b/notifications/sinks_test.go index 4a69486b..6c3d302e 100644 --- a/notifications/sinks_test.go +++ b/notifications/sinks_test.go @@ -1,68 +1,17 @@ package notifications import ( - "fmt" - "math/rand" "reflect" "sync" "time" + events "github.com/docker/go-events" + "github.com/sirupsen/logrus" "testing" ) -func TestBroadcaster(t *testing.T) { - const nEvents = 1000 - var sinks []Sink - - for i := 0; i < 10; i++ { - sinks = append(sinks, &testSink{}) - } - - b := NewBroadcaster(sinks...) - - var block []Event - var wg sync.WaitGroup - for i := 1; i <= nEvents; i++ { - block = append(block, createTestEvent("push", "library/test", "blob")) - - if i%10 == 0 && i > 0 { - wg.Add(1) - go func(block ...Event) { - if err := b.Write(block...); err != nil { - t.Errorf("error writing block of length %d: %v", len(block), err) - } - wg.Done() - }(block...) - - block = nil - } - } - - wg.Wait() // Wait until writes complete - if t.Failed() { - t.FailNow() - } - checkClose(t, b) - - // Iterate through the sinks and check that they all have the expected length. - for _, sink := range sinks { - ts := sink.(*testSink) - ts.mu.Lock() - defer ts.mu.Unlock() - - if len(ts.events) != nEvents { - t.Fatalf("not all events ended up in testsink: len(testSink) == %d, not %d", len(ts.events), nEvents) - } - - if !ts.closed { - t.Fatalf("sink should have been closed") - } - } - -} - func TestEventQueue(t *testing.T) { const nevents = 1000 var ts testSink @@ -75,20 +24,16 @@ func TestEventQueue(t *testing.T) { }, metrics.eventQueueListener()) var wg sync.WaitGroup - var block []Event + var event events.Event for i := 1; i <= nevents; i++ { - block = append(block, createTestEvent("push", "library/test", "blob")) - if i%10 == 0 && i > 0 { - wg.Add(1) - go func(block ...Event) { - if err := eq.Write(block...); err != nil { - t.Errorf("error writing event block: %v", err) - } - wg.Done() - }(block...) - - block = nil - } + event = createTestEvent("push", "library/test", "blob") + wg.Add(1) + go func(event events.Event) { + if err := eq.Write(event); err != nil { + t.Errorf("error writing event block: %v", err) + } + wg.Done() + }(event) } wg.Wait() @@ -102,8 +47,8 @@ func TestEventQueue(t *testing.T) { metrics.Lock() defer metrics.Unlock() - if len(ts.events) != nevents { - t.Fatalf("events did not make it to the sink: %d != %d", len(ts.events), 1000) + if ts.count != nevents { + t.Fatalf("events did not make it to the sink: %d != %d", ts.count, 1000) } if !ts.closed { @@ -126,16 +71,14 @@ func TestIgnoredSink(t *testing.T) { type testcase struct { ignoreMediaTypes []string ignoreActions []string - expected []Event + expected events.Event } cases := []testcase{ - {nil, nil, []Event{blob, manifest}}, - {[]string{"other"}, []string{"other"}, []Event{blob, manifest}}, - {[]string{"blob"}, []string{"other"}, []Event{manifest}}, + {nil, nil, blob}, + {[]string{"other"}, []string{"other"}, blob}, {[]string{"blob", "manifest"}, []string{"other"}, nil}, - {[]string{"other"}, []string{"push"}, []Event{manifest}}, - {[]string{"other"}, []string{"pull"}, []Event{blob}}, + {[]string{"other"}, []string{"pull"}, blob}, {[]string{"other"}, []string{"pull", "push"}, nil}, } @@ -143,78 +86,54 @@ func TestIgnoredSink(t *testing.T) { ts := &testSink{} s := newIgnoredSink(ts, c.ignoreMediaTypes, c.ignoreActions) - if err := s.Write(blob, manifest); err != nil { + if err := s.Write(blob); err != nil { t.Fatalf("error writing event: %v", err) } ts.mu.Lock() - if !reflect.DeepEqual(ts.events, c.expected) { - t.Fatalf("unexpected events: %#v != %#v", ts.events, c.expected) + if !reflect.DeepEqual(ts.event, c.expected) { + t.Fatalf("unexpected event: %#v != %#v", ts.event, c.expected) + } + ts.mu.Unlock() + } + + cases = []testcase{ + {nil, nil, manifest}, + {[]string{"other"}, []string{"other"}, manifest}, + {[]string{"blob"}, []string{"other"}, manifest}, + {[]string{"blob", "manifest"}, []string{"other"}, nil}, + {[]string{"other"}, []string{"push"}, manifest}, + {[]string{"other"}, []string{"pull", "push"}, nil}, + } + + for _, c := range cases { + ts := &testSink{} + s := newIgnoredSink(ts, c.ignoreMediaTypes, c.ignoreActions) + + if err := s.Write(manifest); err != nil { + t.Fatalf("error writing event: %v", err) + } + + ts.mu.Lock() + if !reflect.DeepEqual(ts.event, c.expected) { + t.Fatalf("unexpected event: %#v != %#v", ts.event, c.expected) } ts.mu.Unlock() } } -func TestRetryingSink(t *testing.T) { - - // Make a sync that fails most of the time, ensuring that all the events - // make it through. - var ts testSink - flaky := &flakySink{ - rate: 1.0, // start out always failing. - Sink: &ts, - } - s := newRetryingSink(flaky, 3, 10*time.Millisecond) - - var wg sync.WaitGroup - var block []Event - for i := 1; i <= 100; i++ { - block = append(block, createTestEvent("push", "library/test", "blob")) - - // Above 50, set the failure rate lower - if i > 50 { - s.mu.Lock() - flaky.rate = 0.90 - s.mu.Unlock() - } - - if i%10 == 0 && i > 0 { - wg.Add(1) - go func(block ...Event) { - defer wg.Done() - if err := s.Write(block...); err != nil { - t.Errorf("error writing event block: %v", err) - } - }(block...) - - block = nil - } - } - - wg.Wait() - if t.Failed() { - t.FailNow() - } - checkClose(t, s) - - ts.mu.Lock() - defer ts.mu.Unlock() - - if len(ts.events) != 100 { - t.Fatalf("events not propagated: %d != %d", len(ts.events), 100) - } -} - type testSink struct { - events []Event + event events.Event + count int mu sync.Mutex closed bool } -func (ts *testSink) Write(events ...Event) error { +func (ts *testSink) Write(event events.Event) error { ts.mu.Lock() defer ts.mu.Unlock() - ts.events = append(ts.events, events...) + ts.event = event + ts.count++ return nil } @@ -228,29 +147,16 @@ func (ts *testSink) Close() error { } type delayedSink struct { - Sink + events.Sink delay time.Duration } -func (ds *delayedSink) Write(events ...Event) error { +func (ds *delayedSink) Write(event events.Event) error { time.Sleep(ds.delay) - return ds.Sink.Write(events...) + return ds.Sink.Write(event) } -type flakySink struct { - Sink - rate float64 -} - -func (fs *flakySink) Write(events ...Event) error { - if rand.Float64() < fs.rate { - return fmt.Errorf("error writing %d events", len(events)) - } - - return fs.Sink.Write(events...) -} - -func checkClose(t *testing.T, sink Sink) { +func checkClose(t *testing.T, sink events.Sink) { if err := sink.Close(); err != nil { t.Fatalf("unexpected error closing: %v", err) } @@ -261,7 +167,7 @@ func checkClose(t *testing.T, sink Sink) { } // Write after closed should be an error - if err := sink.Write([]Event{}...); err == nil { + if err := sink.Write(Event{}); err == nil { t.Fatalf("write after closed did not have an error") } else if err != ErrSinkClosed { t.Fatalf("error should be ErrSinkClosed") diff --git a/registry/handlers/app.go b/registry/handlers/app.go index 3ab507d2..74682c34 100644 --- a/registry/handlers/app.go +++ b/registry/handlers/app.go @@ -36,6 +36,7 @@ import ( "github.com/docker/distribution/registry/storage/driver/factory" storagemiddleware "github.com/docker/distribution/registry/storage/driver/middleware" "github.com/docker/distribution/version" + events "github.com/docker/go-events" "github.com/docker/go-metrics" "github.com/docker/libtrust" "github.com/garyburd/redigo/redis" @@ -70,7 +71,7 @@ type App struct { // events contains notification related configuration. events struct { - sink notifications.Sink + sink events.Sink source notifications.SourceRecord } @@ -446,7 +447,7 @@ func (app *App) register(routeName string, dispatch dispatchFunc) { // configureEvents prepares the event sink for action. func (app *App) configureEvents(configuration *configuration.Configuration) { // Configure all of the endpoint sinks. - var sinks []notifications.Sink + var sinks []events.Sink for _, endpoint := range configuration.Notifications.Endpoints { if endpoint.Disabled { dcontext.GetLogger(app).Infof("endpoint %s disabled, skipping", endpoint.Name) @@ -470,7 +471,7 @@ func (app *App) configureEvents(configuration *configuration.Configuration) { // replacing broadcaster with a rabbitmq implementation. It's recommended // that the registry instances also act as the workers to keep deployment // simple. - app.events.sink = notifications.NewBroadcaster(sinks...) + app.events.sink = events.NewBroadcaster(sinks...) // Populate registry event source hostname, err := os.Hostname() diff --git a/vendor/github.com/docker/go-events/.gitignore b/vendor/github.com/docker/go-events/.gitignore new file mode 100644 index 00000000..daf913b1 --- /dev/null +++ b/vendor/github.com/docker/go-events/.gitignore @@ -0,0 +1,24 @@ +# Compiled Object files, Static and Dynamic libs (Shared Objects) +*.o +*.a +*.so + +# Folders +_obj +_test + +# Architecture specific extensions/prefixes +*.[568vq] +[568vq].out + +*.cgo1.go +*.cgo2.c +_cgo_defun.c +_cgo_gotypes.go +_cgo_export.* + +_testmain.go + +*.exe +*.test +*.prof diff --git a/vendor/github.com/docker/go-events/CONTRIBUTING.md b/vendor/github.com/docker/go-events/CONTRIBUTING.md new file mode 100644 index 00000000..d813af77 --- /dev/null +++ b/vendor/github.com/docker/go-events/CONTRIBUTING.md @@ -0,0 +1,70 @@ +# Contributing to Docker open source projects + +Want to hack on go-events? Awesome! Here are instructions to get you started. + +go-events is part of the [Docker](https://www.docker.com) project, and +follows the same rules and principles. If you're already familiar with the way +Docker does things, you'll feel right at home. + +Otherwise, go read Docker's +[contributions guidelines](https://github.com/docker/docker/blob/master/CONTRIBUTING.md), +[issue triaging](https://github.com/docker/docker/blob/master/project/ISSUE-TRIAGE.md), +[review process](https://github.com/docker/docker/blob/master/project/REVIEWING.md) and +[branches and tags](https://github.com/docker/docker/blob/master/project/BRANCHES-AND-TAGS.md). + +For an in-depth description of our contribution process, visit the +contributors guide: [Understand how to contribute](https://docs.docker.com/opensource/workflow/make-a-contribution/) + +### Sign your work + +The sign-off is a simple line at the end of the explanation for the patch. Your +signature certifies that you wrote the patch or otherwise have the right to pass +it on as an open-source patch. The rules are pretty simple: if you can certify +the below (from [developercertificate.org](http://developercertificate.org/)): + +``` +Developer Certificate of Origin +Version 1.1 + +Copyright (C) 2004, 2006 The Linux Foundation and its contributors. +660 York Street, Suite 102, +San Francisco, CA 94110 USA + +Everyone is permitted to copy and distribute verbatim copies of this +license document, but changing it is not allowed. + +Developer's Certificate of Origin 1.1 + +By making a contribution to this project, I certify that: + +(a) The contribution was created in whole or in part by me and I + have the right to submit it under the open source license + indicated in the file; or + +(b) The contribution is based upon previous work that, to the best + of my knowledge, is covered under an appropriate open source + license and I have the right under that license to submit that + work with modifications, whether created in whole or in part + by me, under the same open source license (unless I am + permitted to submit under a different license), as indicated + in the file; or + +(c) The contribution was provided directly to me by some other + person who certified (a), (b) or (c) and I have not modified + it. + +(d) I understand and agree that this project and the contribution + are public and that a record of the contribution (including all + personal information I submit with it, including my sign-off) is + maintained indefinitely and may be redistributed consistent with + this project or the open source license(s) involved. +``` + +Then you just add a line to every git commit message: + + Signed-off-by: Joe Smith + +Use your real name (sorry, no pseudonyms or anonymous contributions.) + +If you set your `user.name` and `user.email` git configs, you can sign your +commit automatically with `git commit -s`. diff --git a/vendor/github.com/docker/go-events/LICENSE b/vendor/github.com/docker/go-events/LICENSE new file mode 100644 index 00000000..6d630cf5 --- /dev/null +++ b/vendor/github.com/docker/go-events/LICENSE @@ -0,0 +1,201 @@ + Apache License + Version 2.0, January 2004 + http://www.apache.org/licenses/ + + TERMS AND CONDITIONS FOR USE, REPRODUCTION, AND DISTRIBUTION + + 1. Definitions. + + "License" shall mean the terms and conditions for use, reproduction, + and distribution as defined by Sections 1 through 9 of this document. + + "Licensor" shall mean the copyright owner or entity authorized by + the copyright owner that is granting the License. + + "Legal Entity" shall mean the union of the acting entity and all + other entities that control, are controlled by, or are under common + control with that entity. For the purposes of this definition, + "control" means (i) the power, direct or indirect, to cause the + direction or management of such entity, whether by contract or + otherwise, or (ii) ownership of fifty percent (50%) or more of the + outstanding shares, or (iii) beneficial ownership of such entity. + + "You" (or "Your") shall mean an individual or Legal Entity + exercising permissions granted by this License. + + "Source" form shall mean the preferred form for making modifications, + including but not limited to software source code, documentation + source, and configuration files. + + "Object" form shall mean any form resulting from mechanical + transformation or translation of a Source form, including but + not limited to compiled object code, generated documentation, + and conversions to other media types. + + "Work" shall mean the work of authorship, whether in Source or + Object form, made available under the License, as indicated by a + copyright notice that is included in or attached to the work + (an example is provided in the Appendix below). + + "Derivative Works" shall mean any work, whether in Source or Object + form, that is based on (or derived from) the Work and for which the + editorial revisions, annotations, elaborations, or other modifications + represent, as a whole, an original work of authorship. For the purposes + of this License, Derivative Works shall not include works that remain + separable from, or merely link (or bind by name) to the interfaces of, + the Work and Derivative Works thereof. + + "Contribution" shall mean any work of authorship, including + the original version of the Work and any modifications or additions + to that Work or Derivative Works thereof, that is intentionally + submitted to Licensor for inclusion in the Work by the copyright owner + or by an individual or Legal Entity authorized to submit on behalf of + the copyright owner. For the purposes of this definition, "submitted" + means any form of electronic, verbal, or written communication sent + to the Licensor or its representatives, including but not limited to + communication on electronic mailing lists, source code control systems, + and issue tracking systems that are managed by, or on behalf of, the + Licensor for the purpose of discussing and improving the Work, but + excluding communication that is conspicuously marked or otherwise + designated in writing by the copyright owner as "Not a Contribution." + + "Contributor" shall mean Licensor and any individual or Legal Entity + on behalf of whom a Contribution has been received by Licensor and + subsequently incorporated within the Work. + + 2. Grant of Copyright License. Subject to the terms and conditions of + this License, each Contributor hereby grants to You a perpetual, + worldwide, non-exclusive, no-charge, royalty-free, irrevocable + copyright license to reproduce, prepare Derivative Works of, + publicly display, publicly perform, sublicense, and distribute the + Work and such Derivative Works in Source or Object form. + + 3. Grant of Patent License. Subject to the terms and conditions of + this License, each Contributor hereby grants to You a perpetual, + worldwide, non-exclusive, no-charge, royalty-free, irrevocable + (except as stated in this section) patent license to make, have made, + use, offer to sell, sell, import, and otherwise transfer the Work, + where such license applies only to those patent claims licensable + by such Contributor that are necessarily infringed by their + Contribution(s) alone or by combination of their Contribution(s) + with the Work to which such Contribution(s) was submitted. If You + institute patent litigation against any entity (including a + cross-claim or counterclaim in a lawsuit) alleging that the Work + or a Contribution incorporated within the Work constitutes direct + or contributory patent infringement, then any patent licenses + granted to You under this License for that Work shall terminate + as of the date such litigation is filed. + + 4. Redistribution. You may reproduce and distribute copies of the + Work or Derivative Works thereof in any medium, with or without + modifications, and in Source or Object form, provided that You + meet the following conditions: + + (a) You must give any other recipients of the Work or + Derivative Works a copy of this License; and + + (b) You must cause any modified files to carry prominent notices + stating that You changed the files; and + + (c) You must retain, in the Source form of any Derivative Works + that You distribute, all copyright, patent, trademark, and + attribution notices from the Source form of the Work, + excluding those notices that do not pertain to any part of + the Derivative Works; and + + (d) If the Work includes a "NOTICE" text file as part of its + distribution, then any Derivative Works that You distribute must + include a readable copy of the attribution notices contained + within such NOTICE file, excluding those notices that do not + pertain to any part of the Derivative Works, in at least one + of the following places: within a NOTICE text file distributed + as part of the Derivative Works; within the Source form or + documentation, if provided along with the Derivative Works; or, + within a display generated by the Derivative Works, if and + wherever such third-party notices normally appear. The contents + of the NOTICE file are for informational purposes only and + do not modify the License. You may add Your own attribution + notices within Derivative Works that You distribute, alongside + or as an addendum to the NOTICE text from the Work, provided + that such additional attribution notices cannot be construed + as modifying the License. + + You may add Your own copyright statement to Your modifications and + may provide additional or different license terms and conditions + for use, reproduction, or distribution of Your modifications, or + for any such Derivative Works as a whole, provided Your use, + reproduction, and distribution of the Work otherwise complies with + the conditions stated in this License. + + 5. Submission of Contributions. Unless You explicitly state otherwise, + any Contribution intentionally submitted for inclusion in the Work + by You to the Licensor shall be under the terms and conditions of + this License, without any additional terms or conditions. + Notwithstanding the above, nothing herein shall supersede or modify + the terms of any separate license agreement you may have executed + with Licensor regarding such Contributions. + + 6. Trademarks. This License does not grant permission to use the trade + names, trademarks, service marks, or product names of the Licensor, + except as required for reasonable and customary use in describing the + origin of the Work and reproducing the content of the NOTICE file. + + 7. Disclaimer of Warranty. Unless required by applicable law or + agreed to in writing, Licensor provides the Work (and each + Contributor provides its Contributions) on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + implied, including, without limitation, any warranties or conditions + of TITLE, NON-INFRINGEMENT, MERCHANTABILITY, or FITNESS FOR A + PARTICULAR PURPOSE. You are solely responsible for determining the + appropriateness of using or redistributing the Work and assume any + risks associated with Your exercise of permissions under this License. + + 8. Limitation of Liability. In no event and under no legal theory, + whether in tort (including negligence), contract, or otherwise, + unless required by applicable law (such as deliberate and grossly + negligent acts) or agreed to in writing, shall any Contributor be + liable to You for damages, including any direct, indirect, special, + incidental, or consequential damages of any character arising as a + result of this License or out of the use or inability to use the + Work (including but not limited to damages for loss of goodwill, + work stoppage, computer failure or malfunction, or any and all + other commercial damages or losses), even if such Contributor + has been advised of the possibility of such damages. + + 9. Accepting Warranty or Additional Liability. While redistributing + the Work or Derivative Works thereof, You may choose to offer, + and charge a fee for, acceptance of support, warranty, indemnity, + or other liability obligations and/or rights consistent with this + License. However, in accepting such obligations, You may act only + on Your own behalf and on Your sole responsibility, not on behalf + of any other Contributor, and only if You agree to indemnify, + defend, and hold each Contributor harmless for any liability + incurred by, or claims asserted against, such Contributor by reason + of your accepting any such warranty or additional liability. + + END OF TERMS AND CONDITIONS + + APPENDIX: How to apply the Apache License to your work. + + To apply the Apache License to your work, attach the following + boilerplate notice, with the fields enclosed by brackets "{}" + replaced with your own identifying information. (Don't include + the brackets!) The text should be enclosed in the appropriate + comment syntax for the file format. We also recommend that a + file or class name and description of purpose be included on the + same "printed page" as the copyright notice for easier + identification within third-party archives. + + Copyright 2016 Docker, Inc. + + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. + You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. diff --git a/vendor/github.com/docker/go-events/MAINTAINERS b/vendor/github.com/docker/go-events/MAINTAINERS new file mode 100644 index 00000000..e414d82e --- /dev/null +++ b/vendor/github.com/docker/go-events/MAINTAINERS @@ -0,0 +1,46 @@ +# go-events maintainers file +# +# This file describes who runs the docker/go-events project and how. +# This is a living document - if you see something out of date or missing, speak up! +# +# It is structured to be consumable by both humans and programs. +# To extract its contents programmatically, use any TOML-compliant parser. +# +# This file is compiled into the MAINTAINERS file in docker/opensource. +# +[Org] + [Org."Core maintainers"] + people = [ + "aaronlehmann", + "aluzzardi", + "lk4d4", + "stevvooe", + ] + +[people] + +# A reference list of all people associated with the project. +# All other sections should refer to people by their canonical key +# in the people section. + + # ADD YOURSELF HERE IN ALPHABETICAL ORDER + + [people.aaronlehmann] + Name = "Aaron Lehmann" + Email = "aaron.lehmann@docker.com" + GitHub = "aaronlehmann" + + [people.aluzzardi] + Name = "Andrea Luzzardi" + Email = "al@docker.com" + GitHub = "aluzzardi" + + [people.lk4d4] + Name = "Alexander Morozov" + Email = "lk4d4@docker.com" + GitHub = "lk4d4" + + [people.stevvooe] + Name = "Stephen Day" + Email = "stephen.day@docker.com" + GitHub = "stevvooe" diff --git a/vendor/github.com/docker/go-events/README.md b/vendor/github.com/docker/go-events/README.md new file mode 100644 index 00000000..0acafc27 --- /dev/null +++ b/vendor/github.com/docker/go-events/README.md @@ -0,0 +1,117 @@ +# Docker Events Package + +[![GoDoc](https://godoc.org/github.com/docker/go-events?status.svg)](https://godoc.org/github.com/docker/go-events) +[![Circle CI](https://circleci.com/gh/docker/go-events.svg?style=shield)](https://circleci.com/gh/docker/go-events) + +The Docker `events` package implements a composable event distribution package +for Go. + +Originally created to implement the [notifications in Docker Registry +2](https://github.com/docker/distribution/blob/master/docs/notifications.md), +we've found the pattern to be useful in other applications. This package is +most of the same code with slightly updated interfaces. Much of the internals +have been made available. + +## Usage + +The `events` package centers around a `Sink` type. Events are written with +calls to `Sink.Write(event Event)`. Sinks can be wired up in various +configurations to achieve interesting behavior. + +The canonical example is that employed by the +[docker/distribution/notifications](https://godoc.org/github.com/docker/distribution/notifications) +package. Let's say we have a type `httpSink` where we'd like to queue +notifications. As a rule, it should send a single http request and return an +error if it fails: + +```go +func (h *httpSink) Write(event Event) error { + p, err := json.Marshal(event) + if err != nil { + return err + } + body := bytes.NewReader(p) + resp, err := h.client.Post(h.url, "application/json", body) + if err != nil { + return err + } + defer resp.Body.Close() + + if resp.Status != 200 { + return errors.New("unexpected status") + } + + return nil +} + +// implement (*httpSink).Close() +``` + +With just that, we can start using components from this package. One can call +`(*httpSink).Write` to send events as the body of a post request to a +configured URL. + +### Retries + +HTTP can be unreliable. The first feature we'd like is to have some retry: + +```go +hs := newHTTPSink(/*...*/) +retry := NewRetryingSink(hs, NewBreaker(5, time.Second)) +``` + +We now have a sink that will retry events against the `httpSink` until they +succeed. The retry will backoff for one second after 5 consecutive failures +using the breaker strategy. + +### Queues + +This isn't quite enough. We we want a sink that doesn't block while we are +waiting for events to be sent. Let's add a `Queue`: + +```go +queue := NewQueue(retry) +``` + +Now, we have an unbounded queue that will work through all events sent with +`(*Queue).Write`. Events can be added asynchronously to the queue without +blocking the current execution path. This is ideal for use in an http request. + +### Broadcast + +It usually turns out that you want to send to more than one listener. We can +use `Broadcaster` to support this: + +```go +var broadcast = NewBroadcaster() // make it available somewhere in your application. +broadcast.Add(queue) // add your queue! +broadcast.Add(queue2) // and another! +``` + +With the above, we can now call `broadcast.Write` in our http handlers and have +all the events distributed to each queue. Because the events are queued, not +listener blocks another. + +### Extending + +For the most part, the above is sufficient for a lot of applications. However, +extending the above functionality can be done implementing your own `Sink`. The +behavior and semantics of the sink can be completely dependent on the +application requirements. The interface is provided below for reference: + +```go +type Sink { + Write(Event) error + Close() error +} +``` + +Application behavior can be controlled by how `Write` behaves. The examples +above are designed to queue the message and return as quickly as possible. +Other implementations may block until the event is committed to durable +storage. + +## Copyright and license + +Copyright © 2016 Docker, Inc. go-events is licensed under the Apache License, +Version 2.0. See [LICENSE](LICENSE) for the full license text. diff --git a/vendor/github.com/docker/go-events/broadcast.go b/vendor/github.com/docker/go-events/broadcast.go new file mode 100644 index 00000000..5120078d --- /dev/null +++ b/vendor/github.com/docker/go-events/broadcast.go @@ -0,0 +1,178 @@ +package events + +import ( + "fmt" + "sync" + + "github.com/sirupsen/logrus" +) + +// Broadcaster sends events to multiple, reliable Sinks. The goal of this +// component is to dispatch events to configured endpoints. Reliability can be +// provided by wrapping incoming sinks. +type Broadcaster struct { + sinks []Sink + events chan Event + adds chan configureRequest + removes chan configureRequest + + shutdown chan struct{} + closed chan struct{} + once sync.Once +} + +// NewBroadcaster appends one or more sinks to the list of sinks. The +// broadcaster behavior will be affected by the properties of the sink. +// Generally, the sink should accept all messages and deal with reliability on +// its own. Use of EventQueue and RetryingSink should be used here. +func NewBroadcaster(sinks ...Sink) *Broadcaster { + b := Broadcaster{ + sinks: sinks, + events: make(chan Event), + adds: make(chan configureRequest), + removes: make(chan configureRequest), + shutdown: make(chan struct{}), + closed: make(chan struct{}), + } + + // Start the broadcaster + go b.run() + + return &b +} + +// Write accepts an event to be dispatched to all sinks. This method will never +// fail and should never block (hopefully!). The caller cedes the memory to the +// broadcaster and should not modify it after calling write. +func (b *Broadcaster) Write(event Event) error { + select { + case b.events <- event: + case <-b.closed: + return ErrSinkClosed + } + return nil +} + +// Add the sink to the broadcaster. +// +// The provided sink must be comparable with equality. Typically, this just +// works with a regular pointer type. +func (b *Broadcaster) Add(sink Sink) error { + return b.configure(b.adds, sink) +} + +// Remove the provided sink. +func (b *Broadcaster) Remove(sink Sink) error { + return b.configure(b.removes, sink) +} + +type configureRequest struct { + sink Sink + response chan error +} + +func (b *Broadcaster) configure(ch chan configureRequest, sink Sink) error { + response := make(chan error, 1) + + for { + select { + case ch <- configureRequest{ + sink: sink, + response: response}: + ch = nil + case err := <-response: + return err + case <-b.closed: + return ErrSinkClosed + } + } +} + +// Close the broadcaster, ensuring that all messages are flushed to the +// underlying sink before returning. +func (b *Broadcaster) Close() error { + b.once.Do(func() { + close(b.shutdown) + }) + + <-b.closed + return nil +} + +// run is the main broadcast loop, started when the broadcaster is created. +// Under normal conditions, it waits for events on the event channel. After +// Close is called, this goroutine will exit. +func (b *Broadcaster) run() { + defer close(b.closed) + remove := func(target Sink) { + for i, sink := range b.sinks { + if sink == target { + b.sinks = append(b.sinks[:i], b.sinks[i+1:]...) + break + } + } + } + + for { + select { + case event := <-b.events: + for _, sink := range b.sinks { + if err := sink.Write(event); err != nil { + if err == ErrSinkClosed { + // remove closed sinks + remove(sink) + continue + } + logrus.WithField("event", event).WithField("events.sink", sink).WithError(err). + Errorf("broadcaster: dropping event") + } + } + case request := <-b.adds: + // while we have to iterate for add/remove, common iteration for + // send is faster against slice. + + var found bool + for _, sink := range b.sinks { + if request.sink == sink { + found = true + break + } + } + + if !found { + b.sinks = append(b.sinks, request.sink) + } + // b.sinks[request.sink] = struct{}{} + request.response <- nil + case request := <-b.removes: + remove(request.sink) + request.response <- nil + case <-b.shutdown: + // close all the underlying sinks + for _, sink := range b.sinks { + if err := sink.Close(); err != nil && err != ErrSinkClosed { + logrus.WithField("events.sink", sink).WithError(err). + Errorf("broadcaster: closing sink failed") + } + } + return + } + } +} + +func (b *Broadcaster) String() string { + // Serialize copy of this broadcaster without the sync.Once, to avoid + // a data race. + + b2 := map[string]interface{}{ + "sinks": b.sinks, + "events": b.events, + "adds": b.adds, + "removes": b.removes, + + "shutdown": b.shutdown, + "closed": b.closed, + } + + return fmt.Sprint(b2) +} diff --git a/vendor/github.com/docker/go-events/channel.go b/vendor/github.com/docker/go-events/channel.go new file mode 100644 index 00000000..802cf51f --- /dev/null +++ b/vendor/github.com/docker/go-events/channel.go @@ -0,0 +1,61 @@ +package events + +import ( + "fmt" + "sync" +) + +// Channel provides a sink that can be listened on. The writer and channel +// listener must operate in separate goroutines. +// +// Consumers should listen on Channel.C until Closed is closed. +type Channel struct { + C chan Event + + closed chan struct{} + once sync.Once +} + +// NewChannel returns a channel. If buffer is zero, the channel is +// unbuffered. +func NewChannel(buffer int) *Channel { + return &Channel{ + C: make(chan Event, buffer), + closed: make(chan struct{}), + } +} + +// Done returns a channel that will always proceed once the sink is closed. +func (ch *Channel) Done() chan struct{} { + return ch.closed +} + +// Write the event to the channel. Must be called in a separate goroutine from +// the listener. +func (ch *Channel) Write(event Event) error { + select { + case ch.C <- event: + return nil + case <-ch.closed: + return ErrSinkClosed + } +} + +// Close the channel sink. +func (ch *Channel) Close() error { + ch.once.Do(func() { + close(ch.closed) + }) + + return nil +} + +func (ch *Channel) String() string { + // Serialize a copy of the Channel that doesn't contain the sync.Once, + // to avoid a data race. + ch2 := map[string]interface{}{ + "C": ch.C, + "closed": ch.closed, + } + return fmt.Sprint(ch2) +} diff --git a/vendor/github.com/docker/go-events/errors.go b/vendor/github.com/docker/go-events/errors.go new file mode 100644 index 00000000..56db7c25 --- /dev/null +++ b/vendor/github.com/docker/go-events/errors.go @@ -0,0 +1,10 @@ +package events + +import "fmt" + +var ( + // ErrSinkClosed is returned if a write is issued to a sink that has been + // closed. If encountered, the error should be considered terminal and + // retries will not be successful. + ErrSinkClosed = fmt.Errorf("events: sink closed") +) diff --git a/vendor/github.com/docker/go-events/event.go b/vendor/github.com/docker/go-events/event.go new file mode 100644 index 00000000..f0f1d9ea --- /dev/null +++ b/vendor/github.com/docker/go-events/event.go @@ -0,0 +1,15 @@ +package events + +// Event marks items that can be sent as events. +type Event interface{} + +// Sink accepts and sends events. +type Sink interface { + // Write an event to the Sink. If no error is returned, the caller will + // assume that all events have been committed to the sink. If an error is + // received, the caller may retry sending the event. + Write(event Event) error + + // Close the sink, possibly waiting for pending events to flush. + Close() error +} diff --git a/vendor/github.com/docker/go-events/filter.go b/vendor/github.com/docker/go-events/filter.go new file mode 100644 index 00000000..e6c0eb69 --- /dev/null +++ b/vendor/github.com/docker/go-events/filter.go @@ -0,0 +1,52 @@ +package events + +// Matcher matches events. +type Matcher interface { + Match(event Event) bool +} + +// MatcherFunc implements matcher with just a function. +type MatcherFunc func(event Event) bool + +// Match calls the wrapped function. +func (fn MatcherFunc) Match(event Event) bool { + return fn(event) +} + +// Filter provides an event sink that sends only events that are accepted by a +// Matcher. No methods on filter are goroutine safe. +type Filter struct { + dst Sink + matcher Matcher + closed bool +} + +// NewFilter returns a new filter that will send to events to dst that return +// true for Matcher. +func NewFilter(dst Sink, matcher Matcher) Sink { + return &Filter{dst: dst, matcher: matcher} +} + +// Write an event to the filter. +func (f *Filter) Write(event Event) error { + if f.closed { + return ErrSinkClosed + } + + if f.matcher.Match(event) { + return f.dst.Write(event) + } + + return nil +} + +// Close the filter and allow no more events to pass through. +func (f *Filter) Close() error { + // TODO(stevvooe): Not all sinks should have Close. + if f.closed { + return nil + } + + f.closed = true + return f.dst.Close() +} diff --git a/vendor/github.com/docker/go-events/queue.go b/vendor/github.com/docker/go-events/queue.go new file mode 100644 index 00000000..4bb770af --- /dev/null +++ b/vendor/github.com/docker/go-events/queue.go @@ -0,0 +1,111 @@ +package events + +import ( + "container/list" + "sync" + + "github.com/sirupsen/logrus" +) + +// Queue accepts all messages into a queue for asynchronous consumption +// by a sink. It is unbounded and thread safe but the sink must be reliable or +// events will be dropped. +type Queue struct { + dst Sink + events *list.List + cond *sync.Cond + mu sync.Mutex + closed bool +} + +// NewQueue returns a queue to the provided Sink dst. +func NewQueue(dst Sink) *Queue { + eq := Queue{ + dst: dst, + events: list.New(), + } + + eq.cond = sync.NewCond(&eq.mu) + go eq.run() + return &eq +} + +// Write accepts the events into the queue, only failing if the queue has +// been closed. +func (eq *Queue) Write(event Event) error { + eq.mu.Lock() + defer eq.mu.Unlock() + + if eq.closed { + return ErrSinkClosed + } + + eq.events.PushBack(event) + eq.cond.Signal() // signal waiters + + return nil +} + +// Close shutsdown the event queue, flushing +func (eq *Queue) Close() error { + eq.mu.Lock() + defer eq.mu.Unlock() + + if eq.closed { + return nil + } + + // set closed flag + eq.closed = true + eq.cond.Signal() // signal flushes queue + eq.cond.Wait() // wait for signal from last flush + return eq.dst.Close() +} + +// run is the main goroutine to flush events to the target sink. +func (eq *Queue) run() { + for { + event := eq.next() + + if event == nil { + return // nil block means event queue is closed. + } + + if err := eq.dst.Write(event); err != nil { + // TODO(aaronl): Dropping events could be bad depending + // on the application. We should have a way of + // communicating this condition. However, logging + // at a log level above debug may not be appropriate. + // Eventually, go-events should not use logrus at all, + // and should bubble up conditions like this through + // error values. + logrus.WithFields(logrus.Fields{ + "event": event, + "sink": eq.dst, + }).WithError(err).Debug("eventqueue: dropped event") + } + } +} + +// next encompasses the critical section of the run loop. When the queue is +// empty, it will block on the condition. If new data arrives, it will wake +// and return a block. When closed, a nil slice will be returned. +func (eq *Queue) next() Event { + eq.mu.Lock() + defer eq.mu.Unlock() + + for eq.events.Len() < 1 { + if eq.closed { + eq.cond.Broadcast() + return nil + } + + eq.cond.Wait() + } + + front := eq.events.Front() + block := front.Value.(Event) + eq.events.Remove(front) + + return block +} diff --git a/vendor/github.com/docker/go-events/retry.go b/vendor/github.com/docker/go-events/retry.go new file mode 100644 index 00000000..b7f0a542 --- /dev/null +++ b/vendor/github.com/docker/go-events/retry.go @@ -0,0 +1,260 @@ +package events + +import ( + "fmt" + "math/rand" + "sync" + "sync/atomic" + "time" + + "github.com/sirupsen/logrus" +) + +// RetryingSink retries the write until success or an ErrSinkClosed is +// returned. Underlying sink must have p > 0 of succeeding or the sink will +// block. Retry is configured with a RetryStrategy. Concurrent calls to a +// retrying sink are serialized through the sink, meaning that if one is +// in-flight, another will not proceed. +type RetryingSink struct { + sink Sink + strategy RetryStrategy + closed chan struct{} + once sync.Once +} + +// NewRetryingSink returns a sink that will retry writes to a sink, backing +// off on failure. Parameters threshold and backoff adjust the behavior of the +// circuit breaker. +func NewRetryingSink(sink Sink, strategy RetryStrategy) *RetryingSink { + rs := &RetryingSink{ + sink: sink, + strategy: strategy, + closed: make(chan struct{}), + } + + return rs +} + +// Write attempts to flush the events to the downstream sink until it succeeds +// or the sink is closed. +func (rs *RetryingSink) Write(event Event) error { + logger := logrus.WithField("event", event) + +retry: + select { + case <-rs.closed: + return ErrSinkClosed + default: + } + + if backoff := rs.strategy.Proceed(event); backoff > 0 { + select { + case <-time.After(backoff): + // TODO(stevvooe): This branch holds up the next try. Before, we + // would simply break to the "retry" label and then possibly wait + // again. However, this requires all retry strategies to have a + // large probability of probing the sync for success, rather than + // just backing off and sending the request. + case <-rs.closed: + return ErrSinkClosed + } + } + + if err := rs.sink.Write(event); err != nil { + if err == ErrSinkClosed { + // terminal! + return err + } + + logger := logger.WithError(err) // shadow!! + + if rs.strategy.Failure(event, err) { + logger.Errorf("retryingsink: dropped event") + return nil + } + + logger.Errorf("retryingsink: error writing event, retrying") + goto retry + } + + rs.strategy.Success(event) + return nil +} + +// Close closes the sink and the underlying sink. +func (rs *RetryingSink) Close() error { + rs.once.Do(func() { + close(rs.closed) + }) + + return nil +} + +func (rs *RetryingSink) String() string { + // Serialize a copy of the RetryingSink without the sync.Once, to avoid + // a data race. + rs2 := map[string]interface{}{ + "sink": rs.sink, + "strategy": rs.strategy, + "closed": rs.closed, + } + return fmt.Sprint(rs2) +} + +// RetryStrategy defines a strategy for retrying event sink writes. +// +// All methods should be goroutine safe. +type RetryStrategy interface { + // Proceed is called before every event send. If proceed returns a + // positive, non-zero integer, the retryer will back off by the provided + // duration. + // + // An event is provided, by may be ignored. + Proceed(event Event) time.Duration + + // Failure reports a failure to the strategy. If this method returns true, + // the event should be dropped. + Failure(event Event, err error) bool + + // Success should be called when an event is sent successfully. + Success(event Event) +} + +// Breaker implements a circuit breaker retry strategy. +// +// The current implementation never drops events. +type Breaker struct { + threshold int + recent int + last time.Time + backoff time.Duration // time after which we retry after failure. + mu sync.Mutex +} + +var _ RetryStrategy = &Breaker{} + +// NewBreaker returns a breaker that will backoff after the threshold has been +// tripped. A Breaker is thread safe and may be shared by many goroutines. +func NewBreaker(threshold int, backoff time.Duration) *Breaker { + return &Breaker{ + threshold: threshold, + backoff: backoff, + } +} + +// Proceed checks the failures against the threshold. +func (b *Breaker) Proceed(event Event) time.Duration { + b.mu.Lock() + defer b.mu.Unlock() + + if b.recent < b.threshold { + return 0 + } + + return b.last.Add(b.backoff).Sub(time.Now()) +} + +// Success resets the breaker. +func (b *Breaker) Success(event Event) { + b.mu.Lock() + defer b.mu.Unlock() + + b.recent = 0 + b.last = time.Time{} +} + +// Failure records the failure and latest failure time. +func (b *Breaker) Failure(event Event, err error) bool { + b.mu.Lock() + defer b.mu.Unlock() + + b.recent++ + b.last = time.Now().UTC() + return false // never drop events. +} + +var ( + // DefaultExponentialBackoffConfig provides a default configuration for + // exponential backoff. + DefaultExponentialBackoffConfig = ExponentialBackoffConfig{ + Base: time.Second, + Factor: time.Second, + Max: 20 * time.Second, + } +) + +// ExponentialBackoffConfig configures backoff parameters. +// +// Note that these parameters operate on the upper bound for choosing a random +// value. For example, at Base=1s, a random value in [0,1s) will be chosen for +// the backoff value. +type ExponentialBackoffConfig struct { + // Base is the minimum bound for backing off after failure. + Base time.Duration + + // Factor sets the amount of time by which the backoff grows with each + // failure. + Factor time.Duration + + // Max is the absolute maxiumum bound for a single backoff. + Max time.Duration +} + +// ExponentialBackoff implements random backoff with exponentially increasing +// bounds as the number consecutive failures increase. +type ExponentialBackoff struct { + failures uint64 // consecutive failure counter (needs to be 64-bit aligned) + config ExponentialBackoffConfig +} + +// NewExponentialBackoff returns an exponential backoff strategy with the +// desired config. If config is nil, the default is returned. +func NewExponentialBackoff(config ExponentialBackoffConfig) *ExponentialBackoff { + return &ExponentialBackoff{ + config: config, + } +} + +// Proceed returns the next randomly bound exponential backoff time. +func (b *ExponentialBackoff) Proceed(event Event) time.Duration { + return b.backoff(atomic.LoadUint64(&b.failures)) +} + +// Success resets the failures counter. +func (b *ExponentialBackoff) Success(event Event) { + atomic.StoreUint64(&b.failures, 0) +} + +// Failure increments the failure counter. +func (b *ExponentialBackoff) Failure(event Event, err error) bool { + atomic.AddUint64(&b.failures, 1) + return false +} + +// backoff calculates the amount of time to wait based on the number of +// consecutive failures. +func (b *ExponentialBackoff) backoff(failures uint64) time.Duration { + if failures <= 0 { + // proceed normally when there are no failures. + return 0 + } + + factor := b.config.Factor + if factor <= 0 { + factor = DefaultExponentialBackoffConfig.Factor + } + + backoff := b.config.Base + factor*time.Duration(1<<(failures-1)) + + max := b.config.Max + if max <= 0 { + max = DefaultExponentialBackoffConfig.Max + } + + if backoff > max || backoff < 0 { + backoff = max + } + + // Choose a uniformly distributed value from [0, backoff). + return time.Duration(rand.Int63n(int64(backoff))) +} diff --git a/vendor/modules.txt b/vendor/modules.txt index 5c7bb28f..8dbb66ce 100644 --- a/vendor/modules.txt +++ b/vendor/modules.txt @@ -61,6 +61,8 @@ github.com/denverdino/aliyungo/oss github.com/denverdino/aliyungo/util # github.com/dgrijalva/jwt-go v0.0.0-20170104182250-a601269ab70c github.com/dgrijalva/jwt-go +# github.com/docker/go-events v0.0.0-20190806004212-e31b211e4f1c +github.com/docker/go-events # github.com/docker/go-metrics v0.0.1 github.com/docker/go-metrics # github.com/docker/libtrust v0.0.0-20150114040149-fa567046d9b1