Merge pull request #10347 from unclejack/bufreader_reset
reset bufReader to avoid extreme growth of buffers
commit 885a11991a
2 changed files with 169 additions and 8 deletions
@@ -2,8 +2,11 @@ package ioutils

 import (
 	"bytes"
+	"crypto/rand"
 	"io"
+	"math/big"
 	"sync"
+	"time"
 )

 type readCloserWrapper struct {
@@ -42,20 +45,40 @@ func NewReaderErrWrapper(r io.Reader, closer func()) io.Reader {
 	}
 }

 // bufReader allows the underlying reader to continue to produce
 // output by pre-emptively reading from the wrapped reader.
 // This is achieved by buffering this data in bufReader's
 // expanding buffer.
 type bufReader struct {
 	sync.Mutex
-	buf      *bytes.Buffer
-	reader   io.Reader
-	err      error
-	wait     sync.Cond
-	drainBuf []byte
+	buf                  *bytes.Buffer
+	reader               io.Reader
+	err                  error
+	wait                 sync.Cond
+	drainBuf             []byte
+	reuseBuf             []byte
+	maxReuse             int64
+	resetTimeout         time.Duration
+	bufLenResetThreshold int64
+	maxReadDataReset     int64
 }

 func NewBufReader(r io.Reader) *bufReader {
+	var timeout int
+	if randVal, err := rand.Int(rand.Reader, big.NewInt(120)); err == nil {
+		timeout = int(randVal.Int64()) + 180
+	} else {
+		timeout = 300
+	}
 	reader := &bufReader{
-		buf:      &bytes.Buffer{},
-		drainBuf: make([]byte, 1024),
-		reader:   r,
+		buf:                  &bytes.Buffer{},
+		drainBuf:             make([]byte, 1024),
+		reuseBuf:             make([]byte, 4096),
+		maxReuse:             1000,
+		resetTimeout:         time.Second * time.Duration(timeout),
+		bufLenResetThreshold: 100 * 1024,
+		maxReadDataReset:     10 * 1024 * 1024,
+		reader:               r,
 	}
 	reader.wait.L = &reader.Mutex
 	go reader.drain()
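The reset timeout here is randomized per reader: rand.Int draws a value in [0, 120), so resetTimeout lands between 180 and 299 seconds, with a flat 300-second fallback if the entropy source fails. Spreading the timeouts out keeps many long-lived readers from resetting their buffers in lockstep. The other literals are the tuning knobs: a 4 KiB scratch slice for compaction, and a reset considered only after 1000 buffer reuses, once the timeout has elapsed and the buffer shows heavy activity (a 100 KiB peak or 10 MiB read since the last reset). A standalone sketch of the timeout computation (randomTimeout is our name, for illustration only, not part of the patch):

    package main

    import (
        "crypto/rand"
        "fmt"
        "math/big"
        "time"
    )

    // randomTimeout mirrors NewBufReader: a random duration in
    // [180s, 300s), or a flat 300s when the entropy source fails.
    func randomTimeout() time.Duration {
        if randVal, err := rand.Int(rand.Reader, big.NewInt(120)); err == nil {
            return time.Duration(randVal.Int64()+180) * time.Second
        }
        return 300 * time.Second
    }

    func main() {
        fmt.Println(randomTimeout()) // e.g. 4m37s
    }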
@@ -74,14 +97,94 @@ func NewBufReaderWithDrainbufAndBuffer(r io.Reader, drainBuffer []byte, buffer *
 	}
 }

 func (r *bufReader) drain() {
+	var (
+		duration       time.Duration
+		lastReset      time.Time
+		now            time.Time
+		reset          bool
+		bufLen         int64
+		dataSinceReset int64
+		maxBufLen      int64
+		reuseBufLen    int64
+		reuseCount     int64
+	)
+	reuseBufLen = int64(len(r.reuseBuf))
+	lastReset = time.Now()
 	for {
 		n, err := r.reader.Read(r.drainBuf)
+		dataSinceReset += int64(n)
 		r.Lock()
+		bufLen = int64(r.buf.Len())
+		if bufLen > maxBufLen {
+			maxBufLen = bufLen
+		}
+
+		// Avoid unbounded growth of the buffer over time.
+		// This has been discovered to be the only non-intrusive
+		// solution to the unbounded growth of the buffer.
+		// Alternative solutions such as compression, multiple
+		// buffers, channels and other similar pieces of code
+		// were reducing throughput, overall Docker performance
+		// or simply crashed Docker.
+		// This solution releases the buffer when specific
+		// conditions are met to avoid the continuous resizing
+		// of the buffer for long lived containers.
+		//
+		// Move data to the front of the buffer if it's
+		// smaller than what reuseBuf can store
+		if bufLen > 0 && reuseBufLen >= bufLen {
+			n, _ := r.buf.Read(r.reuseBuf)
+			r.buf.Write(r.reuseBuf[0:n])
+			// Take action if the buffer has been reused too many
+			// times and if there's data in the buffer.
+			// The timeout is also used as means to avoid doing
+			// these operations more often or less often than
+			// required.
+			// The various conditions try to detect heavy activity
+			// in the buffer which might be indicators of heavy
+			// growth of the buffer.
+		} else if reuseCount >= r.maxReuse && bufLen > 0 {
+			now = time.Now()
+			duration = now.Sub(lastReset)
+			timeoutReached := duration >= r.resetTimeout
+
+			// The timeout has been reached and the
+			// buffered data couldn't be moved to the front
+			// of the buffer, so the buffer gets reset.
+			if timeoutReached && bufLen > reuseBufLen {
+				reset = true
+			}
+			// The amount of buffered data is too high now,
+			// reset the buffer.
+			if timeoutReached && maxBufLen >= r.bufLenResetThreshold {
+				reset = true
+			}
+			// Reset the buffer if a certain amount of
+			// data has gone through the buffer since the
+			// last reset.
+			if timeoutReached && dataSinceReset >= r.maxReadDataReset {
+				reset = true
+			}
+			// The buffered data is moved to a fresh buffer,
+			// swap the old buffer with the new one and
+			// reset all counters.
+			if reset {
+				newbuf := &bytes.Buffer{}
+				newbuf.ReadFrom(r.buf)
+				r.buf = newbuf
+				lastReset = now
+				reset = false
+				dataSinceReset = 0
+				maxBufLen = 0
+				reuseCount = 0
+			}
+		}
 		if err != nil {
 			r.err = err
 		} else {
 			r.buf.Write(r.drainBuf[0:n])
 		}
+		reuseCount++
 		r.wait.Signal()
 		r.Unlock()
 		if err != nil {
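Two moves in drain() keep the buffer's backing array in check. When the buffered data fits into reuseBuf, reading it out and writing it straight back compacts it to the front of the buffer: bytes.Buffer resets its read offset once it has been fully drained, so the rewritten bytes land at the start of the existing backing array and no allocation happens. When compaction is impossible and the activity thresholds are hit, the data is copied into a brand-new bytes.Buffer and the old one is dropped, so the garbage collector can reclaim its oversized array. A runnable sketch of both moves in isolation (compact and swapToFreshBuffer are our names, not part of the patch):

    package main

    import (
        "bytes"
        "fmt"
    )

    // compact mirrors the reuseBuf path: when everything buffered fits
    // into scratch, read it out (draining the buffer) and write it back,
    // which places the data at the front of the same backing array.
    func compact(buf *bytes.Buffer, scratch []byte) {
        if buf.Len() > 0 && buf.Len() <= len(scratch) {
            n, _ := buf.Read(scratch)
            buf.Write(scratch[:n])
        }
    }

    // swapToFreshBuffer mirrors the reset path: move the remaining data
    // into a new Buffer and abandon the old one, releasing its (possibly
    // huge) backing array to the garbage collector.
    func swapToFreshBuffer(buf *bytes.Buffer) *bytes.Buffer {
        newbuf := &bytes.Buffer{}
        newbuf.ReadFrom(buf)
        return newbuf
    }

    func main() {
        buf := &bytes.Buffer{}
        buf.Write(make([]byte, 1<<20)) // grow the backing array to ~1 MiB
        buf.Next(1<<20 - 10)           // consume all but the last 10 bytes
        fmt.Println("before:", buf.Len(), "buffered /", buf.Cap(), "capacity")

        compact(buf, make([]byte, 4096))
        buf = swapToFreshBuffer(buf)
        fmt.Println("after:", buf.Len(), "buffered /", buf.Cap(), "capacity")
    }

Note that compaction alone never shrinks the backing array; it only stops the front of the buffer from creeping forward. Only the swap to a fresh buffer actually gives memory back, which is why drain() gates it behind the reuse count, the randomized timeout and the activity thresholds.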
@@ -32,3 +32,61 @@ func TestBufReader(t *testing.T) {
 		t.Error(string(output))
 	}
 }
+
+type repeatedReader struct {
+	readCount int
+	maxReads  int
+	data      []byte
+}
+
+func newRepeatedReader(max int, data []byte) *repeatedReader {
+	return &repeatedReader{0, max, data}
+}
+
+func (r *repeatedReader) Read(p []byte) (int, error) {
+	if r.readCount >= r.maxReads {
+		return 0, io.EOF
+	}
+	r.readCount++
+	n := copy(p, r.data)
+	return n, nil
+}
+
+func testWithData(data []byte, reads int) {
+	reader := newRepeatedReader(reads, data)
+	bufReader := NewBufReader(reader)
+	io.Copy(ioutil.Discard, bufReader)
+}
+
+func Benchmark1M10BytesReads(b *testing.B) {
+	reads := 1000000
+	readSize := int64(10)
+	data := make([]byte, readSize)
+	b.SetBytes(readSize * int64(reads))
+	b.ResetTimer()
+	for i := 0; i < b.N; i++ {
+		testWithData(data, reads)
+	}
+}
+
+func Benchmark1M1024BytesReads(b *testing.B) {
+	reads := 1000000
+	readSize := int64(1024)
+	data := make([]byte, readSize)
+	b.SetBytes(readSize * int64(reads))
+	b.ResetTimer()
+	for i := 0; i < b.N; i++ {
+		testWithData(data, reads)
+	}
+}
+
+func Benchmark10k32KBytesReads(b *testing.B) {
+	reads := 10000
+	readSize := int64(32 * 1024)
+	data := make([]byte, readSize)
+	b.SetBytes(readSize * int64(reads))
+	b.ResetTimer()
+	for i := 0; i < b.N; i++ {
+		testWithData(data, reads)
+	}
+}
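These benchmarks stream fixed-size reads through a bufReader and discard the output, so the reported MB/s isolates the buffering overhead at small (10 B), medium (1 KiB) and large (32 KiB) read sizes. They run with the standard Go tooling; assuming the ioutils package lives under pkg/ioutils (the path is not shown in this view):

    go test -bench=. ./pkg/ioutils/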