Merge pull request #514 from denverdino/master
Storage Driver for Aliyun OSS
This commit is contained in:
commit
3a414deddb
26 changed files with 3982 additions and 1 deletions
8
Godeps/Godeps.json
generated
8
Godeps/Godeps.json
generated
|
@ -44,6 +44,14 @@
|
|||
"Comment": "1.2.0-66-g6086d79",
|
||||
"Rev": "6086d7927ec35315964d9fea46df6c04e6d697c1"
|
||||
},
|
||||
{
|
||||
"ImportPath": "github.com/denverdino/aliyungo/oss",
|
||||
"Rev": "0e0f322d0a54b994dea9d32541050d177edf6aa3"
|
||||
},
|
||||
{
|
||||
"ImportPath": "github.com/denverdino/aliyungo/util",
|
||||
"Rev": "0e0f322d0a54b994dea9d32541050d177edf6aa3"
|
||||
},
|
||||
{
|
||||
"ImportPath": "github.com/docker/docker/pkg/tarsum",
|
||||
"Comment": "v1.4.1-3932-gb63ec6e",
|
||||
|
|
1265
Godeps/_workspace/src/github.com/denverdino/aliyungo/oss/client.go
generated
vendored
Normal file
1265
Godeps/_workspace/src/github.com/denverdino/aliyungo/oss/client.go
generated
vendored
Normal file
File diff suppressed because it is too large
Load diff
211
Godeps/_workspace/src/github.com/denverdino/aliyungo/oss/client_test.go
generated
vendored
Normal file
211
Godeps/_workspace/src/github.com/denverdino/aliyungo/oss/client_test.go
generated
vendored
Normal file
|
@ -0,0 +1,211 @@
|
|||
package oss_test
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"io/ioutil"
|
||||
//"net/http"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/denverdino/aliyungo/oss"
|
||||
)
|
||||
|
||||
var (
|
||||
//If you test on ECS, you can set the internal param to true
|
||||
client = oss.NewOSSClient(TestRegion, false, TestAccessKeyId, TestAccessKeySecret, false)
|
||||
)
|
||||
|
||||
func TestCreateBucket(t *testing.T) {
|
||||
|
||||
b := client.Bucket(TestBucket)
|
||||
err := b.PutBucket(oss.Private)
|
||||
if err != nil {
|
||||
t.Errorf("Failed for PutBucket: %v", err)
|
||||
}
|
||||
t.Log("Wait a while for bucket creation ...")
|
||||
time.Sleep(10 * time.Second)
|
||||
}
|
||||
|
||||
func TestHead(t *testing.T) {
|
||||
|
||||
b := client.Bucket(TestBucket)
|
||||
_, err := b.Head("name", nil)
|
||||
|
||||
if err == nil {
|
||||
t.Errorf("Failed for Head: %v", err)
|
||||
}
|
||||
}
|
||||
|
||||
func TestPutObject(t *testing.T) {
|
||||
const DISPOSITION = "attachment; filename=\"0x1a2b3c.jpg\""
|
||||
|
||||
b := client.Bucket(TestBucket)
|
||||
err := b.Put("name", []byte("content"), "content-type", oss.Private, oss.Options{ContentDisposition: DISPOSITION})
|
||||
if err != nil {
|
||||
t.Errorf("Failed for Put: %v", err)
|
||||
}
|
||||
}
|
||||
|
||||
func TestGet(t *testing.T) {
|
||||
|
||||
b := client.Bucket(TestBucket)
|
||||
data, err := b.Get("name")
|
||||
|
||||
if err != nil || string(data) != "content" {
|
||||
t.Errorf("Failed for Get: %v", err)
|
||||
}
|
||||
}
|
||||
|
||||
func TestURL(t *testing.T) {
|
||||
|
||||
b := client.Bucket(TestBucket)
|
||||
url := b.URL("name")
|
||||
|
||||
t.Log("URL: ", url)
|
||||
// /c.Assert(req.URL.Path, check.Equals, "/denverdino_test/name")
|
||||
}
|
||||
|
||||
func TestGetReader(t *testing.T) {
|
||||
|
||||
b := client.Bucket(TestBucket)
|
||||
rc, err := b.GetReader("name")
|
||||
if err != nil {
|
||||
t.Fatalf("Failed for GetReader: %v", err)
|
||||
}
|
||||
data, err := ioutil.ReadAll(rc)
|
||||
rc.Close()
|
||||
if err != nil || string(data) != "content" {
|
||||
t.Errorf("Failed for ReadAll: %v", err)
|
||||
}
|
||||
}
|
||||
|
||||
func aTestGetNotFound(t *testing.T) {
|
||||
|
||||
b := client.Bucket("non-existent-bucket")
|
||||
_, err := b.Get("non-existent")
|
||||
if err == nil {
|
||||
t.Fatalf("Failed for TestGetNotFound: %v", err)
|
||||
}
|
||||
ossErr, _ := err.(*oss.Error)
|
||||
if ossErr.StatusCode != 404 || ossErr.BucketName != "non-existent-bucket" {
|
||||
t.Errorf("Failed for TestGetNotFound: %v", err)
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
func TestPutCopy(t *testing.T) {
|
||||
b := client.Bucket(TestBucket)
|
||||
t.Log("Source: ", b.Path("name"))
|
||||
res, err := b.PutCopy("newname", oss.Private, oss.CopyOptions{},
|
||||
b.Path("name"))
|
||||
if err == nil {
|
||||
t.Logf("Copy result: %v", res)
|
||||
} else {
|
||||
t.Errorf("Failed for PutCopy: %v", err)
|
||||
}
|
||||
}
|
||||
|
||||
func TestList(t *testing.T) {
|
||||
|
||||
b := client.Bucket(TestBucket)
|
||||
|
||||
data, err := b.List("n", "", "", 0)
|
||||
if err != nil || len(data.Contents) != 2 {
|
||||
t.Errorf("Failed for List: %v", err)
|
||||
} else {
|
||||
t.Logf("Contents = %++v", data)
|
||||
}
|
||||
}
|
||||
|
||||
func TestListWithDelimiter(t *testing.T) {
|
||||
|
||||
b := client.Bucket(TestBucket)
|
||||
|
||||
data, err := b.List("photos/2006/", "/", "some-marker", 1000)
|
||||
if err != nil || len(data.Contents) != 0 {
|
||||
t.Errorf("Failed for List: %v", err)
|
||||
} else {
|
||||
t.Logf("Contents = %++v", data)
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
func TestPutReader(t *testing.T) {
|
||||
|
||||
b := client.Bucket(TestBucket)
|
||||
buf := bytes.NewBufferString("content")
|
||||
err := b.PutReader("name", buf, int64(buf.Len()), "content-type", oss.Private, oss.Options{})
|
||||
if err != nil {
|
||||
t.Errorf("Failed for PutReader: %v", err)
|
||||
}
|
||||
TestGetReader(t)
|
||||
}
|
||||
|
||||
func TestExists(t *testing.T) {
|
||||
|
||||
b := client.Bucket(TestBucket)
|
||||
result, err := b.Exists("name")
|
||||
if err != nil || result != true {
|
||||
t.Errorf("Failed for Exists: %v", err)
|
||||
}
|
||||
}
|
||||
|
||||
func TestLocation(t *testing.T) {
|
||||
b := client.Bucket(TestBucket)
|
||||
result, err := b.Location()
|
||||
|
||||
if err != nil || result != string(TestRegion) {
|
||||
t.Errorf("Failed for Location: %v %s", err, result)
|
||||
}
|
||||
}
|
||||
|
||||
func TestACL(t *testing.T) {
|
||||
b := client.Bucket(TestBucket)
|
||||
result, err := b.ACL()
|
||||
|
||||
if err != nil {
|
||||
t.Errorf("Failed for ACL: %v", err)
|
||||
} else {
|
||||
t.Logf("AccessControlPolicy: %++v", result)
|
||||
}
|
||||
}
|
||||
|
||||
func TestDelObject(t *testing.T) {
|
||||
|
||||
b := client.Bucket(TestBucket)
|
||||
err := b.Del("name")
|
||||
if err != nil {
|
||||
t.Errorf("Failed for Del: %v", err)
|
||||
}
|
||||
}
|
||||
|
||||
func TestDelMultiObjects(t *testing.T) {
|
||||
|
||||
b := client.Bucket(TestBucket)
|
||||
objects := []oss.Object{oss.Object{Key: "newname"}}
|
||||
err := b.DelMulti(oss.Delete{
|
||||
Quiet: false,
|
||||
Objects: objects,
|
||||
})
|
||||
if err != nil {
|
||||
t.Errorf("Failed for DelMulti: %v", err)
|
||||
}
|
||||
}
|
||||
|
||||
func TestGetService(t *testing.T) {
|
||||
bucketList, err := client.GetService()
|
||||
if err != nil {
|
||||
t.Errorf("Unable to get service: %v", err)
|
||||
} else {
|
||||
t.Logf("GetService: %++v", bucketList)
|
||||
}
|
||||
}
|
||||
|
||||
func TestDelBucket(t *testing.T) {
|
||||
|
||||
b := client.Bucket(TestBucket)
|
||||
err := b.DelBucket()
|
||||
if err != nil {
|
||||
t.Errorf("Failed for DelBucket: %v", err)
|
||||
}
|
||||
}
|
14
Godeps/_workspace/src/github.com/denverdino/aliyungo/oss/config_test.go
generated
vendored
Normal file
14
Godeps/_workspace/src/github.com/denverdino/aliyungo/oss/config_test.go
generated
vendored
Normal file
|
@ -0,0 +1,14 @@
|
|||
package oss_test
|
||||
|
||||
import (
|
||||
"github.com/denverdino/aliyungo/oss"
|
||||
)
|
||||
|
||||
//Modify with your Access Key Id and Access Key Secret
|
||||
const (
|
||||
TestAccessKeyId = "MY_ACCESS_KEY_ID"
|
||||
TestAccessKeySecret = "MY_ACCESS_KEY_ID"
|
||||
TestIAmRich = false
|
||||
TestRegion = oss.Beijing
|
||||
TestBucket = "denverdino"
|
||||
)
|
23
Godeps/_workspace/src/github.com/denverdino/aliyungo/oss/export.go
generated
vendored
Normal file
23
Godeps/_workspace/src/github.com/denverdino/aliyungo/oss/export.go
generated
vendored
Normal file
|
@ -0,0 +1,23 @@
|
|||
package oss
|
||||
|
||||
import (
|
||||
"github.com/denverdino/aliyungo/util"
|
||||
)
|
||||
|
||||
var originalStrategy = attempts
|
||||
|
||||
func SetAttemptStrategy(s *util.AttemptStrategy) {
|
||||
if s == nil {
|
||||
attempts = originalStrategy
|
||||
} else {
|
||||
attempts = *s
|
||||
}
|
||||
}
|
||||
|
||||
func SetListPartsMax(n int) {
|
||||
listPartsMax = n
|
||||
}
|
||||
|
||||
func SetListMultiMax(n int) {
|
||||
listMultiMax = n
|
||||
}
|
464
Godeps/_workspace/src/github.com/denverdino/aliyungo/oss/multi.go
generated
vendored
Normal file
464
Godeps/_workspace/src/github.com/denverdino/aliyungo/oss/multi.go
generated
vendored
Normal file
|
@ -0,0 +1,464 @@
|
|||
package oss
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"crypto/md5"
|
||||
"encoding/base64"
|
||||
"encoding/hex"
|
||||
"encoding/xml"
|
||||
"errors"
|
||||
"io"
|
||||
//"log"
|
||||
"net/http"
|
||||
"net/url"
|
||||
"sort"
|
||||
"strconv"
|
||||
"strings"
|
||||
)
|
||||
|
||||
// Multi represents an unfinished multipart upload.
|
||||
//
|
||||
// Multipart uploads allow sending big objects in smaller chunks.
|
||||
// After all parts have been sent, the upload must be explicitly
|
||||
// completed by calling Complete with the list of parts.
|
||||
|
||||
type Multi struct {
|
||||
Bucket *Bucket
|
||||
Key string
|
||||
UploadId string
|
||||
}
|
||||
|
||||
// That's the default. Here just for testing.
|
||||
var listMultiMax = 1000
|
||||
|
||||
type listMultiResp struct {
|
||||
NextKeyMarker string
|
||||
NextUploadIdMarker string
|
||||
IsTruncated bool
|
||||
Upload []Multi
|
||||
CommonPrefixes []string `xml:"CommonPrefixes>Prefix"`
|
||||
}
|
||||
|
||||
// ListMulti returns the list of unfinished multipart uploads in b.
|
||||
//
|
||||
// The prefix parameter limits the response to keys that begin with the
|
||||
// specified prefix. You can use prefixes to separate a bucket into different
|
||||
// groupings of keys (to get the feeling of folders, for example).
|
||||
//
|
||||
// The delim parameter causes the response to group all of the keys that
|
||||
// share a common prefix up to the next delimiter in a single entry within
|
||||
// the CommonPrefixes field. You can use delimiters to separate a bucket
|
||||
// into different groupings of keys, similar to how folders would work.
|
||||
//
|
||||
func (b *Bucket) ListMulti(prefix, delim string) (multis []*Multi, prefixes []string, err error) {
|
||||
params := make(url.Values)
|
||||
params.Set("uploads", "")
|
||||
params.Set("max-uploads", strconv.FormatInt(int64(listMultiMax), 10))
|
||||
params.Set("prefix", prefix)
|
||||
params.Set("delimiter", delim)
|
||||
|
||||
for attempt := attempts.Start(); attempt.Next(); {
|
||||
req := &request{
|
||||
method: "GET",
|
||||
bucket: b.Name,
|
||||
params: params,
|
||||
}
|
||||
var resp listMultiResp
|
||||
err := b.Client.query(req, &resp)
|
||||
if shouldRetry(err) && attempt.HasNext() {
|
||||
continue
|
||||
}
|
||||
if err != nil {
|
||||
return nil, nil, err
|
||||
}
|
||||
for i := range resp.Upload {
|
||||
multi := &resp.Upload[i]
|
||||
multi.Bucket = b
|
||||
multis = append(multis, multi)
|
||||
}
|
||||
prefixes = append(prefixes, resp.CommonPrefixes...)
|
||||
if !resp.IsTruncated {
|
||||
return multis, prefixes, nil
|
||||
}
|
||||
params.Set("key-marker", resp.NextKeyMarker)
|
||||
params.Set("upload-id-marker", resp.NextUploadIdMarker)
|
||||
attempt = attempts.Start() // Last request worked.
|
||||
}
|
||||
panic("unreachable")
|
||||
}
|
||||
|
||||
// Multi returns a multipart upload handler for the provided key
|
||||
// inside b. If a multipart upload exists for key, it is returned,
|
||||
// otherwise a new multipart upload is initiated with contType and perm.
|
||||
func (b *Bucket) Multi(key, contType string, perm ACL, options Options) (*Multi, error) {
|
||||
multis, _, err := b.ListMulti(key, "")
|
||||
if err != nil && !hasCode(err, "NoSuchUpload") {
|
||||
return nil, err
|
||||
}
|
||||
for _, m := range multis {
|
||||
if m.Key == key {
|
||||
return m, nil
|
||||
}
|
||||
}
|
||||
return b.InitMulti(key, contType, perm, options)
|
||||
}
|
||||
|
||||
// InitMulti initializes a new multipart upload at the provided
|
||||
// key inside b and returns a value for manipulating it.
|
||||
//
|
||||
func (b *Bucket) InitMulti(key string, contType string, perm ACL, options Options) (*Multi, error) {
|
||||
headers := make(http.Header)
|
||||
headers.Set("Content-Length", "0")
|
||||
headers.Set("Content-Type", contType)
|
||||
headers.Set("x-oss-acl", string(perm))
|
||||
|
||||
options.addHeaders(headers)
|
||||
params := make(url.Values)
|
||||
params.Set("uploads", "")
|
||||
req := &request{
|
||||
method: "POST",
|
||||
bucket: b.Name,
|
||||
path: key,
|
||||
headers: headers,
|
||||
params: params,
|
||||
}
|
||||
var err error
|
||||
var resp struct {
|
||||
UploadId string `xml:"UploadId"`
|
||||
}
|
||||
for attempt := attempts.Start(); attempt.Next(); {
|
||||
err = b.Client.query(req, &resp)
|
||||
if !shouldRetry(err) {
|
||||
break
|
||||
}
|
||||
}
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return &Multi{Bucket: b, Key: key, UploadId: resp.UploadId}, nil
|
||||
}
|
||||
|
||||
func (m *Multi) PutPartCopy(n int, options CopyOptions, source string) (*CopyObjectResult, Part, error) {
|
||||
// TODO source format a /BUCKET/PATH/TO/OBJECT
|
||||
// TODO not a good design. API could be changed to PutPartCopyWithinBucket(..., path) and PutPartCopyFromBucket(bucket, path)
|
||||
|
||||
headers := make(http.Header)
|
||||
headers.Set("x-oss-copy-source", source)
|
||||
|
||||
options.addHeaders(headers)
|
||||
params := make(url.Values)
|
||||
params.Set("uploadId", m.UploadId)
|
||||
params.Set("partNumber", strconv.FormatInt(int64(n), 10))
|
||||
|
||||
sourceBucket := m.Bucket.Client.Bucket(strings.TrimRight(strings.Split(source, "/")[1], "/"))
|
||||
//log.Println("source: ", source)
|
||||
//log.Println("sourceBucket: ", sourceBucket.Name)
|
||||
//log.Println("HEAD: ", strings.strings.SplitAfterN(source, "/", 3)[2])
|
||||
// TODO SplitAfterN can be use in bucket name
|
||||
sourceMeta, err := sourceBucket.Head(strings.SplitAfterN(source, "/", 3)[2], nil)
|
||||
if err != nil {
|
||||
return nil, Part{}, err
|
||||
}
|
||||
|
||||
for attempt := attempts.Start(); attempt.Next(); {
|
||||
req := &request{
|
||||
method: "PUT",
|
||||
bucket: m.Bucket.Name,
|
||||
path: m.Key,
|
||||
headers: headers,
|
||||
params: params,
|
||||
}
|
||||
resp := &CopyObjectResult{}
|
||||
err = m.Bucket.Client.query(req, resp)
|
||||
if shouldRetry(err) && attempt.HasNext() {
|
||||
continue
|
||||
}
|
||||
if err != nil {
|
||||
return nil, Part{}, err
|
||||
}
|
||||
if resp.ETag == "" {
|
||||
return nil, Part{}, errors.New("part upload succeeded with no ETag")
|
||||
}
|
||||
return resp, Part{n, resp.ETag, sourceMeta.ContentLength}, nil
|
||||
}
|
||||
panic("unreachable")
|
||||
}
|
||||
|
||||
// PutPart sends part n of the multipart upload, reading all the content from r.
|
||||
// Each part, except for the last one, must be at least 5MB in size.
|
||||
//
|
||||
func (m *Multi) PutPart(n int, r io.ReadSeeker) (Part, error) {
|
||||
partSize, _, md5b64, err := seekerInfo(r)
|
||||
if err != nil {
|
||||
return Part{}, err
|
||||
}
|
||||
return m.putPart(n, r, partSize, md5b64)
|
||||
}
|
||||
|
||||
func (m *Multi) putPart(n int, r io.ReadSeeker, partSize int64, md5b64 string) (Part, error) {
|
||||
headers := make(http.Header)
|
||||
headers.Set("Content-Length", strconv.FormatInt(partSize, 10))
|
||||
headers.Set("Content-MD5", md5b64)
|
||||
|
||||
params := make(url.Values)
|
||||
params.Set("uploadId", m.UploadId)
|
||||
params.Set("partNumber", strconv.FormatInt(int64(n), 10))
|
||||
|
||||
for attempt := attempts.Start(); attempt.Next(); {
|
||||
_, err := r.Seek(0, 0)
|
||||
if err != nil {
|
||||
return Part{}, err
|
||||
}
|
||||
req := &request{
|
||||
method: "PUT",
|
||||
bucket: m.Bucket.Name,
|
||||
path: m.Key,
|
||||
headers: headers,
|
||||
params: params,
|
||||
payload: r,
|
||||
}
|
||||
err = m.Bucket.Client.prepare(req)
|
||||
if err != nil {
|
||||
return Part{}, err
|
||||
}
|
||||
resp, err := m.Bucket.Client.run(req, nil)
|
||||
if shouldRetry(err) && attempt.HasNext() {
|
||||
continue
|
||||
}
|
||||
if err != nil {
|
||||
return Part{}, err
|
||||
}
|
||||
etag := resp.Header.Get("ETag")
|
||||
if etag == "" {
|
||||
return Part{}, errors.New("part upload succeeded with no ETag")
|
||||
}
|
||||
return Part{n, etag, partSize}, nil
|
||||
}
|
||||
panic("unreachable")
|
||||
}
|
||||
|
||||
func seekerInfo(r io.ReadSeeker) (size int64, md5hex string, md5b64 string, err error) {
|
||||
_, err = r.Seek(0, 0)
|
||||
if err != nil {
|
||||
return 0, "", "", err
|
||||
}
|
||||
digest := md5.New()
|
||||
size, err = io.Copy(digest, r)
|
||||
if err != nil {
|
||||
return 0, "", "", err
|
||||
}
|
||||
sum := digest.Sum(nil)
|
||||
md5hex = hex.EncodeToString(sum)
|
||||
md5b64 = base64.StdEncoding.EncodeToString(sum)
|
||||
return size, md5hex, md5b64, nil
|
||||
}
|
||||
|
||||
type Part struct {
|
||||
N int `xml:"PartNumber"`
|
||||
ETag string
|
||||
Size int64
|
||||
}
|
||||
|
||||
type partSlice []Part
|
||||
|
||||
func (s partSlice) Len() int { return len(s) }
|
||||
func (s partSlice) Less(i, j int) bool { return s[i].N < s[j].N }
|
||||
func (s partSlice) Swap(i, j int) { s[i], s[j] = s[j], s[i] }
|
||||
|
||||
type listPartsResp struct {
|
||||
NextPartNumberMarker string
|
||||
IsTruncated bool
|
||||
Part []Part
|
||||
}
|
||||
|
||||
// That's the default. Here just for testing.
|
||||
var listPartsMax = 1000
|
||||
|
||||
// ListParts for backcompatability. See the documentation for ListPartsFull
|
||||
func (m *Multi) ListParts() ([]Part, error) {
|
||||
return m.ListPartsFull(0, listPartsMax)
|
||||
}
|
||||
|
||||
// ListPartsFull returns the list of previously uploaded parts in m,
|
||||
// ordered by part number (Only parts with higher part numbers than
|
||||
// partNumberMarker will be listed). Only up to maxParts parts will be
|
||||
// returned.
|
||||
//
|
||||
func (m *Multi) ListPartsFull(partNumberMarker int, maxParts int) ([]Part, error) {
|
||||
if maxParts > listPartsMax {
|
||||
maxParts = listPartsMax
|
||||
}
|
||||
|
||||
params := make(url.Values)
|
||||
params.Set("uploadId", m.UploadId)
|
||||
params.Set("max-parts", strconv.FormatInt(int64(maxParts), 10))
|
||||
params.Set("part-number-marker", strconv.FormatInt(int64(partNumberMarker), 10))
|
||||
|
||||
var parts partSlice
|
||||
for attempt := attempts.Start(); attempt.Next(); {
|
||||
req := &request{
|
||||
method: "GET",
|
||||
bucket: m.Bucket.Name,
|
||||
path: m.Key,
|
||||
params: params,
|
||||
}
|
||||
var resp listPartsResp
|
||||
err := m.Bucket.Client.query(req, &resp)
|
||||
if shouldRetry(err) && attempt.HasNext() {
|
||||
continue
|
||||
}
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
parts = append(parts, resp.Part...)
|
||||
if !resp.IsTruncated {
|
||||
sort.Sort(parts)
|
||||
return parts, nil
|
||||
}
|
||||
params.Set("part-number-marker", resp.NextPartNumberMarker)
|
||||
attempt = attempts.Start() // Last request worked.
|
||||
}
|
||||
panic("unreachable")
|
||||
}
|
||||
|
||||
type ReaderAtSeeker interface {
|
||||
io.ReaderAt
|
||||
io.ReadSeeker
|
||||
}
|
||||
|
||||
// PutAll sends all of r via a multipart upload with parts no larger
|
||||
// than partSize bytes, which must be set to at least 5MB.
|
||||
// Parts previously uploaded are either reused if their checksum
|
||||
// and size match the new part, or otherwise overwritten with the
|
||||
// new content.
|
||||
// PutAll returns all the parts of m (reused or not).
|
||||
func (m *Multi) PutAll(r ReaderAtSeeker, partSize int64) ([]Part, error) {
|
||||
old, err := m.ListParts()
|
||||
if err != nil && !hasCode(err, "NoSuchUpload") {
|
||||
return nil, err
|
||||
}
|
||||
reuse := 0 // Index of next old part to consider reusing.
|
||||
current := 1 // Part number of latest good part handled.
|
||||
totalSize, err := r.Seek(0, 2)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
first := true // Must send at least one empty part if the file is empty.
|
||||
var result []Part
|
||||
NextSection:
|
||||
for offset := int64(0); offset < totalSize || first; offset += partSize {
|
||||
first = false
|
||||
if offset+partSize > totalSize {
|
||||
partSize = totalSize - offset
|
||||
}
|
||||
section := io.NewSectionReader(r, offset, partSize)
|
||||
_, md5hex, md5b64, err := seekerInfo(section)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
for reuse < len(old) && old[reuse].N <= current {
|
||||
// Looks like this part was already sent.
|
||||
part := &old[reuse]
|
||||
etag := `"` + md5hex + `"`
|
||||
if part.N == current && part.Size == partSize && part.ETag == etag {
|
||||
// Checksum matches. Reuse the old part.
|
||||
result = append(result, *part)
|
||||
current++
|
||||
continue NextSection
|
||||
}
|
||||
reuse++
|
||||
}
|
||||
|
||||
// Part wasn't found or doesn't match. Send it.
|
||||
part, err := m.putPart(current, section, partSize, md5b64)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
result = append(result, part)
|
||||
current++
|
||||
}
|
||||
return result, nil
|
||||
}
|
||||
|
||||
type completeUpload struct {
|
||||
XMLName xml.Name `xml:"CompleteMultipartUpload"`
|
||||
Parts completeParts `xml:"Part"`
|
||||
}
|
||||
|
||||
type completePart struct {
|
||||
PartNumber int
|
||||
ETag string
|
||||
}
|
||||
|
||||
type completeParts []completePart
|
||||
|
||||
func (p completeParts) Len() int { return len(p) }
|
||||
func (p completeParts) Less(i, j int) bool { return p[i].PartNumber < p[j].PartNumber }
|
||||
func (p completeParts) Swap(i, j int) { p[i], p[j] = p[j], p[i] }
|
||||
|
||||
// Complete assembles the given previously uploaded parts into the
|
||||
// final object. This operation may take several minutes.
|
||||
//
|
||||
func (m *Multi) Complete(parts []Part) error {
|
||||
params := make(url.Values)
|
||||
params.Set("uploadId", m.UploadId)
|
||||
|
||||
c := completeUpload{}
|
||||
for _, p := range parts {
|
||||
c.Parts = append(c.Parts, completePart{p.N, p.ETag})
|
||||
}
|
||||
sort.Sort(c.Parts)
|
||||
data, err := xml.Marshal(&c)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
for attempt := attempts.Start(); attempt.Next(); {
|
||||
req := &request{
|
||||
method: "POST",
|
||||
bucket: m.Bucket.Name,
|
||||
path: m.Key,
|
||||
params: params,
|
||||
payload: bytes.NewReader(data),
|
||||
}
|
||||
err := m.Bucket.Client.query(req, nil)
|
||||
if shouldRetry(err) && attempt.HasNext() {
|
||||
continue
|
||||
}
|
||||
return err
|
||||
}
|
||||
panic("unreachable")
|
||||
}
|
||||
|
||||
// Abort deletes an unifinished multipart upload and any previously
|
||||
// uploaded parts for it.
|
||||
//
|
||||
// After a multipart upload is aborted, no additional parts can be
|
||||
// uploaded using it. However, if any part uploads are currently in
|
||||
// progress, those part uploads might or might not succeed. As a result,
|
||||
// it might be necessary to abort a given multipart upload multiple
|
||||
// times in order to completely free all storage consumed by all parts.
|
||||
//
|
||||
// NOTE: If the described scenario happens to you, please report back to
|
||||
// the goamz authors with details. In the future such retrying should be
|
||||
// handled internally, but it's not clear what happens precisely (Is an
|
||||
// error returned? Is the issue completely undetectable?).
|
||||
//
|
||||
func (m *Multi) Abort() error {
|
||||
params := make(url.Values)
|
||||
params.Set("uploadId", m.UploadId)
|
||||
|
||||
for attempt := attempts.Start(); attempt.Next(); {
|
||||
req := &request{
|
||||
method: "DELETE",
|
||||
bucket: m.Bucket.Name,
|
||||
path: m.Key,
|
||||
params: params,
|
||||
}
|
||||
err := m.Bucket.Client.query(req, nil)
|
||||
if shouldRetry(err) && attempt.HasNext() {
|
||||
continue
|
||||
}
|
||||
return err
|
||||
}
|
||||
panic("unreachable")
|
||||
}
|
161
Godeps/_workspace/src/github.com/denverdino/aliyungo/oss/multi_test.go
generated
vendored
Normal file
161
Godeps/_workspace/src/github.com/denverdino/aliyungo/oss/multi_test.go
generated
vendored
Normal file
|
@ -0,0 +1,161 @@
|
|||
package oss_test
|
||||
|
||||
import (
|
||||
//"encoding/xml"
|
||||
"github.com/denverdino/aliyungo/oss"
|
||||
"testing"
|
||||
//"io"
|
||||
//"io/ioutil"
|
||||
"strings"
|
||||
)
|
||||
|
||||
func TestCreateBucketMulti(t *testing.T) {
|
||||
TestCreateBucket(t)
|
||||
}
|
||||
|
||||
func TestInitMulti(t *testing.T) {
|
||||
b := client.Bucket(TestBucket)
|
||||
|
||||
metadata := make(map[string][]string)
|
||||
metadata["key1"] = []string{"value1"}
|
||||
metadata["key2"] = []string{"value2"}
|
||||
options := oss.Options{
|
||||
ServerSideEncryption: true,
|
||||
Meta: metadata,
|
||||
ContentEncoding: "text/utf8",
|
||||
CacheControl: "no-cache",
|
||||
ContentMD5: "0000000000000000",
|
||||
}
|
||||
|
||||
multi, err := b.InitMulti("multi", "text/plain", oss.Private, options)
|
||||
if err != nil {
|
||||
t.Errorf("Failed for InitMulti: %v", err)
|
||||
} else {
|
||||
t.Logf("InitMulti result: %++v", multi)
|
||||
}
|
||||
}
|
||||
|
||||
func TestMultiReturnOld(t *testing.T) {
|
||||
|
||||
b := client.Bucket(TestBucket)
|
||||
|
||||
multi, err := b.Multi("multi", "text/plain", oss.Private, oss.Options{})
|
||||
if err != nil {
|
||||
t.Errorf("Failed for Multi: %v", err)
|
||||
} else {
|
||||
t.Logf("Multi result: %++v", multi)
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
func TestPutPart(t *testing.T) {
|
||||
|
||||
b := client.Bucket(TestBucket)
|
||||
|
||||
multi, err := b.Multi("multi", "text/plain", oss.Private, oss.Options{})
|
||||
if err != nil {
|
||||
t.Fatalf("Failed for Multi: %v", err)
|
||||
}
|
||||
|
||||
part, err := multi.PutPart(1, strings.NewReader("<part 1>"))
|
||||
if err != nil {
|
||||
t.Errorf("Failed for PutPart: %v", err)
|
||||
} else {
|
||||
t.Logf("PutPart result: %++v", part)
|
||||
}
|
||||
|
||||
}
|
||||
func TestPutPartCopy(t *testing.T) {
|
||||
|
||||
TestPutObject(t)
|
||||
|
||||
b := client.Bucket(TestBucket)
|
||||
|
||||
multi, err := b.Multi("multi", "text/plain", oss.Private, oss.Options{})
|
||||
if err != nil {
|
||||
t.Fatalf("Failed for Multi: %v", err)
|
||||
}
|
||||
|
||||
res, part, err := multi.PutPartCopy(2, oss.CopyOptions{}, b.Path("name"))
|
||||
if err != nil {
|
||||
t.Errorf("Failed for PutPartCopy: %v", err)
|
||||
} else {
|
||||
t.Logf("PutPartCopy result: %++v %++v", part, res)
|
||||
}
|
||||
TestDelObject(t)
|
||||
}
|
||||
|
||||
func TestListParts(t *testing.T) {
|
||||
|
||||
b := client.Bucket(TestBucket)
|
||||
|
||||
multi, err := b.Multi("multi", "text/plain", oss.Private, oss.Options{})
|
||||
if err != nil {
|
||||
t.Fatalf("Failed for Multi: %v", err)
|
||||
}
|
||||
|
||||
parts, err := multi.ListParts()
|
||||
if err != nil {
|
||||
t.Errorf("Failed for ListParts: %v", err)
|
||||
} else {
|
||||
t.Logf("ListParts result: %++v", parts)
|
||||
}
|
||||
}
|
||||
func TestListMulti(t *testing.T) {
|
||||
|
||||
b := client.Bucket(TestBucket)
|
||||
|
||||
multis, prefixes, err := b.ListMulti("", "/")
|
||||
if err != nil {
|
||||
t.Errorf("Failed for ListMulti: %v", err)
|
||||
} else {
|
||||
t.Logf("ListMulti result : %++v %++v", multis, prefixes)
|
||||
}
|
||||
}
|
||||
func TestMultiAbort(t *testing.T) {
|
||||
|
||||
b := client.Bucket(TestBucket)
|
||||
|
||||
multi, err := b.Multi("multi", "text/plain", oss.Private, oss.Options{})
|
||||
if err != nil {
|
||||
t.Fatalf("Failed for Multi: %v", err)
|
||||
}
|
||||
|
||||
err = multi.Abort()
|
||||
if err != nil {
|
||||
t.Errorf("Failed for Abort: %v", err)
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
func TestPutAll(t *testing.T) {
|
||||
TestInitMulti(t)
|
||||
// Don't retry the NoSuchUpload error.
|
||||
b := client.Bucket(TestBucket)
|
||||
|
||||
multi, err := b.Multi("multi", "text/plain", oss.Private, oss.Options{})
|
||||
if err != nil {
|
||||
t.Fatalf("Failed for Multi: %v", err)
|
||||
}
|
||||
|
||||
// Must send at least one part, so that completing it will work.
|
||||
parts, err := multi.PutAll(strings.NewReader("part1part2last"), 5)
|
||||
if err != nil {
|
||||
t.Errorf("Failed for PutAll: %v", err)
|
||||
} else {
|
||||
t.Logf("PutAll result: %++v", parts)
|
||||
}
|
||||
// // Must send at least one part, so that completing it will work.
|
||||
// err = multi.Complete(parts)
|
||||
// if err != nil {
|
||||
// t.Errorf("Failed for Complete: %v", err)
|
||||
// }
|
||||
err = multi.Abort()
|
||||
if err != nil {
|
||||
t.Errorf("Failed for Abort: %v", err)
|
||||
}
|
||||
}
|
||||
|
||||
func TestCleanUp(t *testing.T) {
|
||||
TestDelBucket(t)
|
||||
}
|
53
Godeps/_workspace/src/github.com/denverdino/aliyungo/oss/regions.go
generated
vendored
Normal file
53
Godeps/_workspace/src/github.com/denverdino/aliyungo/oss/regions.go
generated
vendored
Normal file
|
@ -0,0 +1,53 @@
|
|||
package oss
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
)
|
||||
|
||||
// Region represents OSS region
|
||||
type Region string
|
||||
|
||||
// Constants of region definition
|
||||
const (
|
||||
Hangzhou = Region("oss-cn-hangzhou")
|
||||
Qingdao = Region("oss-cn-qingdao")
|
||||
Beijing = Region("oss-cn-beijing")
|
||||
Hongkong = Region("oss-cn-hongkong")
|
||||
Shenzhen = Region("oss-cn-shenzhen")
|
||||
USWest1 = Region("oss-us-west-1")
|
||||
DefaultRegion = Hangzhou
|
||||
)
|
||||
|
||||
// GetEndpoint returns endpoint of region
|
||||
func (r Region) GetEndpoint(internal bool, bucket string, secure bool) string {
|
||||
if internal {
|
||||
return r.GetInternalEndpoint(bucket, secure)
|
||||
}
|
||||
return r.GetInternetEndpoint(bucket, secure)
|
||||
}
|
||||
|
||||
func getProtocol(secure bool) string {
|
||||
protocol := "http"
|
||||
if secure {
|
||||
protocol = "https"
|
||||
}
|
||||
return protocol
|
||||
}
|
||||
|
||||
// GetInternetEndpoint returns internet endpoint of region
|
||||
func (r Region) GetInternetEndpoint(bucket string, secure bool) string {
|
||||
protocol := getProtocol(secure)
|
||||
if bucket == "" {
|
||||
return fmt.Sprintf("%s://oss.aliyuncs.com", protocol)
|
||||
}
|
||||
return fmt.Sprintf("%s://%s.%s.aliyuncs.com", protocol, bucket, string(r))
|
||||
}
|
||||
|
||||
// GetInternalEndpoint returns internal endpoint of region
|
||||
func (r Region) GetInternalEndpoint(bucket string, secure bool) string {
|
||||
protocol := getProtocol(secure)
|
||||
if bucket == "" {
|
||||
return fmt.Sprintf("%s://oss-internal.aliyuncs.com", protocol)
|
||||
}
|
||||
return fmt.Sprintf("%s://%s.%s-internal.aliyuncs.com", protocol, bucket, string(r))
|
||||
}
|
105
Godeps/_workspace/src/github.com/denverdino/aliyungo/oss/signature.go
generated
vendored
Normal file
105
Godeps/_workspace/src/github.com/denverdino/aliyungo/oss/signature.go
generated
vendored
Normal file
|
@ -0,0 +1,105 @@
|
|||
package oss
|
||||
|
||||
import (
|
||||
"github.com/denverdino/aliyungo/util"
|
||||
//"log"
|
||||
"net/http"
|
||||
"net/url"
|
||||
"sort"
|
||||
"strings"
|
||||
)
|
||||
|
||||
const HeaderOSSPrefix = "x-oss-"
|
||||
|
||||
var ossParamsToSign = map[string]bool{
|
||||
"acl": true,
|
||||
"delete": true,
|
||||
"location": true,
|
||||
"logging": true,
|
||||
"notification": true,
|
||||
"partNumber": true,
|
||||
"policy": true,
|
||||
"requestPayment": true,
|
||||
"torrent": true,
|
||||
"uploadId": true,
|
||||
"uploads": true,
|
||||
"versionId": true,
|
||||
"versioning": true,
|
||||
"versions": true,
|
||||
"response-content-type": true,
|
||||
"response-content-language": true,
|
||||
"response-expires": true,
|
||||
"response-cache-control": true,
|
||||
"response-content-disposition": true,
|
||||
"response-content-encoding": true,
|
||||
}
|
||||
|
||||
func (client *Client) signRequest(request *request) {
|
||||
query := request.params
|
||||
|
||||
urlSignature := query.Get("OSSAccessKeyId") != ""
|
||||
|
||||
headers := request.headers
|
||||
contentMd5 := headers.Get("Content-Md5")
|
||||
contentType := headers.Get("Content-Type")
|
||||
date := ""
|
||||
if urlSignature {
|
||||
date = query.Get("Expires")
|
||||
} else {
|
||||
date = headers.Get("Date")
|
||||
}
|
||||
|
||||
resource := request.path
|
||||
if request.bucket != "" {
|
||||
resource = "/" + request.bucket + request.path
|
||||
}
|
||||
params := make(url.Values)
|
||||
for k, v := range query {
|
||||
if ossParamsToSign[k] {
|
||||
params[k] = v
|
||||
}
|
||||
}
|
||||
|
||||
if len(params) > 0 {
|
||||
resource = resource + "?" + util.Encode(params)
|
||||
}
|
||||
|
||||
canonicalizedResource := resource
|
||||
|
||||
_, canonicalizedHeader := canonicalizeHeader(headers)
|
||||
|
||||
stringToSign := request.method + "\n" + contentMd5 + "\n" + contentType + "\n" + date + "\n" + canonicalizedHeader + canonicalizedResource
|
||||
|
||||
//log.Println("stringToSign: ", stringToSign)
|
||||
signature := util.CreateSignature(stringToSign, client.AccessKeySecret)
|
||||
|
||||
if query.Get("OSSAccessKeyId") != "" {
|
||||
query.Set("Signature", signature)
|
||||
} else {
|
||||
headers.Set("Authorization", "OSS "+client.AccessKeyId+":"+signature)
|
||||
}
|
||||
}
|
||||
|
||||
//Have to break the abstraction to append keys with lower case.
|
||||
func canonicalizeHeader(headers http.Header) (newHeaders http.Header, result string) {
|
||||
var canonicalizedHeaders []string
|
||||
newHeaders = http.Header{}
|
||||
|
||||
for k, v := range headers {
|
||||
if lower := strings.ToLower(k); strings.HasPrefix(lower, HeaderOSSPrefix) {
|
||||
newHeaders[lower] = v
|
||||
canonicalizedHeaders = append(canonicalizedHeaders, lower)
|
||||
} else {
|
||||
newHeaders[k] = v
|
||||
}
|
||||
}
|
||||
|
||||
sort.Strings(canonicalizedHeaders)
|
||||
|
||||
var canonicalizedHeader string
|
||||
|
||||
for _, k := range canonicalizedHeaders {
|
||||
canonicalizedHeader += k + ":" + headers.Get(k) + "\n"
|
||||
}
|
||||
return newHeaders, canonicalizedHeader
|
||||
}
|
76
Godeps/_workspace/src/github.com/denverdino/aliyungo/util/attempt.go
generated
vendored
Normal file
76
Godeps/_workspace/src/github.com/denverdino/aliyungo/util/attempt.go
generated
vendored
Normal file
|
@ -0,0 +1,76 @@
|
|||
package util
|
||||
|
||||
import (
|
||||
"time"
|
||||
)
|
||||
|
||||
// AttemptStrategy is reused from the goamz package
|
||||
|
||||
// AttemptStrategy represents a strategy for waiting for an action
|
||||
// to complete successfully. This is an internal type used by the
|
||||
// implementation of other packages.
|
||||
type AttemptStrategy struct {
|
||||
Total time.Duration // total duration of attempt.
|
||||
Delay time.Duration // interval between each try in the burst.
|
||||
Min int // minimum number of retries; overrides Total
|
||||
}
|
||||
|
||||
type Attempt struct {
|
||||
strategy AttemptStrategy
|
||||
last time.Time
|
||||
end time.Time
|
||||
force bool
|
||||
count int
|
||||
}
|
||||
|
||||
// Start begins a new sequence of attempts for the given strategy.
|
||||
func (s AttemptStrategy) Start() *Attempt {
|
||||
now := time.Now()
|
||||
return &Attempt{
|
||||
strategy: s,
|
||||
last: now,
|
||||
end: now.Add(s.Total),
|
||||
force: true,
|
||||
}
|
||||
}
|
||||
|
||||
// Next waits until it is time to perform the next attempt or returns
|
||||
// false if it is time to stop trying.
|
||||
func (a *Attempt) Next() bool {
|
||||
now := time.Now()
|
||||
sleep := a.nextSleep(now)
|
||||
if !a.force && !now.Add(sleep).Before(a.end) && a.strategy.Min <= a.count {
|
||||
return false
|
||||
}
|
||||
a.force = false
|
||||
if sleep > 0 && a.count > 0 {
|
||||
time.Sleep(sleep)
|
||||
now = time.Now()
|
||||
}
|
||||
a.count++
|
||||
a.last = now
|
||||
return true
|
||||
}
|
||||
|
||||
func (a *Attempt) nextSleep(now time.Time) time.Duration {
|
||||
sleep := a.strategy.Delay - now.Sub(a.last)
|
||||
if sleep < 0 {
|
||||
return 0
|
||||
}
|
||||
return sleep
|
||||
}
|
||||
|
||||
// HasNext returns whether another attempt will be made if the current
|
||||
// one fails. If it returns true, the following call to Next is
|
||||
// guaranteed to return true.
|
||||
func (a *Attempt) HasNext() bool {
|
||||
if a.force || a.strategy.Min > a.count {
|
||||
return true
|
||||
}
|
||||
now := time.Now()
|
||||
if now.Add(a.nextSleep(now)).Before(a.end) {
|
||||
a.force = true
|
||||
return true
|
||||
}
|
||||
return false
|
||||
}
|
90
Godeps/_workspace/src/github.com/denverdino/aliyungo/util/attempt_test.go
generated
vendored
Normal file
90
Godeps/_workspace/src/github.com/denverdino/aliyungo/util/attempt_test.go
generated
vendored
Normal file
|
@ -0,0 +1,90 @@
|
|||
package util
|
||||
|
||||
import (
|
||||
"testing"
|
||||
"time"
|
||||
)
|
||||
|
||||
func TestAttemptTiming(t *testing.T) {
|
||||
testAttempt := AttemptStrategy{
|
||||
Total: 0.25e9,
|
||||
Delay: 0.1e9,
|
||||
}
|
||||
want := []time.Duration{0, 0.1e9, 0.2e9, 0.2e9}
|
||||
got := make([]time.Duration, 0, len(want)) // avoid allocation when testing timing
|
||||
t0 := time.Now()
|
||||
for a := testAttempt.Start(); a.Next(); {
|
||||
got = append(got, time.Now().Sub(t0))
|
||||
}
|
||||
got = append(got, time.Now().Sub(t0))
|
||||
if len(got) != len(want) {
|
||||
t.Fatalf("Failed!")
|
||||
}
|
||||
const margin = 0.01e9
|
||||
for i, got := range want {
|
||||
lo := want[i] - margin
|
||||
hi := want[i] + margin
|
||||
if got < lo || got > hi {
|
||||
t.Errorf("attempt %d want %g got %g", i, want[i].Seconds(), got.Seconds())
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func TestAttemptNextHasNext(t *testing.T) {
|
||||
a := AttemptStrategy{}.Start()
|
||||
if !a.Next() {
|
||||
t.Fatalf("Failed!")
|
||||
}
|
||||
if a.Next() {
|
||||
t.Fatalf("Failed!")
|
||||
}
|
||||
|
||||
a = AttemptStrategy{}.Start()
|
||||
if !a.Next() {
|
||||
t.Fatalf("Failed!")
|
||||
}
|
||||
if a.HasNext() {
|
||||
t.Fatalf("Failed!")
|
||||
}
|
||||
if a.Next() {
|
||||
t.Fatalf("Failed!")
|
||||
}
|
||||
a = AttemptStrategy{Total: 2e8}.Start()
|
||||
|
||||
if !a.Next() {
|
||||
t.Fatalf("Failed!")
|
||||
}
|
||||
if !a.HasNext() {
|
||||
t.Fatalf("Failed!")
|
||||
}
|
||||
time.Sleep(2e8)
|
||||
|
||||
if !a.HasNext() {
|
||||
t.Fatalf("Failed!")
|
||||
}
|
||||
if !a.Next() {
|
||||
t.Fatalf("Failed!")
|
||||
}
|
||||
if a.Next() {
|
||||
t.Fatalf("Failed!")
|
||||
}
|
||||
|
||||
a = AttemptStrategy{Total: 1e8, Min: 2}.Start()
|
||||
time.Sleep(1e8)
|
||||
|
||||
if !a.Next() {
|
||||
t.Fatalf("Failed!")
|
||||
}
|
||||
if !a.HasNext() {
|
||||
t.Fatalf("Failed!")
|
||||
}
|
||||
if !a.Next() {
|
||||
t.Fatalf("Failed!")
|
||||
}
|
||||
if a.HasNext() {
|
||||
t.Fatalf("Failed!")
|
||||
}
|
||||
if a.Next() {
|
||||
t.Fatalf("Failed!")
|
||||
}
|
||||
}
|
123
Godeps/_workspace/src/github.com/denverdino/aliyungo/util/encoding.go
generated
vendored
Normal file
123
Godeps/_workspace/src/github.com/denverdino/aliyungo/util/encoding.go
generated
vendored
Normal file
|
@ -0,0 +1,123 @@
|
|||
package util
|
||||
|
||||
import (
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"log"
|
||||
"net/url"
|
||||
"reflect"
|
||||
"strconv"
|
||||
"time"
|
||||
)
|
||||
|
||||
//ConvertToQueryValues converts the struct to url.Values
|
||||
func ConvertToQueryValues(ifc interface{}) url.Values {
|
||||
values := url.Values{}
|
||||
SetQueryValues(ifc, &values)
|
||||
return values
|
||||
}
|
||||
|
||||
//SetQueryValues sets the struct to existing url.Values following ECS encoding rules
|
||||
func SetQueryValues(ifc interface{}, values *url.Values) {
|
||||
setQueryValues(ifc, values, "")
|
||||
}
|
||||
|
||||
func setQueryValues(i interface{}, values *url.Values, prefix string) {
|
||||
elem := reflect.ValueOf(i)
|
||||
if elem.Kind() == reflect.Ptr {
|
||||
elem = elem.Elem()
|
||||
}
|
||||
elemType := elem.Type()
|
||||
for i := 0; i < elem.NumField(); i++ {
|
||||
fieldName := elemType.Field(i).Name
|
||||
field := elem.Field(i)
|
||||
// TODO Use Tag for validation
|
||||
// tag := typ.Field(i).Tag.Get("tagname")
|
||||
kind := field.Kind()
|
||||
if (kind == reflect.Ptr || kind == reflect.Array || kind == reflect.Slice || kind == reflect.Map || kind == reflect.Chan) && field.IsNil() {
|
||||
continue
|
||||
}
|
||||
if kind == reflect.Ptr {
|
||||
field = field.Elem()
|
||||
kind = field.Kind()
|
||||
}
|
||||
var value string
|
||||
//switch field.Interface().(type) {
|
||||
switch kind {
|
||||
case reflect.Int, reflect.Int8, reflect.Int16, reflect.Int32, reflect.Int64:
|
||||
i := field.Int()
|
||||
if i != 0 {
|
||||
value = strconv.FormatInt(i, 10)
|
||||
}
|
||||
case reflect.Uint, reflect.Uint8, reflect.Uint16, reflect.Uint32, reflect.Uint64:
|
||||
i := field.Uint()
|
||||
if i != 0 {
|
||||
value = strconv.FormatUint(i, 10)
|
||||
}
|
||||
case reflect.Float32:
|
||||
value = strconv.FormatFloat(field.Float(), 'f', 4, 32)
|
||||
case reflect.Float64:
|
||||
value = strconv.FormatFloat(field.Float(), 'f', 4, 64)
|
||||
case reflect.Bool:
|
||||
value = strconv.FormatBool(field.Bool())
|
||||
case reflect.String:
|
||||
value = field.String()
|
||||
case reflect.Slice:
|
||||
switch field.Type().Elem().Kind() {
|
||||
case reflect.Uint8:
|
||||
value = string(field.Bytes())
|
||||
case reflect.String:
|
||||
l := field.Len()
|
||||
if l > 0 {
|
||||
strArray := make([]string, l)
|
||||
for i := 0; i < l; i++ {
|
||||
strArray[i] = field.Index(i).String()
|
||||
}
|
||||
bytes, err := json.Marshal(strArray)
|
||||
if err == nil {
|
||||
value = string(bytes)
|
||||
} else {
|
||||
log.Printf("Failed to convert JSON: %v", err)
|
||||
}
|
||||
}
|
||||
default:
|
||||
l := field.Len()
|
||||
for j := 0; j < l; j++ {
|
||||
prefixName := fmt.Sprintf("%s.%d.", fieldName, (j + 1))
|
||||
ifc := field.Index(j).Interface()
|
||||
log.Printf("%s : %v", prefixName, ifc)
|
||||
if ifc != nil {
|
||||
setQueryValues(ifc, values, prefixName)
|
||||
}
|
||||
}
|
||||
continue
|
||||
}
|
||||
|
||||
default:
|
||||
switch field.Interface().(type) {
|
||||
case ISO6801Time:
|
||||
t := field.Interface().(ISO6801Time)
|
||||
value = t.String()
|
||||
case time.Time:
|
||||
t := field.Interface().(time.Time)
|
||||
value = GetISO8601TimeStamp(t)
|
||||
default:
|
||||
ifc := field.Interface()
|
||||
if ifc != nil {
|
||||
SetQueryValues(ifc, values)
|
||||
continue
|
||||
}
|
||||
}
|
||||
}
|
||||
if value != "" {
|
||||
name := elemType.Field(i).Tag.Get("ArgName")
|
||||
if name == "" {
|
||||
name = fieldName
|
||||
}
|
||||
if prefix != "" {
|
||||
name = prefix + name
|
||||
}
|
||||
values.Set(name, value)
|
||||
}
|
||||
}
|
||||
}
|
52
Godeps/_workspace/src/github.com/denverdino/aliyungo/util/encoding_test.go
generated
vendored
Normal file
52
Godeps/_workspace/src/github.com/denverdino/aliyungo/util/encoding_test.go
generated
vendored
Normal file
|
@ -0,0 +1,52 @@
|
|||
package util
|
||||
|
||||
import (
|
||||
"testing"
|
||||
"time"
|
||||
)
|
||||
|
||||
type TestString string
|
||||
|
||||
type SubStruct struct {
|
||||
A string
|
||||
B int
|
||||
}
|
||||
|
||||
type TestStruct struct {
|
||||
Format string
|
||||
Version string
|
||||
AccessKeyId string
|
||||
Timestamp time.Time
|
||||
Empty string
|
||||
IntValue int `ArgName:"int-value"`
|
||||
BoolPtr *bool `ArgName:"bool-ptr"`
|
||||
IntPtr *int `ArgName:"int-ptr"`
|
||||
StringArray []string `ArgName:"str-array"`
|
||||
StructArray []SubStruct
|
||||
test TestString
|
||||
tests []TestString
|
||||
}
|
||||
|
||||
func TestConvertToQueryValues(t *testing.T) {
|
||||
boolValue := true
|
||||
request := TestStruct{
|
||||
Format: "JSON",
|
||||
Version: "1.0",
|
||||
Timestamp: time.Date(2015, time.Month(5), 26, 1, 2, 3, 4, time.UTC),
|
||||
IntValue: 10,
|
||||
BoolPtr: &boolValue,
|
||||
StringArray: []string{"abc", "xyz"},
|
||||
StructArray: []SubStruct{
|
||||
SubStruct{A: "a", B: 1},
|
||||
SubStruct{A: "x", B: 2},
|
||||
},
|
||||
test: TestString("test"),
|
||||
tests: []TestString{TestString("test1"), TestString("test2")},
|
||||
}
|
||||
result := ConvertToQueryValues(&request).Encode()
|
||||
const expectedResult = "Format=JSON&StructArray.1.A=a&StructArray.1.B=1&StructArray.2.A=x&StructArray.2.B=2&Timestamp=2015-05-26T01%3A02%3A03Z&Version=1.0&bool-ptr=true&int-value=10&str-array=%5B%22abc%22%2C%22xyz%22%5D&test=test&tests=%5B%22test1%22%2C%22test2%22%5D"
|
||||
if result != expectedResult {
|
||||
t.Error("Incorrect encoding: ", result)
|
||||
}
|
||||
|
||||
}
|
62
Godeps/_workspace/src/github.com/denverdino/aliyungo/util/iso6801.go
generated
vendored
Normal file
62
Godeps/_workspace/src/github.com/denverdino/aliyungo/util/iso6801.go
generated
vendored
Normal file
|
@ -0,0 +1,62 @@
|
|||
package util
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"time"
|
||||
)
|
||||
|
||||
// GetISO8601TimeStamp gets timestamp string in ISO8601 format
|
||||
func GetISO8601TimeStamp(ts time.Time) string {
|
||||
t := ts.UTC()
|
||||
return fmt.Sprintf("%04d-%02d-%02dT%02d:%02d:%02dZ", t.Year(), t.Month(), t.Day(), t.Hour(), t.Minute(), t.Second())
|
||||
}
|
||||
|
||||
const formatISO8601 = "2006-01-02T15:04:05Z"
|
||||
const jsonFormatISO8601 = `"` + formatISO8601 + `"`
|
||||
|
||||
// A ISO6801Time represents a time in ISO8601 format
|
||||
type ISO6801Time time.Time
|
||||
|
||||
// New constructs a new iso8601.Time instance from an existing
|
||||
// time.Time instance. This causes the nanosecond field to be set to
|
||||
// 0, and its time zone set to a fixed zone with no offset from UTC
|
||||
// (but it is *not* UTC itself).
|
||||
func NewISO6801Time(t time.Time) ISO6801Time {
|
||||
return ISO6801Time(time.Date(
|
||||
t.Year(),
|
||||
t.Month(),
|
||||
t.Day(),
|
||||
t.Hour(),
|
||||
t.Minute(),
|
||||
t.Second(),
|
||||
0,
|
||||
time.UTC,
|
||||
))
|
||||
}
|
||||
|
||||
// IsDefault checks if the time is default
|
||||
func (it *ISO6801Time) IsDefault() bool {
|
||||
return *it == ISO6801Time{}
|
||||
}
|
||||
|
||||
// MarshalJSON serializes the ISO6801Time into JSON string
|
||||
func (it ISO6801Time) MarshalJSON() ([]byte, error) {
|
||||
return []byte(time.Time(it).Format(jsonFormatISO8601)), nil
|
||||
}
|
||||
|
||||
// UnmarshalJSON deserializes the ISO6801Time from JSON string
|
||||
func (it *ISO6801Time) UnmarshalJSON(data []byte) error {
|
||||
if string(data) == "\"\"" {
|
||||
return nil
|
||||
}
|
||||
t, err := time.ParseInLocation(jsonFormatISO8601, string(data), time.UTC)
|
||||
if err == nil {
|
||||
*it = ISO6801Time(t)
|
||||
}
|
||||
return err
|
||||
}
|
||||
|
||||
// String returns the time in ISO6801Time format
|
||||
func (it ISO6801Time) String() string {
|
||||
return time.Time(it).Format(formatISO8601)
|
||||
}
|
50
Godeps/_workspace/src/github.com/denverdino/aliyungo/util/iso6801_test.go
generated
vendored
Normal file
50
Godeps/_workspace/src/github.com/denverdino/aliyungo/util/iso6801_test.go
generated
vendored
Normal file
|
@ -0,0 +1,50 @@
|
|||
package util
|
||||
|
||||
import (
|
||||
"encoding/json"
|
||||
"testing"
|
||||
"time"
|
||||
)
|
||||
|
||||
func TestISO8601Time(t *testing.T) {
|
||||
now := NewISO6801Time(time.Now().UTC())
|
||||
|
||||
data, err := json.Marshal(now)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
_, err = time.Parse(`"`+formatISO8601+`"`, string(data))
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
var now2 ISO6801Time
|
||||
err = json.Unmarshal(data, &now2)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
if now != now2 {
|
||||
t.Errorf("Time %s does not equal expected %s", now2, now)
|
||||
}
|
||||
|
||||
if now.String() != now2.String() {
|
||||
t.Fatalf("String format for %s does not equal expected %s", now2, now)
|
||||
}
|
||||
|
||||
type TestTimeStruct struct {
|
||||
A int
|
||||
B *ISO6801Time
|
||||
}
|
||||
var testValue TestTimeStruct
|
||||
err = json.Unmarshal([]byte("{\"A\": 1, \"B\":\"\"}"), &testValue)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
t.Logf("%v", testValue)
|
||||
if !testValue.B.IsDefault() {
|
||||
t.Fatal("Invaid Unmarshal result for ISO6801Time from empty value")
|
||||
}
|
||||
t.Logf("ISO6801Time String(): %s", now2.String())
|
||||
}
|
40
Godeps/_workspace/src/github.com/denverdino/aliyungo/util/signature.go
generated
vendored
Normal file
40
Godeps/_workspace/src/github.com/denverdino/aliyungo/util/signature.go
generated
vendored
Normal file
|
@ -0,0 +1,40 @@
|
|||
package util
|
||||
|
||||
import (
|
||||
"crypto/hmac"
|
||||
"crypto/sha1"
|
||||
"encoding/base64"
|
||||
"net/url"
|
||||
"strings"
|
||||
)
|
||||
|
||||
//CreateSignature creates signature for string following Aliyun rules
|
||||
func CreateSignature(stringToSignature, accessKeySecret string) string {
|
||||
// Crypto by HMAC-SHA1
|
||||
hmacSha1 := hmac.New(sha1.New, []byte(accessKeySecret))
|
||||
hmacSha1.Write([]byte(stringToSignature))
|
||||
sign := hmacSha1.Sum(nil)
|
||||
|
||||
// Encode to Base64
|
||||
base64Sign := base64.StdEncoding.EncodeToString(sign)
|
||||
|
||||
return base64Sign
|
||||
}
|
||||
|
||||
func percentReplace(str string) string {
|
||||
str = strings.Replace(str, "+", "%20", -1)
|
||||
str = strings.Replace(str, "*", "%2A", -1)
|
||||
str = strings.Replace(str, "%7E", "~", -1)
|
||||
|
||||
return str
|
||||
}
|
||||
|
||||
// CreateSignatureForRequest creates signature for query string values
|
||||
func CreateSignatureForRequest(method string, values *url.Values, accessKeySecret string) string {
|
||||
|
||||
canonicalizedQueryString := percentReplace(values.Encode())
|
||||
|
||||
stringToSign := method + "&%2F&" + url.QueryEscape(canonicalizedQueryString)
|
||||
|
||||
return CreateSignature(stringToSign, accessKeySecret)
|
||||
}
|
14
Godeps/_workspace/src/github.com/denverdino/aliyungo/util/signature_test.go
generated
vendored
Normal file
14
Godeps/_workspace/src/github.com/denverdino/aliyungo/util/signature_test.go
generated
vendored
Normal file
|
@ -0,0 +1,14 @@
|
|||
package util
|
||||
|
||||
import (
|
||||
"testing"
|
||||
)
|
||||
|
||||
func TestCreateSignature(t *testing.T) {
|
||||
|
||||
str := "GET&%2F&AccessKeyId%3Dtestid%26Action%3DDescribeRegions%26Format%3DXML%26RegionId%3Dregion1%26SignatureMethod%3DHMAC-SHA1%26SignatureNonce%3DNwDAxvLU6tFE0DVb%26SignatureVersion%3D1.0%26TimeStamp%3D2012-12-26T10%253A33%253A56Z%26Version%3D2014-05-26"
|
||||
|
||||
signature := CreateSignature(str, "testsecret")
|
||||
|
||||
t.Log(signature)
|
||||
}
|
134
Godeps/_workspace/src/github.com/denverdino/aliyungo/util/util.go
generated
vendored
Normal file
134
Godeps/_workspace/src/github.com/denverdino/aliyungo/util/util.go
generated
vendored
Normal file
|
@ -0,0 +1,134 @@
|
|||
package util
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
srand "crypto/rand"
|
||||
"encoding/binary"
|
||||
"math/rand"
|
||||
"net/http"
|
||||
"net/url"
|
||||
"sort"
|
||||
"strconv"
|
||||
"time"
|
||||
)
|
||||
|
||||
//CreateRandomString create random string
|
||||
func CreateRandomString() string {
|
||||
|
||||
rand.Seed(time.Now().UnixNano())
|
||||
randInt := rand.Int63()
|
||||
randStr := strconv.FormatInt(randInt, 36)
|
||||
|
||||
return randStr
|
||||
}
|
||||
|
||||
// Encode encodes the values into ``URL encoded'' form
|
||||
// ("acl&bar=baz&foo=quux") sorted by key.
|
||||
func Encode(v url.Values) string {
|
||||
if v == nil {
|
||||
return ""
|
||||
}
|
||||
var buf bytes.Buffer
|
||||
keys := make([]string, 0, len(v))
|
||||
for k := range v {
|
||||
keys = append(keys, k)
|
||||
}
|
||||
sort.Strings(keys)
|
||||
for _, k := range keys {
|
||||
vs := v[k]
|
||||
prefix := url.QueryEscape(k)
|
||||
for _, v := range vs {
|
||||
if buf.Len() > 0 {
|
||||
buf.WriteByte('&')
|
||||
}
|
||||
buf.WriteString(prefix)
|
||||
if v != "" {
|
||||
buf.WriteString("=")
|
||||
buf.WriteString(url.QueryEscape(v))
|
||||
}
|
||||
}
|
||||
}
|
||||
return buf.String()
|
||||
}
|
||||
|
||||
func GetGMTime() string {
|
||||
return time.Now().UTC().Format(http.TimeFormat)
|
||||
}
|
||||
|
||||
//
|
||||
|
||||
func randUint32() uint32 {
|
||||
return randUint32Slice(1)[0]
|
||||
}
|
||||
|
||||
func randUint32Slice(c int) []uint32 {
|
||||
b := make([]byte, c*4)
|
||||
|
||||
_, err := srand.Read(b)
|
||||
|
||||
if err != nil {
|
||||
// fail back to insecure rand
|
||||
rand.Seed(time.Now().UnixNano())
|
||||
for i := range b {
|
||||
b[i] = byte(rand.Int())
|
||||
}
|
||||
}
|
||||
|
||||
n := make([]uint32, c)
|
||||
|
||||
for i := range n {
|
||||
n[i] = binary.BigEndian.Uint32(b[i*4 : i*4+4])
|
||||
}
|
||||
|
||||
return n
|
||||
}
|
||||
|
||||
func toByte(n uint32, st, ed byte) byte {
|
||||
return byte(n%uint32(ed-st+1) + uint32(st))
|
||||
}
|
||||
|
||||
func toDigit(n uint32) byte {
|
||||
return toByte(n, '0', '9')
|
||||
}
|
||||
|
||||
func toLowerLetter(n uint32) byte {
|
||||
return toByte(n, 'a', 'z')
|
||||
}
|
||||
|
||||
func toUpperLetter(n uint32) byte {
|
||||
return toByte(n, 'A', 'Z')
|
||||
}
|
||||
|
||||
type convFunc func(uint32) byte
|
||||
|
||||
var convFuncs = []convFunc{toDigit, toLowerLetter, toUpperLetter}
|
||||
|
||||
// tools for generating a random ECS instance password
|
||||
// from 8 to 30 char MUST contain digit upper, case letter and upper case letter
|
||||
// http://docs.aliyun.com/#/pub/ecs/open-api/instance&createinstance
|
||||
func GenerateRandomECSPassword() string {
|
||||
|
||||
// [8, 30]
|
||||
l := int(randUint32()%23 + 8)
|
||||
|
||||
n := randUint32Slice(l)
|
||||
|
||||
b := make([]byte, l)
|
||||
|
||||
b[0] = toDigit(n[0])
|
||||
b[1] = toLowerLetter(n[1])
|
||||
b[2] = toUpperLetter(n[2])
|
||||
|
||||
for i := 3; i < l; i++ {
|
||||
b[i] = convFuncs[n[i]%3](n[i])
|
||||
}
|
||||
|
||||
s := make([]byte, l)
|
||||
perm := rand.Perm(l)
|
||||
for i, v := range perm {
|
||||
s[v] = b[i]
|
||||
}
|
||||
|
||||
return string(s)
|
||||
|
||||
}
|
43
Godeps/_workspace/src/github.com/denverdino/aliyungo/util/util_test.go
generated
vendored
Normal file
43
Godeps/_workspace/src/github.com/denverdino/aliyungo/util/util_test.go
generated
vendored
Normal file
|
@ -0,0 +1,43 @@
|
|||
package util
|
||||
|
||||
import (
|
||||
"testing"
|
||||
)
|
||||
|
||||
func TestGenerateRandomECSPassword(t *testing.T) {
|
||||
for i := 0; i < 10; i++ {
|
||||
s := GenerateRandomECSPassword()
|
||||
|
||||
if len(s) < 8 || len(s) > 30 {
|
||||
t.Errorf("Generated ECS password [%v]: bad len", s)
|
||||
}
|
||||
|
||||
hasDigit := false
|
||||
hasLower := false
|
||||
hasUpper := false
|
||||
|
||||
for j := range s {
|
||||
|
||||
switch {
|
||||
case '0' <= s[j] && s[j] <= '9':
|
||||
hasDigit = true
|
||||
case 'a' <= s[j] && s[j] <= 'z':
|
||||
hasLower = true
|
||||
case 'A' <= s[j] && s[j] <= 'Z':
|
||||
hasUpper = true
|
||||
}
|
||||
}
|
||||
|
||||
if !hasDigit {
|
||||
t.Errorf("Generated ECS password [%v]: no digit", s)
|
||||
}
|
||||
|
||||
if !hasLower {
|
||||
t.Errorf("Generated ECS password [%v]: no lower letter ", s)
|
||||
}
|
||||
|
||||
if !hasUpper {
|
||||
t.Errorf("Generated ECS password [%v]: no upper letter", s)
|
||||
}
|
||||
}
|
||||
}
|
|
@ -21,7 +21,7 @@ machine:
|
|||
BASE_OLD: ../../../$HOME/.gvm/pkgsets/old/global/$BASE_DIR
|
||||
BASE_STABLE: ../../../$HOME/.gvm/pkgsets/stable/global/$BASE_DIR
|
||||
# BASE_BLEED: ../../../$HOME/.gvm/pkgsets/bleed/global/$BASE_DIR
|
||||
DOCKER_BUILDTAGS: "include_rados"
|
||||
DOCKER_BUILDTAGS: "include_rados include_oss"
|
||||
# Workaround Circle parsing dumb bugs and/or YAML wonkyness
|
||||
CIRCLE_PAIN: "mode: set"
|
||||
# Ceph config
|
||||
|
|
|
@ -27,6 +27,7 @@ import (
|
|||
_ "github.com/docker/distribution/registry/storage/driver/filesystem"
|
||||
_ "github.com/docker/distribution/registry/storage/driver/inmemory"
|
||||
_ "github.com/docker/distribution/registry/storage/driver/middleware/cloudfront"
|
||||
_ "github.com/docker/distribution/registry/storage/driver/oss"
|
||||
_ "github.com/docker/distribution/registry/storage/driver/s3"
|
||||
_ "github.com/docker/distribution/registry/storage/driver/swift"
|
||||
"github.com/docker/distribution/uuid"
|
||||
|
|
31
docs/storage-drivers/oss.md
Executable file
31
docs/storage-drivers/oss.md
Executable file
|
@ -0,0 +1,31 @@
|
|||
<!--GITHUB
|
||||
page_title: Aliyun OSS storage driver
|
||||
page_description: Explains how to use the Aliyun OSS storage drivers
|
||||
page_keywords: registry, service, driver, images, storage, OSS, aliyun
|
||||
IGNORES-->
|
||||
|
||||
# Aliyun OSS storage driver
|
||||
|
||||
An implementation of the `storagedriver.StorageDriver` interface which uses [Aliyun OSS](http://www.aliyun.com/product/oss) for object storage.
|
||||
|
||||
## Parameters
|
||||
|
||||
* `accesskeyid`: Your access key ID.
|
||||
|
||||
* `accesskeysecret`: Your access key secret.
|
||||
|
||||
* `region`: The name of the OSS region in which you would like to store objects (for example `oss-cn-beijing`). For a list of regions, you can look at <http://docs.aliyun.com/#/oss/product-documentation/domain-region>
|
||||
|
||||
* `endpoint`: (optional) By default, the endpoint shoulb be `<bucket>.<region>.aliyuncs.com` or `<bucket>.<region>-internal.aliyuncs.com` (when internal=true). You can change the default endpoint via changing this value.
|
||||
|
||||
* `internal`: (optional) Using internal endpoint or the public endpoint for OSS access. The default is false. For a list of regions, you can look at <http://docs.aliyun.com/#/oss/product-documentation/domain-region>
|
||||
|
||||
* `bucket`: The name of your OSS bucket where you wish to store objects (needs to already be created prior to driver initialization).
|
||||
|
||||
* `encrypt`: (optional) Whether you would like your data encrypted on the server side (defaults to false if not specified).
|
||||
|
||||
* `secure`: (optional) Whether you would like to transfer data to the bucket over ssl or not. Defaults to false if not specified.
|
||||
|
||||
* `chunksize`: (optional) The default part size for multipart uploads (performed by WriteStream) to OSS. The default is 10 MB. Keep in mind that the minimum part size for OSS is 5MB. You might experience better performance for larger chunk sizes depending on the speed of your connection to OSS.
|
||||
|
||||
* `rootdirectory`: (optional) The root directory tree in which all registry files will be stored. Defaults to the empty string (bucket root).
|
|
@ -24,6 +24,7 @@ This storage driver package comes bundled with several drivers:
|
|||
- [azure](storage-drivers/azure.md): A driver storing objects in [Microsoft Azure Blob Storage](http://azure.microsoft.com/en-us/services/storage/).
|
||||
- [rados](storage-drivers/rados.md): A driver storing objects in a [Ceph Object Storage](http://ceph.com/docs/master/rados/) pool.
|
||||
- [swift](storage-drivers/swift.md): A driver storing objects in [Openstack Swift](http://docs.openstack.org/developer/swift/).
|
||||
- [oss](storage-drivers/oss.md): A driver storing objects in [Aliyun OSS](http://www.aliyun.com/product/oss).
|
||||
|
||||
## Storage Driver API
|
||||
|
||||
|
|
3
registry/storage/driver/oss/doc.go
Normal file
3
registry/storage/driver/oss/doc.go
Normal file
|
@ -0,0 +1,3 @@
|
|||
// Package oss implements the Aliyun OSS Storage driver backend. Support can be
|
||||
// enabled by including the "include_oss" build tag.
|
||||
package oss
|
813
registry/storage/driver/oss/oss.go
Normal file
813
registry/storage/driver/oss/oss.go
Normal file
|
@ -0,0 +1,813 @@
|
|||
// Package oss provides a storagedriver.StorageDriver implementation to
|
||||
// store blobs in Aliyun OSS cloud storage.
|
||||
//
|
||||
// This package leverages the denverdino/aliyungo client library for interfacing with
|
||||
// oss.
|
||||
//
|
||||
// Because OSS is a key, value store the Stat call does not support last modification
|
||||
// time for directories (directories are an abstraction for key, value stores)
|
||||
//
|
||||
// +build include_oss
|
||||
|
||||
package oss
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"fmt"
|
||||
"io"
|
||||
"io/ioutil"
|
||||
"net/http"
|
||||
"reflect"
|
||||
"strconv"
|
||||
"strings"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"github.com/docker/distribution/context"
|
||||
|
||||
"github.com/Sirupsen/logrus"
|
||||
"github.com/denverdino/aliyungo/oss"
|
||||
storagedriver "github.com/docker/distribution/registry/storage/driver"
|
||||
"github.com/docker/distribution/registry/storage/driver/base"
|
||||
"github.com/docker/distribution/registry/storage/driver/factory"
|
||||
)
|
||||
|
||||
const driverName = "oss"
|
||||
|
||||
// minChunkSize defines the minimum multipart upload chunk size
|
||||
// OSS API requires multipart upload chunks to be at least 5MB
|
||||
const minChunkSize = 5 << 20
|
||||
|
||||
const defaultChunkSize = 2 * minChunkSize
|
||||
|
||||
// listMax is the largest amount of objects you can request from OSS in a list call
|
||||
const listMax = 1000
|
||||
|
||||
//DriverParameters A struct that encapsulates all of the driver parameters after all values have been set
|
||||
type DriverParameters struct {
|
||||
AccessKeyID string
|
||||
AccessKeySecret string
|
||||
Bucket string
|
||||
Region oss.Region
|
||||
Internal bool
|
||||
Encrypt bool
|
||||
Secure bool
|
||||
ChunkSize int64
|
||||
RootDirectory string
|
||||
Endpoint string
|
||||
}
|
||||
|
||||
func init() {
|
||||
factory.Register(driverName, &ossDriverFactory{})
|
||||
}
|
||||
|
||||
// ossDriverFactory implements the factory.StorageDriverFactory interface
|
||||
type ossDriverFactory struct{}
|
||||
|
||||
func (factory *ossDriverFactory) Create(parameters map[string]interface{}) (storagedriver.StorageDriver, error) {
|
||||
return FromParameters(parameters)
|
||||
}
|
||||
|
||||
type driver struct {
|
||||
Client *oss.Client
|
||||
Bucket *oss.Bucket
|
||||
ChunkSize int64
|
||||
Encrypt bool
|
||||
RootDirectory string
|
||||
|
||||
pool sync.Pool // pool []byte buffers used for WriteStream
|
||||
zeros []byte // shared, zero-valued buffer used for WriteStream
|
||||
}
|
||||
|
||||
type baseEmbed struct {
|
||||
base.Base
|
||||
}
|
||||
|
||||
// Driver is a storagedriver.StorageDriver implementation backed by Aliyun OSS
|
||||
// Objects are stored at absolute keys in the provided bucket.
|
||||
type Driver struct {
|
||||
baseEmbed
|
||||
}
|
||||
|
||||
// FromParameters constructs a new Driver with a given parameters map
|
||||
// Required parameters:
|
||||
// - accesskey
|
||||
// - secretkey
|
||||
// - region
|
||||
// - bucket
|
||||
// - encrypt
|
||||
func FromParameters(parameters map[string]interface{}) (*Driver, error) {
|
||||
// Providing no values for these is valid in case the user is authenticating
|
||||
// with an IAM on an ec2 instance (in which case the instance credentials will
|
||||
// be summoned when GetAuth is called)
|
||||
accessKey, ok := parameters["accesskeyid"]
|
||||
if !ok {
|
||||
return nil, fmt.Errorf("No accesskeyid parameter provided")
|
||||
}
|
||||
secretKey, ok := parameters["accesskeysecret"]
|
||||
if !ok {
|
||||
return nil, fmt.Errorf("No accesskeysecret parameter provided")
|
||||
}
|
||||
|
||||
regionName, ok := parameters["region"]
|
||||
if !ok || fmt.Sprint(regionName) == "" {
|
||||
return nil, fmt.Errorf("No region parameter provided")
|
||||
}
|
||||
|
||||
bucket, ok := parameters["bucket"]
|
||||
if !ok || fmt.Sprint(bucket) == "" {
|
||||
return nil, fmt.Errorf("No bucket parameter provided")
|
||||
}
|
||||
|
||||
internalBool := false
|
||||
internal, ok := parameters["internal"]
|
||||
if ok {
|
||||
internalBool, ok = internal.(bool)
|
||||
if !ok {
|
||||
return nil, fmt.Errorf("The internal parameter should be a boolean")
|
||||
}
|
||||
}
|
||||
|
||||
encryptBool := false
|
||||
encrypt, ok := parameters["encrypt"]
|
||||
if ok {
|
||||
encryptBool, ok = encrypt.(bool)
|
||||
if !ok {
|
||||
return nil, fmt.Errorf("The encrypt parameter should be a boolean")
|
||||
}
|
||||
}
|
||||
|
||||
secureBool := true
|
||||
secure, ok := parameters["secure"]
|
||||
if ok {
|
||||
secureBool, ok = secure.(bool)
|
||||
if !ok {
|
||||
return nil, fmt.Errorf("The secure parameter should be a boolean")
|
||||
}
|
||||
}
|
||||
|
||||
chunkSize := int64(defaultChunkSize)
|
||||
chunkSizeParam, ok := parameters["chunksize"]
|
||||
if ok {
|
||||
switch v := chunkSizeParam.(type) {
|
||||
case string:
|
||||
vv, err := strconv.ParseInt(v, 0, 64)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("chunksize parameter must be an integer, %v invalid", chunkSizeParam)
|
||||
}
|
||||
chunkSize = vv
|
||||
case int64:
|
||||
chunkSize = v
|
||||
case int, uint, int32, uint32, uint64:
|
||||
chunkSize = reflect.ValueOf(v).Convert(reflect.TypeOf(chunkSize)).Int()
|
||||
default:
|
||||
return nil, fmt.Errorf("invalid valud for chunksize: %#v", chunkSizeParam)
|
||||
}
|
||||
|
||||
if chunkSize < minChunkSize {
|
||||
return nil, fmt.Errorf("The chunksize %#v parameter should be a number that is larger than or equal to %d", chunkSize, minChunkSize)
|
||||
}
|
||||
}
|
||||
|
||||
rootDirectory, ok := parameters["rootdirectory"]
|
||||
if !ok {
|
||||
rootDirectory = ""
|
||||
}
|
||||
|
||||
endpoint, ok := parameters["endpoint"]
|
||||
if !ok {
|
||||
endpoint = ""
|
||||
}
|
||||
|
||||
params := DriverParameters{
|
||||
AccessKeyID: fmt.Sprint(accessKey),
|
||||
AccessKeySecret: fmt.Sprint(secretKey),
|
||||
Bucket: fmt.Sprint(bucket),
|
||||
Region: oss.Region(fmt.Sprint(regionName)),
|
||||
ChunkSize: chunkSize,
|
||||
RootDirectory: fmt.Sprint(rootDirectory),
|
||||
Encrypt: encryptBool,
|
||||
Secure: secureBool,
|
||||
Internal: internalBool,
|
||||
Endpoint: fmt.Sprint(endpoint),
|
||||
}
|
||||
|
||||
return New(params)
|
||||
}
|
||||
|
||||
// New constructs a new Driver with the given AWS credentials, region, encryption flag, and
|
||||
// bucketName
|
||||
func New(params DriverParameters) (*Driver, error) {
|
||||
|
||||
client := oss.NewOSSClient(params.Region, params.Internal, params.AccessKeyID, params.AccessKeySecret, params.Secure)
|
||||
client.SetEndpoint(params.Endpoint)
|
||||
bucket := client.Bucket(params.Bucket)
|
||||
|
||||
// Validate that the given credentials have at least read permissions in the
|
||||
// given bucket scope.
|
||||
if _, err := bucket.List(strings.TrimRight(params.RootDirectory, "/"), "", "", 1); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
// TODO(tg123): Currently multipart uploads have no timestamps, so this would be unwise
|
||||
// if you initiated a new OSS client while another one is running on the same bucket.
|
||||
|
||||
d := &driver{
|
||||
Client: client,
|
||||
Bucket: bucket,
|
||||
ChunkSize: params.ChunkSize,
|
||||
Encrypt: params.Encrypt,
|
||||
RootDirectory: params.RootDirectory,
|
||||
zeros: make([]byte, params.ChunkSize),
|
||||
}
|
||||
|
||||
d.pool.New = func() interface{} {
|
||||
return make([]byte, d.ChunkSize)
|
||||
}
|
||||
|
||||
return &Driver{
|
||||
baseEmbed: baseEmbed{
|
||||
Base: base.Base{
|
||||
StorageDriver: d,
|
||||
},
|
||||
},
|
||||
}, nil
|
||||
}
|
||||
|
||||
// Implement the storagedriver.StorageDriver interface
|
||||
|
||||
func (d *driver) Name() string {
|
||||
return driverName
|
||||
}
|
||||
|
||||
// GetContent retrieves the content stored at "path" as a []byte.
|
||||
func (d *driver) GetContent(ctx context.Context, path string) ([]byte, error) {
|
||||
content, err := d.Bucket.Get(d.ossPath(path))
|
||||
if err != nil {
|
||||
return nil, parseError(path, err)
|
||||
}
|
||||
return content, nil
|
||||
}
|
||||
|
||||
// PutContent stores the []byte content at a location designated by "path".
|
||||
func (d *driver) PutContent(ctx context.Context, path string, contents []byte) error {
|
||||
return parseError(path, d.Bucket.Put(d.ossPath(path), contents, d.getContentType(), getPermissions(), d.getOptions()))
|
||||
}
|
||||
|
||||
// ReadStream retrieves an io.ReadCloser for the content stored at "path" with a
|
||||
// given byte offset.
|
||||
func (d *driver) ReadStream(ctx context.Context, path string, offset int64) (io.ReadCloser, error) {
|
||||
headers := make(http.Header)
|
||||
headers.Add("Range", "bytes="+strconv.FormatInt(offset, 10)+"-")
|
||||
|
||||
resp, err := d.Bucket.GetResponseWithHeaders(d.ossPath(path), headers)
|
||||
if err != nil {
|
||||
return nil, parseError(path, err)
|
||||
}
|
||||
|
||||
// Due to Aliyun OSS API, status 200 and whole object will be return instead of an
|
||||
// InvalidRange error when range is invalid.
|
||||
//
|
||||
// OSS sever will always return http.StatusPartialContent if range is acceptable.
|
||||
if resp.StatusCode != http.StatusPartialContent {
|
||||
resp.Body.Close()
|
||||
return ioutil.NopCloser(bytes.NewReader(nil)), nil
|
||||
}
|
||||
|
||||
return resp.Body, nil
|
||||
}
|
||||
|
||||
// WriteStream stores the contents of the provided io.Reader at a
|
||||
// location designated by the given path. The driver will know it has
|
||||
// received the full contents when the reader returns io.EOF. The number
|
||||
// of successfully READ bytes will be returned, even if an error is
|
||||
// returned. May be used to resume writing a stream by providing a nonzero
|
||||
// offset. Offsets past the current size will write from the position
|
||||
// beyond the end of the file.
|
||||
func (d *driver) WriteStream(ctx context.Context, path string, offset int64, reader io.Reader) (totalRead int64, err error) {
|
||||
partNumber := 1
|
||||
bytesRead := 0
|
||||
var putErrChan chan error
|
||||
parts := []oss.Part{}
|
||||
var part oss.Part
|
||||
done := make(chan struct{}) // stopgap to free up waiting goroutines
|
||||
|
||||
multi, err := d.Bucket.InitMulti(d.ossPath(path), d.getContentType(), getPermissions(), d.getOptions())
|
||||
if err != nil {
|
||||
return 0, err
|
||||
}
|
||||
|
||||
buf := d.getbuf()
|
||||
|
||||
// We never want to leave a dangling multipart upload, our only consistent state is
|
||||
// when there is a whole object at path. This is in order to remain consistent with
|
||||
// the stat call.
|
||||
//
|
||||
// Note that if the machine dies before executing the defer, we will be left with a dangling
|
||||
// multipart upload, which will eventually be cleaned up, but we will lose all of the progress
|
||||
// made prior to the machine crashing.
|
||||
defer func() {
|
||||
if putErrChan != nil {
|
||||
if putErr := <-putErrChan; putErr != nil {
|
||||
err = putErr
|
||||
}
|
||||
}
|
||||
|
||||
if len(parts) > 0 {
|
||||
if multi == nil {
|
||||
// Parts should be empty if the multi is not initialized
|
||||
panic("Unreachable")
|
||||
} else {
|
||||
if multi.Complete(parts) != nil {
|
||||
multi.Abort()
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
d.putbuf(buf) // needs to be here to pick up new buf value
|
||||
close(done) // free up any waiting goroutines
|
||||
}()
|
||||
|
||||
// Fills from 0 to total from current
|
||||
fromSmallCurrent := func(total int64) error {
|
||||
current, err := d.ReadStream(ctx, path, 0)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
bytesRead = 0
|
||||
for int64(bytesRead) < total {
|
||||
//The loop should very rarely enter a second iteration
|
||||
nn, err := current.Read(buf[bytesRead:total])
|
||||
bytesRead += nn
|
||||
if err != nil {
|
||||
if err != io.EOF {
|
||||
return err
|
||||
}
|
||||
|
||||
break
|
||||
}
|
||||
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// Fills from parameter to chunkSize from reader
|
||||
fromReader := func(from int64) error {
|
||||
bytesRead = 0
|
||||
for from+int64(bytesRead) < d.ChunkSize {
|
||||
nn, err := reader.Read(buf[from+int64(bytesRead):])
|
||||
totalRead += int64(nn)
|
||||
bytesRead += nn
|
||||
|
||||
if err != nil {
|
||||
if err != io.EOF {
|
||||
return err
|
||||
}
|
||||
|
||||
break
|
||||
}
|
||||
}
|
||||
|
||||
if putErrChan == nil {
|
||||
putErrChan = make(chan error)
|
||||
} else {
|
||||
if putErr := <-putErrChan; putErr != nil {
|
||||
putErrChan = nil
|
||||
return putErr
|
||||
}
|
||||
}
|
||||
|
||||
go func(bytesRead int, from int64, buf []byte) {
|
||||
defer d.putbuf(buf) // this buffer gets dropped after this call
|
||||
|
||||
// DRAGONS(stevvooe): There are few things one might want to know
|
||||
// about this section. First, the putErrChan is expecting an error
|
||||
// and a nil or just a nil to come through the channel. This is
|
||||
// covered by the silly defer below. The other aspect is the OSS
|
||||
// retry backoff to deal with RequestTimeout errors. Even though
|
||||
// the underlying OSS library should handle it, it doesn't seem to
|
||||
// be part of the shouldRetry function (see denverdino/aliyungo/oss).
|
||||
defer func() {
|
||||
select {
|
||||
case putErrChan <- nil: // for some reason, we do this no matter what.
|
||||
case <-done:
|
||||
return // ensure we don't leak the goroutine
|
||||
}
|
||||
}()
|
||||
|
||||
if bytesRead <= 0 {
|
||||
return
|
||||
}
|
||||
|
||||
var err error
|
||||
var part oss.Part
|
||||
|
||||
loop:
|
||||
for retries := 0; retries < 5; retries++ {
|
||||
part, err = multi.PutPart(int(partNumber), bytes.NewReader(buf[0:int64(bytesRead)+from]))
|
||||
if err == nil {
|
||||
break // success!
|
||||
}
|
||||
|
||||
// NOTE(stevvooe): This retry code tries to only retry under
|
||||
// conditions where the OSS package does not. We may add oss
|
||||
// error codes to the below if we see others bubble up in the
|
||||
// application. Right now, the most troubling is
|
||||
// RequestTimeout, which seems to only triggered when a tcp
|
||||
// connection to OSS slows to a crawl. If the RequestTimeout
|
||||
// ends up getting added to the OSS library and we don't see
|
||||
// other errors, this retry loop can be removed.
|
||||
switch err := err.(type) {
|
||||
case *oss.Error:
|
||||
switch err.Code {
|
||||
case "RequestTimeout":
|
||||
// allow retries on only this error.
|
||||
default:
|
||||
break loop
|
||||
}
|
||||
}
|
||||
|
||||
backoff := 100 * time.Millisecond * time.Duration(retries+1)
|
||||
logrus.Errorf("error putting part, retrying after %v: %v", err, backoff.String())
|
||||
time.Sleep(backoff)
|
||||
}
|
||||
|
||||
if err != nil {
|
||||
logrus.Errorf("error putting part, aborting: %v", err)
|
||||
select {
|
||||
case putErrChan <- err:
|
||||
case <-done:
|
||||
return // don't leak the goroutine
|
||||
}
|
||||
}
|
||||
|
||||
// parts and partNumber are safe, because this function is the
|
||||
// only one modifying them and we force it to be executed
|
||||
// serially.
|
||||
parts = append(parts, part)
|
||||
partNumber++
|
||||
}(bytesRead, from, buf)
|
||||
|
||||
buf = d.getbuf() // use a new buffer for the next call
|
||||
return nil
|
||||
}
|
||||
|
||||
if offset > 0 {
|
||||
resp, err := d.Bucket.Head(d.ossPath(path), nil)
|
||||
if err != nil {
|
||||
if ossErr, ok := err.(*oss.Error); !ok || ossErr.Code != "NoSuchKey" {
|
||||
return 0, err
|
||||
}
|
||||
}
|
||||
|
||||
currentLength := int64(0)
|
||||
if err == nil {
|
||||
currentLength = resp.ContentLength
|
||||
}
|
||||
|
||||
if currentLength >= offset {
|
||||
if offset < d.ChunkSize {
|
||||
// chunkSize > currentLength >= offset
|
||||
if err = fromSmallCurrent(offset); err != nil {
|
||||
return totalRead, err
|
||||
}
|
||||
|
||||
if err = fromReader(offset); err != nil {
|
||||
return totalRead, err
|
||||
}
|
||||
|
||||
if totalRead+offset < d.ChunkSize {
|
||||
return totalRead, nil
|
||||
}
|
||||
} else {
|
||||
// currentLength >= offset >= chunkSize
|
||||
_, part, err = multi.PutPartCopy(partNumber,
|
||||
oss.CopyOptions{CopySourceOptions: "bytes=0-" + strconv.FormatInt(offset-1, 10)},
|
||||
d.Bucket.Path(d.ossPath(path)))
|
||||
if err != nil {
|
||||
return 0, err
|
||||
}
|
||||
|
||||
parts = append(parts, part)
|
||||
partNumber++
|
||||
}
|
||||
} else {
|
||||
// Fills between parameters with 0s but only when to - from <= chunkSize
|
||||
fromZeroFillSmall := func(from, to int64) error {
|
||||
bytesRead = 0
|
||||
for from+int64(bytesRead) < to {
|
||||
nn, err := bytes.NewReader(d.zeros).Read(buf[from+int64(bytesRead) : to])
|
||||
bytesRead += nn
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// Fills between parameters with 0s, making new parts
|
||||
fromZeroFillLarge := func(from, to int64) error {
|
||||
bytesRead64 := int64(0)
|
||||
for to-(from+bytesRead64) >= d.ChunkSize {
|
||||
part, err := multi.PutPart(int(partNumber), bytes.NewReader(d.zeros))
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
bytesRead64 += d.ChunkSize
|
||||
|
||||
parts = append(parts, part)
|
||||
partNumber++
|
||||
}
|
||||
|
||||
return fromZeroFillSmall(0, (to-from)%d.ChunkSize)
|
||||
}
|
||||
|
||||
// currentLength < offset
|
||||
if currentLength < d.ChunkSize {
|
||||
if offset < d.ChunkSize {
|
||||
// chunkSize > offset > currentLength
|
||||
if err = fromSmallCurrent(currentLength); err != nil {
|
||||
return totalRead, err
|
||||
}
|
||||
|
||||
if err = fromZeroFillSmall(currentLength, offset); err != nil {
|
||||
return totalRead, err
|
||||
}
|
||||
|
||||
if err = fromReader(offset); err != nil {
|
||||
return totalRead, err
|
||||
}
|
||||
|
||||
if totalRead+offset < d.ChunkSize {
|
||||
return totalRead, nil
|
||||
}
|
||||
} else {
|
||||
// offset >= chunkSize > currentLength
|
||||
if err = fromSmallCurrent(currentLength); err != nil {
|
||||
return totalRead, err
|
||||
}
|
||||
|
||||
if err = fromZeroFillSmall(currentLength, d.ChunkSize); err != nil {
|
||||
return totalRead, err
|
||||
}
|
||||
|
||||
part, err = multi.PutPart(int(partNumber), bytes.NewReader(buf))
|
||||
if err != nil {
|
||||
return totalRead, err
|
||||
}
|
||||
|
||||
parts = append(parts, part)
|
||||
partNumber++
|
||||
|
||||
//Zero fill from chunkSize up to offset, then some reader
|
||||
if err = fromZeroFillLarge(d.ChunkSize, offset); err != nil {
|
||||
return totalRead, err
|
||||
}
|
||||
|
||||
if err = fromReader(offset % d.ChunkSize); err != nil {
|
||||
return totalRead, err
|
||||
}
|
||||
|
||||
if totalRead+(offset%d.ChunkSize) < d.ChunkSize {
|
||||
return totalRead, nil
|
||||
}
|
||||
}
|
||||
} else {
|
||||
// offset > currentLength >= chunkSize
|
||||
_, part, err = multi.PutPartCopy(partNumber,
|
||||
oss.CopyOptions{},
|
||||
d.Bucket.Path(d.ossPath(path)))
|
||||
if err != nil {
|
||||
return 0, err
|
||||
}
|
||||
|
||||
parts = append(parts, part)
|
||||
partNumber++
|
||||
|
||||
//Zero fill from currentLength up to offset, then some reader
|
||||
if err = fromZeroFillLarge(currentLength, offset); err != nil {
|
||||
return totalRead, err
|
||||
}
|
||||
|
||||
if err = fromReader((offset - currentLength) % d.ChunkSize); err != nil {
|
||||
return totalRead, err
|
||||
}
|
||||
|
||||
if totalRead+((offset-currentLength)%d.ChunkSize) < d.ChunkSize {
|
||||
return totalRead, nil
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
}
|
||||
|
||||
for {
|
||||
if err = fromReader(0); err != nil {
|
||||
return totalRead, err
|
||||
}
|
||||
|
||||
if int64(bytesRead) < d.ChunkSize {
|
||||
break
|
||||
}
|
||||
}
|
||||
|
||||
return totalRead, nil
|
||||
}
|
||||
|
||||
// Stat retrieves the FileInfo for the given path, including the current size
|
||||
// in bytes and the creation time.
|
||||
func (d *driver) Stat(ctx context.Context, path string) (storagedriver.FileInfo, error) {
|
||||
listResponse, err := d.Bucket.List(d.ossPath(path), "", "", 1)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
fi := storagedriver.FileInfoFields{
|
||||
Path: path,
|
||||
}
|
||||
|
||||
if len(listResponse.Contents) == 1 {
|
||||
if listResponse.Contents[0].Key != d.ossPath(path) {
|
||||
fi.IsDir = true
|
||||
} else {
|
||||
fi.IsDir = false
|
||||
fi.Size = listResponse.Contents[0].Size
|
||||
|
||||
timestamp, err := time.Parse(time.RFC3339Nano, listResponse.Contents[0].LastModified)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
fi.ModTime = timestamp
|
||||
}
|
||||
} else if len(listResponse.CommonPrefixes) == 1 {
|
||||
fi.IsDir = true
|
||||
} else {
|
||||
return nil, storagedriver.PathNotFoundError{Path: path}
|
||||
}
|
||||
|
||||
return storagedriver.FileInfoInternal{FileInfoFields: fi}, nil
|
||||
}
|
||||
|
||||
// List returns a list of the objects that are direct descendants of the given path.
|
||||
func (d *driver) List(ctx context.Context, path string) ([]string, error) {
|
||||
if path != "/" && path[len(path)-1] != '/' {
|
||||
path = path + "/"
|
||||
}
|
||||
|
||||
// This is to cover for the cases when the rootDirectory of the driver is either "" or "/".
|
||||
// In those cases, there is no root prefix to replace and we must actually add a "/" to all
|
||||
// results in order to keep them as valid paths as recognized by storagedriver.PathRegexp
|
||||
prefix := ""
|
||||
if d.ossPath("") == "" {
|
||||
prefix = "/"
|
||||
}
|
||||
|
||||
listResponse, err := d.Bucket.List(d.ossPath(path), "/", "", listMax)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
files := []string{}
|
||||
directories := []string{}
|
||||
|
||||
for {
|
||||
for _, key := range listResponse.Contents {
|
||||
files = append(files, strings.Replace(key.Key, d.ossPath(""), prefix, 1))
|
||||
}
|
||||
|
||||
for _, commonPrefix := range listResponse.CommonPrefixes {
|
||||
directories = append(directories, strings.Replace(commonPrefix[0:len(commonPrefix)-1], d.ossPath(""), prefix, 1))
|
||||
}
|
||||
|
||||
if listResponse.IsTruncated {
|
||||
listResponse, err = d.Bucket.List(d.ossPath(path), "/", listResponse.NextMarker, listMax)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
} else {
|
||||
break
|
||||
}
|
||||
}
|
||||
|
||||
return append(files, directories...), nil
|
||||
}
|
||||
|
||||
// Move moves an object stored at sourcePath to destPath, removing the original
|
||||
// object.
|
||||
func (d *driver) Move(ctx context.Context, sourcePath string, destPath string) error {
|
||||
logrus.Infof("Move from %s to %s", d.Bucket.Path("/"+d.ossPath(sourcePath)), d.ossPath(destPath))
|
||||
/* This is terrible, but aws doesn't have an actual move. */
|
||||
_, err := d.Bucket.PutCopy(d.ossPath(destPath), getPermissions(),
|
||||
oss.CopyOptions{
|
||||
//Options: d.getOptions(),
|
||||
//ContentType: d.getContentType()
|
||||
},
|
||||
d.Bucket.Path(d.ossPath(sourcePath)))
|
||||
if err != nil {
|
||||
return parseError(sourcePath, err)
|
||||
}
|
||||
|
||||
return d.Delete(ctx, sourcePath)
|
||||
}
|
||||
|
||||
// Delete recursively deletes all objects stored at "path" and its subpaths.
|
||||
func (d *driver) Delete(ctx context.Context, path string) error {
|
||||
listResponse, err := d.Bucket.List(d.ossPath(path), "", "", listMax)
|
||||
if err != nil || len(listResponse.Contents) == 0 {
|
||||
return storagedriver.PathNotFoundError{Path: path}
|
||||
}
|
||||
|
||||
ossObjects := make([]oss.Object, listMax)
|
||||
|
||||
for len(listResponse.Contents) > 0 {
|
||||
for index, key := range listResponse.Contents {
|
||||
ossObjects[index].Key = key.Key
|
||||
}
|
||||
|
||||
err := d.Bucket.DelMulti(oss.Delete{Quiet: false, Objects: ossObjects[0:len(listResponse.Contents)]})
|
||||
if err != nil {
|
||||
return nil
|
||||
}
|
||||
|
||||
listResponse, err = d.Bucket.List(d.ossPath(path), "", "", listMax)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// URLFor returns a URL which may be used to retrieve the content stored at the given path.
|
||||
// May return an UnsupportedMethodErr in certain StorageDriver implementations.
|
||||
func (d *driver) URLFor(ctx context.Context, path string, options map[string]interface{}) (string, error) {
|
||||
methodString := "GET"
|
||||
method, ok := options["method"]
|
||||
if ok {
|
||||
methodString, ok = method.(string)
|
||||
if !ok || (methodString != "GET" && methodString != "HEAD") {
|
||||
return "", storagedriver.ErrUnsupportedMethod
|
||||
}
|
||||
}
|
||||
|
||||
expiresTime := time.Now().Add(20 * time.Minute)
|
||||
logrus.Infof("expiresTime: %d", expiresTime)
|
||||
|
||||
expires, ok := options["expiry"]
|
||||
if ok {
|
||||
et, ok := expires.(time.Time)
|
||||
if ok {
|
||||
expiresTime = et
|
||||
}
|
||||
}
|
||||
logrus.Infof("expiresTime: %d", expiresTime)
|
||||
testURL := d.Bucket.SignedURLWithMethod(methodString, d.ossPath(path), expiresTime, nil, nil)
|
||||
logrus.Infof("testURL: %s", testURL)
|
||||
return testURL, nil
|
||||
}
|
||||
|
||||
func (d *driver) ossPath(path string) string {
|
||||
return strings.TrimLeft(strings.TrimRight(d.RootDirectory, "/")+path, "/")
|
||||
}
|
||||
|
||||
// S3BucketKey returns the OSS bucket key for the given storage driver path.
|
||||
func (d *Driver) S3BucketKey(path string) string {
|
||||
return d.StorageDriver.(*driver).ossPath(path)
|
||||
}
|
||||
|
||||
func parseError(path string, err error) error {
|
||||
if ossErr, ok := err.(*oss.Error); ok && ossErr.Code == "NoSuchKey" {
|
||||
return storagedriver.PathNotFoundError{Path: path}
|
||||
}
|
||||
|
||||
return err
|
||||
}
|
||||
|
||||
func hasCode(err error, code string) bool {
|
||||
ossErr, ok := err.(*oss.Error)
|
||||
return ok && ossErr.Code == code
|
||||
}
|
||||
|
||||
func (d *driver) getOptions() oss.Options {
|
||||
return oss.Options{ServerSideEncryption: d.Encrypt}
|
||||
}
|
||||
|
||||
func getPermissions() oss.ACL {
|
||||
return oss.Private
|
||||
}
|
||||
|
||||
func (d *driver) getContentType() string {
|
||||
return "application/octet-stream"
|
||||
}
|
||||
|
||||
// getbuf returns a buffer from the driver's pool with length d.ChunkSize.
|
||||
func (d *driver) getbuf() []byte {
|
||||
return d.pool.Get().([]byte)
|
||||
}
|
||||
|
||||
func (d *driver) putbuf(p []byte) {
|
||||
copy(p, d.zeros)
|
||||
d.pool.Put(p)
|
||||
}
|
144
registry/storage/driver/oss/oss_test.go
Normal file
144
registry/storage/driver/oss/oss_test.go
Normal file
|
@ -0,0 +1,144 @@
|
|||
// +build include_oss
|
||||
|
||||
package oss
|
||||
|
||||
import (
|
||||
"io/ioutil"
|
||||
|
||||
alioss "github.com/denverdino/aliyungo/oss"
|
||||
"github.com/docker/distribution/context"
|
||||
storagedriver "github.com/docker/distribution/registry/storage/driver"
|
||||
"github.com/docker/distribution/registry/storage/driver/testsuites"
|
||||
//"log"
|
||||
"os"
|
||||
"strconv"
|
||||
"testing"
|
||||
|
||||
"gopkg.in/check.v1"
|
||||
)
|
||||
|
||||
// Hook up gocheck into the "go test" runner.
|
||||
func Test(t *testing.T) { check.TestingT(t) }
|
||||
|
||||
var ossDriverConstructor func(rootDirectory string) (*Driver, error)
|
||||
|
||||
var skipCheck func() string
|
||||
|
||||
func init() {
|
||||
accessKey := os.Getenv("ALIYUN_ACCESS_KEY_ID")
|
||||
secretKey := os.Getenv("ALIYUN_ACCESS_KEY_SECRET")
|
||||
bucket := os.Getenv("OSS_BUCKET")
|
||||
region := os.Getenv("OSS_REGION")
|
||||
internal := os.Getenv("OSS_INTERNAL")
|
||||
encrypt := os.Getenv("OSS_ENCRYPT")
|
||||
secure := os.Getenv("OSS_SECURE")
|
||||
endpoint := os.Getenv("OSS_ENDPOINT")
|
||||
root, err := ioutil.TempDir("", "driver-")
|
||||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
defer os.Remove(root)
|
||||
|
||||
ossDriverConstructor = func(rootDirectory string) (*Driver, error) {
|
||||
encryptBool := false
|
||||
if encrypt != "" {
|
||||
encryptBool, err = strconv.ParseBool(encrypt)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
}
|
||||
|
||||
secureBool := false
|
||||
if secure != "" {
|
||||
secureBool, err = strconv.ParseBool(secure)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
}
|
||||
|
||||
internalBool := false
|
||||
if internal != "" {
|
||||
internalBool, err = strconv.ParseBool(internal)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
}
|
||||
|
||||
parameters := DriverParameters{
|
||||
AccessKeyID: accessKey,
|
||||
AccessKeySecret: secretKey,
|
||||
Bucket: bucket,
|
||||
Region: alioss.Region(region),
|
||||
Internal: internalBool,
|
||||
ChunkSize: minChunkSize,
|
||||
RootDirectory: rootDirectory,
|
||||
Encrypt: encryptBool,
|
||||
Secure: secureBool,
|
||||
Endpoint: endpoint,
|
||||
}
|
||||
|
||||
return New(parameters)
|
||||
}
|
||||
|
||||
// Skip OSS storage driver tests if environment variable parameters are not provided
|
||||
skipCheck = func() string {
|
||||
if accessKey == "" || secretKey == "" || region == "" || bucket == "" || encrypt == "" {
|
||||
return "Must set ALIYUN_ACCESS_KEY_ID, ALIYUN_ACCESS_KEY_SECRET, OSS_REGION, OSS_BUCKET, and OSS_ENCRYPT to run OSS tests"
|
||||
}
|
||||
return ""
|
||||
}
|
||||
|
||||
testsuites.RegisterSuite(func() (storagedriver.StorageDriver, error) {
|
||||
return ossDriverConstructor(root)
|
||||
}, skipCheck)
|
||||
}
|
||||
|
||||
func TestEmptyRootList(t *testing.T) {
|
||||
if skipCheck() != "" {
|
||||
t.Skip(skipCheck())
|
||||
}
|
||||
|
||||
validRoot, err := ioutil.TempDir("", "driver-")
|
||||
if err != nil {
|
||||
t.Fatalf("unexpected error creating temporary directory: %v", err)
|
||||
}
|
||||
defer os.Remove(validRoot)
|
||||
|
||||
rootedDriver, err := ossDriverConstructor(validRoot)
|
||||
if err != nil {
|
||||
t.Fatalf("unexpected error creating rooted driver: %v", err)
|
||||
}
|
||||
|
||||
emptyRootDriver, err := ossDriverConstructor("")
|
||||
if err != nil {
|
||||
t.Fatalf("unexpected error creating empty root driver: %v", err)
|
||||
}
|
||||
|
||||
slashRootDriver, err := ossDriverConstructor("/")
|
||||
if err != nil {
|
||||
t.Fatalf("unexpected error creating slash root driver: %v", err)
|
||||
}
|
||||
|
||||
filename := "/test"
|
||||
contents := []byte("contents")
|
||||
ctx := context.Background()
|
||||
err = rootedDriver.PutContent(ctx, filename, contents)
|
||||
if err != nil {
|
||||
t.Fatalf("unexpected error creating content: %v", err)
|
||||
}
|
||||
defer rootedDriver.Delete(ctx, filename)
|
||||
|
||||
keys, err := emptyRootDriver.List(ctx, "/")
|
||||
for _, path := range keys {
|
||||
if !storagedriver.PathRegexp.MatchString(path) {
|
||||
t.Fatalf("unexpected string in path: %q != %q", path, storagedriver.PathRegexp)
|
||||
}
|
||||
}
|
||||
|
||||
keys, err = slashRootDriver.List(ctx, "/")
|
||||
for _, path := range keys {
|
||||
if !storagedriver.PathRegexp.MatchString(path) {
|
||||
t.Fatalf("unexpected string in path: %q != %q", path, storagedriver.PathRegexp)
|
||||
}
|
||||
}
|
||||
}
|
Loading…
Reference in a new issue