Use go-events package

TBD: Queue not converted yet

Signed-off-by: Elliot Pahl <elliot.pahl@gmail.com>
This commit is contained in:
Elliot Pahl 2018-03-14 11:08:11 +11:00 committed by Derek McGowan
parent 2d1126ecc1
commit 800cb95821
No known key found for this signature in database
GPG key ID: F58C5D0A4405ACDB
25 changed files with 1354 additions and 530 deletions

View file

@ -8,6 +8,7 @@ import (
"github.com/docker/distribution/context"
"github.com/docker/distribution/reference"
"github.com/docker/distribution/uuid"
events "github.com/docker/go-events"
"github.com/opencontainers/go-digest"
)
@ -17,7 +18,7 @@ type bridge struct {
actor ActorRecord
source SourceRecord
request RequestRecord
sink Sink
sink events.Sink
}
var _ Listener = &bridge{}
@ -32,7 +33,7 @@ type URLBuilder interface {
// using the actor and source. Any urls populated in the events created by
// this bridge will be created using the URLBuilder.
// TODO(stevvooe): Update this to simply take a context.Context object.
func NewBridge(ub URLBuilder, source SourceRecord, actor ActorRecord, request RequestRecord, sink Sink, includeReferences bool) Listener {
func NewBridge(ub URLBuilder, source SourceRecord, actor ActorRecord, request RequestRecord, sink events.Sink, includeReferences bool) Listener {
return &bridge{
ub: ub,
includeReferences: includeReferences,

View file

@ -8,6 +8,7 @@ import (
"github.com/docker/distribution/reference"
v2 "github.com/docker/distribution/registry/api/v2"
"github.com/docker/distribution/uuid"
events "github.com/docker/go-events"
"github.com/docker/libtrust"
"github.com/opencontainers/go-digest"
)
@ -46,8 +47,8 @@ var (
)
func TestEventBridgeManifestPulled(t *testing.T) {
l := createTestEnv(t, testSinkFn(func(events ...Event) error {
checkCommonManifest(t, EventActionPull, events...)
l := createTestEnv(t, testSinkFn(func(event events.Event) error {
checkCommonManifest(t, EventActionPull, event)
return nil
}))
@ -59,8 +60,8 @@ func TestEventBridgeManifestPulled(t *testing.T) {
}
func TestEventBridgeManifestPushed(t *testing.T) {
l := createTestEnv(t, testSinkFn(func(events ...Event) error {
checkCommonManifest(t, EventActionPush, events...)
l := createTestEnv(t, testSinkFn(func(event events.Event) error {
checkCommonManifest(t, EventActionPush, event)
return nil
}))
@ -72,10 +73,10 @@ func TestEventBridgeManifestPushed(t *testing.T) {
}
func TestEventBridgeManifestPushedWithTag(t *testing.T) {
l := createTestEnv(t, testSinkFn(func(events ...Event) error {
checkCommonManifest(t, EventActionPush, events...)
if events[0].Target.Tag != "latest" {
t.Fatalf("missing or unexpected tag: %#v", events[0].Target)
l := createTestEnv(t, testSinkFn(func(event events.Event) error {
checkCommonManifest(t, EventActionPush, event)
if event.(Event).Target.Tag != "latest" {
t.Fatalf("missing or unexpected tag: %#v", event.(Event).Target)
}
return nil
@ -88,10 +89,10 @@ func TestEventBridgeManifestPushedWithTag(t *testing.T) {
}
func TestEventBridgeManifestPulledWithTag(t *testing.T) {
l := createTestEnv(t, testSinkFn(func(events ...Event) error {
checkCommonManifest(t, EventActionPull, events...)
if events[0].Target.Tag != "latest" {
t.Fatalf("missing or unexpected tag: %#v", events[0].Target)
l := createTestEnv(t, testSinkFn(func(event events.Event) error {
checkCommonManifest(t, EventActionPull, event)
if event.(Event).Target.Tag != "latest" {
t.Fatalf("missing or unexpected tag: %#v", event.(Event).Target)
}
return nil
@ -104,10 +105,10 @@ func TestEventBridgeManifestPulledWithTag(t *testing.T) {
}
func TestEventBridgeManifestDeleted(t *testing.T) {
l := createTestEnv(t, testSinkFn(func(events ...Event) error {
checkDeleted(t, EventActionDelete, events...)
if events[0].Target.Digest != dgst {
t.Fatalf("unexpected digest on event target: %q != %q", events[0].Target.Digest, dgst)
l := createTestEnv(t, testSinkFn(func(event events.Event) error {
checkDeleted(t, EventActionDelete, event)
if event.(Event).Target.Digest != dgst {
t.Fatalf("unexpected digest on event target: %q != %q", event.(Event).Target.Digest, dgst)
}
return nil
}))
@ -119,10 +120,10 @@ func TestEventBridgeManifestDeleted(t *testing.T) {
}
func TestEventBridgeTagDeleted(t *testing.T) {
l := createTestEnv(t, testSinkFn(func(events ...Event) error {
checkDeleted(t, EventActionDelete, events...)
if events[0].Target.Tag != m.Tag {
t.Fatalf("unexpected tag on event target: %q != %q", events[0].Target.Tag, m.Tag)
l := createTestEnv(t, testSinkFn(func(event events.Event) error {
checkDeleted(t, EventActionDelete, event)
if event.(Event).Target.Tag != m.Tag {
t.Fatalf("unexpected tag on event target: %q != %q", event.(Event).Target.Tag, m.Tag)
}
return nil
}))
@ -134,8 +135,8 @@ func TestEventBridgeTagDeleted(t *testing.T) {
}
func TestEventBridgeRepoDeleted(t *testing.T) {
l := createTestEnv(t, testSinkFn(func(events ...Event) error {
checkDeleted(t, EventActionDelete, events...)
l := createTestEnv(t, testSinkFn(func(event events.Event) error {
checkDeleted(t, EventActionDelete, event)
return nil
}))
@ -162,36 +163,29 @@ func createTestEnv(t *testing.T, fn testSinkFn) Listener {
return NewBridge(ub, source, actor, request, fn, true)
}
func checkDeleted(t *testing.T, action string, events ...Event) {
if len(events) != 1 {
t.Fatalf("unexpected number of events: %v != 1", len(events))
func checkDeleted(t *testing.T, action string, event events.Event) {
if event.(Event).Source != source {
t.Fatalf("source not equal: %#v != %#v", event.(Event).Source, source)
}
event := events[0]
if event.Source != source {
t.Fatalf("source not equal: %#v != %#v", event.Source, source)
if event.(Event).Request != request {
t.Fatalf("request not equal: %#v != %#v", event.(Event).Request, request)
}
if event.Request != request {
t.Fatalf("request not equal: %#v != %#v", event.Request, request)
if event.(Event).Actor != actor {
t.Fatalf("request not equal: %#v != %#v", event.(Event).Actor, actor)
}
if event.Actor != actor {
t.Fatalf("request not equal: %#v != %#v", event.Actor, actor)
}
if event.Target.Repository != repo {
t.Fatalf("unexpected repository: %q != %q", event.Target.Repository, repo)
if event.(Event).Target.Repository != repo {
t.Fatalf("unexpected repository: %q != %q", event.(Event).Target.Repository, repo)
}
}
func checkCommonManifest(t *testing.T, action string, events ...Event) {
checkCommon(t, events...)
func checkCommonManifest(t *testing.T, action string, event events.Event) {
checkCommon(t, event)
event := events[0]
if event.Action != action {
t.Fatalf("unexpected event action: %q != %q", event.Action, action)
if event.(Event).Action != action {
t.Fatalf("unexpected event action: %q != %q", event.(Event).Action, action)
}
repoRef, _ := reference.WithName(repo)
@ -201,57 +195,51 @@ func checkCommonManifest(t *testing.T, action string, events ...Event) {
t.Fatalf("error building expected url: %v", err)
}
if event.Target.URL != u {
t.Fatalf("incorrect url passed: \n%q != \n%q", event.Target.URL, u)
if event.(Event).Target.URL != u {
t.Fatalf("incorrect url passed: \n%q != \n%q", event.(Event).Target.URL, u)
}
if len(event.Target.References) != len(layers) {
t.Fatalf("unexpected number of references %v != %v", len(event.Target.References), len(layers))
if len(event.(Event).Target.References) != len(layers) {
t.Fatalf("unexpected number of references %v != %v", len(event.(Event).Target.References), len(layers))
}
for i, targetReference := range event.Target.References {
for i, targetReference := range event.(Event).Target.References {
if targetReference.Digest != layers[i].BlobSum {
t.Fatalf("unexpected reference: %q != %q", targetReference.Digest, layers[i].BlobSum)
}
}
}
func checkCommon(t *testing.T, events ...Event) {
if len(events) != 1 {
t.Fatalf("unexpected number of events: %v != 1", len(events))
func checkCommon(t *testing.T, event events.Event) {
if event.(Event).Source != source {
t.Fatalf("source not equal: %#v != %#v", event.(Event).Source, source)
}
event := events[0]
if event.Source != source {
t.Fatalf("source not equal: %#v != %#v", event.Source, source)
if event.(Event).Request != request {
t.Fatalf("request not equal: %#v != %#v", event.(Event).Request, request)
}
if event.Request != request {
t.Fatalf("request not equal: %#v != %#v", event.Request, request)
if event.(Event).Actor != actor {
t.Fatalf("request not equal: %#v != %#v", event.(Event).Actor, actor)
}
if event.Actor != actor {
t.Fatalf("request not equal: %#v != %#v", event.Actor, actor)
if event.(Event).Target.Digest != dgst {
t.Fatalf("unexpected digest on event target: %q != %q", event.(Event).Target.Digest, dgst)
}
if event.Target.Digest != dgst {
t.Fatalf("unexpected digest on event target: %q != %q", event.Target.Digest, dgst)
if event.(Event).Target.Length != int64(len(payload)) {
t.Fatalf("unexpected target length: %v != %v", event.(Event).Target.Length, len(payload))
}
if event.Target.Length != int64(len(payload)) {
t.Fatalf("unexpected target length: %v != %v", event.Target.Length, len(payload))
}
if event.Target.Repository != repo {
t.Fatalf("unexpected repository: %q != %q", event.Target.Repository, repo)
if event.(Event).Target.Repository != repo {
t.Fatalf("unexpected repository: %q != %q", event.(Event).Target.Repository, repo)
}
}
type testSinkFn func(events ...Event) error
type testSinkFn func(event events.Event) error
func (tsf testSinkFn) Write(events ...Event) error {
return tsf(events...)
func (tsf testSinkFn) Write(event events.Event) error {
return tsf(event)
}
func (tsf testSinkFn) Close() error { return nil }

View file

@ -5,6 +5,7 @@ import (
"time"
"github.com/docker/distribution/configuration"
events "github.com/docker/go-events"
)
// EndpointConfig covers the optional configuration parameters for an active
@ -42,7 +43,7 @@ func (ec *EndpointConfig) defaults() {
// services when events are written. Writes are non-blocking and always
// succeed for callers but events may be queued internally.
type Endpoint struct {
Sink
events.Sink
url string
name string
@ -64,7 +65,7 @@ func NewEndpoint(name, url string, config EndpointConfig) *Endpoint {
endpoint.Sink = newHTTPSink(
endpoint.url, endpoint.Timeout, endpoint.Headers,
endpoint.Transport, endpoint.metrics.httpStatusListener())
endpoint.Sink = newRetryingSink(endpoint.Sink, endpoint.Threshold, endpoint.Backoff)
endpoint.Sink = events.NewRetryingSink(endpoint.Sink, events.NewBreaker(endpoint.Threshold, endpoint.Backoff))
endpoint.Sink = newEventQueue(endpoint.Sink, endpoint.metrics.eventQueueListener())
mediaTypes := append(config.Ignore.MediaTypes, config.IgnoredMediaTypes...)
endpoint.Sink = newIgnoredSink(endpoint.Sink, mediaTypes, config.Ignore.Actions)

View file

@ -5,6 +5,7 @@ import (
"time"
"github.com/docker/distribution"
events "github.com/docker/go-events"
)
// EventAction constants used in action field of Event.
@ -30,7 +31,7 @@ const (
type Envelope struct {
// Events make up the contents of the envelope. Events present in a single
// envelope are not necessarily related.
Events []Event `json:"events,omitempty"`
Events []events.Event `json:"events,omitempty"`
}
// TODO(stevvooe): The event type should be separate from the json format. It
@ -148,16 +149,3 @@ var (
// retries will not be successful.
ErrSinkClosed = fmt.Errorf("sink: closed")
)
// Sink accepts and sends events.
type Sink interface {
// Write writes one or more events to the sink. If no error is returned,
// the caller will assume that all events have been committed and will not
// try to send them again. If an error is received, the caller may retry
// sending the event. The caller should cede the slice of memory to the
// sink and not modify it after calling this method.
Write(events ...Event) error
// Close the sink, possibly waiting for pending events to flush.
Close() error
}

View file

@ -7,6 +7,8 @@ import (
"net/http"
"sync"
"time"
events "github.com/docker/go-events"
)
// httpSink implements a single-flight, http notification endpoint. This is
@ -45,15 +47,15 @@ func newHTTPSink(u string, timeout time.Duration, headers http.Header, transport
// httpStatusListener is called on various outcomes of sending notifications.
type httpStatusListener interface {
success(status int, events ...Event)
failure(status int, events ...Event)
err(err error, events ...Event)
success(status int, event events.Event)
failure(status int, events events.Event)
err(err error, events events.Event)
}
// Accept makes an attempt to notify the endpoint, returning an error if it
// fails. It is the caller's responsibility to retry on error. The events are
// accepted or rejected as a group.
func (hs *httpSink) Write(events ...Event) error {
func (hs *httpSink) Write(event events.Event) error {
hs.mu.Lock()
defer hs.mu.Unlock()
defer hs.client.Transport.(*headerRoundTripper).CloseIdleConnections()
@ -63,7 +65,7 @@ func (hs *httpSink) Write(events ...Event) error {
}
envelope := Envelope{
Events: events,
Events: []events.Event{event},
}
// TODO(stevvooe): It is not ideal to keep re-encoding the request body on
@ -73,7 +75,7 @@ func (hs *httpSink) Write(events ...Event) error {
p, err := json.MarshalIndent(envelope, "", " ")
if err != nil {
for _, listener := range hs.listeners {
listener.err(err, events...)
listener.err(err, event)
}
return fmt.Errorf("%v: error marshaling event envelope: %v", hs, err)
}
@ -82,7 +84,7 @@ func (hs *httpSink) Write(events ...Event) error {
resp, err := hs.client.Post(hs.url, EventsMediaType, body)
if err != nil {
for _, listener := range hs.listeners {
listener.err(err, events...)
listener.err(err, event)
}
return fmt.Errorf("%v: error posting: %v", hs, err)
@ -94,7 +96,7 @@ func (hs *httpSink) Write(events ...Event) error {
switch {
case resp.StatusCode >= 200 && resp.StatusCode < 400:
for _, listener := range hs.listeners {
listener.success(resp.StatusCode, events...)
listener.success(resp.StatusCode, event)
}
// TODO(stevvooe): This is a little accepting: we may want to support
@ -104,7 +106,7 @@ func (hs *httpSink) Write(events ...Event) error {
return nil
default:
for _, listener := range hs.listeners {
listener.failure(resp.StatusCode, events...)
listener.failure(resp.StatusCode, event)
}
return fmt.Errorf("%v: response status %v unaccepted", hs, resp.Status)
}

View file

@ -14,6 +14,7 @@ import (
"testing"
"github.com/docker/distribution/manifest/schema1"
events "github.com/docker/go-events"
)
// TestHTTPSink mocks out an http endpoint and notifies it under a couple of
@ -68,8 +69,8 @@ func TestHTTPSink(t *testing.T) {
&endpointMetricsHTTPStatusListener{safeMetrics: metrics})
// first make sure that the default transport gives x509 untrusted cert error
events := []Event{}
err := sink.Write(events...)
event := Event{}
err := sink.Write(event)
if !strings.Contains(err.Error(), "x509") && !strings.Contains(err.Error(), "unknown ca") {
t.Fatal("TLS server with default transport should give unknown CA error")
}
@ -83,12 +84,13 @@ func TestHTTPSink(t *testing.T) {
}
sink = newHTTPSink(server.URL, 0, nil, tr,
&endpointMetricsHTTPStatusListener{safeMetrics: metrics})
err = sink.Write(events...)
err = sink.Write(event)
if err != nil {
t.Fatalf("unexpected error writing events: %v", err)
t.Fatalf("unexpected error writing event: %v", err)
}
// reset server to standard http server and sink to a basic sink
metrics = newSafeMetrics("")
server = httptest.NewServer(serverHandler)
sink = newHTTPSink(server.URL, 0, nil, nil,
&endpointMetricsHTTPStatusListener{safeMetrics: metrics})
@ -111,46 +113,52 @@ func TestHTTPSink(t *testing.T) {
}()
for _, tc := range []struct {
events []Event // events to send
event events.Event // events to send
url string
failure bool // true if there should be a failure.
isFailure bool // true if there should be a failure.
isError bool // true if the request returns an error
statusCode int // if not set, no status code should be incremented.
}{
{
statusCode: http.StatusOK,
events: []Event{
createTestEvent("push", "library/test", schema1.MediaTypeSignedManifest)},
event: createTestEvent("push", "library/test", schema1.MediaTypeSignedManifest),
},
{
statusCode: http.StatusOK,
events: []Event{
createTestEvent("push", "library/test", schema1.MediaTypeSignedManifest),
createTestEvent("push", "library/test", layerMediaType),
createTestEvent("push", "library/test", layerMediaType),
},
event: createTestEvent("push", "library/test", schema1.MediaTypeSignedManifest),
},
{
statusCode: http.StatusOK,
event: createTestEvent("push", "library/test", layerMediaType),
},
{
statusCode: http.StatusOK,
event: createTestEvent("push", "library/test", layerMediaType),
},
{
statusCode: http.StatusTemporaryRedirect,
},
{
statusCode: http.StatusBadRequest,
failure: true,
isFailure: true,
},
{
// Case where connection is immediately closed
url: closeL.Addr().String(),
failure: true,
url: "http://" + closeL.Addr().String(),
isError: true,
},
} {
if tc.failure {
expectedMetrics.Failures += len(tc.events)
if tc.isFailure {
expectedMetrics.Failures++
} else if tc.isError {
expectedMetrics.Errors++
} else {
expectedMetrics.Successes += len(tc.events)
expectedMetrics.Successes++
}
if tc.statusCode > 0 {
expectedMetrics.Statuses[fmt.Sprintf("%d %s", tc.statusCode, http.StatusText(tc.statusCode))] += len(tc.events)
expectedMetrics.Statuses[fmt.Sprintf("%d %s", tc.statusCode, http.StatusText(tc.statusCode))]++
}
url := tc.url
@ -161,11 +169,11 @@ func TestHTTPSink(t *testing.T) {
url += fmt.Sprintf("?status=%v", tc.statusCode)
sink.url = url
t.Logf("testcase: %v, fail=%v", url, tc.failure)
t.Logf("testcase: %v, fail=%v, error=%v", url, tc.isFailure, tc.isError)
// Try a simple event emission.
err := sink.Write(tc.events...)
err := sink.Write(tc.event)
if !tc.failure {
if !tc.isFailure && !tc.isError {
if err != nil {
t.Fatalf("unexpected error send event: %v", err)
}
@ -173,6 +181,7 @@ func TestHTTPSink(t *testing.T) {
if err == nil {
t.Fatalf("the endpoint should have rejected the request")
}
t.Logf("write error: %v", err)
}
if !reflect.DeepEqual(metrics.EndpointMetrics, expectedMetrics) {

View file

@ -7,6 +7,7 @@ import (
"sync"
prometheus "github.com/docker/distribution/metrics"
events "github.com/docker/go-events"
"github.com/docker/go-metrics"
)
@ -70,32 +71,32 @@ type endpointMetricsHTTPStatusListener struct {
var _ httpStatusListener = &endpointMetricsHTTPStatusListener{}
func (emsl *endpointMetricsHTTPStatusListener) success(status int, events ...Event) {
func (emsl *endpointMetricsHTTPStatusListener) success(status int, event events.Event) {
emsl.safeMetrics.Lock()
defer emsl.safeMetrics.Unlock()
emsl.Statuses[fmt.Sprintf("%d %s", status, http.StatusText(status))] += len(events)
emsl.Successes += len(events)
emsl.Statuses[fmt.Sprintf("%d %s", status, http.StatusText(status))]++
emsl.Successes++
statusCounter.WithValues(fmt.Sprintf("%d %s", status, http.StatusText(status)), emsl.EndpointName).Inc(1)
eventsCounter.WithValues("Successes", emsl.EndpointName).Inc(float64(len(events)))
eventsCounter.WithValues("Successes", emsl.EndpointName).Inc(1)
}
func (emsl *endpointMetricsHTTPStatusListener) failure(status int, events ...Event) {
func (emsl *endpointMetricsHTTPStatusListener) failure(status int, event events.Event) {
emsl.safeMetrics.Lock()
defer emsl.safeMetrics.Unlock()
emsl.Statuses[fmt.Sprintf("%d %s", status, http.StatusText(status))] += len(events)
emsl.Failures += len(events)
emsl.Statuses[fmt.Sprintf("%d %s", status, http.StatusText(status))]++
emsl.Failures++
statusCounter.WithValues(fmt.Sprintf("%d %s", status, http.StatusText(status)), emsl.EndpointName).Inc(1)
eventsCounter.WithValues("Failures", emsl.EndpointName).Inc(float64(len(events)))
eventsCounter.WithValues("Failures", emsl.EndpointName).Inc(1)
}
func (emsl *endpointMetricsHTTPStatusListener) err(err error, events ...Event) {
func (emsl *endpointMetricsHTTPStatusListener) err(err error, event events.Event) {
emsl.safeMetrics.Lock()
defer emsl.safeMetrics.Unlock()
emsl.Errors += len(events)
emsl.Errors++
eventsCounter.WithValues("Errors", emsl.EndpointName).Inc(float64(len(events)))
eventsCounter.WithValues("Errors", emsl.EndpointName).Inc(1)
}
// endpointMetricsEventQueueListener maintains the incoming events counter and
@ -104,20 +105,20 @@ type endpointMetricsEventQueueListener struct {
*safeMetrics
}
func (eqc *endpointMetricsEventQueueListener) ingress(events ...Event) {
func (eqc *endpointMetricsEventQueueListener) ingress(event events.Event) {
eqc.Lock()
defer eqc.Unlock()
eqc.Events += len(events)
eqc.Pending += len(events)
eqc.Events++
eqc.Pending++
eventsCounter.WithValues("Events", eqc.EndpointName).Inc()
pendingGauge.WithValues(eqc.EndpointName).Inc(float64(len(events)))
pendingGauge.WithValues(eqc.EndpointName).Inc(1)
}
func (eqc *endpointMetricsEventQueueListener) egress(events ...Event) {
func (eqc *endpointMetricsEventQueueListener) egress(event events.Event) {
eqc.Lock()
defer eqc.Unlock()
eqc.Pending -= len(events)
eqc.Pending--
pendingGauge.WithValues(eqc.EndpointName).Dec(1)
}

View file

@ -4,107 +4,16 @@ import (
"container/list"
"fmt"
"sync"
"time"
events "github.com/docker/go-events"
"github.com/sirupsen/logrus"
)
// NOTE(stevvooe): This file contains definitions for several utility sinks.
// Typically, the broadcaster is the only sink that should be required
// externally, but others are suitable for export if the need arises. Albeit,
// the tight integration with endpoint metrics should be removed.
// Broadcaster sends events to multiple, reliable Sinks. The goal of this
// component is to dispatch events to configured endpoints. Reliability can be
// provided by wrapping incoming sinks.
type Broadcaster struct {
sinks []Sink
events chan []Event
closed chan chan struct{}
}
// NewBroadcaster ...
// Add appends one or more sinks to the list of sinks. The broadcaster
// behavior will be affected by the properties of the sink. Generally, the
// sink should accept all messages and deal with reliability on its own. Use
// of EventQueue and RetryingSink should be used here.
func NewBroadcaster(sinks ...Sink) *Broadcaster {
b := Broadcaster{
sinks: sinks,
events: make(chan []Event),
closed: make(chan chan struct{}),
}
// Start the broadcaster
go b.run()
return &b
}
// Write accepts a block of events to be dispatched to all sinks. This method
// will never fail and should never block (hopefully!). The caller cedes the
// slice memory to the broadcaster and should not modify it after calling
// write.
func (b *Broadcaster) Write(events ...Event) error {
select {
case b.events <- events:
case <-b.closed:
return ErrSinkClosed
}
return nil
}
// Close the broadcaster, ensuring that all messages are flushed to the
// underlying sink before returning.
func (b *Broadcaster) Close() error {
logrus.Infof("broadcaster: closing")
select {
case <-b.closed:
// already closed
return fmt.Errorf("broadcaster: already closed")
default:
// do a little chan handoff dance to synchronize closing
closed := make(chan struct{})
b.closed <- closed
close(b.closed)
<-closed
return nil
}
}
// run is the main broadcast loop, started when the broadcaster is created.
// Under normal conditions, it waits for events on the event channel. After
// Close is called, this goroutine will exit.
func (b *Broadcaster) run() {
for {
select {
case block := <-b.events:
for _, sink := range b.sinks {
if err := sink.Write(block...); err != nil {
logrus.Errorf("broadcaster: error writing events to %v, these events will be lost: %v", sink, err)
}
}
case closing := <-b.closed:
// close all the underlying sinks
for _, sink := range b.sinks {
if err := sink.Close(); err != nil {
logrus.Errorf("broadcaster: error closing sink %v: %v", sink, err)
}
}
closing <- struct{}{}
logrus.Debugf("broadcaster: closed")
return
}
}
}
// eventQueue accepts all messages into a queue for asynchronous consumption
// by a sink. It is unbounded and thread safe but the sink must be reliable or
// events will be dropped.
type eventQueue struct {
sink Sink
sink events.Sink
events *list.List
listeners []eventQueueListener
cond *sync.Cond
@ -114,13 +23,13 @@ type eventQueue struct {
// eventQueueListener is called when various events happen on the queue.
type eventQueueListener interface {
ingress(events ...Event)
egress(events ...Event)
ingress(event events.Event)
egress(event events.Event)
}
// newEventQueue returns a queue to the provided sink. If the updater is non-
// nil, it will be called to update pending metrics on ingress and egress.
func newEventQueue(sink Sink, listeners ...eventQueueListener) *eventQueue {
func newEventQueue(sink events.Sink, listeners ...eventQueueListener) *eventQueue {
eq := eventQueue{
sink: sink,
events: list.New(),
@ -134,7 +43,7 @@ func newEventQueue(sink Sink, listeners ...eventQueueListener) *eventQueue {
// Write accepts the events into the queue, only failing if the queue has
// been closed.
func (eq *eventQueue) Write(events ...Event) error {
func (eq *eventQueue) Write(event events.Event) error {
eq.mu.Lock()
defer eq.mu.Unlock()
@ -143,9 +52,9 @@ func (eq *eventQueue) Write(events ...Event) error {
}
for _, listener := range eq.listeners {
listener.ingress(events...)
listener.ingress(event)
}
eq.events.PushBack(events)
eq.events.PushBack(event)
eq.cond.Signal() // signal waiters
return nil
@ -171,18 +80,18 @@ func (eq *eventQueue) Close() error {
// run is the main goroutine to flush events to the target sink.
func (eq *eventQueue) run() {
for {
block := eq.next()
event := eq.next()
if block == nil {
if event == nil {
return // nil block means event queue is closed.
}
if err := eq.sink.Write(block...); err != nil {
if err := eq.sink.Write(event); err != nil {
logrus.Warnf("eventqueue: error writing events to %v, these events will be lost: %v", eq.sink, err)
}
for _, listener := range eq.listeners {
listener.egress(block...)
listener.egress(event)
}
}
}
@ -190,7 +99,7 @@ func (eq *eventQueue) run() {
// next encompasses the critical section of the run loop. When the queue is
// empty, it will block on the condition. If new data arrives, it will wake
// and return a block. When closed, a nil slice will be returned.
func (eq *eventQueue) next() []Event {
func (eq *eventQueue) next() events.Event {
eq.mu.Lock()
defer eq.mu.Unlock()
@ -204,7 +113,7 @@ func (eq *eventQueue) next() []Event {
}
front := eq.events.Front()
block := front.Value.([]Event)
block := front.Value.(events.Event)
eq.events.Remove(front)
return block
@ -213,12 +122,12 @@ func (eq *eventQueue) next() []Event {
// ignoredSink discards events with ignored target media types and actions.
// passes the rest along.
type ignoredSink struct {
Sink
events.Sink
ignoreMediaTypes map[string]bool
ignoreActions map[string]bool
}
func newIgnoredSink(sink Sink, ignored []string, ignoreActions []string) Sink {
func newIgnoredSink(sink events.Sink, ignored []string, ignoreActions []string) events.Sink {
if len(ignored) == 0 {
return sink
}
@ -242,146 +151,14 @@ func newIgnoredSink(sink Sink, ignored []string, ignoreActions []string) Sink {
// Write discards events with ignored target media types and passes the rest
// along.
func (imts *ignoredSink) Write(events ...Event) error {
var kept []Event
for _, e := range events {
if !imts.ignoreMediaTypes[e.Target.MediaType] {
kept = append(kept, e)
}
}
if len(kept) == 0 {
func (imts *ignoredSink) Write(event events.Event) error {
if imts.ignoreMediaTypes[event.(Event).Target.MediaType] || imts.ignoreActions[event.(Event).Action] {
return nil
}
var results []Event
for _, e := range kept {
if !imts.ignoreActions[e.Action] {
results = append(results, e)
}
}
if len(results) == 0 {
return nil
}
return imts.Sink.Write(results...)
return imts.Sink.Write(event)
}
// retryingSink retries the write until success or an ErrSinkClosed is
// returned. Underlying sink must have p > 0 of succeeding or the sink will
// block. Internally, it is a circuit breaker that retries to manage reset.
// Concurrent calls to a retrying sink are serialized through the sink,
// meaning that if one is in-flight, another will not proceed.
type retryingSink struct {
mu sync.Mutex
sink Sink
closed bool
// circuit breaker heuristics
failures struct {
threshold int
recent int
last time.Time
backoff time.Duration // time after which we retry after failure.
}
}
// TODO(stevvooe): We are using circuit break here, which actually doesn't
// make a whole lot of sense for this use case, since we always retry. Move
// this to use bounded exponential backoff.
// newRetryingSink returns a sink that will retry writes to a sink, backing
// off on failure. Parameters threshold and backoff adjust the behavior of the
// circuit breaker.
func newRetryingSink(sink Sink, threshold int, backoff time.Duration) *retryingSink {
rs := &retryingSink{
sink: sink,
}
rs.failures.threshold = threshold
rs.failures.backoff = backoff
return rs
}
// Write attempts to flush the events to the downstream sink until it succeeds
// or the sink is closed.
func (rs *retryingSink) Write(events ...Event) error {
rs.mu.Lock()
defer rs.mu.Unlock()
retry:
if rs.closed {
return ErrSinkClosed
}
if !rs.proceed() {
logrus.Warnf("%v encountered too many errors, backing off", rs.sink)
rs.wait(rs.failures.backoff)
goto retry
}
if err := rs.write(events...); err != nil {
if err == ErrSinkClosed {
// terminal!
return err
}
logrus.Errorf("retryingsink: error writing events: %v, retrying", err)
goto retry
}
func (imts *ignoredSink) Close() error {
return nil
}
// Close closes the sink and the underlying sink.
func (rs *retryingSink) Close() error {
rs.mu.Lock()
defer rs.mu.Unlock()
if rs.closed {
return fmt.Errorf("retryingsink: already closed")
}
rs.closed = true
return rs.sink.Close()
}
// write provides a helper that dispatches failure and success properly. Used
// by write as the single-flight write call.
func (rs *retryingSink) write(events ...Event) error {
if err := rs.sink.Write(events...); err != nil {
rs.failure()
return err
}
rs.reset()
return nil
}
// wait backoff time against the sink, unlocking so others can proceed. Should
// only be called by methods that currently have the mutex.
func (rs *retryingSink) wait(backoff time.Duration) {
rs.mu.Unlock()
defer rs.mu.Lock()
// backoff here
time.Sleep(backoff)
}
// reset marks a successful call.
func (rs *retryingSink) reset() {
rs.failures.recent = 0
rs.failures.last = time.Time{}
}
// failure records a failure.
func (rs *retryingSink) failure() {
rs.failures.recent++
rs.failures.last = time.Now().UTC()
}
// proceed returns true if the call should proceed based on circuit breaker
// heuristics.
func (rs *retryingSink) proceed() bool {
return rs.failures.recent < rs.failures.threshold ||
time.Now().UTC().After(rs.failures.last.Add(rs.failures.backoff))
}

View file

@ -1,68 +1,17 @@
package notifications
import (
"fmt"
"math/rand"
"reflect"
"sync"
"time"
events "github.com/docker/go-events"
"github.com/sirupsen/logrus"
"testing"
)
func TestBroadcaster(t *testing.T) {
const nEvents = 1000
var sinks []Sink
for i := 0; i < 10; i++ {
sinks = append(sinks, &testSink{})
}
b := NewBroadcaster(sinks...)
var block []Event
var wg sync.WaitGroup
for i := 1; i <= nEvents; i++ {
block = append(block, createTestEvent("push", "library/test", "blob"))
if i%10 == 0 && i > 0 {
wg.Add(1)
go func(block ...Event) {
if err := b.Write(block...); err != nil {
t.Errorf("error writing block of length %d: %v", len(block), err)
}
wg.Done()
}(block...)
block = nil
}
}
wg.Wait() // Wait until writes complete
if t.Failed() {
t.FailNow()
}
checkClose(t, b)
// Iterate through the sinks and check that they all have the expected length.
for _, sink := range sinks {
ts := sink.(*testSink)
ts.mu.Lock()
defer ts.mu.Unlock()
if len(ts.events) != nEvents {
t.Fatalf("not all events ended up in testsink: len(testSink) == %d, not %d", len(ts.events), nEvents)
}
if !ts.closed {
t.Fatalf("sink should have been closed")
}
}
}
func TestEventQueue(t *testing.T) {
const nevents = 1000
var ts testSink
@ -75,20 +24,16 @@ func TestEventQueue(t *testing.T) {
}, metrics.eventQueueListener())
var wg sync.WaitGroup
var block []Event
var event events.Event
for i := 1; i <= nevents; i++ {
block = append(block, createTestEvent("push", "library/test", "blob"))
if i%10 == 0 && i > 0 {
wg.Add(1)
go func(block ...Event) {
if err := eq.Write(block...); err != nil {
t.Errorf("error writing event block: %v", err)
}
wg.Done()
}(block...)
block = nil
}
event = createTestEvent("push", "library/test", "blob")
wg.Add(1)
go func(event events.Event) {
if err := eq.Write(event); err != nil {
t.Errorf("error writing event block: %v", err)
}
wg.Done()
}(event)
}
wg.Wait()
@ -102,8 +47,8 @@ func TestEventQueue(t *testing.T) {
metrics.Lock()
defer metrics.Unlock()
if len(ts.events) != nevents {
t.Fatalf("events did not make it to the sink: %d != %d", len(ts.events), 1000)
if ts.count != nevents {
t.Fatalf("events did not make it to the sink: %d != %d", ts.count, 1000)
}
if !ts.closed {
@ -126,16 +71,14 @@ func TestIgnoredSink(t *testing.T) {
type testcase struct {
ignoreMediaTypes []string
ignoreActions []string
expected []Event
expected events.Event
}
cases := []testcase{
{nil, nil, []Event{blob, manifest}},
{[]string{"other"}, []string{"other"}, []Event{blob, manifest}},
{[]string{"blob"}, []string{"other"}, []Event{manifest}},
{nil, nil, blob},
{[]string{"other"}, []string{"other"}, blob},
{[]string{"blob", "manifest"}, []string{"other"}, nil},
{[]string{"other"}, []string{"push"}, []Event{manifest}},
{[]string{"other"}, []string{"pull"}, []Event{blob}},
{[]string{"other"}, []string{"pull"}, blob},
{[]string{"other"}, []string{"pull", "push"}, nil},
}
@ -143,78 +86,54 @@ func TestIgnoredSink(t *testing.T) {
ts := &testSink{}
s := newIgnoredSink(ts, c.ignoreMediaTypes, c.ignoreActions)
if err := s.Write(blob, manifest); err != nil {
if err := s.Write(blob); err != nil {
t.Fatalf("error writing event: %v", err)
}
ts.mu.Lock()
if !reflect.DeepEqual(ts.events, c.expected) {
t.Fatalf("unexpected events: %#v != %#v", ts.events, c.expected)
if !reflect.DeepEqual(ts.event, c.expected) {
t.Fatalf("unexpected event: %#v != %#v", ts.event, c.expected)
}
ts.mu.Unlock()
}
cases = []testcase{
{nil, nil, manifest},
{[]string{"other"}, []string{"other"}, manifest},
{[]string{"blob"}, []string{"other"}, manifest},
{[]string{"blob", "manifest"}, []string{"other"}, nil},
{[]string{"other"}, []string{"push"}, manifest},
{[]string{"other"}, []string{"pull", "push"}, nil},
}
for _, c := range cases {
ts := &testSink{}
s := newIgnoredSink(ts, c.ignoreMediaTypes, c.ignoreActions)
if err := s.Write(manifest); err != nil {
t.Fatalf("error writing event: %v", err)
}
ts.mu.Lock()
if !reflect.DeepEqual(ts.event, c.expected) {
t.Fatalf("unexpected event: %#v != %#v", ts.event, c.expected)
}
ts.mu.Unlock()
}
}
// TestRetryingSink drives a retrying sink wrapped around a sink that fails
// most writes, verifying that retries eventually deliver every event.
func TestRetryingSink(t *testing.T) {
	// Make a sync that fails most of the time, ensuring that all the events
	// make it through.
	var ts testSink
	flaky := &flakySink{
		rate: 1.0, // start out always failing.
		Sink: &ts,
	}
	s := newRetryingSink(flaky, 3, 10*time.Millisecond)

	var wg sync.WaitGroup
	var block []Event
	for i := 1; i <= 100; i++ {
		block = append(block, createTestEvent("push", "library/test", "blob"))

		// Above 50, set the failure rate lower
		// NOTE(review): flaky.rate is mutated under s.mu — presumably the
		// retrying sink calls flaky.Write while holding its own mutex;
		// confirm against newRetryingSink before relying on this.
		if i > 50 {
			s.mu.Lock()
			flaky.rate = 0.90
			s.mu.Unlock()
		}

		// Dispatch each batch of 10 events on its own goroutine.
		if i%10 == 0 && i > 0 {
			wg.Add(1)
			go func(block ...Event) {
				defer wg.Done()
				if err := s.Write(block...); err != nil {
					t.Errorf("error writing event block: %v", err)
				}
			}(block...)

			block = nil
		}
	}

	wg.Wait()
	if t.Failed() {
		t.FailNow()
	}
	checkClose(t, s)

	ts.mu.Lock()
	defer ts.mu.Unlock()

	// Despite the failure rate, retries must propagate all 100 events.
	if len(ts.events) != 100 {
		t.Fatalf("events not propagated: %d != %d", len(ts.events), 100)
	}
}
type testSink struct {
events []Event
event events.Event
count int
mu sync.Mutex
closed bool
}
func (ts *testSink) Write(events ...Event) error {
func (ts *testSink) Write(event events.Event) error {
ts.mu.Lock()
defer ts.mu.Unlock()
ts.events = append(ts.events, events...)
ts.event = event
ts.count++
return nil
}
@ -228,29 +147,16 @@ func (ts *testSink) Close() error {
}
type delayedSink struct {
Sink
events.Sink
delay time.Duration
}
func (ds *delayedSink) Write(events ...Event) error {
func (ds *delayedSink) Write(event events.Event) error {
time.Sleep(ds.delay)
return ds.Sink.Write(events...)
return ds.Sink.Write(event)
}
// flakySink fails a configurable fraction of writes and delegates the rest
// to the embedded Sink.
type flakySink struct {
	Sink
	rate float64
}

// Write rejects the batch with probability fs.rate; otherwise it forwards
// the events to the underlying sink.
func (fs *flakySink) Write(events ...Event) error {
	if rand.Float64() >= fs.rate {
		return fs.Sink.Write(events...)
	}
	return fmt.Errorf("error writing %d events", len(events))
}
func checkClose(t *testing.T, sink Sink) {
func checkClose(t *testing.T, sink events.Sink) {
if err := sink.Close(); err != nil {
t.Fatalf("unexpected error closing: %v", err)
}
@ -261,7 +167,7 @@ func checkClose(t *testing.T, sink Sink) {
}
// Write after closed should be an error
if err := sink.Write([]Event{}...); err == nil {
if err := sink.Write(Event{}); err == nil {
t.Fatalf("write after closed did not have an error")
} else if err != ErrSinkClosed {
t.Fatalf("error should be ErrSinkClosed")