diff --git a/ioutils/bytespipe.go b/ioutils/bytespipe.go index 932e1d1..e263c28 100644 --- a/ioutils/bytespipe.go +++ b/ioutils/bytespipe.go @@ -1,16 +1,32 @@ package ioutils +import ( + "errors" + "io" + "sync" +) + +// maxCap is the highest capacity to use in byte slices that buffer data. const maxCap = 1e6 -// BytesPipe is io.ReadWriter which works similarly to pipe(queue). -// All written data could be read only once. Also BytesPipe is allocating -// and releasing new byte slices to adjust to current needs, so there won't be -// overgrown buffer after high load peak. -// BytesPipe isn't goroutine-safe, caller must synchronize it if needed. +// blockThreshold is the minimum number of bytes in the buffer which will cause +// a write to BytesPipe to block when allocating a new slice. +const blockThreshold = 1e6 + +// ErrClosed is returned when Write is called on a closed BytesPipe. +var ErrClosed = errors.New("write to closed BytesPipe") + +// BytesPipe is io.ReadWriteCloser which works similarly to pipe(queue). +// All written data may be read at most once. Also, BytesPipe allocates +// and releases new byte slices to adjust to current needs, so the buffer +// won't be overgrown after peak loads. type BytesPipe struct { + mu sync.Mutex + wait *sync.Cond buf [][]byte // slice of byte-slices of buffered data lastRead int // index in the first slice to a read point bufLen int // length of data buffered over the slices + closeErr error // error to return from next Read. set to nil if not closed. } // NewBytesPipe creates new BytesPipe, initialized by specified slice. @@ -20,15 +36,23 @@ func NewBytesPipe(buf []byte) *BytesPipe { if cap(buf) == 0 { buf = make([]byte, 0, 64) } - return &BytesPipe{ + bp := &BytesPipe{ buf: [][]byte{buf[:0]}, } + bp.wait = sync.NewCond(&bp.mu) + return bp } // Write writes p to BytesPipe. // It can allocate new []byte slices in a process of writing. -func (bp *BytesPipe) Write(p []byte) (n int, err error) { +func (bp *BytesPipe) Write(p []byte) (int, error) { + bp.mu.Lock() + defer bp.mu.Unlock() + written := 0 for { + if bp.closeErr != nil { + return written, ErrClosed + } // write data to the last buffer b := bp.buf[len(bp.buf)-1] // copy data to the current empty allocated area @@ -38,6 +62,8 @@ func (bp *BytesPipe) Write(p []byte) (n int, err error) { // include written data in last buffer bp.buf[len(bp.buf)-1] = b[:len(b)+n] + written += n + // if there was enough room to write all then break if len(p) == n { break @@ -45,15 +71,40 @@ func (bp *BytesPipe) Write(p []byte) (n int, err error) { // more data: write to the next slice p = p[n:] + + // block if too much data is still in the buffer + for bp.bufLen >= blockThreshold { + bp.wait.Wait() + } + // allocate slice that has twice the size of the last unless maximum reached nextCap := 2 * cap(bp.buf[len(bp.buf)-1]) - if maxCap < nextCap { + if nextCap > maxCap { nextCap = maxCap } // add new byte slice to the buffers slice and continue writing bp.buf = append(bp.buf, make([]byte, 0, nextCap)) } - return + bp.wait.Broadcast() + return written, nil +} + +// CloseWithError causes further reads from a BytesPipe to return immediately. +func (bp *BytesPipe) CloseWithError(err error) error { + bp.mu.Lock() + if err != nil { + bp.closeErr = err + } else { + bp.closeErr = io.EOF + } + bp.wait.Broadcast() + bp.mu.Unlock() + return nil +} + +// Close causes further reads from a BytesPipe to return immediately. +func (bp *BytesPipe) Close() error { + return bp.CloseWithError(nil) } func (bp *BytesPipe) len() int { @@ -63,6 +114,17 @@ func (bp *BytesPipe) len() int { // Read reads bytes from BytesPipe. // Data could be read only once. func (bp *BytesPipe) Read(p []byte) (n int, err error) { + bp.mu.Lock() + defer bp.mu.Unlock() + if bp.len() == 0 { + if bp.closeErr != nil { + return 0, bp.closeErr + } + bp.wait.Wait() + if bp.len() == 0 && bp.closeErr != nil { + return 0, bp.closeErr + } + } for { read := copy(p, bp.buf[0][bp.lastRead:]) n += read @@ -85,5 +147,6 @@ func (bp *BytesPipe) Read(p []byte) (n int, err error) { bp.buf[0] = nil // throw away old slice bp.buf = bp.buf[1:] // switch to next } + bp.wait.Broadcast() return } diff --git a/ioutils/bytespipe_test.go b/ioutils/bytespipe_test.go index 62b1a18..b051139 100644 --- a/ioutils/bytespipe_test.go +++ b/ioutils/bytespipe_test.go @@ -3,7 +3,9 @@ package ioutils import ( "crypto/sha1" "encoding/hex" + "math/rand" "testing" + "time" ) func TestBytesPipeRead(t *testing.T) { @@ -86,25 +88,32 @@ func TestBytesPipeWriteRandomChunks(t *testing.T) { // write/read through buffer buf := NewBytesPipe(nil) hash.Reset() + + done := make(chan struct{}) + + go func() { + // random delay before read starts + <-time.After(time.Duration(rand.Intn(10)) * time.Millisecond) + for i := 0; ; i++ { + p := make([]byte, readChunks[(c.iterations*c.readsPerLoop+i)%len(readChunks)]) + n, _ := buf.Read(p) + if n == 0 { + break + } + hash.Write(p[:n]) + } + + close(done) + }() + for i := 0; i < c.iterations; i++ { for w := 0; w < c.writesPerLoop; w++ { buf.Write(testMessage[:writeChunks[(i*c.writesPerLoop+w)%len(writeChunks)]]) } - for r := 0; r < c.readsPerLoop; r++ { - p := make([]byte, readChunks[(i*c.readsPerLoop+r)%len(readChunks)]) - n, _ := buf.Read(p) - hash.Write(p[:n]) - } - } - // read rest of the data from buffer - for i := 0; ; i++ { - p := make([]byte, readChunks[(c.iterations*c.readsPerLoop+i)%len(readChunks)]) - n, _ := buf.Read(p) - if n == 0 { - break - } - hash.Write(p[:n]) } + buf.Close() + <-done + actual := hex.EncodeToString(hash.Sum(nil)) if expected != actual { @@ -116,24 +125,32 @@ func TestBytesPipeWriteRandomChunks(t *testing.T) { func BenchmarkBytesPipeWrite(b *testing.B) { for i := 0; i < b.N; i++ { + readBuf := make([]byte, 1024) buf := NewBytesPipe(nil) + go func() { + var err error + for err == nil { + _, err = buf.Read(readBuf) + } + }() for j := 0; j < 1000; j++ { buf.Write([]byte("pretty short line, because why not?")) } + buf.Close() } } func BenchmarkBytesPipeRead(b *testing.B) { - rd := make([]byte, 1024) + rd := make([]byte, 512) for i := 0; i < b.N; i++ { b.StopTimer() buf := NewBytesPipe(nil) - for j := 0; j < 1000; j++ { + for j := 0; j < 500; j++ { buf.Write(make([]byte, 1024)) } b.StartTimer() for j := 0; j < 1000; j++ { - if n, _ := buf.Read(rd); n != 1024 { + if n, _ := buf.Read(rd); n != 512 { b.Fatalf("Wrong number of bytes: %d", n) } } diff --git a/ioutils/readers.go b/ioutils/readers.go index 54dd312..b4544de 100644 --- a/ioutils/readers.go +++ b/ioutils/readers.go @@ -4,7 +4,6 @@ import ( "crypto/sha256" "encoding/hex" "io" - "sync" ) type readCloserWrapper struct { @@ -45,92 +44,6 @@ func NewReaderErrWrapper(r io.Reader, closer func()) io.Reader { } } -// bufReader allows the underlying reader to continue to produce -// output by pre-emptively reading from the wrapped reader. -// This is achieved by buffering this data in bufReader's -// expanding buffer. -type bufReader struct { - sync.Mutex - buf io.ReadWriter - reader io.Reader - err error - wait sync.Cond - drainBuf []byte -} - -// NewBufReader returns a new bufReader. -func NewBufReader(r io.Reader) io.ReadCloser { - reader := &bufReader{ - buf: NewBytesPipe(nil), - reader: r, - drainBuf: make([]byte, 1024), - } - reader.wait.L = &reader.Mutex - go reader.drain() - return reader -} - -// NewBufReaderWithDrainbufAndBuffer returns a BufReader with drainBuffer and buffer. -func NewBufReaderWithDrainbufAndBuffer(r io.Reader, drainBuffer []byte, buffer io.ReadWriter) io.ReadCloser { - reader := &bufReader{ - buf: buffer, - drainBuf: drainBuffer, - reader: r, - } - reader.wait.L = &reader.Mutex - go reader.drain() - return reader -} - -func (r *bufReader) drain() { - for { - //Call to scheduler is made to yield from this goroutine. - //This avoids goroutine looping here when n=0,err=nil, fixes code hangs when run with GCC Go. - callSchedulerIfNecessary() - n, err := r.reader.Read(r.drainBuf) - r.Lock() - if err != nil { - r.err = err - } else { - if n == 0 { - // nothing written, no need to signal - r.Unlock() - continue - } - r.buf.Write(r.drainBuf[:n]) - } - r.wait.Signal() - r.Unlock() - if err != nil { - break - } - } -} - -func (r *bufReader) Read(p []byte) (n int, err error) { - r.Lock() - defer r.Unlock() - for { - n, err = r.buf.Read(p) - if n > 0 { - return n, err - } - if r.err != nil { - return 0, r.err - } - r.wait.Wait() - } -} - -// Close closes the bufReader -func (r *bufReader) Close() error { - closer, ok := r.reader.(io.ReadCloser) - if !ok { - return nil - } - return closer.Close() -} - // HashData returns the sha256 sum of src. func HashData(src io.Reader) (string, error) { h := sha256.New() diff --git a/ioutils/readers_test.go b/ioutils/readers_test.go index 5c26a2a..5be68cb 100644 --- a/ioutils/readers_test.go +++ b/ioutils/readers_test.go @@ -1,13 +1,9 @@ package ioutils import ( - "bytes" "fmt" - "io" - "io/ioutil" "strings" "testing" - "time" ) // Implement io.Reader @@ -58,101 +54,6 @@ func TestReaderErrWrapperRead(t *testing.T) { } } -func TestNewBufReaderWithDrainbufAndBuffer(t *testing.T) { - reader, writer := io.Pipe() - - drainBuffer := make([]byte, 1024) - buffer := NewBytesPipe(nil) - bufreader := NewBufReaderWithDrainbufAndBuffer(reader, drainBuffer, buffer) - - // Write everything down to a Pipe - // Usually, a pipe should block but because of the buffered reader, - // the writes will go through - done := make(chan bool) - go func() { - writer.Write([]byte("hello world")) - writer.Close() - done <- true - }() - - // Drain the reader *after* everything has been written, just to verify - // it is indeed buffering - select { - case <-done: - case <-time.After(1 * time.Second): - t.Fatal("timeout") - } - - output, err := ioutil.ReadAll(bufreader) - if err != nil { - t.Fatal(err) - } - if !bytes.Equal(output, []byte("hello world")) { - t.Error(string(output)) - } -} - -func TestBufReader(t *testing.T) { - reader, writer := io.Pipe() - bufreader := NewBufReader(reader) - - // Write everything down to a Pipe - // Usually, a pipe should block but because of the buffered reader, - // the writes will go through - done := make(chan bool) - go func() { - writer.Write([]byte("hello world")) - writer.Close() - done <- true - }() - - // Drain the reader *after* everything has been written, just to verify - // it is indeed buffering - <-done - output, err := ioutil.ReadAll(bufreader) - if err != nil { - t.Fatal(err) - } - if !bytes.Equal(output, []byte("hello world")) { - t.Error(string(output)) - } -} - -func TestBufReaderCloseWithNonReaderCloser(t *testing.T) { - reader := strings.NewReader("buffer") - bufreader := NewBufReader(reader) - - if err := bufreader.Close(); err != nil { - t.Fatal(err) - } - -} - -// implements io.ReadCloser -type simpleReaderCloser struct { - err error -} - -func (r *simpleReaderCloser) Read(p []byte) (n int, err error) { - return 0, r.err -} - -func (r *simpleReaderCloser) Close() error { - r.err = io.EOF - return nil -} - -func TestBufReaderCloseWithReaderCloser(t *testing.T) { - reader := &simpleReaderCloser{} - bufreader := NewBufReader(reader) - - err := bufreader.Close() - if err != nil { - t.Fatal(err) - } - -} - func TestHashData(t *testing.T) { reader := strings.NewReader("hash-me") actual, err := HashData(reader) @@ -164,61 +65,3 @@ func TestHashData(t *testing.T) { t.Fatalf("Expecting %s, got %s", expected, actual) } } - -type repeatedReader struct { - readCount int - maxReads int - data []byte -} - -func newRepeatedReader(max int, data []byte) *repeatedReader { - return &repeatedReader{0, max, data} -} - -func (r *repeatedReader) Read(p []byte) (int, error) { - if r.readCount >= r.maxReads { - return 0, io.EOF - } - r.readCount++ - n := copy(p, r.data) - return n, nil -} - -func testWithData(data []byte, reads int) { - reader := newRepeatedReader(reads, data) - bufReader := NewBufReader(reader) - io.Copy(ioutil.Discard, bufReader) -} - -func Benchmark1M10BytesReads(b *testing.B) { - reads := 1000000 - readSize := int64(10) - data := make([]byte, readSize) - b.SetBytes(readSize * int64(reads)) - b.ResetTimer() - for i := 0; i < b.N; i++ { - testWithData(data, reads) - } -} - -func Benchmark1M1024BytesReads(b *testing.B) { - reads := 1000000 - readSize := int64(1024) - data := make([]byte, readSize) - b.SetBytes(readSize * int64(reads)) - b.ResetTimer() - for i := 0; i < b.N; i++ { - testWithData(data, reads) - } -} - -func Benchmark10k32KBytesReads(b *testing.B) { - reads := 10000 - readSize := int64(32 * 1024) - data := make([]byte, readSize) - b.SetBytes(readSize * int64(reads)) - b.ResetTimer() - for i := 0; i < b.N; i++ { - testWithData(data, reads) - } -}