Support large layer for OSS driver

Signed-off-by: Li Yi <denverdino@gmail.com>
This commit is contained in:
Li Yi 2015-10-25 11:01:15 +08:00 committed by weiyuan.yl
parent e6363e8a20
commit cfd2f03920

View file

@ -39,6 +39,7 @@ const driverName = "oss"
const minChunkSize = 5 << 20
const defaultChunkSize = 2 * minChunkSize
const defaultTimeout = 2 * time.Minute // 2 minute timeout per chunk
// listMax is the largest amount of objects you can request from OSS in a list call
const listMax = 1000
@ -195,13 +196,14 @@ func FromParameters(parameters map[string]interface{}) (*Driver, error) {
return New(params)
}
// New constructs a new Driver with the given AWS credentials, region, encryption flag, and
// New constructs a new Driver with the given Aliyun credentials, region, encryption flag, and
// bucketName
func New(params DriverParameters) (*Driver, error) {
client := oss.NewOSSClient(params.Region, params.Internal, params.AccessKeyID, params.AccessKeySecret, params.Secure)
client.SetEndpoint(params.Endpoint)
bucket := client.Bucket(params.Bucket)
client.SetDebug(false)
// Validate that the given credentials have at least read permissions in the
// given bucket scope.
@ -403,35 +405,7 @@ func (d *driver) WriteStream(ctx context.Context, path string, offset int64, rea
var err error
var part oss.Part
loop:
for retries := 0; retries < 5; retries++ {
part, err = multi.PutPart(int(partNumber), bytes.NewReader(buf[0:int64(bytesRead)+from]))
if err == nil {
break // success!
}
// NOTE(stevvooe): This retry code tries to only retry under
// conditions where the OSS package does not. We may add oss
// error codes to the below if we see others bubble up in the
// application. Right now, the most troubling is
// RequestTimeout, which seems to only triggered when a tcp
// connection to OSS slows to a crawl. If the RequestTimeout
// ends up getting added to the OSS library and we don't see
// other errors, this retry loop can be removed.
switch err := err.(type) {
case *oss.Error:
switch err.Code {
case "RequestTimeout":
// allow retries on only this error.
default:
break loop
}
}
backoff := 100 * time.Millisecond * time.Duration(retries+1)
logrus.Errorf("error putting part, retrying after %v: %v", err, backoff.String())
time.Sleep(backoff)
}
part, err = multi.PutPartWithTimeout(int(partNumber), bytes.NewReader(buf[0:int64(bytesRead)+from]), defaultTimeout)
if err != nil {
logrus.Errorf("error putting part, aborting: %v", err)
@ -456,7 +430,7 @@ func (d *driver) WriteStream(ctx context.Context, path string, offset int64, rea
if offset > 0 {
resp, err := d.Bucket.Head(d.ossPath(path), nil)
if err != nil {
if ossErr, ok := err.(*oss.Error); !ok || ossErr.Code != "NoSuchKey" {
if ossErr, ok := err.(*oss.Error); !ok || ossErr.StatusCode != 404 {
return 0, err
}
}
@ -511,7 +485,7 @@ func (d *driver) WriteStream(ctx context.Context, path string, offset int64, rea
fromZeroFillLarge := func(from, to int64) error {
bytesRead64 := int64(0)
for to-(from+bytesRead64) >= d.ChunkSize {
part, err := multi.PutPart(int(partNumber), bytes.NewReader(d.zeros))
part, err := multi.PutPartWithTimeout(int(partNumber), bytes.NewReader(d.zeros), defaultTimeout)
if err != nil {
return err
}
@ -553,7 +527,7 @@ func (d *driver) WriteStream(ctx context.Context, path string, offset int64, rea
return totalRead, err
}
part, err = multi.PutPart(int(partNumber), bytes.NewReader(buf))
part, err = multi.PutPartWithTimeout(int(partNumber), bytes.NewReader(buf), defaultTimeout)
if err != nil {
return totalRead, err
}
@ -706,15 +680,14 @@ func (d *driver) List(ctx context.Context, opath string) ([]string, error) {
// Move moves an object stored at sourcePath to destPath, removing the original
// object.
func (d *driver) Move(ctx context.Context, sourcePath string, destPath string) error {
logrus.Infof("Move from %s to %s", d.Bucket.Path("/"+d.ossPath(sourcePath)), d.ossPath(destPath))
/* This is terrible, but aws doesn't have an actual move. */
_, err := d.Bucket.PutCopy(d.ossPath(destPath), getPermissions(),
oss.CopyOptions{
//Options: d.getOptions(),
//ContentType: d.getContentType()
},
d.Bucket.Path(d.ossPath(sourcePath)))
logrus.Infof("Move from %s to %s", d.ossPath(sourcePath), d.ossPath(destPath))
err := d.Bucket.CopyLargeFile(d.ossPath(sourcePath), d.ossPath(destPath),
d.getContentType(),
getPermissions(),
oss.Options{})
if err != nil {
logrus.Errorf("Failed for move from %s to %s: %v", d.ossPath(sourcePath), d.ossPath(destPath), err)
return parseError(sourcePath, err)
}
@ -756,13 +729,12 @@ func (d *driver) URLFor(ctx context.Context, path string, options map[string]int
method, ok := options["method"]
if ok {
methodString, ok = method.(string)
if !ok || (methodString != "GET" && methodString != "HEAD") {
return "", storagedriver.ErrUnsupportedMethod{}
if !ok || (methodString != "GET" && methodString != "PUT") {
return "", storagedriver.ErrUnsupportedMethod{driverName}
}
}
expiresTime := time.Now().Add(20 * time.Minute)
logrus.Infof("expiresTime: %d", expiresTime)
expires, ok := options["expiry"]
if ok {
@ -771,7 +743,7 @@ func (d *driver) URLFor(ctx context.Context, path string, options map[string]int
expiresTime = et
}
}
logrus.Infof("expiresTime: %d", expiresTime)
logrus.Infof("methodString: %s, expiresTime: %v", methodString, expiresTime)
testURL := d.Bucket.SignedURLWithMethod(methodString, d.ossPath(path), expiresTime, nil, nil)
logrus.Infof("testURL: %s", testURL)
return testURL, nil
@ -781,11 +753,6 @@ func (d *driver) ossPath(path string) string {
return strings.TrimLeft(strings.TrimRight(d.RootDirectory, "/")+path, "/")
}
// S3BucketKey returns the OSS bucket key for the given storage driver path.
func (d *Driver) S3BucketKey(path string) string {
return d.StorageDriver.(*driver).ossPath(path)
}
func parseError(path string, err error) error {
if ossErr, ok := err.(*oss.Error); ok && ossErr.Code == "NoSuchKey" {
return storagedriver.PathNotFoundError{Path: path}