From cfd2f039209d43036f46de0b6ef8d7462766d598 Mon Sep 17 00:00:00 2001 From: Li Yi Date: Sun, 25 Oct 2015 11:01:15 +0800 Subject: [PATCH] Support large layer for OSS driver Signed-off-by: Li Yi --- docs/storage/driver/oss/oss.go | 67 +++++++++------------------------- 1 file changed, 17 insertions(+), 50 deletions(-) diff --git a/docs/storage/driver/oss/oss.go b/docs/storage/driver/oss/oss.go index c6e4f8a3..4dfe5675 100644 --- a/docs/storage/driver/oss/oss.go +++ b/docs/storage/driver/oss/oss.go @@ -39,6 +39,7 @@ const driverName = "oss" const minChunkSize = 5 << 20 const defaultChunkSize = 2 * minChunkSize +const defaultTimeout = 2 * time.Minute // 2 minute timeout per chunk // listMax is the largest amount of objects you can request from OSS in a list call const listMax = 1000 @@ -195,13 +196,14 @@ func FromParameters(parameters map[string]interface{}) (*Driver, error) { return New(params) } -// New constructs a new Driver with the given AWS credentials, region, encryption flag, and +// New constructs a new Driver with the given Aliyun credentials, region, encryption flag, and // bucketName func New(params DriverParameters) (*Driver, error) { client := oss.NewOSSClient(params.Region, params.Internal, params.AccessKeyID, params.AccessKeySecret, params.Secure) client.SetEndpoint(params.Endpoint) bucket := client.Bucket(params.Bucket) + client.SetDebug(false) // Validate that the given credentials have at least read permissions in the // given bucket scope. @@ -403,35 +405,7 @@ func (d *driver) WriteStream(ctx context.Context, path string, offset int64, rea var err error var part oss.Part - loop: - for retries := 0; retries < 5; retries++ { - part, err = multi.PutPart(int(partNumber), bytes.NewReader(buf[0:int64(bytesRead)+from])) - if err == nil { - break // success! - } - - // NOTE(stevvooe): This retry code tries to only retry under - // conditions where the OSS package does not. We may add oss - // error codes to the below if we see others bubble up in the - // application. Right now, the most troubling is - // RequestTimeout, which seems to only triggered when a tcp - // connection to OSS slows to a crawl. If the RequestTimeout - // ends up getting added to the OSS library and we don't see - // other errors, this retry loop can be removed. - switch err := err.(type) { - case *oss.Error: - switch err.Code { - case "RequestTimeout": - // allow retries on only this error. - default: - break loop - } - } - - backoff := 100 * time.Millisecond * time.Duration(retries+1) - logrus.Errorf("error putting part, retrying after %v: %v", err, backoff.String()) - time.Sleep(backoff) - } + part, err = multi.PutPartWithTimeout(int(partNumber), bytes.NewReader(buf[0:int64(bytesRead)+from]), defaultTimeout) if err != nil { logrus.Errorf("error putting part, aborting: %v", err) @@ -456,7 +430,7 @@ func (d *driver) WriteStream(ctx context.Context, path string, offset int64, rea if offset > 0 { resp, err := d.Bucket.Head(d.ossPath(path), nil) if err != nil { - if ossErr, ok := err.(*oss.Error); !ok || ossErr.Code != "NoSuchKey" { + if ossErr, ok := err.(*oss.Error); !ok || ossErr.StatusCode != 404 { return 0, err } } @@ -511,7 +485,7 @@ func (d *driver) WriteStream(ctx context.Context, path string, offset int64, rea fromZeroFillLarge := func(from, to int64) error { bytesRead64 := int64(0) for to-(from+bytesRead64) >= d.ChunkSize { - part, err := multi.PutPart(int(partNumber), bytes.NewReader(d.zeros)) + part, err := multi.PutPartWithTimeout(int(partNumber), bytes.NewReader(d.zeros), defaultTimeout) if err != nil { return err } @@ -553,7 +527,7 @@ func (d *driver) WriteStream(ctx context.Context, path string, offset int64, rea return totalRead, err } - part, err = multi.PutPart(int(partNumber), bytes.NewReader(buf)) + part, err = multi.PutPartWithTimeout(int(partNumber), bytes.NewReader(buf), defaultTimeout) if err != nil { return totalRead, err } @@ -706,15 +680,14 @@ func (d *driver) List(ctx context.Context, opath string) ([]string, error) { // Move moves an object stored at sourcePath to destPath, removing the original // object. func (d *driver) Move(ctx context.Context, sourcePath string, destPath string) error { - logrus.Infof("Move from %s to %s", d.Bucket.Path("/"+d.ossPath(sourcePath)), d.ossPath(destPath)) - /* This is terrible, but aws doesn't have an actual move. */ - _, err := d.Bucket.PutCopy(d.ossPath(destPath), getPermissions(), - oss.CopyOptions{ - //Options: d.getOptions(), - //ContentType: d.getContentType() - }, - d.Bucket.Path(d.ossPath(sourcePath))) + logrus.Infof("Move from %s to %s", d.ossPath(sourcePath), d.ossPath(destPath)) + + err := d.Bucket.CopyLargeFile(d.ossPath(sourcePath), d.ossPath(destPath), + d.getContentType(), + getPermissions(), + oss.Options{}) if err != nil { + logrus.Errorf("Failed for move from %s to %s: %v", d.ossPath(sourcePath), d.ossPath(destPath), err) return parseError(sourcePath, err) } @@ -756,13 +729,12 @@ func (d *driver) URLFor(ctx context.Context, path string, options map[string]int method, ok := options["method"] if ok { methodString, ok = method.(string) - if !ok || (methodString != "GET" && methodString != "HEAD") { - return "", storagedriver.ErrUnsupportedMethod{} + if !ok || (methodString != "GET" && methodString != "PUT") { + return "", storagedriver.ErrUnsupportedMethod{driverName} } } expiresTime := time.Now().Add(20 * time.Minute) - logrus.Infof("expiresTime: %d", expiresTime) expires, ok := options["expiry"] if ok { @@ -771,7 +743,7 @@ func (d *driver) URLFor(ctx context.Context, path string, options map[string]int expiresTime = et } } - logrus.Infof("expiresTime: %d", expiresTime) + logrus.Infof("methodString: %s, expiresTime: %v", methodString, expiresTime) testURL := d.Bucket.SignedURLWithMethod(methodString, d.ossPath(path), expiresTime, nil, nil) logrus.Infof("testURL: %s", testURL) return testURL, nil @@ -781,11 +753,6 @@ func (d *driver) ossPath(path string) string { return strings.TrimLeft(strings.TrimRight(d.RootDirectory, "/")+path, "/") } -// S3BucketKey returns the OSS bucket key for the given storage driver path. -func (d *Driver) S3BucketKey(path string) string { - return d.StorageDriver.(*driver).ossPath(path) -} - func parseError(path string, err error) error { if ossErr, ok := err.(*oss.Error); ok && ossErr.Code == "NoSuchKey" { return storagedriver.PathNotFoundError{Path: path}