Fix S3 Delete method's notion of subpaths
Deleting "/a" was deleting "/a/b" but also "/ab". Signed-off-by: Noah Treuhaft <noah.treuhaft@docker.com>
This commit is contained in:
parent
b89a594355
commit
76226c61a9
2 changed files with 55 additions and 2 deletions
|
@ -784,10 +784,12 @@ func min(a, b int) int {
|
||||||
// We must be careful since S3 does not guarantee read after delete consistency
|
// We must be careful since S3 does not guarantee read after delete consistency
|
||||||
func (d *driver) Delete(ctx context.Context, path string) error {
|
func (d *driver) Delete(ctx context.Context, path string) error {
|
||||||
s3Objects := make([]*s3.ObjectIdentifier, 0, listMax)
|
s3Objects := make([]*s3.ObjectIdentifier, 0, listMax)
|
||||||
|
s3Path := d.s3Path(path)
|
||||||
listObjectsInput := &s3.ListObjectsInput{
|
listObjectsInput := &s3.ListObjectsInput{
|
||||||
Bucket: aws.String(d.Bucket),
|
Bucket: aws.String(d.Bucket),
|
||||||
Prefix: aws.String(d.s3Path(path)),
|
Prefix: aws.String(s3Path),
|
||||||
}
|
}
|
||||||
|
ListLoop:
|
||||||
for {
|
for {
|
||||||
// list all the objects
|
// list all the objects
|
||||||
resp, err := d.S3.ListObjects(listObjectsInput)
|
resp, err := d.S3.ListObjects(listObjectsInput)
|
||||||
|
@ -800,6 +802,10 @@ func (d *driver) Delete(ctx context.Context, path string) error {
|
||||||
}
|
}
|
||||||
|
|
||||||
for _, key := range resp.Contents {
|
for _, key := range resp.Contents {
|
||||||
|
// Stop if we encounter a key that is not a subpath (so that deleting "/a" does not delete "/ab").
|
||||||
|
if len(*key.Key) > len(s3Path) && (*key.Key)[len(s3Path)] != '/' {
|
||||||
|
break ListLoop
|
||||||
|
}
|
||||||
s3Objects = append(s3Objects, &s3.ObjectIdentifier{
|
s3Objects = append(s3Objects, &s3.ObjectIdentifier{
|
||||||
Key: key.Key,
|
Key: key.Key,
|
||||||
})
|
})
|
||||||
|
|
|
@ -15,9 +15,10 @@ import (
|
||||||
"testing"
|
"testing"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
|
"gopkg.in/check.v1"
|
||||||
|
|
||||||
"github.com/docker/distribution/context"
|
"github.com/docker/distribution/context"
|
||||||
storagedriver "github.com/docker/distribution/registry/storage/driver"
|
storagedriver "github.com/docker/distribution/registry/storage/driver"
|
||||||
"gopkg.in/check.v1"
|
|
||||||
)
|
)
|
||||||
|
|
||||||
// Test hooks up gocheck into the "go test" runner.
|
// Test hooks up gocheck into the "go test" runner.
|
||||||
|
@ -717,6 +718,52 @@ func (suite *DriverSuite) TestDeleteFolder(c *check.C) {
|
||||||
c.Assert(strings.Contains(err.Error(), suite.Name()), check.Equals, true)
|
c.Assert(strings.Contains(err.Error(), suite.Name()), check.Equals, true)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// TestDeleteOnlyDeletesSubpaths checks that deleting path A does not
|
||||||
|
// delete path B when A is a prefix of B but B is not a subpath of A (so that
|
||||||
|
// deleting "/a" does not delete "/ab"). This matters for services like S3 that
|
||||||
|
// do not implement directories.
|
||||||
|
func (suite *DriverSuite) TestDeleteOnlyDeletesSubpaths(c *check.C) {
|
||||||
|
dirname := randomPath(32)
|
||||||
|
filename := randomPath(32)
|
||||||
|
contents := randomContents(32)
|
||||||
|
|
||||||
|
defer suite.deletePath(c, firstPart(dirname))
|
||||||
|
|
||||||
|
err := suite.StorageDriver.PutContent(suite.ctx, path.Join(dirname, filename), contents)
|
||||||
|
c.Assert(err, check.IsNil)
|
||||||
|
|
||||||
|
err = suite.StorageDriver.PutContent(suite.ctx, path.Join(dirname, filename+"suffix"), contents)
|
||||||
|
c.Assert(err, check.IsNil)
|
||||||
|
|
||||||
|
err = suite.StorageDriver.PutContent(suite.ctx, path.Join(dirname, dirname, filename), contents)
|
||||||
|
c.Assert(err, check.IsNil)
|
||||||
|
|
||||||
|
err = suite.StorageDriver.PutContent(suite.ctx, path.Join(dirname, dirname+"suffix", filename), contents)
|
||||||
|
c.Assert(err, check.IsNil)
|
||||||
|
|
||||||
|
err = suite.StorageDriver.Delete(suite.ctx, path.Join(dirname, filename))
|
||||||
|
c.Assert(err, check.IsNil)
|
||||||
|
|
||||||
|
_, err = suite.StorageDriver.GetContent(suite.ctx, path.Join(dirname, filename))
|
||||||
|
c.Assert(err, check.NotNil)
|
||||||
|
c.Assert(err, check.FitsTypeOf, storagedriver.PathNotFoundError{})
|
||||||
|
c.Assert(strings.Contains(err.Error(), suite.Name()), check.Equals, true)
|
||||||
|
|
||||||
|
_, err = suite.StorageDriver.GetContent(suite.ctx, path.Join(dirname, filename+"suffix"))
|
||||||
|
c.Assert(err, check.IsNil)
|
||||||
|
|
||||||
|
err = suite.StorageDriver.Delete(suite.ctx, path.Join(dirname, dirname))
|
||||||
|
c.Assert(err, check.IsNil)
|
||||||
|
|
||||||
|
_, err = suite.StorageDriver.GetContent(suite.ctx, path.Join(dirname, dirname, filename))
|
||||||
|
c.Assert(err, check.NotNil)
|
||||||
|
c.Assert(err, check.FitsTypeOf, storagedriver.PathNotFoundError{})
|
||||||
|
c.Assert(strings.Contains(err.Error(), suite.Name()), check.Equals, true)
|
||||||
|
|
||||||
|
_, err = suite.StorageDriver.GetContent(suite.ctx, path.Join(dirname, dirname+"suffix", filename))
|
||||||
|
c.Assert(err, check.IsNil)
|
||||||
|
}
|
||||||
|
|
||||||
// TestStatCall runs verifies the implementation of the storagedriver's Stat call.
|
// TestStatCall runs verifies the implementation of the storagedriver's Stat call.
|
||||||
func (suite *DriverSuite) TestStatCall(c *check.C) {
|
func (suite *DriverSuite) TestStatCall(c *check.C) {
|
||||||
content := randomContents(4096)
|
content := randomContents(4096)
|
||||||
|
|
Loading…
Reference in a new issue