Make Broadcaster Wait function wait for all writers to finish before returning

Also, use the channel to determine if the broadcaster is closed,
removing the redundant isClosed variable.

Signed-off-by: Aaron Lehmann <aaron.lehmann@docker.com>
This commit is contained in:
Aaron Lehmann 2015-09-10 10:08:02 -07:00
parent 572a785718
commit 5bebf3cbac

View file

@ -24,9 +24,6 @@ type Broadcaster struct {
history [][]byte history [][]byte
// wg is a WaitGroup used to wait for all writes to finish on Close // wg is a WaitGroup used to wait for all writes to finish on Close
wg sync.WaitGroup wg sync.WaitGroup
// isClosed is set to true when Close is called to avoid closing c
// multiple times.
isClosed bool
// result is the argument passed to the first call of Close, and // result is the argument passed to the first call of Close, and
// returned to callers of Wait // returned to callers of Wait
result error result error
@ -141,11 +138,10 @@ func (broadcaster *Broadcaster) Add(w io.Writer) error {
// argument is a result that should be returned to waiters blocking on Wait. // argument is a result that should be returned to waiters blocking on Wait.
func (broadcaster *Broadcaster) CloseWithError(result error) { func (broadcaster *Broadcaster) CloseWithError(result error) {
broadcaster.Lock() broadcaster.Lock()
if broadcaster.isClosed { if broadcaster.closed() {
broadcaster.Unlock() broadcaster.Unlock()
return return
} }
broadcaster.isClosed = true
broadcaster.result = result broadcaster.result = result
close(broadcaster.c) close(broadcaster.c)
broadcaster.cond.Broadcast() broadcaster.cond.Broadcast()
@ -161,9 +157,11 @@ func (broadcaster *Broadcaster) Close() {
broadcaster.CloseWithError(nil) broadcaster.CloseWithError(nil)
} }
// Wait blocks until the operation is marked as completed by the Done method. // Wait blocks until the operation is marked as completed by the Close method,
// It returns the argument that was passed to Close. // and all writer goroutines have completed. It returns the argument that was
// passed to Close.
func (broadcaster *Broadcaster) Wait() error { func (broadcaster *Broadcaster) Wait() error {
<-broadcaster.c <-broadcaster.c
broadcaster.wg.Wait()
return broadcaster.result return broadcaster.result
} }