Delete S3 keys incrementally in batches
Instead of first collecting all keys and then batch deleting them, we will do the incremental delete _online_ per max allowed batch. Doing this prevents frequent allocations for large S3 keyspaces and OOM-kills that might happen as a result of those. This commit introduces storagedriver.Errors type that allows to return multierrors as a single error from any storage driver implementation. Signed-off-by: Milos Gajdos <milosthegajdos@gmail.com>
This commit is contained in:
parent
5fe693474e
commit
ebc4234fd5
3 changed files with 71 additions and 43 deletions
|
@ -15,6 +15,7 @@ import (
|
||||||
"bytes"
|
"bytes"
|
||||||
"context"
|
"context"
|
||||||
"crypto/tls"
|
"crypto/tls"
|
||||||
|
"errors"
|
||||||
"fmt"
|
"fmt"
|
||||||
"io"
|
"io"
|
||||||
"io/ioutil"
|
"io/ioutil"
|
||||||
|
@ -900,54 +901,71 @@ func (d *driver) copy(ctx context.Context, sourcePath string, destPath string) e
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
func min(a, b int) int {
|
|
||||||
if a < b {
|
|
||||||
return a
|
|
||||||
}
|
|
||||||
return b
|
|
||||||
}
|
|
||||||
|
|
||||||
// Delete recursively deletes all objects stored at "path" and its subpaths.
|
// Delete recursively deletes all objects stored at "path" and its subpaths.
|
||||||
// We must be careful since S3 does not guarantee read after delete consistency
|
// We must be careful since S3 does not guarantee read after delete consistency
|
||||||
func (d *driver) Delete(ctx context.Context, path string) error {
|
func (d *driver) Delete(ctx context.Context, path string) error {
|
||||||
s3Objects := make([]*s3.ObjectIdentifier, 0, listMax)
|
s3Objects := make([]*s3.ObjectIdentifier, 0, listMax)
|
||||||
|
s3Path := d.s3Path(path)
|
||||||
// manually add the given path if it's a file
|
|
||||||
stat, err := d.Stat(ctx, path)
|
|
||||||
if err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
if stat != nil && !stat.IsDir() {
|
|
||||||
path := d.s3Path(path)
|
|
||||||
s3Objects = append(s3Objects, &s3.ObjectIdentifier{
|
|
||||||
Key: &path,
|
|
||||||
})
|
|
||||||
}
|
|
||||||
|
|
||||||
// list objects under the given path as a subpath (suffix with slash "/")
|
|
||||||
s3Path := d.s3Path(path) + "/"
|
|
||||||
listObjectsInput := &s3.ListObjectsV2Input{
|
listObjectsInput := &s3.ListObjectsV2Input{
|
||||||
Bucket: aws.String(d.Bucket),
|
Bucket: aws.String(d.Bucket),
|
||||||
Prefix: aws.String(s3Path),
|
Prefix: aws.String(s3Path),
|
||||||
}
|
}
|
||||||
ListLoop:
|
|
||||||
for {
|
for {
|
||||||
// list all the objects
|
// list all the objects
|
||||||
resp, err := d.S3.ListObjectsV2(listObjectsInput)
|
resp, err := d.S3.ListObjectsV2(listObjectsInput)
|
||||||
|
|
||||||
// resp.Contents can only be empty on the first call
|
// resp.Contents can only be empty on the first call
|
||||||
// if there were no more results to return after the first call, resp.IsTruncated would have been false
|
// if there were no more results to return after the first call, resp.IsTruncated would have been false
|
||||||
// and the loop would be exited without recalling ListObjects
|
// and the loop would exit without recalling ListObjects
|
||||||
if err != nil || len(resp.Contents) == 0 {
|
if err != nil || len(resp.Contents) == 0 {
|
||||||
break ListLoop
|
return storagedriver.PathNotFoundError{Path: path}
|
||||||
}
|
}
|
||||||
|
|
||||||
for _, key := range resp.Contents {
|
for _, key := range resp.Contents {
|
||||||
|
// Skip if we encounter a key that is not a subpath (so that deleting "/a" does not delete "/ab").
|
||||||
|
if len(*key.Key) > len(s3Path) && (*key.Key)[len(s3Path)] != '/' {
|
||||||
|
continue
|
||||||
|
}
|
||||||
s3Objects = append(s3Objects, &s3.ObjectIdentifier{
|
s3Objects = append(s3Objects, &s3.ObjectIdentifier{
|
||||||
Key: key.Key,
|
Key: key.Key,
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Delete objects only if the list is not empty, otherwise S3 API returns a cryptic error
|
||||||
|
if len(s3Objects) > 0 {
|
||||||
|
// NOTE: according to AWS docs https://docs.aws.amazon.com/AmazonS3/latest/API/API_ListObjectsV2.html
|
||||||
|
// by default the response returns up to 1,000 key names. The response _might_ contain fewer keys but it will never contain more.
|
||||||
|
// 10000 keys is coincidentally (?) also the max number of keys that can be deleted in a single Delete operation, so we'll just smack
|
||||||
|
// Delete here straight away and reset the object slice when successful.
|
||||||
|
resp, err := d.S3.DeleteObjects(&s3.DeleteObjectsInput{
|
||||||
|
Bucket: aws.String(d.Bucket),
|
||||||
|
Delete: &s3.Delete{
|
||||||
|
Objects: s3Objects,
|
||||||
|
Quiet: aws.Bool(false),
|
||||||
|
},
|
||||||
|
})
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
if len(resp.Errors) > 0 {
|
||||||
|
// NOTE: AWS SDK s3.Error does not implement error interface which
|
||||||
|
// is pretty intensely sad, so we have to do away with this for now.
|
||||||
|
errs := make([]error, 0, len(resp.Errors))
|
||||||
|
for _, err := range resp.Errors {
|
||||||
|
errs = append(errs, errors.New(err.String()))
|
||||||
|
}
|
||||||
|
return storagedriver.Errors{
|
||||||
|
DriverName: driverName,
|
||||||
|
Errs: errs,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
// NOTE: we don't want to reallocate
|
||||||
|
// the slice so we simply "reset" it
|
||||||
|
s3Objects = s3Objects[:0]
|
||||||
|
|
||||||
// resp.Contents must have at least one element or we would have returned not found
|
// resp.Contents must have at least one element or we would have returned not found
|
||||||
listObjectsInput.StartAfter = resp.Contents[len(resp.Contents)-1].Key
|
listObjectsInput.StartAfter = resp.Contents[len(resp.Contents)-1].Key
|
||||||
|
|
||||||
|
@ -958,24 +976,6 @@ ListLoop:
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
total := len(s3Objects)
|
|
||||||
if total == 0 {
|
|
||||||
return storagedriver.PathNotFoundError{Path: path}
|
|
||||||
}
|
|
||||||
|
|
||||||
// need to chunk objects into groups of 1000 per s3 restrictions
|
|
||||||
for i := 0; i < total; i += 1000 {
|
|
||||||
_, err := d.S3.DeleteObjects(&s3.DeleteObjectsInput{
|
|
||||||
Bucket: aws.String(d.Bucket),
|
|
||||||
Delete: &s3.Delete{
|
|
||||||
Objects: s3Objects[i:min(i+1000, total)],
|
|
||||||
Quiet: aws.Bool(false),
|
|
||||||
},
|
|
||||||
})
|
|
||||||
if err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
}
|
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -782,6 +782,10 @@ func TestMoveWithMultipartCopy(t *testing.T) {
|
||||||
}
|
}
|
||||||
|
|
||||||
func TestListObjectsV2(t *testing.T) {
|
func TestListObjectsV2(t *testing.T) {
|
||||||
|
if skipS3() != "" {
|
||||||
|
t.Skip(skipS3())
|
||||||
|
}
|
||||||
|
|
||||||
rootDir, err := ioutil.TempDir("", "driver-")
|
rootDir, err := ioutil.TempDir("", "driver-")
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Fatalf("unexpected error creating temporary directory: %v", err)
|
t.Fatalf("unexpected error creating temporary directory: %v", err)
|
||||||
|
|
|
@ -169,3 +169,27 @@ type Error struct {
|
||||||
func (err Error) Error() string {
|
func (err Error) Error() string {
|
||||||
return fmt.Sprintf("%s: %s", err.DriverName, err.Enclosed)
|
return fmt.Sprintf("%s: %s", err.DriverName, err.Enclosed)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Errors provides the envelope for multiple errors
|
||||||
|
// for use within the storagedriver implementations.
|
||||||
|
type Errors struct {
|
||||||
|
DriverName string
|
||||||
|
Errs []error
|
||||||
|
}
|
||||||
|
|
||||||
|
var _ error = Errors{}
|
||||||
|
|
||||||
|
func (e Errors) Error() string {
|
||||||
|
switch len(e.Errs) {
|
||||||
|
case 0:
|
||||||
|
return "<nil>"
|
||||||
|
case 1:
|
||||||
|
return e.Errs[0].Error()
|
||||||
|
default:
|
||||||
|
msg := "errors:\n"
|
||||||
|
for _, err := range e.Errs {
|
||||||
|
msg += err.Error() + "\n"
|
||||||
|
}
|
||||||
|
return msg
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
Loading…
Reference in a new issue