package broadcaster import ( "errors" "io" "sync" ) // Buffered keeps track of one or more observers watching the progress // of an operation. For example, if multiple clients are trying to pull an // image, they share a Buffered struct for the download operation. type Buffered struct { sync.Mutex // c is a channel that observers block on, waiting for the operation // to finish. c chan struct{} // cond is a condition variable used to wake up observers when there's // new data available. cond *sync.Cond // history is a buffer of the progress output so far, so a new observer // can catch up. The history is stored as a slice of separate byte // slices, so that if the writer is a WriteFlusher, the flushes will // happen in the right places. history [][]byte // wg is a WaitGroup used to wait for all writes to finish on Close wg sync.WaitGroup // result is the argument passed to the first call of Close, and // returned to callers of Wait result error } // NewBuffered returns an initialized Buffered structure. func NewBuffered() *Buffered { b := &Buffered{ c: make(chan struct{}), } b.cond = sync.NewCond(b) return b } // closed returns true if and only if the broadcaster has been closed func (broadcaster *Buffered) closed() bool { select { case <-broadcaster.c: return true default: return false } } // receiveWrites runs as a goroutine so that writes don't block the Write // function. It writes the new data in broadcaster.history each time there's // activity on the broadcaster.cond condition variable. func (broadcaster *Buffered) receiveWrites(observer io.Writer) { n := 0 broadcaster.Lock() // The condition variable wait is at the end of this loop, so that the // first iteration will write the history so far. for { newData := broadcaster.history[n:] // Make a copy of newData so we can release the lock sendData := make([][]byte, len(newData), len(newData)) copy(sendData, newData) broadcaster.Unlock() for len(sendData) > 0 { _, err := observer.Write(sendData[0]) if err != nil { broadcaster.wg.Done() return } n++ sendData = sendData[1:] } broadcaster.Lock() // If we are behind, we need to catch up instead of waiting // or handling a closure. if len(broadcaster.history) != n { continue } // detect closure of the broadcast writer if broadcaster.closed() { broadcaster.Unlock() broadcaster.wg.Done() return } broadcaster.cond.Wait() // Mutex is still locked as the loop continues } } // Write adds data to the history buffer, and also writes it to all current // observers. func (broadcaster *Buffered) Write(p []byte) (n int, err error) { broadcaster.Lock() defer broadcaster.Unlock() // Is the broadcaster closed? If so, the write should fail. if broadcaster.closed() { return 0, errors.New("attempted write to a closed broadcaster.Buffered") } // Add message in p to the history slice newEntry := make([]byte, len(p), len(p)) copy(newEntry, p) broadcaster.history = append(broadcaster.history, newEntry) broadcaster.cond.Broadcast() return len(p), nil } // Add adds an observer to the broadcaster. The new observer receives the // data from the history buffer, and also all subsequent data. func (broadcaster *Buffered) Add(w io.Writer) error { // The lock is acquired here so that Add can't race with Close broadcaster.Lock() defer broadcaster.Unlock() if broadcaster.closed() { return errors.New("attempted to add observer to a closed broadcaster.Buffered") } broadcaster.wg.Add(1) go broadcaster.receiveWrites(w) return nil } // CloseWithError signals to all observers that the operation has finished. Its // argument is a result that should be returned to waiters blocking on Wait. func (broadcaster *Buffered) CloseWithError(result error) { broadcaster.Lock() if broadcaster.closed() { broadcaster.Unlock() return } broadcaster.result = result close(broadcaster.c) broadcaster.cond.Broadcast() broadcaster.Unlock() // Don't return until all writers have caught up. broadcaster.wg.Wait() } // Close signals to all observers that the operation has finished. It causes // all calls to Wait to return nil. func (broadcaster *Buffered) Close() { broadcaster.CloseWithError(nil) } // Wait blocks until the operation is marked as completed by the Close method, // and all writer goroutines have completed. It returns the argument that was // passed to Close. func (broadcaster *Buffered) Wait() error { <-broadcaster.c broadcaster.wg.Wait() return broadcaster.result }