Merge pull request #1669 from RichardScothern/close-after-commit
Clean uploads
This commit is contained in:
commit
a1d7463d67
2 changed files with 29 additions and 3 deletions
|
@ -16,6 +16,7 @@ import (
|
||||||
"github.com/docker/distribution/registry/storage/cache/memory"
|
"github.com/docker/distribution/registry/storage/cache/memory"
|
||||||
"github.com/docker/distribution/registry/storage/driver/inmemory"
|
"github.com/docker/distribution/registry/storage/driver/inmemory"
|
||||||
"github.com/docker/distribution/testutil"
|
"github.com/docker/distribution/testutil"
|
||||||
|
"path"
|
||||||
)
|
)
|
||||||
|
|
||||||
// TestWriteSeek tests that the current file size can be
|
// TestWriteSeek tests that the current file size can be
|
||||||
|
@ -83,6 +84,15 @@ func TestSimpleBlobUpload(t *testing.T) {
|
||||||
t.Fatalf("unexpected error during upload cancellation: %v", err)
|
t.Fatalf("unexpected error during upload cancellation: %v", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// get the enclosing directory
|
||||||
|
uploadPath := path.Dir(blobUpload.(*blobWriter).path)
|
||||||
|
|
||||||
|
// ensure state was cleaned up
|
||||||
|
_, err = driver.List(ctx, uploadPath)
|
||||||
|
if err == nil {
|
||||||
|
t.Fatal("files in upload path after cleanup")
|
||||||
|
}
|
||||||
|
|
||||||
// Do a resume, get unknown upload
|
// Do a resume, get unknown upload
|
||||||
blobUpload, err = bs.Resume(ctx, blobUpload.ID())
|
blobUpload, err = bs.Resume(ctx, blobUpload.ID())
|
||||||
if err != distribution.ErrBlobUploadUnknown {
|
if err != distribution.ErrBlobUploadUnknown {
|
||||||
|
@ -128,6 +138,13 @@ func TestSimpleBlobUpload(t *testing.T) {
|
||||||
t.Fatalf("unexpected error finishing layer upload: %v", err)
|
t.Fatalf("unexpected error finishing layer upload: %v", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// ensure state was cleaned up
|
||||||
|
uploadPath = path.Dir(blobUpload.(*blobWriter).path)
|
||||||
|
_, err = driver.List(ctx, uploadPath)
|
||||||
|
if err == nil {
|
||||||
|
t.Fatal("files in upload path after commit")
|
||||||
|
}
|
||||||
|
|
||||||
// After finishing an upload, it should no longer exist.
|
// After finishing an upload, it should no longer exist.
|
||||||
if _, err := bs.Resume(ctx, blobUpload.ID()); err != distribution.ErrBlobUploadUnknown {
|
if _, err := bs.Resume(ctx, blobUpload.ID()); err != distribution.ErrBlobUploadUnknown {
|
||||||
t.Fatalf("expected layer upload to be unknown, got %v", err)
|
t.Fatalf("expected layer upload to be unknown, got %v", err)
|
||||||
|
|
|
@ -18,8 +18,8 @@ var (
|
||||||
errResumableDigestNotAvailable = errors.New("resumable digest not available")
|
errResumableDigestNotAvailable = errors.New("resumable digest not available")
|
||||||
)
|
)
|
||||||
|
|
||||||
// layerWriter is used to control the various aspects of resumable
|
// blobWriter is used to control the various aspects of resumable
|
||||||
// layer upload. It implements the LayerUpload interface.
|
// blob upload.
|
||||||
type blobWriter struct {
|
type blobWriter struct {
|
||||||
ctx context.Context
|
ctx context.Context
|
||||||
blobStore *linkedBlobStore
|
blobStore *linkedBlobStore
|
||||||
|
@ -34,6 +34,7 @@ type blobWriter struct {
|
||||||
path string
|
path string
|
||||||
|
|
||||||
resumableDigestEnabled bool
|
resumableDigestEnabled bool
|
||||||
|
committed bool
|
||||||
}
|
}
|
||||||
|
|
||||||
var _ distribution.BlobWriter = &blobWriter{}
|
var _ distribution.BlobWriter = &blobWriter{}
|
||||||
|
@ -80,6 +81,7 @@ func (bw *blobWriter) Commit(ctx context.Context, desc distribution.Descriptor)
|
||||||
return distribution.Descriptor{}, err
|
return distribution.Descriptor{}, err
|
||||||
}
|
}
|
||||||
|
|
||||||
|
bw.committed = true
|
||||||
return canonical, nil
|
return canonical, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -91,11 +93,14 @@ func (bw *blobWriter) Cancel(ctx context.Context) error {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if err := bw.Close(); err != nil {
|
||||||
|
context.GetLogger(ctx).Errorf("error closing blobwriter: %s", err)
|
||||||
|
}
|
||||||
|
|
||||||
if err := bw.removeResources(ctx); err != nil {
|
if err := bw.removeResources(ctx); err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
bw.Close()
|
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -132,6 +137,10 @@ func (bw *blobWriter) ReadFrom(r io.Reader) (n int64, err error) {
|
||||||
}
|
}
|
||||||
|
|
||||||
func (bw *blobWriter) Close() error {
|
func (bw *blobWriter) Close() error {
|
||||||
|
if bw.committed {
|
||||||
|
return errors.New("blobwriter close after commit")
|
||||||
|
}
|
||||||
|
|
||||||
if err := bw.storeHashState(bw.blobStore.ctx); err != nil {
|
if err := bw.storeHashState(bw.blobStore.ctx); err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in a new issue