Optimize OSS data copy after upload finishes

Signed-off-by: yaoyao.xyy <yaoyao.xyy@alibaba-inc.com>
This commit is contained in:
yaoyao.xyy 2016-10-26 09:37:58 +08:00
parent 8234784a1a
commit f394e82d2b
8 changed files with 144 additions and 43 deletions

View file

@ -32,6 +32,7 @@ const DefaultContentType = "application/octet-stream"
type Client struct {
AccessKeyId string
AccessKeySecret string
SecurityToken string
Region Region
Internal bool
Secure bool
@ -87,6 +88,18 @@ var attempts = util.AttemptStrategy{
// NewOSSClientForAssumeRole creates a new OSS client that carries a security
// token in addition to the access key pair. When the token is non-empty,
// prepare sends it with each request as the x-oss-security-token header.
// Debug output is disabled on the returned client.
func NewOSSClientForAssumeRole(region Region, internal bool, accessKeyId string, accessKeySecret string, securityToken string, secure bool) *Client {
	c := new(Client)

	// Credentials.
	c.AccessKeyId = accessKeyId
	c.AccessKeySecret = accessKeySecret
	c.SecurityToken = securityToken

	// Endpoint selection.
	c.Region = region
	c.Internal = internal
	c.Secure = secure

	c.debug = false
	return c
}

// NewOSSClient creates a new OSS client.
func NewOSSClient(region Region, internal bool, accessKeyId string, accessKeySecret string, secure bool) *Client {
return &Client{
AccessKeyId: accessKeyId,
@ -113,8 +126,12 @@ func (client *Client) Bucket(name string) *Bucket {
}
// BucketInfo holds the properties of a single bucket. It is the element type
// of GetServiceResp.Buckets and the payload of GetBucketInfoResp.
//
// All fields are decoded from XML child elements of the same name, except
// Grant, which is read from the nested AccessControlList>Grant element.
type BucketInfo struct {
	Name             string
	CreationDate     string
	ExtranetEndpoint string
	IntranetEndpoint string
	Location         string
	Grant            string `xml:"AccessControlList>Grant"`
}
type GetServiceResp struct {
@ -122,6 +139,10 @@ type GetServiceResp struct {
Buckets []BucketInfo `xml:">Bucket"`
}
// GetBucketInfoResp is the top-level structure that Bucket.Info unmarshals
// the XML response into; the bucket properties are in the Bucket field.
type GetBucketInfoResp struct {
Bucket BucketInfo
}
// GetService gets a list of all buckets owned by an account.
func (client *Client) GetService() (*GetServiceResp, error) {
bucket := client.Bucket("")
@ -168,6 +189,27 @@ func (client *Client) SetEndpoint(endpoint string) {
client.endpoint = endpoint
}
// Info queries basic information about the bucket.
//
// It issues a GET on "/" with the "bucketInfo" query parameter and decodes
// the XML response. On any failure it returns a zero BucketInfo together
// with the error.
//
// You can read doc at https://help.aliyun.com/document_detail/31968.html
func (b *Bucket) Info() (BucketInfo, error) {
	query := url.Values{"bucketInfo": {""}}

	body, err := b.GetWithParams("/", query)
	if err != nil {
		return BucketInfo{}, err
	}

	// Decode the XML payload into the response wrapper.
	parsed := GetBucketInfoResp{}
	if decodeErr := xml.Unmarshal(body, &parsed); decodeErr != nil {
		return BucketInfo{}, decodeErr
	}
	return parsed.Bucket, nil
}
// PutBucket creates a new bucket.
//
// You can read doc at http://docs.aliyun.com/#/pub/oss/api-reference/bucket&PutBucket
@ -844,10 +886,10 @@ func (b *Bucket) UploadSignedURL(name, method, contentType string, expires time.
mac := hmac.New(sha1.New, []byte(secretKey))
mac.Write([]byte(stringToSign))
macsum := mac.Sum(nil)
signature := base64.StdEncoding.EncodeToString([]byte(macsum))
signature := base64.StdEncoding.EncodeToString(macsum)
signature = strings.TrimSpace(signature)
signedurl, err := url.Parse("https://" + b.Name + ".client.amazonaws.com/")
signedurl, err := url.Parse(b.Region.GetEndpoint(b.Internal, b.Name, b.Secure))
if err != nil {
log.Println("ERROR sining url for OSS upload", err)
return ""
@ -982,6 +1024,10 @@ func partiallyEscapedPath(path string) string {
func (client *Client) prepare(req *request) error {
// Copy so they can be mutated without affecting on retries.
headers := copyHeader(req.headers)
if len(client.SecurityToken) != 0 {
headers.Set("x-oss-security-token", client.SecurityToken)
}
params := make(url.Values)
for k, v := range req.params {
@ -1251,9 +1297,6 @@ func (b *Bucket) ACL() (result *AccessControlPolicy, err error) {
return &resp, nil
}
const minChunkSize = 5 << 20
const defaultChunkSize = 2 * minChunkSize
func (b *Bucket) GetContentLength(sourcePath string) (int64, error) {
resp, err := b.Head(sourcePath, nil)
if err != nil {
@ -1265,8 +1308,19 @@ func (b *Bucket) GetContentLength(sourcePath string) (int64, error) {
return currentLength, err
}
// CopyLargeFile copies a large file within the same bucket. It delegates to
// CopyLargeFileInParallel with a concurrency of 1.
func (b *Bucket) CopyLargeFile(sourcePath string, destPath string, contentType string, perm ACL, options Options) error {
	const serialConcurrency = 1
	err := b.CopyLargeFileInParallel(sourcePath, destPath, contentType, perm, options, serialConcurrency)
	return err
}
// defaultChunkSize is the size in bytes of each part when a large copy is
// split into multiple parts; the part count is derived from it.
const defaultChunkSize = int64(128 * 1024 * 1024) //128MB
// maxCopytSize is the size threshold in bytes below which a copy is done with
// a single PutCopy call instead of a multipart copy.
// NOTE(review): the name looks like a typo for maxCopySize; renaming requires
// updating its users as well.
const maxCopytSize = int64(1024 * 1024 * 1024) //1G
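// CopyLargeFileInParallel copies a large file within the same bucket, running at most maxConcurrency part copies at a time (values below 1 are treated as 1).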
func (b *Bucket) CopyLargeFileInParallel(sourcePath string, destPath string, contentType string, perm ACL, options Options, maxConcurrency int) error {
if maxConcurrency < 1 {
maxConcurrency = 1
}
log.Printf("Copy large file from %s to %s\n", sourcePath, destPath)
@ -1276,25 +1330,23 @@ func (b *Bucket) CopyLargeFile(sourcePath string, destPath string, contentType s
return err
}
if currentLength < maxCopytSize {
_, err := b.PutCopy(destPath, perm,
CopyOptions{},
b.Path(sourcePath))
return err
}
multi, err := b.InitMulti(destPath, contentType, perm, options)
if err != nil {
return err
}
parts := []Part{}
numParts := (currentLength + defaultChunkSize - 1) / defaultChunkSize
completedParts := make([]Part, numParts)
defer func() {
if len(parts) > 0 {
if multi == nil {
// Parts should be empty if the multi is not initialized
panic("Unreachable")
} else {
if multi.Complete(parts) != nil {
multi.Abort()
}
}
}
}()
errChan := make(chan error, numParts)
limiter := make(chan struct{}, maxConcurrency)
var start int64 = 0
var to int64 = 0
@ -1309,15 +1361,33 @@ func (b *Bucket) CopyLargeFile(sourcePath string, destPath string, contentType s
partNumber++
rangeStr := fmt.Sprintf("bytes=%d-%d", start, to-1)
limiter <- struct{}{}
go func(partNumber int, rangeStr string) {
_, part, err := multi.PutPartCopyWithContentLength(partNumber,
CopyOptions{CopySourceOptions: rangeStr},
sourcePathForCopy, currentLength)
if err == nil {
completedParts[partNumber-1] = part
} else {
log.Printf("Unable in PutPartCopy of part %d for %s: %v\n", partNumber, sourcePathForCopy, err)
}
errChan <- err
<-limiter
}(partNumber, rangeStr)
}
_, part, err := multi.PutPartCopy(partNumber,
CopyOptions{CopySourceOptions: rangeStr},
sourcePathForCopy)
fullyCompleted := true
for range completedParts {
err := <-errChan
if err != nil {
return err
fullyCompleted = false
}
parts = append(parts, part)
}
if fullyCompleted {
err = multi.Complete(completedParts)
} else {
err = multi.Abort()
}
return err