diff --git a/ioutils/writeflusher.go b/ioutils/writeflusher.go index cedb9a0..2b35a26 100644 --- a/ioutils/writeflusher.go +++ b/ioutils/writeflusher.go @@ -1,32 +1,54 @@ package ioutils import ( + "errors" "io" "net/http" "sync" ) -// WriteFlusher wraps the Write and Flush operation. +// WriteFlusher wraps the Write and Flush operation ensuring that every write +// is a flush. In addition, the Close method can be called to intercept +// Read/Write calls if the targets lifecycle has already ended. type WriteFlusher struct { - sync.Mutex + mu sync.Mutex w io.Writer flusher http.Flusher flushed bool + closed error + + // TODO(stevvooe): Use channel for closed instead, remove mutex. Using a + // channel will allow one to properly order the operations. } +var errWriteFlusherClosed = errors.New("writeflusher: closed") + func (wf *WriteFlusher) Write(b []byte) (n int, err error) { - wf.Lock() - defer wf.Unlock() + wf.mu.Lock() + defer wf.mu.Unlock() + if wf.closed != nil { + return 0, wf.closed + } + n, err = wf.w.Write(b) - wf.flushed = true - wf.flusher.Flush() + wf.flush() // every write is a flush. return n, err } // Flush the stream immediately. func (wf *WriteFlusher) Flush() { - wf.Lock() - defer wf.Unlock() + wf.mu.Lock() + defer wf.mu.Unlock() + + wf.flush() +} + +// flush the stream immediately without taking a lock. Used internally. +func (wf *WriteFlusher) flush() { + if wf.closed != nil { + return + } + wf.flushed = true wf.flusher.Flush() } @@ -34,11 +56,30 @@ func (wf *WriteFlusher) Flush() { // Flushed returns the state of flushed. // If it's flushed, return true, or else it return false. func (wf *WriteFlusher) Flushed() bool { - wf.Lock() - defer wf.Unlock() + // BUG(stevvooe): Remove this method. Its use is inherently racy. Seems to + // be used to detect whether or a response code has been issued or not. + // Another hook should be used instead. + wf.mu.Lock() + defer wf.mu.Unlock() + return wf.flushed } +// Close closes the write flusher, disallowing any further writes to the +// target. After the flusher is closed, all calls to write or flush will +// result in an error. +func (wf *WriteFlusher) Close() error { + wf.mu.Lock() + defer wf.mu.Unlock() + + if wf.closed != nil { + return wf.closed + } + + wf.closed = errWriteFlusherClosed + return nil +} + // NewWriteFlusher returns a new WriteFlusher. func NewWriteFlusher(w io.Writer) *WriteFlusher { var flusher http.Flusher