From c49f7cd0154b3bfc18d53e48a9fb46586092f71f Mon Sep 17 00:00:00 2001 From: Stephen J Day Date: Wed, 22 Apr 2015 15:07:18 -0700 Subject: [PATCH] Pool buffers used in S3.WriteStream Signed-off-by: Stephen J Day --- docs/storage/driver/s3/s3.go | 32 +++++++++++++++++++++++++++----- 1 file changed, 27 insertions(+), 5 deletions(-) diff --git a/docs/storage/driver/s3/s3.go b/docs/storage/driver/s3/s3.go index 402f2eaa..1b04d784 100644 --- a/docs/storage/driver/s3/s3.go +++ b/docs/storage/driver/s3/s3.go @@ -22,6 +22,7 @@ import ( "net/http" "strconv" "strings" + "sync" "time" "github.com/AdRoll/goamz/aws" @@ -72,6 +73,9 @@ type driver struct { ChunkSize int64 Encrypt bool RootDirectory string + + pool sync.Pool // pool []byte buffers used for WriteStream + zeros []byte // shared, zero-valued buffer used for WriteStream } type baseEmbed struct { @@ -224,6 +228,11 @@ func New(params DriverParameters) (*Driver, error) { ChunkSize: params.ChunkSize, Encrypt: params.Encrypt, RootDirectory: params.RootDirectory, + zeros: make([]byte, params.ChunkSize), + } + + d.pool.New = func() interface{} { + return make([]byte, d.ChunkSize) } return &Driver{ @@ -287,8 +296,7 @@ func (d *driver) WriteStream(path string, offset int64, reader io.Reader) (total return 0, err } - buf := make([]byte, d.ChunkSize) - zeroBuf := make([]byte, d.ChunkSize) + buf := d.getbuf() // We never want to leave a dangling multipart upload, our only consistent state is // when there is a whole object at path. This is in order to remain consistent with @@ -314,6 +322,8 @@ func (d *driver) WriteStream(path string, offset int64, reader io.Reader) (total } } } + + d.putbuf(buf) // needs to be here to pick up new buf value }() // Fills from 0 to total from current @@ -367,6 +377,8 @@ func (d *driver) WriteStream(path string, offset int64, reader io.Reader) (total } go func(bytesRead int, from int64, buf []byte) { + defer d.putbuf(buf) // this buffer gets dropped after this call + // parts and partNumber are safe, because this function is the only one modifying them and we // force it to be executed serially. if bytesRead > 0 { @@ -381,7 +393,7 @@ func (d *driver) WriteStream(path string, offset int64, reader io.Reader) (total putErrChan <- nil }(bytesRead, from, buf) - buf = make([]byte, d.ChunkSize) + buf = d.getbuf() // use a new buffer for the next call return nil } @@ -429,7 +441,7 @@ func (d *driver) WriteStream(path string, offset int64, reader io.Reader) (total fromZeroFillSmall := func(from, to int64) error { bytesRead = 0 for from+int64(bytesRead) < to { - nn, err := bytes.NewReader(zeroBuf).Read(buf[from+int64(bytesRead) : to]) + nn, err := bytes.NewReader(d.zeros).Read(buf[from+int64(bytesRead) : to]) bytesRead += nn if err != nil { return err @@ -443,7 +455,7 @@ func (d *driver) WriteStream(path string, offset int64, reader io.Reader) (total fromZeroFillLarge := func(from, to int64) error { bytesRead64 := int64(0) for to-(from+bytesRead64) >= d.ChunkSize { - part, err := multi.PutPart(int(partNumber), bytes.NewReader(zeroBuf)) + part, err := multi.PutPart(int(partNumber), bytes.NewReader(d.zeros)) if err != nil { return err } @@ -724,3 +736,13 @@ func getPermissions() s3.ACL { func (d *driver) getContentType() string { return "application/octet-stream" } + +// getbuf returns a buffer from the driver's pool with length d.ChunkSize. +func (d *driver) getbuf() []byte { + return d.pool.Get().([]byte) +} + +func (d *driver) putbuf(p []byte) { + copy(p, d.zeros) + d.pool.Put(p) +}