@ -1,766 +0,0 @@
// Package s3 provides a storagedriver.StorageDriver implementation to
// store blobs in Amazon S3 cloud storage.
// This package leverages the docker/goamz client library for interfacing with
// S3. It is intended to be deprecated in favor of the s3-aws driver
// implementation.
// Because S3 is a key, value store the Stat call does not support last modification
// time for directories (directories are an abstraction for key, value stores)
// Keep in mind that S3 guarantees only read-after-write consistency for new
// objects, but no read-after-update or list-after-write consistency.
package s3
import (
storagedriver ""
const driverName = "s3goamz"
// minChunkSize defines the minimum multipart upload chunk size
// S3 API requires multipart upload chunks to be at least 5MB
const minChunkSize = 5 << 20
const defaultChunkSize = 2 * minChunkSize
// listMax is the largest amount of objects you can request from S3 in a list call
const listMax = 1000
//DriverParameters A struct that encapsulates all of the driver parameters after all values have been set
type DriverParameters struct {
AccessKey string
SecretKey string
Bucket string
Region aws.Region
Encrypt bool
Secure bool
V4Auth bool
ChunkSize int64
RootDirectory string
StorageClass s3.StorageClass
UserAgent string
func init() {
factory.Register(driverName, &s3DriverFactory{})
// s3DriverFactory implements the factory.StorageDriverFactory interface
type s3DriverFactory struct{}
func (factory *s3DriverFactory) Create(parameters map[string]interface{}) (storagedriver.StorageDriver, error) {
return FromParameters(parameters)
type driver struct {
S3 *s3.S3
Bucket *s3.Bucket
ChunkSize int64
Encrypt bool
RootDirectory string
StorageClass s3.StorageClass
type baseEmbed struct {
// Driver is a storagedriver.StorageDriver implementation backed by Amazon S3
// Objects are stored at absolute keys in the provided bucket.
type Driver struct {
// FromParameters constructs a new Driver with a given parameters map
// Required parameters:
// - accesskey
// - secretkey
// - region
// - bucket
// - encrypt
func FromParameters(parameters map[string]interface{}) (*Driver, error) {
// Providing no values for these is valid in case the user is authenticating
// with an IAM on an ec2 instance (in which case the instance credentials will
// be summoned when GetAuth is called)
accessKey := parameters["accesskey"]
if accessKey == nil {
accessKey = ""
secretKey := parameters["secretkey"]
if secretKey == nil {
secretKey = ""
regionName := parameters["region"]
if regionName == nil || fmt.Sprint(regionName) == "" {
return nil, fmt.Errorf("No region parameter provided")
region := aws.GetRegion(fmt.Sprint(regionName))
if region.Name == "" {
return nil, fmt.Errorf("Invalid region provided: %v", region)
bucket := parameters["bucket"]
if bucket == nil || fmt.Sprint(bucket) == "" {
return nil, fmt.Errorf("No bucket parameter provided")
encryptBool := false
encrypt := parameters["encrypt"]
switch encrypt := encrypt.(type) {
case string:
b, err := strconv.ParseBool(encrypt)
if err != nil {
return nil, fmt.Errorf("The encrypt parameter should be a boolean")
encryptBool = b
case bool:
encryptBool = encrypt
case nil:
// do nothing
return nil, fmt.Errorf("The encrypt parameter should be a boolean")
secureBool := true
secure := parameters["secure"]
switch secure := secure.(type) {
case string:
b, err := strconv.ParseBool(secure)
if err != nil {
return nil, fmt.Errorf("The secure parameter should be a boolean")
secureBool = b
case bool:
secureBool = secure
case nil:
// do nothing
return nil, fmt.Errorf("The secure parameter should be a boolean")
v4AuthBool := false
v4Auth := parameters["v4auth"]
switch v4Auth := v4Auth.(type) {
case string:
b, err := strconv.ParseBool(v4Auth)
if err != nil {
return nil, fmt.Errorf("The v4auth parameter should be a boolean")
v4AuthBool = b
case bool:
v4AuthBool = v4Auth
case nil:
// do nothing
return nil, fmt.Errorf("The v4auth parameter should be a boolean")
chunkSize := int64(defaultChunkSize)
chunkSizeParam := parameters["chunksize"]
switch v := chunkSizeParam.(type) {
case string:
vv, err := strconv.ParseInt(v, 0, 64)
if err != nil {
return nil, fmt.Errorf("chunksize parameter must be an integer, %v invalid", chunkSizeParam)
chunkSize = vv
case int64:
chunkSize = v
case int, uint, int32, uint32, uint64:
chunkSize = reflect.ValueOf(v).Convert(reflect.TypeOf(chunkSize)).Int()
case nil:
// do nothing
return nil, fmt.Errorf("invalid value for chunksize: %#v", chunkSizeParam)
if chunkSize < minChunkSize {
return nil, fmt.Errorf("The chunksize %#v parameter should be a number that is larger than or equal to %d", chunkSize, minChunkSize)
rootDirectory := parameters["rootdirectory"]
if rootDirectory == nil {
rootDirectory = ""
storageClass := s3.StandardStorage
storageClassParam := parameters["storageclass"]
if storageClassParam != nil {
storageClassString, ok := storageClassParam.(string)
if !ok {
return nil, fmt.Errorf("The storageclass parameter must be one of %v, %v invalid", []s3.StorageClass{s3.StandardStorage, s3.ReducedRedundancy}, storageClassParam)
// All valid storage class parameters are UPPERCASE, so be a bit more flexible here
storageClassCasted := s3.StorageClass(strings.ToUpper(storageClassString))
if storageClassCasted != s3.StandardStorage && storageClassCasted != s3.ReducedRedundancy {
return nil, fmt.Errorf("The storageclass parameter must be one of %v, %v invalid", []s3.StorageClass{s3.StandardStorage, s3.ReducedRedundancy}, storageClassParam)
storageClass = storageClassCasted
userAgent := parameters["useragent"]
if userAgent == nil {
userAgent = ""
params := DriverParameters{
return New(params)
// New constructs a new Driver with the given AWS credentials, region, encryption flag, and
// bucketName
func New(params DriverParameters) (*Driver, error) {
auth, err := aws.GetAuth(params.AccessKey, params.SecretKey, "", time.Time{})
if err != nil {
return nil, fmt.Errorf("unable to resolve aws credentials, please ensure that 'accesskey' and 'secretkey' are properly set or the credentials are available in $HOME/.aws/credentials: %v", err)
if !params.Secure {
params.Region.S3Endpoint = strings.Replace(params.Region.S3Endpoint, "https", "http", 1)
s3obj := s3.New(auth, params.Region)
if params.UserAgent != "" {
s3obj.Client = &http.Client{
Transport: transport.NewTransport(http.DefaultTransport,
http.CanonicalHeaderKey("User-Agent"): []string{params.UserAgent},
if params.V4Auth {
s3obj.Signature = aws.V4Signature
} else if mustV4Auth(params.Region.Name) {
return nil, fmt.Errorf("The %s region only works with v4 authentication", params.Region.Name)
bucket := s3obj.Bucket(params.Bucket)
// TODO Currently multipart uploads have no timestamps, so this would be unwise
// if you initiated a new s3driver while another one is running on the same bucket.
// multis, _, err := bucket.ListMulti("", "")
// if err != nil {
// return nil, err
// }
// for _, multi := range multis {
// err := multi.Abort()
// //TODO appropriate to do this error checking?
// if err != nil {
// return nil, err
// }
// }
d := &driver{
S3: s3obj,
Bucket: bucket,
ChunkSize: params.ChunkSize,
Encrypt: params.Encrypt,
RootDirectory: params.RootDirectory,
StorageClass: params.StorageClass,
return &Driver{
baseEmbed: baseEmbed{
Base: base.Base{
StorageDriver: d,
}, nil
// Implement the storagedriver.StorageDriver interface
func (d *driver) Name() string {
return driverName
// GetContent retrieves the content stored at "path" as a []byte.
func (d *driver) GetContent(ctx context.Context, path string) ([]byte, error) {
content, err := d.Bucket.Get(d.s3Path(path))
if err != nil {
return nil, parseError(path, err)
return content, nil
// PutContent stores the []byte content at a location designated by "path".
func (d *driver) PutContent(ctx context.Context, path string, contents []byte) error {
return parseError(path, d.Bucket.Put(d.s3Path(path), contents, d.getContentType(), getPermissions(), d.getOptions()))
// Reader retrieves an io.ReadCloser for the content stored at "path" with a
// given byte offset.
func (d *driver) Reader(ctx context.Context, path string, offset int64) (io.ReadCloser, error) {
headers := make(http.Header)
headers.Add("Range", "bytes="+strconv.FormatInt(offset, 10)+"-")
resp, err := d.Bucket.GetResponseWithHeaders(d.s3Path(path), headers)
if err != nil {
if s3Err, ok := err.(*s3.Error); ok && s3Err.Code == "InvalidRange" {
return ioutil.NopCloser(bytes.NewReader(nil)), nil
return nil, parseError(path, err)
return resp.Body, nil
// Writer returns a FileWriter which will store the content written to it
// at the location designated by "path" after the call to Commit.
func (d *driver) Writer(ctx context.Context, path string, append bool) (storagedriver.FileWriter, error) {
key := d.s3Path(path)
if !append {
// TODO (brianbland): cancel other uploads at this path
multi, err := d.Bucket.InitMulti(key, d.getContentType(), getPermissions(), d.getOptions())
if err != nil {
return nil, err
return d.newWriter(key, multi, nil), nil
multis, _, err := d.Bucket.ListMulti(key, "")
if err != nil {
return nil, parseError(path, err)
for _, multi := range multis {
if key != multi.Key {
parts, err := multi.ListParts()
if err != nil {
return nil, parseError(path, err)
var multiSize int64
for _, part := range parts {
multiSize += part.Size
return d.newWriter(key, multi, parts), nil
return nil, storagedriver.PathNotFoundError{Path: path}
// Stat retrieves the FileInfo for the given path, including the current size
// in bytes and the creation time.
func (d *driver) Stat(ctx context.Context, path string) (storagedriver.FileInfo, error) {
listResponse, err := d.Bucket.List(d.s3Path(path), "", "", 1)
if err != nil {
return nil, err
fi := storagedriver.FileInfoFields{
Path: path,
if len(listResponse.Contents) == 1 {
if listResponse.Contents[0].Key != d.s3Path(path) {
fi.IsDir = true
} else {
fi.IsDir = false
fi.Size = listResponse.Contents[0].Size
timestamp, err := time.Parse(time.RFC3339Nano, listResponse.Contents[0].LastModified)
if err != nil {
return nil, err
fi.ModTime = timestamp
} else if len(listResponse.CommonPrefixes) == 1 {
fi.IsDir = true
} else {
return nil, storagedriver.PathNotFoundError{Path: path}
return storagedriver.FileInfoInternal{FileInfoFields: fi}, nil
// List returns a list of the objects that are direct descendants of the given path.
func (d *driver) List(ctx context.Context, opath string) ([]string, error) {
path := opath
if path != "/" && path[len(path)-1] != '/' {
path = path + "/"
// This is to cover for the cases when the rootDirectory of the driver is either "" or "/".
// In those cases, there is no root prefix to replace and we must actually add a "/" to all
// results in order to keep them as valid paths as recognized by storagedriver.PathRegexp
prefix := ""
if d.s3Path("") == "" {
prefix = "/"
listResponse, err := d.Bucket.List(d.s3Path(path), "/", "", listMax)
if err != nil {
return nil, parseError(opath, err)
files := []string{}
directories := []string{}
for {
for _, key := range listResponse.Contents {
files = append(files, strings.Replace(key.Key, d.s3Path(""), prefix, 1))
for _, commonPrefix := range listResponse.CommonPrefixes {
directories = append(directories, strings.Replace(commonPrefix[0:len(commonPrefix)-1], d.s3Path(""), prefix, 1))
if !listResponse.IsTruncated {
listResponse, err = d.Bucket.List(d.s3Path(path), "/", listResponse.NextMarker, listMax)
if err != nil {
return nil, err
if opath != "/" {
if len(files) == 0 && len(directories) == 0 {
// Treat empty response as missing directory, since we don't actually
// have directories in s3.
return nil, storagedriver.PathNotFoundError{Path: opath}
return append(files, directories...), nil
// Move moves an object stored at sourcePath to destPath, removing the original
// object.
func (d *driver) Move(ctx context.Context, sourcePath string, destPath string) error {
/* This is terrible, but aws doesn't have an actual move. */
_, err := d.Bucket.PutCopy(d.s3Path(destPath), getPermissions(),
s3.CopyOptions{Options: d.getOptions(), ContentType: d.getContentType()}, d.Bucket.Name+"/"+d.s3Path(sourcePath))
if err != nil {
return parseError(sourcePath, err)
return d.Delete(ctx, sourcePath)
// Delete recursively deletes all objects stored at "path" and its subpaths.
func (d *driver) Delete(ctx context.Context, path string) error {
s3Path := d.s3Path(path)
listResponse, err := d.Bucket.List(s3Path, "", "", listMax)
if err != nil || len(listResponse.Contents) == 0 {
return storagedriver.PathNotFoundError{Path: path}
s3Objects := make([]s3.Object, listMax)
for len(listResponse.Contents) > 0 {
numS3Objects := len(listResponse.Contents)
for index, key := range listResponse.Contents {
// Stop if we encounter a key that is not a subpath (so that deleting "/a" does not delete "/ab").
if len(key.Key) > len(s3Path) && (key.Key)[len(s3Path)] != '/' {
numS3Objects = index
s3Objects[index].Key = key.Key
err := d.Bucket.DelMulti(s3.Delete{Quiet: false, Objects: s3Objects[0:numS3Objects]})
if err != nil {
return nil
if numS3Objects < len(listResponse.Contents) {
return nil
listResponse, err = d.Bucket.List(d.s3Path(path), "", "", listMax)
if err != nil {
return err
return nil
// URLFor returns a URL which may be used to retrieve the content stored at the given path.
// May return an UnsupportedMethodErr in certain StorageDriver implementations.
func (d *driver) URLFor(ctx context.Context, path string, options map[string]interface{}) (string, error) {
methodString := "GET"
method, ok := options["method"]
if ok {
methodString, ok = method.(string)
if !ok || (methodString != "GET" && methodString != "HEAD") {
return "", storagedriver.ErrUnsupportedMethod{}
expiresTime := time.Now().Add(20 * time.Minute)
expires, ok := options["expiry"]
if ok {
et, ok := expires.(time.Time)
if ok {
expiresTime = et
return d.Bucket.SignedURLWithMethod(methodString, d.s3Path(path), expiresTime, nil, nil), nil
func (d *driver) s3Path(path string) string {
return strings.TrimLeft(strings.TrimRight(d.RootDirectory, "/")+path, "/")
// S3BucketKey returns the s3 bucket key for the given storage driver path.
func (d *Driver) S3BucketKey(path string) string {
return d.StorageDriver.(*driver).s3Path(path)
// Walk traverses a filesystem defined within driver, starting
// from the given path, calling f on each file
func (d *driver) Walk(ctx context.Context, path string, f storagedriver.WalkFn) error {
return storagedriver.WalkFallback(ctx, d, path, f)
func parseError(path string, err error) error {
if s3Err, ok := err.(*s3.Error); ok && s3Err.Code == "NoSuchKey" {
return storagedriver.PathNotFoundError{Path: path}
return err
func (d *driver) getOptions() s3.Options {
return s3.Options{
SSE: d.Encrypt,
StorageClass: d.StorageClass,
func getPermissions() s3.ACL {
return s3.Private
// mustV4Auth checks whether must use v4 auth in specific region.
// Please see documentation at
func mustV4Auth(region string) bool {
switch region {
case "eu-central-1", "cn-north-1", "us-east-2",
"ca-central-1", "ap-south-1", "ap-northeast-2", "eu-west-2":
return true
return false
func (d *driver) getContentType() string {
return "application/octet-stream"
// writer attempts to upload parts to S3 in a buffered fashion where the last
// part is at least as large as the chunksize, so the multipart upload could be
// cleanly resumed in the future. This is violated if Close is called after less
// than a full chunk is written.
type writer struct {
driver *driver
key string
multi *s3.Multi
parts []s3.Part
size int64
readyPart []byte
pendingPart []byte
closed bool
committed bool
cancelled bool
func (d *driver) newWriter(key string, multi *s3.Multi, parts []s3.Part) storagedriver.FileWriter {
var size int64
for _, part := range parts {
size += part.Size
return &writer{
driver: d,
key: key,
multi: multi,
parts: parts,
size: size,
func (w *writer) Write(p []byte) (int, error) {
if w.closed {
return 0, fmt.Errorf("already closed")
} else if w.committed {
return 0, fmt.Errorf("already committed")
} else if w.cancelled {
return 0, fmt.Errorf("already cancelled")
// If the last written part is smaller than minChunkSize, we need to make a
// new multipart upload :sadface:
if len( > 0 && int([len(].Size) < minChunkSize {
err := w.multi.Complete(
if err != nil {
return 0, err
multi, err := w.driver.Bucket.InitMulti(w.key, w.driver.getContentType(), getPermissions(), w.driver.getOptions())
if err != nil {
return 0, err
w.multi = multi
// If the entire written file is smaller than minChunkSize, we need to make
// a new part from scratch :double sad face:
if w.size < minChunkSize {
contents, err := w.driver.Bucket.Get(w.key)
if err != nil {
return 0, err
|||| = nil
w.readyPart = contents
} else {
// Otherwise we can use the old file as the new first part
_, part, err := multi.PutPartCopy(1, s3.CopyOptions{}, w.driver.Bucket.Name+"/"+w.key)
if err != nil {
return 0, err
|||| = []s3.Part{part}
var n int
for len(p) > 0 {
// If no parts are ready to write, fill up the first part
if neededBytes := int(w.driver.ChunkSize) - len(w.readyPart); neededBytes > 0 {
if len(p) >= neededBytes {
w.readyPart = append(w.readyPart, p[:neededBytes]...)
n += neededBytes
p = p[neededBytes:]
} else {
w.readyPart = append(w.readyPart, p...)
n += len(p)
p = nil
if neededBytes := int(w.driver.ChunkSize) - len(w.pendingPart); neededBytes > 0 {
if len(p) >= neededBytes {
w.pendingPart = append(w.pendingPart, p[:neededBytes]...)
n += neededBytes
p = p[neededBytes:]
err := w.flushPart()
if err != nil {
w.size += int64(n)
return n, err
} else {
w.pendingPart = append(w.pendingPart, p...)
n += len(p)
p = nil
w.size += int64(n)
return n, nil
func (w *writer) Size() int64 {
return w.size
func (w *writer) Close() error {
if w.closed {
return fmt.Errorf("already closed")
w.closed = true
return w.flushPart()
func (w *writer) Cancel() error {
if w.closed {
return fmt.Errorf("already closed")
} else if w.committed {
return fmt.Errorf("already committed")
w.cancelled = true
err := w.multi.Abort()
return err
func (w *writer) Commit() error {
if w.closed {
return fmt.Errorf("already closed")
} else if w.committed {
return fmt.Errorf("already committed")
} else if w.cancelled {
return fmt.Errorf("already cancelled")
err := w.flushPart()
if err != nil {
return err
w.committed = true
err = w.multi.Complete(
if err != nil {
return err
return nil
// flushPart flushes buffers to write a part to S3.
// Only called by Write (with both buffers full) and Close/Commit (always)
func (w *writer) flushPart() error {
if len(w.readyPart) == 0 && len(w.pendingPart) == 0 {
// nothing to write
return nil
if len(w.pendingPart) < int(w.driver.ChunkSize) {
// closing with a small pending part
// combine ready and pending to avoid writing a small part
w.readyPart = append(w.readyPart, w.pendingPart...)
w.pendingPart = nil
part, err := w.multi.PutPart(len(, bytes.NewReader(w.readyPart))
if err != nil {
return err
|||| = append(, part)
w.readyPart = w.pendingPart
w.pendingPart = nil
return nil
@ -1,201 +0,0 @@
package s3
import (
storagedriver ""
// Hook up gocheck into the "go test" runner.
func Test(t *testing.T) { check.TestingT(t) }
var s3DriverConstructor func(rootDirectory string, storageClass s3.StorageClass) (*Driver, error)
var skipS3 func() string
func init() {
accessKey := os.Getenv("AWS_ACCESS_KEY")
secretKey := os.Getenv("AWS_SECRET_KEY")
bucket := os.Getenv("S3_BUCKET")
encrypt := os.Getenv("S3_ENCRYPT")
secure := os.Getenv("S3_SECURE")
v4auth := os.Getenv("S3_USE_V4_AUTH")
region := os.Getenv("AWS_REGION")
root, err := ioutil.TempDir("", "driver-")
if err != nil {
defer os.Remove(root)
s3DriverConstructor = func(rootDirectory string, storageClass s3.StorageClass) (*Driver, error) {
encryptBool := false
if encrypt != "" {
encryptBool, err = strconv.ParseBool(encrypt)
if err != nil {
return nil, err
secureBool := true
if secure != "" {
secureBool, err = strconv.ParseBool(secure)
if err != nil {
return nil, err
v4AuthBool := false
if v4auth != "" {
v4AuthBool, err = strconv.ParseBool(v4auth)
if err != nil {
return nil, err
parameters := DriverParameters{
driverName + "-test",
return New(parameters)
// Skip S3 storage driver tests if environment variable parameters are not provided
skipS3 = func() string {
if accessKey == "" || secretKey == "" || region == "" || bucket == "" || encrypt == "" {
return "Must set AWS_ACCESS_KEY, AWS_SECRET_KEY, AWS_REGION, S3_BUCKET, and S3_ENCRYPT to run S3 tests"
return ""
testsuites.RegisterSuite(func() (storagedriver.StorageDriver, error) {
return s3DriverConstructor(root, s3.StandardStorage)
}, skipS3)
func TestEmptyRootList(t *testing.T) {
if skipS3() != "" {
validRoot, err := ioutil.TempDir("", "driver-")
if err != nil {
t.Fatalf("unexpected error creating temporary directory: %v", err)
defer os.Remove(validRoot)
rootedDriver, err := s3DriverConstructor(validRoot, s3.StandardStorage)
if err != nil {
t.Fatalf("unexpected error creating rooted driver: %v", err)
emptyRootDriver, err := s3DriverConstructor("", s3.StandardStorage)
if err != nil {
t.Fatalf("unexpected error creating empty root driver: %v", err)
slashRootDriver, err := s3DriverConstructor("/", s3.StandardStorage)
if err != nil {
t.Fatalf("unexpected error creating slash root driver: %v", err)
filename := "/test"
contents := []byte("contents")
ctx := context.Background()
err = rootedDriver.PutContent(ctx, filename, contents)
if err != nil {
t.Fatalf("unexpected error creating content: %v", err)
defer rootedDriver.Delete(ctx, filename)
keys, _ := emptyRootDriver.List(ctx, "/")
for _, path := range keys {
if !storagedriver.PathRegexp.MatchString(path) {
t.Fatalf("unexpected string in path: %q != %q", path, storagedriver.PathRegexp)
keys, _ = slashRootDriver.List(ctx, "/")
for _, path := range keys {
if !storagedriver.PathRegexp.MatchString(path) {
t.Fatalf("unexpected string in path: %q != %q", path, storagedriver.PathRegexp)
func TestStorageClass(t *testing.T) {
if skipS3() != "" {
rootDir, err := ioutil.TempDir("", "driver-")
if err != nil {
t.Fatalf("unexpected error creating temporary directory: %v", err)
defer os.Remove(rootDir)
standardDriver, err := s3DriverConstructor(rootDir, s3.StandardStorage)
if err != nil {
t.Fatalf("unexpected error creating driver with standard storage: %v", err)
rrDriver, err := s3DriverConstructor(rootDir, s3.ReducedRedundancy)
if err != nil {
t.Fatalf("unexpected error creating driver with reduced redundancy storage: %v", err)
standardFilename := "/test-standard"
rrFilename := "/test-rr"
contents := []byte("contents")
ctx := context.Background()
err = standardDriver.PutContent(ctx, standardFilename, contents)
if err != nil {
t.Fatalf("unexpected error creating content: %v", err)
defer standardDriver.Delete(ctx, standardFilename)
err = rrDriver.PutContent(ctx, rrFilename, contents)
if err != nil {
t.Fatalf("unexpected error creating content: %v", err)
defer rrDriver.Delete(ctx, rrFilename)
standardDriverUnwrapped := standardDriver.Base.StorageDriver.(*driver)
resp, err := standardDriverUnwrapped.Bucket.GetResponse(standardDriverUnwrapped.s3Path(standardFilename))
if err != nil {
t.Fatalf("unexpected error retrieving standard storage file: %v", err)
defer resp.Body.Close()
// Amazon only populates this header value for non-standard storage classes
if storageClass := resp.Header.Get("x-amz-storage-class"); storageClass != "" {
t.Fatalf("unexpected storage class for standard file: %v", storageClass)
rrDriverUnwrapped := rrDriver.Base.StorageDriver.(*driver)
resp, err = rrDriverUnwrapped.Bucket.GetResponse(rrDriverUnwrapped.s3Path(rrFilename))
if err != nil {
t.Fatalf("unexpected error retrieving reduced-redundancy storage file: %v", err)
defer resp.Body.Close()
if storageClass := resp.Header.Get("x-amz-storage-class"); storageClass != string(s3.ReducedRedundancy) {
t.Fatalf("unexpected storage class for reduced-redundancy file: %v", storageClass)
@ -10,7 +10,6 @@ e2c28503fcd0675329da73bf48b33404db873782
||| afedced274aa9a7fcdd47ac97018f0f8db4e5de2
|||| a601269ab70c205d26370c16f7c81e9017c14e04
|||| 399ea8c73916000c64c2c76e8da00ca82f8387ab
|||| f0a21f5b2e12f83a505ecf79b633bb2035cf6f85
|||| fa567046d9b14f6aa788882a950d69651d230b21
|||| 535138d7bcd717d6531c701ef5933d98b1866257
|||| 2ba15ac2dc9cdf88c110ec2dc0ced7fa45f5678c
// false if it is time to stop trying.
func (a *Attempt) Next() bool {
now := time.Now()
sleep := a.nextSleep(now)
if !a.force && !now.Add(sleep).Before(a.end) && a.strategy.Min <= a.count {
return false
a.force = false
if sleep > 0 && a.count > 0 {
now = time.Now()
a.last = now
return true
func (a *Attempt) nextSleep(now time.Time) time.Duration {
sleep := a.strategy.Delay - now.Sub(a.last)
if sleep < 0 {
return 0
return sleep
// HasNext returns whether another attempt will be made if the current
// one fails. If it returns true, the following call to Next is
// guaranteed to return true.
func (a *Attempt) HasNext() bool {
if a.force || a.strategy.Min > a.count {
return true
now := time.Now()
if now.Add(a.nextSleep(now)).Before(a.end) {
a.force = true
return true
return false
@ -1,636 +0,0 @@
// goamz - Go packages to interact with the Amazon Web Services.
// Copyright (c) 2011 Canonical Ltd.
// Written by Gustavo Niemeyer <>
package aws
import (
// Regular expressions for INI files
var (
iniSectionRegexp = regexp.MustCompile(`^\s*\[([^\[\]]+)\]\s*$`)
iniSettingRegexp = regexp.MustCompile(`^\s*(.+?)\s*=\s*(.*\S)\s*$`)
// Defines the valid signers
const (
V2Signature = iota
V4Signature = iota
Route53Signature = iota
// Defines the service endpoint and correct Signer implementation to use
// to sign requests for this endpoint
type ServiceInfo struct {
Endpoint string
Signer uint
// Region defines the URLs where AWS services may be accessed.
// See for more details.
type Region struct {
Name string // the canonical name of this region.
EC2Endpoint ServiceInfo
S3Endpoint string
S3BucketEndpoint string // Not needed by AWS S3. Use ${bucket} for bucket name.
S3LocationConstraint bool // true if this region requires a LocationConstraint declaration.
S3LowercaseBucket bool // true if the region requires bucket names to be lower case.
SDBEndpoint string
SNSEndpoint string
SQSEndpoint string
SESEndpoint string
IAMEndpoint string
ELBEndpoint string
KMSEndpoint string
DynamoDBEndpoint string
CloudWatchServicepoint ServiceInfo
AutoScalingEndpoint string
RDSEndpoint ServiceInfo
KinesisEndpoint string
STSEndpoint string
CloudFormationEndpoint string
ElastiCacheEndpoint string
var Regions = map[string]Region{
APNortheast.Name: APNortheast,
APNortheast2.Name: APNortheast2,
APSoutheast.Name: APSoutheast,
APSoutheast2.Name: APSoutheast2,
EUCentral.Name: EUCentral,
EUWest.Name: EUWest,
USEast.Name: USEast,
USWest.Name: USWest,
USWest2.Name: USWest2,
USGovWest.Name: USGovWest,
SAEast.Name: SAEast,
CNNorth1.Name: CNNorth1,
// Designates a signer interface suitable for signing AWS requests, params
// should be appropriately encoded for the request before signing.
// A signer should be initialized with Auth and the appropriate endpoint.
type Signer interface {
Sign(method, path string, params map[string]string)
// An AWS Service interface with the API to query the AWS service
// Supplied as an easy way to mock out service calls during testing.
type AWSService interface {
// Queries the AWS service at a given method/path with the params and
// returns an http.Response and error
Query(method, path string, params map[string]string) (*http.Response, error)
// Builds an error given an XML payload in the http.Response, can be used
// to process an error if the status code is not 200 for example.
BuildError(r *http.Response) error
// Implements a Server Query/Post API to easily query AWS services and build
// errors when desired
type Service struct {
service ServiceInfo
signer Signer
// Create a base set of params for an action
func MakeParams(action string) map[string]string {
params := make(map[string]string)
params["Action"] = action
return params
// Create a new AWS server to handle making requests
func NewService(auth Auth, service ServiceInfo) (s *Service, err error) {
var signer Signer
switch service.Signer {
case V2Signature:
signer, err = NewV2Signer(auth, service)
// case V4Signature:
// signer, err = NewV4Signer(auth, service, Regions["eu-west-1"])
err = fmt.Errorf("Unsupported signer for service")
if err != nil {
s = &Service{service: service, signer: signer}
func (s *Service) Query(method, path string, params map[string]string) (resp *http.Response, err error) {
params["Timestamp"] = time.Now().UTC().Format(time.RFC3339)
u, err := url.Parse(s.service.Endpoint)
if err != nil {
return nil, err
u.Path = path
s.signer.Sign(method, path, params)
if method == "GET" {
u.RawQuery = multimap(params).Encode()
resp, err = http.Get(u.String())
} else if method == "POST" {
resp, err = http.PostForm(u.String(), multimap(params))
func (s *Service) BuildError(r *http.Response) error {
errors := ErrorResponse{}
var err Error
err = errors.Errors
err.RequestId = errors.RequestId
err.StatusCode = r.StatusCode
if err.Message == "" {
err.Message = r.Status
return &err
type ServiceError interface {
ErrorCode() string
type ErrorResponse struct {
Errors Error `xml:"Error"`
RequestId string // A unique ID for tracking the request
type Error struct {
StatusCode int
Type string
Code string
Message string
RequestId string
func (err *Error) Error() string {
return fmt.Sprintf("Type: %s, Code: %s, Message: %s",
err.Type, err.Code, err.Message,
func (err *Error) ErrorCode() string {
return err.Code
type Auth struct {
AccessKey, SecretKey string
token string
expiration time.Time
func (a *Auth) Token() string {
if a.token == "" {
return ""
if time.Since(a.expiration) >= -30*time.Second { //in an ideal world this should be zero assuming the instance is synching it's clock
auth, err := GetAuth("", "", "", time.Time{})
if err == nil {
*a = auth
return a.token
func (a *Auth) Expiration() time.Time {
return a.expiration
// To be used with other APIs that return auth credentials such as STS
func NewAuth(accessKey, secretKey, token string, expiration time.Time) *Auth {
return &Auth{
AccessKey: accessKey,
SecretKey: secretKey,
token: token,
expiration: expiration,
// ResponseMetadata
type ResponseMetadata struct {
RequestId string // A unique ID for tracking the request
type BaseResponse struct {
ResponseMetadata ResponseMetadata
var unreserved = make([]bool, 128)
var hex = "0123456789ABCDEF"
func init() {
// RFC3986
u := "ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz01234567890-_.~"
for _, c := range u {
unreserved[c] = true
func multimap(p map[string]string) url.Values {
q := make(url.Values, len(p))
for k, v := range p {
q[k] = []string{v}
return q
type credentials struct {
Code string
LastUpdated string
Type string
AccessKeyId string
SecretAccessKey string
Token string
Expiration string
// GetMetaData retrieves instance metadata about the current machine.
// See for more details.
func GetMetaData(path string) (contents []byte, err error) {
c := http.Client{
Transport: &http.Transport{
Dial: func(netw, addr string) (net.Conn, error) {
deadline := time.Now().Add(5 * time.Second)
c, err := net.DialTimeout(netw, addr, time.Second*2)
if err != nil {
return nil, err
return c, nil
url := "" + path
resp, err := c.Get(url)
if err != nil {
defer resp.Body.Close()
if resp.StatusCode != 200 {
err = fmt.Errorf("Code %d returned for url %s", resp.StatusCode, url)
body, err := ioutil.ReadAll(resp.Body)
if err != nil {
return []byte(body), err
func GetRegion(regionName string) (region Region) {
region = Regions[regionName]
// GetInstanceCredentials creates an Auth based on the instance's role credentials.
// If the running instance is not in EC2 or does not have a valid IAM role, an error will be returned.
// For more info about setting up IAM roles, see
func GetInstanceCredentials() (cred credentials, err error) {
credentialPath := "iam/security-credentials/"
// Get the instance role
role, err := GetMetaData(credentialPath)
if err != nil {
// Get the instance role credentials
credentialJSON, err := GetMetaData(credentialPath + string(role))
if err != nil {
err = json.Unmarshal([]byte(credentialJSON), &cred)
// GetAuth creates an Auth based on either passed in credentials,
// environment information or instance based role credentials.
func GetAuth(accessKey string, secretKey, token string, expiration time.Time) (auth Auth, err error) {
// First try passed in credentials
if accessKey != "" && secretKey != "" {
return Auth{accessKey, secretKey, token, expiration}, nil
// Next try to get auth from the environment
auth, err = EnvAuth()
if err == nil {
// Found auth, return
// Next try getting auth from the instance role
cred, err := GetInstanceCredentials()
if err == nil {
// Found auth, return
auth.AccessKey = cred.AccessKeyId
auth.SecretKey = cred.SecretAccessKey
auth.token = cred.Token
exptdate, err := time.Parse("2006-01-02T15:04:05Z", cred.Expiration)
if err != nil {
err = fmt.Errorf("Error Parsing expiration date: cred.Expiration :%s , error: %s \n", cred.Expiration, err)
auth.expiration = exptdate
return auth, err
// Next try getting auth from the credentials file
auth, err = CredentialFileAuth("", "", time.Minute*5)
if err == nil {
//err = errors.New("No valid AWS authentication found")
err = fmt.Errorf("No valid AWS authentication found: %s", err)
return auth, err
// EnvAuth creates an Auth based on environment information.
// variables are used.
func EnvAuth() (auth Auth, err error) {
auth.AccessKey = os.Getenv("AWS_ACCESS_KEY_ID")
if auth.AccessKey == "" {
auth.AccessKey = os.Getenv("AWS_ACCESS_KEY")
auth.SecretKey = os.Getenv("AWS_SECRET_ACCESS_KEY")
if auth.SecretKey == "" {
auth.SecretKey = os.Getenv("AWS_SECRET_KEY")
if auth.AccessKey == "" {
err = errors.New("AWS_ACCESS_KEY_ID or AWS_ACCESS_KEY not found in environment")
if auth.SecretKey == "" {
err = errors.New("AWS_SECRET_ACCESS_KEY or AWS_SECRET_KEY not found in environment")
// CredentialFileAuth creates and Auth based on a credentials file. The file
// contains various authentication profiles for use with AWS.
// The credentials file, which is used by other AWS SDKs, is documented at
func CredentialFileAuth(filePath string, profile string, expiration time.Duration) (auth Auth, err error) {
if profile == "" {
profile = os.Getenv("AWS_DEFAULT_PROFILE")
if profile == "" {
profile = os.Getenv("AWS_PROFILE")
if profile == "" {
profile = "default"
if filePath == "" {
u, err := user.Current()
if err != nil {
return auth, err
filePath = path.Join(u.HomeDir, ".aws", "credentials")
// read the file, then parse the INI
contents, err := ioutil.ReadFile(filePath)
if err != nil {
profiles := parseINI(string(contents))
profileData, ok := profiles[profile]
if !ok {
err = errors.New("The credentials file did not contain the profile")
keyId, ok := profileData["aws_access_key_id"]
if !ok {
err = errors.New("The credentials file did not contain required attribute aws_access_key_id")
secretKey, ok := profileData["aws_secret_access_key"]
if !ok {
err = errors.New("The credentials file did not contain required attribute aws_secret_access_key")
auth.AccessKey = keyId
auth.SecretKey = secretKey
if token, ok := profileData["aws_session_token"]; ok {
auth.token = token
auth.expiration = time.Now().Add(expiration)
// parseINI takes the contents of a credentials file and returns a map, whose keys
// are the various profiles, and whose values are maps of the settings for the
// profiles
func parseINI(fileContents string) map[string]map[string]string {
profiles := make(map[string]map[string]string)
lines := strings.Split(fileContents, "\n")
var currentSection map[string]string
for _, line := range lines {
// remove comments, which start with a semi-colon
if split := strings.Split(line, ";"); len(split) > 1 {
line = split[0]
// check if the line is the start of a profile.
// for example:
// [default]
// otherwise, check for the proper setting
// property=value
if sectMatch := iniSectionRegexp.FindStringSubmatch(line); len(sectMatch) == 2 {
currentSection = make(map[string]string)
profiles[sectMatch[1]] = currentSection
} else if setMatch := iniSettingRegexp.FindStringSubmatch(line); len(setMatch) == 3 && currentSection != nil {
currentSection[setMatch[1]] = setMatch[2]
return profiles
// Encode takes a string and URI-encodes it in a way suitable
// to be used in AWS signatures.
func Encode(s string) string {
encode := false
for i := 0; i != len(s); i++ {
c := s[i]
if c > 127 || !unreserved[c] {
encode = true
if !encode {
return s
e := make([]byte, len(s)*3)
ei := 0
for i := 0; i != len(s); i++ {
c := s[i]
if c > 127 || !unreserved[c] {
e[ei] = '%'
e[ei+1] = hex[c>>4]
e[ei+2] = hex[c&0xF]
ei += 3
} else {
e[ei] = c
ei += 1
return string(e[:ei])
func dialTimeout(network, addr string) (net.Conn, error) {
return net.DialTimeout(network, addr, time.Duration(2*time.Second))
func AvailabilityZone() string {
transport := http.Transport{Dial: dialTimeout}
client := http.Client{
Transport: &transport,
resp, err := client.Get("")
if err != nil {
return "unknown"
} else {
defer resp.Body.Close()
body, err := ioutil.ReadAll(resp.Body)
if err != nil {
return "unknown"
} else {
return string(body)
func InstanceRegion() string {
az := AvailabilityZone()
if az == "unknown" {
return az
} else {
region := az[:len(az)-1]
return region
func InstanceId() string {
transport := http.Transport{Dial: dialTimeout}
client := http.Client{
Transport: &transport,
resp, err := client.Get("")
if err != nil {
return "unknown"
} else {
defer resp.Body.Close()
body, err := ioutil.ReadAll(resp.Body)
if err != nil {
return "unknown"
} else {
return string(body)
func InstanceType() string {
transport := http.Transport{Dial: dialTimeout}
client := http.Client{
Transport: &transport,
resp, err := client.Get("")
if err != nil {
return "unknown"
} else {
defer resp.Body.Close()
body, err := ioutil.ReadAll(resp.Body)
if err != nil {
return "unknown"
} else {
return string(body)
func ServerLocalIp() string {
transport := http.Transport{Dial: dialTimeout}
client := http.Client{
Transport: &transport,
resp, err := client.Get("")
if err != nil {
return ""
} else {
defer resp.Body.Close()
body, err := ioutil.ReadAll(resp.Body)
if err != nil {
return ""
} else {
return string(body)
func ServerPublicIp() string {
transport := http.Transport{Dial: dialTimeout}
client := http.Client{
Transport: &transport,
resp, err := client.Get("")
if err != nil {
return ""
} else {
defer resp.Body.Close()
body, err := ioutil.ReadAll(resp.Body)
if err != nil {
return ""
} else {
return string(body)
@ -1,124 +0,0 @@
package aws
import (
type RetryableFunc func(*http.Request, *http.Response, error) bool
type WaitFunc func(try int)
type DeadlineFunc func() time.Time
type ResilientTransport struct {
// Timeout is the maximum amount of time a dial will wait for
// a connect to complete.
// The default is no timeout.
// With or without a timeout, the operating system may impose
// its own earlier timeout. For instance, TCP timeouts are
// often around 3 minutes.
DialTimeout time.Duration
// MaxTries, if non-zero, specifies the number of times we will retry on
// failure. Retries are only attempted for temporary network errors or known
// safe failures.
MaxTries int
Deadline DeadlineFunc
ShouldRetry RetryableFunc
Wait WaitFunc
transport *http.Transport
// Convenience method for creating an http client
func NewClient(rt *ResilientTransport) *http.Client {
rt.transport = &http.Transport{
Dial: func(netw, addr string) (net.Conn, error) {
c, err := net.DialTimeout(netw, addr, rt.DialTimeout)
if err != nil {
return nil, err
return c, nil
Proxy: http.ProxyFromEnvironment,
// TODO: Would be nice is ResilientTransport allowed clients to initialize
// with http.Transport attributes.
return &http.Client{
Transport: rt,
var retryingTransport = &ResilientTransport{
Deadline: func() time.Time {
return time.Now().Add(5 * time.Second)
DialTimeout: 10 * time.Second,
MaxTries: 3,
ShouldRetry: awsRetry,
Wait: ExpBackoff,
// Exported default client
var RetryingClient = NewClient(retryingTransport)
func (t *ResilientTransport) RoundTrip(req *http.Request) (*http.Response, error) {
return t.tries(req)
// Retry a request a maximum of t.MaxTries times.
// We'll only retry if the proper criteria are met.
// If a wait function is specified, wait that amount of time
// In between requests.
func (t *ResilientTransport) tries(req *http.Request) (res *http.Response, err error) {
for try := 0; try < t.MaxTries; try += 1 {
res, err = t.transport.RoundTrip(req)
if !t.ShouldRetry(req, res, err) {
if res != nil {
if t.Wait != nil {
func ExpBackoff(try int) {
time.Sleep(100 * time.Millisecond *
func LinearBackoff(try int) {
time.Sleep(time.Duration(try*100) * time.Millisecond)
// Decide if we should retry a request.
// In general, the criteria for retrying a request is described here
func awsRetry(req *http.Request, res *http.Response, err error) bool {
retry := false
// Retry if there's a temporary network error.
if neterr, ok := err.(net.Error); ok {
if neterr.Temporary() {
retry = true
// Retry if we get a 5xx series error.
if res != nil {
if res.StatusCode >= 500 && res.StatusCode < 600 {
retry = true
return retry
@ -1,289 +0,0 @@
package aws
var USGovWest = Region{
ServiceInfo{"", V2Signature},
ServiceInfo{"", V2Signature},
ServiceInfo{"", V2Signature},
var USEast = Region{
ServiceInfo{"", V2Signature},
ServiceInfo{"", V2Signature},
ServiceInfo{"", V2Signature},
var USWest = Region{
ServiceInfo{"", V2Signature},
ServiceInfo{"", V2Signature},
ServiceInfo{"", V2Signature},
var USWest2 = Region{
ServiceInfo{"", V2Signature},
ServiceInfo{"", V2Signature},
ServiceInfo{"", V2Signature},
var EUWest = Region{
ServiceInfo{"", V2Signature},
ServiceInfo{"", V2Signature},
ServiceInfo{"", V2Signature},
var EUCentral = Region{
ServiceInfo{"", V4Signature},
ServiceInfo{"", V2Signature},
ServiceInfo{"", V2Signature},
var APSoutheast = Region{
ServiceInfo{"", V2Signature},
ServiceInfo{"", V2Signature},
ServiceInfo{"", V2Signature},
var APSoutheast2 = Region{
ServiceInfo{"", V2Signature},
ServiceInfo{"", V2Signature},
ServiceInfo{"", V2Signature},
var APNortheast = Region{
ServiceInfo{"", V2Signature},
ServiceInfo{"", V2Signature},
ServiceInfo{"", V2Signature},
var APNortheast2 = Region{
ServiceInfo{"", V2Signature},
ServiceInfo{"", V2Signature},
ServiceInfo{"", V2Signature},
var SAEast = Region{
ServiceInfo{"", V2Signature},
ServiceInfo{"", V2Signature},
ServiceInfo{"", V2Signature},
var CNNorth1 = Region{
ServiceInfo{"", V2Signature},
ServiceInfo{"", V4Signature},
ServiceInfo{"", V4Signature},
@ -1,136 +0,0 @@
package aws
import (
const (
maxDelay = 20 * time.Second
defaultScale = 300 * time.Millisecond
throttlingScale = 500 * time.Millisecond
throttlingScaleRange = throttlingScale / 4
defaultMaxRetries = 3
dynamoDBScale = 25 * time.Millisecond
dynamoDBMaxRetries = 10
// A RetryPolicy encapsulates a strategy for implementing client retries.
// Default implementations are provided which match the AWS SDKs.
type RetryPolicy interface {
// ShouldRetry returns whether a client should retry a failed request.
ShouldRetry(target string, r *http.Response, err error, numRetries int) bool
// Delay returns the time a client should wait before issuing a retry.
Delay(target string, r *http.Response, err error, numRetries int) time.Duration
// DefaultRetryPolicy implements the AWS SDK default retry policy.
// It will retry up to 3 times, and uses an exponential backoff with a scale
// factor of 300ms (300ms, 600ms, 1200ms). If the retry is because of
// throttling, the delay will also include some randomness.
// See
type DefaultRetryPolicy struct {
// ShouldRetry implements the RetryPolicy ShouldRetry method.
func (policy DefaultRetryPolicy) ShouldRetry(target string, r *http.Response, err error, numRetries int) bool {
return shouldRetry(r, err, numRetries, defaultMaxRetries)
// Delay implements the RetryPolicy Delay method.
func (policy DefaultRetryPolicy) Delay(target string, r *http.Response, err error, numRetries int) time.Duration {
scale := defaultScale
if err, ok := err.(*Error); ok && isThrottlingException(err) {
scale = throttlingScale + time.Duration(rand.Int63n(int64(throttlingScaleRange)))
return exponentialBackoff(numRetries, scale)
// DynamoDBRetryPolicy implements the AWS SDK DynamoDB retry policy.
// It will retry up to 10 times, and uses an exponential backoff with a scale
// factor of 25ms (25ms, 50ms, 100ms, ...).
// See
type DynamoDBRetryPolicy struct {
// ShouldRetry implements the RetryPolicy ShouldRetry method.
func (policy DynamoDBRetryPolicy) ShouldRetry(target string, r *http.Response, err error, numRetries int) bool {
return shouldRetry(r, err, numRetries, dynamoDBMaxRetries)
// Delay implements the RetryPolicy Delay method.
func (policy DynamoDBRetryPolicy) Delay(target string, r *http.Response, err error, numRetries int) time.Duration {
return exponentialBackoff(numRetries, dynamoDBScale)
// NeverRetryPolicy never retries requests and returns immediately on failure.
type NeverRetryPolicy struct {
// ShouldRetry implements the RetryPolicy ShouldRetry method.
func (policy NeverRetryPolicy) ShouldRetry(target string, r *http.Response, err error, numRetries int) bool {
return false
// Delay implements the RetryPolicy Delay method.
func (policy NeverRetryPolicy) Delay(target string, r *http.Response, err error, numRetries int) time.Duration {
return time.Duration(0)
// shouldRetry determines if we should retry the request.
// See
func shouldRetry(r *http.Response, err error, numRetries int, maxRetries int) bool {
// Once we've exceeded the max retry attempts, game over.
if numRetries >= maxRetries {
return false
// Always retry temporary network errors.
if err, ok := err.(net.Error); ok && err.Temporary() {
return true
// Always retry 5xx responses.
if r != nil && r.StatusCode >= 500 {
return true
// Always retry throttling exceptions.
if err, ok := err.(ServiceError); ok && isThrottlingException(err) {
return true
// Other classes of failures indicate a problem with the request. Retrying
// won't help.
return false
func exponentialBackoff(numRetries int, scale time.Duration) time.Duration {
if numRetries < 0 {
return time.Duration(0)
delay := (1 << uint(numRetries)) * scale
if delay > maxDelay {
return maxDelay
return delay
func isThrottlingException(err ServiceError) bool {
switch err.ErrorCode() {
case "Throttling", "ThrottlingException", "ProvisionedThroughputExceededException":
return true
return false
@ -1,472 +0,0 @@
package aws
import (
// AWS specifies that the parameters in a signed request must
// be provided in the natural order of the keys. This is distinct
// from the natural order of the encoded value of key=value.
// Percent and gocheck.Equals affect the sorting order.
func EncodeSorted(values url.Values) string {
// preallocate the arrays for perfomance
keys := make([]string, 0, len(values))
sarray := make([]string, 0, len(values))
for k := range values {
keys = append(keys, k)
for _, k := range keys {
for _, v := range values[k] {
sarray = append(sarray, Encode(k)+"="+Encode(v))
return strings.Join(sarray, "&")
type V2Signer struct {
auth Auth
service ServiceInfo
host string
var b64 = base64.StdEncoding
func NewV2Signer(auth Auth, service ServiceInfo) (*V2Signer, error) {
u, err := url.Parse(service.Endpoint)
if err != nil {
return nil, err
return &V2Signer{auth: auth, service: service, host: u.Host}, nil
func (s *V2Signer) Sign(method, path string, params map[string]string) {
params["AWSAccessKeyId"] = s.auth.AccessKey
params["SignatureVersion"] = "2"
params["SignatureMethod"] = "HmacSHA256"
if s.auth.Token() != "" {
params["SecurityToken"] = s.auth.Token()
// AWS specifies that the parameters in a signed request must
// be provided in the natural order of the keys. This is distinct
// from the natural order of the encoded value of key=value.
// Percent and gocheck.Equals affect the sorting order.
var keys, sarray []string
for k := range params {
keys = append(keys, k)
for _, k := range keys {
sarray = append(sarray, Encode(k)+"="+Encode(params[k]))
joined := strings.Join(sarray, "&")
payload := method + "\n" + + "\n" + path + "\n" + joined
hash := hmac.New(sha256.New, []byte(s.auth.SecretKey))
signature := make([]byte, b64.EncodedLen(hash.Size()))
b64.Encode(signature, hash.Sum(nil))
params["Signature"] = string(signature)
func (s *V2Signer) SignRequest(req *http.Request) error {
req.Form.Set("AWSAccessKeyId", s.auth.AccessKey)
req.Form.Set("SignatureVersion", "2")
req.Form.Set("SignatureMethod", "HmacSHA256")
if s.auth.Token() != "" {
req.Form.Set("SecurityToken", s.auth.Token())
payload := req.Method + "\n" + req.URL.Host + "\n" + req.URL.Path + "\n" + EncodeSorted(req.Form)
hash := hmac.New(sha256.New, []byte(s.auth.SecretKey))
signature := make([]byte, b64.EncodedLen(hash.Size()))
b64.Encode(signature, hash.Sum(nil))
req.Form.Set("Signature", string(signature))
req.URL.RawQuery = req.Form.Encode()
return nil
// Common date formats for signing requests
const (
ISO8601BasicFormat = "20060102T150405Z"
ISO8601BasicFormatShort = "20060102"
type Route53Signer struct {
auth Auth
func NewRoute53Signer(auth Auth) *Route53Signer {
return &Route53Signer{auth: auth}
// Creates the authorize signature based on the date stamp and secret key
func (s *Route53Signer) getHeaderAuthorize(message string) string {
hmacSha256 := hmac.New(sha256.New, []byte(s.auth.SecretKey))
cryptedString := hmacSha256.Sum(nil)
return base64.StdEncoding.EncodeToString(cryptedString)
// Adds all the required headers for AWS Route53 API to the request
// including the authorization
func (s *Route53Signer) Sign(req *http.Request) {
date := time.Now().UTC().Format(time.RFC1123)
delete(req.Header, "Date")
req.Header.Set("Date", date)
authHeader := fmt.Sprintf("AWS3-HTTPS AWSAccessKeyId=%s,Algorithm=%s,Signature=%s",
s.auth.AccessKey, "HmacSHA256", s.getHeaderAuthorize(date))
req.Header.Set("Host", req.Host)
req.Header.Set("X-Amzn-Authorization", authHeader)
req.Header.Set("Content-Type", "application/xml")
if s.auth.Token() != "" {
req.Header.Set("X-Amz-Security-Token", s.auth.Token())
The V4Signer encapsulates all of the functionality to sign a request with the AWS
Signature Version 4 Signing Process. (
type V4Signer struct {
auth Auth
serviceName string
region Region
// Add the x-amz-content-sha256 header
IncludeXAmzContentSha256 bool
Return a new instance of a V4Signer capable of signing AWS requests.
func NewV4Signer(auth Auth, serviceName string, region Region) *V4Signer {
return &V4Signer{
auth: auth,
serviceName: serviceName,
region: region,
IncludeXAmzContentSha256: false,
Sign a request according to the AWS Signature Version 4 Signing Process. (
The signed request will include an "x-amz-date" header with a current timestamp if a valid "x-amz-date"
or "date" header was not available in the original request. In addition, AWS Signature Version 4 requires
the "host" header to be a signed header, therefor the Sign method will manually set a "host" header from
the request.Host.
The signed request will include a new "Authorization" header indicating that the request has been signed.
Any changes to the request after signing the request will invalidate the signature.
func (s *V4Signer) Sign(req *http.Request) {
req.Header.Set("host", req.Host) // host header must be included as a signed header
t := s.requestTime(req) // Get request time
payloadHash := ""
if _, ok := req.Form["X-Amz-Expires"]; ok {
// We are authenticating the the request by using query params
// (also known as pre-signing a url,
payloadHash = "UNSIGNED-PAYLOAD"
req.Form["X-Amz-SignedHeaders"] = []string{s.signedHeaders(req.Header)}
req.Form["X-Amz-Algorithm"] = []string{"AWS4-HMAC-SHA256"}
req.Form["X-Amz-Credential"] = []string{s.auth.AccessKey + "/" + s.credentialScope(t)}
req.Form["X-Amz-Date"] = []string{t.Format(ISO8601BasicFormat)}
req.URL.RawQuery = req.Form.Encode()
} else {
payloadHash = s.payloadHash(req)
if s.IncludeXAmzContentSha256 {
req.Header.Set("x-amz-content-sha256", payloadHash) // x-amz-content-sha256 contains the payload hash
creq := s.canonicalRequest(req, payloadHash) // Build canonical request
sts := s.stringToSign(t, creq) // Build string to sign
signature := s.signature(t, sts) // Calculate the AWS Signature Version 4
auth := s.authorization(req.Header, t, signature) // Create Authorization header value
if _, ok := req.Form["X-Amz-Expires"]; ok {
req.Form["X-Amz-Signature"] = []string{signature}
} else {
req.Header.Set("Authorization", auth) // Add Authorization header to request
func (s *V4Signer) SignRequest(req *http.Request) error {
return nil
requestTime method will parse the time from the request "x-amz-date" or "date" headers.
If the "x-amz-date" header is present, that will take priority over the "date" header.
If neither header is defined or we are unable to parse either header as a valid date
then we will create a new "x-amz-date" header with the current time.
func (s *V4Signer) requestTime(req *http.Request) time.Time {
// Get "x-amz-date" header
date := req.Header.Get("x-amz-date")
// Attempt to parse as ISO8601BasicFormat
t, err := time.Parse(ISO8601BasicFormat, date)
if err == nil {
return t
// Attempt to parse as http.TimeFormat
t, err = time.Parse(http.TimeFormat, date)
if err == nil {
req.Header.Set("x-amz-date", t.Format(ISO8601BasicFormat))
return t
// Get "date" header
date = req.Header.Get("date")
// Attempt to parse as http.TimeFormat
t, err = time.Parse(http.TimeFormat, date)
if err == nil {
return t
// Create a current time header to be used
t = time.Now().UTC()
req.Header.Set("x-amz-date", t.Format(ISO8601BasicFormat))
return t
canonicalRequest method creates the canonical request according to Task 1 of the AWS Signature Version 4 Signing Process. (
CanonicalRequest =
HTTPRequestMethod + '\n' +
CanonicalURI + '\n' +
CanonicalQueryString + '\n' +
CanonicalHeaders + '\n' +
SignedHeaders + '\n' +
payloadHash is optional; use the empty string and it will be calculated from the request
func (s *V4Signer) canonicalRequest(req *http.Request, payloadHash string) string {
if payloadHash == "" {
payloadHash = s.payloadHash(req)
c := new(bytes.Buffer)
fmt.Fprintf(c, "%s\n", req.Method)
fmt.Fprintf(c, "%s\n", s.canonicalURI(req.URL))
fmt.Fprintf(c, "%s\n", s.canonicalQueryString(req.URL))
fmt.Fprintf(c, "%s\n\n", s.canonicalHeaders(req.Header))
fmt.Fprintf(c, "%s\n", s.signedHeaders(req.Header))
fmt.Fprintf(c, "%s", payloadHash)
return c.String()
func (s *V4Signer) canonicalURI(u *url.URL) string {
u = &url.URL{Path: u.Path}
canonicalPath := u.String()
slash := strings.HasSuffix(canonicalPath, "/")
canonicalPath = path.Clean(canonicalPath)
if canonicalPath == "" || canonicalPath == "." {
canonicalPath = "/"
if canonicalPath != "/" && slash {
canonicalPath += "/"
return canonicalPath
func (s *V4Signer) canonicalQueryString(u *url.URL) string {
keyValues := make(map[string]string, len(u.Query()))
keys := make([]string, len(u.Query()))
key_i := 0
for k, vs := range u.Query() {
k = url.QueryEscape(k)
a := make([]string, len(vs))
for idx, v := range vs {
v = url.QueryEscape(v)
a[idx] = fmt.Sprintf("%s=%s", k, v)
keyValues[k] = strings.Join(a, "&")
keys[key_i] = k
query := make([]string, len(keys))
for idx, key := range keys {
query[idx] = keyValues[key]
query_str := strings.Join(query, "&")
// AWS V4 signing requires that the space characters
// are encoded as %20 instead of +. On the other hand
// golangs url.QueryEscape as well as url.Values.Encode()
// both encode the space as a + character. See:
return strings.Replace(query_str, "+", "%20", -1)
func (s *V4Signer) canonicalHeaders(h http.Header) string {
i, a, lowerCase := 0, make([]string, len(h)), make(map[string][]string)
for k, v := range h {
lowerCase[strings.ToLower(k)] = v
var keys []string
for k := range lowerCase {
keys = append(keys, k)
for _, k := range keys {
v := lowerCase[k]
for j, w := range v {
v[j] = strings.Trim(w, " ")
a[i] = strings.ToLower(k) + ":" + strings.Join(v, ",")
return strings.Join(a, "\n")
func (s *V4Signer) signedHeaders(h http.Header) string {
i, a := 0, make([]string, len(h))
for k := range h {
a[i] = strings.ToLower(k)
return strings.Join(a, ";")
func (s *V4Signer) payloadHash(req *http.Request) string {
var b []byte
if req.Body == nil {
b = []byte("")
} else {
var err error
b, err = ioutil.ReadAll(req.Body)
if err != nil {
req.Body = ioutil.NopCloser(bytes.NewBuffer(b))
return s.hash(string(b))
stringToSign method creates the string to sign accorting to Task 2 of the AWS Signature Version 4 Signing Process. (
StringToSign =
Algorithm + '\n' +
RequestDate + '\n' +
CredentialScope + '\n' +
func (s *V4Signer) stringToSign(t time.Time, creq string) string {
w := new(bytes.Buffer)
fmt.Fprint(w, "AWS4-HMAC-SHA256\n")
fmt.Fprintf(w, "%s\n", t.Format(ISO8601BasicFormat))
fmt.Fprintf(w, "%s\n", s.credentialScope(t))
fmt.Fprintf(w, "%s", s.hash(creq))
return w.String()
func (s *V4Signer) credentialScope(t time.Time) string {
return fmt.Sprintf("%s/%s/%s/aws4_request", t.Format(ISO8601BasicFormatShort), s.region.Name, s.serviceName)
signature method calculates the AWS Signature Version 4 according to Task 3 of the AWS Signature Version 4 Signing Process. (
signature = HexEncode(HMAC(derived-signing-key, string-to-sign))
func (s *V4Signer) signature(t time.Time, sts string) string {
h := s.hmac(s.derivedKey(t), []byte(sts))
return fmt.Sprintf("%x", h)
derivedKey method derives a signing key to be used for signing a request.
kSecret = Your AWS Secret Access Key
kDate = HMAC("AWS4" + kSecret, Date)
kRegion = HMAC(kDate, Region)
kService = HMAC(kRegion, Service)
kSigning = HMAC(kService, "aws4_request")
func (s *V4Signer) derivedKey(t time.Time) []byte {
h := s.hmac([]byte("AWS4"+s.auth.SecretKey), []byte(t.Format(ISO8601BasicFormatShort)))
h = s.hmac(h, []byte(s.region.Name))
h = s.hmac(h, []byte(s.serviceName))
h = s.hmac(h, []byte("aws4_request"))
return h
authorization method generates the authorization header value.
func (s *V4Signer) authorization(header http.Header, t time.Time, signature string) string {
w := new(bytes.Buffer)
fmt.Fprint(w, "AWS4-HMAC-SHA256 ")
fmt.Fprintf(w, "Credential=%s/%s, ", s.auth.AccessKey, s.credentialScope(t))
fmt.Fprintf(w, "SignedHeaders=%s, ", s.signedHeaders(header))
fmt.Fprintf(w, "Signature=%s", signature)
return w.String()
// hash method calculates the sha256 hash for a given string
func (s *V4Signer) hash(in string) string {
h := sha256.New()
fmt.Fprintf(h, "%s", in)
return fmt.Sprintf("%x", h.Sum(nil))
// hmac method calculates the sha256 hmac for a given slice of bytes
func (s *V4Signer) hmac(key, data []byte) []byte {
h := hmac.New(sha256.New, key)
return h.Sum(nil)
@ -1,202 +0,0 @@
package s3
import (
// Implements an interface for s3 bucket lifecycle configuration
// See for details.
const (
LifecycleRuleStatusEnabled = "Enabled"
LifecycleRuleStatusDisabled = "Disabled"
LifecycleRuleDateFormat = "2006-01-02"
StorageClassGlacier = "GLACIER"
type Expiration struct {
Days *uint `xml:"Days,omitempty"`
Date string `xml:"Date,omitempty"`
// Returns Date as a time.Time.
func (r *Expiration) ParseDate() (time.Time, error) {
return time.Parse(LifecycleRuleDateFormat, r.Date)
type Transition struct {
Days *uint `xml:"Days,omitempty"`
Date string `xml:"Date,omitempty"`
StorageClass string `xml:"StorageClass"`
// Returns Date as a time.Time.
func (r *Transition) ParseDate() (time.Time, error) {
return time.Parse(LifecycleRuleDateFormat, r.Date)
type NoncurrentVersionExpiration struct {
Days *uint `xml:"NoncurrentDays,omitempty"`
type NoncurrentVersionTransition struct {
Days *uint `xml:"NoncurrentDays,omitempty"`
StorageClass string `xml:"StorageClass"`
type LifecycleRule struct {
ID string `xml:"ID"`
Prefix string `xml:"Prefix"`
Status string `xml:"Status"`
NoncurrentVersionTransition *NoncurrentVersionTransition `xml:"NoncurrentVersionTransition,omitempty"`
NoncurrentVersionExpiration *NoncurrentVersionExpiration `xml:"NoncurrentVersionExpiration,omitempty"`
Transition *Transition `xml:"Transition,omitempty"`
Expiration *Expiration `xml:"Expiration,omitempty"`
// Create a lifecycle rule with arbitrary identifier id and object name prefix
// for which the rules should apply.
func NewLifecycleRule(id, prefix string) *LifecycleRule {
rule := &LifecycleRule{
ID: id,
Prefix: prefix,
Status: LifecycleRuleStatusEnabled,
return rule
// Adds a transition rule in days. Overwrites any previous transition rule.
func (r *LifecycleRule) SetTransitionDays(days uint) {
r.Transition = &Transition{
Days: &days,
StorageClass: StorageClassGlacier,
// Adds a transition rule as a date. Overwrites any previous transition rule.
func (r *LifecycleRule) SetTransitionDate(date time.Time) {
r.Transition = &Transition{
Date: date.Format(LifecycleRuleDateFormat),
StorageClass: StorageClassGlacier,
// Adds an expiration rule in days. Overwrites any previous expiration rule.
// Days must be > 0.
func (r *LifecycleRule) SetExpirationDays(days uint) {
r.Expiration = &Expiration{
Days: &days,
// Adds an expiration rule as a date. Overwrites any previous expiration rule.
func (r *LifecycleRule) SetExpirationDate(date time.Time) {
r.Expiration = &Expiration{
Date: date.Format(LifecycleRuleDateFormat),
// Adds a noncurrent version transition rule. Overwrites any previous
// noncurrent version transition rule.
func (r *LifecycleRule) SetNoncurrentVersionTransitionDays(days uint) {
r.NoncurrentVersionTransition = &NoncurrentVersionTransition{
Days: &days,
StorageClass: StorageClassGlacier,
// Adds a noncurrent version expiration rule. Days must be > 0. Overwrites
// any previous noncurrent version expiration rule.
func (r *LifecycleRule) SetNoncurrentVersionExpirationDays(days uint) {
r.NoncurrentVersionExpiration = &NoncurrentVersionExpiration{
Days: &days,
// Marks the rule as disabled.
func (r *LifecycleRule) Disable() {
r.Status = LifecycleRuleStatusDisabled
// Marks the rule as enabled (default).
func (r *LifecycleRule) Enable() {
r.Status = LifecycleRuleStatusEnabled
type LifecycleConfiguration struct {
XMLName xml.Name `xml:"LifecycleConfiguration"`
Rules *[]*LifecycleRule `xml:"Rule,omitempty"`
// Adds a LifecycleRule to the configuration.
func (c *LifecycleConfiguration) AddRule(r *LifecycleRule) {
var rules []*LifecycleRule
if c.Rules != nil {
rules = *c.Rules
rules = append(rules, r)
c.Rules = &rules
// Sets the bucket's lifecycle configuration.
func (b *Bucket) PutLifecycleConfiguration(c *LifecycleConfiguration) error {
doc, err := xml.Marshal(c)
if err != nil {
return err
buf := makeXmlBuffer(doc)
digest := md5.New()
size, err := digest.Write(buf.Bytes())
if err != nil {
return err
headers := map[string][]string{
"Content-Length": {strconv.FormatInt(int64(size), 10)},
"Content-MD5": {base64.StdEncoding.EncodeToString(digest.Sum(nil))},
req := &request{
path: "/",
method: "PUT",
bucket: b.Name,
headers: headers,
payload: buf,
params: url.Values{"lifecycle": {""}},
return b.S3.queryV4Sign(req, nil)
// Retrieves the lifecycle configuration for the bucket. AWS returns an error
// if no lifecycle found.
func (b *Bucket) GetLifecycleConfiguration() (*LifecycleConfiguration, error) {
req := &request{
method: "GET",
bucket: b.Name,
path: "/",
params: url.Values{"lifecycle": {""}},
conf := &LifecycleConfiguration{}
err := b.S3.queryV4Sign(req, conf)
return conf, err
// Delete the bucket's lifecycle configuration.
func (b *Bucket) DeleteLifecycleConfiguration() error {
req := &request{
method: "DELETE",
bucket: b.Name,
path: "/",
params: url.Values{"lifecycle": {""}},
return b.S3.queryV4Sign(req, nil)
@ -1,508 +0,0 @@
package s3
import (
// Multi represents an unfinished multipart upload.
// Multipart uploads allow sending big objects in smaller chunks.
// After all parts have been sent, the upload must be explicitly
// completed by calling Complete with the list of parts.
// See for an overview of multipart uploads.
type Multi struct {
Bucket *Bucket
Key string
UploadId string
// That's the default. Here just for testing.
var listMultiMax = 1000
type listMultiResp struct {
NextKeyMarker string
NextUploadIdMarker string
IsTruncated bool
Upload []Multi
CommonPrefixes []string `xml:"CommonPrefixes>Prefix"`
// ListMulti returns the list of unfinished multipart uploads in b.
// The prefix parameter limits the response to keys that begin with the
// specified prefix. You can use prefixes to separate a bucket into different
// groupings of keys (to get the feeling of folders, for example).
// The delim parameter causes the response to group all of the keys that
// share a common prefix up to the next delimiter in a single entry within
// the CommonPrefixes field. You can use delimiters to separate a bucket
// into different groupings of keys, similar to how folders would work.
// See for details.
func (b *Bucket) ListMulti(prefix, delim string) (multis []*Multi, prefixes []string, err error) {
params := map[string][]string{
"uploads": {""},
"max-uploads": {strconv.FormatInt(int64(listMultiMax), 10)},
"prefix": {prefix},
"delimiter": {delim},
for attempt := attempts.Start(); attempt.Next(); {
req := &request{
method: "GET",
bucket: b.Name,
params: params,
var resp listMultiResp
err := b.S3.query(req, &resp)
if shouldRetry(err) && attempt.HasNext() {
if err != nil {
return nil, nil, err
for i := range resp.Upload {
multi := &resp.Upload[i]
multi.Bucket = b
multis = append(multis, multi)
prefixes = append(prefixes, resp.CommonPrefixes...)
if !resp.IsTruncated {
return multis, prefixes, nil
params["key-marker"] = []string{resp.NextKeyMarker}
params["upload-id-marker"] = []string{resp.NextUploadIdMarker}
attempt = attempts.Start() // Last request worked.
// Multi returns a multipart upload handler for the provided key
// inside b. If a multipart upload exists for key, it is returned,
// otherwise a new multipart upload is initiated with contType and perm.
func (b *Bucket) Multi(key, contType string, perm ACL, options Options) (*Multi, error) {
multis, _, err := b.ListMulti(key, "")
if err != nil && !hasCode(err, "NoSuchUpload") {
return nil, err
for _, m := range multis {
if m.Key == key {
return m, nil
return b.InitMulti(key, contType, perm, options)
// InitMulti initializes a new multipart upload at the provided
// key inside b and returns a value for manipulating it.
// See for details.
func (b *Bucket) InitMulti(key string, contType string, perm ACL, options Options) (*Multi, error) {
headers := map[string][]string{
"Content-Type": {contType},
"Content-Length": {"0"},
"x-amz-acl": {string(perm)},
params := map[string][]string{
"uploads": {""},
req := &request{
method: "POST",
bucket: b.Name,
path: key,
headers: headers,
params: params,
var err error
var resp struct {
UploadId string `xml:"UploadId"`
for attempt := attempts.Start(); attempt.Next(); {
err = b.S3.query(req, &resp)
if !shouldRetry(err) {
if err != nil {
return nil, err
return &Multi{Bucket: b, Key: key, UploadId: resp.UploadId}, nil
func (m *Multi) PutPartCopy(n int, options CopyOptions, source string) (*CopyObjectResult, Part, error) {
headers := map[string][]string{
"x-amz-copy-source": {url.QueryEscape(source)},
params := map[string][]string{
"uploadId": {m.UploadId},
"partNumber": {strconv.FormatInt(int64(n), 10)},
sourceBucket := m.Bucket.S3.Bucket(strings.TrimRight(strings.SplitAfterN(source, "/", 2)[0], "/"))
sourceMeta, err := sourceBucket.Head(strings.SplitAfterN(source, "/", 2)[1], nil)
if err != nil {
return nil, Part{}, err
for attempt := attempts.Start(); attempt.Next(); {
req := &request{
method: "PUT",
bucket: m.Bucket.Name,
path: m.Key,
headers: headers,
params: params,
resp := &CopyObjectResult{}
err = m.Bucket.S3.query(req, resp)
if shouldRetry(err) && attempt.HasNext() {
if err != nil {
return nil, Part{}, err
if resp.ETag == "" {
return nil, Part{}, errors.New("part upload succeeded with no ETag")
return resp, Part{n, resp.ETag, sourceMeta.ContentLength}, nil
// PutPart sends part n of the multipart upload, reading all the content from r.
// Each part, except for the last one, must be at least 5MB in size.
// See for details.
func (m *Multi) PutPart(n int, r io.ReadSeeker) (Part, error) {
partSize, _, md5b64, err := seekerInfo(r)
if err != nil {
return Part{}, err
return m.putPart(n, r, partSize, md5b64)
func (m *Multi) putPart(n int, r io.ReadSeeker, partSize int64, md5b64 string) (Part, error) {
headers := map[string][]string{
"Content-Length": {strconv.FormatInt(partSize, 10)},
"Content-MD5": {md5b64},
params := map[string][]string{
"uploadId": {m.UploadId},
"partNumber": {strconv.FormatInt(int64(n), 10)},
for attempt := attempts.Start(); attempt.Next(); {
_, err := r.Seek(0, 0)
if err != nil {
return Part{}, err
req := &request{
method: "PUT",
bucket: m.Bucket.Name,
path: m.Key,
headers: headers,
params: params,
payload: r,
err = m.Bucket.S3.prepare(req)
if err != nil {
return Part{}, err
resp, err :=, nil)
if shouldRetry(err) && attempt.HasNext() {
if err != nil {
return Part{}, err
etag := resp.Header.Get("ETag")
if etag == "" {
return Part{}, errors.New("part upload succeeded with no ETag")
return Part{n, etag, partSize}, nil
func seekerInfo(r io.ReadSeeker) (size int64, md5hex string, md5b64 string, err error) {
_, err = r.Seek(0, 0)
if err != nil {
return 0, "", "", err
digest := md5.New()
size, err = io.Copy(digest, r)
if err != nil {
return 0, "", "", err
sum := digest.Sum(nil)
md5hex = hex.EncodeToString(sum)
md5b64 = base64.StdEncoding.EncodeToString(sum)
return size, md5hex, md5b64, nil
type Part struct {
N int `xml:"PartNumber"`
ETag string
Size int64
type partSlice []Part
func (s partSlice) Len() int { return len(s) }
func (s partSlice) Less(i, j int) bool { return s[i].N < s[j].N }
func (s partSlice) Swap(i, j int) { s[i], s[j] = s[j], s[i] }
type listPartsResp struct {
NextPartNumberMarker string
IsTruncated bool
Part []Part
// That's the default. Here just for testing.
var listPartsMax = 1000
// Kept for backcompatability. See the documentation for ListPartsFull
func (m *Multi) ListParts() ([]Part, error) {
return m.ListPartsFull(0, listPartsMax)
// ListParts returns the list of previously uploaded parts in m,
// ordered by part number (Only parts with higher part numbers than
// partNumberMarker will be listed). Only up to maxParts parts will be
// returned.
// See for details.
func (m *Multi) ListPartsFull(partNumberMarker int, maxParts int) ([]Part, error) {
if maxParts > listPartsMax {
maxParts = listPartsMax
params := map[string][]string{
"uploadId": {m.UploadId},
"max-parts": {strconv.FormatInt(int64(maxParts), 10)},
"part-number-marker": {strconv.FormatInt(int64(partNumberMarker), 10)},
var parts partSlice
for attempt := attempts.Start(); attempt.Next(); {
req := &request{
method: "GET",
bucket: m.Bucket.Name,
path: m.Key,
params: params,
var resp listPartsResp
err := m.Bucket.S3.query(req, &resp)
if shouldRetry(err) && attempt.HasNext() {
if err != nil {
return nil, err
parts = append(parts, resp.Part...)
if !resp.IsTruncated {
return parts, nil
params["part-number-marker"] = []string{resp.NextPartNumberMarker}
attempt = attempts.Start() // Last request worked.
type ReaderAtSeeker interface {
// PutAll sends all of r via a multipart upload with parts no larger
// than partSize bytes, which must be set to at least 5MB.
// Parts previously uploaded are either reused if their checksum
// and size match the new part, or otherwise overwritten with the
// new content.
// PutAll returns all the parts of m (reused or not).
func (m *Multi) PutAll(r ReaderAtSeeker, partSize int64) ([]Part, error) {
old, err := m.ListParts()
if err != nil && !hasCode(err, "NoSuchUpload") {
return nil, err
reuse := 0 // Index of next old part to consider reusing.
current := 1 // Part number of latest good part handled.
totalSize, err := r.Seek(0, 2)
if err != nil {
return nil, err
first := true // Must send at least one empty part if the file is empty.
var result []Part
for offset := int64(0); offset < totalSize || first; offset += partSize {
first = false
if offset+partSize > totalSize {
partSize = totalSize - offset
section := io.NewSectionReader(r, offset, partSize)
_, md5hex, md5b64, err := seekerInfo(section)
if err != nil {
return nil, err
for reuse < len(old) && old[reuse].N <= current {
// Looks like this part was already sent.
part := &old[reuse]
etag := `"` + md5hex + `"`
if part.N == current && part.Size == partSize && part.ETag == etag {
// Checksum matches. Reuse the old part.
result = append(result, *part)
continue NextSection
// Part wasn't found or doesn't match. Send it.
part, err := m.putPart(current, section, partSize, md5b64)
if err != nil {
return nil, err
result = append(result, part)
return result, nil
type completeUpload struct {
XMLName xml.Name `xml:"CompleteMultipartUpload"`
Parts completeParts `xml:"Part"`
type completePart struct {
PartNumber int
ETag string
type completeParts []completePart
func (p completeParts) Len() int { return len(p) }
func (p completeParts) Less(i, j int) bool { return p[i].PartNumber < p[j].PartNumber }
func (p completeParts) Swap(i, j int) { p[i], p[j] = p[j], p[i] }
// We can't know in advance whether we'll have an Error or a
// CompleteMultipartUploadResult, so this structure is just a placeholder to
// know the name of the XML object.
type completeUploadResp struct {
XMLName xml.Name
InnerXML string `xml:",innerxml"`
// Complete assembles the given previously uploaded parts into the
// final object. This operation may take several minutes.
// See for details.
func (m *Multi) Complete(parts []Part) error {
params := map[string][]string{
"uploadId": {m.UploadId},
c := completeUpload{}
for _, p := range parts {
c.Parts = append(c.Parts, completePart{p.N, p.ETag})
data, err := xml.Marshal(&c)
if err != nil {
return err
for attempt := attempts.Start(); attempt.Next(); {
req := &request{
method: "POST",
bucket: m.Bucket.Name,
path: m.Key,
params: params,
payload: bytes.NewReader(data),
var resp completeUploadResp
if m.Bucket.Region.Name == "generic" {
headers := make(http.Header)
headers.Add("Content-Length", strconv.FormatInt(int64(len(data)), 10))
req.headers = headers
err := m.Bucket.S3.query(req, &resp)
if shouldRetry(err) && attempt.HasNext() {
if err != nil {
return err
// A 200 error code does not guarantee that there were no errors (see
// ),
// so first figure out what kind of XML "object" we are dealing with.
if resp.XMLName.Local == "Error" {
// S3.query does the unmarshalling for us, so we can't unmarshal
// again in a different struct... So we need to duct-tape back the
// original XML back together.
fullErrorXml := "<Error>" + resp.InnerXML + "</Error>"
s3err := &Error{}
if err := xml.Unmarshal([]byte(fullErrorXml), s3err); err != nil {
return err
return s3err
if resp.XMLName.Local == "CompleteMultipartUploadResult" {
// FIXME: One could probably add a CompleteFull method returning the
// actual contents of the CompleteMultipartUploadResult object.
return nil
return errors.New("Invalid XML struct returned: " + resp.XMLName.Local)
// Abort deletes an unifinished multipart upload and any previously
// uploaded parts for it.
// After a multipart upload is aborted, no additional parts can be
// uploaded using it. However, if any part uploads are currently in
// progress, those part uploads might or might not succeed. As a result,
// it might be necessary to abort a given multipart upload multiple
// times in order to completely free all storage consumed by all parts.
// NOTE: If the described scenario happens to you, please report back to
// the goamz authors with details. In the future such retrying should be
// handled internally, but it's not clear what happens precisely (Is an
// error returned? Is the issue completely undetectable?).
// See for details.
func (m *Multi) Abort() error {
params := map[string][]string{
"uploadId": {m.UploadId},
for attempt := attempts.Start(); attempt.Next(); {
req := &request{
method: "DELETE",
bucket: m.Bucket.Name,
path: m.Key,
params: params,
err := m.Bucket.S3.query(req, nil)
if shouldRetry(err) && attempt.HasNext() {
return err
File diff suppressed because it is too large
Load diff
@ -1,120 +0,0 @@
package s3
import (
var b64 = base64.StdEncoding
// ----------------------------------------------------------------------------
// S3 signing (
var s3ParamsToSign = map[string]bool{
"acl": true,
"location": true,
"logging": true,
"notification": true,
"partNumber": true,
"policy": true,
"requestPayment": true,
"torrent": true,
"uploadId": true,
"uploads": true,
"versionId": true,
"versioning": true,
"versions": true,
"response-content-type": true,
"response-content-language": true,
"response-expires": true,
"response-cache-control": true,
"response-content-disposition": true,
"response-content-encoding": true,
"website": true,
"delete": true,
func sign(auth aws.Auth, method, canonicalPath string, params, headers map[string][]string) {
var md5, ctype, date, xamz string
var xamzDate bool
var keys, sarray []string
xheaders := make(map[string]string)
for k, v := range headers {
k = strings.ToLower(k)
switch k {
case "content-md5":
md5 = v[0]
case "content-type":
ctype = v[0]
case "date":
if !xamzDate {
date = v[0]
if strings.HasPrefix(k, "x-amz-") {
keys = append(keys, k)
xheaders[k] = strings.Join(v, ",")
if k == "x-amz-date" {
xamzDate = true
date = ""
if len(keys) > 0 {
for i := range keys {
key := keys[i]
value := xheaders[key]
sarray = append(sarray, key+":"+value)
xamz = strings.Join(sarray, "\n") + "\n"
expires := false
if v, ok := params["Expires"]; ok {
// Query string request authentication alternative.
expires = true
date = v[0]
params["AWSAccessKeyId"] = []string{auth.AccessKey}
sarray = sarray[0:0]
for k, v := range params {
if s3ParamsToSign[k] {
for _, vi := range v {
if vi == "" {
sarray = append(sarray, k)
} else {
// "When signing you do not encode these values."
sarray = append(sarray, k+"="+vi)
if len(sarray) > 0 {
canonicalPath = canonicalPath + "?" + strings.Join(sarray, "&")
payload := method + "\n" + md5 + "\n" + ctype + "\n" + date + "\n" + xamz + canonicalPath
hash := hmac.New(sha1.New, []byte(auth.SecretKey))
signature := make([]byte, b64.EncodedLen(hash.Size()))
b64.Encode(signature, hash.Sum(nil))
if expires {
params["Signature"] = []string{string(signature)}
} else {
headers["Authorization"] = []string{"AWS " + auth.AccessKey + ":" + string(signature)}
if debug {
log.Printf("Signature payload: %q", payload)
log.Printf("Signature: %q", signature)
Add table
