Manifest and layer soft deletion.

Implement the delete API by implementing soft delete for layers
and blobs by removing link files and updating the blob descriptor
cache.  Deletion is configurable - if it is disabled API calls
will return an unsupported error.

We invalidate the blob descriptor cache by changing the linkedBlobStore's
blobStatter to a blobDescriptorService and naming it blobAccessController.

Delete() is added throughout the relevant API to support this functionality.

Signed-off-by: Richard Scothern <richard.scothern@gmail.com>
This commit is contained in:
Richard 2015-05-27 10:52:22 -07:00 committed by Richard Scothern
parent 2445340f37
commit 9c1dd69439
27 changed files with 886 additions and 95 deletions

View file

@ -27,6 +27,9 @@ var (
// ErrBlobInvalidLength returned when the blob has an expected length on // ErrBlobInvalidLength returned when the blob has an expected length on
// commit, meaning mismatched with the descriptor or an invalid value. // commit, meaning mismatched with the descriptor or an invalid value.
ErrBlobInvalidLength = errors.New("blob invalid length") ErrBlobInvalidLength = errors.New("blob invalid length")
// ErrUnsupported returned when an unsupported operation is attempted
ErrUnsupported = errors.New("unsupported operation")
) )
// ErrBlobInvalidDigest returned when digest check fails. // ErrBlobInvalidDigest returned when digest check fails.
@ -70,6 +73,11 @@ type BlobStatter interface {
Stat(ctx context.Context, dgst digest.Digest) (Descriptor, error) Stat(ctx context.Context, dgst digest.Digest) (Descriptor, error)
} }
// BlobDeleter enables deleting blobs from storage.
type BlobDeleter interface {
Delete(ctx context.Context, dgst digest.Digest) error
}
// BlobDescriptorService manages metadata about a blob by digest. Most // BlobDescriptorService manages metadata about a blob by digest. Most
// implementations will not expose such an interface explicitly. Such mappings // implementations will not expose such an interface explicitly. Such mappings
// should be maintained by interacting with the BlobIngester. Hence, this is // should be maintained by interacting with the BlobIngester. Hence, this is
@ -87,6 +95,9 @@ type BlobDescriptorService interface {
// the restriction that the algorithm of the descriptor must match the // the restriction that the algorithm of the descriptor must match the
// canonical algorithm (ie sha256) of the annotator. // canonical algorithm (ie sha256) of the annotator.
SetDescriptor(ctx context.Context, dgst digest.Digest, desc Descriptor) error SetDescriptor(ctx context.Context, dgst digest.Digest, desc Descriptor) error
// Clear enables descriptors to be unlinked
Clear(ctx context.Context, dgst digest.Digest) error
} }
// ReadSeekCloser is the primary reader type for blob data, combining // ReadSeekCloser is the primary reader type for blob data, combining
@ -183,8 +194,9 @@ type BlobService interface {
} }
// BlobStore represent the entire suite of blob related operations. Such an // BlobStore represent the entire suite of blob related operations. Such an
// implementation can access, read, write and serve blobs. // implementation can access, read, write, delete and serve blobs.
type BlobStore interface { type BlobStore interface {
BlobService BlobService
BlobServer BlobServer
BlobDeleter
} }

View file

@ -19,6 +19,8 @@ log:
to: to:
- errors@example.com - errors@example.com
storage: storage:
delete:
enabled: true
cache: cache:
blobdescriptor: redis blobdescriptor: redis
filesystem: filesystem:

View file

@ -240,6 +240,8 @@ func (storage Storage) Type() string {
// allow configuration of maintenance // allow configuration of maintenance
case "cache": case "cache":
// allow configuration of caching // allow configuration of caching
case "delete":
// allow configuration of delete
default: default:
return k return k
} }
@ -271,6 +273,9 @@ func (storage *Storage) UnmarshalYAML(unmarshal func(interface{}) error) error {
// allow for configuration of maintenance // allow for configuration of maintenance
case "cache": case "cache":
// allow configuration of caching // allow configuration of caching
case "delete":
// allow configuration of delete
default: default:
types = append(types, k) types = append(types, k)
} }

View file

@ -125,6 +125,12 @@ reference and shouldn't be used outside the specification other than to
identify a set of modifications. identify a set of modifications.
<dl> <dl>
<dt>f</dt>
<dd>
<ul>
<li>Specify the delete API for layers and manifests.</li>
</ul>
</dd>
<dt>e</dt> <dt>e</dt>
<dd> <dd>
@ -714,6 +720,22 @@ Note that the upload url will not be available forever. If the upload uuid is
unknown to the registry, a `404 Not Found` response will be returned and the unknown to the registry, a `404 Not Found` response will be returned and the
client must restart the upload process. client must restart the upload process.
### Deleting a Layer
A layer may be deleted from the registry via its `name` and `digest`. A
delete may be issued with the following request format:
DELETE /v2/<name>/blobs/<digest>
If the blob exists and has been successfully deleted, the following response will be issued:
202 Accepted
Content-Length: None
If the blob had already been deleted or did not exist, a `404 Not Found`
response will be issued instead.
#### Pushing an Image Manifest #### Pushing an Image Manifest
Once all of the layers for an image are uploaded, the client can upload the Once all of the layers for an image are uploaded, the client can upload the
@ -1000,6 +1022,7 @@ A list of methods and URIs are covered in the table below:
| PUT | `/v2/<name>/blobs/uploads/<uuid>` | Blob Upload | Complete the upload specified by `uuid`, optionally appending the body as the final chunk. | | PUT | `/v2/<name>/blobs/uploads/<uuid>` | Blob Upload | Complete the upload specified by `uuid`, optionally appending the body as the final chunk. |
| DELETE | `/v2/<name>/blobs/uploads/<uuid>` | Blob Upload | Cancel outstanding upload processes, releasing associated resources. If this is not called, the unfinished uploads will eventually timeout. | | DELETE | `/v2/<name>/blobs/uploads/<uuid>` | Blob Upload | Cancel outstanding upload processes, releasing associated resources. If this is not called, the unfinished uploads will eventually timeout. |
| GET | `/v2/_catalog` | Catalog | Retrieve a sorted, json list of repositories available in the registry. | | GET | `/v2/_catalog` | Catalog | Retrieve a sorted, json list of repositories available in the registry. |
| DELETE | `/v2/<name>/blobs/<digest>` | Blob delete | Delete the blob identified by `name` and `digest`|
The detail for each endpoint is covered in the following sections. The detail for each endpoint is covered in the following sections.
@ -1646,6 +1669,7 @@ The error codes that may be included in the response body are enumerated below:
#### DELETE Manifest #### DELETE Manifest
Delete the manifest identified by `name` and `reference`. Note that a manifest can _only_ be deleted by `digest`. Delete the manifest identified by `name` and `reference`. Note that a manifest can _only_ be deleted by `digest`.

View file

@ -125,6 +125,12 @@ reference and shouldn't be used outside the specification other than to
identify a set of modifications. identify a set of modifications.
<dl> <dl>
<dt>f</dt>
<dd>
<ul>
<li>Specify the delete API for layers and manifests.</li>
</ul>
</dd>
<dt>e</dt> <dt>e</dt>
<dd> <dd>
@ -169,7 +175,6 @@ identify a set of modifications.
<li>Added error code for unsupported operations.</li> <li>Added error code for unsupported operations.</li>
</ul> </ul>
</dd> </dd>
</dl> </dl>
## Overview ## Overview
@ -714,6 +719,25 @@ Note that the upload url will not be available forever. If the upload uuid is
unknown to the registry, a `404 Not Found` response will be returned and the unknown to the registry, a `404 Not Found` response will be returned and the
client must restart the upload process. client must restart the upload process.
### Deleting a Layer
A layer may be deleted from the registry via its `name` and `digest`. A
delete may be issued with the following request format:
DELETE /v2/<name>/blobs/<digest>
If the blob exists and has been successfully deleted, the following response
will be issued:
202 Accepted
Content-Length: None
If the blob had already been deleted or did not exist, a `404 Not Found`
response will be issued instead.
If a layer is deleted which is referenced by a manifest in the registry,
then the complete images will not be resolvable.
#### Pushing an Image Manifest #### Pushing an Image Manifest
Once all of the layers for an image are uploaded, the client can upload the Once all of the layers for an image are uploaded, the client can upload the

View file

@ -18,7 +18,7 @@ import (
func TestListener(t *testing.T) { func TestListener(t *testing.T) {
ctx := context.Background() ctx := context.Background()
registry := storage.NewRegistryWithDriver(ctx, inmemory.New(), memory.NewInMemoryBlobDescriptorCacheProvider()) registry := storage.NewRegistryWithDriver(ctx, inmemory.New(), memory.NewInMemoryBlobDescriptorCacheProvider(), true)
tl := &testListener{ tl := &testListener{
ops: make(map[string]int), ops: make(map[string]int),
} }

View file

@ -354,7 +354,7 @@ func (ms *manifests) Delete(dgst digest.Digest) error {
defer resp.Body.Close() defer resp.Body.Close()
switch resp.StatusCode { switch resp.StatusCode {
case http.StatusOK: case http.StatusAccepted:
return nil return nil
default: default:
return handleErrorResponse(resp) return handleErrorResponse(resp)
@ -366,7 +366,8 @@ type blobs struct {
ub *v2.URLBuilder ub *v2.URLBuilder
client *http.Client client *http.Client
statter distribution.BlobStatter statter distribution.BlobDescriptorService
distribution.BlobDeleter
} }
func sanitizeLocation(location, source string) (string, error) { func sanitizeLocation(location, source string) (string, error) {
@ -484,6 +485,10 @@ func (bs *blobs) Resume(ctx context.Context, id string) (distribution.BlobWriter
panic("not implemented") panic("not implemented")
} }
func (bs *blobs) Delete(ctx context.Context, dgst digest.Digest) error {
return bs.statter.Clear(ctx, dgst)
}
type blobStatter struct { type blobStatter struct {
name string name string
ub *v2.URLBuilder ub *v2.URLBuilder
@ -535,3 +540,32 @@ func buildCatalogValues(maxEntries int, last string) url.Values {
return values return values
} }
func (bs *blobStatter) Clear(ctx context.Context, dgst digest.Digest) error {
blobURL, err := bs.ub.BuildBlobURL(bs.name, dgst)
if err != nil {
return err
}
req, err := http.NewRequest("DELETE", blobURL, nil)
if err != nil {
return err
}
resp, err := bs.client.Do(req)
if err != nil {
return err
}
defer resp.Body.Close()
switch resp.StatusCode {
case http.StatusAccepted:
return nil
default:
return handleErrorResponse(resp)
}
}
func (bs *blobStatter) SetDescriptor(ctx context.Context, dgst digest.Digest, desc distribution.Descriptor) error {
return nil
}

View file

@ -101,6 +101,39 @@ func addTestCatalog(route string, content []byte, link string, m *testutil.Reque
}) })
} }
func TestBlobDelete(t *testing.T) {
dgst, _ := newRandomBlob(1024)
var m testutil.RequestResponseMap
repo := "test.example.com/repo1"
m = append(m, testutil.RequestResponseMapping{
Request: testutil.Request{
Method: "DELETE",
Route: "/v2/" + repo + "/blobs/" + dgst.String(),
},
Response: testutil.Response{
StatusCode: http.StatusAccepted,
Headers: http.Header(map[string][]string{
"Content-Length": {"0"},
}),
},
})
e, c := testServer(m)
defer c()
ctx := context.Background()
r, err := NewRepository(ctx, repo, e, nil)
if err != nil {
t.Fatal(err)
}
l := r.Blobs(ctx)
err = l.Delete(ctx, dgst)
if err != nil {
t.Errorf("Error deleting blob: %s", err.Error())
}
}
func TestBlobFetch(t *testing.T) { func TestBlobFetch(t *testing.T) {
d1, b1 := newRandomBlob(1024) d1, b1 := newRandomBlob(1024)
var m testutil.RequestResponseMap var m testutil.RequestResponseMap
@ -590,7 +623,7 @@ func TestManifestDelete(t *testing.T) {
Route: "/v2/" + repo + "/manifests/" + dgst1.String(), Route: "/v2/" + repo + "/manifests/" + dgst1.String(),
}, },
Response: testutil.Response{ Response: testutil.Response{
StatusCode: http.StatusOK, StatusCode: http.StatusAccepted,
Headers: http.Header(map[string][]string{ Headers: http.Header(map[string][]string{
"Content-Length": {"0"}, "Content-Length": {"0"},
}), }),

View file

@ -33,7 +33,7 @@ import (
// TestCheckAPI hits the base endpoint (/v2/) ensures we return the specified // TestCheckAPI hits the base endpoint (/v2/) ensures we return the specified
// 200 OK response. // 200 OK response.
func TestCheckAPI(t *testing.T) { func TestCheckAPI(t *testing.T) {
env := newTestEnv(t) env := newTestEnv(t, false)
baseURL, err := env.builder.BuildBaseURL() baseURL, err := env.builder.BuildBaseURL()
if err != nil { if err != nil {
@ -65,7 +65,7 @@ func TestCheckAPI(t *testing.T) {
// TestCatalogAPI tests the /v2/_catalog endpoint // TestCatalogAPI tests the /v2/_catalog endpoint
func TestCatalogAPI(t *testing.T) { func TestCatalogAPI(t *testing.T) {
chunkLen := 2 chunkLen := 2
env := newTestEnv(t) env := newTestEnv(t, false)
values := url.Values{ values := url.Values{
"last": []string{""}, "last": []string{""},
@ -239,18 +239,16 @@ func TestURLPrefix(t *testing.T) {
"Content-Type": []string{"application/json; charset=utf-8"}, "Content-Type": []string{"application/json; charset=utf-8"},
"Content-Length": []string{"2"}, "Content-Length": []string{"2"},
}) })
} }
// TestBlobAPI conducts a full test of the of the blob api. type blobArgs struct {
func TestBlobAPI(t *testing.T) { imageName string
// TODO(stevvooe): This test code is complete junk but it should cover the layerFile io.ReadSeeker
// complete flow. This must be broken down and checked against the layerDigest digest.Digest
// specification *before* we submit the final to docker core. tarSumStr string
env := newTestEnv(t) }
imageName := "foo/bar" func makeBlobArgs(t *testing.T) blobArgs {
// "build" our layer file
layerFile, tarSumStr, err := testutil.CreateRandomTarFile() layerFile, tarSumStr, err := testutil.CreateRandomTarFile()
if err != nil { if err != nil {
t.Fatalf("error creating random layer file: %v", err) t.Fatalf("error creating random layer file: %v", err)
@ -258,6 +256,66 @@ func TestBlobAPI(t *testing.T) {
layerDigest := digest.Digest(tarSumStr) layerDigest := digest.Digest(tarSumStr)
args := blobArgs{
imageName: "foo/bar",
layerFile: layerFile,
layerDigest: layerDigest,
tarSumStr: tarSumStr,
}
return args
}
// TestBlobAPI conducts a full test of the of the blob api.
func TestBlobAPI(t *testing.T) {
deleteEnabled := false
env := newTestEnv(t, deleteEnabled)
args := makeBlobArgs(t)
testBlobAPI(t, env, args)
deleteEnabled = true
env = newTestEnv(t, deleteEnabled)
args = makeBlobArgs(t)
testBlobAPI(t, env, args)
}
func TestBlobDelete(t *testing.T) {
deleteEnabled := true
env := newTestEnv(t, deleteEnabled)
args := makeBlobArgs(t)
env = testBlobAPI(t, env, args)
testBlobDelete(t, env, args)
}
func TestBlobDeleteDisabled(t *testing.T) {
deleteEnabled := false
env := newTestEnv(t, deleteEnabled)
args := makeBlobArgs(t)
imageName := args.imageName
layerDigest := args.layerDigest
layerURL, err := env.builder.BuildBlobURL(imageName, layerDigest)
if err != nil {
t.Fatalf("error building url: %v", err)
}
resp, err := httpDelete(layerURL)
if err != nil {
t.Fatalf("unexpected error deleting when disabled: %v", err)
}
checkResponse(t, "status of disabled delete", resp, http.StatusMethodNotAllowed)
}
func testBlobAPI(t *testing.T, env *testEnv, args blobArgs) *testEnv {
// TODO(stevvooe): This test code is complete junk but it should cover the
// complete flow. This must be broken down and checked against the
// specification *before* we submit the final to docker core.
imageName := args.imageName
layerFile := args.layerFile
layerDigest := args.layerDigest
// ----------------------------------- // -----------------------------------
// Test fetch for non-existent content // Test fetch for non-existent content
layerURL, err := env.builder.BuildBlobURL(imageName, layerDigest) layerURL, err := env.builder.BuildBlobURL(imageName, layerDigest)
@ -372,6 +430,7 @@ func TestBlobAPI(t *testing.T) {
uploadURLBase, uploadUUID = startPushLayer(t, env.builder, imageName) uploadURLBase, uploadUUID = startPushLayer(t, env.builder, imageName)
uploadURLBase, dgst := pushChunk(t, env.builder, imageName, uploadURLBase, layerFile, layerLength) uploadURLBase, dgst := pushChunk(t, env.builder, imageName, uploadURLBase, layerFile, layerLength)
finishUpload(t, env.builder, imageName, uploadURLBase, dgst) finishUpload(t, env.builder, imageName, uploadURLBase, dgst)
// ------------------------ // ------------------------
// Use a head request to see if the layer exists. // Use a head request to see if the layer exists.
resp, err = http.Head(layerURL) resp, err = http.Head(layerURL)
@ -459,12 +518,188 @@ func TestBlobAPI(t *testing.T) {
// Missing tests: // Missing tests:
// - Upload the same tarsum file under and different repository and // - Upload the same tarsum file under and different repository and
// ensure the content remains uncorrupted. // ensure the content remains uncorrupted.
return env
}
func testBlobDelete(t *testing.T, env *testEnv, args blobArgs) {
// Upload a layer
imageName := args.imageName
layerFile := args.layerFile
layerDigest := args.layerDigest
layerURL, err := env.builder.BuildBlobURL(imageName, layerDigest)
if err != nil {
t.Fatalf(err.Error())
}
// ---------------
// Delete a layer
resp, err := httpDelete(layerURL)
if err != nil {
t.Fatalf("unexpected error deleting layer: %v", err)
}
checkResponse(t, "deleting layer", resp, http.StatusAccepted)
checkHeaders(t, resp, http.Header{
"Content-Length": []string{"0"},
})
// ---------------
// Try and get it back
// Use a head request to see if the layer exists.
resp, err = http.Head(layerURL)
if err != nil {
t.Fatalf("unexpected error checking head on existing layer: %v", err)
}
checkResponse(t, "checking existence of deleted layer", resp, http.StatusNotFound)
// Delete already deleted layer
resp, err = httpDelete(layerURL)
if err != nil {
t.Fatalf("unexpected error deleting layer: %v", err)
}
checkResponse(t, "deleting layer", resp, http.StatusNotFound)
// ----------------
// Attempt to delete a layer with an invalid digest
badURL := strings.Replace(layerURL, "tarsum", "trsum", 1)
resp, err = httpDelete(badURL)
if err != nil {
t.Fatalf("unexpected error fetching layer: %v", err)
}
checkResponse(t, "deleting layer bad digest", resp, http.StatusBadRequest)
// ----------------
// Reupload previously deleted blob
layerFile.Seek(0, os.SEEK_SET)
uploadURLBase, _ := startPushLayer(t, env.builder, imageName)
pushLayer(t, env.builder, imageName, layerDigest, uploadURLBase, layerFile)
layerFile.Seek(0, os.SEEK_SET)
canonicalDigester := digest.Canonical.New()
if _, err := io.Copy(canonicalDigester.Hash(), layerFile); err != nil {
t.Fatalf("error copying to digest: %v", err)
}
canonicalDigest := canonicalDigester.Digest()
// ------------------------
// Use a head request to see if it exists
resp, err = http.Head(layerURL)
if err != nil {
t.Fatalf("unexpected error checking head on existing layer: %v", err)
}
layerLength, _ := layerFile.Seek(0, os.SEEK_END)
checkResponse(t, "checking head on reuploaded layer", resp, http.StatusOK)
checkHeaders(t, resp, http.Header{
"Content-Length": []string{fmt.Sprint(layerLength)},
"Docker-Content-Digest": []string{canonicalDigest.String()},
})
}
func TestDeleteDisabled(t *testing.T) {
env := newTestEnv(t, false)
imageName := "foo/bar"
// "build" our layer file
layerFile, tarSumStr, err := testutil.CreateRandomTarFile()
if err != nil {
t.Fatalf("error creating random layer file: %v", err)
}
layerDigest := digest.Digest(tarSumStr)
layerURL, err := env.builder.BuildBlobURL(imageName, layerDigest)
if err != nil {
t.Fatalf("Error building blob URL")
}
uploadURLBase, _ := startPushLayer(t, env.builder, imageName)
pushLayer(t, env.builder, imageName, layerDigest, uploadURLBase, layerFile)
resp, err := httpDelete(layerURL)
if err != nil {
t.Fatalf("unexpected error deleting layer: %v", err)
}
checkResponse(t, "deleting layer with delete disabled", resp, http.StatusMethodNotAllowed)
}
func httpDelete(url string) (*http.Response, error) {
req, err := http.NewRequest("DELETE", url, nil)
if err != nil {
return nil, err
}
resp, err := http.DefaultClient.Do(req)
if err != nil {
return nil, err
}
// defer resp.Body.Close()
return resp, err
}
type manifestArgs struct {
imageName string
signedManifest *manifest.SignedManifest
dgst digest.Digest
}
func makeManifestArgs(t *testing.T) manifestArgs {
args := manifestArgs{
imageName: "foo/bar",
}
return args
} }
func TestManifestAPI(t *testing.T) { func TestManifestAPI(t *testing.T) {
env := newTestEnv(t) deleteEnabled := false
env := newTestEnv(t, deleteEnabled)
args := makeManifestArgs(t)
testManifestAPI(t, env, args)
imageName := "foo/bar" deleteEnabled = true
env = newTestEnv(t, deleteEnabled)
args = makeManifestArgs(t)
testManifestAPI(t, env, args)
}
func TestManifestDelete(t *testing.T) {
deleteEnabled := true
env := newTestEnv(t, deleteEnabled)
args := makeManifestArgs(t)
env, args = testManifestAPI(t, env, args)
testManifestDelete(t, env, args)
}
func TestManifestDeleteDisabled(t *testing.T) {
deleteEnabled := false
env := newTestEnv(t, deleteEnabled)
args := makeManifestArgs(t)
testManifestDeleteDisabled(t, env, args)
}
func testManifestDeleteDisabled(t *testing.T, env *testEnv, args manifestArgs) *testEnv {
imageName := args.imageName
manifestURL, err := env.builder.BuildManifestURL(imageName, digest.DigestSha256EmptyTar)
if err != nil {
t.Fatalf("unexpected error getting manifest url: %v", err)
}
resp, err := httpDelete(manifestURL)
if err != nil {
t.Fatalf("unexpected error deleting manifest %v", err)
}
defer resp.Body.Close()
checkResponse(t, "status of disabled delete of manifest", resp, http.StatusMethodNotAllowed)
return nil
}
func testManifestAPI(t *testing.T, env *testEnv, args manifestArgs) (*testEnv, manifestArgs) {
imageName := args.imageName
tag := "thetag" tag := "thetag"
manifestURL, err := env.builder.BuildManifestURL(imageName, tag) manifestURL, err := env.builder.BuildManifestURL(imageName, tag)
@ -567,6 +802,9 @@ func TestManifestAPI(t *testing.T) {
dgst, err := digest.FromBytes(payload) dgst, err := digest.FromBytes(payload)
checkErr(t, err, "digesting manifest") checkErr(t, err, "digesting manifest")
args.signedManifest = signedManifest
args.dgst = dgst
manifestDigestURL, err := env.builder.BuildManifestURL(imageName, dgst.String()) manifestDigestURL, err := env.builder.BuildManifestURL(imageName, dgst.String())
checkErr(t, err, "building manifest url") checkErr(t, err, "building manifest url")
@ -687,6 +925,70 @@ func TestManifestAPI(t *testing.T) {
if tagsResponse.Tags[0] != tag { if tagsResponse.Tags[0] != tag {
t.Fatalf("tag not as expected: %q != %q", tagsResponse.Tags[0], tag) t.Fatalf("tag not as expected: %q != %q", tagsResponse.Tags[0], tag)
} }
return env, args
}
func testManifestDelete(t *testing.T, env *testEnv, args manifestArgs) {
imageName := args.imageName
dgst := args.dgst
signedManifest := args.signedManifest
manifestDigestURL, err := env.builder.BuildManifestURL(imageName, dgst.String())
// ---------------
// Delete by digest
resp, err := httpDelete(manifestDigestURL)
checkErr(t, err, "deleting manifest by digest")
checkResponse(t, "deleting manifest", resp, http.StatusAccepted)
checkHeaders(t, resp, http.Header{
"Content-Length": []string{"0"},
})
// ---------------
// Attempt to fetch deleted manifest
resp, err = http.Get(manifestDigestURL)
checkErr(t, err, "fetching deleted manifest by digest")
defer resp.Body.Close()
checkResponse(t, "fetching deleted manifest", resp, http.StatusNotFound)
// ---------------
// Delete already deleted manifest by digest
resp, err = httpDelete(manifestDigestURL)
checkErr(t, err, "re-deleting manifest by digest")
checkResponse(t, "re-deleting manifest", resp, http.StatusNotFound)
// --------------------
// Re-upload manifest by digest
resp = putManifest(t, "putting signed manifest", manifestDigestURL, signedManifest)
checkResponse(t, "putting signed manifest", resp, http.StatusAccepted)
checkHeaders(t, resp, http.Header{
"Location": []string{manifestDigestURL},
"Docker-Content-Digest": []string{dgst.String()},
})
// ---------------
// Attempt to fetch re-uploaded deleted digest
resp, err = http.Get(manifestDigestURL)
checkErr(t, err, "fetching re-uploaded manifest by digest")
defer resp.Body.Close()
checkResponse(t, "fetching re-uploaded manifest", resp, http.StatusOK)
checkHeaders(t, resp, http.Header{
"Docker-Content-Digest": []string{dgst.String()},
})
// ---------------
// Attempt to delete an unknown manifest
unknownDigest := "sha256:aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa"
unknownManifestDigestURL, err := env.builder.BuildManifestURL(imageName, unknownDigest)
checkErr(t, err, "building unknown manifest url")
resp, err = httpDelete(unknownManifestDigestURL)
checkErr(t, err, "delting unknown manifest by digest")
checkResponse(t, "fetching deleted manifest", resp, http.StatusNotFound)
} }
type testEnv struct { type testEnv struct {
@ -698,10 +1000,11 @@ type testEnv struct {
builder *v2.URLBuilder builder *v2.URLBuilder
} }
func newTestEnv(t *testing.T) *testEnv { func newTestEnv(t *testing.T, deleteEnabled bool) *testEnv {
config := configuration.Configuration{ config := configuration.Configuration{
Storage: configuration.Storage{ Storage: configuration.Storage{
"inmemory": configuration.Parameters{}, "inmemory": configuration.Parameters{},
"delete": configuration.Parameters{"enabled": deleteEnabled},
}, },
} }
@ -1005,7 +1308,7 @@ func checkHeaders(t *testing.T, resp *http.Response, headers http.Header) {
for _, hv := range resp.Header[k] { for _, hv := range resp.Header[k] {
if hv != v { if hv != v {
t.Fatalf("%v header value not matched in response: %q != %q", k, hv, v) t.Fatalf("%+v %v header value not matched in response: %q != %q", resp.Header, k, hv, v)
} }
} }
} }

View file

@ -106,6 +106,16 @@ func NewApp(ctx context.Context, configuration configuration.Configuration) *App
app.configureRedis(&configuration) app.configureRedis(&configuration)
app.configureLogHook(&configuration) app.configureLogHook(&configuration)
deleteEnabled := false
if d, ok := configuration.Storage["delete"]; ok {
e, ok := d["enabled"]
if ok {
if deleteEnabled, ok = e.(bool); !ok {
deleteEnabled = false
}
}
}
// configure storage caches // configure storage caches
if cc, ok := configuration.Storage["cache"]; ok { if cc, ok := configuration.Storage["cache"]; ok {
v, ok := cc["blobdescriptor"] v, ok := cc["blobdescriptor"]
@ -119,10 +129,10 @@ func NewApp(ctx context.Context, configuration configuration.Configuration) *App
if app.redis == nil { if app.redis == nil {
panic("redis configuration required to use for layerinfo cache") panic("redis configuration required to use for layerinfo cache")
} }
app.registry = storage.NewRegistryWithDriver(app, app.driver, rediscache.NewRedisBlobDescriptorCacheProvider(app.redis)) app.registry = storage.NewRegistryWithDriver(app, app.driver, rediscache.NewRedisBlobDescriptorCacheProvider(app.redis), deleteEnabled)
ctxu.GetLogger(app).Infof("using redis blob descriptor cache") ctxu.GetLogger(app).Infof("using redis blob descriptor cache")
case "inmemory": case "inmemory":
app.registry = storage.NewRegistryWithDriver(app, app.driver, memorycache.NewInMemoryBlobDescriptorCacheProvider()) app.registry = storage.NewRegistryWithDriver(app, app.driver, memorycache.NewInMemoryBlobDescriptorCacheProvider(), deleteEnabled)
ctxu.GetLogger(app).Infof("using inmemory blob descriptor cache") ctxu.GetLogger(app).Infof("using inmemory blob descriptor cache")
default: default:
if v != "" { if v != "" {
@ -133,7 +143,7 @@ func NewApp(ctx context.Context, configuration configuration.Configuration) *App
if app.registry == nil { if app.registry == nil {
// configure the registry if no cache section is available. // configure the registry if no cache section is available.
app.registry = storage.NewRegistryWithDriver(app.Context, app.driver, nil) app.registry = storage.NewRegistryWithDriver(app.Context, app.driver, nil, deleteEnabled)
} }
app.registry, err = applyRegistryMiddleware(app.registry, configuration.Middleware["registry"]) app.registry, err = applyRegistryMiddleware(app.registry, configuration.Middleware["registry"])

View file

@ -31,7 +31,7 @@ func TestAppDispatcher(t *testing.T) {
Context: ctx, Context: ctx,
router: v2.Router(), router: v2.Router(),
driver: driver, driver: driver,
registry: storage.NewRegistryWithDriver(ctx, driver, memorycache.NewInMemoryBlobDescriptorCacheProvider()), registry: storage.NewRegistryWithDriver(ctx, driver, memorycache.NewInMemoryBlobDescriptorCacheProvider(), true),
} }
server := httptest.NewServer(app) server := httptest.NewServer(app)
router := v2.Router() router := v2.Router()

View file

@ -35,6 +35,7 @@ func blobDispatcher(ctx *Context, r *http.Request) http.Handler {
return handlers.MethodHandler{ return handlers.MethodHandler{
"GET": http.HandlerFunc(blobHandler.GetBlob), "GET": http.HandlerFunc(blobHandler.GetBlob),
"HEAD": http.HandlerFunc(blobHandler.GetBlob), "HEAD": http.HandlerFunc(blobHandler.GetBlob),
"DELETE": http.HandlerFunc(blobHandler.DeleteBlob),
} }
} }
@ -66,3 +67,27 @@ func (bh *blobHandler) GetBlob(w http.ResponseWriter, r *http.Request) {
return return
} }
} }
// DeleteBlob deletes a layer blob
func (bh *blobHandler) DeleteBlob(w http.ResponseWriter, r *http.Request) {
context.GetLogger(bh).Debug("DeleteBlob")
blobs := bh.Repository.Blobs(bh)
err := blobs.Delete(bh, bh.Digest)
if err != nil {
switch err {
case distribution.ErrBlobUnknown:
w.WriteHeader(http.StatusNotFound)
bh.Errors = append(bh.Errors, v2.ErrorCodeBlobUnknown)
case distribution.ErrUnsupported:
w.WriteHeader(http.StatusMethodNotAllowed)
bh.Errors = append(bh.Errors, v2.ErrorCodeUnsupported)
default:
bh.Errors = append(bh.Errors, errcode.ErrorCodeUnknown)
}
return
}
w.Header().Set("Content-Length", "0")
w.WriteHeader(http.StatusAccepted)
}

View file

@ -186,16 +186,38 @@ func (imh *imageManifestHandler) PutImageManifest(w http.ResponseWriter, r *http
w.WriteHeader(http.StatusAccepted) w.WriteHeader(http.StatusAccepted)
} }
// DeleteImageManifest removes the image with the given tag from the registry. // DeleteImageManifest removes the manifest with the given digest from the registry.
func (imh *imageManifestHandler) DeleteImageManifest(w http.ResponseWriter, r *http.Request) { func (imh *imageManifestHandler) DeleteImageManifest(w http.ResponseWriter, r *http.Request) {
ctxu.GetLogger(imh).Debug("DeleteImageManifest") ctxu.GetLogger(imh).Debug("DeleteImageManifest")
// TODO(stevvooe): Unfortunately, at this point, manifest deletes are manifests, err := imh.Repository.Manifests(imh)
// unsupported. There are issues with schema version 1 that make removing if err != nil {
// tag index entries a serious problem in eventually consistent storage. imh.Errors = append(imh.Errors, err)
// Once we work out schema version 2, the full deletion system will be return
// worked out and we can add support back. }
err = manifests.Delete(imh.Digest)
if err != nil {
switch err {
case digest.ErrDigestUnsupported:
case digest.ErrDigestInvalidFormat:
imh.Errors = append(imh.Errors, v2.ErrorCodeDigestInvalid)
return
case distribution.ErrBlobUnknown:
imh.Errors = append(imh.Errors, v2.ErrorCodeManifestUnknown)
w.WriteHeader(http.StatusNotFound)
return
case distribution.ErrUnsupported:
imh.Errors = append(imh.Errors, v2.ErrorCodeUnsupported) imh.Errors = append(imh.Errors, v2.ErrorCodeUnsupported)
w.WriteHeader(http.StatusMethodNotAllowed)
default:
imh.Errors = append(imh.Errors, errcode.ErrorCodeUnknown)
w.WriteHeader(http.StatusBadRequest)
return
}
}
w.WriteHeader(http.StatusAccepted)
} }
// digestManifest takes a digest of the given manifest. This belongs somewhere // digestManifest takes a digest of the given manifest. This belongs somewhere

View file

@ -21,13 +21,11 @@ import (
// error paths that might be seen during an upload. // error paths that might be seen during an upload.
func TestSimpleBlobUpload(t *testing.T) { func TestSimpleBlobUpload(t *testing.T) {
randomDataReader, tarSumStr, err := testutil.CreateRandomTarFile() randomDataReader, tarSumStr, err := testutil.CreateRandomTarFile()
if err != nil { if err != nil {
t.Fatalf("error creating random reader: %v", err) t.Fatalf("error creating random reader: %v", err)
} }
dgst := digest.Digest(tarSumStr) dgst := digest.Digest(tarSumStr)
if err != nil { if err != nil {
t.Fatalf("error allocating upload store: %v", err) t.Fatalf("error allocating upload store: %v", err)
} }
@ -35,7 +33,7 @@ func TestSimpleBlobUpload(t *testing.T) {
ctx := context.Background() ctx := context.Background()
imageName := "foo/bar" imageName := "foo/bar"
driver := inmemory.New() driver := inmemory.New()
registry := NewRegistryWithDriver(ctx, driver, memory.NewInMemoryBlobDescriptorCacheProvider()) registry := NewRegistryWithDriver(ctx, driver, memory.NewInMemoryBlobDescriptorCacheProvider(), true)
repository, err := registry.Repository(ctx, imageName) repository, err := registry.Repository(ctx, imageName)
if err != nil { if err != nil {
t.Fatalf("unexpected error getting repo: %v", err) t.Fatalf("unexpected error getting repo: %v", err)
@ -139,6 +137,72 @@ func TestSimpleBlobUpload(t *testing.T) {
if digest.NewDigest("sha256", h) != sha256Digest { if digest.NewDigest("sha256", h) != sha256Digest {
t.Fatalf("unexpected digest from uploaded layer: %q != %q", digest.NewDigest("sha256", h), sha256Digest) t.Fatalf("unexpected digest from uploaded layer: %q != %q", digest.NewDigest("sha256", h), sha256Digest)
} }
// Delete a blob
err = bs.Delete(ctx, desc.Digest)
if err != nil {
t.Fatalf("Unexpected error deleting blob")
}
d, err := bs.Stat(ctx, desc.Digest)
if err == nil {
t.Fatalf("unexpected non-error stating deleted blob: %s", d)
}
switch err {
case distribution.ErrBlobUnknown:
break
default:
t.Errorf("Unexpected error type stat-ing deleted manifest: %#v", err)
}
_, err = bs.Open(ctx, desc.Digest)
if err == nil {
t.Fatalf("unexpected success opening deleted blob for read")
}
switch err {
case distribution.ErrBlobUnknown:
break
default:
t.Errorf("Unexpected error type getting deleted manifest: %#v", err)
}
// Re-upload the blob
randomBlob, err := ioutil.ReadAll(randomDataReader)
if err != nil {
t.Fatalf("Error reading all of blob %s", err.Error())
}
expectedDigest, err := digest.FromBytes(randomBlob)
if err != nil {
t.Fatalf("Error getting digest from bytes: %s", err)
}
simpleUpload(t, bs, randomBlob, expectedDigest)
d, err = bs.Stat(ctx, expectedDigest)
if err != nil {
t.Errorf("unexpected error stat-ing blob")
}
if d.Digest != expectedDigest {
t.Errorf("Mismatching digest with restored blob")
}
_, err = bs.Open(ctx, expectedDigest)
if err != nil {
t.Errorf("Unexpected error opening blob")
}
// Reuse state to test delete with a delete-disabled registry
registry = NewRegistryWithDriver(ctx, driver, memory.NewInMemoryBlobDescriptorCacheProvider(), false)
repository, err = registry.Repository(ctx, imageName)
if err != nil {
t.Fatalf("unexpected error getting repo: %v", err)
}
bs = repository.Blobs(ctx)
err = bs.Delete(ctx, desc.Digest)
if err == nil {
t.Errorf("Unexpected success deleting while disabled")
}
} }
// TestSimpleBlobRead just creates a simple blob file and ensures that basic // TestSimpleBlobRead just creates a simple blob file and ensures that basic
@ -148,7 +212,7 @@ func TestSimpleBlobRead(t *testing.T) {
ctx := context.Background() ctx := context.Background()
imageName := "foo/bar" imageName := "foo/bar"
driver := inmemory.New() driver := inmemory.New()
registry := NewRegistryWithDriver(ctx, driver, memory.NewInMemoryBlobDescriptorCacheProvider()) registry := NewRegistryWithDriver(ctx, driver, memory.NewInMemoryBlobDescriptorCacheProvider(), true)
repository, err := registry.Repository(ctx, imageName) repository, err := registry.Repository(ctx, imageName)
if err != nil { if err != nil {
t.Fatalf("unexpected error getting repo: %v", err) t.Fatalf("unexpected error getting repo: %v", err)
@ -252,19 +316,24 @@ func TestLayerUploadZeroLength(t *testing.T) {
ctx := context.Background() ctx := context.Background()
imageName := "foo/bar" imageName := "foo/bar"
driver := inmemory.New() driver := inmemory.New()
registry := NewRegistryWithDriver(ctx, driver, memory.NewInMemoryBlobDescriptorCacheProvider()) registry := NewRegistryWithDriver(ctx, driver, memory.NewInMemoryBlobDescriptorCacheProvider(), true)
repository, err := registry.Repository(ctx, imageName) repository, err := registry.Repository(ctx, imageName)
if err != nil { if err != nil {
t.Fatalf("unexpected error getting repo: %v", err) t.Fatalf("unexpected error getting repo: %v", err)
} }
bs := repository.Blobs(ctx) bs := repository.Blobs(ctx)
simpleUpload(t, bs, []byte{}, digest.DigestSha256EmptyTar)
}
func simpleUpload(t *testing.T, bs distribution.BlobIngester, blob []byte, expectedDigest digest.Digest) {
ctx := context.Background()
wr, err := bs.Create(ctx) wr, err := bs.Create(ctx)
if err != nil { if err != nil {
t.Fatalf("unexpected error starting upload: %v", err) t.Fatalf("unexpected error starting upload: %v", err)
} }
nn, err := io.Copy(wr, bytes.NewReader([]byte{})) nn, err := io.Copy(wr, bytes.NewReader(blob))
if err != nil { if err != nil {
t.Fatalf("error copying into blob writer: %v", err) t.Fatalf("error copying into blob writer: %v", err)
} }
@ -273,12 +342,12 @@ func TestLayerUploadZeroLength(t *testing.T) {
t.Fatalf("unexpected number of bytes copied: %v > 0", nn) t.Fatalf("unexpected number of bytes copied: %v > 0", nn)
} }
dgst, err := digest.FromReader(bytes.NewReader([]byte{})) dgst, err := digest.FromReader(bytes.NewReader(blob))
if err != nil { if err != nil {
t.Fatalf("error getting zero digest: %v", err) t.Fatalf("error getting digest: %v", err)
} }
if dgst != digest.DigestSha256EmptyTar { if dgst != expectedDigest {
// sanity check on zero digest // sanity check on zero digest
t.Fatalf("digest not as expected: %v != %v", dgst, digest.DigestTarSumV1EmptyTar) t.Fatalf("digest not as expected: %v != %v", dgst, digest.DigestTarSumV1EmptyTar)
} }

View file

@ -7,7 +7,7 @@ import (
"github.com/docker/distribution/registry/storage/driver" "github.com/docker/distribution/registry/storage/driver"
) )
// blobStore implements a the read side of the blob store interface over a // blobStore implements the read side of the blob store interface over a
// driver without enforcing per-repository membership. This object is // driver without enforcing per-repository membership. This object is
// intentionally a leaky abstraction, providing utility methods that support // intentionally a leaky abstraction, providing utility methods that support
// creating and traversing backend links. // creating and traversing backend links.
@ -143,7 +143,7 @@ type blobStatter struct {
pm *pathMapper pm *pathMapper
} }
var _ distribution.BlobStatter = &blobStatter{} var _ distribution.BlobDescriptorService = &blobStatter{}
// Stat implements BlobStatter.Stat by returning the descriptor for the blob // Stat implements BlobStatter.Stat by returning the descriptor for the blob
// in the main blob store. If this method returns successfully, there is // in the main blob store. If this method returns successfully, there is
@ -188,3 +188,11 @@ func (bs *blobStatter) Stat(ctx context.Context, dgst digest.Digest) (distributi
Digest: dgst, Digest: dgst,
}, nil }, nil
} }
func (bs *blobStatter) Clear(ctx context.Context, dgst digest.Digest) error {
return distribution.ErrUnsupported
}
func (bs *blobStatter) SetDescriptor(ctx context.Context, dgst digest.Digest, desc distribution.Descriptor) error {
return distribution.ErrUnsupported
}

View file

@ -70,6 +70,11 @@ func (bw *blobWriter) Commit(ctx context.Context, desc distribution.Descriptor)
return distribution.Descriptor{}, err return distribution.Descriptor{}, err
} }
err = bw.blobStore.blobAccessController.SetDescriptor(ctx, canonical.Digest, canonical)
if err != nil {
return distribution.Descriptor{}, err
}
return canonical, nil return canonical, nil
} }

View file

@ -26,13 +26,13 @@ type MetricsTracker interface {
type cachedBlobStatter struct { type cachedBlobStatter struct {
cache distribution.BlobDescriptorService cache distribution.BlobDescriptorService
backend distribution.BlobStatter backend distribution.BlobDescriptorService
tracker MetricsTracker tracker MetricsTracker
} }
// NewCachedBlobStatter creates a new statter which prefers a cache and // NewCachedBlobStatter creates a new statter which prefers a cache and
// falls back to a backend. // falls back to a backend.
func NewCachedBlobStatter(cache distribution.BlobDescriptorService, backend distribution.BlobStatter) distribution.BlobStatter { func NewCachedBlobStatter(cache distribution.BlobDescriptorService, backend distribution.BlobDescriptorService) distribution.BlobDescriptorService {
return &cachedBlobStatter{ return &cachedBlobStatter{
cache: cache, cache: cache,
backend: backend, backend: backend,
@ -41,7 +41,7 @@ func NewCachedBlobStatter(cache distribution.BlobDescriptorService, backend dist
// NewCachedBlobStatterWithMetrics creates a new statter which prefers a cache and // NewCachedBlobStatterWithMetrics creates a new statter which prefers a cache and
// falls back to a backend. Hits and misses will send to the tracker. // falls back to a backend. Hits and misses will send to the tracker.
func NewCachedBlobStatterWithMetrics(cache distribution.BlobDescriptorService, backend distribution.BlobStatter, tracker MetricsTracker) distribution.BlobStatter { func NewCachedBlobStatterWithMetrics(cache distribution.BlobDescriptorService, backend distribution.BlobDescriptorService, tracker MetricsTracker) distribution.BlobStatter {
return &cachedBlobStatter{ return &cachedBlobStatter{
cache: cache, cache: cache,
backend: backend, backend: backend,
@ -77,4 +77,25 @@ fallback:
} }
return desc, err return desc, err
}
func (cbds *cachedBlobStatter) Clear(ctx context.Context, dgst digest.Digest) error {
err := cbds.cache.Clear(ctx, dgst)
if err != nil {
return err
}
err = cbds.backend.Clear(ctx, dgst)
if err != nil {
return err
}
return nil
}
func (cbds *cachedBlobStatter) SetDescriptor(ctx context.Context, dgst digest.Digest, desc distribution.Descriptor) error {
if err := cbds.cache.SetDescriptor(ctx, dgst, desc); err != nil {
context.GetLogger(ctx).Errorf("error adding descriptor %v to cache: %v", desc.Digest, err)
}
return nil
} }

View file

@ -44,6 +44,10 @@ func (imbdcp *inMemoryBlobDescriptorCacheProvider) Stat(ctx context.Context, dgs
return imbdcp.global.Stat(ctx, dgst) return imbdcp.global.Stat(ctx, dgst)
} }
func (imbdcp *inMemoryBlobDescriptorCacheProvider) Clear(ctx context.Context, dgst digest.Digest) error {
return imbdcp.global.Clear(ctx, dgst)
}
func (imbdcp *inMemoryBlobDescriptorCacheProvider) SetDescriptor(ctx context.Context, dgst digest.Digest, desc distribution.Descriptor) error { func (imbdcp *inMemoryBlobDescriptorCacheProvider) SetDescriptor(ctx context.Context, dgst digest.Digest, desc distribution.Descriptor) error {
_, err := imbdcp.Stat(ctx, dgst) _, err := imbdcp.Stat(ctx, dgst)
if err == distribution.ErrBlobUnknown { if err == distribution.ErrBlobUnknown {
@ -80,6 +84,14 @@ func (rsimbdcp *repositoryScopedInMemoryBlobDescriptorCache) Stat(ctx context.Co
return rsimbdcp.repository.Stat(ctx, dgst) return rsimbdcp.repository.Stat(ctx, dgst)
} }
func (rsimbdcp *repositoryScopedInMemoryBlobDescriptorCache) Clear(ctx context.Context, dgst digest.Digest) error {
if rsimbdcp.repository == nil {
return distribution.ErrBlobUnknown
}
return rsimbdcp.repository.Clear(ctx, dgst)
}
func (rsimbdcp *repositoryScopedInMemoryBlobDescriptorCache) SetDescriptor(ctx context.Context, dgst digest.Digest, desc distribution.Descriptor) error { func (rsimbdcp *repositoryScopedInMemoryBlobDescriptorCache) SetDescriptor(ctx context.Context, dgst digest.Digest, desc distribution.Descriptor) error {
if rsimbdcp.repository == nil { if rsimbdcp.repository == nil {
// allocate map since we are setting it now. // allocate map since we are setting it now.
@ -133,6 +145,14 @@ func (mbdc *mapBlobDescriptorCache) Stat(ctx context.Context, dgst digest.Digest
return desc, nil return desc, nil
} }
func (mbdc *mapBlobDescriptorCache) Clear(ctx context.Context, dgst digest.Digest) error {
mbdc.mu.Lock()
defer mbdc.mu.Unlock()
delete(mbdc.descriptors, dgst)
return nil
}
func (mbdc *mapBlobDescriptorCache) SetDescriptor(ctx context.Context, dgst digest.Digest, desc distribution.Descriptor) error { func (mbdc *mapBlobDescriptorCache) SetDescriptor(ctx context.Context, dgst digest.Digest, desc distribution.Descriptor) error {
if err := dgst.Validate(); err != nil { if err := dgst.Validate(); err != nil {
return err return err

View file

@ -12,7 +12,7 @@ import (
) )
// redisBlobStatService provides an implementation of // redisBlobStatService provides an implementation of
// BlobDescriptorCacheProvider based on redis. Blob descritors are stored in // BlobDescriptorCacheProvider based on redis. Blob descriptors are stored in
// two parts. The first provide fast access to repository membership through a // two parts. The first provide fast access to repository membership through a
// redis set for each repo. The second is a redis hash keyed by the digest of // redis set for each repo. The second is a redis hash keyed by the digest of
// the layer, providing path, length and mediatype information. There is also // the layer, providing path, length and mediatype information. There is also
@ -63,6 +63,27 @@ func (rbds *redisBlobDescriptorService) Stat(ctx context.Context, dgst digest.Di
return rbds.stat(ctx, conn, dgst) return rbds.stat(ctx, conn, dgst)
} }
func (rbds *redisBlobDescriptorService) Clear(ctx context.Context, dgst digest.Digest) error {
if err := dgst.Validate(); err != nil {
return err
}
conn := rbds.pool.Get()
defer conn.Close()
// Not atomic in redis <= 2.3
reply, err := conn.Do("HDEL", rbds.blobDescriptorHashKey(dgst), "digest", "length", "mediatype")
if err != nil {
return err
}
if reply == 0 {
return distribution.ErrBlobUnknown
}
return nil
}
// stat provides an internal stat call that takes a connection parameter. This // stat provides an internal stat call that takes a connection parameter. This
// allows some internal management of the connection scope. // allows some internal management of the connection scope.
func (rbds *redisBlobDescriptorService) stat(ctx context.Context, conn redis.Conn, dgst digest.Digest) (distribution.Descriptor, error) { func (rbds *redisBlobDescriptorService) stat(ctx context.Context, conn redis.Conn, dgst digest.Digest) (distribution.Descriptor, error) {
@ -170,6 +191,28 @@ func (rsrbds *repositoryScopedRedisBlobDescriptorService) Stat(ctx context.Conte
return upstream, nil return upstream, nil
} }
// Clear removes the descriptor from the cache and forwards to the upstream descriptor store
func (rsrbds *repositoryScopedRedisBlobDescriptorService) Clear(ctx context.Context, dgst digest.Digest) error {
if err := dgst.Validate(); err != nil {
return err
}
conn := rsrbds.upstream.pool.Get()
defer conn.Close()
// Check membership to repository first
member, err := redis.Bool(conn.Do("SISMEMBER", rsrbds.repositoryBlobSetKey(rsrbds.repo), dgst))
if err != nil {
return err
}
if !member {
return distribution.ErrBlobUnknown
}
return rsrbds.upstream.Clear(ctx, dgst)
}
func (rsrbds *repositoryScopedRedisBlobDescriptorService) SetDescriptor(ctx context.Context, dgst digest.Digest, desc distribution.Descriptor) error { func (rsrbds *repositoryScopedRedisBlobDescriptorService) SetDescriptor(ctx context.Context, dgst digest.Digest, desc distribution.Descriptor) error {
if err := dgst.Validate(); err != nil { if err := dgst.Validate(); err != nil {
return err return err

View file

@ -139,3 +139,40 @@ func checkBlobDescriptorCacheSetAndRead(t *testing.T, ctx context.Context, provi
t.Fatalf("unexpected descriptor: %#v != %#v", desc, expected) t.Fatalf("unexpected descriptor: %#v != %#v", desc, expected)
} }
} }
func checkBlobDescriptorClear(t *testing.T, ctx context.Context, provider BlobDescriptorCacheProvider) {
localDigest := digest.Digest("sha384:abc")
expected := distribution.Descriptor{
Digest: "sha256:abc",
Size: 10,
MediaType: "application/octet-stream"}
cache, err := provider.RepositoryScoped("foo/bar")
if err != nil {
t.Fatalf("unexpected error getting scoped cache: %v", err)
}
if err := cache.SetDescriptor(ctx, localDigest, expected); err != nil {
t.Fatalf("error setting descriptor: %v", err)
}
desc, err := cache.Stat(ctx, localDigest)
if err != nil {
t.Fatalf("unexpected error statting fake2:abc: %v", err)
}
if expected != desc {
t.Fatalf("unexpected descriptor: %#v != %#v", expected, desc)
}
err = cache.Clear(ctx, localDigest)
if err != nil {
t.Fatalf("unexpected error deleting descriptor")
}
nonExistantDigest := digest.Digest("sha256:aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa")
err = cache.Clear(ctx, nonExistantDigest)
if err == nil {
t.Fatalf("expected error deleting unknown descriptor")
}
}

View file

@ -22,7 +22,7 @@ func setupFS(t *testing.T) *setupEnv {
d := inmemory.New() d := inmemory.New()
c := []byte("") c := []byte("")
ctx := context.Background() ctx := context.Background()
registry := NewRegistryWithDriver(ctx, d, memory.NewInMemoryBlobDescriptorCacheProvider()) registry := NewRegistryWithDriver(ctx, d, memory.NewInMemoryBlobDescriptorCacheProvider(), false)
rootpath, _ := defaultPathMapper.path(repositoriesRootPathSpec{}) rootpath, _ := defaultPathMapper.path(repositoriesRootPathSpec{})
repos := []string{ repos := []string{

View file

@ -17,9 +17,10 @@ import (
type linkedBlobStore struct { type linkedBlobStore struct {
*blobStore *blobStore
blobServer distribution.BlobServer blobServer distribution.BlobServer
statter distribution.BlobStatter blobAccessController distribution.BlobDescriptorService
repository distribution.Repository repository distribution.Repository
ctx context.Context // only to be used where context can't come through method args ctx context.Context // only to be used where context can't come through method args
deleteEnabled bool
// linkPath allows one to control the repository blob link set to which // linkPath allows one to control the repository blob link set to which
// the blob store dispatches. This is required because manifest and layer // the blob store dispatches. This is required because manifest and layer
@ -31,7 +32,7 @@ type linkedBlobStore struct {
var _ distribution.BlobStore = &linkedBlobStore{} var _ distribution.BlobStore = &linkedBlobStore{}
func (lbs *linkedBlobStore) Stat(ctx context.Context, dgst digest.Digest) (distribution.Descriptor, error) { func (lbs *linkedBlobStore) Stat(ctx context.Context, dgst digest.Digest) (distribution.Descriptor, error) {
return lbs.statter.Stat(ctx, dgst) return lbs.blobAccessController.Stat(ctx, dgst)
} }
func (lbs *linkedBlobStore) Get(ctx context.Context, dgst digest.Digest) ([]byte, error) { func (lbs *linkedBlobStore) Get(ctx context.Context, dgst digest.Digest) ([]byte, error) {
@ -67,6 +68,10 @@ func (lbs *linkedBlobStore) ServeBlob(ctx context.Context, w http.ResponseWriter
} }
func (lbs *linkedBlobStore) Put(ctx context.Context, mediaType string, p []byte) (distribution.Descriptor, error) { func (lbs *linkedBlobStore) Put(ctx context.Context, mediaType string, p []byte) (distribution.Descriptor, error) {
dgst, err := digest.FromBytes(p)
if err != nil {
return distribution.Descriptor{}, err
}
// Place the data in the blob store first. // Place the data in the blob store first.
desc, err := lbs.blobStore.Put(ctx, mediaType, p) desc, err := lbs.blobStore.Put(ctx, mediaType, p)
if err != nil { if err != nil {
@ -74,6 +79,10 @@ func (lbs *linkedBlobStore) Put(ctx context.Context, mediaType string, p []byte)
return distribution.Descriptor{}, err return distribution.Descriptor{}, err
} }
if err := lbs.blobAccessController.SetDescriptor(ctx, dgst, desc); err != nil {
return distribution.Descriptor{}, err
}
// TODO(stevvooe): Write out mediatype if incoming differs from what is // TODO(stevvooe): Write out mediatype if incoming differs from what is
// returned by Put above. Note that we should allow updates for a given // returned by Put above. Note that we should allow updates for a given
// repository. // repository.
@ -153,7 +162,26 @@ func (lbs *linkedBlobStore) Resume(ctx context.Context, id string) (distribution
return lbs.newBlobUpload(ctx, id, path, startedAt) return lbs.newBlobUpload(ctx, id, path, startedAt)
} }
// newLayerUpload allocates a new upload controller with the given state. func (lbs *linkedBlobStore) Delete(ctx context.Context, dgst digest.Digest) error {
if !lbs.deleteEnabled {
return distribution.ErrUnsupported
}
// Ensure the blob is available for deletion
_, err := lbs.blobAccessController.Stat(ctx, dgst)
if err != nil {
return err
}
err = lbs.blobAccessController.Clear(ctx, dgst)
if err != nil {
return err
}
return nil
}
// newBlobUpload allocates a new upload controller with the given state.
func (lbs *linkedBlobStore) newBlobUpload(ctx context.Context, uuid, path string, startedAt time.Time) (distribution.BlobWriter, error) { func (lbs *linkedBlobStore) newBlobUpload(ctx context.Context, uuid, path string, startedAt time.Time) (distribution.BlobWriter, error) {
fw, err := newFileWriter(ctx, lbs.driver, path) fw, err := newFileWriter(ctx, lbs.driver, path)
if err != nil { if err != nil {
@ -213,7 +241,7 @@ type linkedBlobStatter struct {
linkPath func(pm *pathMapper, name string, dgst digest.Digest) (string, error) linkPath func(pm *pathMapper, name string, dgst digest.Digest) (string, error)
} }
var _ distribution.BlobStatter = &linkedBlobStatter{} var _ distribution.BlobDescriptorService = &linkedBlobStatter{}
func (lbs *linkedBlobStatter) Stat(ctx context.Context, dgst digest.Digest) (distribution.Descriptor, error) { func (lbs *linkedBlobStatter) Stat(ctx context.Context, dgst digest.Digest) (distribution.Descriptor, error) {
blobLinkPath, err := lbs.linkPath(lbs.pm, lbs.repository.Name(), dgst) blobLinkPath, err := lbs.linkPath(lbs.pm, lbs.repository.Name(), dgst)
@ -246,6 +274,20 @@ func (lbs *linkedBlobStatter) Stat(ctx context.Context, dgst digest.Digest) (dis
return lbs.blobStore.statter.Stat(ctx, target) return lbs.blobStore.statter.Stat(ctx, target)
} }
func (lbs *linkedBlobStatter) Clear(ctx context.Context, dgst digest.Digest) error {
blobLinkPath, err := lbs.linkPath(lbs.pm, lbs.repository.Name(), dgst)
if err != nil {
return err
}
return lbs.blobStore.driver.Delete(ctx, blobLinkPath)
}
func (lbs *linkedBlobStatter) SetDescriptor(ctx context.Context, dgst digest.Digest, desc distribution.Descriptor) error {
// The canonical descriptor for a blob is set at the commit phase of upload
return nil
}
// blobLinkPath provides the path to the blob link, also known as layers. // blobLinkPath provides the path to the blob link, also known as layers.
func blobLinkPath(pm *pathMapper, name string, dgst digest.Digest) (string, error) { func blobLinkPath(pm *pathMapper, name string, dgst digest.Digest) (string, error) {
return pm.path(layerLinkPathSpec{name: name, digest: dgst}) return pm.path(layerLinkPathSpec{name: name, digest: dgst})

View file

@ -69,8 +69,8 @@ func (ms *manifestStore) Put(manifest *manifest.SignedManifest) error {
// Delete removes the revision of the specified manfiest. // Delete removes the revision of the specified manfiest.
func (ms *manifestStore) Delete(dgst digest.Digest) error { func (ms *manifestStore) Delete(dgst digest.Digest) error {
context.GetLogger(ms.ctx).Debug("(*manifestStore).Delete - unsupported") context.GetLogger(ms.ctx).Debug("(*manifestStore).Delete")
return fmt.Errorf("deletion of manifests not supported") return ms.revisionStore.delete(ms.ctx, dgst)
} }
func (ms *manifestStore) Tags() ([]string, error) { func (ms *manifestStore) Tags() ([]string, error) {

View file

@ -29,8 +29,7 @@ type manifestStoreTestEnv struct {
func newManifestStoreTestEnv(t *testing.T, name, tag string) *manifestStoreTestEnv { func newManifestStoreTestEnv(t *testing.T, name, tag string) *manifestStoreTestEnv {
ctx := context.Background() ctx := context.Background()
driver := inmemory.New() driver := inmemory.New()
registry := NewRegistryWithDriver(ctx, driver, memory.NewInMemoryBlobDescriptorCacheProvider()) registry := NewRegistryWithDriver(ctx, driver, memory.NewInMemoryBlobDescriptorCacheProvider(), true)
repo, err := registry.Repository(ctx, name) repo, err := registry.Repository(ctx, name)
if err != nil { if err != nil {
t.Fatalf("unexpected error getting repo: %v", err) t.Fatalf("unexpected error getting repo: %v", err)
@ -156,6 +155,7 @@ func TestManifestStorage(t *testing.T) {
} }
fetchedManifest, err := ms.GetByTag(env.tag) fetchedManifest, err := ms.GetByTag(env.tag)
if err != nil { if err != nil {
t.Fatalf("unexpected error fetching manifest: %v", err) t.Fatalf("unexpected error fetching manifest: %v", err)
} }
@ -296,11 +296,68 @@ func TestManifestStorage(t *testing.T) {
} }
} }
// TODO(stevvooe): Currently, deletes are not supported due to some // Test deleting manifests
// complexity around managing tag indexes. We'll add this support back in err = ms.Delete(dgst)
// when the manifest format has settled. For now, we expect an error for if err != nil {
// all deletes.
if err := ms.Delete(dgst); err == nil {
t.Fatalf("unexpected an error deleting manifest by digest: %v", err) t.Fatalf("unexpected an error deleting manifest by digest: %v", err)
} }
exists, err = ms.Exists(dgst)
if err != nil {
t.Fatalf("Error querying manifest existence")
}
if exists {
t.Errorf("Deleted manifest should not exist")
}
deletedManifest, err := ms.Get(dgst)
if err == nil {
t.Errorf("Unexpected success getting deleted manifest")
}
switch err.(type) {
case distribution.ErrManifestUnknownRevision:
break
default:
t.Errorf("Unexpected error getting deleted manifest: %s", reflect.ValueOf(err).Type())
}
if deletedManifest != nil {
t.Errorf("Deleted manifest get returned non-nil")
}
// Re-upload should restore manifest to a good state
err = ms.Put(sm)
if err != nil {
t.Errorf("Error re-uploading deleted manifest")
}
exists, err = ms.Exists(dgst)
if err != nil {
t.Fatalf("Error querying manifest existence")
}
if !exists {
t.Errorf("Restored manifest should exist")
}
deletedManifest, err = ms.Get(dgst)
if err != nil {
t.Errorf("Unexpected error getting manifest")
}
if deletedManifest == nil {
t.Errorf("Deleted manifest get returned non-nil")
}
r := NewRegistryWithDriver(ctx, env.driver, memory.NewInMemoryBlobDescriptorCacheProvider(), false)
repo, err := r.Repository(ctx, env.name)
if err != nil {
t.Fatalf("unexpected error getting repo: %v", err)
}
ms, err = repo.Manifests(ctx)
if err != nil {
t.Fatal(err)
}
err = ms.Delete(dgst)
if err == nil {
t.Errorf("Unexpected success deleting while disabled")
}
} }

View file

@ -15,15 +15,16 @@ type registry struct {
blobServer distribution.BlobServer blobServer distribution.BlobServer
statter distribution.BlobStatter // global statter service. statter distribution.BlobStatter // global statter service.
blobDescriptorCacheProvider cache.BlobDescriptorCacheProvider blobDescriptorCacheProvider cache.BlobDescriptorCacheProvider
deleteEnabled bool
} }
// NewRegistryWithDriver creates a new registry instance from the provided // NewRegistryWithDriver creates a new registry instance from the provided
// driver. The resulting registry may be shared by multiple goroutines but is // driver. The resulting registry may be shared by multiple goroutines but is
// cheap to allocate. // cheap to allocate.
func NewRegistryWithDriver(ctx context.Context, driver storagedriver.StorageDriver, blobDescriptorCacheProvider cache.BlobDescriptorCacheProvider) distribution.Namespace { func NewRegistryWithDriver(ctx context.Context, driver storagedriver.StorageDriver, blobDescriptorCacheProvider cache.BlobDescriptorCacheProvider, deleteEnabled bool) distribution.Namespace {
// create global statter, with cache. // create global statter, with cache.
var statter distribution.BlobStatter = &blobStatter{ var statter distribution.BlobDescriptorService = &blobStatter{
driver: driver, driver: driver,
pm: defaultPathMapper, pm: defaultPathMapper,
} }
@ -46,6 +47,7 @@ func NewRegistryWithDriver(ctx context.Context, driver storagedriver.StorageDriv
pathFn: bs.path, pathFn: bs.path,
}, },
blobDescriptorCacheProvider: blobDescriptorCacheProvider, blobDescriptorCacheProvider: blobDescriptorCacheProvider,
deleteEnabled: deleteEnabled,
} }
} }
@ -110,7 +112,8 @@ func (repo *repository) Manifests(ctx context.Context, options ...distribution.M
ctx: ctx, ctx: ctx,
blobStore: repo.blobStore, blobStore: repo.blobStore,
repository: repo, repository: repo,
statter: &linkedBlobStatter{ deleteEnabled: repo.registry.deleteEnabled,
blobAccessController: &linkedBlobStatter{
blobStore: repo.blobStore, blobStore: repo.blobStore,
repository: repo, repository: repo,
linkPath: manifestRevisionLinkPath, linkPath: manifestRevisionLinkPath,
@ -143,7 +146,7 @@ func (repo *repository) Manifests(ctx context.Context, options ...distribution.M
// may be context sensitive in the future. The instance should be used similar // may be context sensitive in the future. The instance should be used similar
// to a request local. // to a request local.
func (repo *repository) Blobs(ctx context.Context) distribution.BlobStore { func (repo *repository) Blobs(ctx context.Context) distribution.BlobStore {
var statter distribution.BlobStatter = &linkedBlobStatter{ var statter distribution.BlobDescriptorService = &linkedBlobStatter{
blobStore: repo.blobStore, blobStore: repo.blobStore,
repository: repo, repository: repo,
linkPath: blobLinkPath, linkPath: blobLinkPath,
@ -156,13 +159,14 @@ func (repo *repository) Blobs(ctx context.Context) distribution.BlobStore {
return &linkedBlobStore{ return &linkedBlobStore{
blobStore: repo.blobStore, blobStore: repo.blobStore,
blobServer: repo.blobServer, blobServer: repo.blobServer,
statter: statter, blobAccessController: statter,
repository: repo, repository: repo,
ctx: ctx, ctx: ctx,
// TODO(stevvooe): linkPath limits this blob store to only layers. // TODO(stevvooe): linkPath limits this blob store to only layers.
// This instance cannot be used for manifest checks. // This instance cannot be used for manifest checks.
linkPath: blobLinkPath, linkPath: blobLinkPath,
deleteEnabled: repo.registry.deleteEnabled,
} }
} }

View file

@ -17,19 +17,6 @@ type revisionStore struct {
ctx context.Context ctx context.Context
} }
func newRevisionStore(ctx context.Context, repo *repository, blobStore *blobStore) *revisionStore {
return &revisionStore{
ctx: ctx,
repository: repo,
blobStore: &linkedBlobStore{
blobStore: blobStore,
repository: repo,
ctx: ctx,
linkPath: manifestRevisionLinkPath,
},
}
}
// get retrieves the manifest, keyed by revision digest. // get retrieves the manifest, keyed by revision digest.
func (rs *revisionStore) get(ctx context.Context, revision digest.Digest) (*manifest.SignedManifest, error) { func (rs *revisionStore) get(ctx context.Context, revision digest.Digest) (*manifest.SignedManifest, error) {
// Ensure that this revision is available in this repository. // Ensure that this revision is available in this repository.
@ -118,3 +105,7 @@ func (rs *revisionStore) put(ctx context.Context, sm *manifest.SignedManifest) (
return revision, nil return revision, nil
} }
func (rs *revisionStore) delete(ctx context.Context, revision digest.Digest) error {
return rs.blobStore.Delete(ctx, revision)
}

View file

@ -115,8 +115,8 @@ func (s *signatureStore) Put(dgst digest.Digest, signatures ...[]byte) error {
return nil return nil
} }
// namedBlobStore returns the namedBlobStore of the signatures for the // linkedBlobStore returns the namedBlobStore of the signatures for the
// manifest with the given digest. Effectively, each singature link path // manifest with the given digest. Effectively, each signature link path
// layout is a unique linked blob store. // layout is a unique linked blob store.
func (s *signatureStore) linkedBlobStore(ctx context.Context, revision digest.Digest) *linkedBlobStore { func (s *signatureStore) linkedBlobStore(ctx context.Context, revision digest.Digest) *linkedBlobStore {
linkpath := func(pm *pathMapper, name string, dgst digest.Digest) (string, error) { linkpath := func(pm *pathMapper, name string, dgst digest.Digest) (string, error) {
@ -131,7 +131,7 @@ func (s *signatureStore) linkedBlobStore(ctx context.Context, revision digest.Di
ctx: ctx, ctx: ctx,
repository: s.repository, repository: s.repository,
blobStore: s.blobStore, blobStore: s.blobStore,
statter: &linkedBlobStatter{ blobAccessController: &linkedBlobStatter{
blobStore: s.blobStore, blobStore: s.blobStore,
repository: s.repository, repository: s.repository,
linkPath: linkpath, linkPath: linkpath,