Merge pull request #864 from AndreyKostov/next-generation

S3 storage driver for next gen modifications
This commit is contained in:
Olivier Gambier 2014-12-23 15:03:10 -08:00
commit e33c5d90b2
4 changed files with 476 additions and 141 deletions

View file

@ -1,13 +1,28 @@
// +build ignore // Package s3 provides a storagedriver.StorageDriver implementation to
// store blobs in Amazon S3 cloud storage.
//
// This package leverages the crowdmob/goamz client library for interfacing with
// s3.
//
// Because s3 is a key, value store the Stat call does not support last modification
// time for directories (directories are an abstraction for key, value stores)
//
// Keep in mind that s3 guarantees only eventual consistency, so do not assume
// that a successful write will mean immediate access to the data written (although
// in most regions a new object put has guaranteed read after write). The only true
// guarantee is that once you call Stat and receive a certain file size, that much of
// the file is already accessible.
package s3 package s3
import ( import (
"bytes" "bytes"
"fmt" "fmt"
"io" "io"
"io/ioutil"
"net/http" "net/http"
"strconv" "strconv"
"strings"
"time"
"github.com/crowdmob/goamz/aws" "github.com/crowdmob/goamz/aws"
"github.com/crowdmob/goamz/s3" "github.com/crowdmob/goamz/s3"
@ -19,10 +34,10 @@ const driverName = "s3"
// minChunkSize defines the minimum multipart upload chunk size // minChunkSize defines the minimum multipart upload chunk size
// S3 API requires multipart upload chunks to be at least 5MB // S3 API requires multipart upload chunks to be at least 5MB
const minChunkSize = 5 * 1024 * 1024 const chunkSize = 5 * 1024 * 1024
// listPartsMax is the largest amount of parts you can request from S3 // listMax is the largest amount of objects you can request from S3 in a list call
const listPartsMax = 1000 const listMax = 1000
func init() { func init() {
factory.Register(driverName, &s3DriverFactory{}) factory.Register(driverName, &s3DriverFactory{})
@ -31,7 +46,7 @@ func init() {
// s3DriverFactory implements the factory.StorageDriverFactory interface // s3DriverFactory implements the factory.StorageDriverFactory interface
type s3DriverFactory struct{} type s3DriverFactory struct{}
func (factory *s3DriverFactory) Create(parameters map[string]string) (storagedriver.StorageDriver, error) { func (factory *s3DriverFactory) Create(parameters map[string]interface{}) (storagedriver.StorageDriver, error) {
return FromParameters(parameters) return FromParameters(parameters)
} }
@ -41,6 +56,7 @@ type Driver struct {
S3 *s3.S3 S3 *s3.S3
Bucket *s3.Bucket Bucket *s3.Bucket
Encrypt bool Encrypt bool
rootDirectory string
} }
// FromParameters constructs a new Driver with a given parameters map // FromParameters constructs a new Driver with a given parameters map
@ -50,28 +66,24 @@ type Driver struct {
// - region // - region
// - bucket // - bucket
// - encrypt // - encrypt
func FromParameters(parameters map[string]string) (*Driver, error) { func FromParameters(parameters map[string]interface{}) (*Driver, error) {
accessKey, ok := parameters["accesskey"] // Providing no values for these is valid in case the user is authenticating
if !ok || accessKey == "" { // with an IAM on an ec2 instance (in which case the instance credentials will
return nil, fmt.Errorf("No accesskey parameter provided") // be summoned when GetAuth is called)
} accessKey, _ := parameters["accesskey"]
secretKey, _ := parameters["secretkey"]
secretKey, ok := parameters["secretkey"]
if !ok || secretKey == "" {
return nil, fmt.Errorf("No secretkey parameter provided")
}
regionName, ok := parameters["region"] regionName, ok := parameters["region"]
if !ok || regionName == "" { if !ok || regionName.(string) == "" {
return nil, fmt.Errorf("No region parameter provided") return nil, fmt.Errorf("No region parameter provided")
} }
region := aws.GetRegion(regionName) region := aws.GetRegion(fmt.Sprint(regionName))
if region.Name == "" { if region.Name == "" {
return nil, fmt.Errorf("Invalid region provided: %v", region) return nil, fmt.Errorf("Invalid region provided: %v", region)
} }
bucket, ok := parameters["bucket"] bucket, ok := parameters["bucket"]
if !ok || bucket == "" { if !ok || fmt.Sprint(bucket) == "" {
return nil, fmt.Errorf("No bucket parameter provided") return nil, fmt.Errorf("No bucket parameter provided")
} }
@ -80,136 +92,415 @@ func FromParameters(parameters map[string]string) (*Driver, error) {
return nil, fmt.Errorf("No encrypt parameter provided") return nil, fmt.Errorf("No encrypt parameter provided")
} }
encryptBool, err := strconv.ParseBool(encrypt) encryptBool, ok := encrypt.(bool)
if err != nil { if !ok {
return nil, fmt.Errorf("Unable to parse the encrypt parameter: %v", err) return nil, fmt.Errorf("The encrypt parameter should be a boolean")
} }
return New(accessKey, secretKey, region, encryptBool, bucket)
rootDirectory, ok := parameters["rootdirectory"]
if !ok {
return nil, fmt.Errorf("No rootdirectory parameter provided")
}
return New(fmt.Sprint(accessKey), fmt.Sprint(secretKey), fmt.Sprint(bucket), fmt.Sprint(rootDirectory), region, encryptBool)
} }
// New constructs a new Driver with the given AWS credentials, region, encryption flag, and // New constructs a new Driver with the given AWS credentials, region, encryption flag, and
// bucketName // bucketName
func New(accessKey string, secretKey string, region aws.Region, encrypt bool, bucketName string) (*Driver, error) { func New(accessKey, secretKey, bucketName, rootDirectory string, region aws.Region, encrypt bool) (*Driver, error) {
auth := aws.Auth{AccessKey: accessKey, SecretKey: secretKey} auth, err := aws.GetAuth(accessKey, secretKey, "", time.Time{})
if err != nil {
return nil, err
}
s3obj := s3.New(auth, region) s3obj := s3.New(auth, region)
bucket := s3obj.Bucket(bucketName) bucket := s3obj.Bucket(bucketName)
if err := bucket.PutBucket(getPermissions()); err != nil { if _, err := bucket.List("", "", "", 1); err != nil {
s3Err, ok := err.(*s3.Error)
if !(ok && s3Err.Code == "BucketAlreadyOwnedByYou") {
return nil, err return nil, err
} }
}
return &Driver{s3obj, bucket, encrypt}, nil // TODO Currently multipart uploads have no timestamps, so this would be unwise
// if you initiated a new s3driver while another one is running on the same bucket.
// multis, _, err := bucket.ListMulti("", "")
// if err != nil {
// return nil, err
// }
// for _, multi := range multis {
// err := multi.Abort()
// //TODO appropriate to do this error checking?
// if err != nil {
// return nil, err
// }
// }
return &Driver{s3obj, bucket, encrypt, rootDirectory}, nil
} }
// Implement the storagedriver.StorageDriver interface // Implement the storagedriver.StorageDriver interface
// GetContent retrieves the content stored at "path" as a []byte. // GetContent retrieves the content stored at "path" as a []byte.
func (d *Driver) GetContent(path string) ([]byte, error) { func (d *Driver) GetContent(path string) ([]byte, error) {
content, err := d.Bucket.Get(path) if !storagedriver.PathRegexp.MatchString(path) {
return nil, storagedriver.InvalidPathError{Path: path}
}
content, err := d.Bucket.Get(d.s3Path(path))
if err != nil { if err != nil {
return nil, storagedriver.PathNotFoundError{Path: path} return nil, parseError(path, err)
} }
return content, nil return content, nil
} }
// PutContent stores the []byte content at a location designated by "path". // PutContent stores the []byte content at a location designated by "path".
func (d *Driver) PutContent(path string, contents []byte) error { func (d *Driver) PutContent(path string, contents []byte) error {
return d.Bucket.Put(path, contents, d.getContentType(), getPermissions(), d.getOptions()) if !storagedriver.PathRegexp.MatchString(path) {
return storagedriver.InvalidPathError{Path: path}
}
return parseError(path, d.Bucket.Put(d.s3Path(path), contents, d.getContentType(), getPermissions(), d.getOptions()))
} }
// ReadStream retrieves an io.ReadCloser for the content stored at "path" with a // ReadStream retrieves an io.ReadCloser for the content stored at "path" with a
// given byte offset. // given byte offset.
func (d *Driver) ReadStream(path string, offset int64) (io.ReadCloser, error) { func (d *Driver) ReadStream(path string, offset int64) (io.ReadCloser, error) {
if !storagedriver.PathRegexp.MatchString(path) {
return nil, storagedriver.InvalidPathError{Path: path}
}
if offset < 0 {
return nil, storagedriver.InvalidOffsetError{Path: path, Offset: offset}
}
headers := make(http.Header) headers := make(http.Header)
headers.Add("Range", "bytes="+strconv.FormatInt(offset, 10)+"-") headers.Add("Range", "bytes="+strconv.FormatInt(offset, 10)+"-")
resp, err := d.Bucket.GetResponseWithHeaders(path, headers) resp, err := d.Bucket.GetResponseWithHeaders(d.s3Path(path), headers)
if err != nil { if err != nil {
return nil, storagedriver.PathNotFoundError{Path: path} if s3Err, ok := err.(*s3.Error); ok && s3Err.Code == "InvalidRange" {
return ioutil.NopCloser(bytes.NewReader(nil)), nil
}
return nil, parseError(path, err)
} }
return resp.Body, nil return resp.Body, nil
} }
// WriteStream stores the contents of the provided io.ReadCloser at a location // WriteStream stores the contents of the provided io.Reader at a
// designated by the given path. // location designated by the given path. The driver will know it has
func (d *Driver) WriteStream(path string, offset, size int64, reader io.ReadCloser) error { // received the full contents when the reader returns io.EOF. The number
defer reader.Close() // of successfully READ bytes will be returned, even if an error is
// returned. May be used to resume writing a stream by providing a nonzero
// offset. Offsets past the current size will write from the position
// beyond the end of the file.
func (d *Driver) WriteStream(path string, offset int64, reader io.Reader) (totalRead int64, err error) {
if !storagedriver.PathRegexp.MatchString(path) {
return 0, storagedriver.InvalidPathError{Path: path}
}
chunkSize := int64(minChunkSize) if offset < 0 {
for size/chunkSize >= listPartsMax { return 0, storagedriver.InvalidOffsetError{Path: path, Offset: offset}
chunkSize *= 2
} }
partNumber := 1 partNumber := 1
var totalRead int64 bytesRead := 0
multi, parts, err := d.getAllParts(path) parts := []s3.Part{}
var part s3.Part
multi, err := d.Bucket.InitMulti(d.s3Path(path), d.getContentType(), getPermissions(), d.getOptions())
if err != nil {
return 0, err
}
buf := make([]byte, chunkSize)
zeroBuf := make([]byte, chunkSize)
// We never want to leave a dangling multipart upload, our only consistent state is
// when there is a whole object at path. This is in order to remain consistent with
// the stat call.
//
// Note that if the machine dies before executing the defer, we will be left with a dangling
// multipart upload, which will eventually be cleaned up, but we will lose all of the progress
// made prior to the machine crashing.
defer func() {
if len(parts) > 0 {
if multi == nil {
// Parts should be empty if the multi is not initialized
panic("Unreachable")
} else {
if multi.Complete(parts) != nil {
multi.Abort()
}
}
}
}()
// Fills from 0 to total from current
fromSmallCurrent := func(total int64) error {
current, err := d.ReadStream(path, 0)
if err != nil { if err != nil {
return err return err
} }
if (offset) > int64(len(parts))*chunkSize || (offset < size && offset%chunkSize != 0) { bytesRead = 0
return storagedriver.InvalidOffsetError{Path: path, Offset: offset} for int64(bytesRead) < total {
} //The loop should very rarely enter a second iteration
nn, err := current.Read(buf[bytesRead:total])
if len(parts) > 0 { bytesRead += nn
partNumber = int(offset/chunkSize) + 1 if err != nil {
totalRead = offset if err != io.EOF {
parts = parts[0 : partNumber-1]
}
buf := make([]byte, chunkSize)
for {
bytesRead, err := io.ReadFull(reader, buf)
totalRead += int64(bytesRead)
if err != nil && err != io.ErrUnexpectedEOF && err != io.EOF {
return err return err
} else if (int64(bytesRead) < chunkSize) && totalRead != size { }
break break
} else { }
part, err := multi.PutPart(int(partNumber), bytes.NewReader(buf[0:bytesRead]))
}
return nil
}
// Fills from parameter to chunkSize from reader
fromReader := func(from int64) error {
bytesRead = 0
for from+int64(bytesRead) < chunkSize {
nn, err := reader.Read(buf[from+int64(bytesRead):])
totalRead += int64(nn)
bytesRead += nn
if err != nil {
if err != io.EOF {
return err
}
break
}
}
if bytesRead > 0 {
part, err = multi.PutPart(int(partNumber), bytes.NewReader(buf[0:int64(bytesRead)+from]))
if err != nil { if err != nil {
return err return err
} }
parts = append(parts, part) parts = append(parts, part)
if totalRead == size {
multi.Complete(parts)
break
}
partNumber++ partNumber++
} }
}
return nil return nil
} }
// CurrentSize retrieves the curernt size in bytes of the object at the given if offset > 0 {
// path. resp, err := d.Bucket.Head(d.s3Path(path), nil)
func (d *Driver) CurrentSize(path string) (uint64, error) { if err != nil {
_, parts, err := d.getAllParts(path) if s3Err, ok := err.(*s3.Error); !ok || s3Err.Code != "NoSuchKey" {
return 0, err
}
}
currentLength := int64(0)
if err == nil {
currentLength = resp.ContentLength
}
if currentLength >= offset {
if offset < chunkSize {
// chunkSize > currentLength >= offset
if err = fromSmallCurrent(offset); err != nil {
return totalRead, err
}
if err = fromReader(offset); err != nil {
return totalRead, err
}
if totalRead+offset < chunkSize {
return totalRead, nil
}
} else {
// currentLength >= offset >= chunkSize
_, part, err = multi.PutPartCopy(partNumber,
s3.CopyOptions{CopySourceOptions: "bytes=0-" + strconv.FormatInt(offset-1, 10)},
d.Bucket.Name+"/"+d.s3Path(path))
if err != nil { if err != nil {
return 0, err return 0, err
} }
if len(parts) == 0 { parts = append(parts, part)
return 0, nil partNumber++
}
} else {
// Fills between parameters with 0s but only when to - from <= chunkSize
fromZeroFillSmall := func(from, to int64) error {
bytesRead = 0
for from+int64(bytesRead) < to {
nn, err := bytes.NewReader(zeroBuf).Read(buf[from+int64(bytesRead) : to])
bytesRead += nn
if err != nil {
return err
}
} }
return (((uint64(len(parts)) - 1) * uint64(parts[0].Size)) + uint64(parts[len(parts)-1].Size)), nil return nil
}
// Fills between parameters with 0s, making new parts
fromZeroFillLarge := func(from, to int64) error {
bytesRead64 := int64(0)
for to-(from+bytesRead64) >= chunkSize {
part, err := multi.PutPart(int(partNumber), bytes.NewReader(zeroBuf))
if err != nil {
return err
}
bytesRead64 += chunkSize
parts = append(parts, part)
partNumber++
}
return fromZeroFillSmall(0, (to-from)%chunkSize)
}
// currentLength < offset
if currentLength < chunkSize {
if offset < chunkSize {
// chunkSize > offset > currentLength
if err = fromSmallCurrent(currentLength); err != nil {
return totalRead, err
}
if err = fromZeroFillSmall(currentLength, offset); err != nil {
return totalRead, err
}
if err = fromReader(offset); err != nil {
return totalRead, err
}
if totalRead+offset < chunkSize {
return totalRead, nil
}
} else {
// offset >= chunkSize > currentLength
if err = fromSmallCurrent(currentLength); err != nil {
return totalRead, err
}
if err = fromZeroFillSmall(currentLength, chunkSize); err != nil {
return totalRead, err
}
part, err = multi.PutPart(int(partNumber), bytes.NewReader(buf))
if err != nil {
return totalRead, err
}
parts = append(parts, part)
partNumber++
//Zero fill from chunkSize up to offset, then some reader
if err = fromZeroFillLarge(chunkSize, offset); err != nil {
return totalRead, err
}
if err = fromReader(offset % chunkSize); err != nil {
return totalRead, err
}
if totalRead+(offset%chunkSize) < chunkSize {
return totalRead, nil
}
}
} else {
// offset > currentLength >= chunkSize
_, part, err = multi.PutPartCopy(partNumber,
s3.CopyOptions{CopySourceOptions: "bytes=0-" + strconv.FormatInt(currentLength-1, 10)},
d.Bucket.Name+"/"+d.s3Path(path))
if err != nil {
return 0, err
}
parts = append(parts, part)
partNumber++
//Zero fill from currentLength up to offset, then some reader
if err = fromZeroFillLarge(currentLength, offset); err != nil {
return totalRead, err
}
if err = fromReader((offset - currentLength) % chunkSize); err != nil {
return totalRead, err
}
if totalRead+((offset-currentLength)%chunkSize) < chunkSize {
return totalRead, nil
}
}
}
}
for {
if err = fromReader(0); err != nil {
return totalRead, err
}
if int64(bytesRead) < chunkSize {
break
}
}
return totalRead, nil
} }
// List returns a list of the objects that are direct descendants of the given // Stat retrieves the FileInfo for the given path, including the current size
// path. // in bytes and the creation time.
func (d *Driver) Stat(path string) (storagedriver.FileInfo, error) {
if !storagedriver.PathRegexp.MatchString(path) {
return nil, storagedriver.InvalidPathError{Path: path}
}
listResponse, err := d.Bucket.List(d.s3Path(path), "", "", 1)
if err != nil {
return nil, err
}
fi := storagedriver.FileInfoFields{
Path: path,
}
if len(listResponse.Contents) == 1 {
if listResponse.Contents[0].Key != d.s3Path(path) {
fi.IsDir = true
} else {
fi.IsDir = false
fi.Size = listResponse.Contents[0].Size
timestamp, err := time.Parse(time.RFC3339Nano, listResponse.Contents[0].LastModified)
if err != nil {
return nil, err
}
fi.ModTime = timestamp
}
} else if len(listResponse.CommonPrefixes) == 1 {
fi.IsDir = true
} else {
return nil, storagedriver.PathNotFoundError{Path: path}
}
return storagedriver.FileInfoInternal{FileInfoFields: fi}, nil
}
// List returns a list of the objects that are direct descendants of the given path.
func (d *Driver) List(path string) ([]string, error) { func (d *Driver) List(path string) ([]string, error) {
if path[len(path)-1] != '/' { if !storagedriver.PathRegexp.MatchString(path) && path != "/" {
return nil, storagedriver.InvalidPathError{Path: path}
}
if path != "/" && path[len(path)-1] != '/' {
path = path + "/" path = path + "/"
} }
listResponse, err := d.Bucket.List(path, "/", "", listPartsMax) listResponse, err := d.Bucket.List(d.s3Path(path), "/", "", listMax)
if err != nil { if err != nil {
return nil, err return nil, err
} }
@ -219,15 +510,15 @@ func (d *Driver) List(path string) ([]string, error) {
for { for {
for _, key := range listResponse.Contents { for _, key := range listResponse.Contents {
files = append(files, key.Key) files = append(files, strings.Replace(key.Key, d.s3Path(""), "", 1))
} }
for _, commonPrefix := range listResponse.CommonPrefixes { for _, commonPrefix := range listResponse.CommonPrefixes {
directories = append(directories, commonPrefix[0:len(commonPrefix)-1]) directories = append(directories, strings.Replace(commonPrefix[0:len(commonPrefix)-1], d.s3Path(""), "", 1))
} }
if listResponse.IsTruncated { if listResponse.IsTruncated {
listResponse, err = d.Bucket.List(path, "/", listResponse.NextMarker, listPartsMax) listResponse, err = d.Bucket.List(d.s3Path(path), "/", listResponse.NextMarker, listMax)
if err != nil { if err != nil {
return nil, err return nil, err
} }
@ -242,12 +533,17 @@ func (d *Driver) List(path string) ([]string, error) {
// Move moves an object stored at sourcePath to destPath, removing the original // Move moves an object stored at sourcePath to destPath, removing the original
// object. // object.
func (d *Driver) Move(sourcePath string, destPath string) error { func (d *Driver) Move(sourcePath string, destPath string) error {
if !storagedriver.PathRegexp.MatchString(sourcePath) {
return storagedriver.InvalidPathError{Path: sourcePath}
} else if !storagedriver.PathRegexp.MatchString(destPath) {
return storagedriver.InvalidPathError{Path: destPath}
}
/* This is terrible, but aws doesn't have an actual move. */ /* This is terrible, but aws doesn't have an actual move. */
_, err := d.Bucket.PutCopy(destPath, getPermissions(), _, err := d.Bucket.PutCopy(d.s3Path(destPath), getPermissions(),
s3.CopyOptions{Options: d.getOptions(), MetadataDirective: "", ContentType: d.getContentType()}, s3.CopyOptions{Options: d.getOptions(), ContentType: d.getContentType()}, d.Bucket.Name+"/"+d.s3Path(sourcePath))
d.Bucket.Name+"/"+sourcePath)
if err != nil { if err != nil {
return storagedriver.PathNotFoundError{Path: sourcePath} return parseError(sourcePath, err)
} }
return d.Delete(sourcePath) return d.Delete(sourcePath)
@ -255,12 +551,16 @@ func (d *Driver) Move(sourcePath string, destPath string) error {
// Delete recursively deletes all objects stored at "path" and its subpaths. // Delete recursively deletes all objects stored at "path" and its subpaths.
func (d *Driver) Delete(path string) error { func (d *Driver) Delete(path string) error {
listResponse, err := d.Bucket.List(path, "", "", listPartsMax) if !storagedriver.PathRegexp.MatchString(path) {
return storagedriver.InvalidPathError{Path: path}
}
listResponse, err := d.Bucket.List(d.s3Path(path), "", "", listMax)
if err != nil || len(listResponse.Contents) == 0 { if err != nil || len(listResponse.Contents) == 0 {
return storagedriver.PathNotFoundError{Path: path} return storagedriver.PathNotFoundError{Path: path}
} }
s3Objects := make([]s3.Object, listPartsMax) s3Objects := make([]s3.Object, listMax)
for len(listResponse.Contents) > 0 { for len(listResponse.Contents) > 0 {
for index, key := range listResponse.Contents { for index, key := range listResponse.Contents {
@ -272,7 +572,7 @@ func (d *Driver) Delete(path string) error {
return nil return nil
} }
listResponse, err = d.Bucket.List(path, "", "", listPartsMax) listResponse, err = d.Bucket.List(d.s3Path(path), "", "", listMax)
if err != nil { if err != nil {
return err return err
} }
@ -281,35 +581,16 @@ func (d *Driver) Delete(path string) error {
return nil return nil
} }
func (d *Driver) getHighestIDMulti(path string) (multi *s3.Multi, err error) { func (d *Driver) s3Path(path string) string {
multis, _, err := d.Bucket.ListMulti(path, "") return strings.TrimLeft(strings.TrimRight(d.rootDirectory, "/")+path, "/")
if err != nil && !hasCode(err, "NoSuchUpload") {
return nil, err
}
uploadID := ""
if len(multis) > 0 {
for _, m := range multis {
if m.Key == path && m.UploadId >= uploadID {
uploadID = m.UploadId
multi = m
}
}
return multi, nil
}
multi, err = d.Bucket.InitMulti(path, d.getContentType(), getPermissions(), d.getOptions())
return multi, err
} }
func (d *Driver) getAllParts(path string) (*s3.Multi, []s3.Part, error) { func parseError(path string, err error) error {
multi, err := d.getHighestIDMulti(path) if s3Err, ok := err.(*s3.Error); ok && s3Err.Code == "NoSuchKey" {
if err != nil { return storagedriver.PathNotFoundError{Path: path}
return nil, nil, err
} }
parts, err := multi.ListParts() return err
return multi, parts, err
} }
func hasCode(err error, code string) bool { func hasCode(err error, code string) bool {

View file

@ -1,8 +1,7 @@
// +build ignore
package s3 package s3
import ( import (
"io/ioutil"
"os" "os"
"strconv" "strconv"
"testing" "testing"
@ -22,13 +21,18 @@ func init() {
secretKey := os.Getenv("AWS_SECRET_KEY") secretKey := os.Getenv("AWS_SECRET_KEY")
bucket := os.Getenv("S3_BUCKET") bucket := os.Getenv("S3_BUCKET")
encrypt := os.Getenv("S3_ENCRYPT") encrypt := os.Getenv("S3_ENCRYPT")
region := os.Getenv("AWS_REGION")
root, err := ioutil.TempDir("", "driver-")
if err != nil {
panic(err)
}
s3DriverConstructor := func(region aws.Region) (storagedriver.StorageDriver, error) { s3DriverConstructor := func(region aws.Region) (storagedriver.StorageDriver, error) {
shouldEncrypt, err := strconv.ParseBool(encrypt) shouldEncrypt, err := strconv.ParseBool(encrypt)
if err != nil { if err != nil {
return nil, err return nil, err
} }
return New(accessKey, secretKey, region, shouldEncrypt, bucket) return New(accessKey, secretKey, bucket, root, region, shouldEncrypt)
} }
// Skip S3 storage driver tests if environment variable parameters are not provided // Skip S3 storage driver tests if environment variable parameters are not provided
@ -39,18 +43,20 @@ func init() {
return "" return ""
} }
for _, region := range aws.Regions { // for _, region := range aws.Regions {
if region == aws.USGovWest { // if region == aws.USGovWest {
continue // continue
} // }
testsuites.RegisterInProcessSuite(s3DriverConstructor(region), skipCheck) testsuites.RegisterInProcessSuite(func() (storagedriver.StorageDriver, error) {
testsuites.RegisterIPCSuite(driverName, map[string]string{ return s3DriverConstructor(aws.GetRegion(region))
"accesskey": accessKey,
"secretkey": secretKey,
"region": region.Name,
"bucket": bucket,
"encrypt": encrypt,
}, skipCheck) }, skipCheck)
} // testsuites.RegisterIPCSuite(driverName, map[string]string{
// "accesskey": accessKey,
// "secretkey": secretKey,
// "region": region.Name,
// "bucket": bucket,
// "encrypt": encrypt,
// }, skipCheck)
// }
} }

View file

@ -49,8 +49,6 @@ type StorageDriver interface {
// WriteStream stores the contents of the provided io.ReadCloser at a // WriteStream stores the contents of the provided io.ReadCloser at a
// location designated by the given path. // location designated by the given path.
// The driver will know it has received the full contents when it has read
// "size" bytes.
// May be used to resume writing a stream by providing a nonzero offset. // May be used to resume writing a stream by providing a nonzero offset.
// The offset must be no larger than the CurrentSize for this path. // The offset must be no larger than the CurrentSize for this path.
WriteStream(path string, offset int64, reader io.Reader) (nn int64, err error) WriteStream(path string, offset int64, reader io.Reader) (nn int64, err error)

View file

@ -362,7 +362,7 @@ func (suite *DriverSuite) TestContinueStreamAppend(c *check.C) {
filename := randomPath(32) filename := randomPath(32)
defer suite.StorageDriver.Delete(firstPart(filename)) defer suite.StorageDriver.Delete(firstPart(filename))
chunkSize := int64(10 * 1024 * 1024) chunkSize := int64(5 * 1024 * 1024)
contentsChunk1 := randomContents(chunkSize) contentsChunk1 := randomContents(chunkSize)
contentsChunk2 := randomContents(chunkSize) contentsChunk2 := randomContents(chunkSize)
@ -687,9 +687,11 @@ func (suite *DriverSuite) TestStatCall(c *check.C) {
c.Assert(fi.Size(), check.Equals, int64(0)) c.Assert(fi.Size(), check.Equals, int64(0))
c.Assert(fi.IsDir(), check.Equals, true) c.Assert(fi.IsDir(), check.Equals, true)
if start.After(fi.ModTime()) { // Directories do not need to support ModTime, since key-value stores
c.Errorf("modtime %s before file created (%v)", fi.ModTime(), start) // cannot support it efficiently.
} // if start.After(fi.ModTime()) {
// c.Errorf("modtime %s before file created (%v)", fi.ModTime(), start)
// }
if fi.ModTime().After(expectedModTime) { if fi.ModTime().After(expectedModTime) {
c.Errorf("modtime %s after file created (%v)", fi.ModTime(), expectedModTime) c.Errorf("modtime %s after file created (%v)", fi.ModTime(), expectedModTime)
@ -763,6 +765,54 @@ func (suite *DriverSuite) TestConcurrentFileStreams(c *check.C) {
wg.Wait() wg.Wait()
} }
// TestEventualConsistency checks that if stat says that a file is a certain size, then
// you can freely read from the file (this is the only guarantee that the driver needs to provide)
func (suite *DriverSuite) TestEventualConsistency(c *check.C) {
if testing.Short() {
c.Skip("Skipping test in short mode")
}
filename := randomPath(32)
defer suite.StorageDriver.Delete(firstPart(filename))
var offset int64
var misswrites int
var chunkSize int64 = 32
for i := 0; i < 1024; i++ {
contents := randomContents(chunkSize)
read, err := suite.StorageDriver.WriteStream(filename, offset, bytes.NewReader(contents))
c.Assert(err, check.IsNil)
fi, err := suite.StorageDriver.Stat(filename)
c.Assert(err, check.IsNil)
// We are most concerned with being able to read data as soon as Stat declares
// it is uploaded. This is the strongest guarantee that some drivers (that guarantee
// at best eventual consistency) absolutely need to provide.
if fi.Size() == offset+chunkSize {
reader, err := suite.StorageDriver.ReadStream(filename, offset)
c.Assert(err, check.IsNil)
readContents, err := ioutil.ReadAll(reader)
c.Assert(err, check.IsNil)
c.Assert(readContents, check.DeepEquals, contents)
reader.Close()
offset += read
} else {
misswrites++
}
}
if misswrites > 0 {
c.Log("There were " + string(misswrites) + " occurences of a write not being instantly available.")
}
c.Assert(misswrites, check.Not(check.Equals), 1024)
}
// BenchmarkPutGetEmptyFiles benchmarks PutContent/GetContent for 0B files // BenchmarkPutGetEmptyFiles benchmarks PutContent/GetContent for 0B files
func (suite *DriverSuite) BenchmarkPutGetEmptyFiles(c *check.C) { func (suite *DriverSuite) BenchmarkPutGetEmptyFiles(c *check.C) {
suite.benchmarkPutGetFiles(c, 0) suite.benchmarkPutGetFiles(c, 0)