From 434c274741d24387e0e48b0cb244a82ecb858ed6 Mon Sep 17 00:00:00 2001
From: Alexander Morozov
Date: Wed, 9 Sep 2015 09:47:24 -0700
Subject: [PATCH 1/3] Add BytesPipe data structure to ioutils

Signed-off-by: Alexander Morozov
---
 ioutils/bytespipe.go      | 82 +++++++++++++++++++++++++++++++++++++++
 ioutils/bytespipe_test.go | 81 ++++++++++++++++++++++++++++++++++++++
 2 files changed, 163 insertions(+)
 create mode 100644 ioutils/bytespipe.go
 create mode 100644 ioutils/bytespipe_test.go

diff --git a/ioutils/bytespipe.go b/ioutils/bytespipe.go
new file mode 100644
index 0000000..ab06fa1
--- /dev/null
+++ b/ioutils/bytespipe.go
@@ -0,0 +1,82 @@
+package ioutils
+
+const maxCap = 10 * 1e6
+
+// BytesPipe is an io.ReadWriter which works similarly to a pipe (queue).
+// All written data can be read only once. BytesPipe also tries to adjust
+// the internal []byte slice to current needs, so the buffer won't stay
+// overgrown after a high-load peak.
+// BytesPipe isn't goroutine-safe; the caller must synchronize it if needed.
+type BytesPipe struct {
+	buf      []byte
+	lastRead int
+}
+
+// NewBytesPipe creates a new BytesPipe, initialized with the specified slice.
+// If buf is nil or has no capacity, it is initialized with a slice whose cap is 64.
+// buf is adjusted so that len(buf) == 0 while its original capacity is kept.
+func NewBytesPipe(buf []byte) *BytesPipe {
+	if cap(buf) == 0 {
+		buf = make([]byte, 0, 64)
+	}
+	return &BytesPipe{
+		buf: buf[:0],
+	}
+}
+
+func (bp *BytesPipe) grow(n int) {
+	if len(bp.buf)+n > cap(bp.buf) {
+		// not enough space
+		var buf []byte
+		remain := bp.len()
+		if remain+n <= cap(bp.buf)/2 {
+			// enough space in current buffer, just move data to head
+			copy(bp.buf, bp.buf[bp.lastRead:])
+			buf = bp.buf[:remain]
+		} else {
+			// reallocate buffer
+			buf = make([]byte, remain, 2*cap(bp.buf)+n)
+			copy(buf, bp.buf[bp.lastRead:])
+		}
+		bp.buf = buf
+		bp.lastRead = 0
+	}
+}
+
+// Write writes p to BytesPipe.
+// It can increase the cap of the internal []byte slice in the process of writing.
+func (bp *BytesPipe) Write(p []byte) (n int, err error) {
+	bp.grow(len(p))
+	bp.buf = append(bp.buf, p...)
+	n = len(p)
+	return
+}
+
+func (bp *BytesPipe) len() int {
+	return len(bp.buf) - bp.lastRead
+}
+
+func (bp *BytesPipe) crop() {
+	// shortcut for empty buffer
+	if bp.lastRead == len(bp.buf) {
+		bp.lastRead = 0
+		bp.buf = bp.buf[:0]
+	}
+	r := bp.len()
+	// if we have too large buffer for too small data
+	if cap(bp.buf) > maxCap && r < cap(bp.buf)/10 {
+		copy(bp.buf, bp.buf[bp.lastRead:])
+		// will use same underlying slice until reach cap
+		bp.buf = bp.buf[:r : cap(bp.buf)/2]
+		bp.lastRead = 0
+	}
+}
+
+// Read reads bytes from BytesPipe.
+// Data can be read only once.
+// The internal []byte slice may be shrunk.
+func (bp *BytesPipe) Read(p []byte) (n int, err error) {
+	n = copy(p, bp.buf[bp.lastRead:])
+	bp.lastRead += n
+	bp.crop()
+	return
+}

diff --git a/ioutils/bytespipe_test.go b/ioutils/bytespipe_test.go
new file mode 100644
index 0000000..c7cf795
--- /dev/null
+++ b/ioutils/bytespipe_test.go
@@ -0,0 +1,81 @@
+package ioutils
+
+import "testing"
+
+func TestBytesPipeRead(t *testing.T) {
+	buf := NewBytesPipe(nil)
+	buf.Write([]byte("12"))
+	buf.Write([]byte("34"))
+	buf.Write([]byte("56"))
+	buf.Write([]byte("78"))
+	buf.Write([]byte("90"))
+	rd := make([]byte, 4)
+	n, err := buf.Read(rd)
+	if err != nil {
+		t.Fatal(err)
+	}
+	if n != 4 {
+		t.Fatalf("Wrong number of bytes read: %d, should be %d", n, 4)
+	}
+	if string(rd) != "1234" {
+		t.Fatalf("Read %s, but must be %s", rd, "1234")
+	}
+	n, err = buf.Read(rd)
+	if err != nil {
+		t.Fatal(err)
+	}
+	if n != 4 {
+		t.Fatalf("Wrong number of bytes read: %d, should be %d", n, 4)
+	}
+	if string(rd) != "5678" {
+		t.Fatalf("Read %s, but must be %s", rd, "5678")
+	}
+	n, err = buf.Read(rd)
+	if err != nil {
+		t.Fatal(err)
+	}
+	if n != 2 {
+		t.Fatalf("Wrong number of bytes read: %d, should be %d", n, 2)
+	}
+	if string(rd[:n]) != "90" {
+		t.Fatalf("Read %s, but must be %s", rd[:n], "90")
+	}
+}
+
+func TestBytesPipeWrite(t *testing.T) {
+	buf := NewBytesPipe(nil)
+	buf.Write([]byte("12"))
+	buf.Write([]byte("34"))
+	buf.Write([]byte("56"))
+	buf.Write([]byte("78"))
+	buf.Write([]byte("90"))
+	if string(buf.buf) != "1234567890" {
+		t.Fatalf("Buffer %s, must be %s", buf.buf, "1234567890")
+	}
+}
+
+func BenchmarkBytesPipeWrite(b *testing.B) {
+	for i := 0; i < b.N; i++ {
+		buf := NewBytesPipe(nil)
+		for j := 0; j < 1000; j++ {
+			buf.Write([]byte("pretty short line, because why not?"))
+		}
+	}
+}
+
+func BenchmarkBytesPipeRead(b *testing.B) {
+	rd := make([]byte, 1024)
+	for i := 0; i < b.N; i++ {
+		b.StopTimer()
+		buf := NewBytesPipe(nil)
+		for j := 0; j < 1000; j++ {
+			buf.Write(make([]byte, 1024))
+		}
+		b.StartTimer()
+		for j := 0; j < 1000; j++ {
+			if n, _ := buf.Read(rd); n != 1024 {
+				b.Fatalf("Wrong number of bytes: %d", n)
+			}
+		}
+	}
+}

From 46b458fe3b7a1e61ed209462dc47c5f646b35f1a Mon Sep 17 00:00:00 2001
From: Alexander Morozov
Date: Wed, 9 Sep 2015 09:48:34 -0700
Subject: [PATCH 2/3] Refactor bufReader to use BytesPipe

Signed-off-by: Alexander Morozov
---
 ioutils/readers.go      | 120 +++++-----------------------------------
 ioutils/readers_test.go |  11 +++-
 2 files changed, 23 insertions(+), 108 deletions(-)

diff --git a/ioutils/readers.go b/ioutils/readers.go
index bb96fe1..3e68410 100644
--- a/ioutils/readers.go
+++ b/ioutils/readers.go
@@ -1,13 +1,10 @@
 package ioutils
 
 import (
-	"bytes"
 	"crypto/sha256"
 	"encoding/hex"
 	"io"
-	"math/rand"
 	"sync"
-	"time"
 
 	"github.com/docker/docker/pkg/random"
 )
@@ -58,31 +55,19 @@
 // expanding buffer.
 type bufReader struct {
 	sync.Mutex
-	buf                  *bytes.Buffer
-	reader               io.Reader
-	err                  error
-	wait                 sync.Cond
-	drainBuf             []byte
-	reuseBuf             []byte
-	maxReuse             int64
-	resetTimeout         time.Duration
-	bufLenResetThreshold int64
-	maxReadDataReset     int64
+	buf      io.ReadWriter
+	reader   io.Reader
+	err      error
+	wait     sync.Cond
+	drainBuf []byte
 }
 
 // NewBufReader returns a new bufReader.
 func NewBufReader(r io.Reader) io.ReadCloser {
-	timeout := rand.New(rndSrc).Intn(90)
 	reader := &bufReader{
-		buf:                  &bytes.Buffer{},
-		drainBuf:             make([]byte, 1024),
-		reuseBuf:             make([]byte, 4096),
-		maxReuse:             1000,
-		resetTimeout:         time.Duration(timeout) * time.Second,
-		bufLenResetThreshold: 100 * 1024,
-		maxReadDataReset:     10 * 1024 * 1024,
-		reader:               r,
+		buf:      NewBytesPipe(nil),
+		reader:   r,
+		drainBuf: make([]byte, 1024),
 	}
 	reader.wait.L = &reader.Mutex
 	go reader.drain()
@@ -90,7 +75,7 @@ func NewBufReader(r io.Reader) io.ReadCloser {
 }
 
 // NewBufReaderWithDrainbufAndBuffer returns a BufReader with drainBuffer and buffer.
-func NewBufReaderWithDrainbufAndBuffer(r io.Reader, drainBuffer []byte, buffer *bytes.Buffer) io.ReadCloser {
+func NewBufReaderWithDrainbufAndBuffer(r io.Reader, drainBuffer []byte, buffer io.ReadWriter) io.ReadCloser {
 	reader := &bufReader{
 		buf:      buffer,
 		drainBuf: drainBuffer,
@@ -102,94 +87,19 @@ func NewBufReaderWithDrainbufAndBuffer(r io.Reader, drainBuffer []byte, buffer *
 }
 
 func (r *bufReader) drain() {
-	var (
-		duration       time.Duration
-		lastReset      time.Time
-		now            time.Time
-		reset          bool
-		bufLen         int64
-		dataSinceReset int64
-		maxBufLen      int64
-		reuseBufLen    int64
-		reuseCount     int64
-	)
-	reuseBufLen = int64(len(r.reuseBuf))
-	lastReset = time.Now()
 	for {
 		n, err := r.reader.Read(r.drainBuf)
-		dataSinceReset += int64(n)
 		r.Lock()
-		bufLen = int64(r.buf.Len())
-		if bufLen > maxBufLen {
-			maxBufLen = bufLen
-		}
-
-		// Avoid unbounded growth of the buffer over time.
-		// This has been discovered to be the only non-intrusive
-		// solution to the unbounded growth of the buffer.
-		// Alternative solutions such as compression, multiple
-		// buffers, channels and other similar pieces of code
-		// were reducing throughput, overall Docker performance
-		// or simply crashed Docker.
-		// This solution releases the buffer when specific
-		// conditions are met to avoid the continuous resizing
-		// of the buffer for long lived containers.
-		//
-		// Move data to the front of the buffer if it's
-		// smaller than what reuseBuf can store
-		if bufLen > 0 && reuseBufLen >= bufLen {
-			n, _ := r.buf.Read(r.reuseBuf)
-			r.buf.Write(r.reuseBuf[0:n])
-			// Take action if the buffer has been reused too many
-			// times and if there's data in the buffer.
-			// The timeout is also used as means to avoid doing
-			// these operations more often or less often than
-			// required.
-			// The various conditions try to detect heavy activity
-			// in the buffer which might be indicators of heavy
-			// growth of the buffer.
-		} else if reuseCount >= r.maxReuse && bufLen > 0 {
-			now = time.Now()
-			duration = now.Sub(lastReset)
-			timeoutReached := duration >= r.resetTimeout
-
-			// The timeout has been reached and the
-			// buffered data couldn't be moved to the front
-			// of the buffer, so the buffer gets reset.
-			if timeoutReached && bufLen > reuseBufLen {
-				reset = true
-			}
-			// The amount of buffered data is too high now,
-			// reset the buffer.
-			if timeoutReached && maxBufLen >= r.bufLenResetThreshold {
-				reset = true
-			}
-			// Reset the buffer if a certain amount of
-			// data has gone through the buffer since the
-			// last reset.
-			if timeoutReached && dataSinceReset >= r.maxReadDataReset {
-				reset = true
-			}
-			// The buffered data is moved to a fresh buffer,
-			// swap the old buffer with the new one and
-			// reset all counters.
-			if reset {
-				newbuf := &bytes.Buffer{}
-				newbuf.ReadFrom(r.buf)
-				r.buf = newbuf
-				lastReset = now
-				reset = false
-				dataSinceReset = 0
-				maxBufLen = 0
-				reuseCount = 0
-			}
-		}
 		if err != nil {
 			r.err = err
 		} else {
-			r.buf.Write(r.drainBuf[0:n])
+			if n == 0 {
+				// nothing read, so nothing to write and no need to signal
+				r.Unlock()
+				continue
+			}
+			r.buf.Write(r.drainBuf[:n])
 		}
-		reuseCount++
 		r.wait.Signal()
 		r.Unlock()
 		callSchedulerIfNecessary()

diff --git a/ioutils/readers_test.go b/ioutils/readers_test.go
index 0a39b6e..5eeda96 100644
--- a/ioutils/readers_test.go
+++ b/ioutils/readers_test.go
@@ -7,6 +7,7 @@ import (
 	"io/ioutil"
 	"strings"
 	"testing"
+	"time"
 )
 
 // Implement io.Reader
@@ -61,8 +62,8 @@ func TestNewBufReaderWithDrainbufAndBuffer(t *testing.T) {
 	reader, writer := io.Pipe()
 
 	drainBuffer := make([]byte, 1024)
-	buffer := bytes.Buffer{}
-	bufreader := NewBufReaderWithDrainbufAndBuffer(reader, drainBuffer, &buffer)
+	buffer := NewBytesPipe(nil)
+	bufreader := NewBufReaderWithDrainbufAndBuffer(reader, drainBuffer, buffer)
 
 	// Write everything down to a Pipe
 	// Usually, a pipe should block but because of the buffered reader,
@@ -76,7 +77,11 @@ func TestNewBufReaderWithDrainbufAndBuffer(t *testing.T) {
 
 	// Drain the reader *after* everything has been written, just to verify
 	// it is indeed buffering
-	<-done
+	select {
+	case <-done:
+	case <-time.After(1 * time.Second):
+		t.Fatal("timeout")
+	}
 
 	output, err := ioutil.ReadAll(bufreader)
 	if err != nil {

From 3c6dcfb6ca2e989c33b9bc60fbbc14571d58fc2c Mon Sep 17 00:00:00 2001
From: Tonis Tiigi
Date: Wed, 16 Sep 2015 16:51:11 -0800
Subject: [PATCH 3/3] Make BytesPipe use linear allocations

Signed-off-by: Tonis Tiigi
---
 ioutils/bytespipe.go      | 109 ++++++++++++++++++++------------------
 ioutils/bytespipe_test.go |  64 +++++++++++++++++++++-
 ioutils/readers.go        |   4 ----
 3 files changed, 120 insertions(+), 57 deletions(-)

diff --git a/ioutils/bytespipe.go b/ioutils/bytespipe.go
index ab06fa1..932e1d1 100644
--- a/ioutils/bytespipe.go
+++ b/ioutils/bytespipe.go
@@ -1,15 +1,16 @@
 package ioutils
 
-const maxCap = 10 * 1e6
+const maxCap = 1e6
 
 // BytesPipe is an io.ReadWriter which works similarly to a pipe (queue).
-// All written data can be read only once. BytesPipe also tries to adjust
-// the internal []byte slice to current needs, so the buffer won't stay
-// overgrown after a high-load peak.
+// All written data can be read only once. BytesPipe also allocates
+// and releases byte slices to adjust to current needs, so the buffer
+// won't stay overgrown after a high-load peak.
 // BytesPipe isn't goroutine-safe; the caller must synchronize it if needed.
 type BytesPipe struct {
-	buf      []byte
-	lastRead int
+	buf      [][]byte // slice of byte slices holding the buffered data
+	lastRead int      // read offset into the first slice
+	bufLen   int      // total length of data buffered across the slices
 }
 
 // NewBytesPipe creates a new BytesPipe, initialized with the specified slice.
@@ -20,63 +21,69 @@ func NewBytesPipe(buf []byte) *BytesPipe {
 		buf = make([]byte, 0, 64)
 	}
 	return &BytesPipe{
-		buf: buf[:0],
-	}
-}
-
-func (bp *BytesPipe) grow(n int) {
-	if len(bp.buf)+n > cap(bp.buf) {
-		// not enough space
-		var buf []byte
-		remain := bp.len()
-		if remain+n <= cap(bp.buf)/2 {
-			// enough space in current buffer, just move data to head
-			copy(bp.buf, bp.buf[bp.lastRead:])
-			buf = bp.buf[:remain]
-		} else {
-			// reallocate buffer
-			buf = make([]byte, remain, 2*cap(bp.buf)+n)
-			copy(buf, bp.buf[bp.lastRead:])
-		}
-		bp.buf = buf
-		bp.lastRead = 0
+		buf: [][]byte{buf[:0]},
 	}
 }
 
 // Write writes p to BytesPipe.
-// It can increase the cap of the internal []byte slice in the process of writing.
+// It can allocate new []byte slices in the process of writing.
 func (bp *BytesPipe) Write(p []byte) (n int, err error) {
-	bp.grow(len(p))
-	bp.buf = append(bp.buf, p...)
-	n = len(p)
+	for {
+		// write data to the last buffer
+		b := bp.buf[len(bp.buf)-1]
+		// copy data into the currently unused allocated area
+		copied := copy(b[len(b):cap(b)], p)
+		// count it toward the returned n and the buffered data length
+		n += copied
+		bp.bufLen += copied
+		// include written data in the last buffer
+		bp.buf[len(bp.buf)-1] = b[:len(b)+copied]
+
+		// if there was enough room to write everything, stop
+		if len(p) == copied {
+			break
+		}
+
+		// more data: write to the next slice
+		p = p[copied:]
+		// allocate a slice twice the size of the last one, capped at maxCap
+		nextCap := 2 * cap(bp.buf[len(bp.buf)-1])
+		if maxCap < nextCap {
+			nextCap = maxCap
+		}
+		// add the new byte slice to the buffers slice and continue writing
+		bp.buf = append(bp.buf, make([]byte, 0, nextCap))
+	}
 	return
 }
 
 func (bp *BytesPipe) len() int {
-	return len(bp.buf) - bp.lastRead
-}
-
-func (bp *BytesPipe) crop() {
-	// shortcut for empty buffer
-	if bp.lastRead == len(bp.buf) {
-		bp.lastRead = 0
-		bp.buf = bp.buf[:0]
-	}
-	r := bp.len()
-	// if we have too large buffer for too small data
-	if cap(bp.buf) > maxCap && r < cap(bp.buf)/10 {
-		copy(bp.buf, bp.buf[bp.lastRead:])
-		// will use same underlying slice until reach cap
-		bp.buf = bp.buf[:r : cap(bp.buf)/2]
-		bp.lastRead = 0
-	}
+	return bp.bufLen - bp.lastRead
 }
 
 // Read reads bytes from BytesPipe.
 // Data can be read only once.
-// The internal []byte slice may be shrunk.
 func (bp *BytesPipe) Read(p []byte) (n int, err error) {
-	n = copy(p, bp.buf[bp.lastRead:])
-	bp.lastRead += n
-	bp.crop()
+	for {
+		read := copy(p, bp.buf[0][bp.lastRead:])
+		n += read
+		bp.lastRead += read
+		if bp.len() == 0 {
+			// everything has been read; reset to the beginning
+			bp.lastRead = 0
+			bp.bufLen -= len(bp.buf[0])
+			bp.buf[0] = bp.buf[0][:0]
+			break
+		}
+		// break if everything requested was read
+		if len(p) == read {
+			break
+		}
+		// more data buffered and more requested: read from the next slice.
+		p = p[read:]
+		bp.lastRead = 0
+		bp.bufLen -= len(bp.buf[0])
+		bp.buf[0] = nil     // throw away the old slice
+		bp.buf = bp.buf[1:] // switch to the next one
+	}
 	return
 }

diff --git a/ioutils/bytespipe_test.go b/ioutils/bytespipe_test.go
index c7cf795..62b1a18 100644
--- a/ioutils/bytespipe_test.go
+++ b/ioutils/bytespipe_test.go
@@ -1,6 +1,10 @@
 package ioutils
 
-import "testing"
+import (
+	"crypto/sha1"
+	"encoding/hex"
+	"testing"
+)
 
 func TestBytesPipeRead(t *testing.T) {
 	buf := NewBytesPipe(nil)
@@ -49,11 +53,67 @@ func TestBytesPipeWrite(t *testing.T) {
 	buf.Write([]byte("56"))
 	buf.Write([]byte("78"))
 	buf.Write([]byte("90"))
-	if string(buf.buf) != "1234567890" {
+	if string(buf.buf[0]) != "1234567890" {
 		t.Fatalf("Buffer %s, must be %s", buf.buf, "1234567890")
 	}
 }
 
+// Write and read in different speeds/chunk sizes and check that valid data is read.
+func TestBytesPipeWriteRandomChunks(t *testing.T) {
+	cases := []struct{ iterations, writesPerLoop, readsPerLoop int }{
+		{100, 10, 1},
+		{1000, 10, 5},
+		{1000, 100, 0},
+		{1000, 5, 6},
+		{10000, 50, 25},
+	}
+
+	testMessage := []byte("this is a random string for testing")
+	// random slice sizes to read and write
+	writeChunks := []int{25, 35, 15, 20}
+	readChunks := []int{5, 45, 20, 25}
+
+	for _, c := range cases {
+		// first pass: write directly to hash
+		hash := sha1.New()
+		for i := 0; i < c.iterations*c.writesPerLoop; i++ {
+			if _, err := hash.Write(testMessage[:writeChunks[i%len(writeChunks)]]); err != nil {
+				t.Fatal(err)
+			}
+		}
+		expected := hex.EncodeToString(hash.Sum(nil))
+
+		// write/read through the buffer
+		buf := NewBytesPipe(nil)
+		hash.Reset()
+		for i := 0; i < c.iterations; i++ {
+			for w := 0; w < c.writesPerLoop; w++ {
+				buf.Write(testMessage[:writeChunks[(i*c.writesPerLoop+w)%len(writeChunks)]])
+			}
+			for r := 0; r < c.readsPerLoop; r++ {
+				p := make([]byte, readChunks[(i*c.readsPerLoop+r)%len(readChunks)])
+				n, _ := buf.Read(p)
+				hash.Write(p[:n])
+			}
+		}
+		// read the rest of the data from the buffer
+		for i := 0; ; i++ {
+			p := make([]byte, readChunks[(c.iterations*c.readsPerLoop+i)%len(readChunks)])
+			n, _ := buf.Read(p)
+			if n == 0 {
+				break
+			}
+			hash.Write(p[:n])
+		}
+		actual := hex.EncodeToString(hash.Sum(nil))
+
+		if expected != actual {
+			t.Fatalf("BytesPipe returned invalid data. Expected checksum %v, got %v", expected, actual)
+		}
+	}
+}
+
 func BenchmarkBytesPipeWrite(b *testing.B) {
 	for i := 0; i < b.N; i++ {
 		buf := NewBytesPipe(nil)

diff --git a/ioutils/readers.go b/ioutils/readers.go
index 3e68410..6b9637a 100644
--- a/ioutils/readers.go
+++ b/ioutils/readers.go
@@ -5,12 +5,8 @@ import (
 	"encoding/hex"
 	"io"
 	"sync"
-
-	"github.com/docker/docker/pkg/random"
 )
 
-var rndSrc = random.NewSource()
-
 type readCloserWrapper struct {
 	io.Reader
 	closer func() error
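
For illustration, a minimal usage sketch of BytesPipe as it stands after PATCH 3/3: a read-once, in-memory FIFO. The import path is an assumption (the series places the package in an ioutils tree under docker/docker); package main and the printed values are illustrative only, not part of the patches.

package main

import (
	"fmt"

	"github.com/docker/docker/pkg/ioutils"
)

func main() {
	bp := ioutils.NewBytesPipe(nil)

	// Writes are queued in order; each written byte can be read exactly once.
	bp.Write([]byte("hello, "))
	bp.Write([]byte("world"))

	buf := make([]byte, 5)
	n, _ := bp.Read(buf)
	fmt.Printf("%q\n", buf[:n]) // "hello"

	n, _ = bp.Read(buf)
	fmt.Printf("%q\n", buf[:n]) // ", wor"

	n, _ = bp.Read(buf)
	fmt.Printf("%q\n", buf[:n]) // "ld"

	// Reading from an empty BytesPipe returns n == 0 immediately; it never
	// blocks, and it needs external locking to be used from multiple goroutines.
	n, _ = bp.Read(buf)
	fmt.Println(n) // 0
}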
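
The allocation pattern behind PATCH 3/3's Write can also be seen with a few lines of standalone arithmetic: each newly allocated slice doubles the capacity of the previous one until maxCap (1e6 bytes) is reached, so buffering a burst of n bytes costs O(log n) allocations instead of repeated copy-on-grow. The loop below merely mirrors the nextCap logic; it is not part of the package.

package main

import "fmt"

const maxCap = 1e6 // same constant as ioutils/bytespipe.go after PATCH 3/3

func main() {
	// Reproduce the slice capacities Write would allocate while buffering
	// 10 MB, starting from the default 64-byte slice.
	total, c := 0, 64
	for total < 10*1000*1000 {
		fmt.Printf("slice cap %7d, total buffered %8d\n", c, total)
		total += c
		next := 2 * c // same doubling rule as Write
		if next > maxCap {
			next = maxCap
		}
		c = next
	}
}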
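
Likewise, a sketch of how PATCH 2/3's bufReader is consumed. It assumes the pre-existing bufReader.Read (not shown in these hunks) blocks on the condition variable until the drain goroutine has buffered data or hit EOF; strings.NewReader stands in for a real data source.

package main

import (
	"fmt"
	"io/ioutil"
	"strings"

	"github.com/docker/docker/pkg/ioutils"
)

func main() {
	// NewBufReader starts a goroutine that eagerly drains the source into
	// the BytesPipe-backed buffer, so the producer side never blocks on a
	// slow consumer.
	br := ioutils.NewBufReader(strings.NewReader("buffered through BytesPipe"))
	defer br.Close()

	out, err := ioutil.ReadAll(br) // returns once the source reaches EOF
	if err != nil {
		fmt.Println("read error:", err)
		return
	}
	fmt.Printf("%s\n", out)
}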