From 0706016675ff1d1f4bd05ba07a2b9e6c1bcff700 Mon Sep 17 00:00:00 2001 From: unclejack Date: Fri, 12 Dec 2014 22:10:09 +0200 Subject: [PATCH 1/2] pkg/ioutils: add tests for BufReader Signed-off-by: Cristian Staretu --- ioutils/readers_test.go | 58 +++++++++++++++++++++++++++++++++++++++++ 1 file changed, 58 insertions(+) diff --git a/ioutils/readers_test.go b/ioutils/readers_test.go index a7a2dad..0af978e 100644 --- a/ioutils/readers_test.go +++ b/ioutils/readers_test.go @@ -32,3 +32,61 @@ func TestBufReader(t *testing.T) { t.Error(string(output)) } } + +type repeatedReader struct { + readCount int + maxReads int + data []byte +} + +func newRepeatedReader(max int, data []byte) *repeatedReader { + return &repeatedReader{0, max, data} +} + +func (r *repeatedReader) Read(p []byte) (int, error) { + if r.readCount >= r.maxReads { + return 0, io.EOF + } + r.readCount++ + n := copy(p, r.data) + return n, nil +} + +func testWithData(data []byte, reads int) { + reader := newRepeatedReader(reads, data) + bufReader := NewBufReader(reader) + io.Copy(ioutil.Discard, bufReader) +} + +func Benchmark1M10BytesReads(b *testing.B) { + reads := 1000000 + readSize := int64(10) + data := make([]byte, readSize) + b.SetBytes(readSize * int64(reads)) + b.ResetTimer() + for i := 0; i < b.N; i++ { + testWithData(data, reads) + } +} + +func Benchmark1M1024BytesReads(b *testing.B) { + reads := 1000000 + readSize := int64(1024) + data := make([]byte, readSize) + b.SetBytes(readSize * int64(reads)) + b.ResetTimer() + for i := 0; i < b.N; i++ { + testWithData(data, reads) + } +} + +func Benchmark10k32KBytesReads(b *testing.B) { + reads := 10000 + readSize := int64(32 * 1024) + data := make([]byte, readSize) + b.SetBytes(readSize * int64(reads)) + b.ResetTimer() + for i := 0; i < b.N; i++ { + testWithData(data, reads) + } +} From 3b45ed0fee7f25ef4644dc929730507f6b696516 Mon Sep 17 00:00:00 2001 From: unclejack Date: Mon, 8 Dec 2014 16:10:36 +0200 Subject: [PATCH 2/2] pkg/ioutils: avoid huge Buffer growth in bufreader Signed-off-by: Cristian Staretu --- ioutils/readers.go | 119 ++++++++++++++++++++++++++++++++++++++++++--- 1 file changed, 111 insertions(+), 8 deletions(-) diff --git a/ioutils/readers.go b/ioutils/readers.go index 22f46fb..58ff1af 100644 --- a/ioutils/readers.go +++ b/ioutils/readers.go @@ -2,8 +2,11 @@ package ioutils import ( "bytes" + "crypto/rand" "io" + "math/big" "sync" + "time" ) type readCloserWrapper struct { @@ -42,20 +45,40 @@ func NewReaderErrWrapper(r io.Reader, closer func()) io.Reader { } } +// bufReader allows the underlying reader to continue to produce +// output by pre-emptively reading from the wrapped reader. +// This is achieved by buffering this data in bufReader's +// expanding buffer. type bufReader struct { sync.Mutex - buf *bytes.Buffer - reader io.Reader - err error - wait sync.Cond - drainBuf []byte + buf *bytes.Buffer + reader io.Reader + err error + wait sync.Cond + drainBuf []byte + reuseBuf []byte + maxReuse int64 + resetTimeout time.Duration + bufLenResetThreshold int64 + maxReadDataReset int64 } func NewBufReader(r io.Reader) *bufReader { + var timeout int + if randVal, err := rand.Int(rand.Reader, big.NewInt(120)); err == nil { + timeout = int(randVal.Int64()) + 180 + } else { + timeout = 300 + } reader := &bufReader{ - buf: &bytes.Buffer{}, - drainBuf: make([]byte, 1024), - reader: r, + buf: &bytes.Buffer{}, + drainBuf: make([]byte, 1024), + reuseBuf: make([]byte, 4096), + maxReuse: 1000, + resetTimeout: time.Second * time.Duration(timeout), + bufLenResetThreshold: 100 * 1024, + maxReadDataReset: 10 * 1024 * 1024, + reader: r, } reader.wait.L = &reader.Mutex go reader.drain() @@ -74,14 +97,94 @@ func NewBufReaderWithDrainbufAndBuffer(r io.Reader, drainBuffer []byte, buffer * } func (r *bufReader) drain() { + var ( + duration time.Duration + lastReset time.Time + now time.Time + reset bool + bufLen int64 + dataSinceReset int64 + maxBufLen int64 + reuseBufLen int64 + reuseCount int64 + ) + reuseBufLen = int64(len(r.reuseBuf)) + lastReset = time.Now() for { n, err := r.reader.Read(r.drainBuf) + dataSinceReset += int64(n) r.Lock() + bufLen = int64(r.buf.Len()) + if bufLen > maxBufLen { + maxBufLen = bufLen + } + + // Avoid unbounded growth of the buffer over time. + // This has been discovered to be the only non-intrusive + // solution to the unbounded growth of the buffer. + // Alternative solutions such as compression, multiple + // buffers, channels and other similar pieces of code + // were reducing throughput, overall Docker performance + // or simply crashed Docker. + // This solution releases the buffer when specific + // conditions are met to avoid the continuous resizing + // of the buffer for long lived containers. + // + // Move data to the front of the buffer if it's + // smaller than what reuseBuf can store + if bufLen > 0 && reuseBufLen >= bufLen { + n, _ := r.buf.Read(r.reuseBuf) + r.buf.Write(r.reuseBuf[0:n]) + // Take action if the buffer has been reused too many + // times and if there's data in the buffer. + // The timeout is also used as means to avoid doing + // these operations more often or less often than + // required. + // The various conditions try to detect heavy activity + // in the buffer which might be indicators of heavy + // growth of the buffer. + } else if reuseCount >= r.maxReuse && bufLen > 0 { + now = time.Now() + duration = now.Sub(lastReset) + timeoutReached := duration >= r.resetTimeout + + // The timeout has been reached and the + // buffered data couldn't be moved to the front + // of the buffer, so the buffer gets reset. + if timeoutReached && bufLen > reuseBufLen { + reset = true + } + // The amount of buffered data is too high now, + // reset the buffer. + if timeoutReached && maxBufLen >= r.bufLenResetThreshold { + reset = true + } + // Reset the buffer if a certain amount of + // data has gone through the buffer since the + // last reset. + if timeoutReached && dataSinceReset >= r.maxReadDataReset { + reset = true + } + // The buffered data is moved to a fresh buffer, + // swap the old buffer with the new one and + // reset all counters. + if reset { + newbuf := &bytes.Buffer{} + newbuf.ReadFrom(r.buf) + r.buf = newbuf + lastReset = now + reset = false + dataSinceReset = 0 + maxBufLen = 0 + reuseCount = 0 + } + } if err != nil { r.err = err } else { r.buf.Write(r.drainBuf[0:n]) } + reuseCount++ r.wait.Signal() r.Unlock() if err != nil {