Cleanup WriteFlusher

Signed-off-by: Brian Goff <cpuguy83@gmail.com>
This commit is contained in:
Brian Goff 2015-12-18 16:06:28 -05:00
parent 24854e692c
commit 7982606371

View file

@ -1,9 +1,7 @@
package ioutils package ioutils
import ( import (
"errors"
"io" "io"
"net/http"
"sync" "sync"
) )
@ -11,45 +9,43 @@ import (
// is a flush. In addition, the Close method can be called to intercept // is a flush. In addition, the Close method can be called to intercept
// Read/Write calls if the targets lifecycle has already ended. // Read/Write calls if the targets lifecycle has already ended.
type WriteFlusher struct { type WriteFlusher struct {
mu sync.Mutex w io.Writer
w io.Writer flusher flusher
flusher http.Flusher flushed chan struct{}
flushed bool flushedOnce sync.Once
closed error closed chan struct{}
closeLock sync.Mutex
// TODO(stevvooe): Use channel for closed instead, remove mutex. Using a
// channel will allow one to properly order the operations.
} }
var errWriteFlusherClosed = errors.New("writeflusher: closed") type flusher interface {
Flush()
}
var errWriteFlusherClosed = io.EOF
func (wf *WriteFlusher) Write(b []byte) (n int, err error) { func (wf *WriteFlusher) Write(b []byte) (n int, err error) {
wf.mu.Lock() select {
defer wf.mu.Unlock() case <-wf.closed:
if wf.closed != nil { return 0, errWriteFlusherClosed
return 0, wf.closed default:
} }
n, err = wf.w.Write(b) n, err = wf.w.Write(b)
wf.flush() // every write is a flush. wf.Flush() // every write is a flush.
return n, err return n, err
} }
// Flush the stream immediately. // Flush the stream immediately.
func (wf *WriteFlusher) Flush() { func (wf *WriteFlusher) Flush() {
wf.mu.Lock() select {
defer wf.mu.Unlock() case <-wf.closed:
wf.flush()
}
// flush the stream immediately without taking a lock. Used internally.
func (wf *WriteFlusher) flush() {
if wf.closed != nil {
return return
default:
} }
wf.flushed = true wf.flushedOnce.Do(func() {
close(wf.flushed)
})
wf.flusher.Flush() wf.flusher.Flush()
} }
@ -59,34 +55,38 @@ func (wf *WriteFlusher) Flushed() bool {
// BUG(stevvooe): Remove this method. Its use is inherently racy. Seems to // BUG(stevvooe): Remove this method. Its use is inherently racy. Seems to
// be used to detect whether or a response code has been issued or not. // be used to detect whether or a response code has been issued or not.
// Another hook should be used instead. // Another hook should be used instead.
wf.mu.Lock() var flushed bool
defer wf.mu.Unlock() select {
case <-wf.flushed:
return wf.flushed flushed = true
default:
}
return flushed
} }
// Close closes the write flusher, disallowing any further writes to the // Close closes the write flusher, disallowing any further writes to the
// target. After the flusher is closed, all calls to write or flush will // target. After the flusher is closed, all calls to write or flush will
// result in an error. // result in an error.
func (wf *WriteFlusher) Close() error { func (wf *WriteFlusher) Close() error {
wf.mu.Lock() wf.closeLock.Lock()
defer wf.mu.Unlock() defer wf.closeLock.Unlock()
if wf.closed != nil { select {
return wf.closed case <-wf.closed:
return errWriteFlusherClosed
default:
close(wf.closed)
} }
wf.closed = errWriteFlusherClosed
return nil return nil
} }
// NewWriteFlusher returns a new WriteFlusher. // NewWriteFlusher returns a new WriteFlusher.
func NewWriteFlusher(w io.Writer) *WriteFlusher { func NewWriteFlusher(w io.Writer) *WriteFlusher {
var flusher http.Flusher var fl flusher
if f, ok := w.(http.Flusher); ok { if f, ok := w.(flusher); ok {
flusher = f fl = f
} else { } else {
flusher = &NopFlusher{} fl = &NopFlusher{}
} }
return &WriteFlusher{w: w, flusher: flusher} return &WriteFlusher{w: w, flusher: fl, closed: make(chan struct{}), flushed: make(chan struct{})}
} }