Make bytesPipe use linear allocations
Signed-off-by: Tonis Tiigi <tonistiigi@gmail.com>
This commit is contained in:
parent
46b458fe3b
commit
3c6dcfb6ca
3 changed files with 120 additions and 57 deletions
|
@ -1,15 +1,16 @@
|
|||
package ioutils
|
||||
|
||||
const maxCap = 10 * 1e6
|
||||
const maxCap = 1e6
|
||||
|
||||
// BytesPipe is io.ReadWriter which works similary to pipe(queue).
|
||||
// All written data could be read only once. Also BytesPipe trying to adjust
|
||||
// internal []byte slice to current needs, so there won't be overgrown buffer
|
||||
// after highload peak.
|
||||
// BytesPipe is io.ReadWriter which works similarly to pipe(queue).
|
||||
// All written data could be read only once. Also BytesPipe is allocating
|
||||
// and releasing new byte slices to adjust to current needs, so there won't be
|
||||
// overgrown buffer after high load peak.
|
||||
// BytesPipe isn't goroutine-safe, caller must synchronize it if needed.
|
||||
type BytesPipe struct {
|
||||
buf []byte
|
||||
lastRead int
|
||||
buf [][]byte // slice of byte-slices of buffered data
|
||||
lastRead int // index in the first slice to a read point
|
||||
bufLen int // length of data buffered over the slices
|
||||
}
|
||||
|
||||
// NewBytesPipe creates new BytesPipe, initialized by specified slice.
|
||||
|
@ -20,63 +21,69 @@ func NewBytesPipe(buf []byte) *BytesPipe {
|
|||
buf = make([]byte, 0, 64)
|
||||
}
|
||||
return &BytesPipe{
|
||||
buf: buf[:0],
|
||||
}
|
||||
}
|
||||
|
||||
func (bp *BytesPipe) grow(n int) {
|
||||
if len(bp.buf)+n > cap(bp.buf) {
|
||||
// not enough space
|
||||
var buf []byte
|
||||
remain := bp.len()
|
||||
if remain+n <= cap(bp.buf)/2 {
|
||||
// enough space in current buffer, just move data to head
|
||||
copy(bp.buf, bp.buf[bp.lastRead:])
|
||||
buf = bp.buf[:remain]
|
||||
} else {
|
||||
// reallocate buffer
|
||||
buf = make([]byte, remain, 2*cap(bp.buf)+n)
|
||||
copy(buf, bp.buf[bp.lastRead:])
|
||||
}
|
||||
bp.buf = buf
|
||||
bp.lastRead = 0
|
||||
buf: [][]byte{buf[:0]},
|
||||
}
|
||||
}
|
||||
|
||||
// Write writes p to BytesPipe.
|
||||
// It can increase cap of internal []byte slice in a process of writing.
|
||||
// It can allocate new []byte slices in a process of writing.
|
||||
func (bp *BytesPipe) Write(p []byte) (n int, err error) {
|
||||
bp.grow(len(p))
|
||||
bp.buf = append(bp.buf, p...)
|
||||
for {
|
||||
// write data to the last buffer
|
||||
b := bp.buf[len(bp.buf)-1]
|
||||
// copy data to the current empty allocated area
|
||||
n := copy(b[len(b):cap(b)], p)
|
||||
// increment buffered data length
|
||||
bp.bufLen += n
|
||||
// include written data in last buffer
|
||||
bp.buf[len(bp.buf)-1] = b[:len(b)+n]
|
||||
|
||||
// if there was enough room to write all then break
|
||||
if len(p) == n {
|
||||
break
|
||||
}
|
||||
|
||||
// more data: write to the next slice
|
||||
p = p[n:]
|
||||
// allocate slice that has twice the size of the last unless maximum reached
|
||||
nextCap := 2 * cap(bp.buf[len(bp.buf)-1])
|
||||
if maxCap < nextCap {
|
||||
nextCap = maxCap
|
||||
}
|
||||
// add new byte slice to the buffers slice and continue writing
|
||||
bp.buf = append(bp.buf, make([]byte, 0, nextCap))
|
||||
}
|
||||
return
|
||||
}
|
||||
|
||||
func (bp *BytesPipe) len() int {
|
||||
return len(bp.buf) - bp.lastRead
|
||||
}
|
||||
|
||||
func (bp *BytesPipe) crop() {
|
||||
// shortcut for empty buffer
|
||||
if bp.lastRead == len(bp.buf) {
|
||||
bp.lastRead = 0
|
||||
bp.buf = bp.buf[:0]
|
||||
}
|
||||
r := bp.len()
|
||||
// if we have too large buffer for too small data
|
||||
if cap(bp.buf) > maxCap && r < cap(bp.buf)/10 {
|
||||
copy(bp.buf, bp.buf[bp.lastRead:])
|
||||
// will use same underlying slice until reach cap
|
||||
bp.buf = bp.buf[:r : cap(bp.buf)/2]
|
||||
bp.lastRead = 0
|
||||
}
|
||||
return bp.bufLen - bp.lastRead
|
||||
}
|
||||
|
||||
// Read reads bytes from BytesPipe.
|
||||
// Data could be read only once.
|
||||
// Internal []byte slice could be shrinked.
|
||||
func (bp *BytesPipe) Read(p []byte) (n int, err error) {
|
||||
n = copy(p, bp.buf[bp.lastRead:])
|
||||
bp.lastRead += n
|
||||
bp.crop()
|
||||
for {
|
||||
read := copy(p, bp.buf[0][bp.lastRead:])
|
||||
n += read
|
||||
bp.lastRead += read
|
||||
if bp.len() == 0 {
|
||||
// we have read everything. reset to the beginning.
|
||||
bp.lastRead = 0
|
||||
bp.bufLen -= len(bp.buf[0])
|
||||
bp.buf[0] = bp.buf[0][:0]
|
||||
break
|
||||
}
|
||||
// break if everything was read
|
||||
if len(p) == read {
|
||||
break
|
||||
}
|
||||
// more buffered data and more asked. read from next slice.
|
||||
p = p[read:]
|
||||
bp.lastRead = 0
|
||||
bp.bufLen -= len(bp.buf[0])
|
||||
bp.buf[0] = nil // throw away old slice
|
||||
bp.buf = bp.buf[1:] // switch to next
|
||||
}
|
||||
return
|
||||
}
|
||||
|
|
|
@ -1,6 +1,10 @@
|
|||
package ioutils
|
||||
|
||||
import "testing"
|
||||
import (
|
||||
"crypto/sha1"
|
||||
"encoding/hex"
|
||||
"testing"
|
||||
)
|
||||
|
||||
func TestBytesPipeRead(t *testing.T) {
|
||||
buf := NewBytesPipe(nil)
|
||||
|
@ -49,11 +53,67 @@ func TestBytesPipeWrite(t *testing.T) {
|
|||
buf.Write([]byte("56"))
|
||||
buf.Write([]byte("78"))
|
||||
buf.Write([]byte("90"))
|
||||
if string(buf.buf) != "1234567890" {
|
||||
if string(buf.buf[0]) != "1234567890" {
|
||||
t.Fatalf("Buffer %s, must be %s", buf.buf, "1234567890")
|
||||
}
|
||||
}
|
||||
|
||||
// Write and read in different speeds/chunk sizes and check valid data is read.
|
||||
func TestBytesPipeWriteRandomChunks(t *testing.T) {
|
||||
cases := []struct{ iterations, writesPerLoop, readsPerLoop int }{
|
||||
{100, 10, 1},
|
||||
{1000, 10, 5},
|
||||
{1000, 100, 0},
|
||||
{1000, 5, 6},
|
||||
{10000, 50, 25},
|
||||
}
|
||||
|
||||
testMessage := []byte("this is a random string for testing")
|
||||
// random slice sizes to read and write
|
||||
writeChunks := []int{25, 35, 15, 20}
|
||||
readChunks := []int{5, 45, 20, 25}
|
||||
|
||||
for _, c := range cases {
|
||||
// first pass: write directly to hash
|
||||
hash := sha1.New()
|
||||
for i := 0; i < c.iterations*c.writesPerLoop; i++ {
|
||||
if _, err := hash.Write(testMessage[:writeChunks[i%len(writeChunks)]]); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
}
|
||||
expected := hex.EncodeToString(hash.Sum(nil))
|
||||
|
||||
// write/read through buffer
|
||||
buf := NewBytesPipe(nil)
|
||||
hash.Reset()
|
||||
for i := 0; i < c.iterations; i++ {
|
||||
for w := 0; w < c.writesPerLoop; w++ {
|
||||
buf.Write(testMessage[:writeChunks[(i*c.writesPerLoop+w)%len(writeChunks)]])
|
||||
}
|
||||
for r := 0; r < c.readsPerLoop; r++ {
|
||||
p := make([]byte, readChunks[(i*c.readsPerLoop+r)%len(readChunks)])
|
||||
n, _ := buf.Read(p)
|
||||
hash.Write(p[:n])
|
||||
}
|
||||
}
|
||||
// read rest of the data from buffer
|
||||
for i := 0; ; i++ {
|
||||
p := make([]byte, readChunks[(c.iterations*c.readsPerLoop+i)%len(readChunks)])
|
||||
n, _ := buf.Read(p)
|
||||
if n == 0 {
|
||||
break
|
||||
}
|
||||
hash.Write(p[:n])
|
||||
}
|
||||
actual := hex.EncodeToString(hash.Sum(nil))
|
||||
|
||||
if expected != actual {
|
||||
t.Fatalf("BytesPipe returned invalid data. Expected checksum %v, got %v", expected, actual)
|
||||
}
|
||||
|
||||
}
|
||||
}
|
||||
|
||||
func BenchmarkBytesPipeWrite(b *testing.B) {
|
||||
for i := 0; i < b.N; i++ {
|
||||
buf := NewBytesPipe(nil)
|
||||
|
|
|
@ -5,12 +5,8 @@ import (
|
|||
"encoding/hex"
|
||||
"io"
|
||||
"sync"
|
||||
|
||||
"github.com/docker/docker/pkg/random"
|
||||
)
|
||||
|
||||
var rndSrc = random.NewSource()
|
||||
|
||||
type readCloserWrapper struct {
|
||||
io.Reader
|
||||
closer func() error
|
||||
|
|
Loading…
Reference in a new issue