From 2d6672b0e10edd19931dd45d63a5682834fd5b01 Mon Sep 17 00:00:00 2001 From: Tibor Vass Date: Tue, 6 Oct 2015 13:24:49 -0400 Subject: [PATCH] Move types from progressreader and broadcastwriter to broadcaster progressreader.Broadcaster becomes broadcaster.Buffered and broadcastwriter.Writer becomes broadcaster.Unbuffered. The package broadcastwriter is thus renamed to broadcaster. Signed-off-by: Tibor Vass --- .../broadcaster.go => broadcaster/buffered.go | 34 +++++++++---------- .../unbuffered.go | 19 ++++------- .../unbuffered_test.go | 30 ++++++++-------- 3 files changed, 39 insertions(+), 44 deletions(-) rename progressreader/broadcaster.go => broadcaster/buffered.go (80%) rename broadcastwriter/broadcastwriter.go => broadcaster/unbuffered.go (64%) rename broadcastwriter/broadcastwriter_test.go => broadcaster/unbuffered_test.go (87%) diff --git a/progressreader/broadcaster.go b/broadcaster/buffered.go similarity index 80% rename from progressreader/broadcaster.go rename to broadcaster/buffered.go index a48ff22..57f5f97 100644 --- a/progressreader/broadcaster.go +++ b/broadcaster/buffered.go @@ -1,4 +1,4 @@ -package progressreader +package broadcaster import ( "errors" @@ -6,10 +6,10 @@ import ( "sync" ) -// Broadcaster keeps track of one or more observers watching the progress +// Buffered keeps track of one or more observers watching the progress // of an operation. For example, if multiple clients are trying to pull an -// image, they share a Broadcaster for the download operation. -type Broadcaster struct { +// image, they share a Buffered struct for the download operation. +type Buffered struct { sync.Mutex // c is a channel that observers block on, waiting for the operation // to finish. @@ -29,9 +29,9 @@ type Broadcaster struct { result error } -// NewBroadcaster returns a Broadcaster structure -func NewBroadcaster() *Broadcaster { - b := &Broadcaster{ +// NewBuffered returns an initialized Buffered structure. +func NewBuffered() *Buffered { + b := &Buffered{ c: make(chan struct{}), } b.cond = sync.NewCond(b) @@ -39,7 +39,7 @@ func NewBroadcaster() *Broadcaster { } // closed returns true if and only if the broadcaster has been closed -func (broadcaster *Broadcaster) closed() bool { +func (broadcaster *Buffered) closed() bool { select { case <-broadcaster.c: return true @@ -51,7 +51,7 @@ func (broadcaster *Broadcaster) closed() bool { // receiveWrites runs as a goroutine so that writes don't block the Write // function. It writes the new data in broadcaster.history each time there's // activity on the broadcaster.cond condition variable. -func (broadcaster *Broadcaster) receiveWrites(observer io.Writer) { +func (broadcaster *Buffered) receiveWrites(observer io.Writer) { n := 0 broadcaster.Lock() @@ -98,13 +98,13 @@ func (broadcaster *Broadcaster) receiveWrites(observer io.Writer) { // Write adds data to the history buffer, and also writes it to all current // observers. -func (broadcaster *Broadcaster) Write(p []byte) (n int, err error) { +func (broadcaster *Buffered) Write(p []byte) (n int, err error) { broadcaster.Lock() defer broadcaster.Unlock() // Is the broadcaster closed? If so, the write should fail. if broadcaster.closed() { - return 0, errors.New("attempted write to closed progressreader Broadcaster") + return 0, errors.New("attempted write to a closed broadcaster.Buffered") } // Add message in p to the history slice @@ -117,15 +117,15 @@ func (broadcaster *Broadcaster) Write(p []byte) (n int, err error) { return len(p), nil } -// Add adds an observer to the Broadcaster. The new observer receives the +// Add adds an observer to the broadcaster. The new observer receives the // data from the history buffer, and also all subsequent data. -func (broadcaster *Broadcaster) Add(w io.Writer) error { +func (broadcaster *Buffered) Add(w io.Writer) error { // The lock is acquired here so that Add can't race with Close broadcaster.Lock() defer broadcaster.Unlock() if broadcaster.closed() { - return errors.New("attempted to add observer to closed progressreader Broadcaster") + return errors.New("attempted to add observer to a closed broadcaster.Buffered") } broadcaster.wg.Add(1) @@ -136,7 +136,7 @@ func (broadcaster *Broadcaster) Add(w io.Writer) error { // CloseWithError signals to all observers that the operation has finished. Its // argument is a result that should be returned to waiters blocking on Wait. -func (broadcaster *Broadcaster) CloseWithError(result error) { +func (broadcaster *Buffered) CloseWithError(result error) { broadcaster.Lock() if broadcaster.closed() { broadcaster.Unlock() @@ -153,14 +153,14 @@ func (broadcaster *Broadcaster) CloseWithError(result error) { // Close signals to all observers that the operation has finished. It causes // all calls to Wait to return nil. -func (broadcaster *Broadcaster) Close() { +func (broadcaster *Buffered) Close() { broadcaster.CloseWithError(nil) } // Wait blocks until the operation is marked as completed by the Close method, // and all writer goroutines have completed. It returns the argument that was // passed to Close. -func (broadcaster *Broadcaster) Wait() error { +func (broadcaster *Buffered) Wait() error { <-broadcaster.c broadcaster.wg.Wait() return broadcaster.result diff --git a/broadcastwriter/broadcastwriter.go b/broadcaster/unbuffered.go similarity index 64% rename from broadcastwriter/broadcastwriter.go rename to broadcaster/unbuffered.go index e49810c..784d65d 100644 --- a/broadcastwriter/broadcastwriter.go +++ b/broadcaster/unbuffered.go @@ -1,18 +1,18 @@ -package broadcastwriter +package broadcaster import ( "io" "sync" ) -// BroadcastWriter accumulate multiple io.WriteCloser by stream. -type BroadcastWriter struct { +// Unbuffered accumulates multiple io.WriteCloser by stream. +type Unbuffered struct { mu sync.Mutex writers []io.WriteCloser } -// AddWriter adds new io.WriteCloser. -func (w *BroadcastWriter) AddWriter(writer io.WriteCloser) { +// Add adds new io.WriteCloser. +func (w *Unbuffered) Add(writer io.WriteCloser) { w.mu.Lock() w.writers = append(w.writers, writer) w.mu.Unlock() @@ -20,7 +20,7 @@ func (w *BroadcastWriter) AddWriter(writer io.WriteCloser) { // Write writes bytes to all writers. Failed writers will be evicted during // this call. -func (w *BroadcastWriter) Write(p []byte) (n int, err error) { +func (w *Unbuffered) Write(p []byte) (n int, err error) { w.mu.Lock() var evict []int for i, sw := range w.writers { @@ -38,7 +38,7 @@ func (w *BroadcastWriter) Write(p []byte) (n int, err error) { // Clean closes and removes all writers. Last non-eol-terminated part of data // will be saved. -func (w *BroadcastWriter) Clean() error { +func (w *Unbuffered) Clean() error { w.mu.Lock() for _, sw := range w.writers { sw.Close() @@ -47,8 +47,3 @@ func (w *BroadcastWriter) Clean() error { w.mu.Unlock() return nil } - -// New creates a new BroadcastWriter. -func New() *BroadcastWriter { - return &BroadcastWriter{} -} diff --git a/broadcastwriter/broadcastwriter_test.go b/broadcaster/unbuffered_test.go similarity index 87% rename from broadcastwriter/broadcastwriter_test.go rename to broadcaster/unbuffered_test.go index 1ff4cae..9f8e72b 100644 --- a/broadcastwriter/broadcastwriter_test.go +++ b/broadcaster/unbuffered_test.go @@ -1,4 +1,4 @@ -package broadcastwriter +package broadcaster import ( "bytes" @@ -28,14 +28,14 @@ func (dw *dummyWriter) Close() error { return nil } -func TestBroadcastWriter(t *testing.T) { - writer := New() +func TestUnbuffered(t *testing.T) { + writer := new(Unbuffered) // Test 1: Both bufferA and bufferB should contain "foo" bufferA := &dummyWriter{} - writer.AddWriter(bufferA) + writer.Add(bufferA) bufferB := &dummyWriter{} - writer.AddWriter(bufferB) + writer.Add(bufferB) writer.Write([]byte("foo")) if bufferA.String() != "foo" { @@ -49,7 +49,7 @@ func TestBroadcastWriter(t *testing.T) { // Test2: bufferA and bufferB should contain "foobar", // while bufferC should only contain "bar" bufferC := &dummyWriter{} - writer.AddWriter(bufferC) + writer.Add(bufferC) writer.Write([]byte("bar")) if bufferA.String() != "foobar" { @@ -87,7 +87,7 @@ func TestBroadcastWriter(t *testing.T) { bufferB.failOnWrite = true bufferC.failOnWrite = true bufferD := &dummyWriter{} - writer.AddWriter(bufferD) + writer.Add(bufferD) writer.Write([]byte("yo")) writer.Write([]byte("ink")) if strings.Contains(bufferB.String(), "yoink") { @@ -114,24 +114,24 @@ func (d devNullCloser) Write(buf []byte) (int, error) { } // This test checks for races. It is only useful when run with the race detector. -func TestRaceBroadcastWriter(t *testing.T) { - writer := New() +func TestRaceUnbuffered(t *testing.T) { + writer := new(Unbuffered) c := make(chan bool) go func() { - writer.AddWriter(devNullCloser(0)) + writer.Add(devNullCloser(0)) c <- true }() writer.Write([]byte("hello")) <-c } -func BenchmarkBroadcastWriter(b *testing.B) { - writer := New() +func BenchmarkUnbuffered(b *testing.B) { + writer := new(Unbuffered) setUpWriter := func() { for i := 0; i < 100; i++ { - writer.AddWriter(devNullCloser(0)) - writer.AddWriter(devNullCloser(0)) - writer.AddWriter(devNullCloser(0)) + writer.Add(devNullCloser(0)) + writer.Add(devNullCloser(0)) + writer.Add(devNullCloser(0)) } } testLine := "Line that thinks that it is log line from docker"