Merge pull request #1906 from nwt/s3-multipart-copy

Use multipart upload API in S3 Move method
This commit is contained in:
Richard Scothern 2016-08-17 15:22:32 -07:00 committed by GitHub
commit 010e063270
4 changed files with 305 additions and 61 deletions

View file

@ -99,6 +99,9 @@ information about each option that appears later in this page.
secure: true secure: true
v4auth: true v4auth: true
chunksize: 5242880 chunksize: 5242880
multipartcopychunksize: 33554432
multipartcopymaxconcurrency: 100
multipartcopythresholdsize: 33554432
rootdirectory: /s3/object/name/prefix rootdirectory: /s3/object/name/prefix
swift: swift:
username: username username: username
@ -382,6 +385,9 @@ Permitted values are `error`, `warn`, `info` and `debug`. The default is
secure: true secure: true
v4auth: true v4auth: true
chunksize: 5242880 chunksize: 5242880
multipartcopychunksize: 33554432
multipartcopymaxconcurrency: 100
multipartcopythresholdsize: 33554432
rootdirectory: /s3/object/name/prefix rootdirectory: /s3/object/name/prefix
swift: swift:
username: username username: username

View file

@ -138,6 +138,42 @@ An implementation of the `storagedriver.StorageDriver` interface which uses Amaz
should be a number that is larger than 5*1024*1024. should be a number that is larger than 5*1024*1024.
</td> </td>
</tr> </tr>
<tr>
<td>
<code>multipartcopychunksize</code>
</td>
<td>
no
</td>
<td>
Chunk size for all but the last Upload Part - Copy
operation of a copy that uses the multipart upload API.
</td>
</tr>
<tr>
<td>
<code>multipartcopymaxconcurrency</code>
</td>
<td>
no
</td>
<td>
Maximum number of concurrent Upload Part - Copy operations for a
copy that uses the multipart upload API.
</td>
</tr>
<tr>
<td>
<code>multipartcopythresholdsize</code>
</td>
<td>
no
</td>
<td>
Objects above this size will be copied using the multipart upload API.
PUT Object - Copy is used for objects at or below this size.
</td>
</tr>
<tr> <tr>
<td> <td>
<code>rootdirectory</code> <code>rootdirectory</code>

View file

@ -16,6 +16,7 @@ import (
"fmt" "fmt"
"io" "io"
"io/ioutil" "io/ioutil"
"math"
"net/http" "net/http"
"reflect" "reflect"
"sort" "sort"
@ -45,8 +46,27 @@ const driverName = "s3aws"
// S3 API requires multipart upload chunks to be at least 5MB // S3 API requires multipart upload chunks to be at least 5MB
const minChunkSize = 5 << 20 const minChunkSize = 5 << 20
// maxChunkSize defines the maximum multipart upload chunk size allowed by S3.
const maxChunkSize = 5 << 30
const defaultChunkSize = 2 * minChunkSize const defaultChunkSize = 2 * minChunkSize
const (
// defaultMultipartCopyChunkSize defines the default chunk size for all
// but the last Upload Part - Copy operation of a multipart copy.
// Empirically, 32 MB is optimal.
defaultMultipartCopyChunkSize = 32 << 20
// defaultMultipartCopyMaxConcurrency defines the default maximum number
// of concurrent Upload Part - Copy operations for a multipart copy.
defaultMultipartCopyMaxConcurrency = 100
// defaultMultipartCopyThresholdSize defines the default object size
// above which multipart copy will be used. (PUT Object - Copy is used
// for objects at or below this size.) Empirically, 32 MB is optimal.
defaultMultipartCopyThresholdSize = 32 << 20
)
// listMax is the largest amount of objects you can request from S3 in a list call // listMax is the largest amount of objects you can request from S3 in a list call
const listMax = 1000 const listMax = 1000
@ -58,19 +78,22 @@ var validObjectAcls = map[string]struct{}{}
//DriverParameters A struct that encapsulates all of the driver parameters after all values have been set //DriverParameters A struct that encapsulates all of the driver parameters after all values have been set
type DriverParameters struct { type DriverParameters struct {
AccessKey string AccessKey string
SecretKey string SecretKey string
Bucket string Bucket string
Region string Region string
RegionEndpoint string RegionEndpoint string
Encrypt bool Encrypt bool
KeyID string KeyID string
Secure bool Secure bool
ChunkSize int64 ChunkSize int64
RootDirectory string MultipartCopyChunkSize int64
StorageClass string MultipartCopyMaxConcurrency int64
UserAgent string MultipartCopyThresholdSize int64
ObjectAcl string RootDirectory string
StorageClass string
UserAgent string
ObjectAcl string
} }
func init() { func init() {
@ -116,14 +139,17 @@ func (factory *s3DriverFactory) Create(parameters map[string]interface{}) (stora
} }
type driver struct { type driver struct {
S3 *s3.S3 S3 *s3.S3
Bucket string Bucket string
ChunkSize int64 ChunkSize int64
Encrypt bool Encrypt bool
KeyID string KeyID string
RootDirectory string MultipartCopyChunkSize int64
StorageClass string MultipartCopyMaxConcurrency int64
ObjectAcl string MultipartCopyThresholdSize int64
RootDirectory string
StorageClass string
ObjectAcl string
} }
type baseEmbed struct { type baseEmbed struct {
@ -217,27 +243,24 @@ func FromParameters(parameters map[string]interface{}) (*Driver, error) {
keyID = "" keyID = ""
} }
chunkSize := int64(defaultChunkSize) chunkSize, err := getParameterAsInt64(parameters, "chunksize", defaultChunkSize, minChunkSize, maxChunkSize)
chunkSizeParam := parameters["chunksize"] if err != nil {
switch v := chunkSizeParam.(type) { return nil, err
case string:
vv, err := strconv.ParseInt(v, 0, 64)
if err != nil {
return nil, fmt.Errorf("chunksize parameter must be an integer, %v invalid", chunkSizeParam)
}
chunkSize = vv
case int64:
chunkSize = v
case int, uint, int32, uint32, uint64:
chunkSize = reflect.ValueOf(v).Convert(reflect.TypeOf(chunkSize)).Int()
case nil:
// do nothing
default:
return nil, fmt.Errorf("invalid value for chunksize: %#v", chunkSizeParam)
} }
if chunkSize < minChunkSize { multipartCopyChunkSize, err := getParameterAsInt64(parameters, "multipartcopychunksize", defaultMultipartCopyChunkSize, minChunkSize, maxChunkSize)
return nil, fmt.Errorf("The chunksize %#v parameter should be a number that is larger than or equal to %d", chunkSize, minChunkSize) if err != nil {
return nil, err
}
multipartCopyMaxConcurrency, err := getParameterAsInt64(parameters, "multipartcopymaxconcurrency", defaultMultipartCopyMaxConcurrency, 1, math.MaxInt64)
if err != nil {
return nil, err
}
multipartCopyThresholdSize, err := getParameterAsInt64(parameters, "multipartcopythresholdsize", defaultMultipartCopyThresholdSize, 0, maxChunkSize)
if err != nil {
return nil, err
} }
rootDirectory := parameters["rootdirectory"] rootDirectory := parameters["rootdirectory"]
@ -289,6 +312,9 @@ func FromParameters(parameters map[string]interface{}) (*Driver, error) {
fmt.Sprint(keyID), fmt.Sprint(keyID),
secureBool, secureBool,
chunkSize, chunkSize,
multipartCopyChunkSize,
multipartCopyMaxConcurrency,
multipartCopyThresholdSize,
fmt.Sprint(rootDirectory), fmt.Sprint(rootDirectory),
storageClass, storageClass,
fmt.Sprint(userAgent), fmt.Sprint(userAgent),
@ -298,6 +324,35 @@ func FromParameters(parameters map[string]interface{}) (*Driver, error) {
return New(params) return New(params)
} }
// getParameterAsInt64 converts paramaters[name] to an int64 value (using
// defaultt if nil), verifies it is no smaller than min, and returns it.
func getParameterAsInt64(parameters map[string]interface{}, name string, defaultt int64, min int64, max int64) (int64, error) {
rv := defaultt
param := parameters[name]
switch v := param.(type) {
case string:
vv, err := strconv.ParseInt(v, 0, 64)
if err != nil {
return 0, fmt.Errorf("%s parameter must be an integer, %v invalid", name, param)
}
rv = vv
case int64:
rv = v
case int, uint, int32, uint32, uint64:
rv = reflect.ValueOf(v).Convert(reflect.TypeOf(rv)).Int()
case nil:
// do nothing
default:
return 0, fmt.Errorf("invalid value for %s: %#v", name, param)
}
if rv < min || rv > max {
return 0, fmt.Errorf("The %s %#v parameter should be a number between %d and %d (inclusive)", name, rv, min, max)
}
return rv, nil
}
// New constructs a new Driver with the given AWS credentials, region, encryption flag, and // New constructs a new Driver with the given AWS credentials, region, encryption flag, and
// bucketName // bucketName
func New(params DriverParameters) (*Driver, error) { func New(params DriverParameters) (*Driver, error) {
@ -346,14 +401,17 @@ func New(params DriverParameters) (*Driver, error) {
// } // }
d := &driver{ d := &driver{
S3: s3obj, S3: s3obj,
Bucket: params.Bucket, Bucket: params.Bucket,
ChunkSize: params.ChunkSize, ChunkSize: params.ChunkSize,
Encrypt: params.Encrypt, Encrypt: params.Encrypt,
KeyID: params.KeyID, KeyID: params.KeyID,
RootDirectory: params.RootDirectory, MultipartCopyChunkSize: params.MultipartCopyChunkSize,
StorageClass: params.StorageClass, MultipartCopyMaxConcurrency: params.MultipartCopyMaxConcurrency,
ObjectAcl: params.ObjectAcl, MultipartCopyThresholdSize: params.MultipartCopyThresholdSize,
RootDirectory: params.RootDirectory,
StorageClass: params.StorageClass,
ObjectAcl: params.ObjectAcl,
} }
return &Driver{ return &Driver{
@ -565,21 +623,106 @@ func (d *driver) List(ctx context.Context, opath string) ([]string, error) {
// object. // object.
func (d *driver) Move(ctx context.Context, sourcePath string, destPath string) error { func (d *driver) Move(ctx context.Context, sourcePath string, destPath string) error {
/* This is terrible, but aws doesn't have an actual move. */ /* This is terrible, but aws doesn't have an actual move. */
_, err := d.S3.CopyObject(&s3.CopyObjectInput{ if err := d.copy(ctx, sourcePath, destPath); err != nil {
Bucket: aws.String(d.Bucket), return err
Key: aws.String(d.s3Path(destPath)), }
ContentType: d.getContentType(), return d.Delete(ctx, sourcePath)
ACL: d.getACL(), }
ServerSideEncryption: d.getEncryptionMode(),
SSEKMSKeyId: d.getSSEKMSKeyID(), // copy copies an object stored at sourcePath to destPath.
StorageClass: d.getStorageClass(), func (d *driver) copy(ctx context.Context, sourcePath string, destPath string) error {
CopySource: aws.String(d.Bucket + "/" + d.s3Path(sourcePath)), // S3 can copy objects up to 5 GB in size with a single PUT Object - Copy
}) // operation. For larger objects, the multipart upload API must be used.
//
// Empirically, multipart copy is fastest with 32 MB parts and is faster
// than PUT Object - Copy for objects larger than 32 MB.
fileInfo, err := d.Stat(ctx, sourcePath)
if err != nil { if err != nil {
return parseError(sourcePath, err) return parseError(sourcePath, err)
} }
return d.Delete(ctx, sourcePath) if fileInfo.Size() <= d.MultipartCopyThresholdSize {
_, err := d.S3.CopyObject(&s3.CopyObjectInput{
Bucket: aws.String(d.Bucket),
Key: aws.String(d.s3Path(destPath)),
ContentType: d.getContentType(),
ACL: d.getACL(),
ServerSideEncryption: d.getEncryptionMode(),
SSEKMSKeyId: d.getSSEKMSKeyID(),
StorageClass: d.getStorageClass(),
CopySource: aws.String(d.Bucket + "/" + d.s3Path(sourcePath)),
})
if err != nil {
return parseError(sourcePath, err)
}
return nil
}
// Even in the worst case, a multipart copy should take no more
// than a few minutes, so 30 minutes is very conservative.
expires := time.Now().Add(time.Duration(30) * time.Minute)
createResp, err := d.S3.CreateMultipartUpload(&s3.CreateMultipartUploadInput{
Bucket: aws.String(d.Bucket),
Key: aws.String(d.s3Path(destPath)),
ContentType: d.getContentType(),
ACL: d.getACL(),
Expires: aws.Time(expires),
SSEKMSKeyId: d.getSSEKMSKeyID(),
ServerSideEncryption: d.getEncryptionMode(),
StorageClass: d.getStorageClass(),
})
if err != nil {
return err
}
numParts := (fileInfo.Size() + d.MultipartCopyChunkSize - 1) / d.MultipartCopyChunkSize
completedParts := make([]*s3.CompletedPart, numParts)
errChan := make(chan error, numParts)
limiter := make(chan struct{}, d.MultipartCopyMaxConcurrency)
for i := range completedParts {
i := int64(i)
go func() {
limiter <- struct{}{}
firstByte := i * d.MultipartCopyChunkSize
lastByte := firstByte + d.MultipartCopyChunkSize - 1
if lastByte >= fileInfo.Size() {
lastByte = fileInfo.Size() - 1
}
uploadResp, err := d.S3.UploadPartCopy(&s3.UploadPartCopyInput{
Bucket: aws.String(d.Bucket),
CopySource: aws.String(d.Bucket + "/" + d.s3Path(sourcePath)),
Key: aws.String(d.s3Path(destPath)),
PartNumber: aws.Int64(i + 1),
UploadId: createResp.UploadId,
CopySourceRange: aws.String(fmt.Sprintf("bytes=%d-%d", firstByte, lastByte)),
})
if err == nil {
completedParts[i] = &s3.CompletedPart{
ETag: uploadResp.CopyPartResult.ETag,
PartNumber: aws.Int64(i + 1),
}
}
errChan <- err
<-limiter
}()
}
for range completedParts {
err := <-errChan
if err != nil {
return err
}
}
_, err = d.S3.CompleteMultipartUpload(&s3.CompleteMultipartUploadInput{
Bucket: aws.String(d.Bucket),
Key: aws.String(d.s3Path(destPath)),
UploadId: createResp.UploadId,
MultipartUpload: &s3.CompletedMultipartUpload{Parts: completedParts},
})
return err
} }
func min(a, b int) int { func min(a, b int) int {

View file

@ -1,19 +1,21 @@
package s3 package s3
import ( import (
"bytes"
"io/ioutil" "io/ioutil"
"math/rand"
"os" "os"
"strconv" "strconv"
"testing" "testing"
"gopkg.in/check.v1"
"github.com/aws/aws-sdk-go/aws" "github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/service/s3" "github.com/aws/aws-sdk-go/service/s3"
"github.com/docker/distribution/context" "github.com/docker/distribution/context"
storagedriver "github.com/docker/distribution/registry/storage/driver" storagedriver "github.com/docker/distribution/registry/storage/driver"
"github.com/docker/distribution/registry/storage/driver/testsuites" "github.com/docker/distribution/registry/storage/driver/testsuites"
"gopkg.in/check.v1"
) )
// Hook up gocheck into the "go test" runner. // Hook up gocheck into the "go test" runner.
@ -65,6 +67,9 @@ func init() {
keyID, keyID,
secureBool, secureBool,
minChunkSize, minChunkSize,
defaultMultipartCopyChunkSize,
defaultMultipartCopyMaxConcurrency,
defaultMultipartCopyThresholdSize,
rootDirectory, rootDirectory,
storageClass, storageClass,
driverName + "-test", driverName + "-test",
@ -238,3 +243,57 @@ func TestOverThousandBlobs(t *testing.T) {
t.Fatalf("unexpected error deleting thousand files: %v", err) t.Fatalf("unexpected error deleting thousand files: %v", err)
} }
} }
func TestMoveWithMultipartCopy(t *testing.T) {
if skipS3() != "" {
t.Skip(skipS3())
}
rootDir, err := ioutil.TempDir("", "driver-")
if err != nil {
t.Fatalf("unexpected error creating temporary directory: %v", err)
}
defer os.Remove(rootDir)
d, err := s3DriverConstructor(rootDir, s3.StorageClassStandard)
if err != nil {
t.Fatalf("unexpected error creating driver: %v", err)
}
ctx := context.Background()
sourcePath := "/source"
destPath := "/dest"
defer d.Delete(ctx, sourcePath)
defer d.Delete(ctx, destPath)
// An object larger than d's MultipartCopyThresholdSize will cause d.Move() to perform a multipart copy.
multipartCopyThresholdSize := d.baseEmbed.Base.StorageDriver.(*driver).MultipartCopyThresholdSize
contents := make([]byte, 2*multipartCopyThresholdSize)
rand.Read(contents)
err = d.PutContent(ctx, sourcePath, contents)
if err != nil {
t.Fatalf("unexpected error creating content: %v", err)
}
err = d.Move(ctx, sourcePath, destPath)
if err != nil {
t.Fatalf("unexpected error moving file: %v", err)
}
received, err := d.GetContent(ctx, destPath)
if err != nil {
t.Fatalf("unexpected error getting content: %v", err)
}
if !bytes.Equal(contents, received) {
t.Fatal("content differs")
}
_, err = d.GetContent(ctx, sourcePath)
switch err.(type) {
case storagedriver.PathNotFoundError:
default:
t.Fatalf("unexpected error getting content: %v", err)
}
}