2015-02-11 02:14:23 +00:00
// Package s3 provides a storagedriver.StorageDriver implementation to
// store blobs in Amazon S3 cloud storage.
//
2016-01-22 02:17:53 +00:00
// This package leverages the official aws client library for interfacing with
// S3.
2015-02-11 02:14:23 +00:00
//
2016-01-22 02:17:53 +00:00
// Because S3 is a key-value store, the Stat call does not report a last
// modification time for directories (directories are only an abstraction over
// key prefixes in a key-value store).
//
2016-01-22 02:17:53 +00:00
// Keep in mind that S3 guarantees only read-after-write consistency for new
// objects, but no read-after-update or list-after-write consistency.
2015-02-11 02:14:23 +00:00
package s3
import (
"bytes"
"fmt"
"io"
"io/ioutil"
"net/http"
2015-04-22 21:31:34 +00:00
"reflect"
2015-02-11 02:14:23 +00:00
"strconv"
"strings"
2015-04-22 22:07:18 +00:00
"sync"
2015-02-11 02:14:23 +00:00
"time"
2015-04-24 03:07:32 +00:00
"github.com/Sirupsen/logrus"
2016-01-22 02:17:53 +00:00
"github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/aws/awserr"
"github.com/aws/aws-sdk-go/aws/credentials"
"github.com/aws/aws-sdk-go/aws/credentials/ec2rolecreds"
"github.com/aws/aws-sdk-go/aws/ec2metadata"
"github.com/aws/aws-sdk-go/aws/request"
"github.com/aws/aws-sdk-go/aws/session"
"github.com/aws/aws-sdk-go/service/s3"
2015-04-27 22:58:58 +00:00
"github.com/docker/distribution/context"
2016-01-21 00:40:58 +00:00
"github.com/docker/distribution/registry/client/transport"
2015-02-11 02:14:23 +00:00
storagedriver "github.com/docker/distribution/registry/storage/driver"
"github.com/docker/distribution/registry/storage/driver/base"
"github.com/docker/distribution/registry/storage/driver/factory"
)
2016-01-22 02:17:53 +00:00
const (
	// driverName is the name this driver registers under with the storage
	// driver factory and the value reported by Name.
	driverName = "s3aws"

	// minChunkSize is the smallest multipart upload chunk size accepted by
	// this driver; the S3 API requires multipart upload chunks to be at
	// least 5MB.
	minChunkSize = 5 << 20

	// defaultChunkSize is the chunk size used when no "chunksize" parameter
	// is supplied to FromParameters.
	defaultChunkSize = 2 * minChunkSize

	// listMax is the largest number of objects that can be requested from S3
	// in a single list call.
	listMax = 1000
)
2016-01-22 02:17:53 +00:00
// validRegions is the set of S3 region identifiers that FromParameters
// accepts. It starts out empty and is filled in by init.
var validRegions = make(map[string]struct{})
2015-02-11 02:14:23 +00:00
// DriverParameters encapsulates all of the driver parameters after every
// value has been resolved (defaults applied, strings parsed). Field order is
// significant for callers that build the struct positionally.
type DriverParameters struct {
	// AccessKey is the static AWS access key ID; may be empty.
	AccessKey string
	// SecretKey is the static AWS secret access key; may be empty.
	SecretKey string
	// Bucket is the name of the S3 bucket all objects are stored in.
	Bucket string
	// Region is the AWS region identifier the client is configured for.
	Region string
	// Encrypt requests AES256 server-side encryption for written objects.
	Encrypt bool
	// Secure selects SSL; when false the client is configured with SSL
	// disabled.
	Secure bool
	// ChunkSize is the size in bytes of the buffers used for multipart
	// uploads.
	ChunkSize int64
	// RootDirectory is prepended to every storage path to form the S3 key.
	RootDirectory string
	// StorageClass is the S3 storage class applied to written objects.
	StorageClass string
	// UserAgent, when non-empty, is sent as the User-Agent header on
	// requests.
	UserAgent string
}
func init ( ) {
2016-01-22 02:17:53 +00:00
for _ , region := range [ ] string {
"us-east-1" ,
"us-west-1" ,
"us-west-2" ,
"eu-west-1" ,
"eu-central-1" ,
"ap-southeast-1" ,
"ap-southeast-2" ,
"ap-northeast-1" ,
"ap-northeast-2" ,
"sa-east-1" ,
} {
validRegions [ region ] = struct { } { }
}
// Register this as the default s3 driver in addition to s3aws
factory . Register ( "s3" , & s3DriverFactory { } )
2015-02-11 02:14:23 +00:00
factory . Register ( driverName , & s3DriverFactory { } )
}
// s3DriverFactory implements the factory.StorageDriverFactory interface.
type s3DriverFactory struct{}

// Create builds a storage driver from the given parameters map by delegating
// to FromParameters.
func (f *s3DriverFactory) Create(parameters map[string]interface{}) (storagedriver.StorageDriver, error) {
	return FromParameters(parameters)
}
type driver struct {
S3 * s3 . S3
2016-01-22 02:17:53 +00:00
Bucket string
2015-02-11 02:14:23 +00:00
ChunkSize int64
Encrypt bool
RootDirectory string
2016-01-22 02:17:53 +00:00
StorageClass string
2015-04-22 22:07:18 +00:00
pool sync . Pool // pool []byte buffers used for WriteStream
zeros [ ] byte // shared, zero-valued buffer used for WriteStream
2015-02-11 02:14:23 +00:00
}
// baseEmbed embeds base.Base. Driver embeds baseEmbed in turn, so the methods
// of base.Base are promoted onto Driver.
type baseEmbed struct {
base . Base
}
// Driver is a storagedriver.StorageDriver implementation backed by Amazon S3.
// Objects are stored at absolute keys in the provided bucket.
//
// It carries no fields of its own; all behavior comes from the embedded
// baseEmbed, whose StorageDriver is set to a *driver by New.
type Driver struct {
baseEmbed
}
// FromParameters constructs a new Driver with a given parameters map
// Required parameters:
// - accesskey
// - secretkey
// - region
// - bucket
// - encrypt
func FromParameters ( parameters map [ string ] interface { } ) ( * Driver , error ) {
// Providing no values for these is valid in case the user is authenticating
// with an IAM on an ec2 instance (in which case the instance credentials will
// be summoned when GetAuth is called)
2016-03-07 19:50:46 +00:00
accessKey := parameters [ "accesskey" ]
if accessKey == nil {
2015-02-11 02:14:23 +00:00
accessKey = ""
}
2016-03-07 19:50:46 +00:00
secretKey := parameters [ "secretkey" ]
if secretKey == nil {
2015-02-11 02:14:23 +00:00
secretKey = ""
}
2016-01-22 02:17:53 +00:00
regionName , ok := parameters [ "region" ]
2016-03-07 19:50:46 +00:00
if regionName == nil || fmt . Sprint ( regionName ) == "" {
2015-02-11 02:14:23 +00:00
return nil , fmt . Errorf ( "No region parameter provided" )
}
2016-01-22 02:17:53 +00:00
region := fmt . Sprint ( regionName )
_ , ok = validRegions [ region ]
if ! ok {
2015-02-11 02:14:23 +00:00
return nil , fmt . Errorf ( "Invalid region provided: %v" , region )
}
2016-03-07 19:50:46 +00:00
bucket := parameters [ "bucket" ]
if bucket == nil || fmt . Sprint ( bucket ) == "" {
2015-02-11 02:14:23 +00:00
return nil , fmt . Errorf ( "No bucket parameter provided" )
}
encryptBool := false
2016-03-07 19:50:46 +00:00
encrypt := parameters [ "encrypt" ]
switch encrypt := encrypt . ( type ) {
case string :
b , err := strconv . ParseBool ( encrypt )
if err != nil {
2015-02-11 02:14:23 +00:00
return nil , fmt . Errorf ( "The encrypt parameter should be a boolean" )
}
2016-03-07 19:50:46 +00:00
encryptBool = b
case bool :
encryptBool = encrypt
case nil :
// do nothing
default :
return nil , fmt . Errorf ( "The encrypt parameter should be a boolean" )
2015-02-11 02:14:23 +00:00
}
secureBool := true
2016-03-07 19:50:46 +00:00
secure := parameters [ "secure" ]
switch secure := secure . ( type ) {
case string :
b , err := strconv . ParseBool ( secure )
if err != nil {
2015-02-11 02:14:23 +00:00
return nil , fmt . Errorf ( "The secure parameter should be a boolean" )
}
2016-03-07 19:50:46 +00:00
secureBool = b
case bool :
secureBool = secure
case nil :
// do nothing
default :
return nil , fmt . Errorf ( "The secure parameter should be a boolean" )
2015-02-11 02:14:23 +00:00
}
chunkSize := int64 ( defaultChunkSize )
2016-03-07 19:50:46 +00:00
chunkSizeParam := parameters [ "chunksize" ]
switch v := chunkSizeParam . ( type ) {
case string :
vv , err := strconv . ParseInt ( v , 0 , 64 )
if err != nil {
return nil , fmt . Errorf ( "chunksize parameter must be an integer, %v invalid" , chunkSizeParam )
2015-04-22 21:31:34 +00:00
}
2016-03-07 19:50:46 +00:00
chunkSize = vv
case int64 :
chunkSize = v
case int , uint , int32 , uint32 , uint64 :
chunkSize = reflect . ValueOf ( v ) . Convert ( reflect . TypeOf ( chunkSize ) ) . Int ( )
case nil :
// do nothing
default :
return nil , fmt . Errorf ( "invalid value for chunksize: %#v" , chunkSizeParam )
}
2015-04-22 21:31:34 +00:00
2016-03-07 19:50:46 +00:00
if chunkSize < minChunkSize {
return nil , fmt . Errorf ( "The chunksize %#v parameter should be a number that is larger than or equal to %d" , chunkSize , minChunkSize )
2015-02-11 02:14:23 +00:00
}
2016-03-07 19:50:46 +00:00
rootDirectory := parameters [ "rootdirectory" ]
if rootDirectory == nil {
2015-02-11 02:14:23 +00:00
rootDirectory = ""
}
2016-01-22 02:17:53 +00:00
storageClass := s3 . StorageClassStandard
2016-03-07 19:50:46 +00:00
storageClassParam := parameters [ "storageclass" ]
if storageClassParam != nil {
2016-01-28 23:48:49 +00:00
storageClassString , ok := storageClassParam . ( string )
if ! ok {
2016-01-22 02:17:53 +00:00
return nil , fmt . Errorf ( "The storageclass parameter must be one of %v, %v invalid" , [ ] string { s3 . StorageClassStandard , s3 . StorageClassReducedRedundancy } , storageClassParam )
2016-01-28 23:48:49 +00:00
}
// All valid storage class parameters are UPPERCASE, so be a bit more flexible here
2016-01-22 02:17:53 +00:00
storageClassString = strings . ToUpper ( storageClassString )
if storageClassString != s3 . StorageClassStandard && storageClassString != s3 . StorageClassReducedRedundancy {
return nil , fmt . Errorf ( "The storageclass parameter must be one of %v, %v invalid" , [ ] string { s3 . StorageClassStandard , s3 . StorageClassReducedRedundancy } , storageClassParam )
2016-01-28 23:48:49 +00:00
}
2016-01-22 02:17:53 +00:00
storageClass = storageClassString
2016-01-28 23:48:49 +00:00
}
2016-03-07 19:50:46 +00:00
userAgent := parameters [ "useragent" ]
if userAgent == nil {
2016-01-21 00:40:58 +00:00
userAgent = ""
}
2015-02-11 02:14:23 +00:00
params := DriverParameters {
fmt . Sprint ( accessKey ) ,
fmt . Sprint ( secretKey ) ,
fmt . Sprint ( bucket ) ,
region ,
encryptBool ,
secureBool ,
chunkSize ,
fmt . Sprint ( rootDirectory ) ,
2016-01-28 23:48:49 +00:00
storageClass ,
2016-01-21 00:40:58 +00:00
fmt . Sprint ( userAgent ) ,
2015-02-11 02:14:23 +00:00
}
return New ( params )
}
// New constructs a new Driver with the given AWS credentials, region, encryption flag, and
// bucketName
func New ( params DriverParameters ) ( * Driver , error ) {
2016-01-22 02:17:53 +00:00
awsConfig := aws . NewConfig ( )
creds := credentials . NewChainCredentials ( [ ] credentials . Provider {
& credentials . StaticProvider {
Value : credentials . Value {
AccessKeyID : params . AccessKey ,
SecretAccessKey : params . SecretKey ,
} ,
} ,
& credentials . EnvProvider { } ,
& credentials . SharedCredentialsProvider { } ,
& ec2rolecreds . EC2RoleProvider { Client : ec2metadata . New ( session . New ( ) ) } ,
} )
2015-02-11 02:14:23 +00:00
2016-01-22 02:17:53 +00:00
awsConfig . WithCredentials ( creds )
awsConfig . WithRegion ( params . Region )
awsConfig . WithDisableSSL ( ! params . Secure )
// awsConfig.WithMaxRetries(10)
2016-01-21 00:40:58 +00:00
if params . UserAgent != "" {
2016-01-22 02:17:53 +00:00
awsConfig . WithHTTPClient ( & http . Client {
Transport : transport . NewTransport ( http . DefaultTransport , transport . NewHeaderRequestModifier ( http . Header { http . CanonicalHeaderKey ( "User-Agent" ) : [ ] string { params . UserAgent } } ) ) ,
} )
2016-01-21 00:40:58 +00:00
}
2015-02-11 02:14:23 +00:00
2016-01-22 02:17:53 +00:00
s3obj := s3 . New ( session . New ( awsConfig ) )
2016-01-21 00:40:58 +00:00
2015-02-11 02:14:23 +00:00
// TODO Currently multipart uploads have no timestamps, so this would be unwise
// if you initiated a new s3driver while another one is running on the same bucket.
// multis, _, err := bucket.ListMulti("", "")
// if err != nil {
// return nil, err
// }
// for _, multi := range multis {
// err := multi.Abort()
// //TODO appropriate to do this error checking?
// if err != nil {
// return nil, err
// }
// }
d := & driver {
S3 : s3obj ,
2016-01-22 02:17:53 +00:00
Bucket : params . Bucket ,
2015-02-11 02:14:23 +00:00
ChunkSize : params . ChunkSize ,
Encrypt : params . Encrypt ,
RootDirectory : params . RootDirectory ,
2016-01-28 23:48:49 +00:00
StorageClass : params . StorageClass ,
2015-04-22 22:07:18 +00:00
zeros : make ( [ ] byte , params . ChunkSize ) ,
}
d . pool . New = func ( ) interface { } {
return make ( [ ] byte , d . ChunkSize )
2015-02-11 02:14:23 +00:00
}
return & Driver {
baseEmbed : baseEmbed {
Base : base . Base {
StorageDriver : d ,
} ,
} ,
} , nil
}
// Implement the storagedriver.StorageDriver interface
2015-04-23 00:30:01 +00:00
// Name returns the name of this storage driver, which is the driverName
// constant.
func ( d * driver ) Name ( ) string {
return driverName
}
2015-02-11 02:14:23 +00:00
// GetContent retrieves the content stored at "path" as a []byte.
2015-04-27 22:58:58 +00:00
func ( d * driver ) GetContent ( ctx context . Context , path string ) ( [ ] byte , error ) {
2016-01-22 02:17:53 +00:00
reader , err := d . ReadStream ( ctx , path , 0 )
2015-02-11 02:14:23 +00:00
if err != nil {
2016-01-22 02:17:53 +00:00
return nil , err
2015-02-11 02:14:23 +00:00
}
2016-01-22 02:17:53 +00:00
return ioutil . ReadAll ( reader )
2015-02-11 02:14:23 +00:00
}
// PutContent stores the []byte content at a location designated by "path".
2015-04-27 22:58:58 +00:00
func ( d * driver ) PutContent ( ctx context . Context , path string , contents [ ] byte ) error {
2016-01-22 02:17:53 +00:00
_ , err := d . S3 . PutObject ( & s3 . PutObjectInput {
Bucket : aws . String ( d . Bucket ) ,
Key : aws . String ( d . s3Path ( path ) ) ,
ContentType : d . getContentType ( ) ,
ACL : d . getACL ( ) ,
ServerSideEncryption : d . getEncryptionMode ( ) ,
StorageClass : d . getStorageClass ( ) ,
Body : bytes . NewReader ( contents ) ,
} )
return parseError ( path , err )
2015-02-11 02:14:23 +00:00
}
// ReadStream retrieves an io.ReadCloser for the content stored at "path" with a
// given byte offset.
2015-04-27 22:58:58 +00:00
func ( d * driver ) ReadStream ( ctx context . Context , path string , offset int64 ) ( io . ReadCloser , error ) {
2016-01-22 02:17:53 +00:00
resp , err := d . S3 . GetObject ( & s3 . GetObjectInput {
Bucket : aws . String ( d . Bucket ) ,
Key : aws . String ( d . s3Path ( path ) ) ,
Range : aws . String ( "bytes=" + strconv . FormatInt ( offset , 10 ) + "-" ) ,
} )
2015-02-11 02:14:23 +00:00
if err != nil {
2016-01-22 02:17:53 +00:00
if s3Err , ok := err . ( awserr . Error ) ; ok && s3Err . Code ( ) == "InvalidRange" {
2015-02-11 02:14:23 +00:00
return ioutil . NopCloser ( bytes . NewReader ( nil ) ) , nil
}
return nil , parseError ( path , err )
}
return resp . Body , nil
}
// WriteStream stores the contents of the provided io.Reader at a
// location designated by the given path. The driver will know it has
// received the full contents when the reader returns io.EOF. The number
// of successfully READ bytes will be returned, even if an error is
// returned. May be used to resume writing a stream by providing a nonzero
// offset. Offsets past the current size will write from the position
// beyond the end of the file.
//
// Implementation overview: every call starts a new multipart upload for the
// key. When offset > 0 the existing object content is carried into the new
// upload, either by re-reading it into the chunk buffer (fromSmallCurrent) or
// with UploadPartCopy, and any gap between the current length and offset is
// filled with zeros. Data from reader is then uploaded one ChunkSize buffer at
// a time by fromReader, and the deferred function completes the upload.
2015-04-27 22:58:58 +00:00
func ( d * driver ) WriteStream ( ctx context . Context , path string , offset int64 , reader io . Reader ) ( totalRead int64 , err error ) {
2016-01-22 02:17:53 +00:00
// partNumber is the number given to the next uploaded part.
var partNumber int64 = 1
2015-02-11 02:14:23 +00:00
// bytesRead is the number of bytes the most recent fill helper placed in buf.
bytesRead := 0
// putErrChan carries the outcome of the in-flight part upload goroutine;
// it is nil until fromReader starts the first upload.
var putErrChan chan error
2016-01-22 02:17:53 +00:00
// parts collects the parts passed to CompleteMultipartUpload.
parts := [ ] * s3 . CompletedPart { }
2015-04-28 21:06:24 +00:00
done := make ( chan struct { } ) // stopgap to free up waiting goroutines
2015-02-11 02:14:23 +00:00
2016-01-22 02:17:53 +00:00
resp , err := d . S3 . CreateMultipartUpload ( & s3 . CreateMultipartUploadInput {
Bucket : aws . String ( d . Bucket ) ,
Key : aws . String ( d . s3Path ( path ) ) ,
ContentType : d . getContentType ( ) ,
ACL : d . getACL ( ) ,
ServerSideEncryption : d . getEncryptionMode ( ) ,
StorageClass : d . getStorageClass ( ) ,
} )
2015-02-11 02:14:23 +00:00
if err != nil {
return 0 , err
}
2016-01-22 02:17:53 +00:00
uploadID := resp . UploadId
2015-04-22 22:07:18 +00:00
buf := d . getbuf ( )
2015-02-11 02:14:23 +00:00
// We never want to leave a dangling multipart upload, our only consistent state is
// when there is a whole object at path. This is in order to remain consistent with
// the stat call.
//
// Note that if the machine dies before executing the defer, we will be left with a dangling
// multipart upload, which will eventually be cleaned up, but we will lose all of the progress
// made prior to the machine crashing.
//
// NOTE(review): when parts is empty at this point the upload is neither
// completed nor aborted by this function.
defer func ( ) {
// Wait for the last part upload started by fromReader, if any, and surface
// its error through the named result.
if putErrChan != nil {
if putErr := <- putErrChan ; putErr != nil {
err = putErr
}
}
if len ( parts ) > 0 {
2016-01-22 02:17:53 +00:00
// NOTE(review): `_ , err :=` declares a new err scoped to this if-block, so
// a CompleteMultipartUpload failure is not assigned to the named result err;
// the only reaction to it is the abort below.
_ , err := d . S3 . CompleteMultipartUpload ( & s3 . CompleteMultipartUploadInput {
Bucket : aws . String ( d . Bucket ) ,
Key : aws . String ( d . s3Path ( path ) ) ,
UploadId : uploadID ,
MultipartUpload : & s3 . CompletedMultipartUpload {
Parts : parts ,
} ,
} )
if err != nil {
// TODO (brianbland): log errors here
d . S3 . AbortMultipartUpload ( & s3 . AbortMultipartUploadInput {
Bucket : aws . String ( d . Bucket ) ,
Key : aws . String ( d . s3Path ( path ) ) ,
UploadId : uploadID ,
} )
2015-02-11 02:14:23 +00:00
}
}
2015-04-22 22:07:18 +00:00
d . putbuf ( buf ) // needs to be here to pick up new buf value
2015-04-28 21:06:24 +00:00
close ( done ) // free up any waiting goroutines
2015-02-11 02:14:23 +00:00
} ( )
// Fills from 0 to total from current
// NOTE(review): the ReadCloser returned by ReadStream is not closed anywhere
// in this closure.
fromSmallCurrent := func ( total int64 ) error {
2015-04-27 22:58:58 +00:00
current , err := d . ReadStream ( ctx , path , 0 )
2015-02-11 02:14:23 +00:00
if err != nil {
return err
}
bytesRead = 0
for int64 ( bytesRead ) < total {
//The loop should very rarely enter a second iteration
nn , err := current . Read ( buf [ bytesRead : total ] )
bytesRead += nn
if err != nil {
if err != io . EOF {
return err
}
break
}
}
return nil
}
// Fills from parameter to chunkSize from reader
// After filling, it waits for the previously started upload goroutine (one
// receive on putErrChan), then hands buf to a new goroutine that uploads
// buf[0:from+bytesRead] as the next part, and switches to a fresh buffer.
fromReader := func ( from int64 ) error {
bytesRead = 0
for from + int64 ( bytesRead ) < d . ChunkSize {
nn , err := reader . Read ( buf [ from + int64 ( bytesRead ) : ] )
totalRead += int64 ( nn )
bytesRead += nn
if err != nil {
if err != io . EOF {
return err
}
break
}
}
if putErrChan == nil {
putErrChan = make ( chan error )
} else {
if putErr := <- putErrChan ; putErr != nil {
putErrChan = nil
return putErr
}
}
go func ( bytesRead int , from int64 , buf [ ] byte ) {
2015-04-22 22:07:18 +00:00
defer d . putbuf ( buf ) // this buffer gets dropped after this call
2015-04-24 03:07:32 +00:00
// DRAGONS(stevvooe): There are few things one might want to know
// about this section. First, the putErrChan is expecting an error
// and a nil or just a nil to come through the channel. This is
// covered by the silly defer below. The other aspect is the s3
// retry backoff to deal with RequestTimeout errors. Even though
// the underlying s3 library should handle it, it doesn't seem to
// be part of the shouldRetry function (see AdRoll/goamz/s3).
defer func ( ) {
2015-04-28 21:06:24 +00:00
select {
case putErrChan <- nil : // for some reason, we do this no matter what.
case <- done :
return // ensure we don't leak the goroutine
}
2015-04-24 03:07:32 +00:00
} ( )
// Nothing was read into the buffer, so there is no part to upload.
if bytesRead <= 0 {
return
}
2016-01-22 02:17:53 +00:00
resp , err := d . S3 . UploadPart ( & s3 . UploadPartInput {
Bucket : aws . String ( d . Bucket ) ,
Key : aws . String ( d . s3Path ( path ) ) ,
PartNumber : aws . Int64 ( partNumber ) ,
UploadId : uploadID ,
Body : bytes . NewReader ( buf [ 0 : int64 ( bytesRead ) + from ] ) ,
} )
2015-04-24 03:07:32 +00:00
if err != nil {
logrus . Errorf ( "error putting part, aborting: %v" , err )
2015-04-28 21:06:24 +00:00
select {
case putErrChan <- err :
case <- done :
return // don't leak the goroutine
}
2015-04-24 03:07:32 +00:00
// NOTE(review): there is no return after the error has been sent on
// putErrChan, so execution continues to the append below and records a part
// using resp.ETag from the failed call.
}
// parts and partNumber are safe, because this function is the
// only one modifying them and we force it to be executed
// serially.
2016-01-22 02:17:53 +00:00
parts = append ( parts , & s3 . CompletedPart {
ETag : resp . ETag ,
PartNumber : aws . Int64 ( partNumber ) ,
} )
2015-04-24 03:07:32 +00:00
partNumber ++
2015-02-11 02:14:23 +00:00
} ( bytesRead , from , buf )
2015-04-22 22:07:18 +00:00
buf = d . getbuf ( ) // use a new buffer for the next call
2015-02-11 02:14:23 +00:00
return nil
}
// Resuming at a nonzero offset: find the current object length first. A
// "NoSuchKey" error from HeadObject is tolerated and leaves the length at 0.
if offset > 0 {
2016-01-22 02:17:53 +00:00
resp , err := d . S3 . HeadObject ( & s3 . HeadObjectInput {
Bucket : aws . String ( d . Bucket ) ,
Key : aws . String ( d . s3Path ( path ) ) ,
} )
2015-02-11 02:14:23 +00:00
if err != nil {
2016-01-22 02:17:53 +00:00
if s3Err , ok := err . ( awserr . Error ) ; ! ok || s3Err . Code ( ) != "NoSuchKey" {
2015-02-11 02:14:23 +00:00
return 0 , err
}
}
currentLength := int64 ( 0 )
2016-01-22 02:17:53 +00:00
if err == nil && resp . ContentLength != nil {
currentLength = * resp . ContentLength
2015-02-11 02:14:23 +00:00
}
if currentLength >= offset {
if offset < d . ChunkSize {
// chunkSize > currentLength >= offset
if err = fromSmallCurrent ( offset ) ; err != nil {
return totalRead , err
}
if err = fromReader ( offset ) ; err != nil {
return totalRead , err
}
if totalRead + offset < d . ChunkSize {
return totalRead , nil
}
} else {
// currentLength >= offset >= chunkSize
2016-01-22 02:17:53 +00:00
resp , err := d . S3 . UploadPartCopy ( & s3 . UploadPartCopyInput {
Bucket : aws . String ( d . Bucket ) ,
Key : aws . String ( d . s3Path ( path ) ) ,
PartNumber : aws . Int64 ( partNumber ) ,
UploadId : uploadID ,
CopySource : aws . String ( d . Bucket + "/" + d . s3Path ( path ) ) ,
CopySourceRange : aws . String ( "bytes=0-" + strconv . FormatInt ( offset - 1 , 10 ) ) ,
} )
2015-02-11 02:14:23 +00:00
if err != nil {
return 0 , err
}
2016-01-22 02:17:53 +00:00
parts = append ( parts , & s3 . CompletedPart {
ETag : resp . CopyPartResult . ETag ,
PartNumber : aws . Int64 ( partNumber ) ,
} )
2015-02-11 02:14:23 +00:00
partNumber ++
}
} else {
// Fills between parameters with 0s but only when to - from <= chunkSize
fromZeroFillSmall := func ( from , to int64 ) error {
bytesRead = 0
for from + int64 ( bytesRead ) < to {
2015-04-22 22:07:18 +00:00
nn , err := bytes . NewReader ( d . zeros ) . Read ( buf [ from + int64 ( bytesRead ) : to ] )
2015-02-11 02:14:23 +00:00
bytesRead += nn
if err != nil {
return err
}
}
return nil
}
// Fills between parameters with 0s, making new parts
fromZeroFillLarge := func ( from , to int64 ) error {
bytesRead64 := int64 ( 0 )
for to - ( from + bytesRead64 ) >= d . ChunkSize {
2016-01-22 02:17:53 +00:00
resp , err := d . S3 . UploadPart ( & s3 . UploadPartInput {
Bucket : aws . String ( d . Bucket ) ,
Key : aws . String ( d . s3Path ( path ) ) ,
PartNumber : aws . Int64 ( partNumber ) ,
UploadId : uploadID ,
Body : bytes . NewReader ( d . zeros ) ,
} )
2015-02-11 02:14:23 +00:00
if err != nil {
return err
}
bytesRead64 += d . ChunkSize
2016-01-22 02:17:53 +00:00
parts = append ( parts , & s3 . CompletedPart {
ETag : resp . ETag ,
PartNumber : aws . Int64 ( partNumber ) ,
} )
2015-02-11 02:14:23 +00:00
partNumber ++
}
return fromZeroFillSmall ( 0 , ( to - from ) % d . ChunkSize )
}
// currentLength < offset
if currentLength < d . ChunkSize {
if offset < d . ChunkSize {
// chunkSize > offset > currentLength
if err = fromSmallCurrent ( currentLength ) ; err != nil {
return totalRead , err
}
if err = fromZeroFillSmall ( currentLength , offset ) ; err != nil {
return totalRead , err
}
if err = fromReader ( offset ) ; err != nil {
return totalRead , err
}
if totalRead + offset < d . ChunkSize {
return totalRead , nil
}
} else {
// offset >= chunkSize > currentLength
if err = fromSmallCurrent ( currentLength ) ; err != nil {
return totalRead , err
}
if err = fromZeroFillSmall ( currentLength , d . ChunkSize ) ; err != nil {
return totalRead , err
}
2016-01-22 02:17:53 +00:00
resp , err := d . S3 . UploadPart ( & s3 . UploadPartInput {
Bucket : aws . String ( d . Bucket ) ,
Key : aws . String ( d . s3Path ( path ) ) ,
PartNumber : aws . Int64 ( partNumber ) ,
UploadId : uploadID ,
Body : bytes . NewReader ( buf ) ,
} )
2015-02-11 02:14:23 +00:00
if err != nil {
return totalRead , err
}
2016-01-22 02:17:53 +00:00
parts = append ( parts , & s3 . CompletedPart {
ETag : resp . ETag ,
PartNumber : aws . Int64 ( partNumber ) ,
} )
2015-02-11 02:14:23 +00:00
partNumber ++
//Zero fill from chunkSize up to offset, then some reader
if err = fromZeroFillLarge ( d . ChunkSize , offset ) ; err != nil {
return totalRead , err
}
if err = fromReader ( offset % d . ChunkSize ) ; err != nil {
return totalRead , err
}
if totalRead + ( offset % d . ChunkSize ) < d . ChunkSize {
return totalRead , nil
}
}
} else {
// offset > currentLength >= chunkSize
2016-01-22 02:17:53 +00:00
resp , err := d . S3 . UploadPartCopy ( & s3 . UploadPartCopyInput {
Bucket : aws . String ( d . Bucket ) ,
Key : aws . String ( d . s3Path ( path ) ) ,
PartNumber : aws . Int64 ( partNumber ) ,
UploadId : uploadID ,
CopySource : aws . String ( d . Bucket + "/" + d . s3Path ( path ) ) ,
} )
2015-02-11 02:14:23 +00:00
if err != nil {
return 0 , err
}
2016-01-22 02:17:53 +00:00
parts = append ( parts , & s3 . CompletedPart {
ETag : resp . CopyPartResult . ETag ,
PartNumber : aws . Int64 ( partNumber ) ,
} )
2015-02-11 02:14:23 +00:00
partNumber ++
//Zero fill from currentLength up to offset, then some reader
if err = fromZeroFillLarge ( currentLength , offset ) ; err != nil {
return totalRead , err
}
if err = fromReader ( ( offset - currentLength ) % d . ChunkSize ) ; err != nil {
return totalRead , err
}
if totalRead + ( ( offset - currentLength ) % d . ChunkSize ) < d . ChunkSize {
return totalRead , nil
}
}
}
}
// Main loop: keep uploading full buffers from reader; a short fill
// (bytesRead < ChunkSize) means the reader hit EOF and ends the loop.
for {
if err = fromReader ( 0 ) ; err != nil {
return totalRead , err
}
if int64 ( bytesRead ) < d . ChunkSize {
break
}
}
return totalRead , nil
}
// Stat retrieves the FileInfo for the given path, including the current size
// in bytes and the creation time.
2015-04-27 22:58:58 +00:00
func ( d * driver ) Stat ( ctx context . Context , path string ) ( storagedriver . FileInfo , error ) {
2016-01-22 02:17:53 +00:00
resp , err := d . S3 . ListObjects ( & s3 . ListObjectsInput {
Bucket : aws . String ( d . Bucket ) ,
Prefix : aws . String ( d . s3Path ( path ) ) ,
MaxKeys : aws . Int64 ( 1 ) ,
} )
2015-02-11 02:14:23 +00:00
if err != nil {
return nil , err
}
fi := storagedriver . FileInfoFields {
Path : path ,
}
2016-01-22 02:17:53 +00:00
if len ( resp . Contents ) == 1 {
if * resp . Contents [ 0 ] . Key != d . s3Path ( path ) {
2015-02-11 02:14:23 +00:00
fi . IsDir = true
} else {
fi . IsDir = false
2016-01-22 02:17:53 +00:00
fi . Size = * resp . Contents [ 0 ] . Size
fi . ModTime = * resp . Contents [ 0 ] . LastModified
2015-02-11 02:14:23 +00:00
}
2016-01-22 02:17:53 +00:00
} else if len ( resp . CommonPrefixes ) == 1 {
2015-02-11 02:14:23 +00:00
fi . IsDir = true
} else {
return nil , storagedriver . PathNotFoundError { Path : path }
}
return storagedriver . FileInfoInternal { FileInfoFields : fi } , nil
}
// List returns a list of the objects that are direct descendants of the given path.
2015-12-08 19:02:40 +00:00
func ( d * driver ) List ( ctx context . Context , opath string ) ( [ ] string , error ) {
path := opath
2015-02-11 02:14:23 +00:00
if path != "/" && path [ len ( path ) - 1 ] != '/' {
path = path + "/"
}
2015-02-20 00:31:34 +00:00
// This is to cover for the cases when the rootDirectory of the driver is either "" or "/".
// In those cases, there is no root prefix to replace and we must actually add a "/" to all
// results in order to keep them as valid paths as recognized by storagedriver.PathRegexp
prefix := ""
if d . s3Path ( "" ) == "" {
prefix = "/"
}
2016-01-22 02:17:53 +00:00
resp , err := d . S3 . ListObjects ( & s3 . ListObjectsInput {
Bucket : aws . String ( d . Bucket ) ,
Prefix : aws . String ( d . s3Path ( path ) ) ,
Delimiter : aws . String ( "/" ) ,
MaxKeys : aws . Int64 ( listMax ) ,
} )
2015-02-11 02:14:23 +00:00
if err != nil {
2015-12-08 19:02:40 +00:00
return nil , parseError ( opath , err )
2015-11-24 22:23:12 +00:00
}
2015-02-11 02:14:23 +00:00
files := [ ] string { }
directories := [ ] string { }
for {
2016-01-22 02:17:53 +00:00
for _ , key := range resp . Contents {
files = append ( files , strings . Replace ( * key . Key , d . s3Path ( "" ) , prefix , 1 ) )
2015-02-11 02:14:23 +00:00
}
2016-01-22 02:17:53 +00:00
for _ , commonPrefix := range resp . CommonPrefixes {
commonPrefix := * commonPrefix . Prefix
2015-02-20 00:31:34 +00:00
directories = append ( directories , strings . Replace ( commonPrefix [ 0 : len ( commonPrefix ) - 1 ] , d . s3Path ( "" ) , prefix , 1 ) )
2015-02-11 02:14:23 +00:00
}
2016-01-22 02:17:53 +00:00
if * resp . IsTruncated {
resp , err = d . S3 . ListObjects ( & s3 . ListObjectsInput {
Bucket : aws . String ( d . Bucket ) ,
Prefix : aws . String ( d . s3Path ( path ) ) ,
Delimiter : aws . String ( "/" ) ,
MaxKeys : aws . Int64 ( listMax ) ,
Marker : resp . NextMarker ,
} )
2015-02-11 02:14:23 +00:00
if err != nil {
return nil , err
}
} else {
break
}
}
2015-12-08 19:02:40 +00:00
if opath != "/" {
if len ( files ) == 0 && len ( directories ) == 0 {
// Treat empty response as missing directory, since we don't actually
// have directories in s3.
return nil , storagedriver . PathNotFoundError { Path : opath }
}
}
2015-02-11 02:14:23 +00:00
return append ( files , directories ... ) , nil
}
// Move moves an object stored at sourcePath to destPath, removing the original
// object.
2015-04-27 22:58:58 +00:00
func ( d * driver ) Move ( ctx context . Context , sourcePath string , destPath string ) error {
2015-02-11 02:14:23 +00:00
/* This is terrible, but aws doesn't have an actual move. */
2016-01-22 02:17:53 +00:00
_ , err := d . S3 . CopyObject ( & s3 . CopyObjectInput {
Bucket : aws . String ( d . Bucket ) ,
Key : aws . String ( d . s3Path ( destPath ) ) ,
ContentType : d . getContentType ( ) ,
ACL : d . getACL ( ) ,
ServerSideEncryption : d . getEncryptionMode ( ) ,
StorageClass : d . getStorageClass ( ) ,
CopySource : aws . String ( d . Bucket + "/" + d . s3Path ( sourcePath ) ) ,
} )
2015-02-11 02:14:23 +00:00
if err != nil {
return parseError ( sourcePath , err )
}
2015-04-27 22:58:58 +00:00
return d . Delete ( ctx , sourcePath )
2015-02-11 02:14:23 +00:00
}
// Delete recursively deletes all objects stored at "path" and its subpaths.
2015-04-27 22:58:58 +00:00
func ( d * driver ) Delete ( ctx context . Context , path string ) error {
2016-01-22 02:17:53 +00:00
resp , err := d . S3 . ListObjects ( & s3 . ListObjectsInput {
Bucket : aws . String ( d . Bucket ) ,
Prefix : aws . String ( d . s3Path ( path ) ) ,
} )
if err != nil || len ( resp . Contents ) == 0 {
2015-02-11 02:14:23 +00:00
return storagedriver . PathNotFoundError { Path : path }
}
2016-01-22 02:17:53 +00:00
s3Objects := make ( [ ] * s3 . ObjectIdentifier , 0 , listMax )
2015-02-11 02:14:23 +00:00
2016-01-22 02:17:53 +00:00
for len ( resp . Contents ) > 0 {
for _ , key := range resp . Contents {
s3Objects = append ( s3Objects , & s3 . ObjectIdentifier {
Key : key . Key ,
} )
2015-02-11 02:14:23 +00:00
}
2016-01-22 02:17:53 +00:00
_ , err := d . S3 . DeleteObjects ( & s3 . DeleteObjectsInput {
Bucket : aws . String ( d . Bucket ) ,
Delete : & s3 . Delete {
Objects : s3Objects ,
Quiet : aws . Bool ( false ) ,
} ,
} )
2015-02-11 02:14:23 +00:00
if err != nil {
return nil
}
2016-01-22 02:17:53 +00:00
resp , err = d . S3 . ListObjects ( & s3 . ListObjectsInput {
Bucket : aws . String ( d . Bucket ) ,
Prefix : aws . String ( d . s3Path ( path ) ) ,
} )
2015-02-11 02:14:23 +00:00
if err != nil {
return err
}
}
return nil
}
// URLFor returns a URL which may be used to retrieve the content stored at the given path.
// May return an UnsupportedMethodErr in certain StorageDriver implementations.
2015-04-27 22:58:58 +00:00
func ( d * driver ) URLFor ( ctx context . Context , path string , options map [ string ] interface { } ) ( string , error ) {
2015-02-11 02:14:23 +00:00
methodString := "GET"
method , ok := options [ "method" ]
if ok {
methodString , ok = method . ( string )
if ! ok || ( methodString != "GET" && methodString != "HEAD" ) {
2015-11-02 21:23:53 +00:00
return "" , storagedriver . ErrUnsupportedMethod { }
2015-02-11 02:14:23 +00:00
}
}
2016-01-22 02:17:53 +00:00
expiresIn := 20 * time . Minute
2015-02-11 02:14:23 +00:00
expires , ok := options [ "expiry" ]
if ok {
et , ok := expires . ( time . Time )
if ok {
2016-01-22 02:17:53 +00:00
expiresIn = et . Sub ( time . Now ( ) )
2015-02-11 02:14:23 +00:00
}
}
2016-01-22 02:17:53 +00:00
var req * request . Request
switch methodString {
case "GET" :
req , _ = d . S3 . GetObjectRequest ( & s3 . GetObjectInput {
Bucket : aws . String ( d . Bucket ) ,
Key : aws . String ( d . s3Path ( path ) ) ,
} )
case "HEAD" :
req , _ = d . S3 . HeadObjectRequest ( & s3 . HeadObjectInput {
Bucket : aws . String ( d . Bucket ) ,
Key : aws . String ( d . s3Path ( path ) ) ,
} )
default :
panic ( "unreachable" )
}
return req . Presign ( expiresIn )
2015-02-11 02:14:23 +00:00
}
// s3Path converts a storage driver path into the S3 key used for it: the root
// directory (without trailing slashes) followed by path, with all leading
// slashes removed from the result.
func (d *driver) s3Path(path string) string {
	root := strings.TrimRight(d.RootDirectory, "/")
	return strings.TrimLeft(root+path, "/")
}
2015-04-06 23:23:31 +00:00
// S3BucketKey returns the s3 bucket key for the given storage driver path.
// It panics if the wrapped StorageDriver is not a *driver.
func (d *Driver) S3BucketKey(path string) string {
	inner := d.StorageDriver.(*driver)
	return inner.s3Path(path)
}
2015-02-11 02:14:23 +00:00
func parseError ( path string , err error ) error {
2016-01-22 02:17:53 +00:00
if s3Err , ok := err . ( awserr . Error ) ; ok && s3Err . Code ( ) == "NoSuchKey" {
2015-02-11 02:14:23 +00:00
return storagedriver . PathNotFoundError { Path : path }
}
return err
}
2016-01-22 02:17:53 +00:00
func ( d * driver ) getEncryptionMode ( ) * string {
if d . Encrypt {
return aws . String ( "AES256" )
}
return nil
2015-02-11 02:14:23 +00:00
}
2016-01-22 02:17:53 +00:00
func ( d * driver ) getContentType ( ) * string {
return aws . String ( "application/octet-stream" )
2015-02-11 02:14:23 +00:00
}
2016-01-22 02:17:53 +00:00
func ( d * driver ) getACL ( ) * string {
return aws . String ( "private" )
2015-02-11 02:14:23 +00:00
}
2016-01-22 02:17:53 +00:00
func ( d * driver ) getStorageClass ( ) * string {
return aws . String ( d . StorageClass )
2015-02-11 02:14:23 +00:00
}
2015-04-22 22:07:18 +00:00
// getbuf takes a []byte buffer from the driver's pool. Buffers created by the
// pool have length d.ChunkSize. It panics if the pool yields anything other
// than a []byte.
func (d *driver) getbuf() []byte {
	pooled := d.pool.Get()
	return pooled.([]byte)
}
// putbuf returns p to the driver's buffer pool after overwriting it with the
// contents of d.zeros, so a buffer taken from the pool later does not carry
// data from an earlier use. copy stops at the shorter of the two slices.
func ( d * driver ) putbuf ( p [ ] byte ) {
copy ( p , d . zeros )
d . pool . Put ( p )
}