From 434c274741d24387e0e48b0cb244a82ecb858ed6 Mon Sep 17 00:00:00 2001 From: Alexander Morozov Date: Wed, 9 Sep 2015 09:47:24 -0700 Subject: [PATCH] Add BytesPipe datastructure to ioutils Signed-off-by: Alexander Morozov --- ioutils/bytespipe.go | 82 +++++++++++++++++++++++++++++++++++++++ ioutils/bytespipe_test.go | 81 ++++++++++++++++++++++++++++++++++++++ 2 files changed, 163 insertions(+) create mode 100644 ioutils/bytespipe.go create mode 100644 ioutils/bytespipe_test.go diff --git a/ioutils/bytespipe.go b/ioutils/bytespipe.go new file mode 100644 index 0000000..ab06fa1 --- /dev/null +++ b/ioutils/bytespipe.go @@ -0,0 +1,82 @@ +package ioutils + +const maxCap = 10 * 1e6 + +// BytesPipe is io.ReadWriter which works similary to pipe(queue). +// All written data could be read only once. Also BytesPipe trying to adjust +// internal []byte slice to current needs, so there won't be overgrown buffer +// after highload peak. +// BytesPipe isn't goroutine-safe, caller must synchronize it if needed. +type BytesPipe struct { + buf []byte + lastRead int +} + +// NewBytesPipe creates new BytesPipe, initialized by specified slice. +// If buf is nil, then it will be initialized with slice which cap is 64. +// buf will be adjusted in a way that len(buf) == 0, cap(buf) == cap(buf). +func NewBytesPipe(buf []byte) *BytesPipe { + if cap(buf) == 0 { + buf = make([]byte, 0, 64) + } + return &BytesPipe{ + buf: buf[:0], + } +} + +func (bp *BytesPipe) grow(n int) { + if len(bp.buf)+n > cap(bp.buf) { + // not enough space + var buf []byte + remain := bp.len() + if remain+n <= cap(bp.buf)/2 { + // enough space in current buffer, just move data to head + copy(bp.buf, bp.buf[bp.lastRead:]) + buf = bp.buf[:remain] + } else { + // reallocate buffer + buf = make([]byte, remain, 2*cap(bp.buf)+n) + copy(buf, bp.buf[bp.lastRead:]) + } + bp.buf = buf + bp.lastRead = 0 + } +} + +// Write writes p to BytesPipe. +// It can increase cap of internal []byte slice in a process of writing. +func (bp *BytesPipe) Write(p []byte) (n int, err error) { + bp.grow(len(p)) + bp.buf = append(bp.buf, p...) + return +} + +func (bp *BytesPipe) len() int { + return len(bp.buf) - bp.lastRead +} + +func (bp *BytesPipe) crop() { + // shortcut for empty buffer + if bp.lastRead == len(bp.buf) { + bp.lastRead = 0 + bp.buf = bp.buf[:0] + } + r := bp.len() + // if we have too large buffer for too small data + if cap(bp.buf) > maxCap && r < cap(bp.buf)/10 { + copy(bp.buf, bp.buf[bp.lastRead:]) + // will use same underlying slice until reach cap + bp.buf = bp.buf[:r : cap(bp.buf)/2] + bp.lastRead = 0 + } +} + +// Read reads bytes from BytesPipe. +// Data could be read only once. +// Internal []byte slice could be shrinked. +func (bp *BytesPipe) Read(p []byte) (n int, err error) { + n = copy(p, bp.buf[bp.lastRead:]) + bp.lastRead += n + bp.crop() + return +} diff --git a/ioutils/bytespipe_test.go b/ioutils/bytespipe_test.go new file mode 100644 index 0000000..c7cf795 --- /dev/null +++ b/ioutils/bytespipe_test.go @@ -0,0 +1,81 @@ +package ioutils + +import "testing" + +func TestBytesPipeRead(t *testing.T) { + buf := NewBytesPipe(nil) + buf.Write([]byte("12")) + buf.Write([]byte("34")) + buf.Write([]byte("56")) + buf.Write([]byte("78")) + buf.Write([]byte("90")) + rd := make([]byte, 4) + n, err := buf.Read(rd) + if err != nil { + t.Fatal(err) + } + if n != 4 { + t.Fatalf("Wrong number of bytes read: %d, should be %d", n, 4) + } + if string(rd) != "1234" { + t.Fatalf("Read %s, but must be %s", rd, "1234") + } + n, err = buf.Read(rd) + if err != nil { + t.Fatal(err) + } + if n != 4 { + t.Fatalf("Wrong number of bytes read: %d, should be %d", n, 4) + } + if string(rd) != "5678" { + t.Fatalf("Read %s, but must be %s", rd, "5679") + } + n, err = buf.Read(rd) + if err != nil { + t.Fatal(err) + } + if n != 2 { + t.Fatalf("Wrong number of bytes read: %d, should be %d", n, 2) + } + if string(rd[:n]) != "90" { + t.Fatalf("Read %s, but must be %s", rd, "90") + } +} + +func TestBytesPipeWrite(t *testing.T) { + buf := NewBytesPipe(nil) + buf.Write([]byte("12")) + buf.Write([]byte("34")) + buf.Write([]byte("56")) + buf.Write([]byte("78")) + buf.Write([]byte("90")) + if string(buf.buf) != "1234567890" { + t.Fatalf("Buffer %s, must be %s", buf.buf, "1234567890") + } +} + +func BenchmarkBytesPipeWrite(b *testing.B) { + for i := 0; i < b.N; i++ { + buf := NewBytesPipe(nil) + for j := 0; j < 1000; j++ { + buf.Write([]byte("pretty short line, because why not?")) + } + } +} + +func BenchmarkBytesPipeRead(b *testing.B) { + rd := make([]byte, 1024) + for i := 0; i < b.N; i++ { + b.StopTimer() + buf := NewBytesPipe(nil) + for j := 0; j < 1000; j++ { + buf.Write(make([]byte, 1024)) + } + b.StartTimer() + for j := 0; j < 1000; j++ { + if n, _ := buf.Read(rd); n != 1024 { + b.Fatalf("Wrong number of bytes: %d", n) + } + } + } +}