Adds new storagedriver.FileWriter interface

Updates registry storage code to use this for better resumable writes.
Implements this interface for the following drivers:
 + Inmemory
 + Filesystem
 + S3
 + Azure

Signed-off-by: Brian Bland <brian.bland@docker.com>
This commit is contained in:
Brian Bland 2016-02-08 14:29:21 -08:00
parent 7adddecf0b
commit c69c8a3286
25 changed files with 1059 additions and 2488 deletions

View file

@ -20,10 +20,8 @@ import (
"reflect"
"strconv"
"strings"
"sync"
"time"
"github.com/Sirupsen/logrus"
"github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/aws/awserr"
"github.com/aws/aws-sdk-go/aws/credentials"
@ -103,9 +101,6 @@ type driver struct {
Encrypt bool
RootDirectory string
StorageClass string
pool sync.Pool // pool []byte buffers used for WriteStream
zeros []byte // shared, zero-valued buffer used for WriteStream
}
type baseEmbed struct {
@ -302,11 +297,6 @@ func New(params DriverParameters) (*Driver, error) {
Encrypt: params.Encrypt,
RootDirectory: params.RootDirectory,
StorageClass: params.StorageClass,
zeros: make([]byte, params.ChunkSize),
}
d.pool.New = func() interface{} {
return make([]byte, d.ChunkSize)
}
return &Driver{
@ -326,7 +316,7 @@ func (d *driver) Name() string {
// GetContent retrieves the content stored at "path" as a []byte.
func (d *driver) GetContent(ctx context.Context, path string) ([]byte, error) {
reader, err := d.ReadStream(ctx, path, 0)
reader, err := d.Reader(ctx, path, 0)
if err != nil {
return nil, err
}
@ -347,9 +337,9 @@ func (d *driver) PutContent(ctx context.Context, path string, contents []byte) e
return parseError(path, err)
}
// ReadStream retrieves an io.ReadCloser for the content stored at "path" with a
// Reader retrieves an io.ReadCloser for the content stored at "path" with a
// given byte offset.
func (d *driver) ReadStream(ctx context.Context, path string, offset int64) (io.ReadCloser, error) {
func (d *driver) Reader(ctx context.Context, path string, offset int64) (io.ReadCloser, error) {
resp, err := d.S3.GetObject(&s3.GetObjectInput{
Bucket: aws.String(d.Bucket),
Key: aws.String(d.s3Path(path)),
@ -366,372 +356,52 @@ func (d *driver) ReadStream(ctx context.Context, path string, offset int64) (io.
return resp.Body, nil
}
// WriteStream stores the contents of the provided io.Reader at a
// location designated by the given path. The driver will know it has
// received the full contents when the reader returns io.EOF. The number
// of successfully READ bytes will be returned, even if an error is
// returned. May be used to resume writing a stream by providing a nonzero
// offset. Offsets past the current size will write from the position
// beyond the end of the file.
func (d *driver) WriteStream(ctx context.Context, path string, offset int64, reader io.Reader) (totalRead int64, err error) {
var partNumber int64 = 1
bytesRead := 0
var putErrChan chan error
parts := []*s3.CompletedPart{}
done := make(chan struct{}) // stopgap to free up waiting goroutines
resp, err := d.S3.CreateMultipartUpload(&s3.CreateMultipartUploadInput{
Bucket: aws.String(d.Bucket),
Key: aws.String(d.s3Path(path)),
ContentType: d.getContentType(),
ACL: d.getACL(),
ServerSideEncryption: d.getEncryptionMode(),
StorageClass: d.getStorageClass(),
})
if err != nil {
return 0, err
}
uploadID := resp.UploadId
buf := d.getbuf()
// We never want to leave a dangling multipart upload, our only consistent state is
// when there is a whole object at path. This is in order to remain consistent with
// the stat call.
//
// Note that if the machine dies before executing the defer, we will be left with a dangling
// multipart upload, which will eventually be cleaned up, but we will lose all of the progress
// made prior to the machine crashing.
defer func() {
if putErrChan != nil {
if putErr := <-putErrChan; putErr != nil {
err = putErr
}
}
if len(parts) > 0 {
_, err := d.S3.CompleteMultipartUpload(&s3.CompleteMultipartUploadInput{
Bucket: aws.String(d.Bucket),
Key: aws.String(d.s3Path(path)),
UploadId: uploadID,
MultipartUpload: &s3.CompletedMultipartUpload{
Parts: parts,
},
})
if err != nil {
// TODO (brianbland): log errors here
d.S3.AbortMultipartUpload(&s3.AbortMultipartUploadInput{
Bucket: aws.String(d.Bucket),
Key: aws.String(d.s3Path(path)),
UploadId: uploadID,
})
}
}
d.putbuf(buf) // needs to be here to pick up new buf value
close(done) // free up any waiting goroutines
}()
// Fills from 0 to total from current
fromSmallCurrent := func(total int64) error {
current, err := d.ReadStream(ctx, path, 0)
if err != nil {
return err
}
bytesRead = 0
for int64(bytesRead) < total {
//The loop should very rarely enter a second iteration
nn, err := current.Read(buf[bytesRead:total])
bytesRead += nn
if err != nil {
if err != io.EOF {
return err
}
break
}
}
return nil
}
// Fills from parameter to chunkSize from reader
fromReader := func(from int64) error {
bytesRead = 0
for from+int64(bytesRead) < d.ChunkSize {
nn, err := reader.Read(buf[from+int64(bytesRead):])
totalRead += int64(nn)
bytesRead += nn
if err != nil {
if err != io.EOF {
return err
}
break
}
}
if putErrChan == nil {
putErrChan = make(chan error)
} else {
if putErr := <-putErrChan; putErr != nil {
putErrChan = nil
return putErr
}
}
go func(bytesRead int, from int64, buf []byte) {
defer d.putbuf(buf) // this buffer gets dropped after this call
// DRAGONS(stevvooe): There are few things one might want to know
// about this section. First, the putErrChan is expecting an error
// and a nil or just a nil to come through the channel. This is
// covered by the silly defer below. The other aspect is the s3
// retry backoff to deal with RequestTimeout errors. Even though
// the underlying s3 library should handle it, it doesn't seem to
// be part of the shouldRetry function (see AdRoll/goamz/s3).
defer func() {
select {
case putErrChan <- nil: // for some reason, we do this no matter what.
case <-done:
return // ensure we don't leak the goroutine
}
}()
if bytesRead <= 0 {
return
}
resp, err := d.S3.UploadPart(&s3.UploadPartInput{
Bucket: aws.String(d.Bucket),
Key: aws.String(d.s3Path(path)),
PartNumber: aws.Int64(partNumber),
UploadId: uploadID,
Body: bytes.NewReader(buf[0 : int64(bytesRead)+from]),
})
if err != nil {
logrus.Errorf("error putting part, aborting: %v", err)
select {
case putErrChan <- err:
case <-done:
return // don't leak the goroutine
}
}
// parts and partNumber are safe, because this function is the
// only one modifying them and we force it to be executed
// serially.
parts = append(parts, &s3.CompletedPart{
ETag: resp.ETag,
PartNumber: aws.Int64(partNumber),
})
partNumber++
}(bytesRead, from, buf)
buf = d.getbuf() // use a new buffer for the next call
return nil
}
if offset > 0 {
resp, err := d.S3.HeadObject(&s3.HeadObjectInput{
Bucket: aws.String(d.Bucket),
Key: aws.String(d.s3Path(path)),
// Writer returns a FileWriter which will store the content written to it
// at the location designated by "path" after the call to Commit.
func (d *driver) Writer(ctx context.Context, path string, append bool) (storagedriver.FileWriter, error) {
key := d.s3Path(path)
if !append {
// TODO (brianbland): cancel other uploads at this path
resp, err := d.S3.CreateMultipartUpload(&s3.CreateMultipartUploadInput{
Bucket: aws.String(d.Bucket),
Key: aws.String(key),
ContentType: d.getContentType(),
ACL: d.getACL(),
ServerSideEncryption: d.getEncryptionMode(),
StorageClass: d.getStorageClass(),
})
if err != nil {
if s3Err, ok := err.(awserr.Error); !ok || s3Err.Code() != "NoSuchKey" {
return 0, err
}
}
currentLength := int64(0)
if err == nil && resp.ContentLength != nil {
currentLength = *resp.ContentLength
}
if currentLength >= offset {
if offset < d.ChunkSize {
// chunkSize > currentLength >= offset
if err = fromSmallCurrent(offset); err != nil {
return totalRead, err
}
if err = fromReader(offset); err != nil {
return totalRead, err
}
if totalRead+offset < d.ChunkSize {
return totalRead, nil
}
} else {
// currentLength >= offset >= chunkSize
resp, err := d.S3.UploadPartCopy(&s3.UploadPartCopyInput{
Bucket: aws.String(d.Bucket),
Key: aws.String(d.s3Path(path)),
PartNumber: aws.Int64(partNumber),
UploadId: uploadID,
CopySource: aws.String(d.Bucket + "/" + d.s3Path(path)),
CopySourceRange: aws.String("bytes=0-" + strconv.FormatInt(offset-1, 10)),
})
if err != nil {
return 0, err
}
parts = append(parts, &s3.CompletedPart{
ETag: resp.CopyPartResult.ETag,
PartNumber: aws.Int64(partNumber),
})
partNumber++
}
} else {
// Fills between parameters with 0s but only when to - from <= chunkSize
fromZeroFillSmall := func(from, to int64) error {
bytesRead = 0
for from+int64(bytesRead) < to {
nn, err := bytes.NewReader(d.zeros).Read(buf[from+int64(bytesRead) : to])
bytesRead += nn
if err != nil {
return err
}
}
return nil
}
// Fills between parameters with 0s, making new parts
fromZeroFillLarge := func(from, to int64) error {
bytesRead64 := int64(0)
for to-(from+bytesRead64) >= d.ChunkSize {
resp, err := d.S3.UploadPart(&s3.UploadPartInput{
Bucket: aws.String(d.Bucket),
Key: aws.String(d.s3Path(path)),
PartNumber: aws.Int64(partNumber),
UploadId: uploadID,
Body: bytes.NewReader(d.zeros),
})
if err != nil {
return err
}
bytesRead64 += d.ChunkSize
parts = append(parts, &s3.CompletedPart{
ETag: resp.ETag,
PartNumber: aws.Int64(partNumber),
})
partNumber++
}
return fromZeroFillSmall(0, (to-from)%d.ChunkSize)
}
// currentLength < offset
if currentLength < d.ChunkSize {
if offset < d.ChunkSize {
// chunkSize > offset > currentLength
if err = fromSmallCurrent(currentLength); err != nil {
return totalRead, err
}
if err = fromZeroFillSmall(currentLength, offset); err != nil {
return totalRead, err
}
if err = fromReader(offset); err != nil {
return totalRead, err
}
if totalRead+offset < d.ChunkSize {
return totalRead, nil
}
} else {
// offset >= chunkSize > currentLength
if err = fromSmallCurrent(currentLength); err != nil {
return totalRead, err
}
if err = fromZeroFillSmall(currentLength, d.ChunkSize); err != nil {
return totalRead, err
}
resp, err := d.S3.UploadPart(&s3.UploadPartInput{
Bucket: aws.String(d.Bucket),
Key: aws.String(d.s3Path(path)),
PartNumber: aws.Int64(partNumber),
UploadId: uploadID,
Body: bytes.NewReader(buf),
})
if err != nil {
return totalRead, err
}
parts = append(parts, &s3.CompletedPart{
ETag: resp.ETag,
PartNumber: aws.Int64(partNumber),
})
partNumber++
//Zero fill from chunkSize up to offset, then some reader
if err = fromZeroFillLarge(d.ChunkSize, offset); err != nil {
return totalRead, err
}
if err = fromReader(offset % d.ChunkSize); err != nil {
return totalRead, err
}
if totalRead+(offset%d.ChunkSize) < d.ChunkSize {
return totalRead, nil
}
}
} else {
// offset > currentLength >= chunkSize
resp, err := d.S3.UploadPartCopy(&s3.UploadPartCopyInput{
Bucket: aws.String(d.Bucket),
Key: aws.String(d.s3Path(path)),
PartNumber: aws.Int64(partNumber),
UploadId: uploadID,
CopySource: aws.String(d.Bucket + "/" + d.s3Path(path)),
})
if err != nil {
return 0, err
}
parts = append(parts, &s3.CompletedPart{
ETag: resp.CopyPartResult.ETag,
PartNumber: aws.Int64(partNumber),
})
partNumber++
//Zero fill from currentLength up to offset, then some reader
if err = fromZeroFillLarge(currentLength, offset); err != nil {
return totalRead, err
}
if err = fromReader((offset - currentLength) % d.ChunkSize); err != nil {
return totalRead, err
}
if totalRead+((offset-currentLength)%d.ChunkSize) < d.ChunkSize {
return totalRead, nil
}
}
return nil, err
}
return d.newWriter(key, *resp.UploadId, nil), nil
}
resp, err := d.S3.ListMultipartUploads(&s3.ListMultipartUploadsInput{
Bucket: aws.String(d.Bucket),
Prefix: aws.String(key),
})
if err != nil {
return nil, parseError(path, err)
}
for {
if err = fromReader(0); err != nil {
return totalRead, err
for _, multi := range resp.Uploads {
if key != *multi.Key {
continue
}
if int64(bytesRead) < d.ChunkSize {
break
resp, err := d.S3.ListParts(&s3.ListPartsInput{
Bucket: aws.String(d.Bucket),
Key: aws.String(key),
UploadId: multi.UploadId,
})
if err != nil {
return nil, parseError(path, err)
}
var multiSize int64
for _, part := range resp.Parts {
multiSize += *part.Size
}
return d.newWriter(key, *multi.UploadId, resp.Parts), nil
}
return totalRead, nil
return nil, storagedriver.PathNotFoundError{Path: path}
}
// Stat retrieves the FileInfo for the given path, including the current size
@ -971,12 +641,258 @@ func (d *driver) getStorageClass() *string {
return aws.String(d.StorageClass)
}
// getbuf returns a buffer from the driver's pool with length d.ChunkSize.
func (d *driver) getbuf() []byte {
return d.pool.Get().([]byte)
// writer attempts to upload parts to S3 in a buffered fashion where the last
// part is at least as large as the chunksize, so the multipart upload could be
// cleanly resumed in the future. This is violated if Close is called after less
// than a full chunk is written.
type writer struct {
driver *driver
key string
uploadID string
parts []*s3.Part
size int64
readyPart []byte
pendingPart []byte
closed bool
committed bool
cancelled bool
}
func (d *driver) putbuf(p []byte) {
copy(p, d.zeros)
d.pool.Put(p)
func (d *driver) newWriter(key, uploadID string, parts []*s3.Part) storagedriver.FileWriter {
var size int64
for _, part := range parts {
size += *part.Size
}
return &writer{
driver: d,
key: key,
uploadID: uploadID,
parts: parts,
size: size,
}
}
func (w *writer) Write(p []byte) (int, error) {
if w.closed {
return 0, fmt.Errorf("already closed")
} else if w.committed {
return 0, fmt.Errorf("already committed")
} else if w.cancelled {
return 0, fmt.Errorf("already cancelled")
}
// If the last written part is smaller than minChunkSize, we need to make a
// new multipart upload :sadface:
if len(w.parts) > 0 && int(*w.parts[len(w.parts)-1].Size) < minChunkSize {
var completedParts []*s3.CompletedPart
for _, part := range w.parts {
completedParts = append(completedParts, &s3.CompletedPart{
ETag: part.ETag,
PartNumber: part.PartNumber,
})
}
_, err := w.driver.S3.CompleteMultipartUpload(&s3.CompleteMultipartUploadInput{
Bucket: aws.String(w.driver.Bucket),
Key: aws.String(w.key),
UploadId: aws.String(w.uploadID),
MultipartUpload: &s3.CompletedMultipartUpload{
Parts: completedParts,
},
})
if err != nil {
w.driver.S3.AbortMultipartUpload(&s3.AbortMultipartUploadInput{
Bucket: aws.String(w.driver.Bucket),
Key: aws.String(w.key),
UploadId: aws.String(w.uploadID),
})
return 0, err
}
resp, err := w.driver.S3.CreateMultipartUpload(&s3.CreateMultipartUploadInput{
Bucket: aws.String(w.driver.Bucket),
Key: aws.String(w.key),
ContentType: w.driver.getContentType(),
ACL: w.driver.getACL(),
ServerSideEncryption: w.driver.getEncryptionMode(),
StorageClass: w.driver.getStorageClass(),
})
if err != nil {
return 0, err
}
w.uploadID = *resp.UploadId
// If the entire written file is smaller than minChunkSize, we need to make
// a new part from scratch :double sad face:
if w.size < minChunkSize {
resp, err := w.driver.S3.GetObject(&s3.GetObjectInput{
Bucket: aws.String(w.driver.Bucket),
Key: aws.String(w.key),
})
defer resp.Body.Close()
if err != nil {
return 0, err
}
w.parts = nil
w.readyPart, err = ioutil.ReadAll(resp.Body)
if err != nil {
return 0, err
}
} else {
// Otherwise we can use the old file as the new first part
copyPartResp, err := w.driver.S3.UploadPartCopy(&s3.UploadPartCopyInput{
Bucket: aws.String(w.driver.Bucket),
CopySource: aws.String(w.driver.Bucket + "/" + w.key),
Key: aws.String(w.key),
PartNumber: aws.Int64(1),
UploadId: resp.UploadId,
})
if err != nil {
return 0, err
}
w.parts = []*s3.Part{
{
ETag: copyPartResp.CopyPartResult.ETag,
PartNumber: aws.Int64(1),
Size: aws.Int64(w.size),
},
}
}
}
var n int
for len(p) > 0 {
// If no parts are ready to write, fill up the first part
if neededBytes := int(w.driver.ChunkSize) - len(w.readyPart); neededBytes > 0 {
if len(p) >= neededBytes {
w.readyPart = append(w.readyPart, p[:neededBytes]...)
n += neededBytes
p = p[neededBytes:]
} else {
w.readyPart = append(w.readyPart, p...)
n += len(p)
p = nil
}
}
if neededBytes := int(w.driver.ChunkSize) - len(w.pendingPart); neededBytes > 0 {
if len(p) >= neededBytes {
w.pendingPart = append(w.pendingPart, p[:neededBytes]...)
n += neededBytes
p = p[neededBytes:]
err := w.flushPart()
if err != nil {
w.size += int64(n)
return n, err
}
} else {
w.pendingPart = append(w.pendingPart, p...)
n += len(p)
p = nil
}
}
}
w.size += int64(n)
return n, nil
}
func (w *writer) Size() int64 {
return w.size
}
func (w *writer) Close() error {
if w.closed {
return fmt.Errorf("already closed")
}
w.closed = true
return w.flushPart()
}
func (w *writer) Cancel() error {
if w.closed {
return fmt.Errorf("already closed")
} else if w.committed {
return fmt.Errorf("already committed")
}
w.cancelled = true
_, err := w.driver.S3.AbortMultipartUpload(&s3.AbortMultipartUploadInput{
Bucket: aws.String(w.driver.Bucket),
Key: aws.String(w.key),
UploadId: aws.String(w.uploadID),
})
return err
}
func (w *writer) Commit() error {
if w.closed {
return fmt.Errorf("already closed")
} else if w.committed {
return fmt.Errorf("already committed")
} else if w.cancelled {
return fmt.Errorf("already cancelled")
}
err := w.flushPart()
if err != nil {
return err
}
w.committed = true
var completedParts []*s3.CompletedPart
for _, part := range w.parts {
completedParts = append(completedParts, &s3.CompletedPart{
ETag: part.ETag,
PartNumber: part.PartNumber,
})
}
_, err = w.driver.S3.CompleteMultipartUpload(&s3.CompleteMultipartUploadInput{
Bucket: aws.String(w.driver.Bucket),
Key: aws.String(w.key),
UploadId: aws.String(w.uploadID),
MultipartUpload: &s3.CompletedMultipartUpload{
Parts: completedParts,
},
})
if err != nil {
w.driver.S3.AbortMultipartUpload(&s3.AbortMultipartUploadInput{
Bucket: aws.String(w.driver.Bucket),
Key: aws.String(w.key),
UploadId: aws.String(w.uploadID),
})
return err
}
return nil
}
// flushPart flushes buffers to write a part to S3.
// Only called by Write (with both buffers full) and Close/Commit (always)
func (w *writer) flushPart() error {
if len(w.readyPart) == 0 && len(w.pendingPart) == 0 {
// nothing to write
return nil
}
if len(w.pendingPart) < int(w.driver.ChunkSize) {
// closing with a small pending part
// combine ready and pending to avoid writing a small part
w.readyPart = append(w.readyPart, w.pendingPart...)
w.pendingPart = nil
}
partNumber := aws.Int64(int64(len(w.parts) + 1))
resp, err := w.driver.S3.UploadPart(&s3.UploadPartInput{
Bucket: aws.String(w.driver.Bucket),
Key: aws.String(w.key),
PartNumber: partNumber,
UploadId: aws.String(w.uploadID),
Body: bytes.NewReader(w.readyPart),
})
if err != nil {
return err
}
w.parts = append(w.parts, &s3.Part{
ETag: resp.ETag,
PartNumber: partNumber,
Size: aws.Int64(int64(len(w.readyPart))),
})
w.readyPart = w.pendingPart
w.pendingPart = nil
return nil
}