diff --git a/pubsub/publisher_test.go b/pubsub/publisher_test.go index c19059a..5e99e46 100644 --- a/pubsub/publisher_test.go +++ b/pubsub/publisher_test.go @@ -1,6 +1,7 @@ package pubsub import ( + "fmt" "testing" "time" ) @@ -61,3 +62,61 @@ func TestClosePublisher(t *testing.T) { } } } + +const sampleText = "test" + +type testSubscriber struct { + dataCh chan interface{} + ch chan error +} + +func (s *testSubscriber) Wait() error { + return <-s.ch +} + +func newTestSubscriber(p *Publisher) *testSubscriber { + ts := &testSubscriber{ + dataCh: p.Subscribe(), + ch: make(chan error), + } + go func() { + for data := range ts.dataCh { + s, ok := data.(string) + if !ok { + ts.ch <- fmt.Errorf("Unexpected type %T", data) + break + } + if s != sampleText { + ts.ch <- fmt.Errorf("Unexpected text %s", s) + break + } + } + close(ts.ch) + }() + return ts +} + +func BenchmarkPubSub(b *testing.B) { + for i := 0; i < b.N; i++ { + b.StopTimer() + p := NewPublisher(0, 1024) + var subs [](*testSubscriber) + for j := 0; j < 50; j++ { + subs = append(subs, newTestSubscriber(p)) + } + b.StartTimer() + for j := 0; j < 1000; j++ { + p.Publish(sampleText) + } + time.AfterFunc(1*time.Second, func() { + for _, s := range subs { + p.Evict(s.dataCh) + } + }) + for _, s := range subs { + if err := s.Wait(); err != nil { + b.Fatal(err) + } + } + } +}