diff --git a/ioutils/buffer.go b/ioutils/buffer.go new file mode 100644 index 0000000..3d737b3 --- /dev/null +++ b/ioutils/buffer.go @@ -0,0 +1,51 @@ +package ioutils + +import ( + "errors" + "io" +) + +var errBufferFull = errors.New("buffer is full") + +type fixedBuffer struct { + buf []byte + pos int + lastRead int +} + +func (b *fixedBuffer) Write(p []byte) (int, error) { + n := copy(b.buf[b.pos:cap(b.buf)], p) + b.pos += n + + if n < len(p) { + if b.pos == cap(b.buf) { + return n, errBufferFull + } + return n, io.ErrShortWrite + } + return n, nil +} + +func (b *fixedBuffer) Read(p []byte) (int, error) { + n := copy(p, b.buf[b.lastRead:b.pos]) + b.lastRead += n + return n, nil +} + +func (b *fixedBuffer) Len() int { + return b.pos - b.lastRead +} + +func (b *fixedBuffer) Cap() int { + return cap(b.buf) +} + +func (b *fixedBuffer) Reset() { + b.pos = 0 + b.lastRead = 0 + b.buf = b.buf[:0] +} + +func (b *fixedBuffer) String() string { + return string(b.buf[b.lastRead:b.pos]) +} diff --git a/ioutils/buffer_test.go b/ioutils/buffer_test.go new file mode 100644 index 0000000..41098fa --- /dev/null +++ b/ioutils/buffer_test.go @@ -0,0 +1,75 @@ +package ioutils + +import ( + "bytes" + "testing" +) + +func TestFixedBufferWrite(t *testing.T) { + buf := &fixedBuffer{buf: make([]byte, 0, 64)} + n, err := buf.Write([]byte("hello")) + if err != nil { + t.Fatal(err) + } + + if n != 5 { + t.Fatalf("expected 5 bytes written, got %d", n) + } + + if string(buf.buf[:5]) != "hello" { + t.Fatalf("expected \"hello\", got %q", string(buf.buf[:5])) + } + + n, err = buf.Write(bytes.Repeat([]byte{1}, 64)) + if err != errBufferFull { + t.Fatalf("expected errBufferFull, got %v - %v", err, buf.buf[:64]) + } +} + +func TestFixedBufferRead(t *testing.T) { + buf := &fixedBuffer{buf: make([]byte, 0, 64)} + if _, err := buf.Write([]byte("hello world")); err != nil { + t.Fatal(err) + } + + b := make([]byte, 5) + n, err := buf.Read(b) + if err != nil { + t.Fatal(err) + } + + if n != 5 { + t.Fatalf("expected 5 bytes read, got %d - %s", n, buf.String()) + } + + if string(b) != "hello" { + t.Fatalf("expected \"hello\", got %q", string(b)) + } + + n, err = buf.Read(b) + if err != nil { + t.Fatal(err) + } + + if n != 5 { + t.Fatalf("expected 5 bytes read, got %d", n) + } + + if string(b) != " worl" { + t.Fatalf("expected \" worl\", got %s", string(b)) + } + + b = b[:1] + n, err = buf.Read(b) + if err != nil { + t.Fatal(err) + } + + if n != 1 { + t.Fatalf("expected 1 byte read, got %d - %s", n, buf.String()) + } + + if string(b) != "d" { + t.Fatalf("expected \"d\", got %s", string(b)) + } +} diff --git a/ioutils/bytespipe.go b/ioutils/bytespipe.go index e263c28..f0d21de 100644 --- a/ioutils/bytespipe.go +++ b/ioutils/bytespipe.go @@ -9,12 +9,19 @@ import ( // maxCap is the highest capacity to use in byte slices that buffer data. const maxCap = 1e6 +// minCap is the lowest capacity to use in byte slices that buffer data +const minCap = 64 + // blockThreshold is the minimum number of bytes in the buffer which will cause // a write to BytesPipe to block when allocating a new slice. const blockThreshold = 1e6 -// ErrClosed is returned when Write is called on a closed BytesPipe. -var ErrClosed = errors.New("write to closed BytesPipe") +var ( + // ErrClosed is returned when Write is called on a closed BytesPipe. + ErrClosed = errors.New("write to closed BytesPipe") + + bufPools = make(map[int]*sync.Pool) +) // BytesPipe is io.ReadWriteCloser which works similarly to pipe(queue). // All written data may be read at most once. Also, BytesPipe allocates @@ -23,22 +30,17 @@ var ErrClosed = errors.New("write to closed BytesPipe") type BytesPipe struct { mu sync.Mutex wait *sync.Cond - buf [][]byte // slice of byte-slices of buffered data - lastRead int // index in the first slice to a read point - bufLen int // length of data buffered over the slices - closeErr error // error to return from next Read. set to nil if not closed. + buf []*fixedBuffer + bufLen int + closeErr error // error to return from next Read. set to nil if not closed. } // NewBytesPipe creates new BytesPipe, initialized by specified slice. // If buf is nil, then it will be initialized with slice which cap is 64. // buf will be adjusted in a way that len(buf) == 0, cap(buf) == cap(buf). -func NewBytesPipe(buf []byte) *BytesPipe { - if cap(buf) == 0 { - buf = make([]byte, 0, 64) - } - bp := &BytesPipe{ - buf: [][]byte{buf[:0]}, - } +func NewBytesPipe() *BytesPipe { + bp := &BytesPipe{} + bp.buf = append(bp.buf, getBuffer(minCap)) bp.wait = sync.NewCond(&bp.mu) return bp } @@ -47,22 +49,30 @@ func NewBytesPipe(buf []byte) *BytesPipe { // It can allocate new []byte slices in a process of writing. func (bp *BytesPipe) Write(p []byte) (int, error) { bp.mu.Lock() - defer bp.mu.Unlock() + written := 0 for { if bp.closeErr != nil { + bp.mu.Unlock() return written, ErrClosed } - // write data to the last buffer - b := bp.buf[len(bp.buf)-1] - // copy data to the current empty allocated area - n := copy(b[len(b):cap(b)], p) - // increment buffered data length - bp.bufLen += n - // include written data in last buffer - bp.buf[len(bp.buf)-1] = b[:len(b)+n] + if len(bp.buf) == 0 { + bp.buf = append(bp.buf, getBuffer(64)) + } + // get the last buffer + b := bp.buf[len(bp.buf)-1] + + n, err := b.Write(p) written += n + bp.bufLen += n + + // errBufferFull is an error we expect to get if the buffer is full + if err != nil && err != errBufferFull { + bp.wait.Broadcast() + bp.mu.Unlock() + return written, err + } // if there was enough room to write all then break if len(p) == n { @@ -72,20 +82,20 @@ func (bp *BytesPipe) Write(p []byte) (int, error) { // more data: write to the next slice p = p[n:] - // block if too much data is still in the buffer + // make sure the buffer doesn't grow too big from this write for bp.bufLen >= blockThreshold { bp.wait.Wait() } - // allocate slice that has twice the size of the last unless maximum reached - nextCap := 2 * cap(bp.buf[len(bp.buf)-1]) + // add new byte slice to the buffers slice and continue writing + nextCap := b.Cap() * 2 if nextCap > maxCap { nextCap = maxCap } - // add new byte slice to the buffers slice and continue writing - bp.buf = append(bp.buf, make([]byte, 0, nextCap)) + bp.buf = append(bp.buf, getBuffer(nextCap)) } bp.wait.Broadcast() + bp.mu.Unlock() return written, nil } @@ -107,46 +117,60 @@ func (bp *BytesPipe) Close() error { return bp.CloseWithError(nil) } -func (bp *BytesPipe) len() int { - return bp.bufLen - bp.lastRead -} - // Read reads bytes from BytesPipe. // Data could be read only once. func (bp *BytesPipe) Read(p []byte) (n int, err error) { bp.mu.Lock() - defer bp.mu.Unlock() - if bp.len() == 0 { + if bp.bufLen == 0 { if bp.closeErr != nil { + bp.mu.Unlock() return 0, bp.closeErr } bp.wait.Wait() - if bp.len() == 0 && bp.closeErr != nil { + if bp.bufLen == 0 && bp.closeErr != nil { + bp.mu.Unlock() return 0, bp.closeErr } } - for { - read := copy(p, bp.buf[0][bp.lastRead:]) + + for bp.bufLen > 0 { + b := bp.buf[0] + read, _ := b.Read(p) // ignore error since fixedBuffer doesn't really return an error n += read - bp.lastRead += read - if bp.len() == 0 { - // we have read everything. reset to the beginning. - bp.lastRead = 0 - bp.bufLen -= len(bp.buf[0]) - bp.buf[0] = bp.buf[0][:0] - break + bp.bufLen -= read + + if b.Len() == 0 { + // it's empty so return it to the pool and move to the next one + returnBuffer(b) + bp.buf[0] = nil + bp.buf = bp.buf[1:] } - // break if everything was read + if len(p) == read { break } - // more buffered data and more asked. read from next slice. + p = p[read:] - bp.lastRead = 0 - bp.bufLen -= len(bp.buf[0]) - bp.buf[0] = nil // throw away old slice - bp.buf = bp.buf[1:] // switch to next } + bp.wait.Broadcast() + bp.mu.Unlock() return } + +func returnBuffer(b *fixedBuffer) { + b.Reset() + pool := bufPools[b.Cap()] + if pool != nil { + pool.Put(b) + } +} + +func getBuffer(size int) *fixedBuffer { + pool, ok := bufPools[size] + if !ok { + pool = &sync.Pool{New: func() interface{} { return &fixedBuffer{buf: make([]byte, 0, size)} }} + bufPools[size] = pool + } + return pool.Get().(*fixedBuffer) +} diff --git a/ioutils/bytespipe_test.go b/ioutils/bytespipe_test.go index b051139..300fb5f 100644 --- a/ioutils/bytespipe_test.go +++ b/ioutils/bytespipe_test.go @@ -9,7 +9,7 @@ import ( ) func TestBytesPipeRead(t *testing.T) { - buf := NewBytesPipe(nil) + buf := NewBytesPipe() buf.Write([]byte("12")) buf.Write([]byte("34")) buf.Write([]byte("56")) @@ -49,14 +49,14 @@ func TestBytesPipeRead(t *testing.T) { } func TestBytesPipeWrite(t *testing.T) { - buf := NewBytesPipe(nil) + buf := NewBytesPipe() buf.Write([]byte("12")) buf.Write([]byte("34")) buf.Write([]byte("56")) buf.Write([]byte("78")) buf.Write([]byte("90")) - if string(buf.buf[0]) != "1234567890" { - t.Fatalf("Buffer %s, must be %s", buf.buf, "1234567890") + if buf.buf[0].String() != "1234567890" { + t.Fatalf("Buffer %q, must be %q", buf.buf[0].String(), "1234567890") } } @@ -86,7 +86,7 @@ func TestBytesPipeWriteRandomChunks(t *testing.T) { expected := hex.EncodeToString(hash.Sum(nil)) // write/read through buffer - buf := NewBytesPipe(nil) + buf := NewBytesPipe() hash.Reset() done := make(chan struct{}) @@ -124,9 +124,10 @@ func TestBytesPipeWriteRandomChunks(t *testing.T) { } func BenchmarkBytesPipeWrite(b *testing.B) { + testData := []byte("pretty short line, because why not?") for i := 0; i < b.N; i++ { readBuf := make([]byte, 1024) - buf := NewBytesPipe(nil) + buf := NewBytesPipe() go func() { var err error for err == nil { @@ -134,7 +135,7 @@ func BenchmarkBytesPipeWrite(b *testing.B) { } }() for j := 0; j < 1000; j++ { - buf.Write([]byte("pretty short line, because why not?")) + buf.Write(testData) } buf.Close() } @@ -144,7 +145,7 @@ func BenchmarkBytesPipeRead(b *testing.B) { rd := make([]byte, 512) for i := 0; i < b.N; i++ { b.StopTimer() - buf := NewBytesPipe(nil) + buf := NewBytesPipe() for j := 0; j < 500; j++ { buf.Write(make([]byte, 1024)) }