Refactor bufReader to use BytesPipe

Signed-off-by: Alexander Morozov <lk4d4@docker.com>
This commit is contained in:
Alexander Morozov 2015-09-09 09:48:34 -07:00 committed by Tonis Tiigi
parent 434c274741
commit 46b458fe3b
2 changed files with 23 additions and 108 deletions

View file

@ -1,13 +1,10 @@
package ioutils
import (
"bytes"
"crypto/sha256"
"encoding/hex"
"io"
"math/rand"
"sync"
"time"
"github.com/docker/docker/pkg/random"
)
@ -58,31 +55,19 @@ func NewReaderErrWrapper(r io.Reader, closer func()) io.Reader {
// expanding buffer.
type bufReader struct {
sync.Mutex
buf *bytes.Buffer
reader io.Reader
err error
wait sync.Cond
drainBuf []byte
reuseBuf []byte
maxReuse int64
resetTimeout time.Duration
bufLenResetThreshold int64
maxReadDataReset int64
buf io.ReadWriter
reader io.Reader
err error
wait sync.Cond
drainBuf []byte
}
// NewBufReader returns a new bufReader.
func NewBufReader(r io.Reader) io.ReadCloser {
timeout := rand.New(rndSrc).Intn(90)
reader := &bufReader{
buf: &bytes.Buffer{},
drainBuf: make([]byte, 1024),
reuseBuf: make([]byte, 4096),
maxReuse: 1000,
resetTimeout: time.Duration(timeout) * time.Second,
bufLenResetThreshold: 100 * 1024,
maxReadDataReset: 10 * 1024 * 1024,
reader: r,
buf: NewBytesPipe(nil),
reader: r,
drainBuf: make([]byte, 1024),
}
reader.wait.L = &reader.Mutex
go reader.drain()
@ -90,7 +75,7 @@ func NewBufReader(r io.Reader) io.ReadCloser {
}
// NewBufReaderWithDrainbufAndBuffer returns a BufReader with drainBuffer and buffer.
func NewBufReaderWithDrainbufAndBuffer(r io.Reader, drainBuffer []byte, buffer *bytes.Buffer) io.ReadCloser {
func NewBufReaderWithDrainbufAndBuffer(r io.Reader, drainBuffer []byte, buffer io.ReadWriter) io.ReadCloser {
reader := &bufReader{
buf: buffer,
drainBuf: drainBuffer,
@ -102,94 +87,19 @@ func NewBufReaderWithDrainbufAndBuffer(r io.Reader, drainBuffer []byte, buffer *
}
func (r *bufReader) drain() {
var (
duration time.Duration
lastReset time.Time
now time.Time
reset bool
bufLen int64
dataSinceReset int64
maxBufLen int64
reuseBufLen int64
reuseCount int64
)
reuseBufLen = int64(len(r.reuseBuf))
lastReset = time.Now()
for {
n, err := r.reader.Read(r.drainBuf)
dataSinceReset += int64(n)
r.Lock()
bufLen = int64(r.buf.Len())
if bufLen > maxBufLen {
maxBufLen = bufLen
}
// Avoid unbounded growth of the buffer over time.
// This has been discovered to be the only non-intrusive
// solution to the unbounded growth of the buffer.
// Alternative solutions such as compression, multiple
// buffers, channels and other similar pieces of code
// were reducing throughput, overall Docker performance
// or simply crashed Docker.
// This solution releases the buffer when specific
// conditions are met to avoid the continuous resizing
// of the buffer for long lived containers.
//
// Move data to the front of the buffer if it's
// smaller than what reuseBuf can store
if bufLen > 0 && reuseBufLen >= bufLen {
n, _ := r.buf.Read(r.reuseBuf)
r.buf.Write(r.reuseBuf[0:n])
// Take action if the buffer has been reused too many
// times and if there's data in the buffer.
// The timeout is also used as means to avoid doing
// these operations more often or less often than
// required.
// The various conditions try to detect heavy activity
// in the buffer which might be indicators of heavy
// growth of the buffer.
} else if reuseCount >= r.maxReuse && bufLen > 0 {
now = time.Now()
duration = now.Sub(lastReset)
timeoutReached := duration >= r.resetTimeout
// The timeout has been reached and the
// buffered data couldn't be moved to the front
// of the buffer, so the buffer gets reset.
if timeoutReached && bufLen > reuseBufLen {
reset = true
}
// The amount of buffered data is too high now,
// reset the buffer.
if timeoutReached && maxBufLen >= r.bufLenResetThreshold {
reset = true
}
// Reset the buffer if a certain amount of
// data has gone through the buffer since the
// last reset.
if timeoutReached && dataSinceReset >= r.maxReadDataReset {
reset = true
}
// The buffered data is moved to a fresh buffer,
// swap the old buffer with the new one and
// reset all counters.
if reset {
newbuf := &bytes.Buffer{}
newbuf.ReadFrom(r.buf)
r.buf = newbuf
lastReset = now
reset = false
dataSinceReset = 0
maxBufLen = 0
reuseCount = 0
}
}
if err != nil {
r.err = err
} else {
r.buf.Write(r.drainBuf[0:n])
if n == 0 {
// nothing written, no need to signal
r.Unlock()
continue
}
r.buf.Write(r.drainBuf[:n])
}
reuseCount++
r.wait.Signal()
r.Unlock()
callSchedulerIfNecessary()

View file

@ -7,6 +7,7 @@ import (
"io/ioutil"
"strings"
"testing"
"time"
)
// Implement io.Reader
@ -61,8 +62,8 @@ func TestNewBufReaderWithDrainbufAndBuffer(t *testing.T) {
reader, writer := io.Pipe()
drainBuffer := make([]byte, 1024)
buffer := bytes.Buffer{}
bufreader := NewBufReaderWithDrainbufAndBuffer(reader, drainBuffer, &buffer)
buffer := NewBytesPipe(nil)
bufreader := NewBufReaderWithDrainbufAndBuffer(reader, drainBuffer, buffer)
// Write everything down to a Pipe
// Usually, a pipe should block but because of the buffered reader,
@ -76,7 +77,11 @@ func TestNewBufReaderWithDrainbufAndBuffer(t *testing.T) {
// Drain the reader *after* everything has been written, just to verify
// it is indeed buffering
<-done
select {
case <-done:
case <-time.After(1 * time.Second):
t.Fatal("timeout")
}
output, err := ioutil.ReadAll(bufreader)
if err != nil {