From 46b458fe3b7a1e61ed209462dc47c5f646b35f1a Mon Sep 17 00:00:00 2001 From: Alexander Morozov Date: Wed, 9 Sep 2015 09:48:34 -0700 Subject: [PATCH] Refactor bufReader to use BytesPipe Signed-off-by: Alexander Morozov --- ioutils/readers.go | 120 +++++----------------------------------- ioutils/readers_test.go | 11 +++- 2 files changed, 23 insertions(+), 108 deletions(-) diff --git a/ioutils/readers.go b/ioutils/readers.go index bb96fe1..3e68410 100644 --- a/ioutils/readers.go +++ b/ioutils/readers.go @@ -1,13 +1,10 @@ package ioutils import ( - "bytes" "crypto/sha256" "encoding/hex" "io" - "math/rand" "sync" - "time" "github.com/docker/docker/pkg/random" ) @@ -58,31 +55,19 @@ func NewReaderErrWrapper(r io.Reader, closer func()) io.Reader { // expanding buffer. type bufReader struct { sync.Mutex - buf *bytes.Buffer - reader io.Reader - err error - wait sync.Cond - drainBuf []byte - reuseBuf []byte - maxReuse int64 - resetTimeout time.Duration - bufLenResetThreshold int64 - maxReadDataReset int64 + buf io.ReadWriter + reader io.Reader + err error + wait sync.Cond + drainBuf []byte } // NewBufReader returns a new bufReader. func NewBufReader(r io.Reader) io.ReadCloser { - timeout := rand.New(rndSrc).Intn(90) - reader := &bufReader{ - buf: &bytes.Buffer{}, - drainBuf: make([]byte, 1024), - reuseBuf: make([]byte, 4096), - maxReuse: 1000, - resetTimeout: time.Duration(timeout) * time.Second, - bufLenResetThreshold: 100 * 1024, - maxReadDataReset: 10 * 1024 * 1024, - reader: r, + buf: NewBytesPipe(nil), + reader: r, + drainBuf: make([]byte, 1024), } reader.wait.L = &reader.Mutex go reader.drain() @@ -90,7 +75,7 @@ func NewBufReader(r io.Reader) io.ReadCloser { } // NewBufReaderWithDrainbufAndBuffer returns a BufReader with drainBuffer and buffer. -func NewBufReaderWithDrainbufAndBuffer(r io.Reader, drainBuffer []byte, buffer *bytes.Buffer) io.ReadCloser { +func NewBufReaderWithDrainbufAndBuffer(r io.Reader, drainBuffer []byte, buffer io.ReadWriter) io.ReadCloser { reader := &bufReader{ buf: buffer, drainBuf: drainBuffer, @@ -102,94 +87,19 @@ func NewBufReaderWithDrainbufAndBuffer(r io.Reader, drainBuffer []byte, buffer * } func (r *bufReader) drain() { - var ( - duration time.Duration - lastReset time.Time - now time.Time - reset bool - bufLen int64 - dataSinceReset int64 - maxBufLen int64 - reuseBufLen int64 - reuseCount int64 - ) - reuseBufLen = int64(len(r.reuseBuf)) - lastReset = time.Now() for { n, err := r.reader.Read(r.drainBuf) - dataSinceReset += int64(n) r.Lock() - bufLen = int64(r.buf.Len()) - if bufLen > maxBufLen { - maxBufLen = bufLen - } - - // Avoid unbounded growth of the buffer over time. - // This has been discovered to be the only non-intrusive - // solution to the unbounded growth of the buffer. - // Alternative solutions such as compression, multiple - // buffers, channels and other similar pieces of code - // were reducing throughput, overall Docker performance - // or simply crashed Docker. - // This solution releases the buffer when specific - // conditions are met to avoid the continuous resizing - // of the buffer for long lived containers. - // - // Move data to the front of the buffer if it's - // smaller than what reuseBuf can store - if bufLen > 0 && reuseBufLen >= bufLen { - n, _ := r.buf.Read(r.reuseBuf) - r.buf.Write(r.reuseBuf[0:n]) - // Take action if the buffer has been reused too many - // times and if there's data in the buffer. - // The timeout is also used as means to avoid doing - // these operations more often or less often than - // required. - // The various conditions try to detect heavy activity - // in the buffer which might be indicators of heavy - // growth of the buffer. - } else if reuseCount >= r.maxReuse && bufLen > 0 { - now = time.Now() - duration = now.Sub(lastReset) - timeoutReached := duration >= r.resetTimeout - - // The timeout has been reached and the - // buffered data couldn't be moved to the front - // of the buffer, so the buffer gets reset. - if timeoutReached && bufLen > reuseBufLen { - reset = true - } - // The amount of buffered data is too high now, - // reset the buffer. - if timeoutReached && maxBufLen >= r.bufLenResetThreshold { - reset = true - } - // Reset the buffer if a certain amount of - // data has gone through the buffer since the - // last reset. - if timeoutReached && dataSinceReset >= r.maxReadDataReset { - reset = true - } - // The buffered data is moved to a fresh buffer, - // swap the old buffer with the new one and - // reset all counters. - if reset { - newbuf := &bytes.Buffer{} - newbuf.ReadFrom(r.buf) - r.buf = newbuf - lastReset = now - reset = false - dataSinceReset = 0 - maxBufLen = 0 - reuseCount = 0 - } - } if err != nil { r.err = err } else { - r.buf.Write(r.drainBuf[0:n]) + if n == 0 { + // nothing written, no need to signal + r.Unlock() + continue + } + r.buf.Write(r.drainBuf[:n]) } - reuseCount++ r.wait.Signal() r.Unlock() callSchedulerIfNecessary() diff --git a/ioutils/readers_test.go b/ioutils/readers_test.go index 0a39b6e..5eeda96 100644 --- a/ioutils/readers_test.go +++ b/ioutils/readers_test.go @@ -7,6 +7,7 @@ import ( "io/ioutil" "strings" "testing" + "time" ) // Implement io.Reader @@ -61,8 +62,8 @@ func TestNewBufReaderWithDrainbufAndBuffer(t *testing.T) { reader, writer := io.Pipe() drainBuffer := make([]byte, 1024) - buffer := bytes.Buffer{} - bufreader := NewBufReaderWithDrainbufAndBuffer(reader, drainBuffer, &buffer) + buffer := NewBytesPipe(nil) + bufreader := NewBufReaderWithDrainbufAndBuffer(reader, drainBuffer, buffer) // Write everything down to a Pipe // Usually, a pipe should block but because of the buffered reader, @@ -76,7 +77,11 @@ func TestNewBufReaderWithDrainbufAndBuffer(t *testing.T) { // Drain the reader *after* everything has been written, just to verify // it is indeed buffering - <-done + select { + case <-done: + case <-time.After(1 * time.Second): + t.Fatal("timeout") + } output, err := ioutil.ReadAll(bufreader) if err != nil {