Merge pull request #16190 from LK4D4/drain_refactor
Refactoring of bufReader
Commit f09705e6aa
4 changed files with 253 additions and 112 deletions
ioutils/bytespipe.go (new file, +89)
@@ -0,0 +1,89 @@
package ioutils

const maxCap = 1e6

// BytesPipe is an io.ReadWriter that works like a pipe (FIFO queue).
// Written data can be read only once. BytesPipe also allocates
// and releases byte slices to match current needs, so the buffer
// does not stay overgrown after a peak in load.
// BytesPipe is not goroutine-safe; callers must synchronize access if needed.
type BytesPipe struct {
	buf      [][]byte // slice of byte slices holding the buffered data
	lastRead int      // read offset into the first slice
	bufLen   int      // total length of data buffered across the slices
}

// NewBytesPipe creates a new BytesPipe initialized with the given slice.
// If buf is nil, a slice with a capacity of 64 is allocated.
// buf is re-sliced so that len(buf) == 0 while its capacity is preserved.
func NewBytesPipe(buf []byte) *BytesPipe {
	if cap(buf) == 0 {
		buf = make([]byte, 0, 64)
	}
	return &BytesPipe{
		buf: [][]byte{buf[:0]},
	}
}

// Write writes p to BytesPipe.
// It may allocate new []byte slices in the process.
func (bp *BytesPipe) Write(p []byte) (n int, err error) {
	for {
		// write data to the last buffer
		b := bp.buf[len(bp.buf)-1]
		// copy data into the unused capacity of the current slice
		n := copy(b[len(b):cap(b)], p)
		// increment buffered data length
		bp.bufLen += n
		// include the written data in the last buffer
		bp.buf[len(bp.buf)-1] = b[:len(b)+n]

		// if there was enough room to write everything, stop
		if len(p) == n {
			break
		}

		// more data: write to the next slice
		p = p[n:]
		// allocate a slice with twice the capacity of the last one unless the maximum is reached
		nextCap := 2 * cap(bp.buf[len(bp.buf)-1])
		if maxCap < nextCap {
			nextCap = maxCap
		}
		// add the new byte slice to the buffers and continue writing
		bp.buf = append(bp.buf, make([]byte, 0, nextCap))
	}
	return
}

func (bp *BytesPipe) len() int {
	return bp.bufLen - bp.lastRead
}

// Read reads bytes from BytesPipe.
// Data can be read only once.
func (bp *BytesPipe) Read(p []byte) (n int, err error) {
	for {
		read := copy(p, bp.buf[0][bp.lastRead:])
		n += read
		bp.lastRead += read
		if bp.len() == 0 {
			// everything has been read; reset to the beginning
			bp.lastRead = 0
			bp.bufLen -= len(bp.buf[0])
			bp.buf[0] = bp.buf[0][:0]
			break
		}
		// stop if p has been filled
		if len(p) == read {
			break
		}
		// more buffered data and more room in p: read from the next slice
		p = p[read:]
		bp.lastRead = 0
		bp.bufLen -= len(bp.buf[0])
		bp.buf[0] = nil     // throw away the drained slice
		bp.buf = bp.buf[1:] // switch to the next one
	}
	return
}
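A minimal usage sketch of the new type (hypothetical driver code, assuming the package lives at github.com/docker/docker/pkg/ioutils): written data is consumed as it is read, and reading from an empty pipe returns n == 0 with a nil error.

package main

import (
	"fmt"

	"github.com/docker/docker/pkg/ioutils"
)

func main() {
	bp := ioutils.NewBytesPipe(nil)
	bp.Write([]byte("hello, pipe")) // buffered until read

	p := make([]byte, 5)
	n, _ := bp.Read(p)
	fmt.Printf("%s\n", p[:n]) // "hello"

	n, _ = bp.Read(p)
	fmt.Printf("%s\n", p[:n]) // ", pip"; the data is gone once read
}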
ioutils/bytespipe_test.go (new file, +141)
@@ -0,0 +1,141 @@
package ioutils

import (
	"crypto/sha1"
	"encoding/hex"
	"testing"
)

func TestBytesPipeRead(t *testing.T) {
	buf := NewBytesPipe(nil)
	buf.Write([]byte("12"))
	buf.Write([]byte("34"))
	buf.Write([]byte("56"))
	buf.Write([]byte("78"))
	buf.Write([]byte("90"))
	rd := make([]byte, 4)
	n, err := buf.Read(rd)
	if err != nil {
		t.Fatal(err)
	}
	if n != 4 {
		t.Fatalf("Wrong number of bytes read: %d, should be %d", n, 4)
	}
	if string(rd) != "1234" {
		t.Fatalf("Read %s, but must be %s", rd, "1234")
	}
	n, err = buf.Read(rd)
	if err != nil {
		t.Fatal(err)
	}
	if n != 4 {
		t.Fatalf("Wrong number of bytes read: %d, should be %d", n, 4)
	}
	if string(rd) != "5678" {
		t.Fatalf("Read %s, but must be %s", rd, "5678")
	}
	n, err = buf.Read(rd)
	if err != nil {
		t.Fatal(err)
	}
	if n != 2 {
		t.Fatalf("Wrong number of bytes read: %d, should be %d", n, 2)
	}
	if string(rd[:n]) != "90" {
		t.Fatalf("Read %s, but must be %s", rd[:n], "90")
	}
}

func TestBytesPipeWrite(t *testing.T) {
	buf := NewBytesPipe(nil)
	buf.Write([]byte("12"))
	buf.Write([]byte("34"))
	buf.Write([]byte("56"))
	buf.Write([]byte("78"))
	buf.Write([]byte("90"))
	if string(buf.buf[0]) != "1234567890" {
		t.Fatalf("Buffer %s, must be %s", buf.buf[0], "1234567890")
	}
}

// Write and read in different speeds/chunk sizes and check valid data is read.
func TestBytesPipeWriteRandomChunks(t *testing.T) {
	cases := []struct{ iterations, writesPerLoop, readsPerLoop int }{
		{100, 10, 1},
		{1000, 10, 5},
		{1000, 100, 0},
		{1000, 5, 6},
		{10000, 50, 25},
	}

	testMessage := []byte("this is a random string for testing")
	// random slice sizes to read and write
	writeChunks := []int{25, 35, 15, 20}
	readChunks := []int{5, 45, 20, 25}

	for _, c := range cases {
		// first pass: write directly to hash
		hash := sha1.New()
		for i := 0; i < c.iterations*c.writesPerLoop; i++ {
			if _, err := hash.Write(testMessage[:writeChunks[i%len(writeChunks)]]); err != nil {
				t.Fatal(err)
			}
		}
		expected := hex.EncodeToString(hash.Sum(nil))

		// write/read through buffer
		buf := NewBytesPipe(nil)
		hash.Reset()
		for i := 0; i < c.iterations; i++ {
			for w := 0; w < c.writesPerLoop; w++ {
				buf.Write(testMessage[:writeChunks[(i*c.writesPerLoop+w)%len(writeChunks)]])
			}
			for r := 0; r < c.readsPerLoop; r++ {
				p := make([]byte, readChunks[(i*c.readsPerLoop+r)%len(readChunks)])
				n, _ := buf.Read(p)
				hash.Write(p[:n])
			}
		}
		// read the rest of the data from the buffer
		for i := 0; ; i++ {
			p := make([]byte, readChunks[(c.iterations*c.readsPerLoop+i)%len(readChunks)])
			n, _ := buf.Read(p)
			if n == 0 {
				break
			}
			hash.Write(p[:n])
		}
		actual := hex.EncodeToString(hash.Sum(nil))

		if expected != actual {
			t.Fatalf("BytesPipe returned invalid data. Expected checksum %v, got %v", expected, actual)
		}
	}
}

func BenchmarkBytesPipeWrite(b *testing.B) {
	for i := 0; i < b.N; i++ {
		buf := NewBytesPipe(nil)
		for j := 0; j < 1000; j++ {
			buf.Write([]byte("pretty short line, because why not?"))
		}
	}
}

func BenchmarkBytesPipeRead(b *testing.B) {
	rd := make([]byte, 1024)
	for i := 0; i < b.N; i++ {
		b.StopTimer()
		buf := NewBytesPipe(nil)
		for j := 0; j < 1000; j++ {
			buf.Write(make([]byte, 1024))
		}
		b.StartTimer()
		for j := 0; j < 1000; j++ {
			if n, _ := buf.Read(rd); n != 1024 {
				b.Fatalf("Wrong number of bytes: %d", n)
			}
		}
	}
}
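As an aside, the doubling allocation is easy to observe. The following hypothetical test sketch (not part of this commit) logs the capacities of the internal slices after a 1 KiB write, relying on the same unexported buf field the tests above already use.

// Hypothetical sketch (not in this commit): log the capacities of the
// internal slices after a 1 KiB write to see the doubling allocation.
func TestBytesPipeCapGrowth(t *testing.T) {
	buf := NewBytesPipe(nil)
	buf.Write(make([]byte, 1024))
	// Expected capacities: 64 for the initial slice, then 128, 256, 512, 1024.
	for i, b := range buf.buf {
		t.Logf("slice %d: cap %d", i, cap(b))
	}
	if cap(buf.buf[0]) != 64 {
		t.Fatalf("expected initial cap 64, got %d", cap(buf.buf[0]))
	}
}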
ioutils/readers.go
@@ -1,19 +1,12 @@
 package ioutils
 
 import (
-	"bytes"
 	"crypto/sha256"
 	"encoding/hex"
 	"io"
-	"math/rand"
 	"sync"
-	"time"
-
-	"github.com/docker/docker/pkg/random"
 )
 
-var rndSrc = random.NewSource()
-
 type readCloserWrapper struct {
 	io.Reader
 	closer func() error
@@ -58,31 +51,19 @@ func NewReaderErrWrapper(r io.Reader, closer func()) io.Reader {
 // expanding buffer.
 type bufReader struct {
 	sync.Mutex
-	buf                  *bytes.Buffer
-	reader               io.Reader
-	err                  error
-	wait                 sync.Cond
-	drainBuf             []byte
-	reuseBuf             []byte
-	maxReuse             int64
-	resetTimeout         time.Duration
-	bufLenResetThreshold int64
-	maxReadDataReset     int64
+	buf      io.ReadWriter
+	reader   io.Reader
+	err      error
+	wait     sync.Cond
+	drainBuf []byte
 }
 
 // NewBufReader returns a new bufReader.
 func NewBufReader(r io.Reader) io.ReadCloser {
-	timeout := rand.New(rndSrc).Intn(90)
-
 	reader := &bufReader{
-		buf:                  &bytes.Buffer{},
-		drainBuf:             make([]byte, 1024),
-		reuseBuf:             make([]byte, 4096),
-		maxReuse:             1000,
-		resetTimeout:         time.Duration(timeout) * time.Second,
-		bufLenResetThreshold: 100 * 1024,
-		maxReadDataReset:     10 * 1024 * 1024,
-		reader:               r,
+		buf:      NewBytesPipe(nil),
+		reader:   r,
+		drainBuf: make([]byte, 1024),
 	}
 	reader.wait.L = &reader.Mutex
 	go reader.drain()
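For orientation, NewBufReader wraps any io.Reader and eagerly drains it into the pipe from a background goroutine, so a slow consumer no longer backs up the producer. A minimal usage sketch (hypothetical, not part of this commit; it assumes the package path github.com/docker/docker/pkg/ioutils):

package main

import (
	"fmt"
	"io/ioutil"
	"strings"

	"github.com/docker/docker/pkg/ioutils"
)

func main() {
	// The background drain goroutine moves bytes from the source
	// into the internal BytesPipe as fast as they are produced.
	br := ioutils.NewBufReader(strings.NewReader("buffered through BytesPipe"))
	defer br.Close()

	data, err := ioutil.ReadAll(br)
	if err != nil {
		fmt.Println("read error:", err)
		return
	}
	fmt.Println(string(data))
}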
@@ -90,7 +71,7 @@ func NewBufReader(r io.Reader) io.ReadCloser {
 }
 
 // NewBufReaderWithDrainbufAndBuffer returns a BufReader with drainBuffer and buffer.
-func NewBufReaderWithDrainbufAndBuffer(r io.Reader, drainBuffer []byte, buffer *bytes.Buffer) io.ReadCloser {
+func NewBufReaderWithDrainbufAndBuffer(r io.Reader, drainBuffer []byte, buffer io.ReadWriter) io.ReadCloser {
 	reader := &bufReader{
 		buf:      buffer,
 		drainBuf: drainBuffer,
@@ -102,94 +83,19 @@ func NewBufReaderWithDrainbufAndBuffer(r io.Reader, drainBuffer []byte, buffer *bytes.Buffer) io.ReadCloser {
 func (r *bufReader) drain() {
-	var (
-		duration       time.Duration
-		lastReset      time.Time
-		now            time.Time
-		reset          bool
-		bufLen         int64
-		dataSinceReset int64
-		maxBufLen      int64
-		reuseBufLen    int64
-		reuseCount     int64
-	)
-	reuseBufLen = int64(len(r.reuseBuf))
-	lastReset = time.Now()
 	for {
 		n, err := r.reader.Read(r.drainBuf)
-		dataSinceReset += int64(n)
 		r.Lock()
-		bufLen = int64(r.buf.Len())
-		if bufLen > maxBufLen {
-			maxBufLen = bufLen
-		}
-
-		// Avoid unbounded growth of the buffer over time.
-		// This has been discovered to be the only non-intrusive
-		// solution to the unbounded growth of the buffer.
-		// Alternative solutions such as compression, multiple
-		// buffers, channels and other similar pieces of code
-		// were reducing throughput, overall Docker performance
-		// or simply crashed Docker.
-		// This solution releases the buffer when specific
-		// conditions are met to avoid the continuous resizing
-		// of the buffer for long lived containers.
-		//
-		// Move data to the front of the buffer if it's
-		// smaller than what reuseBuf can store
-		if bufLen > 0 && reuseBufLen >= bufLen {
-			n, _ := r.buf.Read(r.reuseBuf)
-			r.buf.Write(r.reuseBuf[0:n])
-			// Take action if the buffer has been reused too many
-			// times and if there's data in the buffer.
-			// The timeout is also used as means to avoid doing
-			// these operations more often or less often than
-			// required.
-			// The various conditions try to detect heavy activity
-			// in the buffer which might be indicators of heavy
-			// growth of the buffer.
-		} else if reuseCount >= r.maxReuse && bufLen > 0 {
-			now = time.Now()
-			duration = now.Sub(lastReset)
-			timeoutReached := duration >= r.resetTimeout
-
-			// The timeout has been reached and the
-			// buffered data couldn't be moved to the front
-			// of the buffer, so the buffer gets reset.
-			if timeoutReached && bufLen > reuseBufLen {
-				reset = true
-			}
-			// The amount of buffered data is too high now,
-			// reset the buffer.
-			if timeoutReached && maxBufLen >= r.bufLenResetThreshold {
-				reset = true
-			}
-			// Reset the buffer if a certain amount of
-			// data has gone through the buffer since the
-			// last reset.
-			if timeoutReached && dataSinceReset >= r.maxReadDataReset {
-				reset = true
-			}
-			// The buffered data is moved to a fresh buffer,
-			// swap the old buffer with the new one and
-			// reset all counters.
-			if reset {
-				newbuf := &bytes.Buffer{}
-				newbuf.ReadFrom(r.buf)
-				r.buf = newbuf
-				lastReset = now
-				reset = false
-				dataSinceReset = 0
-				maxBufLen = 0
-				reuseCount = 0
-			}
-		}
 		if err != nil {
 			r.err = err
 		} else {
-			r.buf.Write(r.drainBuf[0:n])
+			if n == 0 {
+				// nothing written, no need to signal
+				r.Unlock()
+				continue
+			}
+			r.buf.Write(r.drainBuf[:n])
 		}
-		reuseCount++
 		r.wait.Signal()
 		r.Unlock()
 		callSchedulerIfNecessary()
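The drain loop above is the producer half of a classic condition-variable pattern. The consumer half (the bufReader.Read side, which this excerpt does not show) would block roughly like the following sketch; this is an illustration, not the file's actual code:

// Illustrative sketch only: how a consumer pairs with drain() above.
// Block on the condition variable until data or an error arrives.
func (r *bufReader) readSketch(p []byte) (int, error) {
	r.Lock()
	defer r.Unlock()
	for {
		n, err := r.buf.Read(p)
		if n > 0 {
			return n, err
		}
		if r.err != nil {
			return 0, r.err
		}
		// Nothing buffered yet; wait until drain() calls Signal().
		r.wait.Wait()
	}
}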
ioutils/readers_test.go
@@ -7,6 +7,7 @@ import (
 	"io/ioutil"
 	"strings"
 	"testing"
+	"time"
 )
 
 // Implement io.Reader
@@ -61,8 +62,8 @@ func TestNewBufReaderWithDrainbufAndBuffer(t *testing.T) {
 	reader, writer := io.Pipe()
 
 	drainBuffer := make([]byte, 1024)
-	buffer := bytes.Buffer{}
-	bufreader := NewBufReaderWithDrainbufAndBuffer(reader, drainBuffer, &buffer)
+	buffer := NewBytesPipe(nil)
+	bufreader := NewBufReaderWithDrainbufAndBuffer(reader, drainBuffer, buffer)
 
 	// Write everything down to a Pipe
 	// Usually, a pipe should block but because of the buffered reader,
@@ -76,7 +77,11 @@ func TestNewBufReaderWithDrainbufAndBuffer(t *testing.T) {
 
 	// Drain the reader *after* everything has been written, just to verify
 	// it is indeed buffering
-	<-done
+	select {
+	case <-done:
+	case <-time.After(1 * time.Second):
+		t.Fatal("timeout")
+	}
 
 	output, err := ioutil.ReadAll(bufreader)
 	if err != nil {