Merge pull request #1706 from aibaars/registry-size-close
Blobwriter: call BlobWriter.Size after BlobWriter.Close
This commit is contained in:
commit
1fc752c718
7 changed files with 95 additions and 28 deletions
|
@ -29,7 +29,7 @@ import (
|
||||||
"github.com/docker/distribution/reference"
|
"github.com/docker/distribution/reference"
|
||||||
"github.com/docker/distribution/registry/api/errcode"
|
"github.com/docker/distribution/registry/api/errcode"
|
||||||
"github.com/docker/distribution/registry/api/v2"
|
"github.com/docker/distribution/registry/api/v2"
|
||||||
_ "github.com/docker/distribution/registry/storage/driver/inmemory"
|
_ "github.com/docker/distribution/registry/storage/driver/testdriver"
|
||||||
"github.com/docker/distribution/testutil"
|
"github.com/docker/distribution/testutil"
|
||||||
"github.com/docker/libtrust"
|
"github.com/docker/libtrust"
|
||||||
"github.com/gorilla/handlers"
|
"github.com/gorilla/handlers"
|
||||||
|
@ -219,7 +219,7 @@ func contains(elems []string, e string) bool {
|
||||||
func TestURLPrefix(t *testing.T) {
|
func TestURLPrefix(t *testing.T) {
|
||||||
config := configuration.Configuration{
|
config := configuration.Configuration{
|
||||||
Storage: configuration.Storage{
|
Storage: configuration.Storage{
|
||||||
"inmemory": configuration.Parameters{},
|
"testdriver": configuration.Parameters{},
|
||||||
},
|
},
|
||||||
}
|
}
|
||||||
config.HTTP.Prefix = "/test/"
|
config.HTTP.Prefix = "/test/"
|
||||||
|
@ -296,7 +296,7 @@ func TestBlobDelete(t *testing.T) {
|
||||||
func TestRelativeURL(t *testing.T) {
|
func TestRelativeURL(t *testing.T) {
|
||||||
config := configuration.Configuration{
|
config := configuration.Configuration{
|
||||||
Storage: configuration.Storage{
|
Storage: configuration.Storage{
|
||||||
"inmemory": configuration.Parameters{},
|
"testdriver": configuration.Parameters{},
|
||||||
},
|
},
|
||||||
}
|
}
|
||||||
config.HTTP.Headers = headerConfig
|
config.HTTP.Headers = headerConfig
|
||||||
|
@ -1884,7 +1884,7 @@ type testEnv struct {
|
||||||
func newTestEnvMirror(t *testing.T, deleteEnabled bool) *testEnv {
|
func newTestEnvMirror(t *testing.T, deleteEnabled bool) *testEnv {
|
||||||
config := configuration.Configuration{
|
config := configuration.Configuration{
|
||||||
Storage: configuration.Storage{
|
Storage: configuration.Storage{
|
||||||
"inmemory": configuration.Parameters{},
|
"testdriver": configuration.Parameters{},
|
||||||
"delete": configuration.Parameters{"enabled": deleteEnabled},
|
"delete": configuration.Parameters{"enabled": deleteEnabled},
|
||||||
},
|
},
|
||||||
Proxy: configuration.Proxy{
|
Proxy: configuration.Proxy{
|
||||||
|
@ -1899,7 +1899,7 @@ func newTestEnvMirror(t *testing.T, deleteEnabled bool) *testEnv {
|
||||||
func newTestEnv(t *testing.T, deleteEnabled bool) *testEnv {
|
func newTestEnv(t *testing.T, deleteEnabled bool) *testEnv {
|
||||||
config := configuration.Configuration{
|
config := configuration.Configuration{
|
||||||
Storage: configuration.Storage{
|
Storage: configuration.Storage{
|
||||||
"inmemory": configuration.Parameters{},
|
"testdriver": configuration.Parameters{},
|
||||||
"delete": configuration.Parameters{"enabled": deleteEnabled},
|
"delete": configuration.Parameters{"enabled": deleteEnabled},
|
||||||
},
|
},
|
||||||
}
|
}
|
||||||
|
@ -2413,7 +2413,7 @@ func TestCheckContextNotifier(t *testing.T) {
|
||||||
func TestProxyManifestGetByTag(t *testing.T) {
|
func TestProxyManifestGetByTag(t *testing.T) {
|
||||||
truthConfig := configuration.Configuration{
|
truthConfig := configuration.Configuration{
|
||||||
Storage: configuration.Storage{
|
Storage: configuration.Storage{
|
||||||
"inmemory": configuration.Parameters{},
|
"testdriver": configuration.Parameters{},
|
||||||
},
|
},
|
||||||
}
|
}
|
||||||
truthConfig.HTTP.Headers = headerConfig
|
truthConfig.HTTP.Headers = headerConfig
|
||||||
|
@ -2427,7 +2427,7 @@ func TestProxyManifestGetByTag(t *testing.T) {
|
||||||
|
|
||||||
proxyConfig := configuration.Configuration{
|
proxyConfig := configuration.Configuration{
|
||||||
Storage: configuration.Storage{
|
Storage: configuration.Storage{
|
||||||
"inmemory": configuration.Parameters{},
|
"testdriver": configuration.Parameters{},
|
||||||
},
|
},
|
||||||
Proxy: configuration.Proxy{
|
Proxy: configuration.Proxy{
|
||||||
RemoteURL: truthEnv.server.URL,
|
RemoteURL: truthEnv.server.URL,
|
||||||
|
|
|
@ -16,7 +16,7 @@ import (
|
||||||
_ "github.com/docker/distribution/registry/auth/silly"
|
_ "github.com/docker/distribution/registry/auth/silly"
|
||||||
"github.com/docker/distribution/registry/storage"
|
"github.com/docker/distribution/registry/storage"
|
||||||
memorycache "github.com/docker/distribution/registry/storage/cache/memory"
|
memorycache "github.com/docker/distribution/registry/storage/cache/memory"
|
||||||
"github.com/docker/distribution/registry/storage/driver/inmemory"
|
"github.com/docker/distribution/registry/storage/driver/testdriver"
|
||||||
)
|
)
|
||||||
|
|
||||||
// TestAppDispatcher builds an application with a test dispatcher and ensures
|
// TestAppDispatcher builds an application with a test dispatcher and ensures
|
||||||
|
@ -24,7 +24,7 @@ import (
|
||||||
// This only tests the dispatch mechanism. The underlying dispatchers must be
|
// This only tests the dispatch mechanism. The underlying dispatchers must be
|
||||||
// tested individually.
|
// tested individually.
|
||||||
func TestAppDispatcher(t *testing.T) {
|
func TestAppDispatcher(t *testing.T) {
|
||||||
driver := inmemory.New()
|
driver := testdriver.New()
|
||||||
ctx := context.Background()
|
ctx := context.Background()
|
||||||
registry, err := storage.NewRegistry(ctx, driver, storage.BlobDescriptorCacheProvider(memorycache.NewInMemoryBlobDescriptorCacheProvider()), storage.EnableDelete, storage.EnableRedirect)
|
registry, err := storage.NewRegistry(ctx, driver, storage.BlobDescriptorCacheProvider(memorycache.NewInMemoryBlobDescriptorCacheProvider()), storage.EnableDelete, storage.EnableRedirect)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
@ -142,7 +142,7 @@ func TestNewApp(t *testing.T) {
|
||||||
ctx := context.Background()
|
ctx := context.Background()
|
||||||
config := configuration.Configuration{
|
config := configuration.Configuration{
|
||||||
Storage: configuration.Storage{
|
Storage: configuration.Storage{
|
||||||
"inmemory": nil,
|
"testdriver": nil,
|
||||||
},
|
},
|
||||||
Auth: configuration.Auth{
|
Auth: configuration.Auth{
|
||||||
// For now, we simply test that new auth results in a viable
|
// For now, we simply test that new auth results in a viable
|
||||||
|
|
|
@ -134,7 +134,6 @@ func (buh *blobUploadHandler) StartBlobUpload(w http.ResponseWriter, r *http.Req
|
||||||
}
|
}
|
||||||
|
|
||||||
buh.Upload = upload
|
buh.Upload = upload
|
||||||
defer buh.Upload.Close()
|
|
||||||
|
|
||||||
if err := buh.blobUploadResponse(w, r, true); err != nil {
|
if err := buh.blobUploadResponse(w, r, true); err != nil {
|
||||||
buh.Errors = append(buh.Errors, errcode.ErrorCodeUnknown.WithDetail(err))
|
buh.Errors = append(buh.Errors, errcode.ErrorCodeUnknown.WithDetail(err))
|
||||||
|
@ -224,11 +223,8 @@ func (buh *blobUploadHandler) PutBlobUploadComplete(w http.ResponseWriter, r *ht
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
size := buh.Upload.Size()
|
|
||||||
|
|
||||||
desc, err := buh.Upload.Commit(buh, distribution.Descriptor{
|
desc, err := buh.Upload.Commit(buh, distribution.Descriptor{
|
||||||
Digest: dgst,
|
Digest: dgst,
|
||||||
Size: size,
|
|
||||||
|
|
||||||
// TODO(stevvooe): This isn't wildly important yet, but we should
|
// TODO(stevvooe): This isn't wildly important yet, but we should
|
||||||
// really set the mediatype. For now, we can let the backend take care
|
// really set the mediatype. For now, we can let the backend take care
|
||||||
|
@ -295,6 +291,7 @@ func (buh *blobUploadHandler) blobUploadResponse(w http.ResponseWriter, r *http.
|
||||||
// TODO(stevvooe): Need a better way to manage the upload state automatically.
|
// TODO(stevvooe): Need a better way to manage the upload state automatically.
|
||||||
buh.State.Name = buh.Repository.Named().Name()
|
buh.State.Name = buh.Repository.Named().Name()
|
||||||
buh.State.UUID = buh.Upload.ID()
|
buh.State.UUID = buh.Upload.ID()
|
||||||
|
buh.Upload.Close()
|
||||||
buh.State.Offset = buh.Upload.Size()
|
buh.State.Offset = buh.Upload.Size()
|
||||||
buh.State.StartedAt = buh.Upload.StartedAt()
|
buh.State.StartedAt = buh.Upload.StartedAt()
|
||||||
|
|
||||||
|
|
|
@ -16,7 +16,7 @@ import (
|
||||||
"github.com/docker/distribution/digest"
|
"github.com/docker/distribution/digest"
|
||||||
"github.com/docker/distribution/reference"
|
"github.com/docker/distribution/reference"
|
||||||
"github.com/docker/distribution/registry/storage/cache/memory"
|
"github.com/docker/distribution/registry/storage/cache/memory"
|
||||||
"github.com/docker/distribution/registry/storage/driver/inmemory"
|
"github.com/docker/distribution/registry/storage/driver/testdriver"
|
||||||
"github.com/docker/distribution/testutil"
|
"github.com/docker/distribution/testutil"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
@ -25,7 +25,7 @@ import (
|
||||||
func TestWriteSeek(t *testing.T) {
|
func TestWriteSeek(t *testing.T) {
|
||||||
ctx := context.Background()
|
ctx := context.Background()
|
||||||
imageName, _ := reference.ParseNamed("foo/bar")
|
imageName, _ := reference.ParseNamed("foo/bar")
|
||||||
driver := inmemory.New()
|
driver := testdriver.New()
|
||||||
registry, err := NewRegistry(ctx, driver, BlobDescriptorCacheProvider(memory.NewInMemoryBlobDescriptorCacheProvider()), EnableDelete, EnableRedirect)
|
registry, err := NewRegistry(ctx, driver, BlobDescriptorCacheProvider(memory.NewInMemoryBlobDescriptorCacheProvider()), EnableDelete, EnableRedirect)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Fatalf("error creating registry: %v", err)
|
t.Fatalf("error creating registry: %v", err)
|
||||||
|
@ -43,6 +43,7 @@ func TestWriteSeek(t *testing.T) {
|
||||||
}
|
}
|
||||||
contents := []byte{1, 2, 3}
|
contents := []byte{1, 2, 3}
|
||||||
blobUpload.Write(contents)
|
blobUpload.Write(contents)
|
||||||
|
blobUpload.Close()
|
||||||
offset := blobUpload.Size()
|
offset := blobUpload.Size()
|
||||||
if offset != int64(len(contents)) {
|
if offset != int64(len(contents)) {
|
||||||
t.Fatalf("unexpected value for blobUpload offset: %v != %v", offset, len(contents))
|
t.Fatalf("unexpected value for blobUpload offset: %v != %v", offset, len(contents))
|
||||||
|
@ -60,7 +61,7 @@ func TestSimpleBlobUpload(t *testing.T) {
|
||||||
|
|
||||||
ctx := context.Background()
|
ctx := context.Background()
|
||||||
imageName, _ := reference.ParseNamed("foo/bar")
|
imageName, _ := reference.ParseNamed("foo/bar")
|
||||||
driver := inmemory.New()
|
driver := testdriver.New()
|
||||||
registry, err := NewRegistry(ctx, driver, BlobDescriptorCacheProvider(memory.NewInMemoryBlobDescriptorCacheProvider()), EnableDelete, EnableRedirect)
|
registry, err := NewRegistry(ctx, driver, BlobDescriptorCacheProvider(memory.NewInMemoryBlobDescriptorCacheProvider()), EnableDelete, EnableRedirect)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Fatalf("error creating registry: %v", err)
|
t.Fatalf("error creating registry: %v", err)
|
||||||
|
@ -121,11 +122,12 @@ func TestSimpleBlobUpload(t *testing.T) {
|
||||||
t.Fatalf("layer data write incomplete")
|
t.Fatalf("layer data write incomplete")
|
||||||
}
|
}
|
||||||
|
|
||||||
|
blobUpload.Close()
|
||||||
|
|
||||||
offset := blobUpload.Size()
|
offset := blobUpload.Size()
|
||||||
if offset != nn {
|
if offset != nn {
|
||||||
t.Fatalf("blobUpload not updated with correct offset: %v != %v", offset, nn)
|
t.Fatalf("blobUpload not updated with correct offset: %v != %v", offset, nn)
|
||||||
}
|
}
|
||||||
blobUpload.Close()
|
|
||||||
|
|
||||||
// Do a resume, for good fun
|
// Do a resume, for good fun
|
||||||
blobUpload, err = bs.Resume(ctx, blobUpload.ID())
|
blobUpload, err = bs.Resume(ctx, blobUpload.ID())
|
||||||
|
@ -254,7 +256,7 @@ func TestSimpleBlobUpload(t *testing.T) {
|
||||||
func TestSimpleBlobRead(t *testing.T) {
|
func TestSimpleBlobRead(t *testing.T) {
|
||||||
ctx := context.Background()
|
ctx := context.Background()
|
||||||
imageName, _ := reference.ParseNamed("foo/bar")
|
imageName, _ := reference.ParseNamed("foo/bar")
|
||||||
driver := inmemory.New()
|
driver := testdriver.New()
|
||||||
registry, err := NewRegistry(ctx, driver, BlobDescriptorCacheProvider(memory.NewInMemoryBlobDescriptorCacheProvider()), EnableDelete, EnableRedirect)
|
registry, err := NewRegistry(ctx, driver, BlobDescriptorCacheProvider(memory.NewInMemoryBlobDescriptorCacheProvider()), EnableDelete, EnableRedirect)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Fatalf("error creating registry: %v", err)
|
t.Fatalf("error creating registry: %v", err)
|
||||||
|
@ -366,7 +368,7 @@ func TestBlobMount(t *testing.T) {
|
||||||
ctx := context.Background()
|
ctx := context.Background()
|
||||||
imageName, _ := reference.ParseNamed("foo/bar")
|
imageName, _ := reference.ParseNamed("foo/bar")
|
||||||
sourceImageName, _ := reference.ParseNamed("foo/source")
|
sourceImageName, _ := reference.ParseNamed("foo/source")
|
||||||
driver := inmemory.New()
|
driver := testdriver.New()
|
||||||
registry, err := NewRegistry(ctx, driver, BlobDescriptorCacheProvider(memory.NewInMemoryBlobDescriptorCacheProvider()), EnableDelete, EnableRedirect)
|
registry, err := NewRegistry(ctx, driver, BlobDescriptorCacheProvider(memory.NewInMemoryBlobDescriptorCacheProvider()), EnableDelete, EnableRedirect)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Fatalf("error creating registry: %v", err)
|
t.Fatalf("error creating registry: %v", err)
|
||||||
|
@ -517,7 +519,7 @@ func TestBlobMount(t *testing.T) {
|
||||||
func TestLayerUploadZeroLength(t *testing.T) {
|
func TestLayerUploadZeroLength(t *testing.T) {
|
||||||
ctx := context.Background()
|
ctx := context.Background()
|
||||||
imageName, _ := reference.ParseNamed("foo/bar")
|
imageName, _ := reference.ParseNamed("foo/bar")
|
||||||
driver := inmemory.New()
|
driver := testdriver.New()
|
||||||
registry, err := NewRegistry(ctx, driver, BlobDescriptorCacheProvider(memory.NewInMemoryBlobDescriptorCacheProvider()), EnableDelete, EnableRedirect)
|
registry, err := NewRegistry(ctx, driver, BlobDescriptorCacheProvider(memory.NewInMemoryBlobDescriptorCacheProvider()), EnableDelete, EnableRedirect)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Fatalf("error creating registry: %v", err)
|
t.Fatalf("error creating registry: %v", err)
|
||||||
|
|
|
@ -58,6 +58,7 @@ func (bw *blobWriter) Commit(ctx context.Context, desc distribution.Descriptor)
|
||||||
}
|
}
|
||||||
|
|
||||||
bw.Close()
|
bw.Close()
|
||||||
|
desc.Size = bw.Size()
|
||||||
|
|
||||||
canonical, err := bw.validateBlob(ctx, desc)
|
canonical, err := bw.validateBlob(ctx, desc)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
|
|
@ -321,12 +321,8 @@ type writer struct {
|
||||||
|
|
||||||
// Cancel removes any written content from this FileWriter.
|
// Cancel removes any written content from this FileWriter.
|
||||||
func (w *writer) Cancel() error {
|
func (w *writer) Cancel() error {
|
||||||
err := w.checkClosed()
|
|
||||||
if err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
w.closed = true
|
w.closed = true
|
||||||
err = storageDeleteObject(cloud.NewContext(dummyProjectID, w.client), w.bucket, w.name)
|
err := storageDeleteObject(cloud.NewContext(dummyProjectID, w.client), w.bucket, w.name)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
if status, ok := err.(*googleapi.Error); ok {
|
if status, ok := err.(*googleapi.Error); ok {
|
||||||
if status.Code == http.StatusNotFound {
|
if status.Code == http.StatusNotFound {
|
||||||
|
|
71
registry/storage/driver/testdriver/testdriver.go
Normal file
71
registry/storage/driver/testdriver/testdriver.go
Normal file
|
@ -0,0 +1,71 @@
|
||||||
|
package testdriver
|
||||||
|
|
||||||
|
import (
|
||||||
|
"github.com/docker/distribution/context"
|
||||||
|
storagedriver "github.com/docker/distribution/registry/storage/driver"
|
||||||
|
"github.com/docker/distribution/registry/storage/driver/factory"
|
||||||
|
"github.com/docker/distribution/registry/storage/driver/inmemory"
|
||||||
|
)
|
||||||
|
|
||||||
|
const driverName = "testdriver"
|
||||||
|
|
||||||
|
func init() {
|
||||||
|
factory.Register(driverName, &testDriverFactory{})
|
||||||
|
}
|
||||||
|
|
||||||
|
// testDriverFactory implements the factory.StorageDriverFactory interface.
|
||||||
|
type testDriverFactory struct{}
|
||||||
|
|
||||||
|
func (factory *testDriverFactory) Create(parameters map[string]interface{}) (storagedriver.StorageDriver, error) {
|
||||||
|
return New(), nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// TestDriver is a StorageDriver for testing purposes. The Writer returned by this driver
|
||||||
|
// simulates the case where Write operations are buffered. This causes the value returned by Size to lag
|
||||||
|
// behind until Close (or Commit, or Cancel) is called.
|
||||||
|
type TestDriver struct {
|
||||||
|
storagedriver.StorageDriver
|
||||||
|
}
|
||||||
|
|
||||||
|
type testFileWriter struct {
|
||||||
|
storagedriver.FileWriter
|
||||||
|
prevchunk []byte
|
||||||
|
}
|
||||||
|
|
||||||
|
var _ storagedriver.StorageDriver = &TestDriver{}
|
||||||
|
|
||||||
|
// New constructs a new StorageDriver for testing purposes. The Writer returned by this driver
|
||||||
|
// simulates the case where Write operations are buffered. This causes the value returned by Size to lag
|
||||||
|
// behind until Close (or Commit, or Cancel) is called.
|
||||||
|
func New() *TestDriver {
|
||||||
|
return &TestDriver{StorageDriver: inmemory.New()}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Writer returns a FileWriter which will store the content written to it
|
||||||
|
// at the location designated by "path" after the call to Commit.
|
||||||
|
func (td *TestDriver) Writer(ctx context.Context, path string, append bool) (storagedriver.FileWriter, error) {
|
||||||
|
fw, err := td.StorageDriver.Writer(ctx, path, append)
|
||||||
|
return &testFileWriter{FileWriter: fw}, err
|
||||||
|
}
|
||||||
|
|
||||||
|
func (tfw *testFileWriter) Write(p []byte) (int, error) {
|
||||||
|
_, err := tfw.FileWriter.Write(tfw.prevchunk)
|
||||||
|
tfw.prevchunk = make([]byte, len(p))
|
||||||
|
copy(tfw.prevchunk, p)
|
||||||
|
return len(p), err
|
||||||
|
}
|
||||||
|
|
||||||
|
func (tfw *testFileWriter) Close() error {
|
||||||
|
tfw.Write(nil)
|
||||||
|
return tfw.FileWriter.Close()
|
||||||
|
}
|
||||||
|
|
||||||
|
func (tfw *testFileWriter) Cancel() error {
|
||||||
|
tfw.Write(nil)
|
||||||
|
return tfw.FileWriter.Cancel()
|
||||||
|
}
|
||||||
|
|
||||||
|
func (tfw *testFileWriter) Commit() error {
|
||||||
|
tfw.Write(nil)
|
||||||
|
return tfw.FileWriter.Commit()
|
||||||
|
}
|
Loading…
Reference in a new issue