Improve performance/reduce allocs of bytespipe

Creates a `fixedBuffer` type that is used to encapsulate functionality
for reading/writing from the underlying byte slices.

Uses a lazily-loaded set of sync.Pools for storing buffers that are no
longer needed so they can be re-used.

```
benchmark                     old ns/op     new ns/op     delta
BenchmarkBytesPipeWrite-8     138469        48985         -64.62%
BenchmarkBytesPipeRead-8      130922        56601         -56.77%

benchmark                     old allocs     new allocs     delta
BenchmarkBytesPipeWrite-8     18             8              -55.56%
BenchmarkBytesPipeRead-8      0              0              +0.00%

benchmark                     old bytes     new bytes     delta
BenchmarkBytesPipeWrite-8     66903         1649          -97.54%
BenchmarkBytesPipeRead-8      0             1             +Inf%
```

Signed-off-by: Brian Goff <cpuguy83@gmail.com>
This commit is contained in:
Brian Goff 2016-03-31 09:50:50 -07:00
parent e2664472b6
commit 729c6a44bc
4 changed files with 208 additions and 57 deletions

51
ioutils/buffer.go Normal file
View file

@ -0,0 +1,51 @@
package ioutils
import (
"errors"
"io"
)
// errBufferFull is returned by Write once the buffer has reached its
// capacity and cannot accept any more bytes.
var errBufferFull = errors.New("buffer is full")

// fixedBuffer is a fixed-capacity byte buffer with independent write and
// read cursors. Data is written at pos and consumed from lastRead; the
// unread region is buf[lastRead:pos]. It never grows beyond cap(buf).
type fixedBuffer struct {
	buf      []byte // backing storage; capacity fixed at creation
	pos      int    // write offset into buf
	lastRead int    // read offset into buf
}

// Write copies as much of p as fits into the remaining capacity and
// advances the write cursor. It reports errBufferFull when the buffer
// fills up before all of p is consumed.
func (b *fixedBuffer) Write(p []byte) (int, error) {
	written := copy(b.buf[b.pos:cap(b.buf)], p)
	b.pos += written
	if written == len(p) {
		return written, nil
	}
	if b.pos == cap(b.buf) {
		return written, errBufferFull
	}
	// Defensive: with the copy above, a short write always ends exactly
	// at capacity, so this path is not expected to be taken.
	return written, io.ErrShortWrite
}

// Read copies unread bytes into p, advancing the read cursor. It never
// returns an error; a zero count means the buffer is drained.
func (b *fixedBuffer) Read(p []byte) (int, error) {
	read := copy(p, b.buf[b.lastRead:b.pos])
	b.lastRead += read
	return read, nil
}

// Len reports the number of bytes written but not yet read.
func (b *fixedBuffer) Len() int {
	return b.pos - b.lastRead
}

// Cap reports the fixed capacity of the backing slice.
func (b *fixedBuffer) Cap() int {
	return cap(b.buf)
}

// Reset rewinds both cursors and truncates the backing slice to zero
// length, keeping its capacity so the buffer can be reused (e.g. pooled).
func (b *fixedBuffer) Reset() {
	b.lastRead = 0
	b.pos = 0
	b.buf = b.buf[:0]
}

// String returns the unread portion of the buffer as a string.
func (b *fixedBuffer) String() string {
	return string(b.buf[b.lastRead:b.pos])
}

75
ioutils/buffer_test.go Normal file
View file

@ -0,0 +1,75 @@
package ioutils
import (
"bytes"
"testing"
)
// TestFixedBufferWrite verifies that Write copies bytes into the backing
// slice, reports the number of bytes consumed, and returns errBufferFull
// with a correct short-write count once capacity is exhausted.
func TestFixedBufferWrite(t *testing.T) {
	buf := &fixedBuffer{buf: make([]byte, 0, 64)}
	n, err := buf.Write([]byte("hello"))
	if err != nil {
		t.Fatal(err)
	}
	if n != 5 {
		t.Fatalf("expected 5 bytes written, got %d", n)
	}
	if string(buf.buf[:5]) != "hello" {
		t.Fatalf("expected \"hello\", got %q", string(buf.buf[:5]))
	}
	n, err = buf.Write(bytes.Repeat([]byte{1}, 64))
	if err != errBufferFull {
		t.Fatalf("expected errBufferFull, got %v - %v", err, buf.buf[:64])
	}
	// Only 59 of the 64 bytes fit after the initial 5-byte write; the
	// original test ignored the short-write count, so assert it here.
	if n != 59 {
		t.Fatalf("expected 59 bytes written before filling up, got %d", n)
	}
}
// TestFixedBufferRead verifies that successive Reads consume the buffered
// data in order, that Read honors the length of the destination slice,
// and that the read cursor advances correctly across calls.
func TestFixedBufferRead(t *testing.T) {
	buf := &fixedBuffer{buf: make([]byte, 0, 64)}
	if _, err := buf.Write([]byte("hello world")); err != nil {
		t.Fatal(err)
	}
	// First read: a 5-byte destination should receive exactly "hello".
	b := make([]byte, 5)
	n, err := buf.Read(b)
	if err != nil {
		t.Fatal(err)
	}
	if n != 5 {
		t.Fatalf("expected 5 bytes read, got %d - %s", n, buf.String())
	}
	if string(b) != "hello" {
		t.Fatalf("expected \"hello\", got %q", string(b))
	}
	// Second read: the cursor has advanced, so the next 5 bytes are " worl".
	n, err = buf.Read(b)
	if err != nil {
		t.Fatal(err)
	}
	if n != 5 {
		t.Fatalf("expected 5 bytes read, got %d", n)
	}
	if string(b) != " worl" {
		t.Fatalf("expected \" worl\", got %s", string(b))
	}
	// Final read: shrink the destination to 1 byte; only the trailing "d"
	// remains and Read must not copy more than len(p) bytes.
	b = b[:1]
	n, err = buf.Read(b)
	if err != nil {
		t.Fatal(err)
	}
	if n != 1 {
		t.Fatalf("expected 1 byte read, got %d - %s", n, buf.String())
	}
	if string(b) != "d" {
		t.Fatalf("expected \"d\", got %s", string(b))
	}
}

View file

@ -9,12 +9,19 @@ import (
// maxCap is the highest capacity to use in byte slices that buffer data. // maxCap is the highest capacity to use in byte slices that buffer data.
const maxCap = 1e6 const maxCap = 1e6
// minCap is the lowest capacity to use in byte slices that buffer data
const minCap = 64
// blockThreshold is the minimum number of bytes in the buffer which will cause // blockThreshold is the minimum number of bytes in the buffer which will cause
// a write to BytesPipe to block when allocating a new slice. // a write to BytesPipe to block when allocating a new slice.
const blockThreshold = 1e6 const blockThreshold = 1e6
var (
// ErrClosed is returned when Write is called on a closed BytesPipe. // ErrClosed is returned when Write is called on a closed BytesPipe.
var ErrClosed = errors.New("write to closed BytesPipe") ErrClosed = errors.New("write to closed BytesPipe")
bufPools = make(map[int]*sync.Pool)
)
// BytesPipe is io.ReadWriteCloser which works similarly to pipe(queue). // BytesPipe is io.ReadWriteCloser which works similarly to pipe(queue).
// All written data may be read at most once. Also, BytesPipe allocates // All written data may be read at most once. Also, BytesPipe allocates
@ -23,22 +30,17 @@ var ErrClosed = errors.New("write to closed BytesPipe")
type BytesPipe struct { type BytesPipe struct {
mu sync.Mutex mu sync.Mutex
wait *sync.Cond wait *sync.Cond
buf [][]byte // slice of byte-slices of buffered data buf []*fixedBuffer
lastRead int // index in the first slice to a read point bufLen int
bufLen int // length of data buffered over the slices
closeErr error // error to return from next Read. set to nil if not closed. closeErr error // error to return from next Read. set to nil if not closed.
} }
// NewBytesPipe creates new BytesPipe, initialized by specified slice. // NewBytesPipe creates new BytesPipe, initialized by specified slice.
// If buf is nil, then it will be initialized with slice which cap is 64. // If buf is nil, then it will be initialized with slice which cap is 64.
// buf will be adjusted in a way that len(buf) == 0, cap(buf) == cap(buf). // buf will be adjusted in a way that len(buf) == 0, cap(buf) == cap(buf).
func NewBytesPipe(buf []byte) *BytesPipe { func NewBytesPipe() *BytesPipe {
if cap(buf) == 0 { bp := &BytesPipe{}
buf = make([]byte, 0, 64) bp.buf = append(bp.buf, getBuffer(minCap))
}
bp := &BytesPipe{
buf: [][]byte{buf[:0]},
}
bp.wait = sync.NewCond(&bp.mu) bp.wait = sync.NewCond(&bp.mu)
return bp return bp
} }
@ -47,22 +49,30 @@ func NewBytesPipe(buf []byte) *BytesPipe {
// It can allocate new []byte slices in a process of writing. // It can allocate new []byte slices in a process of writing.
func (bp *BytesPipe) Write(p []byte) (int, error) { func (bp *BytesPipe) Write(p []byte) (int, error) {
bp.mu.Lock() bp.mu.Lock()
defer bp.mu.Unlock()
written := 0 written := 0
for { for {
if bp.closeErr != nil { if bp.closeErr != nil {
bp.mu.Unlock()
return written, ErrClosed return written, ErrClosed
} }
// write data to the last buffer
b := bp.buf[len(bp.buf)-1]
// copy data to the current empty allocated area
n := copy(b[len(b):cap(b)], p)
// increment buffered data length
bp.bufLen += n
// include written data in last buffer
bp.buf[len(bp.buf)-1] = b[:len(b)+n]
if len(bp.buf) == 0 {
bp.buf = append(bp.buf, getBuffer(64))
}
// get the last buffer
b := bp.buf[len(bp.buf)-1]
n, err := b.Write(p)
written += n written += n
bp.bufLen += n
// errBufferFull is an error we expect to get if the buffer is full
if err != nil && err != errBufferFull {
bp.wait.Broadcast()
bp.mu.Unlock()
return written, err
}
// if there was enough room to write all then break // if there was enough room to write all then break
if len(p) == n { if len(p) == n {
@ -72,20 +82,20 @@ func (bp *BytesPipe) Write(p []byte) (int, error) {
// more data: write to the next slice // more data: write to the next slice
p = p[n:] p = p[n:]
// block if too much data is still in the buffer // make sure the buffer doesn't grow too big from this write
for bp.bufLen >= blockThreshold { for bp.bufLen >= blockThreshold {
bp.wait.Wait() bp.wait.Wait()
} }
// allocate slice that has twice the size of the last unless maximum reached // add new byte slice to the buffers slice and continue writing
nextCap := 2 * cap(bp.buf[len(bp.buf)-1]) nextCap := b.Cap() * 2
if nextCap > maxCap { if nextCap > maxCap {
nextCap = maxCap nextCap = maxCap
} }
// add new byte slice to the buffers slice and continue writing bp.buf = append(bp.buf, getBuffer(nextCap))
bp.buf = append(bp.buf, make([]byte, 0, nextCap))
} }
bp.wait.Broadcast() bp.wait.Broadcast()
bp.mu.Unlock()
return written, nil return written, nil
} }
@ -107,46 +117,60 @@ func (bp *BytesPipe) Close() error {
return bp.CloseWithError(nil) return bp.CloseWithError(nil)
} }
func (bp *BytesPipe) len() int {
return bp.bufLen - bp.lastRead
}
// Read reads bytes from BytesPipe. // Read reads bytes from BytesPipe.
// Data could be read only once. // Data could be read only once.
func (bp *BytesPipe) Read(p []byte) (n int, err error) { func (bp *BytesPipe) Read(p []byte) (n int, err error) {
bp.mu.Lock() bp.mu.Lock()
defer bp.mu.Unlock() if bp.bufLen == 0 {
if bp.len() == 0 {
if bp.closeErr != nil { if bp.closeErr != nil {
bp.mu.Unlock()
return 0, bp.closeErr return 0, bp.closeErr
} }
bp.wait.Wait() bp.wait.Wait()
if bp.len() == 0 && bp.closeErr != nil { if bp.bufLen == 0 && bp.closeErr != nil {
bp.mu.Unlock()
return 0, bp.closeErr return 0, bp.closeErr
} }
} }
for {
read := copy(p, bp.buf[0][bp.lastRead:]) for bp.bufLen > 0 {
b := bp.buf[0]
read, _ := b.Read(p) // ignore error since fixedBuffer doesn't really return an error
n += read n += read
bp.lastRead += read bp.bufLen -= read
if bp.len() == 0 {
// we have read everything. reset to the beginning. if b.Len() == 0 {
bp.lastRead = 0 // it's empty so return it to the pool and move to the next one
bp.bufLen -= len(bp.buf[0]) returnBuffer(b)
bp.buf[0] = bp.buf[0][:0] bp.buf[0] = nil
break bp.buf = bp.buf[1:]
} }
// break if everything was read
if len(p) == read { if len(p) == read {
break break
} }
// more buffered data and more asked. read from next slice.
p = p[read:] p = p[read:]
bp.lastRead = 0
bp.bufLen -= len(bp.buf[0])
bp.buf[0] = nil // throw away old slice
bp.buf = bp.buf[1:] // switch to next
} }
bp.wait.Broadcast() bp.wait.Broadcast()
bp.mu.Unlock()
return return
} }
func returnBuffer(b *fixedBuffer) {
b.Reset()
pool := bufPools[b.Cap()]
if pool != nil {
pool.Put(b)
}
}
func getBuffer(size int) *fixedBuffer {
pool, ok := bufPools[size]
if !ok {
pool = &sync.Pool{New: func() interface{} { return &fixedBuffer{buf: make([]byte, 0, size)} }}
bufPools[size] = pool
}
return pool.Get().(*fixedBuffer)
}

View file

@ -9,7 +9,7 @@ import (
) )
func TestBytesPipeRead(t *testing.T) { func TestBytesPipeRead(t *testing.T) {
buf := NewBytesPipe(nil) buf := NewBytesPipe()
buf.Write([]byte("12")) buf.Write([]byte("12"))
buf.Write([]byte("34")) buf.Write([]byte("34"))
buf.Write([]byte("56")) buf.Write([]byte("56"))
@ -49,14 +49,14 @@ func TestBytesPipeRead(t *testing.T) {
} }
func TestBytesPipeWrite(t *testing.T) { func TestBytesPipeWrite(t *testing.T) {
buf := NewBytesPipe(nil) buf := NewBytesPipe()
buf.Write([]byte("12")) buf.Write([]byte("12"))
buf.Write([]byte("34")) buf.Write([]byte("34"))
buf.Write([]byte("56")) buf.Write([]byte("56"))
buf.Write([]byte("78")) buf.Write([]byte("78"))
buf.Write([]byte("90")) buf.Write([]byte("90"))
if string(buf.buf[0]) != "1234567890" { if buf.buf[0].String() != "1234567890" {
t.Fatalf("Buffer %s, must be %s", buf.buf, "1234567890") t.Fatalf("Buffer %q, must be %q", buf.buf[0].String(), "1234567890")
} }
} }
@ -86,7 +86,7 @@ func TestBytesPipeWriteRandomChunks(t *testing.T) {
expected := hex.EncodeToString(hash.Sum(nil)) expected := hex.EncodeToString(hash.Sum(nil))
// write/read through buffer // write/read through buffer
buf := NewBytesPipe(nil) buf := NewBytesPipe()
hash.Reset() hash.Reset()
done := make(chan struct{}) done := make(chan struct{})
@ -124,9 +124,10 @@ func TestBytesPipeWriteRandomChunks(t *testing.T) {
} }
func BenchmarkBytesPipeWrite(b *testing.B) { func BenchmarkBytesPipeWrite(b *testing.B) {
testData := []byte("pretty short line, because why not?")
for i := 0; i < b.N; i++ { for i := 0; i < b.N; i++ {
readBuf := make([]byte, 1024) readBuf := make([]byte, 1024)
buf := NewBytesPipe(nil) buf := NewBytesPipe()
go func() { go func() {
var err error var err error
for err == nil { for err == nil {
@ -134,7 +135,7 @@ func BenchmarkBytesPipeWrite(b *testing.B) {
} }
}() }()
for j := 0; j < 1000; j++ { for j := 0; j < 1000; j++ {
buf.Write([]byte("pretty short line, because why not?")) buf.Write(testData)
} }
buf.Close() buf.Close()
} }
@ -144,7 +145,7 @@ func BenchmarkBytesPipeRead(b *testing.B) {
rd := make([]byte, 512) rd := make([]byte, 512)
for i := 0; i < b.N; i++ { for i := 0; i < b.N; i++ {
b.StopTimer() b.StopTimer()
buf := NewBytesPipe(nil) buf := NewBytesPipe()
for j := 0; j < 500; j++ { for j := 0; j < 500; j++ {
buf.Write(make([]byte, 1024)) buf.Write(make([]byte, 1024))
} }