Merge pull request #1157 from lebauce/swift-bulk-delete-fixes
Use bulk delete to remove segments in Swift driver
This commit is contained in:
commit
3b7e1a986a
2 changed files with 43 additions and 27 deletions
|
@ -629,19 +629,6 @@ func (d *driver) Delete(ctx context.Context, path string) error {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
if len(objects) > 0 && d.BulkDeleteSupport {
|
|
||||||
filenames := make([]string, len(objects))
|
|
||||||
for i, obj := range objects {
|
|
||||||
filenames[i] = obj.Name
|
|
||||||
}
|
|
||||||
if _, err := d.Conn.BulkDelete(d.Container, filenames); err != swift.Forbidden {
|
|
||||||
if err == swift.ContainerNotFound {
|
|
||||||
return storagedriver.PathNotFoundError{Path: path}
|
|
||||||
}
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
for _, obj := range objects {
|
for _, obj := range objects {
|
||||||
if obj.PseudoDirectory {
|
if obj.PseudoDirectory {
|
||||||
continue
|
continue
|
||||||
|
@ -649,20 +636,12 @@ func (d *driver) Delete(ctx context.Context, path string) error {
|
||||||
if _, headers, err := d.Conn.Object(d.Container, obj.Name); err == nil {
|
if _, headers, err := d.Conn.Object(d.Container, obj.Name); err == nil {
|
||||||
manifest, ok := headers["X-Object-Manifest"]
|
manifest, ok := headers["X-Object-Manifest"]
|
||||||
if ok {
|
if ok {
|
||||||
segContainer, prefix := parseManifest(manifest)
|
_, prefix := parseManifest(manifest)
|
||||||
segments, err := d.getAllSegments(prefix)
|
segments, err := d.getAllSegments(prefix)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
objects = append(objects, segments...)
|
||||||
for _, s := range segments {
|
|
||||||
if err := d.Conn.ObjectDelete(segContainer, s.Name); err != nil {
|
|
||||||
if err == swift.ObjectNotFound {
|
|
||||||
return storagedriver.PathNotFoundError{Path: s.Name}
|
|
||||||
}
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
if err == swift.ObjectNotFound {
|
if err == swift.ObjectNotFound {
|
||||||
|
@ -670,13 +649,31 @@ func (d *driver) Delete(ctx context.Context, path string) error {
|
||||||
}
|
}
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
}
|
||||||
|
|
||||||
if err := d.Conn.ObjectDelete(d.Container, obj.Name); err != nil {
|
if d.BulkDeleteSupport && len(objects) > 0 {
|
||||||
if err == swift.ObjectNotFound {
|
filenames := make([]string, len(objects))
|
||||||
return storagedriver.PathNotFoundError{Path: obj.Name}
|
for i, obj := range objects {
|
||||||
|
filenames[i] = obj.Name
|
||||||
|
}
|
||||||
|
_, err = d.Conn.BulkDelete(d.Container, filenames)
|
||||||
|
// Don't fail on ObjectNotFound because eventual consistency
|
||||||
|
// makes this situation normal.
|
||||||
|
if err != nil && err != swift.Forbidden && err != swift.ObjectNotFound {
|
||||||
|
if err == swift.ContainerNotFound {
|
||||||
|
return storagedriver.PathNotFoundError{Path: path}
|
||||||
}
|
}
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
} else {
|
||||||
|
for _, obj := range objects {
|
||||||
|
if err := d.Conn.ObjectDelete(d.Container, obj.Name); err != nil {
|
||||||
|
if err == swift.ObjectNotFound {
|
||||||
|
return storagedriver.PathNotFoundError{Path: obj.Name}
|
||||||
|
}
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
_, _, err = d.Conn.Object(d.Container, d.swiftPath(path))
|
_, _, err = d.Conn.Object(d.Container, d.swiftPath(path))
|
||||||
|
|
|
@ -134,7 +134,6 @@ func TestEmptyRootList(t *testing.T) {
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Fatalf("unexpected error creating content: %v", err)
|
t.Fatalf("unexpected error creating content: %v", err)
|
||||||
}
|
}
|
||||||
defer rootedDriver.Delete(ctx, filename)
|
|
||||||
|
|
||||||
keys, err := emptyRootDriver.List(ctx, "/")
|
keys, err := emptyRootDriver.List(ctx, "/")
|
||||||
for _, path := range keys {
|
for _, path := range keys {
|
||||||
|
@ -149,4 +148,24 @@ func TestEmptyRootList(t *testing.T) {
|
||||||
t.Fatalf("unexpected string in path: %q != %q", path, storagedriver.PathRegexp)
|
t.Fatalf("unexpected string in path: %q != %q", path, storagedriver.PathRegexp)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Create an object with a path nested under the existing object
|
||||||
|
err = rootedDriver.PutContent(ctx, filename+"/file1", contents)
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("unexpected error creating content: %v", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
err = rootedDriver.Delete(ctx, filename)
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("failed to delete: %v", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
keys, err = rootedDriver.List(ctx, "/")
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("failed to list objects after deletion: %v", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
if len(keys) != 0 {
|
||||||
|
t.Fatal("delete did not remove nested objects")
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in a new issue