2015-02-11 02:14:23 +00:00
// Package s3 provides a storagedriver.StorageDriver implementation to
// store blobs in Amazon S3 cloud storage.
//
2016-01-28 23:48:49 +00:00
// This package leverages the docker/goamz client library for interfacing with
2016-01-22 02:17:53 +00:00
// S3. It is intended to be deprecated in favor of the s3-aws driver
// implementation.
2015-02-11 02:14:23 +00:00
//
2016-01-22 02:17:53 +00:00
// Because S3 is a key, value store the Stat call does not support last modification
2015-02-11 02:14:23 +00:00
// time for directories (directories are an abstraction for key, value stores)
//
2016-01-22 02:17:53 +00:00
// Keep in mind that S3 guarantees only read-after-write consistency for new
// objects, but no read-after-update or list-after-write consistency.
2015-02-11 02:14:23 +00:00
package s3
import (
"bytes"
"fmt"
"io"
"io/ioutil"
"net/http"
2015-04-22 21:31:34 +00:00
"reflect"
2015-02-11 02:14:23 +00:00
"strconv"
"strings"
"time"
2016-01-21 00:40:58 +00:00
"github.com/docker/goamz/aws"
"github.com/docker/goamz/s3"
2015-04-27 22:58:58 +00:00
"github.com/docker/distribution/context"
2016-01-21 00:40:58 +00:00
"github.com/docker/distribution/registry/client/transport"
2015-02-11 02:14:23 +00:00
storagedriver "github.com/docker/distribution/registry/storage/driver"
"github.com/docker/distribution/registry/storage/driver/base"
"github.com/docker/distribution/registry/storage/driver/factory"
)
2016-01-22 02:17:53 +00:00
const driverName = "s3goamz"
2015-02-11 02:14:23 +00:00
// minChunkSize defines the minimum multipart upload chunk size
// S3 API requires multipart upload chunks to be at least 5MB
const minChunkSize = 5 << 20
const defaultChunkSize = 2 * minChunkSize
// listMax is the largest amount of objects you can request from S3 in a list call
const listMax = 1000
//DriverParameters A struct that encapsulates all of the driver parameters after all values have been set
type DriverParameters struct {
AccessKey string
SecretKey string
Bucket string
Region aws . Region
Encrypt bool
Secure bool
V4Auth bool
ChunkSize int64
RootDirectory string
2016-01-28 23:48:49 +00:00
StorageClass s3 . StorageClass
2016-01-21 00:40:58 +00:00
UserAgent string
2015-02-11 02:14:23 +00:00
}
func init ( ) {
factory . Register ( driverName , & s3DriverFactory { } )
}
// s3DriverFactory implements the factory.StorageDriverFactory interface
type s3DriverFactory struct { }
func ( factory * s3DriverFactory ) Create ( parameters map [ string ] interface { } ) ( storagedriver . StorageDriver , error ) {
return FromParameters ( parameters )
}
type driver struct {
S3 * s3 . S3
Bucket * s3 . Bucket
ChunkSize int64
Encrypt bool
RootDirectory string
2016-01-28 23:48:49 +00:00
StorageClass s3 . StorageClass
2015-02-11 02:14:23 +00:00
}
type baseEmbed struct {
base . Base
}
// Driver is a storagedriver.StorageDriver implementation backed by Amazon S3
// Objects are stored at absolute keys in the provided bucket.
type Driver struct {
baseEmbed
}
// FromParameters constructs a new Driver with a given parameters map
// Required parameters:
// - accesskey
// - secretkey
// - region
// - bucket
// - encrypt
func FromParameters ( parameters map [ string ] interface { } ) ( * Driver , error ) {
// Providing no values for these is valid in case the user is authenticating
// with an IAM on an ec2 instance (in which case the instance credentials will
// be summoned when GetAuth is called)
2016-02-03 03:30:48 +00:00
accessKey := parameters [ "accesskey" ]
if accessKey == nil {
2015-02-11 02:14:23 +00:00
accessKey = ""
}
2016-02-03 03:30:48 +00:00
secretKey := parameters [ "secretkey" ]
if secretKey == nil {
2015-02-11 02:14:23 +00:00
secretKey = ""
}
2016-02-03 03:30:48 +00:00
regionName := parameters [ "region" ]
if regionName == nil || fmt . Sprint ( regionName ) == "" {
2015-02-11 02:14:23 +00:00
return nil , fmt . Errorf ( "No region parameter provided" )
}
region := aws . GetRegion ( fmt . Sprint ( regionName ) )
if region . Name == "" {
return nil , fmt . Errorf ( "Invalid region provided: %v" , region )
}
2016-02-03 03:30:48 +00:00
bucket := parameters [ "bucket" ]
if bucket == nil || fmt . Sprint ( bucket ) == "" {
2015-02-11 02:14:23 +00:00
return nil , fmt . Errorf ( "No bucket parameter provided" )
}
encryptBool := false
2016-02-03 03:30:48 +00:00
encrypt := parameters [ "encrypt" ]
switch encrypt := encrypt . ( type ) {
case string :
b , err := strconv . ParseBool ( encrypt )
if err != nil {
2015-02-11 02:14:23 +00:00
return nil , fmt . Errorf ( "The encrypt parameter should be a boolean" )
}
2016-02-03 03:30:48 +00:00
encryptBool = b
case bool :
encryptBool = encrypt
case nil :
// do nothing
default :
return nil , fmt . Errorf ( "The encrypt parameter should be a boolean" )
2015-02-11 02:14:23 +00:00
}
secureBool := true
2016-02-03 03:30:48 +00:00
secure := parameters [ "secure" ]
switch secure := secure . ( type ) {
case string :
b , err := strconv . ParseBool ( secure )
if err != nil {
2015-02-11 02:14:23 +00:00
return nil , fmt . Errorf ( "The secure parameter should be a boolean" )
}
2016-02-03 03:30:48 +00:00
secureBool = b
case bool :
secureBool = secure
case nil :
// do nothing
default :
return nil , fmt . Errorf ( "The secure parameter should be a boolean" )
2015-02-11 02:14:23 +00:00
}
v4AuthBool := false
2016-02-03 03:30:48 +00:00
v4Auth := parameters [ "v4auth" ]
switch v4Auth := v4Auth . ( type ) {
case string :
b , err := strconv . ParseBool ( v4Auth )
if err != nil {
2015-02-11 02:14:23 +00:00
return nil , fmt . Errorf ( "The v4auth parameter should be a boolean" )
}
2016-02-03 03:30:48 +00:00
v4AuthBool = b
case bool :
v4AuthBool = v4Auth
case nil :
// do nothing
default :
return nil , fmt . Errorf ( "The v4auth parameter should be a boolean" )
2015-02-11 02:14:23 +00:00
}
chunkSize := int64 ( defaultChunkSize )
2016-02-03 03:30:48 +00:00
chunkSizeParam := parameters [ "chunksize" ]
switch v := chunkSizeParam . ( type ) {
case string :
vv , err := strconv . ParseInt ( v , 0 , 64 )
if err != nil {
return nil , fmt . Errorf ( "chunksize parameter must be an integer, %v invalid" , chunkSizeParam )
2015-04-22 21:31:34 +00:00
}
2016-02-03 03:30:48 +00:00
chunkSize = vv
case int64 :
chunkSize = v
case int , uint , int32 , uint32 , uint64 :
chunkSize = reflect . ValueOf ( v ) . Convert ( reflect . TypeOf ( chunkSize ) ) . Int ( )
case nil :
// do nothing
default :
return nil , fmt . Errorf ( "invalid value for chunksize: %#v" , chunkSizeParam )
}
2015-04-22 21:31:34 +00:00
2016-02-03 03:30:48 +00:00
if chunkSize < minChunkSize {
return nil , fmt . Errorf ( "The chunksize %#v parameter should be a number that is larger than or equal to %d" , chunkSize , minChunkSize )
2015-02-11 02:14:23 +00:00
}
2016-02-03 03:30:48 +00:00
rootDirectory := parameters [ "rootdirectory" ]
if rootDirectory == nil {
2015-02-11 02:14:23 +00:00
rootDirectory = ""
}
2016-01-28 23:48:49 +00:00
storageClass := s3 . StandardStorage
2016-02-03 03:30:48 +00:00
storageClassParam := parameters [ "storageclass" ]
if storageClassParam != nil {
2016-01-28 23:48:49 +00:00
storageClassString , ok := storageClassParam . ( string )
if ! ok {
return nil , fmt . Errorf ( "The storageclass parameter must be one of %v, %v invalid" , [ ] s3 . StorageClass { s3 . StandardStorage , s3 . ReducedRedundancy } , storageClassParam )
}
// All valid storage class parameters are UPPERCASE, so be a bit more flexible here
storageClassCasted := s3 . StorageClass ( strings . ToUpper ( storageClassString ) )
if storageClassCasted != s3 . StandardStorage && storageClassCasted != s3 . ReducedRedundancy {
return nil , fmt . Errorf ( "The storageclass parameter must be one of %v, %v invalid" , [ ] s3 . StorageClass { s3 . StandardStorage , s3 . ReducedRedundancy } , storageClassParam )
}
storageClass = storageClassCasted
}
2016-02-03 03:30:48 +00:00
userAgent := parameters [ "useragent" ]
if userAgent == nil {
2016-01-21 00:40:58 +00:00
userAgent = ""
}
2015-02-11 02:14:23 +00:00
params := DriverParameters {
fmt . Sprint ( accessKey ) ,
fmt . Sprint ( secretKey ) ,
fmt . Sprint ( bucket ) ,
region ,
encryptBool ,
secureBool ,
v4AuthBool ,
chunkSize ,
fmt . Sprint ( rootDirectory ) ,
2016-01-28 23:48:49 +00:00
storageClass ,
2016-01-21 00:40:58 +00:00
fmt . Sprint ( userAgent ) ,
2015-02-11 02:14:23 +00:00
}
return New ( params )
}
// New constructs a new Driver with the given AWS credentials, region, encryption flag, and
// bucketName
func New ( params DriverParameters ) ( * Driver , error ) {
auth , err := aws . GetAuth ( params . AccessKey , params . SecretKey , "" , time . Time { } )
if err != nil {
2015-05-22 23:45:45 +00:00
return nil , fmt . Errorf ( "unable to resolve aws credentials, please ensure that 'accesskey' and 'secretkey' are properly set or the credentials are available in $HOME/.aws/credentials: %v" , err )
2015-02-11 02:14:23 +00:00
}
if ! params . Secure {
params . Region . S3Endpoint = strings . Replace ( params . Region . S3Endpoint , "https" , "http" , 1 )
}
s3obj := s3 . New ( auth , params . Region )
2016-01-21 00:40:58 +00:00
if params . UserAgent != "" {
s3obj . Client = & http . Client {
Transport : transport . NewTransport ( http . DefaultTransport ,
transport . NewHeaderRequestModifier ( http . Header {
http . CanonicalHeaderKey ( "User-Agent" ) : [ ] string { params . UserAgent } ,
} ) ,
) ,
}
}
2015-02-11 02:14:23 +00:00
if params . V4Auth {
s3obj . Signature = aws . V4Signature
} else {
if params . Region . Name == "eu-central-1" {
return nil , fmt . Errorf ( "The eu-central-1 region only works with v4 authentication" )
}
}
2016-01-21 00:40:58 +00:00
bucket := s3obj . Bucket ( params . Bucket )
2015-02-11 02:14:23 +00:00
// TODO Currently multipart uploads have no timestamps, so this would be unwise
// if you initiated a new s3driver while another one is running on the same bucket.
// multis, _, err := bucket.ListMulti("", "")
// if err != nil {
// return nil, err
// }
// for _, multi := range multis {
// err := multi.Abort()
// //TODO appropriate to do this error checking?
// if err != nil {
// return nil, err
// }
// }
d := & driver {
S3 : s3obj ,
Bucket : bucket ,
ChunkSize : params . ChunkSize ,
Encrypt : params . Encrypt ,
RootDirectory : params . RootDirectory ,
2016-01-28 23:48:49 +00:00
StorageClass : params . StorageClass ,
2015-02-11 02:14:23 +00:00
}
return & Driver {
baseEmbed : baseEmbed {
Base : base . Base {
StorageDriver : d ,
} ,
} ,
} , nil
}
// Implement the storagedriver.StorageDriver interface
2015-04-23 00:30:01 +00:00
func ( d * driver ) Name ( ) string {
return driverName
}
2015-02-11 02:14:23 +00:00
// GetContent retrieves the content stored at "path" as a []byte.
2015-04-27 22:58:58 +00:00
func ( d * driver ) GetContent ( ctx context . Context , path string ) ( [ ] byte , error ) {
2015-02-11 02:14:23 +00:00
content , err := d . Bucket . Get ( d . s3Path ( path ) )
if err != nil {
return nil , parseError ( path , err )
}
return content , nil
}
// PutContent stores the []byte content at a location designated by "path".
2015-04-27 22:58:58 +00:00
func ( d * driver ) PutContent ( ctx context . Context , path string , contents [ ] byte ) error {
2015-02-11 02:14:23 +00:00
return parseError ( path , d . Bucket . Put ( d . s3Path ( path ) , contents , d . getContentType ( ) , getPermissions ( ) , d . getOptions ( ) ) )
}
2016-02-08 22:29:21 +00:00
// Reader retrieves an io.ReadCloser for the content stored at "path" with a
2015-02-11 02:14:23 +00:00
// given byte offset.
2016-02-08 22:29:21 +00:00
func ( d * driver ) Reader ( ctx context . Context , path string , offset int64 ) ( io . ReadCloser , error ) {
2015-02-11 02:14:23 +00:00
headers := make ( http . Header )
headers . Add ( "Range" , "bytes=" + strconv . FormatInt ( offset , 10 ) + "-" )
resp , err := d . Bucket . GetResponseWithHeaders ( d . s3Path ( path ) , headers )
if err != nil {
if s3Err , ok := err . ( * s3 . Error ) ; ok && s3Err . Code == "InvalidRange" {
return ioutil . NopCloser ( bytes . NewReader ( nil ) ) , nil
}
return nil , parseError ( path , err )
}
return resp . Body , nil
}
2016-02-08 22:29:21 +00:00
// Writer returns a FileWriter which will store the content written to it
// at the location designated by "path" after the call to Commit.
func ( d * driver ) Writer ( ctx context . Context , path string , append bool ) ( storagedriver . FileWriter , error ) {
key := d . s3Path ( path )
if ! append {
// TODO (brianbland): cancel other uploads at this path
multi , err := d . Bucket . InitMulti ( key , d . getContentType ( ) , getPermissions ( ) , d . getOptions ( ) )
2015-02-11 02:14:23 +00:00
if err != nil {
2016-02-08 22:29:21 +00:00
return nil , err
2015-02-11 02:14:23 +00:00
}
2016-02-08 22:29:21 +00:00
return d . newWriter ( key , multi , nil ) , nil
2015-02-11 02:14:23 +00:00
}
2016-02-08 22:29:21 +00:00
multis , _ , err := d . Bucket . ListMulti ( key , "" )
if err != nil {
return nil , parseError ( path , err )
2015-02-11 02:14:23 +00:00
}
2016-02-08 22:29:21 +00:00
for _ , multi := range multis {
if key != multi . Key {
continue
2015-02-11 02:14:23 +00:00
}
2016-02-08 22:29:21 +00:00
parts , err := multi . ListParts ( )
if err != nil {
return nil , parseError ( path , err )
2015-02-11 02:14:23 +00:00
}
2016-02-08 22:29:21 +00:00
var multiSize int64
for _ , part := range parts {
multiSize += part . Size
2015-02-11 02:14:23 +00:00
}
2016-02-08 22:29:21 +00:00
return d . newWriter ( key , multi , parts ) , nil
2015-02-11 02:14:23 +00:00
}
2016-02-08 22:29:21 +00:00
return nil , storagedriver . PathNotFoundError { Path : path }
2015-02-11 02:14:23 +00:00
}
// Stat retrieves the FileInfo for the given path, including the current size
// in bytes and the creation time.
2015-04-27 22:58:58 +00:00
func ( d * driver ) Stat ( ctx context . Context , path string ) ( storagedriver . FileInfo , error ) {
2015-02-11 02:14:23 +00:00
listResponse , err := d . Bucket . List ( d . s3Path ( path ) , "" , "" , 1 )
if err != nil {
return nil , err
}
fi := storagedriver . FileInfoFields {
Path : path ,
}
if len ( listResponse . Contents ) == 1 {
if listResponse . Contents [ 0 ] . Key != d . s3Path ( path ) {
fi . IsDir = true
} else {
fi . IsDir = false
fi . Size = listResponse . Contents [ 0 ] . Size
timestamp , err := time . Parse ( time . RFC3339Nano , listResponse . Contents [ 0 ] . LastModified )
if err != nil {
return nil , err
}
fi . ModTime = timestamp
}
} else if len ( listResponse . CommonPrefixes ) == 1 {
fi . IsDir = true
} else {
return nil , storagedriver . PathNotFoundError { Path : path }
}
return storagedriver . FileInfoInternal { FileInfoFields : fi } , nil
}
// List returns a list of the objects that are direct descendants of the given path.
2015-12-08 19:02:40 +00:00
func ( d * driver ) List ( ctx context . Context , opath string ) ( [ ] string , error ) {
path := opath
2015-02-11 02:14:23 +00:00
if path != "/" && path [ len ( path ) - 1 ] != '/' {
path = path + "/"
}
2015-02-20 00:31:34 +00:00
// This is to cover for the cases when the rootDirectory of the driver is either "" or "/".
// In those cases, there is no root prefix to replace and we must actually add a "/" to all
// results in order to keep them as valid paths as recognized by storagedriver.PathRegexp
prefix := ""
if d . s3Path ( "" ) == "" {
prefix = "/"
}
2015-02-11 02:14:23 +00:00
listResponse , err := d . Bucket . List ( d . s3Path ( path ) , "/" , "" , listMax )
if err != nil {
2015-12-08 19:02:40 +00:00
return nil , parseError ( opath , err )
2015-11-24 22:23:12 +00:00
}
2015-02-11 02:14:23 +00:00
files := [ ] string { }
directories := [ ] string { }
for {
for _ , key := range listResponse . Contents {
2015-02-20 00:31:34 +00:00
files = append ( files , strings . Replace ( key . Key , d . s3Path ( "" ) , prefix , 1 ) )
2015-02-11 02:14:23 +00:00
}
for _ , commonPrefix := range listResponse . CommonPrefixes {
2015-02-20 00:31:34 +00:00
directories = append ( directories , strings . Replace ( commonPrefix [ 0 : len ( commonPrefix ) - 1 ] , d . s3Path ( "" ) , prefix , 1 ) )
2015-02-11 02:14:23 +00:00
}
if listResponse . IsTruncated {
listResponse , err = d . Bucket . List ( d . s3Path ( path ) , "/" , listResponse . NextMarker , listMax )
if err != nil {
return nil , err
}
} else {
break
}
}
2015-12-08 19:02:40 +00:00
if opath != "/" {
if len ( files ) == 0 && len ( directories ) == 0 {
// Treat empty response as missing directory, since we don't actually
// have directories in s3.
return nil , storagedriver . PathNotFoundError { Path : opath }
}
}
2015-02-11 02:14:23 +00:00
return append ( files , directories ... ) , nil
}
// Move moves an object stored at sourcePath to destPath, removing the original
// object.
2015-04-27 22:58:58 +00:00
func ( d * driver ) Move ( ctx context . Context , sourcePath string , destPath string ) error {
2015-02-11 02:14:23 +00:00
/* This is terrible, but aws doesn't have an actual move. */
_ , err := d . Bucket . PutCopy ( d . s3Path ( destPath ) , getPermissions ( ) ,
s3 . CopyOptions { Options : d . getOptions ( ) , ContentType : d . getContentType ( ) } , d . Bucket . Name + "/" + d . s3Path ( sourcePath ) )
if err != nil {
return parseError ( sourcePath , err )
}
2015-04-27 22:58:58 +00:00
return d . Delete ( ctx , sourcePath )
2015-02-11 02:14:23 +00:00
}
// Delete recursively deletes all objects stored at "path" and its subpaths.
2015-04-27 22:58:58 +00:00
func ( d * driver ) Delete ( ctx context . Context , path string ) error {
2015-02-11 02:14:23 +00:00
listResponse , err := d . Bucket . List ( d . s3Path ( path ) , "" , "" , listMax )
if err != nil || len ( listResponse . Contents ) == 0 {
return storagedriver . PathNotFoundError { Path : path }
}
s3Objects := make ( [ ] s3 . Object , listMax )
for len ( listResponse . Contents ) > 0 {
for index , key := range listResponse . Contents {
s3Objects [ index ] . Key = key . Key
}
err := d . Bucket . DelMulti ( s3 . Delete { Quiet : false , Objects : s3Objects [ 0 : len ( listResponse . Contents ) ] } )
if err != nil {
return nil
}
listResponse , err = d . Bucket . List ( d . s3Path ( path ) , "" , "" , listMax )
if err != nil {
return err
}
}
return nil
}
// URLFor returns a URL which may be used to retrieve the content stored at the given path.
// May return an UnsupportedMethodErr in certain StorageDriver implementations.
2015-04-27 22:58:58 +00:00
func ( d * driver ) URLFor ( ctx context . Context , path string , options map [ string ] interface { } ) ( string , error ) {
2015-02-11 02:14:23 +00:00
methodString := "GET"
method , ok := options [ "method" ]
if ok {
methodString , ok = method . ( string )
if ! ok || ( methodString != "GET" && methodString != "HEAD" ) {
2015-11-02 21:23:53 +00:00
return "" , storagedriver . ErrUnsupportedMethod { }
2015-02-11 02:14:23 +00:00
}
}
expiresTime := time . Now ( ) . Add ( 20 * time . Minute )
expires , ok := options [ "expiry" ]
if ok {
et , ok := expires . ( time . Time )
if ok {
expiresTime = et
}
}
return d . Bucket . SignedURLWithMethod ( methodString , d . s3Path ( path ) , expiresTime , nil , nil ) , nil
}
func ( d * driver ) s3Path ( path string ) string {
return strings . TrimLeft ( strings . TrimRight ( d . RootDirectory , "/" ) + path , "/" )
}
2015-04-06 23:23:31 +00:00
// S3BucketKey returns the s3 bucket key for the given storage driver path.
func ( d * Driver ) S3BucketKey ( path string ) string {
return d . StorageDriver . ( * driver ) . s3Path ( path )
}
2015-02-11 02:14:23 +00:00
func parseError ( path string , err error ) error {
if s3Err , ok := err . ( * s3 . Error ) ; ok && s3Err . Code == "NoSuchKey" {
return storagedriver . PathNotFoundError { Path : path }
}
return err
}
func hasCode ( err error , code string ) bool {
s3err , ok := err . ( * aws . Error )
return ok && s3err . Code == code
}
func ( d * driver ) getOptions ( ) s3 . Options {
2016-01-28 23:48:49 +00:00
return s3 . Options {
SSE : d . Encrypt ,
StorageClass : d . StorageClass ,
}
2015-02-11 02:14:23 +00:00
}
func getPermissions ( ) s3 . ACL {
return s3 . Private
}
func ( d * driver ) getContentType ( ) string {
return "application/octet-stream"
}
2015-04-22 22:07:18 +00:00
2016-02-08 22:29:21 +00:00
// writer attempts to upload parts to S3 in a buffered fashion where the last
// part is at least as large as the chunksize, so the multipart upload could be
// cleanly resumed in the future. This is violated if Close is called after less
// than a full chunk is written.
type writer struct {
driver * driver
key string
multi * s3 . Multi
parts [ ] s3 . Part
size int64
readyPart [ ] byte
pendingPart [ ] byte
closed bool
committed bool
cancelled bool
2015-04-22 22:07:18 +00:00
}
2016-02-08 22:29:21 +00:00
func ( d * driver ) newWriter ( key string , multi * s3 . Multi , parts [ ] s3 . Part ) storagedriver . FileWriter {
var size int64
for _ , part := range parts {
size += part . Size
}
return & writer {
driver : d ,
key : key ,
multi : multi ,
parts : parts ,
size : size ,
}
}
func ( w * writer ) Write ( p [ ] byte ) ( int , error ) {
if w . closed {
return 0 , fmt . Errorf ( "already closed" )
} else if w . committed {
return 0 , fmt . Errorf ( "already committed" )
} else if w . cancelled {
return 0 , fmt . Errorf ( "already cancelled" )
}
// If the last written part is smaller than minChunkSize, we need to make a
// new multipart upload :sadface:
if len ( w . parts ) > 0 && int ( w . parts [ len ( w . parts ) - 1 ] . Size ) < minChunkSize {
err := w . multi . Complete ( w . parts )
if err != nil {
w . multi . Abort ( )
return 0 , err
}
multi , err := w . driver . Bucket . InitMulti ( w . key , w . driver . getContentType ( ) , getPermissions ( ) , w . driver . getOptions ( ) )
if err != nil {
return 0 , err
}
w . multi = multi
// If the entire written file is smaller than minChunkSize, we need to make
// a new part from scratch :double sad face:
if w . size < minChunkSize {
contents , err := w . driver . Bucket . Get ( w . key )
if err != nil {
return 0 , err
}
w . parts = nil
w . readyPart = contents
} else {
// Otherwise we can use the old file as the new first part
_ , part , err := multi . PutPartCopy ( 1 , s3 . CopyOptions { } , w . driver . Bucket . Name + "/" + w . key )
if err != nil {
return 0 , err
}
w . parts = [ ] s3 . Part { part }
}
}
var n int
for len ( p ) > 0 {
// If no parts are ready to write, fill up the first part
if neededBytes := int ( w . driver . ChunkSize ) - len ( w . readyPart ) ; neededBytes > 0 {
if len ( p ) >= neededBytes {
w . readyPart = append ( w . readyPart , p [ : neededBytes ] ... )
n += neededBytes
p = p [ neededBytes : ]
} else {
w . readyPart = append ( w . readyPart , p ... )
n += len ( p )
p = nil
}
}
if neededBytes := int ( w . driver . ChunkSize ) - len ( w . pendingPart ) ; neededBytes > 0 {
if len ( p ) >= neededBytes {
w . pendingPart = append ( w . pendingPart , p [ : neededBytes ] ... )
n += neededBytes
p = p [ neededBytes : ]
err := w . flushPart ( )
if err != nil {
w . size += int64 ( n )
return n , err
}
} else {
w . pendingPart = append ( w . pendingPart , p ... )
n += len ( p )
p = nil
}
}
}
w . size += int64 ( n )
return n , nil
}
func ( w * writer ) Size ( ) int64 {
return w . size
}
func ( w * writer ) Close ( ) error {
if w . closed {
return fmt . Errorf ( "already closed" )
}
w . closed = true
return w . flushPart ( )
}
func ( w * writer ) Cancel ( ) error {
if w . closed {
return fmt . Errorf ( "already closed" )
} else if w . committed {
return fmt . Errorf ( "already committed" )
}
w . cancelled = true
err := w . multi . Abort ( )
return err
}
func ( w * writer ) Commit ( ) error {
if w . closed {
return fmt . Errorf ( "already closed" )
} else if w . committed {
return fmt . Errorf ( "already committed" )
} else if w . cancelled {
return fmt . Errorf ( "already cancelled" )
}
err := w . flushPart ( )
if err != nil {
return err
}
w . committed = true
err = w . multi . Complete ( w . parts )
if err != nil {
w . multi . Abort ( )
return err
}
return nil
}
// flushPart flushes buffers to write a part to S3.
// Only called by Write (with both buffers full) and Close/Commit (always)
func ( w * writer ) flushPart ( ) error {
if len ( w . readyPart ) == 0 && len ( w . pendingPart ) == 0 {
// nothing to write
return nil
}
if len ( w . pendingPart ) < int ( w . driver . ChunkSize ) {
// closing with a small pending part
// combine ready and pending to avoid writing a small part
w . readyPart = append ( w . readyPart , w . pendingPart ... )
w . pendingPart = nil
}
part , err := w . multi . PutPart ( len ( w . parts ) + 1 , bytes . NewReader ( w . readyPart ) )
if err != nil {
return err
}
w . parts = append ( w . parts , part )
w . readyPart = w . pendingPart
w . pendingPart = nil
return nil
2015-04-22 22:07:18 +00:00
}