From f394e82d2ba4a623a92fe96747a783f5a8b9e4e4 Mon Sep 17 00:00:00 2001 From: "yaoyao.xyy" Date: Wed, 26 Oct 2016 09:37:58 +0800 Subject: [PATCH] oss data copy optimize after upload finished Signed-off-by: yaoyao.xyy --- Godeps/Godeps.json | 6 +- .../denverdino/aliyungo/common/regions.go | 3 +- .../denverdino/aliyungo/common/types.go | 7 + .../denverdino/aliyungo/oss/client.go | 124 ++++++++++++++---- .../denverdino/aliyungo/oss/multi.go | 29 ++-- .../denverdino/aliyungo/oss/regions.go | 14 ++ .../denverdino/aliyungo/oss/signature.go | 2 + .../denverdino/aliyungo/util/encoding.go | 2 +- 8 files changed, 144 insertions(+), 43 deletions(-) diff --git a/Godeps/Godeps.json b/Godeps/Godeps.json index 92252c69..d76c7aa6 100644 --- a/Godeps/Godeps.json +++ b/Godeps/Godeps.json @@ -167,15 +167,15 @@ }, { "ImportPath": "github.com/denverdino/aliyungo/common", - "Rev": "6ffb587da9da6d029d0ce517b85fecc82172d502" + "Rev": "ce70ed03a598bb3ba258ff9c90a83a257959067c" }, { "ImportPath": "github.com/denverdino/aliyungo/oss", - "Rev": "6ffb587da9da6d029d0ce517b85fecc82172d502" + "Rev": "ce70ed03a598bb3ba258ff9c90a83a257959067c" }, { "ImportPath": "github.com/denverdino/aliyungo/util", - "Rev": "6ffb587da9da6d029d0ce517b85fecc82172d502" + "Rev": "ce70ed03a598bb3ba258ff9c90a83a257959067c" }, { "ImportPath": "github.com/docker/goamz/aws", diff --git a/vendor/github.com/denverdino/aliyungo/common/regions.go b/vendor/github.com/denverdino/aliyungo/common/regions.go index db36f49b..9ceda465 100644 --- a/vendor/github.com/denverdino/aliyungo/common/regions.go +++ b/vendor/github.com/denverdino/aliyungo/common/regions.go @@ -11,8 +11,9 @@ const ( Hongkong = Region("cn-hongkong") Shenzhen = Region("cn-shenzhen") USWest1 = Region("us-west-1") + USEast1 = Region("us-east-1") APSouthEast1 = Region("ap-southeast-1") Shanghai = Region("cn-shanghai") ) -var ValidRegions = []Region{Hangzhou, Qingdao, Beijing, Shenzhen, Hongkong, Shanghai, USWest1, APSouthEast1} +var ValidRegions = []Region{Hangzhou, Qingdao, Beijing, Shenzhen, Hongkong, Shanghai, USWest1, USEast1, APSouthEast1} diff --git a/vendor/github.com/denverdino/aliyungo/common/types.go b/vendor/github.com/denverdino/aliyungo/common/types.go index c9f1dc8c..c562aedf 100644 --- a/vendor/github.com/denverdino/aliyungo/common/types.go +++ b/vendor/github.com/denverdino/aliyungo/common/types.go @@ -6,3 +6,10 @@ const ( PayByBandwidth = InternetChargeType("PayByBandwidth") PayByTraffic = InternetChargeType("PayByTraffic") ) + +type InstanceChargeType string + +const ( + PrePaid = InstanceChargeType("PrePaid") + PostPaid = InstanceChargeType("PostPaid") +) diff --git a/vendor/github.com/denverdino/aliyungo/oss/client.go b/vendor/github.com/denverdino/aliyungo/oss/client.go index ac3e8f02..bc1bd6b0 100644 --- a/vendor/github.com/denverdino/aliyungo/oss/client.go +++ b/vendor/github.com/denverdino/aliyungo/oss/client.go @@ -32,6 +32,7 @@ const DefaultContentType = "application/octet-stream" type Client struct { AccessKeyId string AccessKeySecret string + SecurityToken string Region Region Internal bool Secure bool @@ -87,6 +88,18 @@ var attempts = util.AttemptStrategy{ // NewOSSClient creates a new OSS. +func NewOSSClientForAssumeRole(region Region, internal bool, accessKeyId string, accessKeySecret string, securityToken string, secure bool) *Client { + return &Client{ + AccessKeyId: accessKeyId, + AccessKeySecret: accessKeySecret, + SecurityToken: securityToken, + Region: region, + Internal: internal, + debug: false, + Secure: secure, + } +} + func NewOSSClient(region Region, internal bool, accessKeyId string, accessKeySecret string, secure bool) *Client { return &Client{ AccessKeyId: accessKeyId, @@ -113,8 +126,12 @@ func (client *Client) Bucket(name string) *Bucket { } type BucketInfo struct { - Name string - CreationDate string + Name string + CreationDate string + ExtranetEndpoint string + IntranetEndpoint string + Location string + Grant string `xml:"AccessControlList>Grant"` } type GetServiceResp struct { @@ -122,6 +139,10 @@ type GetServiceResp struct { Buckets []BucketInfo `xml:">Bucket"` } +type GetBucketInfoResp struct { + Bucket BucketInfo +} + // GetService gets a list of all buckets owned by an account. func (client *Client) GetService() (*GetServiceResp, error) { bucket := client.Bucket("") @@ -168,6 +189,27 @@ func (client *Client) SetEndpoint(endpoint string) { client.endpoint = endpoint } +// Info query basic information about the bucket +// +// You can read doc at https://help.aliyun.com/document_detail/31968.html +func (b *Bucket) Info() (BucketInfo, error) { + params := make(url.Values) + params.Set("bucketInfo", "") + r, err := b.GetWithParams("/", params) + + if err != nil { + return BucketInfo{}, err + } + + // Parse the XML response. + var resp GetBucketInfoResp + if err = xml.Unmarshal(r, &resp); err != nil { + return BucketInfo{}, err + } + + return resp.Bucket, nil +} + // PutBucket creates a new bucket. // // You can read doc at http://docs.aliyun.com/#/pub/oss/api-reference/bucket&PutBucket @@ -844,10 +886,10 @@ func (b *Bucket) UploadSignedURL(name, method, contentType string, expires time. mac := hmac.New(sha1.New, []byte(secretKey)) mac.Write([]byte(stringToSign)) macsum := mac.Sum(nil) - signature := base64.StdEncoding.EncodeToString([]byte(macsum)) + signature := base64.StdEncoding.EncodeToString(macsum) signature = strings.TrimSpace(signature) - signedurl, err := url.Parse("https://" + b.Name + ".client.amazonaws.com/") + signedurl, err := url.Parse(b.Region.GetEndpoint(b.Internal, b.Name, b.Secure)) if err != nil { log.Println("ERROR sining url for OSS upload", err) return "" @@ -982,6 +1024,10 @@ func partiallyEscapedPath(path string) string { func (client *Client) prepare(req *request) error { // Copy so they can be mutated without affecting on retries. headers := copyHeader(req.headers) + if len(client.SecurityToken) != 0 { + headers.Set("x-oss-security-token", client.SecurityToken) + } + params := make(url.Values) for k, v := range req.params { @@ -1251,9 +1297,6 @@ func (b *Bucket) ACL() (result *AccessControlPolicy, err error) { return &resp, nil } -const minChunkSize = 5 << 20 -const defaultChunkSize = 2 * minChunkSize - func (b *Bucket) GetContentLength(sourcePath string) (int64, error) { resp, err := b.Head(sourcePath, nil) if err != nil { @@ -1265,8 +1308,19 @@ func (b *Bucket) GetContentLength(sourcePath string) (int64, error) { return currentLength, err } -// Copy large file in the same bucket func (b *Bucket) CopyLargeFile(sourcePath string, destPath string, contentType string, perm ACL, options Options) error { + return b.CopyLargeFileInParallel(sourcePath, destPath, contentType, perm, options, 1) +} + +const defaultChunkSize = int64(128 * 1024 * 1024) //128MB +const maxCopytSize = int64(1024 * 1024 * 1024) //1G + +// Copy large file in the same bucket +func (b *Bucket) CopyLargeFileInParallel(sourcePath string, destPath string, contentType string, perm ACL, options Options, maxConcurrency int) error { + + if maxConcurrency < 1 { + maxConcurrency = 1 + } log.Printf("Copy large file from %s to %s\n", sourcePath, destPath) @@ -1276,25 +1330,23 @@ func (b *Bucket) CopyLargeFile(sourcePath string, destPath string, contentType s return err } + if currentLength < maxCopytSize { + _, err := b.PutCopy(destPath, perm, + CopyOptions{}, + b.Path(sourcePath)) + return err + } + multi, err := b.InitMulti(destPath, contentType, perm, options) if err != nil { return err } - parts := []Part{} + numParts := (currentLength + defaultChunkSize - 1) / defaultChunkSize + completedParts := make([]Part, numParts) - defer func() { - if len(parts) > 0 { - if multi == nil { - // Parts should be empty if the multi is not initialized - panic("Unreachable") - } else { - if multi.Complete(parts) != nil { - multi.Abort() - } - } - } - }() + errChan := make(chan error, numParts) + limiter := make(chan struct{}, maxConcurrency) var start int64 = 0 var to int64 = 0 @@ -1309,15 +1361,33 @@ func (b *Bucket) CopyLargeFile(sourcePath string, destPath string, contentType s partNumber++ rangeStr := fmt.Sprintf("bytes=%d-%d", start, to-1) + limiter <- struct{}{} + go func(partNumber int, rangeStr string) { + _, part, err := multi.PutPartCopyWithContentLength(partNumber, + CopyOptions{CopySourceOptions: rangeStr}, + sourcePathForCopy, currentLength) + if err == nil { + completedParts[partNumber-1] = part + } else { + log.Printf("Unable in PutPartCopy of part %d for %s: %v\n", partNumber, sourcePathForCopy, err) + } + errChan <- err + <-limiter + }(partNumber, rangeStr) + } - _, part, err := multi.PutPartCopy(partNumber, - CopyOptions{CopySourceOptions: rangeStr}, - sourcePathForCopy) - + fullyCompleted := true + for range completedParts { + err := <-errChan if err != nil { - return err + fullyCompleted = false } - parts = append(parts, part) + } + + if fullyCompleted { + err = multi.Complete(completedParts) + } else { + err = multi.Abort() } return err diff --git a/vendor/github.com/denverdino/aliyungo/oss/multi.go b/vendor/github.com/denverdino/aliyungo/oss/multi.go index f17d75c3..d720e188 100644 --- a/vendor/github.com/denverdino/aliyungo/oss/multi.go +++ b/vendor/github.com/denverdino/aliyungo/oss/multi.go @@ -141,9 +141,13 @@ func (b *Bucket) InitMulti(key string, contType string, perm ACL, options Option return &Multi{Bucket: b, Key: key, UploadId: resp.UploadId}, nil } +func (m *Multi) PutPartCopy(n int, options CopyOptions, source string) (*CopyObjectResult, Part, error) { + return m.PutPartCopyWithContentLength(n, options, source, -1) +} + // // You can read doc at http://docs.aliyun.com/#/pub/oss/api-reference/multipart-upload&UploadPartCopy -func (m *Multi) PutPartCopy(n int, options CopyOptions, source string) (*CopyObjectResult, Part, error) { +func (m *Multi) PutPartCopyWithContentLength(n int, options CopyOptions, source string, contentLength int64) (*CopyObjectResult, Part, error) { // TODO source format a /BUCKET/PATH/TO/OBJECT // TODO not a good design. API could be changed to PutPartCopyWithinBucket(..., path) and PutPartCopyFromBucket(bucket, path) @@ -155,14 +159,17 @@ func (m *Multi) PutPartCopy(n int, options CopyOptions, source string) (*CopyObj params.Set("uploadId", m.UploadId) params.Set("partNumber", strconv.FormatInt(int64(n), 10)) - sourceBucket := m.Bucket.Client.Bucket(strings.TrimRight(strings.Split(source, "/")[1], "/")) - //log.Println("source: ", source) - //log.Println("sourceBucket: ", sourceBucket.Name) - //log.Println("HEAD: ", strings.strings.SplitAfterN(source, "/", 3)[2]) - // TODO SplitAfterN can be use in bucket name - sourceMeta, err := sourceBucket.Head(strings.SplitAfterN(source, "/", 3)[2], nil) - if err != nil { - return nil, Part{}, err + if contentLength < 0 { + sourceBucket := m.Bucket.Client.Bucket(strings.TrimRight(strings.Split(source, "/")[1], "/")) + //log.Println("source: ", source) + //log.Println("sourceBucket: ", sourceBucket.Name) + //log.Println("HEAD: ", strings.strings.SplitAfterN(source, "/", 3)[2]) + // TODO SplitAfterN can be use in bucket name + sourceMeta, err := sourceBucket.Head(strings.SplitAfterN(source, "/", 3)[2], nil) + if err != nil { + return nil, Part{}, err + } + contentLength = sourceMeta.ContentLength } for attempt := attempts.Start(); attempt.Next(); { @@ -174,7 +181,7 @@ func (m *Multi) PutPartCopy(n int, options CopyOptions, source string) (*CopyObj params: params, } resp := &CopyObjectResult{} - err = m.Bucket.Client.query(req, resp) + err := m.Bucket.Client.query(req, resp) if shouldRetry(err) && attempt.HasNext() { continue } @@ -184,7 +191,7 @@ func (m *Multi) PutPartCopy(n int, options CopyOptions, source string) (*CopyObj if resp.ETag == "" { return nil, Part{}, errors.New("part upload succeeded with no ETag") } - return resp, Part{n, resp.ETag, sourceMeta.ContentLength}, nil + return resp, Part{n, resp.ETag, contentLength}, nil } panic("unreachable") } diff --git a/vendor/github.com/denverdino/aliyungo/oss/regions.go b/vendor/github.com/denverdino/aliyungo/oss/regions.go index e2daf3da..0f250604 100644 --- a/vendor/github.com/denverdino/aliyungo/oss/regions.go +++ b/vendor/github.com/denverdino/aliyungo/oss/regions.go @@ -15,6 +15,7 @@ const ( Hongkong = Region("oss-cn-hongkong") Shenzhen = Region("oss-cn-shenzhen") USWest1 = Region("oss-us-west-1") + USEast1 = Region("oss-us-east-1") APSouthEast1 = Region("oss-ap-southeast-1") Shanghai = Region("oss-cn-shanghai") @@ -54,3 +55,16 @@ func (r Region) GetInternalEndpoint(bucket string, secure bool) string { } return fmt.Sprintf("%s://%s.%s-internal.aliyuncs.com", protocol, bucket, string(r)) } + +// GetInternalEndpoint returns internal endpoint of region +func (r Region) GetVPCInternalEndpoint(bucket string, secure bool) string { + protocol := getProtocol(secure) + if bucket == "" { + return fmt.Sprintf("%s://vpc100-oss-cn-hangzhou.aliyuncs.com", protocol) + } + if r == USEast1 { + return r.GetInternalEndpoint(bucket, secure) + } else { + return fmt.Sprintf("%s://%s.vpc100-%s.aliyuncs.com", protocol, bucket, string(r)) + } +} diff --git a/vendor/github.com/denverdino/aliyungo/oss/signature.go b/vendor/github.com/denverdino/aliyungo/oss/signature.go index a261644a..12677175 100644 --- a/vendor/github.com/denverdino/aliyungo/oss/signature.go +++ b/vendor/github.com/denverdino/aliyungo/oss/signature.go @@ -32,6 +32,7 @@ var ossParamsToSign = map[string]bool{ "response-cache-control": true, "response-content-disposition": true, "response-content-encoding": true, + "bucketInfo": true, } func (client *Client) signRequest(request *request) { @@ -101,5 +102,6 @@ func canonicalizeHeader(headers http.Header) (newHeaders http.Header, result str for _, k := range canonicalizedHeaders { canonicalizedHeader += k + ":" + headers.Get(k) + "\n" } + return newHeaders, canonicalizedHeader } diff --git a/vendor/github.com/denverdino/aliyungo/util/encoding.go b/vendor/github.com/denverdino/aliyungo/util/encoding.go index 44139570..e545e069 100644 --- a/vendor/github.com/denverdino/aliyungo/util/encoding.go +++ b/vendor/github.com/denverdino/aliyungo/util/encoding.go @@ -37,8 +37,8 @@ func setQueryValues(i interface{}, values *url.Values, prefix string) { elem = elem.Elem() } elemType := elem.Type() - for i := 0; i < elem.NumField(); i++ { + fieldName := elemType.Field(i).Name anonymous := elemType.Field(i).Anonymous field := elem.Field(i)