Swift driver now bulk deletes in chunks specified by the server (#1915)

Swift driver now bulk deletes in chunks specified by the server

Signed-off-by: Matthew Green <matthew.green@uk.ibm.com>
This commit is contained in:
Matthew Green 2016-08-24 18:09:25 +01:00 committed by Richard Scothern
parent c24e10f70a
commit dea554fc7c
2 changed files with 108 additions and 17 deletions

View file

@ -91,6 +91,9 @@ type swiftInfo struct {
Tempurl struct {
Methods []string `mapstructure:"methods"`
}
BulkDelete struct {
MaxDeletesPerRequest int `mapstructure:"max_deletes_per_request"`
} `mapstructure:"bulk_delete"`
}
func init() {
@ -109,6 +112,7 @@ type driver struct {
Container string
Prefix string
BulkDeleteSupport bool
BulkDeleteMaxDeletes int
ChunkSize int
SecretKey string
AccessKey string
@ -221,6 +225,9 @@ func New(params Parameters) (*Driver, error) {
if err := mapstructure.Decode(config, &info); err == nil {
d.TempURLContainerKey = info.Swift.Version >= "2.3.0"
d.TempURLMethods = info.Tempurl.Methods
if d.BulkDeleteSupport {
d.BulkDeleteMaxDeletes = info.BulkDelete.MaxDeletesPerRequest
}
}
} else {
d.TempURLContainerKey = params.TempURLContainerKey
@ -532,12 +539,18 @@ func (d *driver) Delete(ctx context.Context, path string) error {
}
}
if d.BulkDeleteSupport && len(objects) > 0 {
if d.BulkDeleteSupport && len(objects) > 0 && d.BulkDeleteMaxDeletes > 0 {
filenames := make([]string, len(objects))
for i, obj := range objects {
filenames[i] = obj.Name
}
_, err = d.Conn.BulkDelete(d.Container, filenames)
chunks, err := chunkFilenames(filenames, d.BulkDeleteMaxDeletes)
if err != nil {
return err
}
for _, chunk := range chunks {
_, err := d.Conn.BulkDelete(d.Container, chunk)
// Don't fail on ObjectNotFound because eventual consistency
// makes this situation normal.
if err != nil && err != swift.Forbidden && err != swift.ObjectNotFound {
@ -546,6 +559,7 @@ func (d *driver) Delete(ctx context.Context, path string) error {
}
return err
}
}
} else {
for _, obj := range objects {
if err := d.Conn.ObjectDelete(d.Container, obj.Name); err != nil {
@ -712,6 +726,21 @@ func (d *driver) createManifest(path string, segments string) error {
return nil
}
func chunkFilenames(slice []string, maxSize int) (chunks [][]string, err error) {
if maxSize > 0 {
for offset := 0; offset < len(slice); offset += maxSize {
chunkSize := maxSize
if offset+chunkSize > len(slice) {
chunkSize = len(slice) - offset
}
chunks = append(chunks, slice[offset:offset+chunkSize])
}
} else {
return nil, fmt.Errorf("Max chunk size must be > 0")
}
return
}
func parseManifest(manifest string) (container string, prefix string) {
components := strings.SplitN(manifest, "/", 2)
container = components[0]

View file

@ -3,6 +3,7 @@ package swift
import (
"io/ioutil"
"os"
"reflect"
"strconv"
"strings"
"testing"
@ -181,3 +182,64 @@ func TestEmptyRootList(t *testing.T) {
t.Fatal("delete did not remove nested objects")
}
}
func TestFilenameChunking(t *testing.T) {
// Test valid input and sizes
input := []string{"a", "b", "c", "d", "e"}
expecteds := [][][]string{
{
{"a"},
{"b"},
{"c"},
{"d"},
{"e"},
},
{
{"a", "b"},
{"c", "d"},
{"e"},
},
{
{"a", "b", "c"},
{"d", "e"},
},
{
{"a", "b", "c", "d"},
{"e"},
},
{
{"a", "b", "c", "d", "e"},
},
{
{"a", "b", "c", "d", "e"},
},
}
for i, expected := range expecteds {
actual, err := chunkFilenames(input, i+1)
if !reflect.DeepEqual(actual, expected) {
t.Fatalf("chunk %v didn't match expected value %v", actual, expected)
}
if err != nil {
t.Fatalf("unexpected error chunking filenames: %v", err)
}
}
// Test nil input
actual, err := chunkFilenames(nil, 5)
if len(actual) != 0 {
t.Fatal("chunks were returned when passed nil")
}
if err != nil {
t.Fatalf("unexpected error chunking filenames: %v", err)
}
// Test 0 and < 0 sizes
actual, err = chunkFilenames(nil, 0)
if err == nil {
t.Fatal("expected error for size = 0")
}
actual, err = chunkFilenames(nil, -1)
if err == nil {
t.Fatal("expected error for size = -1")
}
}