From bc42f53ec8298c1f17df155ffc88069c2ea582e9 Mon Sep 17 00:00:00 2001
From: Ahmet Alp Balkan
Date: Fri, 16 Jan 2015 10:18:42 -0800
Subject: [PATCH] Fix azure storagedriver methods, implement Stat, URLFor

Signed-off-by: Ahmet Alp Balkan
---
 cmd/registry-storagedriver-azure/main.go |   2 +-
 storagedriver/azure/azure.go             | 162 ++++++++++++++++++-----
 storagedriver/azure/azure_test.go        |  12 +-
 3 files changed, 136 insertions(+), 40 deletions(-)

diff --git a/cmd/registry-storagedriver-azure/main.go b/cmd/registry-storagedriver-azure/main.go
index 17881b50..71b1faaf 100644
--- a/cmd/registry-storagedriver-azure/main.go
+++ b/cmd/registry-storagedriver-azure/main.go
@@ -14,7 +14,7 @@ import (
 // An out-of-process Azure Storage driver, intended to be run by ipc.NewDriverClient
 func main() {
 	parametersBytes := []byte(os.Args[1])
-	var parameters map[string]string
+	var parameters map[string]interface{}
 	err := json.Unmarshal(parametersBytes, &parameters)
 	if err != nil {
 		panic(err)
diff --git a/storagedriver/azure/azure.go b/storagedriver/azure/azure.go
index ee3230ff..995f0fca 100644
--- a/storagedriver/azure/azure.go
+++ b/storagedriver/azure/azure.go
@@ -1,5 +1,3 @@
-// +build ignore
-
 // Package azure provides a storagedriver.StorageDriver implementation to
 // store blobs in Microsoft Azure Blob Storage Service.
 package azure
@@ -10,8 +8,10 @@ import (
 	"fmt"
 	"io"
 	"io/ioutil"
+	"net/http"
 	"strconv"
 	"strings"
+	"time"
 
 	"github.com/docker/distribution/storagedriver"
 	"github.com/docker/distribution/storagedriver/factory"
@@ -40,28 +40,28 @@ func init() {
 
 type azureDriverFactory struct{}
 
-func (factory *azureDriverFactory) Create(parameters map[string]string) (storagedriver.StorageDriver, error) {
+func (factory *azureDriverFactory) Create(parameters map[string]interface{}) (storagedriver.StorageDriver, error) {
 	return FromParameters(parameters)
 }
 
 // FromParameters constructs a new Driver with a given parameters map.
-func FromParameters(parameters map[string]string) (*Driver, error) {
+func FromParameters(parameters map[string]interface{}) (*Driver, error) {
 	accountName, ok := parameters[paramAccountName]
-	if !ok {
+	if !ok || fmt.Sprint(accountName) == "" {
 		return nil, fmt.Errorf("No %s parameter provided", paramAccountName)
 	}
 
 	accountKey, ok := parameters[paramAccountKey]
-	if !ok {
+	if !ok || fmt.Sprint(accountKey) == "" {
 		return nil, fmt.Errorf("No %s parameter provided", paramAccountKey)
 	}
 
 	container, ok := parameters[paramContainer]
-	if !ok {
+	if !ok || fmt.Sprint(container) == "" {
 		return nil, fmt.Errorf("No %s parameter provided", paramContainer)
 	}
 
-	return New(accountName, accountKey, container)
+	return New(fmt.Sprint(accountName), fmt.Sprint(accountKey), fmt.Sprint(container))
 }
 
 // New constructs a new Driver with the given Azure Storage Account credentials
@@ -87,6 +87,10 @@ func New(accountName, accountKey, container string) (*Driver, error) {
 
 // GetContent retrieves the content stored at "path" as a []byte.
 func (d *Driver) GetContent(path string) ([]byte, error) {
+	if !storagedriver.PathRegexp.MatchString(path) {
+		return nil, storagedriver.InvalidPathError{Path: path}
+	}
+
 	blob, err := d.client.GetBlob(d.container, path)
 	if err != nil {
 		if is404(err) {
@@ -100,25 +104,29 @@ func (d *Driver) GetContent(path string) ([]byte, error) {
 
 // PutContent stores the []byte content at a location designated by "path".
 func (d *Driver) PutContent(path string, contents []byte) error {
+	if !storagedriver.PathRegexp.MatchString(path) {
+		return storagedriver.InvalidPathError{Path: path}
+	}
 	return d.client.PutBlockBlob(d.container, path, ioutil.NopCloser(bytes.NewReader(contents)))
 }
 
 // ReadStream retrieves an io.ReadCloser for the content stored at "path" with a
 // given byte offset.
 func (d *Driver) ReadStream(path string, offset int64) (io.ReadCloser, error) {
 	if ok, err := d.client.BlobExists(d.container, path); err != nil {
 		return nil, err
 	} else if !ok {
 		return nil, storagedriver.PathNotFoundError{Path: path}
 	}
 
-	size, err := d.CurrentSize(path)
+	info, err := d.client.GetBlobProperties(d.container, path)
 	if err != nil {
 		return nil, err
 	}
 
-	if offset >= int64(size) {
-		return nil, storagedriver.InvalidOffsetError{Path: path, Offset: offset}
+	size := int64(info.ContentLength)
+	if offset >= size {
+		return ioutil.NopCloser(bytes.NewReader(nil)), nil
 	}
 
 	bytesRange := fmt.Sprintf("%v-", offset)
@@ -131,7 +139,11 @@ func (d *Driver) ReadStream(path string, offset int64) (io.ReadCloser, error) {
 
 // WriteStream stores the contents of the provided io.ReadCloser at a location
 // designated by the given path.
-func (d *Driver) WriteStream(path string, offset, size int64, reader io.ReadCloser) error {
+func (d *Driver) WriteStream(path string, offset int64, reader io.Reader) (int64, error) {
+	if !storagedriver.PathRegexp.MatchString(path) {
+		return 0, storagedriver.InvalidPathError{Path: path}
+	}
+
 	var (
 		lastBlockNum    int
 		resumableOffset int64
@@ -139,20 +151,20 @@ func (d *Driver) WriteStream(path string, offset, size int64, reader io.ReadClos
 	)
 
 	if blobExists, err := d.client.BlobExists(d.container, path); err != nil {
-		return err
+		return 0, err
 	} else if !blobExists { // new blob
 		lastBlockNum = 0
 		resumableOffset = 0
 	} else { // append
 		if parts, err := d.client.GetBlockList(d.container, path, azure.BlockListTypeCommitted); err != nil {
-			return err
+			return 0, err
 		} else if len(parts.CommittedBlocks) == 0 {
 			lastBlockNum = 0
 			resumableOffset = 0
 		} else {
 			lastBlock := parts.CommittedBlocks[len(parts.CommittedBlocks)-1]
-			if lastBlockNum, err = blockNum(lastBlock.Name); err != nil {
-				return fmt.Errorf("Cannot parse block name as number '%s': %s", lastBlock.Name, err.Error())
+			if lastBlockNum, err = fromBlockID(lastBlock.Name); err != nil {
+				return 0, fmt.Errorf("Cannot parse block name as number '%s': %s", lastBlock.Name, err.Error())
 			}
 
 			var totalSize int64
@@ -174,11 +186,17 @@ func (d *Driver) WriteStream(path string, offset, size int64, reader io.ReadClos
 		}
 	}
 
-	if offset != resumableOffset {
-		return storagedriver.InvalidOffsetError{Path: path, Offset: offset}
+	if offset < resumableOffset {
+		// only writing at the end or after the end of the file is supported
+		return 0, storagedriver.InvalidOffsetError{Path: path, Offset: offset}
+	} else if offset > resumableOffset {
+		// zero-fill in between, construct a multi-reader
+		zeroReader := bytes.NewReader(make([]byte, offset-resumableOffset))
+		reader = io.MultiReader(zeroReader, reader)
 	}
 
 	// Put content
+	var nn int64
 	buf := make([]byte, azure.MaxBlobBlockSize)
 	for {
 		// Read chunks of exactly size N except the last chunk to
@@ -187,35 +205,89 @@ func (d *Driver) WriteStream(path string, offset, size int64, reader io.ReadClos
 		if err == io.EOF {
 			break
 		}
+		nn += int64(n)
 
 		data := buf[:n]
-		blockID := toBlockID(lastBlockNum + 1)
+		lastBlockNum++
+		blockID := toBlockID(lastBlockNum)
 		if err = d.client.PutBlock(d.container, path, blockID, data); err != nil {
-			return err
+			return 0, err
 		}
+
 		blocks = append(blocks, azure.Block{
 			Id:     blockID,
 			Status: azure.BlockStatusLatest})
-		lastBlockNum++
+	}
+
+	// If there was a zero-fill, adjust nn to exclude zeros
+	if offset > resumableOffset {
+		nn -= offset - resumableOffset
 	}
 
 	// Commit block list
-	return d.client.PutBlockList(d.container, path, blocks)
+	return nn, d.client.PutBlockList(d.container, path, blocks)
 }
 
-// CurrentSize retrieves the curernt size in bytes of the object at the given
-// path.
-func (d *Driver) CurrentSize(path string) (uint64, error) {
-	props, err := d.client.GetBlobProperties(d.container, path)
-	if err != nil {
-		return 0, err
+// Stat retrieves the FileInfo for the given path, including the current size
+// in bytes and the creation time.
+func (d *Driver) Stat(path string) (storagedriver.FileInfo, error) {
+	if !storagedriver.PathRegexp.MatchString(path) {
+		return nil, storagedriver.InvalidPathError{Path: path}
 	}
-	return props.ContentLength, nil
+
+	// Check if the path is a blob
+	if ok, err := d.client.BlobExists(d.container, path); err != nil {
+		return nil, err
+	} else if ok {
+		blob, err := d.client.GetBlobProperties(d.container, path)
+		if err != nil {
+			return nil, err
+		}
+
+		mtim, err := time.Parse(http.TimeFormat, blob.LastModified)
+		if err != nil {
+			return nil, err
+		}
+
+		return storagedriver.FileInfoInternal{FileInfoFields: storagedriver.FileInfoFields{
+			Path:    path,
+			Size:    int64(blob.ContentLength),
+			ModTime: mtim,
+			IsDir:   false,
+		}}, nil
+	}
+
+	// Check if path is a virtual container
+	virtContainerPath := path
+	if !strings.HasSuffix(virtContainerPath, "/") {
+		virtContainerPath += "/"
+	}
+	blobs, err := d.client.ListBlobs(d.container, azure.ListBlobsParameters{
+		Prefix:     virtContainerPath,
+		MaxResults: 1,
+	})
+	if err != nil {
+		return nil, err
+	}
+	if len(blobs.Blobs) > 0 {
+		// path is a virtual container
+		return storagedriver.FileInfoInternal{FileInfoFields: storagedriver.FileInfoFields{
+			Path:  path,
+			IsDir: true,
+		}}, nil
+	}
+
+	// path is not a blob or virtual container
+	return nil, storagedriver.PathNotFoundError{Path: path}
 }
 
 // List returns a list of the objects that are direct descendants of the given
 // path.
 func (d *Driver) List(path string) ([]string, error) {
+	if !storagedriver.PathRegexp.MatchString(path) && path != "/" {
+		return nil, storagedriver.InvalidPathError{Path: path}
+	}
+
 	if path == "/" {
 		path = ""
 	}
@@ -232,6 +304,12 @@ func (d *Driver) List(path string) ([]string, error) {
 // Move moves an object stored at sourcePath to destPath, removing the original
 // object.
 func (d *Driver) Move(sourcePath string, destPath string) error {
+	if !storagedriver.PathRegexp.MatchString(sourcePath) {
+		return storagedriver.InvalidPathError{Path: sourcePath}
+	} else if !storagedriver.PathRegexp.MatchString(destPath) {
+		return storagedriver.InvalidPathError{Path: destPath}
+	}
+
 	sourceBlobURL := d.client.GetBlobUrl(d.container, sourcePath)
 	err := d.client.CopyBlob(d.container, destPath, sourceBlobURL)
 	if err != nil {
@@ -246,6 +324,10 @@ func (d *Driver) Move(sourcePath string, destPath string) error {
 // Delete recursively deletes all objects stored at "path" and its subpaths.
 func (d *Driver) Delete(path string) error {
+	if !storagedriver.PathRegexp.MatchString(path) {
+		return storagedriver.InvalidPathError{Path: path}
+	}
+
 	ok, err := d.client.DeleteBlobIfExists(d.container, path)
 	if err != nil {
 		return err
 	}
@@ -272,6 +354,21 @@ func (d *Driver) Delete(path string) error {
 	return nil
 }
 
+// URLFor returns a publicly accessible URL for the blob stored at given path
+// for specified duration by making use of Azure Storage Shared Access Signatures (SAS).
+// See https://msdn.microsoft.com/en-us/library/azure/ee395415.aspx for more info.
+func (d *Driver) URLFor(path string, options map[string]interface{}) (string, error) {
+	expiresTime := time.Now().UTC().Add(20 * time.Minute) // default expiration
+	expires, ok := options["expiry"]
+	if ok {
+		t, ok := expires.(time.Time)
+		if ok {
+			expiresTime = t
+		}
+	}
+	return d.client.GetBlobSASURI(d.container, path, expiresTime, "r")
+}
+
 // directDescendants will find direct descendants (blobs or virtual containers)
 // of from list of blob paths and will return their full paths. Elements in blobs
 // list must be prefixed with a "/" and
@@ -340,7 +437,7 @@ func is404(err error) bool {
 	return ok && e.StatusCode == 404
 }
 
-func blockNum(b64Name string) (int, error) {
+func fromBlockID(b64Name string) (int, error) {
 	s, err := base64.StdEncoding.DecodeString(b64Name)
 	if err != nil {
 		return 0, err
@@ -350,5 +447,6 @@
 }
 
 func toBlockID(i int) string {
-	return base64.StdEncoding.EncodeToString([]byte(strconv.Itoa(i)))
+	s := fmt.Sprintf("%010d", i) // add zero padding
+	return base64.StdEncoding.EncodeToString([]byte(s))
 }
diff --git a/storagedriver/azure/azure_test.go b/storagedriver/azure/azure_test.go
index 4e8ac59d..170e20f8 100644
--- a/storagedriver/azure/azure_test.go
+++ b/storagedriver/azure/azure_test.go
@@ -1,5 +1,3 @@
-// +build ignore
-
 package azure
 
 import (
@@ -59,9 +57,9 @@ func init() {
 	}
 
 	testsuites.RegisterInProcessSuite(azureDriverConstructor, skipCheck)
-	testsuites.RegisterIPCSuite(driverName, map[string]string{
-		paramAccountName: accountName,
-		paramAccountKey:  accountKey,
-		paramContainer:   container,
-	}, skipCheck)
+	// testsuites.RegisterIPCSuite(driverName, map[string]string{
+	// 	paramAccountName: accountName,
+	// 	paramAccountKey:  accountKey,
+	// 	paramContainer:   container,
+	// }, skipCheck)
 }
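Note on the toBlockID change: the zero padding makes every block number encode to the same width before base64-encoding, which keeps all block IDs within a blob the same length (Azure requires this for the blockid parameter), whereas the old strconv.Itoa encoding stopped guaranteeing it once the block count crossed a power of ten. The following is a standalone sketch of the round trip between the two helpers, not part of the patch, just an illustration of the encoding:

package main

import (
	"encoding/base64"
	"fmt"
	"strconv"
)

// toBlockID mirrors the patched helper: zero-pad the block number to a fixed
// width so every block ID decodes to the same length, then base64-encode it.
func toBlockID(i int) string {
	s := fmt.Sprintf("%010d", i)
	return base64.StdEncoding.EncodeToString([]byte(s))
}

// fromBlockID reverses toBlockID; strconv.Atoi ignores the leading zeros.
func fromBlockID(b64Name string) (int, error) {
	s, err := base64.StdEncoding.DecodeString(b64Name)
	if err != nil {
		return 0, err
	}
	return strconv.Atoi(string(s))
}

func main() {
	id := toBlockID(42)
	n, err := fromBlockID(id)
	fmt.Println(id, n, err) // MDAwMDAwMDA0Mg== 42 <nil>
}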
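Note on the WriteStream offset handling: instead of rejecting writes past the current end of the blob, the patched method prepends a zero-filled reader with io.MultiReader to cover the gap and later subtracts that padding from the returned byte count. A minimal standalone sketch of the same pattern, with made-up offsets rather than real blob state:

package main

import (
	"bytes"
	"fmt"
	"io"
	"io/ioutil"
)

func main() {
	// Hypothetical numbers: committed blocks end at resumableOffset and the
	// caller asked to write at a larger offset, as handled in WriteStream.
	var resumableOffset, offset int64 = 5, 12
	payload := []byte("hello")

	// Same pattern as the patch: prepend a zero-filled reader to cover the gap.
	zeroReader := bytes.NewReader(make([]byte, offset-resumableOffset))
	reader := io.MultiReader(zeroReader, bytes.NewReader(payload))

	data, _ := ioutil.ReadAll(reader) // stands in for the block-upload loop
	nn := int64(len(data))
	nn -= offset - resumableOffset // exclude the zero padding, as the patch does

	fmt.Printf("uploaded %d bytes (%q), reported nn=%d\n", len(data), data, nn)
	// uploaded 12 bytes ("\x00\x00\x00\x00\x00\x00\x00hello"), reported nn=5
}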