9f0c8d6616
This changeset implements webhook notification endpoints for dispatching registry events. Repository instances can be decorated by a listener that converts calls into context-aware events, using a bridge. Events generated in the bridge are written to a sink. Implementations of sink include a broadcast and endpoint sink which can be used to configure event dispatch. Endpoints represent a webhook notification target, with queueing and retries built in. They can be added to a Broadcaster, which is a simple sink that writes a block of events to several sinks, to provide a complete dispatch mechanism. The main caveat to the current approach is that all unsent notifications are inmemory. Best effort is made to ensure that notifications are not dropped, to the point where queues may back up on faulty endpoints. If the endpoint is fixed, the events will be retried and all messages will go through. Internally, this functionality is all made up of Sink objects. The queuing functionality is implemented with an eventQueue sink and retries are implemented with retryingSink. Replacing the inmemory queuing with something persistent should be as simple as replacing broadcaster with a remote queue and that sets up the sinks to be local workers listening to that remote queue. Metrics are kept for each endpoint and exported via expvar. This may not be a permanent appraoch but should provide enough information for troubleshooting notification problems. Signed-off-by: Stephen J Day <stephen.day@docker.com>
223 lines
4.4 KiB
Go
223 lines
4.4 KiB
Go
package notifications
|
|
|
|
import (
|
|
"fmt"
|
|
"math/rand"
|
|
"sync"
|
|
"time"
|
|
|
|
"github.com/Sirupsen/logrus"
|
|
|
|
"testing"
|
|
)
|
|
|
|
func TestBroadcaster(t *testing.T) {
|
|
const nEvents = 1000
|
|
var sinks []Sink
|
|
|
|
for i := 0; i < 10; i++ {
|
|
sinks = append(sinks, &testSink{})
|
|
}
|
|
|
|
b := NewBroadcaster(sinks...)
|
|
|
|
var block []Event
|
|
var wg sync.WaitGroup
|
|
for i := 1; i <= nEvents; i++ {
|
|
block = append(block, createTestEvent("push", "library/test", "blob"))
|
|
|
|
if i%10 == 0 && i > 0 {
|
|
wg.Add(1)
|
|
go func(block ...Event) {
|
|
if err := b.Write(block...); err != nil {
|
|
t.Fatalf("error writing block of length %d: %v", len(block), err)
|
|
}
|
|
wg.Done()
|
|
}(block...)
|
|
|
|
block = nil
|
|
}
|
|
}
|
|
|
|
wg.Wait() // Wait until writes complete
|
|
checkClose(t, b)
|
|
|
|
// Iterate through the sinks and check that they all have the expected length.
|
|
for _, sink := range sinks {
|
|
ts := sink.(*testSink)
|
|
ts.mu.Lock()
|
|
defer ts.mu.Unlock()
|
|
|
|
if len(ts.events) != nEvents {
|
|
t.Fatalf("not all events ended up in testsink: len(testSink) == %d, not %d", len(ts.events), nEvents)
|
|
}
|
|
|
|
if !ts.closed {
|
|
t.Fatalf("sink should have been closed")
|
|
}
|
|
}
|
|
|
|
}
|
|
|
|
func TestEventQueue(t *testing.T) {
|
|
const nevents = 1000
|
|
var ts testSink
|
|
metrics := newSafeMetrics()
|
|
eq := newEventQueue(
|
|
// delayed sync simulates destination slower than channel comms
|
|
&delayedSink{
|
|
Sink: &ts,
|
|
delay: time.Millisecond * 1,
|
|
}, metrics.eventQueueListener())
|
|
|
|
var wg sync.WaitGroup
|
|
var block []Event
|
|
for i := 1; i <= nevents; i++ {
|
|
block = append(block, createTestEvent("push", "library/test", "blob"))
|
|
if i%10 == 0 && i > 0 {
|
|
wg.Add(1)
|
|
go func(block ...Event) {
|
|
if err := eq.Write(block...); err != nil {
|
|
t.Fatalf("error writing event block: %v", err)
|
|
}
|
|
wg.Done()
|
|
}(block...)
|
|
|
|
block = nil
|
|
}
|
|
}
|
|
|
|
wg.Wait()
|
|
checkClose(t, eq)
|
|
|
|
ts.mu.Lock()
|
|
defer ts.mu.Unlock()
|
|
metrics.Lock()
|
|
defer metrics.Unlock()
|
|
|
|
if len(ts.events) != nevents {
|
|
t.Fatalf("events did not make it to the sink: %d != %d", len(ts.events), 1000)
|
|
}
|
|
|
|
if !ts.closed {
|
|
t.Fatalf("sink should have been closed")
|
|
}
|
|
|
|
if metrics.Events != nevents {
|
|
t.Fatalf("unexpected ingress count: %d != %d", metrics.Events, nevents)
|
|
}
|
|
|
|
if metrics.Pending != 0 {
|
|
t.Fatalf("unexpected egress count: %d != %d", metrics.Pending, 0)
|
|
}
|
|
}
|
|
|
|
func TestRetryingSink(t *testing.T) {
|
|
|
|
// Make a sync that fails most of the time, ensuring that all the events
|
|
// make it through.
|
|
var ts testSink
|
|
flaky := &flakySink{
|
|
rate: 1.0, // start out always failing.
|
|
Sink: &ts,
|
|
}
|
|
s := newRetryingSink(flaky, 3, 10*time.Millisecond)
|
|
|
|
var wg sync.WaitGroup
|
|
var block []Event
|
|
for i := 1; i <= 100; i++ {
|
|
block = append(block, createTestEvent("push", "library/test", "blob"))
|
|
|
|
// Above 50, set the failure rate lower
|
|
if i > 50 {
|
|
s.mu.Lock()
|
|
flaky.rate = 0.90
|
|
s.mu.Unlock()
|
|
}
|
|
|
|
if i%10 == 0 && i > 0 {
|
|
wg.Add(1)
|
|
go func(block ...Event) {
|
|
defer wg.Done()
|
|
if err := s.Write(block...); err != nil {
|
|
t.Fatalf("error writing event block: %v", err)
|
|
}
|
|
}(block...)
|
|
|
|
block = nil
|
|
}
|
|
}
|
|
|
|
wg.Wait()
|
|
checkClose(t, s)
|
|
|
|
ts.mu.Lock()
|
|
defer ts.mu.Unlock()
|
|
|
|
if len(ts.events) != 100 {
|
|
t.Fatalf("events not propagated: %d != %d", len(ts.events), 100)
|
|
}
|
|
}
|
|
|
|
type testSink struct {
|
|
events []Event
|
|
mu sync.Mutex
|
|
closed bool
|
|
}
|
|
|
|
func (ts *testSink) Write(events ...Event) error {
|
|
ts.mu.Lock()
|
|
defer ts.mu.Unlock()
|
|
ts.events = append(ts.events, events...)
|
|
return nil
|
|
}
|
|
|
|
func (ts *testSink) Close() error {
|
|
ts.mu.Lock()
|
|
defer ts.mu.Unlock()
|
|
ts.closed = true
|
|
|
|
logrus.Infof("closing testSink")
|
|
return nil
|
|
}
|
|
|
|
type delayedSink struct {
|
|
Sink
|
|
delay time.Duration
|
|
}
|
|
|
|
func (ds *delayedSink) Write(events ...Event) error {
|
|
time.Sleep(ds.delay)
|
|
return ds.Sink.Write(events...)
|
|
}
|
|
|
|
type flakySink struct {
|
|
Sink
|
|
rate float64
|
|
}
|
|
|
|
func (fs *flakySink) Write(events ...Event) error {
|
|
if rand.Float64() < fs.rate {
|
|
return fmt.Errorf("error writing %d events", len(events))
|
|
}
|
|
|
|
return fs.Sink.Write(events...)
|
|
}
|
|
|
|
func checkClose(t *testing.T, sink Sink) {
|
|
if err := sink.Close(); err != nil {
|
|
t.Fatalf("unexpected error closing: %v", err)
|
|
}
|
|
|
|
// second close should not crash but should return an error.
|
|
if err := sink.Close(); err == nil {
|
|
t.Fatalf("no error on double close")
|
|
}
|
|
|
|
// Write after closed should be an error
|
|
if err := sink.Write([]Event{}...); err == nil {
|
|
t.Fatalf("write after closed did not have an error")
|
|
} else if err != ErrSinkClosed {
|
|
t.Fatalf("error should be ErrSinkClosed")
|
|
}
|
|
}
|