Merge pull request #677 from RichardScothern/soft-delete-remove-links

Manifest and layer soft deletion
This commit is contained in:
Stephen Day 2015-07-24 13:02:36 -07:00
commit afc45e8f57
21 changed files with 816 additions and 92 deletions

View file

@ -354,7 +354,7 @@ func (ms *manifests) Delete(dgst digest.Digest) error {
defer resp.Body.Close() defer resp.Body.Close()
switch resp.StatusCode { switch resp.StatusCode {
case http.StatusOK: case http.StatusAccepted:
return nil return nil
default: default:
return handleErrorResponse(resp) return handleErrorResponse(resp)
@ -366,7 +366,8 @@ type blobs struct {
ub *v2.URLBuilder ub *v2.URLBuilder
client *http.Client client *http.Client
statter distribution.BlobStatter statter distribution.BlobDescriptorService
distribution.BlobDeleter
} }
func sanitizeLocation(location, source string) (string, error) { func sanitizeLocation(location, source string) (string, error) {
@ -484,6 +485,10 @@ func (bs *blobs) Resume(ctx context.Context, id string) (distribution.BlobWriter
panic("not implemented") panic("not implemented")
} }
func (bs *blobs) Delete(ctx context.Context, dgst digest.Digest) error {
return bs.statter.Clear(ctx, dgst)
}
type blobStatter struct { type blobStatter struct {
name string name string
ub *v2.URLBuilder ub *v2.URLBuilder
@ -535,3 +540,32 @@ func buildCatalogValues(maxEntries int, last string) url.Values {
return values return values
} }
func (bs *blobStatter) Clear(ctx context.Context, dgst digest.Digest) error {
blobURL, err := bs.ub.BuildBlobURL(bs.name, dgst)
if err != nil {
return err
}
req, err := http.NewRequest("DELETE", blobURL, nil)
if err != nil {
return err
}
resp, err := bs.client.Do(req)
if err != nil {
return err
}
defer resp.Body.Close()
switch resp.StatusCode {
case http.StatusAccepted:
return nil
default:
return handleErrorResponse(resp)
}
}
func (bs *blobStatter) SetDescriptor(ctx context.Context, dgst digest.Digest, desc distribution.Descriptor) error {
return nil
}

View file

@ -101,6 +101,39 @@ func addTestCatalog(route string, content []byte, link string, m *testutil.Reque
}) })
} }
func TestBlobDelete(t *testing.T) {
dgst, _ := newRandomBlob(1024)
var m testutil.RequestResponseMap
repo := "test.example.com/repo1"
m = append(m, testutil.RequestResponseMapping{
Request: testutil.Request{
Method: "DELETE",
Route: "/v2/" + repo + "/blobs/" + dgst.String(),
},
Response: testutil.Response{
StatusCode: http.StatusAccepted,
Headers: http.Header(map[string][]string{
"Content-Length": {"0"},
}),
},
})
e, c := testServer(m)
defer c()
ctx := context.Background()
r, err := NewRepository(ctx, repo, e, nil)
if err != nil {
t.Fatal(err)
}
l := r.Blobs(ctx)
err = l.Delete(ctx, dgst)
if err != nil {
t.Errorf("Error deleting blob: %s", err.Error())
}
}
func TestBlobFetch(t *testing.T) { func TestBlobFetch(t *testing.T) {
d1, b1 := newRandomBlob(1024) d1, b1 := newRandomBlob(1024)
var m testutil.RequestResponseMap var m testutil.RequestResponseMap
@ -590,7 +623,7 @@ func TestManifestDelete(t *testing.T) {
Route: "/v2/" + repo + "/manifests/" + dgst1.String(), Route: "/v2/" + repo + "/manifests/" + dgst1.String(),
}, },
Response: testutil.Response{ Response: testutil.Response{
StatusCode: http.StatusOK, StatusCode: http.StatusAccepted,
Headers: http.Header(map[string][]string{ Headers: http.Header(map[string][]string{
"Content-Length": {"0"}, "Content-Length": {"0"},
}), }),

View file

@ -33,7 +33,7 @@ import (
// TestCheckAPI hits the base endpoint (/v2/) ensures we return the specified // TestCheckAPI hits the base endpoint (/v2/) ensures we return the specified
// 200 OK response. // 200 OK response.
func TestCheckAPI(t *testing.T) { func TestCheckAPI(t *testing.T) {
env := newTestEnv(t) env := newTestEnv(t, false)
baseURL, err := env.builder.BuildBaseURL() baseURL, err := env.builder.BuildBaseURL()
if err != nil { if err != nil {
@ -65,7 +65,7 @@ func TestCheckAPI(t *testing.T) {
// TestCatalogAPI tests the /v2/_catalog endpoint // TestCatalogAPI tests the /v2/_catalog endpoint
func TestCatalogAPI(t *testing.T) { func TestCatalogAPI(t *testing.T) {
chunkLen := 2 chunkLen := 2
env := newTestEnv(t) env := newTestEnv(t, false)
values := url.Values{ values := url.Values{
"last": []string{""}, "last": []string{""},
@ -239,18 +239,16 @@ func TestURLPrefix(t *testing.T) {
"Content-Type": []string{"application/json; charset=utf-8"}, "Content-Type": []string{"application/json; charset=utf-8"},
"Content-Length": []string{"2"}, "Content-Length": []string{"2"},
}) })
} }
// TestBlobAPI conducts a full test of the of the blob api. type blobArgs struct {
func TestBlobAPI(t *testing.T) { imageName string
// TODO(stevvooe): This test code is complete junk but it should cover the layerFile io.ReadSeeker
// complete flow. This must be broken down and checked against the layerDigest digest.Digest
// specification *before* we submit the final to docker core. tarSumStr string
env := newTestEnv(t) }
imageName := "foo/bar" func makeBlobArgs(t *testing.T) blobArgs {
// "build" our layer file
layerFile, tarSumStr, err := testutil.CreateRandomTarFile() layerFile, tarSumStr, err := testutil.CreateRandomTarFile()
if err != nil { if err != nil {
t.Fatalf("error creating random layer file: %v", err) t.Fatalf("error creating random layer file: %v", err)
@ -258,6 +256,66 @@ func TestBlobAPI(t *testing.T) {
layerDigest := digest.Digest(tarSumStr) layerDigest := digest.Digest(tarSumStr)
args := blobArgs{
imageName: "foo/bar",
layerFile: layerFile,
layerDigest: layerDigest,
tarSumStr: tarSumStr,
}
return args
}
// TestBlobAPI conducts a full test of the of the blob api.
func TestBlobAPI(t *testing.T) {
deleteEnabled := false
env := newTestEnv(t, deleteEnabled)
args := makeBlobArgs(t)
testBlobAPI(t, env, args)
deleteEnabled = true
env = newTestEnv(t, deleteEnabled)
args = makeBlobArgs(t)
testBlobAPI(t, env, args)
}
func TestBlobDelete(t *testing.T) {
deleteEnabled := true
env := newTestEnv(t, deleteEnabled)
args := makeBlobArgs(t)
env = testBlobAPI(t, env, args)
testBlobDelete(t, env, args)
}
func TestBlobDeleteDisabled(t *testing.T) {
deleteEnabled := false
env := newTestEnv(t, deleteEnabled)
args := makeBlobArgs(t)
imageName := args.imageName
layerDigest := args.layerDigest
layerURL, err := env.builder.BuildBlobURL(imageName, layerDigest)
if err != nil {
t.Fatalf("error building url: %v", err)
}
resp, err := httpDelete(layerURL)
if err != nil {
t.Fatalf("unexpected error deleting when disabled: %v", err)
}
checkResponse(t, "status of disabled delete", resp, http.StatusMethodNotAllowed)
}
func testBlobAPI(t *testing.T, env *testEnv, args blobArgs) *testEnv {
// TODO(stevvooe): This test code is complete junk but it should cover the
// complete flow. This must be broken down and checked against the
// specification *before* we submit the final to docker core.
imageName := args.imageName
layerFile := args.layerFile
layerDigest := args.layerDigest
// ----------------------------------- // -----------------------------------
// Test fetch for non-existent content // Test fetch for non-existent content
layerURL, err := env.builder.BuildBlobURL(imageName, layerDigest) layerURL, err := env.builder.BuildBlobURL(imageName, layerDigest)
@ -372,6 +430,7 @@ func TestBlobAPI(t *testing.T) {
uploadURLBase, uploadUUID = startPushLayer(t, env.builder, imageName) uploadURLBase, uploadUUID = startPushLayer(t, env.builder, imageName)
uploadURLBase, dgst := pushChunk(t, env.builder, imageName, uploadURLBase, layerFile, layerLength) uploadURLBase, dgst := pushChunk(t, env.builder, imageName, uploadURLBase, layerFile, layerLength)
finishUpload(t, env.builder, imageName, uploadURLBase, dgst) finishUpload(t, env.builder, imageName, uploadURLBase, dgst)
// ------------------------ // ------------------------
// Use a head request to see if the layer exists. // Use a head request to see if the layer exists.
resp, err = http.Head(layerURL) resp, err = http.Head(layerURL)
@ -459,12 +518,188 @@ func TestBlobAPI(t *testing.T) {
// Missing tests: // Missing tests:
// - Upload the same tarsum file under and different repository and // - Upload the same tarsum file under and different repository and
// ensure the content remains uncorrupted. // ensure the content remains uncorrupted.
return env
}
func testBlobDelete(t *testing.T, env *testEnv, args blobArgs) {
// Upload a layer
imageName := args.imageName
layerFile := args.layerFile
layerDigest := args.layerDigest
layerURL, err := env.builder.BuildBlobURL(imageName, layerDigest)
if err != nil {
t.Fatalf(err.Error())
}
// ---------------
// Delete a layer
resp, err := httpDelete(layerURL)
if err != nil {
t.Fatalf("unexpected error deleting layer: %v", err)
}
checkResponse(t, "deleting layer", resp, http.StatusAccepted)
checkHeaders(t, resp, http.Header{
"Content-Length": []string{"0"},
})
// ---------------
// Try and get it back
// Use a head request to see if the layer exists.
resp, err = http.Head(layerURL)
if err != nil {
t.Fatalf("unexpected error checking head on existing layer: %v", err)
}
checkResponse(t, "checking existence of deleted layer", resp, http.StatusNotFound)
// Delete already deleted layer
resp, err = httpDelete(layerURL)
if err != nil {
t.Fatalf("unexpected error deleting layer: %v", err)
}
checkResponse(t, "deleting layer", resp, http.StatusNotFound)
// ----------------
// Attempt to delete a layer with an invalid digest
badURL := strings.Replace(layerURL, "tarsum", "trsum", 1)
resp, err = httpDelete(badURL)
if err != nil {
t.Fatalf("unexpected error fetching layer: %v", err)
}
checkResponse(t, "deleting layer bad digest", resp, http.StatusBadRequest)
// ----------------
// Reupload previously deleted blob
layerFile.Seek(0, os.SEEK_SET)
uploadURLBase, _ := startPushLayer(t, env.builder, imageName)
pushLayer(t, env.builder, imageName, layerDigest, uploadURLBase, layerFile)
layerFile.Seek(0, os.SEEK_SET)
canonicalDigester := digest.Canonical.New()
if _, err := io.Copy(canonicalDigester.Hash(), layerFile); err != nil {
t.Fatalf("error copying to digest: %v", err)
}
canonicalDigest := canonicalDigester.Digest()
// ------------------------
// Use a head request to see if it exists
resp, err = http.Head(layerURL)
if err != nil {
t.Fatalf("unexpected error checking head on existing layer: %v", err)
}
layerLength, _ := layerFile.Seek(0, os.SEEK_END)
checkResponse(t, "checking head on reuploaded layer", resp, http.StatusOK)
checkHeaders(t, resp, http.Header{
"Content-Length": []string{fmt.Sprint(layerLength)},
"Docker-Content-Digest": []string{canonicalDigest.String()},
})
}
func TestDeleteDisabled(t *testing.T) {
env := newTestEnv(t, false)
imageName := "foo/bar"
// "build" our layer file
layerFile, tarSumStr, err := testutil.CreateRandomTarFile()
if err != nil {
t.Fatalf("error creating random layer file: %v", err)
}
layerDigest := digest.Digest(tarSumStr)
layerURL, err := env.builder.BuildBlobURL(imageName, layerDigest)
if err != nil {
t.Fatalf("Error building blob URL")
}
uploadURLBase, _ := startPushLayer(t, env.builder, imageName)
pushLayer(t, env.builder, imageName, layerDigest, uploadURLBase, layerFile)
resp, err := httpDelete(layerURL)
if err != nil {
t.Fatalf("unexpected error deleting layer: %v", err)
}
checkResponse(t, "deleting layer with delete disabled", resp, http.StatusMethodNotAllowed)
}
func httpDelete(url string) (*http.Response, error) {
req, err := http.NewRequest("DELETE", url, nil)
if err != nil {
return nil, err
}
resp, err := http.DefaultClient.Do(req)
if err != nil {
return nil, err
}
// defer resp.Body.Close()
return resp, err
}
type manifestArgs struct {
imageName string
signedManifest *manifest.SignedManifest
dgst digest.Digest
}
func makeManifestArgs(t *testing.T) manifestArgs {
args := manifestArgs{
imageName: "foo/bar",
}
return args
} }
func TestManifestAPI(t *testing.T) { func TestManifestAPI(t *testing.T) {
env := newTestEnv(t) deleteEnabled := false
env := newTestEnv(t, deleteEnabled)
args := makeManifestArgs(t)
testManifestAPI(t, env, args)
imageName := "foo/bar" deleteEnabled = true
env = newTestEnv(t, deleteEnabled)
args = makeManifestArgs(t)
testManifestAPI(t, env, args)
}
func TestManifestDelete(t *testing.T) {
deleteEnabled := true
env := newTestEnv(t, deleteEnabled)
args := makeManifestArgs(t)
env, args = testManifestAPI(t, env, args)
testManifestDelete(t, env, args)
}
func TestManifestDeleteDisabled(t *testing.T) {
deleteEnabled := false
env := newTestEnv(t, deleteEnabled)
args := makeManifestArgs(t)
testManifestDeleteDisabled(t, env, args)
}
func testManifestDeleteDisabled(t *testing.T, env *testEnv, args manifestArgs) *testEnv {
imageName := args.imageName
manifestURL, err := env.builder.BuildManifestURL(imageName, digest.DigestSha256EmptyTar)
if err != nil {
t.Fatalf("unexpected error getting manifest url: %v", err)
}
resp, err := httpDelete(manifestURL)
if err != nil {
t.Fatalf("unexpected error deleting manifest %v", err)
}
defer resp.Body.Close()
checkResponse(t, "status of disabled delete of manifest", resp, http.StatusMethodNotAllowed)
return nil
}
func testManifestAPI(t *testing.T, env *testEnv, args manifestArgs) (*testEnv, manifestArgs) {
imageName := args.imageName
tag := "thetag" tag := "thetag"
manifestURL, err := env.builder.BuildManifestURL(imageName, tag) manifestURL, err := env.builder.BuildManifestURL(imageName, tag)
@ -567,6 +802,9 @@ func TestManifestAPI(t *testing.T) {
dgst, err := digest.FromBytes(payload) dgst, err := digest.FromBytes(payload)
checkErr(t, err, "digesting manifest") checkErr(t, err, "digesting manifest")
args.signedManifest = signedManifest
args.dgst = dgst
manifestDigestURL, err := env.builder.BuildManifestURL(imageName, dgst.String()) manifestDigestURL, err := env.builder.BuildManifestURL(imageName, dgst.String())
checkErr(t, err, "building manifest url") checkErr(t, err, "building manifest url")
@ -687,6 +925,70 @@ func TestManifestAPI(t *testing.T) {
if tagsResponse.Tags[0] != tag { if tagsResponse.Tags[0] != tag {
t.Fatalf("tag not as expected: %q != %q", tagsResponse.Tags[0], tag) t.Fatalf("tag not as expected: %q != %q", tagsResponse.Tags[0], tag)
} }
return env, args
}
func testManifestDelete(t *testing.T, env *testEnv, args manifestArgs) {
imageName := args.imageName
dgst := args.dgst
signedManifest := args.signedManifest
manifestDigestURL, err := env.builder.BuildManifestURL(imageName, dgst.String())
// ---------------
// Delete by digest
resp, err := httpDelete(manifestDigestURL)
checkErr(t, err, "deleting manifest by digest")
checkResponse(t, "deleting manifest", resp, http.StatusAccepted)
checkHeaders(t, resp, http.Header{
"Content-Length": []string{"0"},
})
// ---------------
// Attempt to fetch deleted manifest
resp, err = http.Get(manifestDigestURL)
checkErr(t, err, "fetching deleted manifest by digest")
defer resp.Body.Close()
checkResponse(t, "fetching deleted manifest", resp, http.StatusNotFound)
// ---------------
// Delete already deleted manifest by digest
resp, err = httpDelete(manifestDigestURL)
checkErr(t, err, "re-deleting manifest by digest")
checkResponse(t, "re-deleting manifest", resp, http.StatusNotFound)
// --------------------
// Re-upload manifest by digest
resp = putManifest(t, "putting signed manifest", manifestDigestURL, signedManifest)
checkResponse(t, "putting signed manifest", resp, http.StatusAccepted)
checkHeaders(t, resp, http.Header{
"Location": []string{manifestDigestURL},
"Docker-Content-Digest": []string{dgst.String()},
})
// ---------------
// Attempt to fetch re-uploaded deleted digest
resp, err = http.Get(manifestDigestURL)
checkErr(t, err, "fetching re-uploaded manifest by digest")
defer resp.Body.Close()
checkResponse(t, "fetching re-uploaded manifest", resp, http.StatusOK)
checkHeaders(t, resp, http.Header{
"Docker-Content-Digest": []string{dgst.String()},
})
// ---------------
// Attempt to delete an unknown manifest
unknownDigest := "sha256:aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa"
unknownManifestDigestURL, err := env.builder.BuildManifestURL(imageName, unknownDigest)
checkErr(t, err, "building unknown manifest url")
resp, err = httpDelete(unknownManifestDigestURL)
checkErr(t, err, "delting unknown manifest by digest")
checkResponse(t, "fetching deleted manifest", resp, http.StatusNotFound)
} }
type testEnv struct { type testEnv struct {
@ -698,10 +1000,11 @@ type testEnv struct {
builder *v2.URLBuilder builder *v2.URLBuilder
} }
func newTestEnv(t *testing.T) *testEnv { func newTestEnv(t *testing.T, deleteEnabled bool) *testEnv {
config := configuration.Configuration{ config := configuration.Configuration{
Storage: configuration.Storage{ Storage: configuration.Storage{
"inmemory": configuration.Parameters{}, "inmemory": configuration.Parameters{},
"delete": configuration.Parameters{"enabled": deleteEnabled},
}, },
} }
@ -1005,7 +1308,7 @@ func checkHeaders(t *testing.T, resp *http.Response, headers http.Header) {
for _, hv := range resp.Header[k] { for _, hv := range resp.Header[k] {
if hv != v { if hv != v {
t.Fatalf("%v header value not matched in response: %q != %q", k, hv, v) t.Fatalf("%+v %v header value not matched in response: %q != %q", resp.Header, k, hv, v)
} }
} }
} }

View file

@ -106,6 +106,16 @@ func NewApp(ctx context.Context, configuration configuration.Configuration) *App
app.configureRedis(&configuration) app.configureRedis(&configuration)
app.configureLogHook(&configuration) app.configureLogHook(&configuration)
deleteEnabled := false
if d, ok := configuration.Storage["delete"]; ok {
e, ok := d["enabled"]
if ok {
if deleteEnabled, ok = e.(bool); !ok {
deleteEnabled = false
}
}
}
// configure storage caches // configure storage caches
if cc, ok := configuration.Storage["cache"]; ok { if cc, ok := configuration.Storage["cache"]; ok {
v, ok := cc["blobdescriptor"] v, ok := cc["blobdescriptor"]
@ -119,10 +129,10 @@ func NewApp(ctx context.Context, configuration configuration.Configuration) *App
if app.redis == nil { if app.redis == nil {
panic("redis configuration required to use for layerinfo cache") panic("redis configuration required to use for layerinfo cache")
} }
app.registry = storage.NewRegistryWithDriver(app, app.driver, rediscache.NewRedisBlobDescriptorCacheProvider(app.redis)) app.registry = storage.NewRegistryWithDriver(app, app.driver, rediscache.NewRedisBlobDescriptorCacheProvider(app.redis), deleteEnabled)
ctxu.GetLogger(app).Infof("using redis blob descriptor cache") ctxu.GetLogger(app).Infof("using redis blob descriptor cache")
case "inmemory": case "inmemory":
app.registry = storage.NewRegistryWithDriver(app, app.driver, memorycache.NewInMemoryBlobDescriptorCacheProvider()) app.registry = storage.NewRegistryWithDriver(app, app.driver, memorycache.NewInMemoryBlobDescriptorCacheProvider(), deleteEnabled)
ctxu.GetLogger(app).Infof("using inmemory blob descriptor cache") ctxu.GetLogger(app).Infof("using inmemory blob descriptor cache")
default: default:
if v != "" { if v != "" {
@ -133,7 +143,7 @@ func NewApp(ctx context.Context, configuration configuration.Configuration) *App
if app.registry == nil { if app.registry == nil {
// configure the registry if no cache section is available. // configure the registry if no cache section is available.
app.registry = storage.NewRegistryWithDriver(app.Context, app.driver, nil) app.registry = storage.NewRegistryWithDriver(app.Context, app.driver, nil, deleteEnabled)
} }
app.registry, err = applyRegistryMiddleware(app.registry, configuration.Middleware["registry"]) app.registry, err = applyRegistryMiddleware(app.registry, configuration.Middleware["registry"])

View file

@ -31,7 +31,7 @@ func TestAppDispatcher(t *testing.T) {
Context: ctx, Context: ctx,
router: v2.Router(), router: v2.Router(),
driver: driver, driver: driver,
registry: storage.NewRegistryWithDriver(ctx, driver, memorycache.NewInMemoryBlobDescriptorCacheProvider()), registry: storage.NewRegistryWithDriver(ctx, driver, memorycache.NewInMemoryBlobDescriptorCacheProvider(), true),
} }
server := httptest.NewServer(app) server := httptest.NewServer(app)
router := v2.Router() router := v2.Router()

View file

@ -33,8 +33,9 @@ func blobDispatcher(ctx *Context, r *http.Request) http.Handler {
} }
return handlers.MethodHandler{ return handlers.MethodHandler{
"GET": http.HandlerFunc(blobHandler.GetBlob), "GET": http.HandlerFunc(blobHandler.GetBlob),
"HEAD": http.HandlerFunc(blobHandler.GetBlob), "HEAD": http.HandlerFunc(blobHandler.GetBlob),
"DELETE": http.HandlerFunc(blobHandler.DeleteBlob),
} }
} }
@ -66,3 +67,27 @@ func (bh *blobHandler) GetBlob(w http.ResponseWriter, r *http.Request) {
return return
} }
} }
// DeleteBlob deletes a layer blob
func (bh *blobHandler) DeleteBlob(w http.ResponseWriter, r *http.Request) {
context.GetLogger(bh).Debug("DeleteBlob")
blobs := bh.Repository.Blobs(bh)
err := blobs.Delete(bh, bh.Digest)
if err != nil {
switch err {
case distribution.ErrBlobUnknown:
w.WriteHeader(http.StatusNotFound)
bh.Errors = append(bh.Errors, v2.ErrorCodeBlobUnknown)
case distribution.ErrUnsupported:
w.WriteHeader(http.StatusMethodNotAllowed)
bh.Errors = append(bh.Errors, v2.ErrorCodeUnsupported)
default:
bh.Errors = append(bh.Errors, errcode.ErrorCodeUnknown)
}
return
}
w.Header().Set("Content-Length", "0")
w.WriteHeader(http.StatusAccepted)
}

View file

@ -186,16 +186,38 @@ func (imh *imageManifestHandler) PutImageManifest(w http.ResponseWriter, r *http
w.WriteHeader(http.StatusAccepted) w.WriteHeader(http.StatusAccepted)
} }
// DeleteImageManifest removes the image with the given tag from the registry. // DeleteImageManifest removes the manifest with the given digest from the registry.
func (imh *imageManifestHandler) DeleteImageManifest(w http.ResponseWriter, r *http.Request) { func (imh *imageManifestHandler) DeleteImageManifest(w http.ResponseWriter, r *http.Request) {
ctxu.GetLogger(imh).Debug("DeleteImageManifest") ctxu.GetLogger(imh).Debug("DeleteImageManifest")
// TODO(stevvooe): Unfortunately, at this point, manifest deletes are manifests, err := imh.Repository.Manifests(imh)
// unsupported. There are issues with schema version 1 that make removing if err != nil {
// tag index entries a serious problem in eventually consistent storage. imh.Errors = append(imh.Errors, err)
// Once we work out schema version 2, the full deletion system will be return
// worked out and we can add support back. }
imh.Errors = append(imh.Errors, v2.ErrorCodeUnsupported)
err = manifests.Delete(imh.Digest)
if err != nil {
switch err {
case digest.ErrDigestUnsupported:
case digest.ErrDigestInvalidFormat:
imh.Errors = append(imh.Errors, v2.ErrorCodeDigestInvalid)
return
case distribution.ErrBlobUnknown:
imh.Errors = append(imh.Errors, v2.ErrorCodeManifestUnknown)
w.WriteHeader(http.StatusNotFound)
return
case distribution.ErrUnsupported:
imh.Errors = append(imh.Errors, v2.ErrorCodeUnsupported)
w.WriteHeader(http.StatusMethodNotAllowed)
default:
imh.Errors = append(imh.Errors, errcode.ErrorCodeUnknown)
w.WriteHeader(http.StatusBadRequest)
return
}
}
w.WriteHeader(http.StatusAccepted)
} }
// digestManifest takes a digest of the given manifest. This belongs somewhere // digestManifest takes a digest of the given manifest. This belongs somewhere

View file

@ -21,13 +21,11 @@ import (
// error paths that might be seen during an upload. // error paths that might be seen during an upload.
func TestSimpleBlobUpload(t *testing.T) { func TestSimpleBlobUpload(t *testing.T) {
randomDataReader, tarSumStr, err := testutil.CreateRandomTarFile() randomDataReader, tarSumStr, err := testutil.CreateRandomTarFile()
if err != nil { if err != nil {
t.Fatalf("error creating random reader: %v", err) t.Fatalf("error creating random reader: %v", err)
} }
dgst := digest.Digest(tarSumStr) dgst := digest.Digest(tarSumStr)
if err != nil { if err != nil {
t.Fatalf("error allocating upload store: %v", err) t.Fatalf("error allocating upload store: %v", err)
} }
@ -35,7 +33,7 @@ func TestSimpleBlobUpload(t *testing.T) {
ctx := context.Background() ctx := context.Background()
imageName := "foo/bar" imageName := "foo/bar"
driver := inmemory.New() driver := inmemory.New()
registry := NewRegistryWithDriver(ctx, driver, memory.NewInMemoryBlobDescriptorCacheProvider()) registry := NewRegistryWithDriver(ctx, driver, memory.NewInMemoryBlobDescriptorCacheProvider(), true)
repository, err := registry.Repository(ctx, imageName) repository, err := registry.Repository(ctx, imageName)
if err != nil { if err != nil {
t.Fatalf("unexpected error getting repo: %v", err) t.Fatalf("unexpected error getting repo: %v", err)
@ -139,6 +137,72 @@ func TestSimpleBlobUpload(t *testing.T) {
if digest.NewDigest("sha256", h) != sha256Digest { if digest.NewDigest("sha256", h) != sha256Digest {
t.Fatalf("unexpected digest from uploaded layer: %q != %q", digest.NewDigest("sha256", h), sha256Digest) t.Fatalf("unexpected digest from uploaded layer: %q != %q", digest.NewDigest("sha256", h), sha256Digest)
} }
// Delete a blob
err = bs.Delete(ctx, desc.Digest)
if err != nil {
t.Fatalf("Unexpected error deleting blob")
}
d, err := bs.Stat(ctx, desc.Digest)
if err == nil {
t.Fatalf("unexpected non-error stating deleted blob: %s", d)
}
switch err {
case distribution.ErrBlobUnknown:
break
default:
t.Errorf("Unexpected error type stat-ing deleted manifest: %#v", err)
}
_, err = bs.Open(ctx, desc.Digest)
if err == nil {
t.Fatalf("unexpected success opening deleted blob for read")
}
switch err {
case distribution.ErrBlobUnknown:
break
default:
t.Errorf("Unexpected error type getting deleted manifest: %#v", err)
}
// Re-upload the blob
randomBlob, err := ioutil.ReadAll(randomDataReader)
if err != nil {
t.Fatalf("Error reading all of blob %s", err.Error())
}
expectedDigest, err := digest.FromBytes(randomBlob)
if err != nil {
t.Fatalf("Error getting digest from bytes: %s", err)
}
simpleUpload(t, bs, randomBlob, expectedDigest)
d, err = bs.Stat(ctx, expectedDigest)
if err != nil {
t.Errorf("unexpected error stat-ing blob")
}
if d.Digest != expectedDigest {
t.Errorf("Mismatching digest with restored blob")
}
_, err = bs.Open(ctx, expectedDigest)
if err != nil {
t.Errorf("Unexpected error opening blob")
}
// Reuse state to test delete with a delete-disabled registry
registry = NewRegistryWithDriver(ctx, driver, memory.NewInMemoryBlobDescriptorCacheProvider(), false)
repository, err = registry.Repository(ctx, imageName)
if err != nil {
t.Fatalf("unexpected error getting repo: %v", err)
}
bs = repository.Blobs(ctx)
err = bs.Delete(ctx, desc.Digest)
if err == nil {
t.Errorf("Unexpected success deleting while disabled")
}
} }
// TestSimpleBlobRead just creates a simple blob file and ensures that basic // TestSimpleBlobRead just creates a simple blob file and ensures that basic
@ -148,7 +212,7 @@ func TestSimpleBlobRead(t *testing.T) {
ctx := context.Background() ctx := context.Background()
imageName := "foo/bar" imageName := "foo/bar"
driver := inmemory.New() driver := inmemory.New()
registry := NewRegistryWithDriver(ctx, driver, memory.NewInMemoryBlobDescriptorCacheProvider()) registry := NewRegistryWithDriver(ctx, driver, memory.NewInMemoryBlobDescriptorCacheProvider(), true)
repository, err := registry.Repository(ctx, imageName) repository, err := registry.Repository(ctx, imageName)
if err != nil { if err != nil {
t.Fatalf("unexpected error getting repo: %v", err) t.Fatalf("unexpected error getting repo: %v", err)
@ -252,19 +316,24 @@ func TestLayerUploadZeroLength(t *testing.T) {
ctx := context.Background() ctx := context.Background()
imageName := "foo/bar" imageName := "foo/bar"
driver := inmemory.New() driver := inmemory.New()
registry := NewRegistryWithDriver(ctx, driver, memory.NewInMemoryBlobDescriptorCacheProvider()) registry := NewRegistryWithDriver(ctx, driver, memory.NewInMemoryBlobDescriptorCacheProvider(), true)
repository, err := registry.Repository(ctx, imageName) repository, err := registry.Repository(ctx, imageName)
if err != nil { if err != nil {
t.Fatalf("unexpected error getting repo: %v", err) t.Fatalf("unexpected error getting repo: %v", err)
} }
bs := repository.Blobs(ctx) bs := repository.Blobs(ctx)
simpleUpload(t, bs, []byte{}, digest.DigestSha256EmptyTar)
}
func simpleUpload(t *testing.T, bs distribution.BlobIngester, blob []byte, expectedDigest digest.Digest) {
ctx := context.Background()
wr, err := bs.Create(ctx) wr, err := bs.Create(ctx)
if err != nil { if err != nil {
t.Fatalf("unexpected error starting upload: %v", err) t.Fatalf("unexpected error starting upload: %v", err)
} }
nn, err := io.Copy(wr, bytes.NewReader([]byte{})) nn, err := io.Copy(wr, bytes.NewReader(blob))
if err != nil { if err != nil {
t.Fatalf("error copying into blob writer: %v", err) t.Fatalf("error copying into blob writer: %v", err)
} }
@ -273,12 +342,12 @@ func TestLayerUploadZeroLength(t *testing.T) {
t.Fatalf("unexpected number of bytes copied: %v > 0", nn) t.Fatalf("unexpected number of bytes copied: %v > 0", nn)
} }
dgst, err := digest.FromReader(bytes.NewReader([]byte{})) dgst, err := digest.FromReader(bytes.NewReader(blob))
if err != nil { if err != nil {
t.Fatalf("error getting zero digest: %v", err) t.Fatalf("error getting digest: %v", err)
} }
if dgst != digest.DigestSha256EmptyTar { if dgst != expectedDigest {
// sanity check on zero digest // sanity check on zero digest
t.Fatalf("digest not as expected: %v != %v", dgst, digest.DigestTarSumV1EmptyTar) t.Fatalf("digest not as expected: %v != %v", dgst, digest.DigestTarSumV1EmptyTar)
} }

View file

@ -7,7 +7,7 @@ import (
"github.com/docker/distribution/registry/storage/driver" "github.com/docker/distribution/registry/storage/driver"
) )
// blobStore implements a the read side of the blob store interface over a // blobStore implements the read side of the blob store interface over a
// driver without enforcing per-repository membership. This object is // driver without enforcing per-repository membership. This object is
// intentionally a leaky abstraction, providing utility methods that support // intentionally a leaky abstraction, providing utility methods that support
// creating and traversing backend links. // creating and traversing backend links.
@ -143,7 +143,7 @@ type blobStatter struct {
pm *pathMapper pm *pathMapper
} }
var _ distribution.BlobStatter = &blobStatter{} var _ distribution.BlobDescriptorService = &blobStatter{}
// Stat implements BlobStatter.Stat by returning the descriptor for the blob // Stat implements BlobStatter.Stat by returning the descriptor for the blob
// in the main blob store. If this method returns successfully, there is // in the main blob store. If this method returns successfully, there is
@ -188,3 +188,11 @@ func (bs *blobStatter) Stat(ctx context.Context, dgst digest.Digest) (distributi
Digest: dgst, Digest: dgst,
}, nil }, nil
} }
func (bs *blobStatter) Clear(ctx context.Context, dgst digest.Digest) error {
return distribution.ErrUnsupported
}
func (bs *blobStatter) SetDescriptor(ctx context.Context, dgst digest.Digest, desc distribution.Descriptor) error {
return distribution.ErrUnsupported
}

View file

@ -70,6 +70,11 @@ func (bw *blobWriter) Commit(ctx context.Context, desc distribution.Descriptor)
return distribution.Descriptor{}, err return distribution.Descriptor{}, err
} }
err = bw.blobStore.blobAccessController.SetDescriptor(ctx, canonical.Digest, canonical)
if err != nil {
return distribution.Descriptor{}, err
}
return canonical, nil return canonical, nil
} }

View file

@ -26,13 +26,13 @@ type MetricsTracker interface {
type cachedBlobStatter struct { type cachedBlobStatter struct {
cache distribution.BlobDescriptorService cache distribution.BlobDescriptorService
backend distribution.BlobStatter backend distribution.BlobDescriptorService
tracker MetricsTracker tracker MetricsTracker
} }
// NewCachedBlobStatter creates a new statter which prefers a cache and // NewCachedBlobStatter creates a new statter which prefers a cache and
// falls back to a backend. // falls back to a backend.
func NewCachedBlobStatter(cache distribution.BlobDescriptorService, backend distribution.BlobStatter) distribution.BlobStatter { func NewCachedBlobStatter(cache distribution.BlobDescriptorService, backend distribution.BlobDescriptorService) distribution.BlobDescriptorService {
return &cachedBlobStatter{ return &cachedBlobStatter{
cache: cache, cache: cache,
backend: backend, backend: backend,
@ -41,7 +41,7 @@ func NewCachedBlobStatter(cache distribution.BlobDescriptorService, backend dist
// NewCachedBlobStatterWithMetrics creates a new statter which prefers a cache and // NewCachedBlobStatterWithMetrics creates a new statter which prefers a cache and
// falls back to a backend. Hits and misses will send to the tracker. // falls back to a backend. Hits and misses will send to the tracker.
func NewCachedBlobStatterWithMetrics(cache distribution.BlobDescriptorService, backend distribution.BlobStatter, tracker MetricsTracker) distribution.BlobStatter { func NewCachedBlobStatterWithMetrics(cache distribution.BlobDescriptorService, backend distribution.BlobDescriptorService, tracker MetricsTracker) distribution.BlobStatter {
return &cachedBlobStatter{ return &cachedBlobStatter{
cache: cache, cache: cache,
backend: backend, backend: backend,
@ -77,4 +77,25 @@ fallback:
} }
return desc, err return desc, err
}
func (cbds *cachedBlobStatter) Clear(ctx context.Context, dgst digest.Digest) error {
err := cbds.cache.Clear(ctx, dgst)
if err != nil {
return err
}
err = cbds.backend.Clear(ctx, dgst)
if err != nil {
return err
}
return nil
}
func (cbds *cachedBlobStatter) SetDescriptor(ctx context.Context, dgst digest.Digest, desc distribution.Descriptor) error {
if err := cbds.cache.SetDescriptor(ctx, dgst, desc); err != nil {
context.GetLogger(ctx).Errorf("error adding descriptor %v to cache: %v", desc.Digest, err)
}
return nil
} }

View file

@ -44,6 +44,10 @@ func (imbdcp *inMemoryBlobDescriptorCacheProvider) Stat(ctx context.Context, dgs
return imbdcp.global.Stat(ctx, dgst) return imbdcp.global.Stat(ctx, dgst)
} }
func (imbdcp *inMemoryBlobDescriptorCacheProvider) Clear(ctx context.Context, dgst digest.Digest) error {
return imbdcp.global.Clear(ctx, dgst)
}
func (imbdcp *inMemoryBlobDescriptorCacheProvider) SetDescriptor(ctx context.Context, dgst digest.Digest, desc distribution.Descriptor) error { func (imbdcp *inMemoryBlobDescriptorCacheProvider) SetDescriptor(ctx context.Context, dgst digest.Digest, desc distribution.Descriptor) error {
_, err := imbdcp.Stat(ctx, dgst) _, err := imbdcp.Stat(ctx, dgst)
if err == distribution.ErrBlobUnknown { if err == distribution.ErrBlobUnknown {
@ -80,6 +84,14 @@ func (rsimbdcp *repositoryScopedInMemoryBlobDescriptorCache) Stat(ctx context.Co
return rsimbdcp.repository.Stat(ctx, dgst) return rsimbdcp.repository.Stat(ctx, dgst)
} }
func (rsimbdcp *repositoryScopedInMemoryBlobDescriptorCache) Clear(ctx context.Context, dgst digest.Digest) error {
if rsimbdcp.repository == nil {
return distribution.ErrBlobUnknown
}
return rsimbdcp.repository.Clear(ctx, dgst)
}
func (rsimbdcp *repositoryScopedInMemoryBlobDescriptorCache) SetDescriptor(ctx context.Context, dgst digest.Digest, desc distribution.Descriptor) error { func (rsimbdcp *repositoryScopedInMemoryBlobDescriptorCache) SetDescriptor(ctx context.Context, dgst digest.Digest, desc distribution.Descriptor) error {
if rsimbdcp.repository == nil { if rsimbdcp.repository == nil {
// allocate map since we are setting it now. // allocate map since we are setting it now.
@ -133,6 +145,14 @@ func (mbdc *mapBlobDescriptorCache) Stat(ctx context.Context, dgst digest.Digest
return desc, nil return desc, nil
} }
func (mbdc *mapBlobDescriptorCache) Clear(ctx context.Context, dgst digest.Digest) error {
mbdc.mu.Lock()
defer mbdc.mu.Unlock()
delete(mbdc.descriptors, dgst)
return nil
}
func (mbdc *mapBlobDescriptorCache) SetDescriptor(ctx context.Context, dgst digest.Digest, desc distribution.Descriptor) error { func (mbdc *mapBlobDescriptorCache) SetDescriptor(ctx context.Context, dgst digest.Digest, desc distribution.Descriptor) error {
if err := dgst.Validate(); err != nil { if err := dgst.Validate(); err != nil {
return err return err

View file

@ -12,7 +12,7 @@ import (
) )
// redisBlobStatService provides an implementation of // redisBlobStatService provides an implementation of
// BlobDescriptorCacheProvider based on redis. Blob descritors are stored in // BlobDescriptorCacheProvider based on redis. Blob descriptors are stored in
// two parts. The first provide fast access to repository membership through a // two parts. The first provide fast access to repository membership through a
// redis set for each repo. The second is a redis hash keyed by the digest of // redis set for each repo. The second is a redis hash keyed by the digest of
// the layer, providing path, length and mediatype information. There is also // the layer, providing path, length and mediatype information. There is also
@ -63,6 +63,27 @@ func (rbds *redisBlobDescriptorService) Stat(ctx context.Context, dgst digest.Di
return rbds.stat(ctx, conn, dgst) return rbds.stat(ctx, conn, dgst)
} }
func (rbds *redisBlobDescriptorService) Clear(ctx context.Context, dgst digest.Digest) error {
if err := dgst.Validate(); err != nil {
return err
}
conn := rbds.pool.Get()
defer conn.Close()
// Not atomic in redis <= 2.3
reply, err := conn.Do("HDEL", rbds.blobDescriptorHashKey(dgst), "digest", "length", "mediatype")
if err != nil {
return err
}
if reply == 0 {
return distribution.ErrBlobUnknown
}
return nil
}
// stat provides an internal stat call that takes a connection parameter. This // stat provides an internal stat call that takes a connection parameter. This
// allows some internal management of the connection scope. // allows some internal management of the connection scope.
func (rbds *redisBlobDescriptorService) stat(ctx context.Context, conn redis.Conn, dgst digest.Digest) (distribution.Descriptor, error) { func (rbds *redisBlobDescriptorService) stat(ctx context.Context, conn redis.Conn, dgst digest.Digest) (distribution.Descriptor, error) {
@ -170,6 +191,28 @@ func (rsrbds *repositoryScopedRedisBlobDescriptorService) Stat(ctx context.Conte
return upstream, nil return upstream, nil
} }
// Clear removes the descriptor from the cache and forwards to the upstream descriptor store
func (rsrbds *repositoryScopedRedisBlobDescriptorService) Clear(ctx context.Context, dgst digest.Digest) error {
if err := dgst.Validate(); err != nil {
return err
}
conn := rsrbds.upstream.pool.Get()
defer conn.Close()
// Check membership to repository first
member, err := redis.Bool(conn.Do("SISMEMBER", rsrbds.repositoryBlobSetKey(rsrbds.repo), dgst))
if err != nil {
return err
}
if !member {
return distribution.ErrBlobUnknown
}
return rsrbds.upstream.Clear(ctx, dgst)
}
func (rsrbds *repositoryScopedRedisBlobDescriptorService) SetDescriptor(ctx context.Context, dgst digest.Digest, desc distribution.Descriptor) error { func (rsrbds *repositoryScopedRedisBlobDescriptorService) SetDescriptor(ctx context.Context, dgst digest.Digest, desc distribution.Descriptor) error {
if err := dgst.Validate(); err != nil { if err := dgst.Validate(); err != nil {
return err return err

View file

@ -139,3 +139,40 @@ func checkBlobDescriptorCacheSetAndRead(t *testing.T, ctx context.Context, provi
t.Fatalf("unexpected descriptor: %#v != %#v", desc, expected) t.Fatalf("unexpected descriptor: %#v != %#v", desc, expected)
} }
} }
func checkBlobDescriptorClear(t *testing.T, ctx context.Context, provider BlobDescriptorCacheProvider) {
localDigest := digest.Digest("sha384:abc")
expected := distribution.Descriptor{
Digest: "sha256:abc",
Size: 10,
MediaType: "application/octet-stream"}
cache, err := provider.RepositoryScoped("foo/bar")
if err != nil {
t.Fatalf("unexpected error getting scoped cache: %v", err)
}
if err := cache.SetDescriptor(ctx, localDigest, expected); err != nil {
t.Fatalf("error setting descriptor: %v", err)
}
desc, err := cache.Stat(ctx, localDigest)
if err != nil {
t.Fatalf("unexpected error statting fake2:abc: %v", err)
}
if expected != desc {
t.Fatalf("unexpected descriptor: %#v != %#v", expected, desc)
}
err = cache.Clear(ctx, localDigest)
if err != nil {
t.Fatalf("unexpected error deleting descriptor")
}
nonExistantDigest := digest.Digest("sha256:aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa")
err = cache.Clear(ctx, nonExistantDigest)
if err == nil {
t.Fatalf("expected error deleting unknown descriptor")
}
}

View file

@ -22,7 +22,7 @@ func setupFS(t *testing.T) *setupEnv {
d := inmemory.New() d := inmemory.New()
c := []byte("") c := []byte("")
ctx := context.Background() ctx := context.Background()
registry := NewRegistryWithDriver(ctx, d, memory.NewInMemoryBlobDescriptorCacheProvider()) registry := NewRegistryWithDriver(ctx, d, memory.NewInMemoryBlobDescriptorCacheProvider(), false)
rootpath, _ := defaultPathMapper.path(repositoriesRootPathSpec{}) rootpath, _ := defaultPathMapper.path(repositoriesRootPathSpec{})
repos := []string{ repos := []string{

View file

@ -16,10 +16,11 @@ import (
// that grant access to the global blob store. // that grant access to the global blob store.
type linkedBlobStore struct { type linkedBlobStore struct {
*blobStore *blobStore
blobServer distribution.BlobServer blobServer distribution.BlobServer
statter distribution.BlobStatter blobAccessController distribution.BlobDescriptorService
repository distribution.Repository repository distribution.Repository
ctx context.Context // only to be used where context can't come through method args ctx context.Context // only to be used where context can't come through method args
deleteEnabled bool
// linkPath allows one to control the repository blob link set to which // linkPath allows one to control the repository blob link set to which
// the blob store dispatches. This is required because manifest and layer // the blob store dispatches. This is required because manifest and layer
@ -31,7 +32,7 @@ type linkedBlobStore struct {
var _ distribution.BlobStore = &linkedBlobStore{} var _ distribution.BlobStore = &linkedBlobStore{}
func (lbs *linkedBlobStore) Stat(ctx context.Context, dgst digest.Digest) (distribution.Descriptor, error) { func (lbs *linkedBlobStore) Stat(ctx context.Context, dgst digest.Digest) (distribution.Descriptor, error) {
return lbs.statter.Stat(ctx, dgst) return lbs.blobAccessController.Stat(ctx, dgst)
} }
func (lbs *linkedBlobStore) Get(ctx context.Context, dgst digest.Digest) ([]byte, error) { func (lbs *linkedBlobStore) Get(ctx context.Context, dgst digest.Digest) ([]byte, error) {
@ -67,6 +68,10 @@ func (lbs *linkedBlobStore) ServeBlob(ctx context.Context, w http.ResponseWriter
} }
func (lbs *linkedBlobStore) Put(ctx context.Context, mediaType string, p []byte) (distribution.Descriptor, error) { func (lbs *linkedBlobStore) Put(ctx context.Context, mediaType string, p []byte) (distribution.Descriptor, error) {
dgst, err := digest.FromBytes(p)
if err != nil {
return distribution.Descriptor{}, err
}
// Place the data in the blob store first. // Place the data in the blob store first.
desc, err := lbs.blobStore.Put(ctx, mediaType, p) desc, err := lbs.blobStore.Put(ctx, mediaType, p)
if err != nil { if err != nil {
@ -74,6 +79,10 @@ func (lbs *linkedBlobStore) Put(ctx context.Context, mediaType string, p []byte)
return distribution.Descriptor{}, err return distribution.Descriptor{}, err
} }
if err := lbs.blobAccessController.SetDescriptor(ctx, dgst, desc); err != nil {
return distribution.Descriptor{}, err
}
// TODO(stevvooe): Write out mediatype if incoming differs from what is // TODO(stevvooe): Write out mediatype if incoming differs from what is
// returned by Put above. Note that we should allow updates for a given // returned by Put above. Note that we should allow updates for a given
// repository. // repository.
@ -153,7 +162,26 @@ func (lbs *linkedBlobStore) Resume(ctx context.Context, id string) (distribution
return lbs.newBlobUpload(ctx, id, path, startedAt) return lbs.newBlobUpload(ctx, id, path, startedAt)
} }
// newLayerUpload allocates a new upload controller with the given state. func (lbs *linkedBlobStore) Delete(ctx context.Context, dgst digest.Digest) error {
if !lbs.deleteEnabled {
return distribution.ErrUnsupported
}
// Ensure the blob is available for deletion
_, err := lbs.blobAccessController.Stat(ctx, dgst)
if err != nil {
return err
}
err = lbs.blobAccessController.Clear(ctx, dgst)
if err != nil {
return err
}
return nil
}
// newBlobUpload allocates a new upload controller with the given state.
func (lbs *linkedBlobStore) newBlobUpload(ctx context.Context, uuid, path string, startedAt time.Time) (distribution.BlobWriter, error) { func (lbs *linkedBlobStore) newBlobUpload(ctx context.Context, uuid, path string, startedAt time.Time) (distribution.BlobWriter, error) {
fw, err := newFileWriter(ctx, lbs.driver, path) fw, err := newFileWriter(ctx, lbs.driver, path)
if err != nil { if err != nil {
@ -213,7 +241,7 @@ type linkedBlobStatter struct {
linkPath func(pm *pathMapper, name string, dgst digest.Digest) (string, error) linkPath func(pm *pathMapper, name string, dgst digest.Digest) (string, error)
} }
var _ distribution.BlobStatter = &linkedBlobStatter{} var _ distribution.BlobDescriptorService = &linkedBlobStatter{}
func (lbs *linkedBlobStatter) Stat(ctx context.Context, dgst digest.Digest) (distribution.Descriptor, error) { func (lbs *linkedBlobStatter) Stat(ctx context.Context, dgst digest.Digest) (distribution.Descriptor, error) {
blobLinkPath, err := lbs.linkPath(lbs.pm, lbs.repository.Name(), dgst) blobLinkPath, err := lbs.linkPath(lbs.pm, lbs.repository.Name(), dgst)
@ -246,6 +274,20 @@ func (lbs *linkedBlobStatter) Stat(ctx context.Context, dgst digest.Digest) (dis
return lbs.blobStore.statter.Stat(ctx, target) return lbs.blobStore.statter.Stat(ctx, target)
} }
func (lbs *linkedBlobStatter) Clear(ctx context.Context, dgst digest.Digest) error {
blobLinkPath, err := lbs.linkPath(lbs.pm, lbs.repository.Name(), dgst)
if err != nil {
return err
}
return lbs.blobStore.driver.Delete(ctx, blobLinkPath)
}
func (lbs *linkedBlobStatter) SetDescriptor(ctx context.Context, dgst digest.Digest, desc distribution.Descriptor) error {
// The canonical descriptor for a blob is set at the commit phase of upload
return nil
}
// blobLinkPath provides the path to the blob link, also known as layers. // blobLinkPath provides the path to the blob link, also known as layers.
func blobLinkPath(pm *pathMapper, name string, dgst digest.Digest) (string, error) { func blobLinkPath(pm *pathMapper, name string, dgst digest.Digest) (string, error) {
return pm.path(layerLinkPathSpec{name: name, digest: dgst}) return pm.path(layerLinkPathSpec{name: name, digest: dgst})

View file

@ -69,8 +69,8 @@ func (ms *manifestStore) Put(manifest *manifest.SignedManifest) error {
// Delete removes the revision of the specified manfiest. // Delete removes the revision of the specified manfiest.
func (ms *manifestStore) Delete(dgst digest.Digest) error { func (ms *manifestStore) Delete(dgst digest.Digest) error {
context.GetLogger(ms.ctx).Debug("(*manifestStore).Delete - unsupported") context.GetLogger(ms.ctx).Debug("(*manifestStore).Delete")
return fmt.Errorf("deletion of manifests not supported") return ms.revisionStore.delete(ms.ctx, dgst)
} }
func (ms *manifestStore) Tags() ([]string, error) { func (ms *manifestStore) Tags() ([]string, error) {

View file

@ -29,8 +29,7 @@ type manifestStoreTestEnv struct {
func newManifestStoreTestEnv(t *testing.T, name, tag string) *manifestStoreTestEnv { func newManifestStoreTestEnv(t *testing.T, name, tag string) *manifestStoreTestEnv {
ctx := context.Background() ctx := context.Background()
driver := inmemory.New() driver := inmemory.New()
registry := NewRegistryWithDriver(ctx, driver, memory.NewInMemoryBlobDescriptorCacheProvider()) registry := NewRegistryWithDriver(ctx, driver, memory.NewInMemoryBlobDescriptorCacheProvider(), true)
repo, err := registry.Repository(ctx, name) repo, err := registry.Repository(ctx, name)
if err != nil { if err != nil {
t.Fatalf("unexpected error getting repo: %v", err) t.Fatalf("unexpected error getting repo: %v", err)
@ -156,6 +155,7 @@ func TestManifestStorage(t *testing.T) {
} }
fetchedManifest, err := ms.GetByTag(env.tag) fetchedManifest, err := ms.GetByTag(env.tag)
if err != nil { if err != nil {
t.Fatalf("unexpected error fetching manifest: %v", err) t.Fatalf("unexpected error fetching manifest: %v", err)
} }
@ -296,11 +296,68 @@ func TestManifestStorage(t *testing.T) {
} }
} }
// TODO(stevvooe): Currently, deletes are not supported due to some // Test deleting manifests
// complexity around managing tag indexes. We'll add this support back in err = ms.Delete(dgst)
// when the manifest format has settled. For now, we expect an error for if err != nil {
// all deletes.
if err := ms.Delete(dgst); err == nil {
t.Fatalf("unexpected an error deleting manifest by digest: %v", err) t.Fatalf("unexpected an error deleting manifest by digest: %v", err)
} }
exists, err = ms.Exists(dgst)
if err != nil {
t.Fatalf("Error querying manifest existence")
}
if exists {
t.Errorf("Deleted manifest should not exist")
}
deletedManifest, err := ms.Get(dgst)
if err == nil {
t.Errorf("Unexpected success getting deleted manifest")
}
switch err.(type) {
case distribution.ErrManifestUnknownRevision:
break
default:
t.Errorf("Unexpected error getting deleted manifest: %s", reflect.ValueOf(err).Type())
}
if deletedManifest != nil {
t.Errorf("Deleted manifest get returned non-nil")
}
// Re-upload should restore manifest to a good state
err = ms.Put(sm)
if err != nil {
t.Errorf("Error re-uploading deleted manifest")
}
exists, err = ms.Exists(dgst)
if err != nil {
t.Fatalf("Error querying manifest existence")
}
if !exists {
t.Errorf("Restored manifest should exist")
}
deletedManifest, err = ms.Get(dgst)
if err != nil {
t.Errorf("Unexpected error getting manifest")
}
if deletedManifest == nil {
t.Errorf("Deleted manifest get returned non-nil")
}
r := NewRegistryWithDriver(ctx, env.driver, memory.NewInMemoryBlobDescriptorCacheProvider(), false)
repo, err := r.Repository(ctx, env.name)
if err != nil {
t.Fatalf("unexpected error getting repo: %v", err)
}
ms, err = repo.Manifests(ctx)
if err != nil {
t.Fatal(err)
}
err = ms.Delete(dgst)
if err == nil {
t.Errorf("Unexpected success deleting while disabled")
}
} }

View file

@ -15,15 +15,16 @@ type registry struct {
blobServer distribution.BlobServer blobServer distribution.BlobServer
statter distribution.BlobStatter // global statter service. statter distribution.BlobStatter // global statter service.
blobDescriptorCacheProvider cache.BlobDescriptorCacheProvider blobDescriptorCacheProvider cache.BlobDescriptorCacheProvider
deleteEnabled bool
} }
// NewRegistryWithDriver creates a new registry instance from the provided // NewRegistryWithDriver creates a new registry instance from the provided
// driver. The resulting registry may be shared by multiple goroutines but is // driver. The resulting registry may be shared by multiple goroutines but is
// cheap to allocate. // cheap to allocate.
func NewRegistryWithDriver(ctx context.Context, driver storagedriver.StorageDriver, blobDescriptorCacheProvider cache.BlobDescriptorCacheProvider) distribution.Namespace { func NewRegistryWithDriver(ctx context.Context, driver storagedriver.StorageDriver, blobDescriptorCacheProvider cache.BlobDescriptorCacheProvider, deleteEnabled bool) distribution.Namespace {
// create global statter, with cache. // create global statter, with cache.
var statter distribution.BlobStatter = &blobStatter{ var statter distribution.BlobDescriptorService = &blobStatter{
driver: driver, driver: driver,
pm: defaultPathMapper, pm: defaultPathMapper,
} }
@ -46,6 +47,7 @@ func NewRegistryWithDriver(ctx context.Context, driver storagedriver.StorageDriv
pathFn: bs.path, pathFn: bs.path,
}, },
blobDescriptorCacheProvider: blobDescriptorCacheProvider, blobDescriptorCacheProvider: blobDescriptorCacheProvider,
deleteEnabled: deleteEnabled,
} }
} }
@ -107,10 +109,11 @@ func (repo *repository) Manifests(ctx context.Context, options ...distribution.M
ctx: ctx, ctx: ctx,
repository: repo, repository: repo,
blobStore: &linkedBlobStore{ blobStore: &linkedBlobStore{
ctx: ctx, ctx: ctx,
blobStore: repo.blobStore, blobStore: repo.blobStore,
repository: repo, repository: repo,
statter: &linkedBlobStatter{ deleteEnabled: repo.registry.deleteEnabled,
blobAccessController: &linkedBlobStatter{
blobStore: repo.blobStore, blobStore: repo.blobStore,
repository: repo, repository: repo,
linkPath: manifestRevisionLinkPath, linkPath: manifestRevisionLinkPath,
@ -143,7 +146,7 @@ func (repo *repository) Manifests(ctx context.Context, options ...distribution.M
// may be context sensitive in the future. The instance should be used similar // may be context sensitive in the future. The instance should be used similar
// to a request local. // to a request local.
func (repo *repository) Blobs(ctx context.Context) distribution.BlobStore { func (repo *repository) Blobs(ctx context.Context) distribution.BlobStore {
var statter distribution.BlobStatter = &linkedBlobStatter{ var statter distribution.BlobDescriptorService = &linkedBlobStatter{
blobStore: repo.blobStore, blobStore: repo.blobStore,
repository: repo, repository: repo,
linkPath: blobLinkPath, linkPath: blobLinkPath,
@ -154,15 +157,16 @@ func (repo *repository) Blobs(ctx context.Context) distribution.BlobStore {
} }
return &linkedBlobStore{ return &linkedBlobStore{
blobStore: repo.blobStore, blobStore: repo.blobStore,
blobServer: repo.blobServer, blobServer: repo.blobServer,
statter: statter, blobAccessController: statter,
repository: repo, repository: repo,
ctx: ctx, ctx: ctx,
// TODO(stevvooe): linkPath limits this blob store to only layers. // TODO(stevvooe): linkPath limits this blob store to only layers.
// This instance cannot be used for manifest checks. // This instance cannot be used for manifest checks.
linkPath: blobLinkPath, linkPath: blobLinkPath,
deleteEnabled: repo.registry.deleteEnabled,
} }
} }

View file

@ -17,19 +17,6 @@ type revisionStore struct {
ctx context.Context ctx context.Context
} }
func newRevisionStore(ctx context.Context, repo *repository, blobStore *blobStore) *revisionStore {
return &revisionStore{
ctx: ctx,
repository: repo,
blobStore: &linkedBlobStore{
blobStore: blobStore,
repository: repo,
ctx: ctx,
linkPath: manifestRevisionLinkPath,
},
}
}
// get retrieves the manifest, keyed by revision digest. // get retrieves the manifest, keyed by revision digest.
func (rs *revisionStore) get(ctx context.Context, revision digest.Digest) (*manifest.SignedManifest, error) { func (rs *revisionStore) get(ctx context.Context, revision digest.Digest) (*manifest.SignedManifest, error) {
// Ensure that this revision is available in this repository. // Ensure that this revision is available in this repository.
@ -118,3 +105,7 @@ func (rs *revisionStore) put(ctx context.Context, sm *manifest.SignedManifest) (
return revision, nil return revision, nil
} }
func (rs *revisionStore) delete(ctx context.Context, revision digest.Digest) error {
return rs.blobStore.Delete(ctx, revision)
}

View file

@ -115,8 +115,8 @@ func (s *signatureStore) Put(dgst digest.Digest, signatures ...[]byte) error {
return nil return nil
} }
// namedBlobStore returns the namedBlobStore of the signatures for the // linkedBlobStore returns the namedBlobStore of the signatures for the
// manifest with the given digest. Effectively, each singature link path // manifest with the given digest. Effectively, each signature link path
// layout is a unique linked blob store. // layout is a unique linked blob store.
func (s *signatureStore) linkedBlobStore(ctx context.Context, revision digest.Digest) *linkedBlobStore { func (s *signatureStore) linkedBlobStore(ctx context.Context, revision digest.Digest) *linkedBlobStore {
linkpath := func(pm *pathMapper, name string, dgst digest.Digest) (string, error) { linkpath := func(pm *pathMapper, name string, dgst digest.Digest) (string, error) {
@ -131,7 +131,7 @@ func (s *signatureStore) linkedBlobStore(ctx context.Context, revision digest.Di
ctx: ctx, ctx: ctx,
repository: s.repository, repository: s.repository,
blobStore: s.blobStore, blobStore: s.blobStore,
statter: &linkedBlobStatter{ blobAccessController: &linkedBlobStatter{
blobStore: s.blobStore, blobStore: s.blobStore,
repository: s.repository, repository: s.repository,
linkPath: linkpath, linkPath: linkpath,