From 35b29a609ed0bf97f7cc5e101b27bc53fcf44f9e Mon Sep 17 00:00:00 2001 From: Sargun Dhillon Date: Wed, 29 Nov 2017 12:26:28 -0800 Subject: [PATCH] Use the New Driver Walk method for catalog enumeration This changes the Walk Method used for catalog enumeration. Just to show how much an effect this has on our s3 storage: Original: List calls: 6839 real 3m16.636s user 0m0.000s sys 0m0.016s New: ListObjectsV2 Calls: 1805 real 0m49.970s user 0m0.008s sys 0m0.000s This is because it no longer performs a list and stat per item, and instead is able to use the metadata gained from the list as a replacement to stat. Signed-off-by: Sargun Dhillon --- registry/storage/catalog.go | 19 ++-- registry/storage/driver/s3-aws/s3.go | 129 ++++++++++++++++++++++++++- 2 files changed, 136 insertions(+), 12 deletions(-) diff --git a/registry/storage/catalog.go b/registry/storage/catalog.go index 21950980..4d4149ad 100644 --- a/registry/storage/catalog.go +++ b/registry/storage/catalog.go @@ -18,6 +18,7 @@ var errFinishedWalk = errors.New("finished walk") // Because it's a quite expensive operation, it should only be used when building up // an initial set of repositories. func (reg *registry) Repositories(ctx context.Context, repos []string, last string) (n int, err error) { + var finishedWalk bool var foundRepos []string if len(repos) == 0 { @@ -29,7 +30,7 @@ func (reg *registry) Repositories(ctx context.Context, repos []string, last stri return 0, err } - err = Walk(ctx, reg.blobStore.driver, root, func(fileInfo driver.FileInfo) error { + err = reg.blobStore.driver.Walk(ctx, root, func(fileInfo driver.FileInfo) error { err := handleRepository(fileInfo, root, last, func(repoPath string) error { foundRepos = append(foundRepos, repoPath) return nil @@ -40,7 +41,8 @@ func (reg *registry) Repositories(ctx context.Context, repos []string, last stri // if we've filled our array, no need to walk any further if len(foundRepos) == len(repos) { - return errFinishedWalk + finishedWalk = true + return driver.ErrSkipDir } return nil @@ -48,14 +50,11 @@ func (reg *registry) Repositories(ctx context.Context, repos []string, last stri n = copy(repos, foundRepos) - switch err { - case nil: - // nil means that we completed walk and didn't fill buffer. No more - // records are available. - err = io.EOF - case errFinishedWalk: - // more records are available. - err = nil + if err != nil { + return n, err + } else if !finishedWalk { + // We didn't fill buffer. No more records are available. + return n, io.EOF } return n, err diff --git a/registry/storage/driver/s3-aws/s3.go b/registry/storage/driver/s3-aws/s3.go index f9a60653..0c6f7e5e 100644 --- a/registry/storage/driver/s3-aws/s3.go +++ b/registry/storage/driver/s3-aws/s3.go @@ -34,6 +34,7 @@ import ( "github.com/aws/aws-sdk-go/aws/session" "github.com/aws/aws-sdk-go/service/s3" + dcontext "github.com/docker/distribution/context" "github.com/docker/distribution/registry/client/transport" storagedriver "github.com/docker/distribution/registry/storage/driver" "github.com/docker/distribution/registry/storage/driver/base" @@ -876,8 +877,132 @@ func (d *driver) URLFor(ctx context.Context, path string, options map[string]int // Walk traverses a filesystem defined within driver, starting // from the given path, calling f on each file -func (d *driver) Walk(ctx context.Context, path string, f storagedriver.WalkFn) error { - return storagedriver.WalkFallback(ctx, d, path, f) +func (d *driver) Walk(ctx context.Context, from string, f storagedriver.WalkFn) error { + path := from + if !strings.HasSuffix(path, "/") { + path = path + "/" + } + + prefix := "" + if d.s3Path("") == "" { + prefix = "/" + } + + var objectCount int64 + if err := d.doWalk(ctx, &objectCount, d.s3Path(path), prefix, f); err != nil { + return err + } + + // S3 doesn't have the concept of empty directories, so it'll return path not found if there are no objects + if objectCount == 0 { + return storagedriver.PathNotFoundError{Path: from} + } + + return nil +} + +type walkInfoContainer struct { + storagedriver.FileInfoFields + prefix *string +} + +// Path provides the full path of the target of this file info. +func (wi walkInfoContainer) Path() string { + return wi.FileInfoFields.Path +} + +// Size returns current length in bytes of the file. The return value can +// be used to write to the end of the file at path. The value is +// meaningless if IsDir returns true. +func (wi walkInfoContainer) Size() int64 { + return wi.FileInfoFields.Size +} + +// ModTime returns the modification time for the file. For backends that +// don't have a modification time, the creation time should be returned. +func (wi walkInfoContainer) ModTime() time.Time { + return wi.FileInfoFields.ModTime +} + +// IsDir returns true if the path is a directory. +func (wi walkInfoContainer) IsDir() bool { + return wi.FileInfoFields.IsDir +} + +func (d *driver) doWalk(parentCtx context.Context, objectCount *int64, path, prefix string, f storagedriver.WalkFn) error { + var retError error + + listObjectsInput := &s3.ListObjectsV2Input{ + Bucket: aws.String(d.Bucket), + Prefix: aws.String(path), + Delimiter: aws.String("/"), + MaxKeys: aws.Int64(listMax), + } + + ctx, done := dcontext.WithTrace(parentCtx) + defer done("s3aws.ListObjectsV2Pages(%s)", path) + listObjectErr := d.S3.ListObjectsV2PagesWithContext(ctx, listObjectsInput, func(objects *s3.ListObjectsV2Output, lastPage bool) bool { + + *objectCount += *objects.KeyCount + walkInfos := make([]walkInfoContainer, 0, *objects.KeyCount) + + for _, dir := range objects.CommonPrefixes { + commonPrefix := *dir.Prefix + walkInfos = append(walkInfos, walkInfoContainer{ + prefix: dir.Prefix, + FileInfoFields: storagedriver.FileInfoFields{ + IsDir: true, + Path: strings.Replace(commonPrefix[:len(commonPrefix)-1], d.s3Path(""), prefix, 1), + }, + }) + } + + for _, file := range objects.Contents { + walkInfos = append(walkInfos, walkInfoContainer{ + FileInfoFields: storagedriver.FileInfoFields{ + IsDir: false, + Size: *file.Size, + ModTime: *file.LastModified, + Path: strings.Replace(*file.Key, d.s3Path(""), prefix, 1), + }, + }) + } + + sort.SliceStable(walkInfos, func(i, j int) bool { return walkInfos[i].FileInfoFields.Path < walkInfos[j].FileInfoFields.Path }) + + for _, walkInfo := range walkInfos { + err := f(walkInfo) + + if err == storagedriver.ErrSkipDir { + if walkInfo.IsDir() { + continue + } else { + break + } + } else if err != nil { + retError = err + return false + } + + if walkInfo.IsDir() { + if err := d.doWalk(ctx, objectCount, *walkInfo.prefix, prefix, f); err != nil { + retError = err + return false + } + } + } + return true + }) + + if retError != nil { + return retError + } + + if listObjectErr != nil { + return listObjectErr + } + + return nil } func (d *driver) s3Path(path string) string {