Support writes to random offsets in Azure storage driver

Signed-off-by: Ahmet Alp Balkan <ahmetalpbalkan@gmail.com>
This commit is contained in:
Ahmet Alp Balkan 2015-02-03 15:29:00 -08:00
parent bc42f53ec8
commit e7485c831f
9 changed files with 1045 additions and 96 deletions

View file

@ -4,12 +4,10 @@ package azure
import ( import (
"bytes" "bytes"
"encoding/base64"
"fmt" "fmt"
"io" "io"
"io/ioutil" "io/ioutil"
"net/http" "net/http"
"strconv"
"strings" "strings"
"time" "time"
@ -30,7 +28,7 @@ const (
// Driver is a storagedriver.StorageDriver implementation backed by // Driver is a storagedriver.StorageDriver implementation backed by
// Microsoft Azure Blob Storage Service. // Microsoft Azure Blob Storage Service.
type Driver struct { type Driver struct {
client *azure.BlobStorageClient client azure.BlobStorageClient
container string container string
} }
@ -79,7 +77,7 @@ func New(accountName, accountKey, container string) (*Driver, error) {
} }
return &Driver{ return &Driver{
client: blobClient, client: *blobClient,
container: container}, nil container: container}, nil
} }
@ -140,92 +138,22 @@ func (d *driver) ReadStream(path string, offset int64) (io.ReadCloser, error) {
// WriteStream stores the contents of the provided io.ReadCloser at a location // WriteStream stores the contents of the provided io.ReadCloser at a location
// designated by the given path. // designated by the given path.
func (d *Driver) WriteStream(path string, offset int64, reader io.Reader) (int64, error) { func (d *Driver) WriteStream(path string, offset int64, reader io.Reader) (int64, error) {
if !storagedriver.PathRegexp.MatchString(path) {
return 0, storagedriver.InvalidPathError{Path: path}
}
var (
lastBlockNum int
resumableOffset int64
blocks []azure.Block
)
if blobExists, err := d.client.BlobExists(d.container, path); err != nil { if blobExists, err := d.client.BlobExists(d.container, path); err != nil {
return 0, err return 0, err
} else if !blobExists { // new blob } else if !blobExists {
lastBlockNum = 0 err := d.client.CreateBlockBlob(d.container, path)
resumableOffset = 0 if err != nil {
} else { // append
if parts, err := d.client.GetBlockList(d.container, path, azure.BlockListTypeCommitted); err != nil {
return 0, err return 0, err
} else if len(parts.CommittedBlocks) == 0 {
lastBlockNum = 0
resumableOffset = 0
} else {
lastBlock := parts.CommittedBlocks[len(parts.CommittedBlocks)-1]
if lastBlockNum, err = fromBlockID(lastBlock.Name); err != nil {
return 0, fmt.Errorf("Cannot parse block name as number '%s': %s", lastBlock.Name, err.Error())
}
var totalSize int64
for _, v := range parts.CommittedBlocks {
blocks = append(blocks, azure.Block{
Id: v.Name,
Status: azure.BlockStatusCommitted})
totalSize += int64(v.Size)
}
// NOTE: Azure driver currently supports only append mode (resumable
// index is exactly where the committed blocks of the blob end).
// In order to support writing to offsets other than last index,
// adjacent blocks overlapping with the [offset:offset+size] area
// must be fetched, splitted and should be overwritten accordingly.
// As the current use of this method is append only, that implementation
// is omitted.
resumableOffset = totalSize
} }
} }
if offset < 0 {
if offset < resumableOffset {
// only writing at the end or after the end of the file is supported
return 0, storagedriver.InvalidOffsetError{Path: path, Offset: offset} return 0, storagedriver.InvalidOffsetError{Path: path, Offset: offset}
} else if offset > resumableOffset {
// zero-fill in between, construct a multi-reader
zeroReader := bytes.NewReader(make([]byte, offset-resumableOffset))
reader = io.MultiReader(zeroReader, reader)
} }
// Put content bs := newAzureBlockStorage(d.client)
var nn int64 bw := newRandomBlobWriter(&bs, azure.MaxBlobBlockSize)
buf := make([]byte, azure.MaxBlobBlockSize) zw := newZeroFillWriter(&bw)
for { return zw.Write(d.container, path, offset, reader)
// Read chunks of exactly size N except the last chunk to
// maximize block size and minimize block count.
n, err := io.ReadFull(reader, buf)
if err == io.EOF {
break
}
nn += int64(n)
data := buf[:n]
lastBlockNum++
blockID := toBlockID(lastBlockNum)
if err = d.client.PutBlock(d.container, path, blockID, data); err != nil {
return 0, err
}
blocks = append(blocks, azure.Block{
Id: blockID,
Status: azure.BlockStatusLatest})
}
// If there was a zero-fill, adjust nn to exclude zeros
if offset > resumableOffset {
nn -= offset - resumableOffset
}
// Commit block list
return nn, d.client.PutBlockList(d.container, path, blocks)
} }
// Stat retrieves the FileInfo for the given path, including the current size // Stat retrieves the FileInfo for the given path, including the current size
@ -436,17 +364,3 @@ func is404(err error) bool {
e, ok := err.(azure.StorageServiceError) e, ok := err.(azure.StorageServiceError)
return ok && e.StatusCode == 404 return ok && e.StatusCode == 404
} }
func fromBlockID(b64Name string) (int, error) {
s, err := base64.StdEncoding.DecodeString(b64Name)
if err != nil {
return 0, err
}
return strconv.Atoi(string(s))
}
func toBlockID(i int) string {
s := fmt.Sprintf("%010d", i) // add zero padding
return base64.StdEncoding.EncodeToString([]byte(s))
}

View file

@ -0,0 +1,24 @@
package azure
import (
"fmt"
"io"
azure "github.com/MSOpenTech/azure-sdk-for-go/clients/storage"
)
// azureBlockStorage is adaptor between azure.BlobStorageClient and
// blockStorage interface.
type azureBlockStorage struct {
azure.BlobStorageClient
}
func (b *azureBlockStorage) GetSectionReader(container, blob string, start, length int64) (io.ReadCloser, error) {
return b.BlobStorageClient.GetBlobRange(container, blob, fmt.Sprintf("%v-%v", start, start+length-1))
}
func newAzureBlockStorage(b azure.BlobStorageClient) azureBlockStorage {
a := azureBlockStorage{}
a.BlobStorageClient = b
return a
}

View file

@ -0,0 +1,155 @@
package azure
import (
"bytes"
"fmt"
"io"
"io/ioutil"
azure "github.com/MSOpenTech/azure-sdk-for-go/clients/storage"
)
type StorageSimulator struct {
blobs map[string]*BlockBlob
}
type BlockBlob struct {
blocks map[string]*DataBlock
blockList []string
}
type DataBlock struct {
data []byte
committed bool
}
func (s *StorageSimulator) path(container, blob string) string {
return fmt.Sprintf("%s/%s", container, blob)
}
func (s *StorageSimulator) BlobExists(container, blob string) (bool, error) {
_, ok := s.blobs[s.path(container, blob)]
return ok, nil
}
func (s *StorageSimulator) GetBlob(container, blob string) (io.ReadCloser, error) {
bb, ok := s.blobs[s.path(container, blob)]
if !ok {
return nil, fmt.Errorf("blob not found")
}
var readers []io.Reader
for _, bID := range bb.blockList {
readers = append(readers, bytes.NewReader(bb.blocks[bID].data))
}
return ioutil.NopCloser(io.MultiReader(readers...)), nil
}
func (s *StorageSimulator) GetSectionReader(container, blob string, start, length int64) (io.ReadCloser, error) {
r, err := s.GetBlob(container, blob)
if err != nil {
return nil, err
}
b, err := ioutil.ReadAll(r)
if err != nil {
return nil, err
}
return ioutil.NopCloser(bytes.NewReader(b[start : start+length])), nil
}
func (s *StorageSimulator) CreateBlockBlob(container, blob string) error {
path := s.path(container, blob)
bb := &BlockBlob{
blocks: make(map[string]*DataBlock),
blockList: []string{},
}
s.blobs[path] = bb
return nil
}
func (s *StorageSimulator) PutBlock(container, blob, blockID string, chunk []byte) error {
path := s.path(container, blob)
bb, ok := s.blobs[path]
if !ok {
return fmt.Errorf("blob not found")
}
data := make([]byte, len(chunk))
copy(data, chunk)
bb.blocks[blockID] = &DataBlock{data: data, committed: false} // add block to blob
return nil
}
func (s *StorageSimulator) GetBlockList(container, blob string, blockType azure.BlockListType) (azure.BlockListResponse, error) {
resp := azure.BlockListResponse{}
bb, ok := s.blobs[s.path(container, blob)]
if !ok {
return resp, fmt.Errorf("blob not found")
}
// Iterate committed blocks (in order)
if blockType == azure.BlockListTypeAll || blockType == azure.BlockListTypeCommitted {
for _, blockID := range bb.blockList {
b := bb.blocks[blockID]
block := azure.BlockResponse{
Name: blockID,
Size: int64(len(b.data)),
}
resp.CommittedBlocks = append(resp.CommittedBlocks, block)
}
}
// Iterate uncommitted blocks (in no order)
if blockType == azure.BlockListTypeAll || blockType == azure.BlockListTypeCommitted {
for blockID, b := range bb.blocks {
block := azure.BlockResponse{
Name: blockID,
Size: int64(len(b.data)),
}
if !b.committed {
resp.UncommittedBlocks = append(resp.UncommittedBlocks, block)
}
}
}
return resp, nil
}
func (s *StorageSimulator) PutBlockList(container, blob string, blocks []azure.Block) error {
bb, ok := s.blobs[s.path(container, blob)]
if !ok {
return fmt.Errorf("blob not found")
}
var blockIDs []string
for _, v := range blocks {
bl, ok := bb.blocks[v.Id]
if !ok { // check if block ID exists
return fmt.Errorf("Block id '%s' not found", v.Id)
}
bl.committed = true
blockIDs = append(blockIDs, v.Id)
}
// Mark all other blocks uncommitted
for k, b := range bb.blocks {
inList := false
for _, v := range blockIDs {
if k == v {
inList = true
break
}
}
if !inList {
b.committed = false
}
}
bb.blockList = blockIDs
return nil
}
func NewStorageSimulator() StorageSimulator {
return StorageSimulator{
blobs: make(map[string]*BlockBlob),
}
}

View file

@ -0,0 +1,60 @@
package azure
import (
"encoding/base64"
"fmt"
"math/rand"
"sync"
"time"
azure "github.com/MSOpenTech/azure-sdk-for-go/clients/storage"
)
type blockIDGenerator struct {
pool map[string]bool
r *rand.Rand
m sync.Mutex
}
// Generate returns an unused random block id and adds the generated ID
// to list of used IDs so that the same block name is not used again.
func (b *blockIDGenerator) Generate() string {
b.m.Lock()
defer b.m.Unlock()
var id string
for {
id = toBlockID(int(b.r.Int()))
if !b.exists(id) {
break
}
}
b.pool[id] = true
return id
}
func (b *blockIDGenerator) exists(id string) bool {
_, used := b.pool[id]
return used
}
func (b *blockIDGenerator) Feed(blocks azure.BlockListResponse) {
b.m.Lock()
defer b.m.Unlock()
for _, bl := range append(blocks.CommittedBlocks, blocks.UncommittedBlocks...) {
b.pool[bl.Name] = true
}
}
func newBlockIDGenerator() *blockIDGenerator {
return &blockIDGenerator{
pool: make(map[string]bool),
r: rand.New(rand.NewSource(time.Now().UnixNano()))}
}
// toBlockId converts given integer to base64-encoded block ID of a fixed length.
func toBlockID(i int) string {
s := fmt.Sprintf("%029d", i) // add zero padding for same length-blobs
return base64.StdEncoding.EncodeToString([]byte(s))
}

View file

@ -0,0 +1,74 @@
package azure
import (
"math"
"testing"
azure "github.com/MSOpenTech/azure-sdk-for-go/clients/storage"
)
func Test_blockIdGenerator(t *testing.T) {
r := newBlockIDGenerator()
for i := 1; i <= 10; i++ {
if expected := i - 1; len(r.pool) != expected {
t.Fatalf("rand pool had wrong number of items: %d, expected:%d", len(r.pool), expected)
}
if id := r.Generate(); id == "" {
t.Fatal("returned empty id")
}
if expected := i; len(r.pool) != expected {
t.Fatalf("rand pool has wrong number of items: %d, expected:%d", len(r.pool), expected)
}
}
}
func Test_blockIdGenerator_Feed(t *testing.T) {
r := newBlockIDGenerator()
if expected := 0; len(r.pool) != expected {
t.Fatalf("rand pool had wrong number of items: %d, expected:%d", len(r.pool), expected)
}
// feed empty list
blocks := azure.BlockListResponse{}
r.Feed(blocks)
if expected := 0; len(r.pool) != expected {
t.Fatalf("rand pool had wrong number of items: %d, expected:%d", len(r.pool), expected)
}
// feed blocks
blocks = azure.BlockListResponse{
CommittedBlocks: []azure.BlockResponse{
{"1", 1},
{"2", 2},
},
UncommittedBlocks: []azure.BlockResponse{
{"3", 3},
}}
r.Feed(blocks)
if expected := 3; len(r.pool) != expected {
t.Fatalf("rand pool had wrong number of items: %d, expected:%d", len(r.pool), expected)
}
// feed same block IDs with committed/uncommitted place changed
blocks = azure.BlockListResponse{
CommittedBlocks: []azure.BlockResponse{
{"3", 3},
},
UncommittedBlocks: []azure.BlockResponse{
{"1", 1},
}}
r.Feed(blocks)
if expected := 3; len(r.pool) != expected {
t.Fatalf("rand pool had wrong number of items: %d, expected:%d", len(r.pool), expected)
}
}
func Test_toBlockId(t *testing.T) {
min := 0
max := math.MaxInt64
if len(toBlockID(min)) != len(toBlockID(max)) {
t.Fatalf("different-sized blockIDs are returned")
}
}

View file

@ -0,0 +1,208 @@
package azure
import (
"fmt"
"io"
"io/ioutil"
azure "github.com/MSOpenTech/azure-sdk-for-go/clients/storage"
)
// blockStorage is the interface required from a block storage service
// client implementation
type blockStorage interface {
CreateBlockBlob(container, blob string) error
GetBlob(container, blob string) (io.ReadCloser, error)
GetSectionReader(container, blob string, start, length int64) (io.ReadCloser, error)
PutBlock(container, blob, blockID string, chunk []byte) error
GetBlockList(container, blob string, blockType azure.BlockListType) (azure.BlockListResponse, error)
PutBlockList(container, blob string, blocks []azure.Block) error
}
// randomBlobWriter enables random access semantics on Azure block blobs
// by enabling writing arbitrary length of chunks to arbitrary write offsets
// within the blob. Normally, Azure Blob Storage does not support random
// access semantics on block blobs; however, this writer can download, split and
// reupload the overlapping blocks and discards those being overwritten entirely.
type randomBlobWriter struct {
bs blockStorage
blockSize int
}
func newRandomBlobWriter(bs blockStorage, blockSize int) randomBlobWriter {
return randomBlobWriter{bs: bs, blockSize: blockSize}
}
// WriteBlobAt writes the given chunk to the specified position of an existing blob.
// The offset must be equals to size of the blob or smaller than it.
func (r *randomBlobWriter) WriteBlobAt(container, blob string, offset int64, chunk io.Reader) (int64, error) {
rand := newBlockIDGenerator()
blocks, err := r.bs.GetBlockList(container, blob, azure.BlockListTypeCommitted)
if err != nil {
return 0, err
}
rand.Feed(blocks) // load existing block IDs
// Check for write offset for existing blob
size := getBlobSize(blocks)
if offset < 0 || offset > size {
return 0, fmt.Errorf("wrong offset for Write: %v", offset)
}
// Upload the new chunk as blocks
blockList, nn, err := r.writeChunkToBlocks(container, blob, chunk, rand)
if err != nil {
return 0, err
}
// For non-append operations, existing blocks may need to be splitted
if offset != size {
// Split the block on the left end (if any)
leftBlocks, err := r.blocksLeftSide(container, blob, offset, rand)
if err != nil {
return 0, err
}
blockList = append(leftBlocks, blockList...)
// Split the block on the right end (if any)
rightBlocks, err := r.blocksRightSide(container, blob, offset, nn, rand)
if err != nil {
return 0, err
}
blockList = append(blockList, rightBlocks...)
} else {
// Use existing block list
var existingBlocks []azure.Block
for _, v := range blocks.CommittedBlocks {
existingBlocks = append(existingBlocks, azure.Block{Id: v.Name, Status: azure.BlockStatusCommitted})
}
blockList = append(existingBlocks, blockList...)
}
// Put block list
return nn, r.bs.PutBlockList(container, blob, blockList)
}
func (r *randomBlobWriter) GetSize(container, blob string) (int64, error) {
blocks, err := r.bs.GetBlockList(container, blob, azure.BlockListTypeCommitted)
if err != nil {
return 0, err
}
return getBlobSize(blocks), nil
}
// writeChunkToBlocks writes given chunk to one or multiple blocks within specified
// blob and returns their block representations. Those blocks are not committed, yet
func (r *randomBlobWriter) writeChunkToBlocks(container, blob string, chunk io.Reader, rand *blockIDGenerator) ([]azure.Block, int64, error) {
var newBlocks []azure.Block
var nn int64
// Read chunks of at most size N except the last chunk to
// maximize block size and minimize block count.
buf := make([]byte, r.blockSize)
for {
n, err := io.ReadFull(chunk, buf)
if err == io.EOF {
break
}
nn += int64(n)
data := buf[:n]
blockID := rand.Generate()
if err := r.bs.PutBlock(container, blob, blockID, data); err != nil {
return newBlocks, nn, err
}
newBlocks = append(newBlocks, azure.Block{Id: blockID, Status: azure.BlockStatusUncommitted})
}
return newBlocks, nn, nil
}
// blocksLeftSide returns the blocks that are going to be at the left side of
// the writeOffset: [0, writeOffset) by identifying blocks that will remain
// the same and splitting blocks and reuploading them as needed.
func (r *randomBlobWriter) blocksLeftSide(container, blob string, writeOffset int64, rand *blockIDGenerator) ([]azure.Block, error) {
var left []azure.Block
bx, err := r.bs.GetBlockList(container, blob, azure.BlockListTypeAll)
if err != nil {
return left, err
}
o := writeOffset
elapsed := int64(0)
for _, v := range bx.CommittedBlocks {
blkSize := int64(v.Size)
if o >= blkSize { // use existing block
left = append(left, azure.Block{Id: v.Name, Status: azure.BlockStatusCommitted})
o -= blkSize
elapsed += blkSize
} else if o > 0 { // current block needs to be splitted
start := elapsed
size := o
part, err := r.bs.GetSectionReader(container, blob, start, size)
if err != nil {
return left, err
}
newBlockID := rand.Generate()
data, err := ioutil.ReadAll(part)
if err != nil {
return left, err
}
if err = r.bs.PutBlock(container, blob, newBlockID, data); err != nil {
return left, err
}
left = append(left, azure.Block{Id: newBlockID, Status: azure.BlockStatusUncommitted})
break
}
}
return left, nil
}
// blocksRightSide returns the blocks that are going to be at the right side of
// the written chunk: [writeOffset+size, +inf) by identifying blocks that will remain
// the same and splitting blocks and reuploading them as needed.
func (r *randomBlobWriter) blocksRightSide(container, blob string, writeOffset int64, chunkSize int64, rand *blockIDGenerator) ([]azure.Block, error) {
var right []azure.Block
bx, err := r.bs.GetBlockList(container, blob, azure.BlockListTypeAll)
if err != nil {
return nil, err
}
re := writeOffset + chunkSize - 1 // right end of written chunk
var elapsed int64
for _, v := range bx.CommittedBlocks {
var (
bs = elapsed // left end of current block
be = elapsed + int64(v.Size) - 1 // right end of current block
)
if bs > re { // take the block as is
right = append(right, azure.Block{Id: v.Name, Status: azure.BlockStatusCommitted})
} else if be > re { // current block needs to be splitted
part, err := r.bs.GetSectionReader(container, blob, re+1, be-(re+1)+1)
if err != nil {
return right, err
}
newBlockID := rand.Generate()
data, err := ioutil.ReadAll(part)
if err != nil {
return right, err
}
if err = r.bs.PutBlock(container, blob, newBlockID, data); err != nil {
return right, err
}
right = append(right, azure.Block{Id: newBlockID, Status: azure.BlockStatusUncommitted})
}
elapsed += int64(v.Size)
}
return right, nil
}
func getBlobSize(blocks azure.BlockListResponse) int64 {
var n int64
for _, v := range blocks.CommittedBlocks {
n += int64(v.Size)
}
return n
}

View file

@ -0,0 +1,339 @@
package azure
import (
"bytes"
"io"
"io/ioutil"
"math/rand"
"reflect"
"strings"
"testing"
azure "github.com/MSOpenTech/azure-sdk-for-go/clients/storage"
)
func TestRandomWriter_writeChunkToBlocks(t *testing.T) {
s := NewStorageSimulator()
rw := newRandomBlobWriter(&s, 3)
rand := newBlockIDGenerator()
c := []byte("AAABBBCCCD")
if err := rw.bs.CreateBlockBlob("a", "b"); err != nil {
t.Fatal(err)
}
bw, nn, err := rw.writeChunkToBlocks("a", "b", bytes.NewReader(c), rand)
if err != nil {
t.Fatal(err)
}
if expected := int64(len(c)); nn != expected {
t.Fatalf("wrong nn:%v, expected:%v", nn, expected)
}
if expected := 4; len(bw) != expected {
t.Fatal("unexpected written block count")
}
bx, err := s.GetBlockList("a", "b", azure.BlockListTypeAll)
if err != nil {
t.Fatal(err)
}
if expected := 0; len(bx.CommittedBlocks) != expected {
t.Fatal("unexpected committed block count")
}
if expected := 4; len(bx.UncommittedBlocks) != expected {
t.Fatalf("unexpected uncommitted block count: %d -- %#v", len(bx.UncommittedBlocks), bx)
}
if err := rw.bs.PutBlockList("a", "b", bw); err != nil {
t.Fatal(err)
}
r, err := rw.bs.GetBlob("a", "b")
if err != nil {
t.Fatal(err)
}
assertBlobContents(t, r, c)
}
func TestRandomWriter_blocksLeftSide(t *testing.T) {
blob := "AAAAABBBBBCCC"
cases := []struct {
offset int64
expectedBlob string
expectedPattern []azure.BlockStatus
}{
{0, "", []azure.BlockStatus{}}, // write to beginning, discard all
{13, blob, []azure.BlockStatus{azure.BlockStatusCommitted, azure.BlockStatusCommitted, azure.BlockStatusCommitted}}, // write to end, no change
{1, "A", []azure.BlockStatus{azure.BlockStatusUncommitted}}, // write at 1
{5, "AAAAA", []azure.BlockStatus{azure.BlockStatusCommitted}}, // write just after first block
{6, "AAAAAB", []azure.BlockStatus{azure.BlockStatusCommitted, azure.BlockStatusUncommitted}}, // split the second block
{9, "AAAAABBBB", []azure.BlockStatus{azure.BlockStatusCommitted, azure.BlockStatusUncommitted}}, // write just after first block
}
for _, c := range cases {
s := NewStorageSimulator()
rw := newRandomBlobWriter(&s, 5)
rand := newBlockIDGenerator()
if err := rw.bs.CreateBlockBlob("a", "b"); err != nil {
t.Fatal(err)
}
bw, _, err := rw.writeChunkToBlocks("a", "b", strings.NewReader(blob), rand)
if err != nil {
t.Fatal(err)
}
if err := rw.bs.PutBlockList("a", "b", bw); err != nil {
t.Fatal(err)
}
bx, err := rw.blocksLeftSide("a", "b", c.offset, rand)
if err != nil {
t.Fatal(err)
}
bs := []azure.BlockStatus{}
for _, v := range bx {
bs = append(bs, v.Status)
}
if !reflect.DeepEqual(bs, c.expectedPattern) {
t.Logf("Committed blocks %v", bw)
t.Fatalf("For offset %v: Expected pattern: %v, Got: %v\n(Returned: %v)", c.offset, c.expectedPattern, bs, bx)
}
if rw.bs.PutBlockList("a", "b", bx); err != nil {
t.Fatal(err)
}
r, err := rw.bs.GetBlob("a", "b")
if err != nil {
t.Fatal(err)
}
cout, err := ioutil.ReadAll(r)
if err != nil {
t.Fatal(err)
}
outBlob := string(cout)
if outBlob != c.expectedBlob {
t.Fatalf("wrong blob contents: %v, expected: %v", outBlob, c.expectedBlob)
}
}
}
func TestRandomWriter_blocksRightSide(t *testing.T) {
blob := "AAAAABBBBBCCC"
cases := []struct {
offset int64
size int64
expectedBlob string
expectedPattern []azure.BlockStatus
}{
{0, 100, "", []azure.BlockStatus{}}, // overwrite the entire blob
{0, 3, "AABBBBBCCC", []azure.BlockStatus{azure.BlockStatusUncommitted, azure.BlockStatusCommitted, azure.BlockStatusCommitted}}, // split first block
{4, 1, "BBBBBCCC", []azure.BlockStatus{azure.BlockStatusCommitted, azure.BlockStatusCommitted}}, // write to last char of first block
{1, 6, "BBBCCC", []azure.BlockStatus{azure.BlockStatusUncommitted, azure.BlockStatusCommitted}}, // overwrite splits first and second block, last block remains
{3, 8, "CC", []azure.BlockStatus{azure.BlockStatusUncommitted}}, // overwrite a block in middle block, split end block
{10, 1, "CC", []azure.BlockStatus{azure.BlockStatusUncommitted}}, // overwrite first byte of rightmost block
{11, 2, "", []azure.BlockStatus{}}, // overwrite the rightmost index
{13, 20, "", []azure.BlockStatus{}}, // append to the end
}
for _, c := range cases {
s := NewStorageSimulator()
rw := newRandomBlobWriter(&s, 5)
rand := newBlockIDGenerator()
if err := rw.bs.CreateBlockBlob("a", "b"); err != nil {
t.Fatal(err)
}
bw, _, err := rw.writeChunkToBlocks("a", "b", strings.NewReader(blob), rand)
if err != nil {
t.Fatal(err)
}
if err := rw.bs.PutBlockList("a", "b", bw); err != nil {
t.Fatal(err)
}
bx, err := rw.blocksRightSide("a", "b", c.offset, c.size, rand)
if err != nil {
t.Fatal(err)
}
bs := []azure.BlockStatus{}
for _, v := range bx {
bs = append(bs, v.Status)
}
if !reflect.DeepEqual(bs, c.expectedPattern) {
t.Logf("Committed blocks %v", bw)
t.Fatalf("For offset %v-size:%v: Expected pattern: %v, Got: %v\n(Returned: %v)", c.offset, c.size, c.expectedPattern, bs, bx)
}
if rw.bs.PutBlockList("a", "b", bx); err != nil {
t.Fatal(err)
}
r, err := rw.bs.GetBlob("a", "b")
if err != nil {
t.Fatal(err)
}
cout, err := ioutil.ReadAll(r)
if err != nil {
t.Fatal(err)
}
outBlob := string(cout)
if outBlob != c.expectedBlob {
t.Fatalf("For offset %v-size:%v: wrong blob contents: %v, expected: %v", c.offset, c.size, outBlob, c.expectedBlob)
}
}
}
func TestRandomWriter_Write_NewBlob(t *testing.T) {
var (
s = NewStorageSimulator()
rw = newRandomBlobWriter(&s, 1024*3) // 3 KB blocks
blob = randomContents(1024 * 7) // 7 KB blob
)
if err := rw.bs.CreateBlockBlob("a", "b"); err != nil {
t.Fatal(err)
}
if _, err := rw.WriteBlobAt("a", "b", 10, bytes.NewReader(blob)); err == nil {
t.Fatal("expected error, got nil")
}
if _, err := rw.WriteBlobAt("a", "b", 100000, bytes.NewReader(blob)); err == nil {
t.Fatal("expected error, got nil")
}
if nn, err := rw.WriteBlobAt("a", "b", 0, bytes.NewReader(blob)); err != nil {
t.Fatal(err)
} else if expected := int64(len(blob)); expected != nn {
t.Fatalf("wrong written bytes count: %v, expected: %v", nn, expected)
}
if out, err := rw.bs.GetBlob("a", "b"); err != nil {
t.Fatal(err)
} else {
assertBlobContents(t, out, blob)
}
if bx, err := rw.bs.GetBlockList("a", "b", azure.BlockListTypeCommitted); err != nil {
t.Fatal(err)
} else if len(bx.CommittedBlocks) != 3 {
t.Fatalf("got wrong number of committed blocks: %v", len(bx.CommittedBlocks))
}
// Replace first 512 bytes
leftChunk := randomContents(512)
blob = append(leftChunk, blob[512:]...)
if nn, err := rw.WriteBlobAt("a", "b", 0, bytes.NewReader(leftChunk)); err != nil {
t.Fatal(err)
} else if expected := int64(len(leftChunk)); expected != nn {
t.Fatalf("wrong written bytes count: %v, expected: %v", nn, expected)
}
if out, err := rw.bs.GetBlob("a", "b"); err != nil {
t.Fatal(err)
} else {
assertBlobContents(t, out, blob)
}
if bx, err := rw.bs.GetBlockList("a", "b", azure.BlockListTypeCommitted); err != nil {
t.Fatal(err)
} else if expected := 4; len(bx.CommittedBlocks) != expected {
t.Fatalf("got wrong number of committed blocks: %v, expected: %v", len(bx.CommittedBlocks), expected)
}
// Replace last 512 bytes with 1024 bytes
rightChunk := randomContents(1024)
offset := int64(len(blob) - 512)
blob = append(blob[:offset], rightChunk...)
if nn, err := rw.WriteBlobAt("a", "b", offset, bytes.NewReader(rightChunk)); err != nil {
t.Fatal(err)
} else if expected := int64(len(rightChunk)); expected != nn {
t.Fatalf("wrong written bytes count: %v, expected: %v", nn, expected)
}
if out, err := rw.bs.GetBlob("a", "b"); err != nil {
t.Fatal(err)
} else {
assertBlobContents(t, out, blob)
}
if bx, err := rw.bs.GetBlockList("a", "b", azure.BlockListTypeCommitted); err != nil {
t.Fatal(err)
} else if expected := 5; len(bx.CommittedBlocks) != expected {
t.Fatalf("got wrong number of committed blocks: %v, expected: %v", len(bx.CommittedBlocks), expected)
}
// Replace 2K-4K (overlaps 2 blocks from L/R)
newChunk := randomContents(1024 * 2)
offset = 1024 * 2
blob = append(append(blob[:offset], newChunk...), blob[offset+int64(len(newChunk)):]...)
if nn, err := rw.WriteBlobAt("a", "b", offset, bytes.NewReader(newChunk)); err != nil {
t.Fatal(err)
} else if expected := int64(len(newChunk)); expected != nn {
t.Fatalf("wrong written bytes count: %v, expected: %v", nn, expected)
}
if out, err := rw.bs.GetBlob("a", "b"); err != nil {
t.Fatal(err)
} else {
assertBlobContents(t, out, blob)
}
if bx, err := rw.bs.GetBlockList("a", "b", azure.BlockListTypeCommitted); err != nil {
t.Fatal(err)
} else if expected := 6; len(bx.CommittedBlocks) != expected {
t.Fatalf("got wrong number of committed blocks: %v, expected: %v\n%v", len(bx.CommittedBlocks), expected, bx.CommittedBlocks)
}
// Replace the entire blob
newBlob := randomContents(1024 * 30)
if nn, err := rw.WriteBlobAt("a", "b", 0, bytes.NewReader(newBlob)); err != nil {
t.Fatal(err)
} else if expected := int64(len(newBlob)); expected != nn {
t.Fatalf("wrong written bytes count: %v, expected: %v", nn, expected)
}
if out, err := rw.bs.GetBlob("a", "b"); err != nil {
t.Fatal(err)
} else {
assertBlobContents(t, out, newBlob)
}
if bx, err := rw.bs.GetBlockList("a", "b", azure.BlockListTypeCommitted); err != nil {
t.Fatal(err)
} else if expected := 10; len(bx.CommittedBlocks) != expected {
t.Fatalf("got wrong number of committed blocks: %v, expected: %v\n%v", len(bx.CommittedBlocks), expected, bx.CommittedBlocks)
} else if expected, size := int64(1024*30), getBlobSize(bx); size != expected {
t.Fatalf("committed block size does not indicate blob size")
}
}
func Test_getBlobSize(t *testing.T) {
// with some committed blocks
if expected, size := int64(151), getBlobSize(azure.BlockListResponse{
CommittedBlocks: []azure.BlockResponse{
{"A", 100},
{"B", 50},
{"C", 1},
},
UncommittedBlocks: []azure.BlockResponse{
{"D", 200},
}}); expected != size {
t.Fatalf("wrong blob size: %v, expected: %v", size, expected)
}
// with no committed blocks
if expected, size := int64(0), getBlobSize(azure.BlockListResponse{
UncommittedBlocks: []azure.BlockResponse{
{"A", 100},
{"B", 50},
{"C", 1},
{"D", 200},
}}); expected != size {
t.Fatalf("wrong blob size: %v, expected: %v", size, expected)
}
}
func assertBlobContents(t *testing.T, r io.Reader, expected []byte) {
out, err := ioutil.ReadAll(r)
if err != nil {
t.Fatal(err)
}
if !reflect.DeepEqual(out, expected) {
t.Fatalf("wrong blob contents. size: %v, expected: %v", len(out), len(expected))
}
}
func randomContents(length int64) []byte {
b := make([]byte, length)
for i := range b {
b[i] = byte(rand.Intn(2 << 8))
}
return b
}

View file

@ -0,0 +1,49 @@
package azure
import (
"bytes"
"io"
)
type blockBlobWriter interface {
GetSize(container, blob string) (int64, error)
WriteBlobAt(container, blob string, offset int64, chunk io.Reader) (int64, error)
}
// zeroFillWriter enables writing to an offset outside a block blob's size
// by offering the chunk to the underlying writer as a contiguous data with
// the gap in between filled with NUL (zero) bytes.
type zeroFillWriter struct {
blockBlobWriter
}
func newZeroFillWriter(b blockBlobWriter) zeroFillWriter {
w := zeroFillWriter{}
w.blockBlobWriter = b
return w
}
// Write writes the given chunk to the specified existing blob even though
// offset is out of blob's size. The gaps are filled with zeros. Returned
// written number count does not include zeros written.
func (z *zeroFillWriter) Write(container, blob string, offset int64, chunk io.Reader) (int64, error) {
size, err := z.blockBlobWriter.GetSize(container, blob)
if err != nil {
return 0, err
}
var reader io.Reader
var zeroPadding int64
if offset <= size {
reader = chunk
} else {
zeroPadding = offset - size
offset = size // adjust offset to be the append index
zeros := bytes.NewReader(make([]byte, zeroPadding))
reader = io.MultiReader(zeros, chunk)
}
nn, err := z.blockBlobWriter.WriteBlobAt(container, blob, offset, reader)
nn -= zeroPadding
return nn, err
}

View file

@ -0,0 +1,126 @@
package azure
import (
"bytes"
"testing"
)
func Test_zeroFillWrite_AppendNoGap(t *testing.T) {
s := NewStorageSimulator()
bw := newRandomBlobWriter(&s, 1024*1)
zw := newZeroFillWriter(&bw)
if err := s.CreateBlockBlob("a", "b"); err != nil {
t.Fatal(err)
}
firstChunk := randomContents(1024*3 + 512)
if nn, err := zw.Write("a", "b", 0, bytes.NewReader(firstChunk)); err != nil {
t.Fatal(err)
} else if expected := int64(len(firstChunk)); expected != nn {
t.Fatalf("wrong written bytes count: %v, expected: %v", nn, expected)
}
if out, err := s.GetBlob("a", "b"); err != nil {
t.Fatal(err)
} else {
assertBlobContents(t, out, firstChunk)
}
secondChunk := randomContents(256)
if nn, err := zw.Write("a", "b", int64(len(firstChunk)), bytes.NewReader(secondChunk)); err != nil {
t.Fatal(err)
} else if expected := int64(len(secondChunk)); expected != nn {
t.Fatalf("wrong written bytes count: %v, expected: %v", nn, expected)
}
if out, err := s.GetBlob("a", "b"); err != nil {
t.Fatal(err)
} else {
assertBlobContents(t, out, append(firstChunk, secondChunk...))
}
}
func Test_zeroFillWrite_StartWithGap(t *testing.T) {
s := NewStorageSimulator()
bw := newRandomBlobWriter(&s, 1024*2)
zw := newZeroFillWriter(&bw)
if err := s.CreateBlockBlob("a", "b"); err != nil {
t.Fatal(err)
}
chunk := randomContents(1024 * 5)
padding := int64(1024*2 + 256)
if nn, err := zw.Write("a", "b", padding, bytes.NewReader(chunk)); err != nil {
t.Fatal(err)
} else if expected := int64(len(chunk)); expected != nn {
t.Fatalf("wrong written bytes count: %v, expected: %v", nn, expected)
}
if out, err := s.GetBlob("a", "b"); err != nil {
t.Fatal(err)
} else {
assertBlobContents(t, out, append(make([]byte, padding), chunk...))
}
}
func Test_zeroFillWrite_AppendWithGap(t *testing.T) {
s := NewStorageSimulator()
bw := newRandomBlobWriter(&s, 1024*2)
zw := newZeroFillWriter(&bw)
if err := s.CreateBlockBlob("a", "b"); err != nil {
t.Fatal(err)
}
firstChunk := randomContents(1024*3 + 512)
if _, err := zw.Write("a", "b", 0, bytes.NewReader(firstChunk)); err != nil {
t.Fatal(err)
}
if out, err := s.GetBlob("a", "b"); err != nil {
t.Fatal(err)
} else {
assertBlobContents(t, out, firstChunk)
}
secondChunk := randomContents(256)
padding := int64(1024 * 4)
if nn, err := zw.Write("a", "b", int64(len(firstChunk))+padding, bytes.NewReader(secondChunk)); err != nil {
t.Fatal(err)
} else if expected := int64(len(secondChunk)); expected != nn {
t.Fatalf("wrong written bytes count: %v, expected: %v", nn, expected)
}
if out, err := s.GetBlob("a", "b"); err != nil {
t.Fatal(err)
} else {
assertBlobContents(t, out, append(firstChunk, append(make([]byte, padding), secondChunk...)...))
}
}
func Test_zeroFillWrite_LiesWithinSize(t *testing.T) {
s := NewStorageSimulator()
bw := newRandomBlobWriter(&s, 1024*2)
zw := newZeroFillWriter(&bw)
if err := s.CreateBlockBlob("a", "b"); err != nil {
t.Fatal(err)
}
firstChunk := randomContents(1024 * 3)
if _, err := zw.Write("a", "b", 0, bytes.NewReader(firstChunk)); err != nil {
t.Fatal(err)
}
if out, err := s.GetBlob("a", "b"); err != nil {
t.Fatal(err)
} else {
assertBlobContents(t, out, firstChunk)
}
// in this case, zerofill won't be used
secondChunk := randomContents(256)
if nn, err := zw.Write("a", "b", 0, bytes.NewReader(secondChunk)); err != nil {
t.Fatal(err)
} else if expected := int64(len(secondChunk)); expected != nn {
t.Fatalf("wrong written bytes count: %v, expected: %v", nn, expected)
}
if out, err := s.GetBlob("a", "b"); err != nil {
t.Fatal(err)
} else {
assertBlobContents(t, out, append(secondChunk, firstChunk[len(secondChunk):]...))
}
}