Cap the amount of buffering done by BytesPipe

Turn BytesPipe's Read and Write functions into blocking, goroutine-safe
functions. Add a CloseWithError function to propagate an error code to
the Read function.

Adjust tests to work with the blocking Read and Write functions.

Remove BufReader, since now its users can use BytesPipe directly.

Signed-off-by: Aaron Lehmann <aaron.lehmann@docker.com>
This commit is contained in:
Aaron Lehmann 2015-09-29 10:58:08 -07:00
parent 442d9a021a
commit 039582f49e
4 changed files with 106 additions and 270 deletions

View file

@ -1,16 +1,32 @@
package ioutils
import (
"errors"
"io"
"sync"
)
// maxCap is the highest capacity to use in byte slices that buffer data.
const maxCap = 1e6
// BytesPipe is io.ReadWriter which works similarly to pipe(queue).
// All written data could be read only once. Also BytesPipe is allocating
// and releasing new byte slices to adjust to current needs, so there won't be
// overgrown buffer after high load peak.
// BytesPipe isn't goroutine-safe, caller must synchronize it if needed.
// blockThreshold is the minimum number of bytes in the buffer which will cause
// a write to BytesPipe to block when allocating a new slice.
const blockThreshold = 1e6
// ErrClosed is returned when Write is called on a closed BytesPipe.
var ErrClosed = errors.New("write to closed BytesPipe")
// BytesPipe is io.ReadWriteCloser which works similarly to pipe(queue).
// All written data may be read at most once. Also, BytesPipe allocates
// and releases new byte slices to adjust to current needs, so the buffer
// won't be overgrown after peak loads.
type BytesPipe struct {
mu sync.Mutex
wait *sync.Cond
buf [][]byte // slice of byte-slices of buffered data
lastRead int // index in the first slice to a read point
bufLen int // length of data buffered over the slices
closeErr error // error to return from next Read. set to nil if not closed.
}
// NewBytesPipe creates new BytesPipe, initialized by specified slice.
@ -20,15 +36,23 @@ func NewBytesPipe(buf []byte) *BytesPipe {
if cap(buf) == 0 {
buf = make([]byte, 0, 64)
}
return &BytesPipe{
bp := &BytesPipe{
buf: [][]byte{buf[:0]},
}
bp.wait = sync.NewCond(&bp.mu)
return bp
}
// Write writes p to BytesPipe.
// It can allocate new []byte slices in a process of writing.
func (bp *BytesPipe) Write(p []byte) (n int, err error) {
func (bp *BytesPipe) Write(p []byte) (int, error) {
bp.mu.Lock()
defer bp.mu.Unlock()
written := 0
for {
if bp.closeErr != nil {
return written, ErrClosed
}
// write data to the last buffer
b := bp.buf[len(bp.buf)-1]
// copy data to the current empty allocated area
@ -38,6 +62,8 @@ func (bp *BytesPipe) Write(p []byte) (n int, err error) {
// include written data in last buffer
bp.buf[len(bp.buf)-1] = b[:len(b)+n]
written += n
// if there was enough room to write all then break
if len(p) == n {
break
@ -45,15 +71,40 @@ func (bp *BytesPipe) Write(p []byte) (n int, err error) {
// more data: write to the next slice
p = p[n:]
// block if too much data is still in the buffer
for bp.bufLen >= blockThreshold {
bp.wait.Wait()
}
// allocate slice that has twice the size of the last unless maximum reached
nextCap := 2 * cap(bp.buf[len(bp.buf)-1])
if maxCap < nextCap {
if nextCap > maxCap {
nextCap = maxCap
}
// add new byte slice to the buffers slice and continue writing
bp.buf = append(bp.buf, make([]byte, 0, nextCap))
}
return
bp.wait.Broadcast()
return written, nil
}
// CloseWithError causes further reads from a BytesPipe to return immediately.
func (bp *BytesPipe) CloseWithError(err error) error {
bp.mu.Lock()
if err != nil {
bp.closeErr = err
} else {
bp.closeErr = io.EOF
}
bp.wait.Broadcast()
bp.mu.Unlock()
return nil
}
// Close causes further reads from a BytesPipe to return immediately.
func (bp *BytesPipe) Close() error {
return bp.CloseWithError(nil)
}
func (bp *BytesPipe) len() int {
@ -63,6 +114,17 @@ func (bp *BytesPipe) len() int {
// Read reads bytes from BytesPipe.
// Data could be read only once.
func (bp *BytesPipe) Read(p []byte) (n int, err error) {
bp.mu.Lock()
defer bp.mu.Unlock()
if bp.len() == 0 {
if bp.closeErr != nil {
return 0, bp.closeErr
}
bp.wait.Wait()
if bp.len() == 0 && bp.closeErr != nil {
return 0, bp.closeErr
}
}
for {
read := copy(p, bp.buf[0][bp.lastRead:])
n += read
@ -85,5 +147,6 @@ func (bp *BytesPipe) Read(p []byte) (n int, err error) {
bp.buf[0] = nil // throw away old slice
bp.buf = bp.buf[1:] // switch to next
}
bp.wait.Broadcast()
return
}