Merge pull request #17877 from aaronlehmann/capped-bytespipe

Cap the amount of buffering done by BytesPipe
unclejack committed 2015-11-14 00:51:11 +02:00
commit a27dd533e1
4 changed files with 106 additions and 270 deletions
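The change turns BytesPipe into a bounded, goroutine-safe io.ReadWriteCloser: a writer blocks once roughly 1e6 bytes (blockThreshold) are buffered and resumes as a reader drains data. A minimal usage sketch of the new behavior; the import path is assumed from this repository's layout:

    package main

    import (
        "fmt"
        "io"

        "github.com/docker/docker/pkg/ioutils" // assumed import path
    )

    func main() {
        bp := ioutils.NewBytesPipe(nil)

        // Producer: Write blocks once ~1e6 bytes are queued instead of
        // growing the buffer without bound.
        go func() {
            for i := 0; i < 4; i++ {
                bp.Write(make([]byte, 512*1024)) // 512 KiB per chunk
            }
            bp.Close() // readers drain the rest, then get io.EOF
        }()

        // Consumer: drain until EOF.
        total := 0
        p := make([]byte, 64*1024)
        for {
            n, err := bp.Read(p)
            total += n
            if err == io.EOF {
                break
            }
        }
        fmt.Println("read", total, "bytes") // read 2097152 bytes
    }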

pkg/ioutils/bytespipe.go

@@ -1,16 +1,32 @@
 package ioutils
 
+import (
+    "errors"
+    "io"
+    "sync"
+)
+
+// maxCap is the highest capacity to use in byte slices that buffer data.
 const maxCap = 1e6
 
-// BytesPipe is io.ReadWriter which works similarly to pipe(queue).
-// All written data could be read only once. Also BytesPipe is allocating
-// and releasing new byte slices to adjust to current needs, so there won't be
-// overgrown buffer after high load peak.
-// BytesPipe isn't goroutine-safe, caller must synchronize it if needed.
+// blockThreshold is the minimum number of bytes in the buffer which will cause
+// a write to BytesPipe to block when allocating a new slice.
+const blockThreshold = 1e6
+
+// ErrClosed is returned when Write is called on a closed BytesPipe.
+var ErrClosed = errors.New("write to closed BytesPipe")
+
+// BytesPipe is io.ReadWriteCloser which works similarly to pipe(queue).
+// All written data may be read at most once. Also, BytesPipe allocates
+// and releases new byte slices to adjust to current needs, so the buffer
+// won't be overgrown after peak loads.
 type BytesPipe struct {
+    mu       sync.Mutex
+    wait     *sync.Cond
     buf      [][]byte // slice of byte-slices of buffered data
     lastRead int      // index in the first slice to a read point
     bufLen   int      // length of data buffered over the slices
+    closeErr error    // error to return from next Read. set to nil if not closed.
 }
 
 // NewBytesPipe creates new BytesPipe, initialized by specified slice.
@@ -20,15 +36,23 @@ func NewBytesPipe(buf []byte) *BytesPipe {
     if cap(buf) == 0 {
         buf = make([]byte, 0, 64)
     }
-    return &BytesPipe{
+    bp := &BytesPipe{
         buf: [][]byte{buf[:0]},
     }
+    bp.wait = sync.NewCond(&bp.mu)
+    return bp
 }
 
 // Write writes p to BytesPipe.
 // It can allocate new []byte slices in a process of writing.
-func (bp *BytesPipe) Write(p []byte) (n int, err error) {
+func (bp *BytesPipe) Write(p []byte) (int, error) {
+    bp.mu.Lock()
+    defer bp.mu.Unlock()
+    written := 0
     for {
+        if bp.closeErr != nil {
+            return written, ErrClosed
+        }
         // write data to the last buffer
         b := bp.buf[len(bp.buf)-1]
         // copy data to the current empty allocated area
@@ -38,6 +62,8 @@ func (bp *BytesPipe) Write(p []byte) (n int, err error) {
         // include written data in last buffer
         bp.buf[len(bp.buf)-1] = b[:len(b)+n]
 
+        written += n
+
         // if there was enough room to write all then break
         if len(p) == n {
             break
@@ -45,15 +71,40 @@ func (bp *BytesPipe) Write(p []byte) (n int, err error) {
         // more data: write to the next slice
         p = p[n:]
 
+        // block if too much data is still in the buffer
+        for bp.bufLen >= blockThreshold {
+            bp.wait.Wait()
+        }
+
         // allocate slice that has twice the size of the last unless maximum reached
         nextCap := 2 * cap(bp.buf[len(bp.buf)-1])
-        if maxCap < nextCap {
+        if nextCap > maxCap {
             nextCap = maxCap
         }
         // add new byte slice to the buffers slice and continue writing
         bp.buf = append(bp.buf, make([]byte, 0, nextCap))
     }
-    return
+
+    bp.wait.Broadcast()
+
+    return written, nil
+}
+
+// CloseWithError causes further reads from a BytesPipe to return immediately.
+func (bp *BytesPipe) CloseWithError(err error) error {
+    bp.mu.Lock()
+    if err != nil {
+        bp.closeErr = err
+    } else {
+        bp.closeErr = io.EOF
+    }
+    bp.wait.Broadcast()
+    bp.mu.Unlock()
+    return nil
+}
+
+// Close causes further reads from a BytesPipe to return immediately.
+func (bp *BytesPipe) Close() error {
+    return bp.CloseWithError(nil)
 }
 
 func (bp *BytesPipe) len() int {
@@ -63,6 +114,17 @@ func (bp *BytesPipe) len() int {
 // Read reads bytes from BytesPipe.
 // Data could be read only once.
 func (bp *BytesPipe) Read(p []byte) (n int, err error) {
+    bp.mu.Lock()
+    defer bp.mu.Unlock()
+    if bp.len() == 0 {
+        if bp.closeErr != nil {
+            return 0, bp.closeErr
+        }
+        bp.wait.Wait()
+        if bp.len() == 0 && bp.closeErr != nil {
+            return 0, bp.closeErr
+        }
+    }
     for {
         read := copy(p, bp.buf[0][bp.lastRead:])
         n += read
@@ -85,5 +147,6 @@ func (bp *BytesPipe) Read(p []byte) (n int, err error) {
         bp.buf[0] = nil     // throw away old slice
         bp.buf = bp.buf[1:] // switch to next
     }
+    bp.wait.Broadcast()
     return
 }
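Both directions share one sync.Cond: Write waits while bufLen >= blockThreshold, Read waits while the pipe is empty, and each side Broadcasts after it changes state so the other wakes up. A stripped-down sketch of that pattern (illustrative only, not the PR's code; it stores one flat slice rather than BytesPipe's slice-of-slices):

    package bufdemo

    import (
        "errors"
        "io"
        "sync"
    )

    // boundedBuf caps buffered bytes at limit, like the capped BytesPipe.
    type boundedBuf struct {
        mu     sync.Mutex
        wait   *sync.Cond
        data   []byte
        limit  int
        closed bool
    }

    func newBoundedBuf(limit int) *boundedBuf {
        b := &boundedBuf{limit: limit}
        b.wait = sync.NewCond(&b.mu)
        return b
    }

    func (b *boundedBuf) Write(p []byte) (int, error) {
        b.mu.Lock()
        defer b.mu.Unlock()
        for len(b.data) >= b.limit { // block while over the cap
            if b.closed {
                return 0, errors.New("write to closed buffer")
            }
            b.wait.Wait()
        }
        b.data = append(b.data, p...)
        b.wait.Broadcast() // wake blocked readers
        return len(p), nil
    }

    func (b *boundedBuf) Read(p []byte) (int, error) {
        b.mu.Lock()
        defer b.mu.Unlock()
        for len(b.data) == 0 { // block while empty
            if b.closed {
                return 0, io.EOF
            }
            b.wait.Wait()
        }
        n := copy(p, b.data)
        b.data = b.data[n:]
        b.wait.Broadcast() // wake blocked writers
        return n, nil
    }

    func (b *boundedBuf) Close() error {
        b.mu.Lock()
        b.closed = true
        b.wait.Broadcast()
        b.mu.Unlock()
        return nil
    }

The sketch re-checks its condition in a for loop around Wait, the standard guard against racing wakeups when one Cond serves both sides; the PR's Read performs a single re-check after its Wait instead.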

pkg/ioutils/bytespipe_test.go

@@ -3,7 +3,9 @@ package ioutils
 import (
     "crypto/sha1"
     "encoding/hex"
+    "math/rand"
     "testing"
+    "time"
 )
 
 func TestBytesPipeRead(t *testing.T) {
@@ -86,17 +88,12 @@ func TestBytesPipeWriteRandomChunks(t *testing.T) {
     // write/read through buffer
     buf := NewBytesPipe(nil)
     hash.Reset()
-    for i := 0; i < c.iterations; i++ {
-        for w := 0; w < c.writesPerLoop; w++ {
-            buf.Write(testMessage[:writeChunks[(i*c.writesPerLoop+w)%len(writeChunks)]])
-        }
-        for r := 0; r < c.readsPerLoop; r++ {
-            p := make([]byte, readChunks[(i*c.readsPerLoop+r)%len(readChunks)])
-            n, _ := buf.Read(p)
-            hash.Write(p[:n])
-        }
-    }
-    // read rest of the data from buffer
+
+    done := make(chan struct{})
+
+    go func() {
+        // random delay before read starts
+        <-time.After(time.Duration(rand.Intn(10)) * time.Millisecond)
     for i := 0; ; i++ {
         p := make([]byte, readChunks[(c.iterations*c.readsPerLoop+i)%len(readChunks)])
         n, _ := buf.Read(p)
@@ -105,6 +102,18 @@ func TestBytesPipeWriteRandomChunks(t *testing.T) {
         }
         hash.Write(p[:n])
     }
+        close(done)
+    }()
+
+    for i := 0; i < c.iterations; i++ {
+        for w := 0; w < c.writesPerLoop; w++ {
+            buf.Write(testMessage[:writeChunks[(i*c.writesPerLoop+w)%len(writeChunks)]])
+        }
+    }
+
+    buf.Close()
+    <-done
 
     actual := hex.EncodeToString(hash.Sum(nil))
     if expected != actual {
@@ -116,24 +125,32 @@ func TestBytesPipeWriteRandomChunks(t *testing.T) {
 
 func BenchmarkBytesPipeWrite(b *testing.B) {
     for i := 0; i < b.N; i++ {
+        readBuf := make([]byte, 1024)
         buf := NewBytesPipe(nil)
+        go func() {
+            var err error
+            for err == nil {
+                _, err = buf.Read(readBuf)
+            }
+        }()
         for j := 0; j < 1000; j++ {
             buf.Write([]byte("pretty short line, because why not?"))
         }
+        buf.Close()
     }
 }
 
 func BenchmarkBytesPipeRead(b *testing.B) {
-    rd := make([]byte, 1024)
+    rd := make([]byte, 512)
     for i := 0; i < b.N; i++ {
         b.StopTimer()
         buf := NewBytesPipe(nil)
-        for j := 0; j < 1000; j++ {
+        for j := 0; j < 500; j++ {
             buf.Write(make([]byte, 1024))
         }
         b.StartTimer()
         for j := 0; j < 1000; j++ {
-            if n, _ := buf.Read(rd); n != 1024 {
+            if n, _ := buf.Read(rd); n != 512 {
                 b.Fatalf("Wrong number of bytes: %d", n)
             }
         }
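With writes now able to block, both benchmarks changed shape. BenchmarkBytesPipeWrite pairs the writer with a goroutine that keeps draining the pipe, so the measured path is the concurrent producer/consumer case the pipe is now built for. BenchmarkBytesPipeRead halves its setup writes: 500 writes of 1024 bytes queue 512,000 bytes, comfortably under the 1e6 blockThreshold, so the unread setup data can never stall Write; the old 1000 writes would queue 1,024,000 bytes, right at the cap, risking a deadlock with no reader running until the timer restarts. The 1000 reads of 512 bytes then consume exactly the 512,000 bytes staged.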

pkg/ioutils/readers.go

@@ -4,7 +4,6 @@ import (
     "crypto/sha256"
     "encoding/hex"
     "io"
-    "sync"
 )
 
 type readCloserWrapper struct {
@@ -45,92 +44,6 @@ func NewReaderErrWrapper(r io.Reader, closer func()) io.Reader {
     }
 }
 
-// bufReader allows the underlying reader to continue to produce
-// output by pre-emptively reading from the wrapped reader.
-// This is achieved by buffering this data in bufReader's
-// expanding buffer.
-type bufReader struct {
-    sync.Mutex
-    buf      io.ReadWriter
-    reader   io.Reader
-    err      error
-    wait     sync.Cond
-    drainBuf []byte
-}
-
-// NewBufReader returns a new bufReader.
-func NewBufReader(r io.Reader) io.ReadCloser {
-    reader := &bufReader{
-        buf:      NewBytesPipe(nil),
-        reader:   r,
-        drainBuf: make([]byte, 1024),
-    }
-    reader.wait.L = &reader.Mutex
-    go reader.drain()
-    return reader
-}
-
-// NewBufReaderWithDrainbufAndBuffer returns a BufReader with drainBuffer and buffer.
-func NewBufReaderWithDrainbufAndBuffer(r io.Reader, drainBuffer []byte, buffer io.ReadWriter) io.ReadCloser {
-    reader := &bufReader{
-        buf:      buffer,
-        drainBuf: drainBuffer,
-        reader:   r,
-    }
-    reader.wait.L = &reader.Mutex
-    go reader.drain()
-    return reader
-}
-
-func (r *bufReader) drain() {
-    for {
-        // Call to scheduler is made to yield from this goroutine.
-        // This avoids goroutine looping here when n=0,err=nil, fixes code hangs when run with GCC Go.
-        callSchedulerIfNecessary()
-        n, err := r.reader.Read(r.drainBuf)
-        r.Lock()
-        if err != nil {
-            r.err = err
-        } else {
-            if n == 0 {
-                // nothing written, no need to signal
-                r.Unlock()
-                continue
-            }
-            r.buf.Write(r.drainBuf[:n])
-        }
-        r.wait.Signal()
-        r.Unlock()
-        if err != nil {
-            break
-        }
-    }
-}
-
-func (r *bufReader) Read(p []byte) (n int, err error) {
-    r.Lock()
-    defer r.Unlock()
-    for {
-        n, err = r.buf.Read(p)
-        if n > 0 {
-            return n, err
-        }
-        if r.err != nil {
-            return 0, r.err
-        }
-        r.wait.Wait()
-    }
-}
-
-// Close closes the bufReader
-func (r *bufReader) Close() error {
-    closer, ok := r.reader.(io.ReadCloser)
-    if !ok {
-        return nil
-    }
-    return closer.Close()
-}
-
 // HashData returns the sha256 sum of src.
 func HashData(src io.Reader) (string, error) {
     h := sha256.New()
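bufReader and both of its constructors go away entirely; a goroutine-safe, bounded BytesPipe can serve the same role on its own. A hedged sketch of how a caller might recover the old drain-in-background behavior (the newPipeReader helper is invented here for illustration and is not part of this PR; note that, unlike bufReader.Close, closing the returned pipe does not close the underlying reader):

    package ioutils

    import "io"

    // newPipeReader pre-reads r in a background goroutine, as the removed
    // bufReader did, but the buffering is capped: the drain goroutine's
    // Write blocks once blockThreshold bytes pile up unread.
    func newPipeReader(r io.Reader) io.ReadCloser {
        bp := NewBytesPipe(nil)
        go func() {
            buf := make([]byte, 1024)
            for {
                n, err := r.Read(buf)
                if n > 0 {
                    bp.Write(buf[:n]) // write errors ignored for brevity
                }
                if err != nil {
                    // Surface io.EOF or a real error to readers once the
                    // buffered data has been drained.
                    bp.CloseWithError(err)
                    return
                }
            }
        }()
        return bp
    }

The backpressure is the point of the PR: if readers fall more than blockThreshold bytes behind, the drain goroutine blocks instead of buffering without limit.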

pkg/ioutils/readers_test.go

@@ -1,13 +1,9 @@
 package ioutils
 
 import (
-    "bytes"
     "fmt"
-    "io"
-    "io/ioutil"
     "strings"
     "testing"
-    "time"
 )
 
 // Implement io.Reader
@@ -58,101 +54,6 @@ func TestReaderErrWrapperRead(t *testing.T) {
     }
 }
 
-func TestNewBufReaderWithDrainbufAndBuffer(t *testing.T) {
-    reader, writer := io.Pipe()
-
-    drainBuffer := make([]byte, 1024)
-    buffer := NewBytesPipe(nil)
-    bufreader := NewBufReaderWithDrainbufAndBuffer(reader, drainBuffer, buffer)
-
-    // Write everything down to a Pipe
-    // Usually, a pipe should block but because of the buffered reader,
-    // the writes will go through
-    done := make(chan bool)
-    go func() {
-        writer.Write([]byte("hello world"))
-        writer.Close()
-        done <- true
-    }()
-
-    // Drain the reader *after* everything has been written, just to verify
-    // it is indeed buffering
-    select {
-    case <-done:
-    case <-time.After(1 * time.Second):
-        t.Fatal("timeout")
-    }
-
-    output, err := ioutil.ReadAll(bufreader)
-    if err != nil {
-        t.Fatal(err)
-    }
-    if !bytes.Equal(output, []byte("hello world")) {
-        t.Error(string(output))
-    }
-}
-
-func TestBufReader(t *testing.T) {
-    reader, writer := io.Pipe()
-    bufreader := NewBufReader(reader)
-
-    // Write everything down to a Pipe
-    // Usually, a pipe should block but because of the buffered reader,
-    // the writes will go through
-    done := make(chan bool)
-    go func() {
-        writer.Write([]byte("hello world"))
-        writer.Close()
-        done <- true
-    }()
-
-    // Drain the reader *after* everything has been written, just to verify
-    // it is indeed buffering
-    <-done
-    output, err := ioutil.ReadAll(bufreader)
-    if err != nil {
-        t.Fatal(err)
-    }
-    if !bytes.Equal(output, []byte("hello world")) {
-        t.Error(string(output))
-    }
-}
-
-func TestBufReaderCloseWithNonReaderCloser(t *testing.T) {
-    reader := strings.NewReader("buffer")
-    bufreader := NewBufReader(reader)
-
-    if err := bufreader.Close(); err != nil {
-        t.Fatal(err)
-    }
-}
-
-// implements io.ReadCloser
-type simpleReaderCloser struct {
-    err error
-}
-
-func (r *simpleReaderCloser) Read(p []byte) (n int, err error) {
-    return 0, r.err
-}
-
-func (r *simpleReaderCloser) Close() error {
-    r.err = io.EOF
-    return nil
-}
-
-func TestBufReaderCloseWithReaderCloser(t *testing.T) {
-    reader := &simpleReaderCloser{}
-    bufreader := NewBufReader(reader)
-
-    err := bufreader.Close()
-    if err != nil {
-        t.Fatal(err)
-    }
-}
-
 func TestHashData(t *testing.T) {
     reader := strings.NewReader("hash-me")
     actual, err := HashData(reader)
@@ -164,61 +65,3 @@ func TestHashData(t *testing.T) {
         t.Fatalf("Expecting %s, got %s", expected, actual)
     }
 }
-
-type repeatedReader struct {
-    readCount int
-    maxReads  int
-    data      []byte
-}
-
-func newRepeatedReader(max int, data []byte) *repeatedReader {
-    return &repeatedReader{0, max, data}
-}
-
-func (r *repeatedReader) Read(p []byte) (int, error) {
-    if r.readCount >= r.maxReads {
-        return 0, io.EOF
-    }
-    r.readCount++
-    n := copy(p, r.data)
-    return n, nil
-}
-
-func testWithData(data []byte, reads int) {
-    reader := newRepeatedReader(reads, data)
-    bufReader := NewBufReader(reader)
-    io.Copy(ioutil.Discard, bufReader)
-}
-
-func Benchmark1M10BytesReads(b *testing.B) {
-    reads := 1000000
-    readSize := int64(10)
-    data := make([]byte, readSize)
-    b.SetBytes(readSize * int64(reads))
-    b.ResetTimer()
-    for i := 0; i < b.N; i++ {
-        testWithData(data, reads)
-    }
-}
-
-func Benchmark1M1024BytesReads(b *testing.B) {
-    reads := 1000000
-    readSize := int64(1024)
-    data := make([]byte, readSize)
-    b.SetBytes(readSize * int64(reads))
-    b.ResetTimer()
-    for i := 0; i < b.N; i++ {
-        testWithData(data, reads)
-    }
-}
-
-func Benchmark10k32KBytesReads(b *testing.B) {
-    reads := 10000
-    readSize := int64(32 * 1024)
-    data := make([]byte, readSize)
-    b.SetBytes(readSize * int64(reads))
-    b.ResetTimer()
-    for i := 0; i < b.N; i++ {
-        testWithData(data, reads)
-    }
-}