3b45ed0fee
Signed-off-by: Cristian Staretu <cristian.staretu@gmail.com>
217 lines
5.1 KiB
Go
217 lines
5.1 KiB
Go
package ioutils
|
|
|
|
import (
|
|
"bytes"
|
|
"crypto/rand"
|
|
"io"
|
|
"math/big"
|
|
"sync"
|
|
"time"
|
|
)
|
|
|
|
type readCloserWrapper struct {
|
|
io.Reader
|
|
closer func() error
|
|
}
|
|
|
|
func (r *readCloserWrapper) Close() error {
|
|
return r.closer()
|
|
}
|
|
|
|
func NewReadCloserWrapper(r io.Reader, closer func() error) io.ReadCloser {
|
|
return &readCloserWrapper{
|
|
Reader: r,
|
|
closer: closer,
|
|
}
|
|
}
|
|
|
|
type readerErrWrapper struct {
|
|
reader io.Reader
|
|
closer func()
|
|
}
|
|
|
|
func (r *readerErrWrapper) Read(p []byte) (int, error) {
|
|
n, err := r.reader.Read(p)
|
|
if err != nil {
|
|
r.closer()
|
|
}
|
|
return n, err
|
|
}
|
|
|
|
func NewReaderErrWrapper(r io.Reader, closer func()) io.Reader {
|
|
return &readerErrWrapper{
|
|
reader: r,
|
|
closer: closer,
|
|
}
|
|
}
|
|
|
|
// bufReader allows the underlying reader to continue to produce
|
|
// output by pre-emptively reading from the wrapped reader.
|
|
// This is achieved by buffering this data in bufReader's
|
|
// expanding buffer.
|
|
type bufReader struct {
|
|
sync.Mutex
|
|
buf *bytes.Buffer
|
|
reader io.Reader
|
|
err error
|
|
wait sync.Cond
|
|
drainBuf []byte
|
|
reuseBuf []byte
|
|
maxReuse int64
|
|
resetTimeout time.Duration
|
|
bufLenResetThreshold int64
|
|
maxReadDataReset int64
|
|
}
|
|
|
|
func NewBufReader(r io.Reader) *bufReader {
|
|
var timeout int
|
|
if randVal, err := rand.Int(rand.Reader, big.NewInt(120)); err == nil {
|
|
timeout = int(randVal.Int64()) + 180
|
|
} else {
|
|
timeout = 300
|
|
}
|
|
reader := &bufReader{
|
|
buf: &bytes.Buffer{},
|
|
drainBuf: make([]byte, 1024),
|
|
reuseBuf: make([]byte, 4096),
|
|
maxReuse: 1000,
|
|
resetTimeout: time.Second * time.Duration(timeout),
|
|
bufLenResetThreshold: 100 * 1024,
|
|
maxReadDataReset: 10 * 1024 * 1024,
|
|
reader: r,
|
|
}
|
|
reader.wait.L = &reader.Mutex
|
|
go reader.drain()
|
|
return reader
|
|
}
|
|
|
|
func NewBufReaderWithDrainbufAndBuffer(r io.Reader, drainBuffer []byte, buffer *bytes.Buffer) *bufReader {
|
|
reader := &bufReader{
|
|
buf: buffer,
|
|
drainBuf: drainBuffer,
|
|
reader: r,
|
|
}
|
|
reader.wait.L = &reader.Mutex
|
|
go reader.drain()
|
|
return reader
|
|
}
|
|
|
|
func (r *bufReader) drain() {
|
|
var (
|
|
duration time.Duration
|
|
lastReset time.Time
|
|
now time.Time
|
|
reset bool
|
|
bufLen int64
|
|
dataSinceReset int64
|
|
maxBufLen int64
|
|
reuseBufLen int64
|
|
reuseCount int64
|
|
)
|
|
reuseBufLen = int64(len(r.reuseBuf))
|
|
lastReset = time.Now()
|
|
for {
|
|
n, err := r.reader.Read(r.drainBuf)
|
|
dataSinceReset += int64(n)
|
|
r.Lock()
|
|
bufLen = int64(r.buf.Len())
|
|
if bufLen > maxBufLen {
|
|
maxBufLen = bufLen
|
|
}
|
|
|
|
// Avoid unbounded growth of the buffer over time.
|
|
// This has been discovered to be the only non-intrusive
|
|
// solution to the unbounded growth of the buffer.
|
|
// Alternative solutions such as compression, multiple
|
|
// buffers, channels and other similar pieces of code
|
|
// were reducing throughput, overall Docker performance
|
|
// or simply crashed Docker.
|
|
// This solution releases the buffer when specific
|
|
// conditions are met to avoid the continuous resizing
|
|
// of the buffer for long lived containers.
|
|
//
|
|
// Move data to the front of the buffer if it's
|
|
// smaller than what reuseBuf can store
|
|
if bufLen > 0 && reuseBufLen >= bufLen {
|
|
n, _ := r.buf.Read(r.reuseBuf)
|
|
r.buf.Write(r.reuseBuf[0:n])
|
|
// Take action if the buffer has been reused too many
|
|
// times and if there's data in the buffer.
|
|
// The timeout is also used as means to avoid doing
|
|
// these operations more often or less often than
|
|
// required.
|
|
// The various conditions try to detect heavy activity
|
|
// in the buffer which might be indicators of heavy
|
|
// growth of the buffer.
|
|
} else if reuseCount >= r.maxReuse && bufLen > 0 {
|
|
now = time.Now()
|
|
duration = now.Sub(lastReset)
|
|
timeoutReached := duration >= r.resetTimeout
|
|
|
|
// The timeout has been reached and the
|
|
// buffered data couldn't be moved to the front
|
|
// of the buffer, so the buffer gets reset.
|
|
if timeoutReached && bufLen > reuseBufLen {
|
|
reset = true
|
|
}
|
|
// The amount of buffered data is too high now,
|
|
// reset the buffer.
|
|
if timeoutReached && maxBufLen >= r.bufLenResetThreshold {
|
|
reset = true
|
|
}
|
|
// Reset the buffer if a certain amount of
|
|
// data has gone through the buffer since the
|
|
// last reset.
|
|
if timeoutReached && dataSinceReset >= r.maxReadDataReset {
|
|
reset = true
|
|
}
|
|
// The buffered data is moved to a fresh buffer,
|
|
// swap the old buffer with the new one and
|
|
// reset all counters.
|
|
if reset {
|
|
newbuf := &bytes.Buffer{}
|
|
newbuf.ReadFrom(r.buf)
|
|
r.buf = newbuf
|
|
lastReset = now
|
|
reset = false
|
|
dataSinceReset = 0
|
|
maxBufLen = 0
|
|
reuseCount = 0
|
|
}
|
|
}
|
|
if err != nil {
|
|
r.err = err
|
|
} else {
|
|
r.buf.Write(r.drainBuf[0:n])
|
|
}
|
|
reuseCount++
|
|
r.wait.Signal()
|
|
r.Unlock()
|
|
if err != nil {
|
|
break
|
|
}
|
|
}
|
|
}
|
|
|
|
func (r *bufReader) Read(p []byte) (n int, err error) {
|
|
r.Lock()
|
|
defer r.Unlock()
|
|
for {
|
|
n, err = r.buf.Read(p)
|
|
if n > 0 {
|
|
return n, err
|
|
}
|
|
if r.err != nil {
|
|
return 0, r.err
|
|
}
|
|
r.wait.Wait()
|
|
}
|
|
}
|
|
|
|
func (r *bufReader) Close() error {
|
|
closer, ok := r.reader.(io.ReadCloser)
|
|
if !ok {
|
|
return nil
|
|
}
|
|
return closer.Close()
|
|
}
|