Manifest and layer soft deletion.
Implement the delete API by adding soft delete for layers and blobs: deleting removes the link files and updates the blob descriptor cache. Deletion is configurable; if it is disabled, API calls return an unsupported error. The blob descriptor cache is invalidated by changing the linkedBlobStore's blobStatter to a blobDescriptorService, renamed blobAccessController. Delete() is added throughout the relevant API to support this functionality.

Signed-off-by: Richard Scothern <richard.scothern@gmail.com>
parent 7dbe35176d
commit 390bb97a88
21 changed files with 816 additions and 92 deletions
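For orientation, the client-side surface this commit adds is exercised by TestBlobDelete further down: a repository client's blob service gains a Delete method that issues DELETE /v2/<name>/blobs/<digest> and expects 202 Accepted. A minimal sketch of that usage, assuming the import paths and NewRepository signature shown in the test code of this diff (not verified against a released API):

package main

import (
    "fmt"

    "github.com/docker/distribution/context"
    "github.com/docker/distribution/digest"
    "github.com/docker/distribution/registry/client"
)

// deleteBlob mirrors the flow in TestBlobDelete: build a repository-scoped
// client, then delete a blob by digest. A registry with deletion enabled
// answers 202 Accepted; with deletion disabled the call fails with an
// unsupported error.
func deleteBlob(registryURL, repoName string, dgst digest.Digest) error {
    ctx := context.Background()
    repo, err := client.NewRepository(ctx, repoName, registryURL, nil)
    if err != nil {
        return err
    }
    if err := repo.Blobs(ctx).Delete(ctx, dgst); err != nil {
        return fmt.Errorf("deleting blob %s: %v", dgst, err)
    }
    return nil
}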
@@ -354,7 +354,7 @@ func (ms *manifests) Delete(dgst digest.Digest) error {
    defer resp.Body.Close()

    switch resp.StatusCode {
    case http.StatusOK:
    case http.StatusAccepted:
        return nil
    default:
        return handleErrorResponse(resp)

@@ -366,7 +366,8 @@ type blobs struct {
    ub *v2.URLBuilder
    client *http.Client

    statter distribution.BlobStatter
    statter distribution.BlobDescriptorService
    distribution.BlobDeleter
}

func sanitizeLocation(location, source string) (string, error) {

@@ -484,6 +485,10 @@ func (bs *blobs) Resume(ctx context.Context, id string) (distribution.BlobWriter
    panic("not implemented")
}

func (bs *blobs) Delete(ctx context.Context, dgst digest.Digest) error {
    return bs.statter.Clear(ctx, dgst)
}

type blobStatter struct {
    name string
    ub *v2.URLBuilder

@@ -535,3 +540,32 @@ func buildCatalogValues(maxEntries int, last string) url.Values {

    return values
}

func (bs *blobStatter) Clear(ctx context.Context, dgst digest.Digest) error {
    blobURL, err := bs.ub.BuildBlobURL(bs.name, dgst)
    if err != nil {
        return err
    }

    req, err := http.NewRequest("DELETE", blobURL, nil)
    if err != nil {
        return err
    }

    resp, err := bs.client.Do(req)
    if err != nil {
        return err
    }
    defer resp.Body.Close()

    switch resp.StatusCode {
    case http.StatusAccepted:
        return nil
    default:
        return handleErrorResponse(resp)
    }
}

func (bs *blobStatter) SetDescriptor(ctx context.Context, dgst digest.Digest, desc distribution.Descriptor) error {
    return nil
}

@@ -101,6 +101,39 @@ func addTestCatalog(route string, content []byte, link string, m *testutil.Reque
    })
}

func TestBlobDelete(t *testing.T) {
    dgst, _ := newRandomBlob(1024)
    var m testutil.RequestResponseMap
    repo := "test.example.com/repo1"
    m = append(m, testutil.RequestResponseMapping{
        Request: testutil.Request{
            Method: "DELETE",
            Route:  "/v2/" + repo + "/blobs/" + dgst.String(),
        },
        Response: testutil.Response{
            StatusCode: http.StatusAccepted,
            Headers: http.Header(map[string][]string{
                "Content-Length": {"0"},
            }),
        },
    })

    e, c := testServer(m)
    defer c()

    ctx := context.Background()
    r, err := NewRepository(ctx, repo, e, nil)
    if err != nil {
        t.Fatal(err)
    }
    l := r.Blobs(ctx)
    err = l.Delete(ctx, dgst)
    if err != nil {
        t.Errorf("Error deleting blob: %s", err.Error())
    }

}

func TestBlobFetch(t *testing.T) {
    d1, b1 := newRandomBlob(1024)
    var m testutil.RequestResponseMap

@@ -590,7 +623,7 @@ func TestManifestDelete(t *testing.T) {
            Route: "/v2/" + repo + "/manifests/" + dgst1.String(),
        },
        Response: testutil.Response{
            StatusCode: http.StatusOK,
            StatusCode: http.StatusAccepted,
            Headers: http.Header(map[string][]string{
                "Content-Length": {"0"},
            }),

@@ -33,7 +33,7 @@ import (
// TestCheckAPI hits the base endpoint (/v2/) ensures we return the specified
// 200 OK response.
func TestCheckAPI(t *testing.T) {
    env := newTestEnv(t)
    env := newTestEnv(t, false)

    baseURL, err := env.builder.BuildBaseURL()
    if err != nil {

@@ -65,7 +65,7 @@ func TestCheckAPI(t *testing.T) {
// TestCatalogAPI tests the /v2/_catalog endpoint
func TestCatalogAPI(t *testing.T) {
    chunkLen := 2
    env := newTestEnv(t)
    env := newTestEnv(t, false)

    values := url.Values{
        "last": []string{""},

@@ -239,18 +239,16 @@ func TestURLPrefix(t *testing.T) {
        "Content-Type":   []string{"application/json; charset=utf-8"},
        "Content-Length": []string{"2"},
    })

}

// TestBlobAPI conducts a full test of the of the blob api.
func TestBlobAPI(t *testing.T) {
    // TODO(stevvooe): This test code is complete junk but it should cover the
    // complete flow. This must be broken down and checked against the
    // specification *before* we submit the final to docker core.
    env := newTestEnv(t)
type blobArgs struct {
    imageName   string
    layerFile   io.ReadSeeker
    layerDigest digest.Digest
    tarSumStr   string
}

    imageName := "foo/bar"
    // "build" our layer file
func makeBlobArgs(t *testing.T) blobArgs {
    layerFile, tarSumStr, err := testutil.CreateRandomTarFile()
    if err != nil {
        t.Fatalf("error creating random layer file: %v", err)

@@ -258,6 +256,66 @@ func TestBlobAPI(t *testing.T) {

    layerDigest := digest.Digest(tarSumStr)

    args := blobArgs{
        imageName:   "foo/bar",
        layerFile:   layerFile,
        layerDigest: layerDigest,
        tarSumStr:   tarSumStr,
    }
    return args
}

// TestBlobAPI conducts a full test of the of the blob api.
func TestBlobAPI(t *testing.T) {
    deleteEnabled := false
    env := newTestEnv(t, deleteEnabled)
    args := makeBlobArgs(t)
    testBlobAPI(t, env, args)

    deleteEnabled = true
    env = newTestEnv(t, deleteEnabled)
    args = makeBlobArgs(t)
    testBlobAPI(t, env, args)

}

func TestBlobDelete(t *testing.T) {
    deleteEnabled := true
    env := newTestEnv(t, deleteEnabled)

    args := makeBlobArgs(t)
    env = testBlobAPI(t, env, args)
    testBlobDelete(t, env, args)
}

func TestBlobDeleteDisabled(t *testing.T) {
    deleteEnabled := false
    env := newTestEnv(t, deleteEnabled)
    args := makeBlobArgs(t)

    imageName := args.imageName
    layerDigest := args.layerDigest
    layerURL, err := env.builder.BuildBlobURL(imageName, layerDigest)
    if err != nil {
        t.Fatalf("error building url: %v", err)
    }

    resp, err := httpDelete(layerURL)
    if err != nil {
        t.Fatalf("unexpected error deleting when disabled: %v", err)
    }

    checkResponse(t, "status of disabled delete", resp, http.StatusMethodNotAllowed)
}

func testBlobAPI(t *testing.T, env *testEnv, args blobArgs) *testEnv {
    // TODO(stevvooe): This test code is complete junk but it should cover the
    // complete flow. This must be broken down and checked against the
    // specification *before* we submit the final to docker core.
    imageName := args.imageName
    layerFile := args.layerFile
    layerDigest := args.layerDigest

    // -----------------------------------
    // Test fetch for non-existent content
    layerURL, err := env.builder.BuildBlobURL(imageName, layerDigest)

@@ -372,6 +430,7 @@ func TestBlobAPI(t *testing.T) {
    uploadURLBase, uploadUUID = startPushLayer(t, env.builder, imageName)
    uploadURLBase, dgst := pushChunk(t, env.builder, imageName, uploadURLBase, layerFile, layerLength)
    finishUpload(t, env.builder, imageName, uploadURLBase, dgst)

    // ------------------------
    // Use a head request to see if the layer exists.
    resp, err = http.Head(layerURL)

@@ -459,12 +518,188 @@ func TestBlobAPI(t *testing.T) {
    // Missing tests:
    // 	- Upload the same tarsum file under and different repository and
    //       ensure the content remains uncorrupted.
    return env
}

func testBlobDelete(t *testing.T, env *testEnv, args blobArgs) {
    // Upload a layer
    imageName := args.imageName
    layerFile := args.layerFile
    layerDigest := args.layerDigest

    layerURL, err := env.builder.BuildBlobURL(imageName, layerDigest)
    if err != nil {
        t.Fatalf(err.Error())
    }
    // ---------------
    // Delete a layer
    resp, err := httpDelete(layerURL)
    if err != nil {
        t.Fatalf("unexpected error deleting layer: %v", err)
    }

    checkResponse(t, "deleting layer", resp, http.StatusAccepted)
    checkHeaders(t, resp, http.Header{
        "Content-Length": []string{"0"},
    })

    // ---------------
    // Try and get it back
    // Use a head request to see if the layer exists.
    resp, err = http.Head(layerURL)
    if err != nil {
        t.Fatalf("unexpected error checking head on existing layer: %v", err)
    }

    checkResponse(t, "checking existence of deleted layer", resp, http.StatusNotFound)

    // Delete already deleted layer
    resp, err = httpDelete(layerURL)
    if err != nil {
        t.Fatalf("unexpected error deleting layer: %v", err)
    }

    checkResponse(t, "deleting layer", resp, http.StatusNotFound)

    // ----------------
    // Attempt to delete a layer with an invalid digest
    badURL := strings.Replace(layerURL, "tarsum", "trsum", 1)
    resp, err = httpDelete(badURL)
    if err != nil {
        t.Fatalf("unexpected error fetching layer: %v", err)
    }

    checkResponse(t, "deleting layer bad digest", resp, http.StatusBadRequest)

    // ----------------
    // Reupload previously deleted blob
    layerFile.Seek(0, os.SEEK_SET)

    uploadURLBase, _ := startPushLayer(t, env.builder, imageName)
    pushLayer(t, env.builder, imageName, layerDigest, uploadURLBase, layerFile)

    layerFile.Seek(0, os.SEEK_SET)
    canonicalDigester := digest.Canonical.New()
    if _, err := io.Copy(canonicalDigester.Hash(), layerFile); err != nil {
        t.Fatalf("error copying to digest: %v", err)
    }
    canonicalDigest := canonicalDigester.Digest()

    // ------------------------
    // Use a head request to see if it exists
    resp, err = http.Head(layerURL)
    if err != nil {
        t.Fatalf("unexpected error checking head on existing layer: %v", err)
    }

    layerLength, _ := layerFile.Seek(0, os.SEEK_END)
    checkResponse(t, "checking head on reuploaded layer", resp, http.StatusOK)
    checkHeaders(t, resp, http.Header{
        "Content-Length":        []string{fmt.Sprint(layerLength)},
        "Docker-Content-Digest": []string{canonicalDigest.String()},
    })
}

func TestDeleteDisabled(t *testing.T) {
    env := newTestEnv(t, false)

    imageName := "foo/bar"
    // "build" our layer file
    layerFile, tarSumStr, err := testutil.CreateRandomTarFile()
    if err != nil {
        t.Fatalf("error creating random layer file: %v", err)
    }

    layerDigest := digest.Digest(tarSumStr)
    layerURL, err := env.builder.BuildBlobURL(imageName, layerDigest)
    if err != nil {
        t.Fatalf("Error building blob URL")
    }
    uploadURLBase, _ := startPushLayer(t, env.builder, imageName)
    pushLayer(t, env.builder, imageName, layerDigest, uploadURLBase, layerFile)

    resp, err := httpDelete(layerURL)
    if err != nil {
        t.Fatalf("unexpected error deleting layer: %v", err)
    }

    checkResponse(t, "deleting layer with delete disabled", resp, http.StatusMethodNotAllowed)
}

func httpDelete(url string) (*http.Response, error) {
    req, err := http.NewRequest("DELETE", url, nil)
    if err != nil {
        return nil, err
    }

    resp, err := http.DefaultClient.Do(req)
    if err != nil {
        return nil, err
    }
    // defer resp.Body.Close()
    return resp, err
}

type manifestArgs struct {
    imageName      string
    signedManifest *manifest.SignedManifest
    dgst           digest.Digest
}

func makeManifestArgs(t *testing.T) manifestArgs {
    args := manifestArgs{
        imageName: "foo/bar",
    }

    return args
}

func TestManifestAPI(t *testing.T) {
    env := newTestEnv(t)
    deleteEnabled := false
    env := newTestEnv(t, deleteEnabled)
    args := makeManifestArgs(t)
    testManifestAPI(t, env, args)

    imageName := "foo/bar"
    deleteEnabled = true
    env = newTestEnv(t, deleteEnabled)
    args = makeManifestArgs(t)
    testManifestAPI(t, env, args)
}

func TestManifestDelete(t *testing.T) {
    deleteEnabled := true
    env := newTestEnv(t, deleteEnabled)
    args := makeManifestArgs(t)
    env, args = testManifestAPI(t, env, args)
    testManifestDelete(t, env, args)
}

func TestManifestDeleteDisabled(t *testing.T) {
    deleteEnabled := false
    env := newTestEnv(t, deleteEnabled)
    args := makeManifestArgs(t)
    testManifestDeleteDisabled(t, env, args)
}

func testManifestDeleteDisabled(t *testing.T, env *testEnv, args manifestArgs) *testEnv {
    imageName := args.imageName
    manifestURL, err := env.builder.BuildManifestURL(imageName, digest.DigestSha256EmptyTar)
    if err != nil {
        t.Fatalf("unexpected error getting manifest url: %v", err)
    }

    resp, err := httpDelete(manifestURL)
    if err != nil {
        t.Fatalf("unexpected error deleting manifest %v", err)
    }
    defer resp.Body.Close()

    checkResponse(t, "status of disabled delete of manifest", resp, http.StatusMethodNotAllowed)
    return nil
}

func testManifestAPI(t *testing.T, env *testEnv, args manifestArgs) (*testEnv, manifestArgs) {
    imageName := args.imageName
    tag := "thetag"

    manifestURL, err := env.builder.BuildManifestURL(imageName, tag)

@@ -567,6 +802,9 @@ func TestManifestAPI(t *testing.T) {
    dgst, err := digest.FromBytes(payload)
    checkErr(t, err, "digesting manifest")

    args.signedManifest = signedManifest
    args.dgst = dgst

    manifestDigestURL, err := env.builder.BuildManifestURL(imageName, dgst.String())
    checkErr(t, err, "building manifest url")

@@ -687,6 +925,70 @@ func TestManifestAPI(t *testing.T) {
    if tagsResponse.Tags[0] != tag {
        t.Fatalf("tag not as expected: %q != %q", tagsResponse.Tags[0], tag)
    }

    return env, args
}

func testManifestDelete(t *testing.T, env *testEnv, args manifestArgs) {
    imageName := args.imageName
    dgst := args.dgst
    signedManifest := args.signedManifest
    manifestDigestURL, err := env.builder.BuildManifestURL(imageName, dgst.String())
    // ---------------
    // Delete by digest
    resp, err := httpDelete(manifestDigestURL)
    checkErr(t, err, "deleting manifest by digest")

    checkResponse(t, "deleting manifest", resp, http.StatusAccepted)
    checkHeaders(t, resp, http.Header{
        "Content-Length": []string{"0"},
    })

    // ---------------
    // Attempt to fetch deleted manifest
    resp, err = http.Get(manifestDigestURL)
    checkErr(t, err, "fetching deleted manifest by digest")
    defer resp.Body.Close()

    checkResponse(t, "fetching deleted manifest", resp, http.StatusNotFound)

    // ---------------
    // Delete already deleted manifest by digest
    resp, err = httpDelete(manifestDigestURL)
    checkErr(t, err, "re-deleting manifest by digest")

    checkResponse(t, "re-deleting manifest", resp, http.StatusNotFound)

    // --------------------
    // Re-upload manifest by digest
    resp = putManifest(t, "putting signed manifest", manifestDigestURL, signedManifest)
    checkResponse(t, "putting signed manifest", resp, http.StatusAccepted)
    checkHeaders(t, resp, http.Header{
        "Location":              []string{manifestDigestURL},
        "Docker-Content-Digest": []string{dgst.String()},
    })

    // ---------------
    // Attempt to fetch re-uploaded deleted digest
    resp, err = http.Get(manifestDigestURL)
    checkErr(t, err, "fetching re-uploaded manifest by digest")
    defer resp.Body.Close()

    checkResponse(t, "fetching re-uploaded manifest", resp, http.StatusOK)
    checkHeaders(t, resp, http.Header{
        "Docker-Content-Digest": []string{dgst.String()},
    })

    // ---------------
    // Attempt to delete an unknown manifest
    unknownDigest := "sha256:aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa"
    unknownManifestDigestURL, err := env.builder.BuildManifestURL(imageName, unknownDigest)
    checkErr(t, err, "building unknown manifest url")

    resp, err = httpDelete(unknownManifestDigestURL)
    checkErr(t, err, "delting unknown manifest by digest")
    checkResponse(t, "fetching deleted manifest", resp, http.StatusNotFound)

}

type testEnv struct {

@@ -698,10 +1000,11 @@ type testEnv struct {
    builder *v2.URLBuilder
}

func newTestEnv(t *testing.T) *testEnv {
func newTestEnv(t *testing.T, deleteEnabled bool) *testEnv {
    config := configuration.Configuration{
        Storage: configuration.Storage{
            "inmemory": configuration.Parameters{},
            "delete":   configuration.Parameters{"enabled": deleteEnabled},
        },
    }

@@ -1005,7 +1308,7 @@ func checkHeaders(t *testing.T, resp *http.Response, headers http.Header) {

        for _, hv := range resp.Header[k] {
            if hv != v {
                t.Fatalf("%v header value not matched in response: %q != %q", k, hv, v)
                t.Fatalf("%+v %v header value not matched in response: %q != %q", resp.Header, k, hv, v)
            }
        }
    }

@@ -106,6 +106,16 @@ func NewApp(ctx context.Context, configuration configuration.Configuration) *App
    app.configureRedis(&configuration)
    app.configureLogHook(&configuration)

    deleteEnabled := false
    if d, ok := configuration.Storage["delete"]; ok {
        e, ok := d["enabled"]
        if ok {
            if deleteEnabled, ok = e.(bool); !ok {
                deleteEnabled = false
            }
        }
    }

    // configure storage caches
    if cc, ok := configuration.Storage["cache"]; ok {
        v, ok := cc["blobdescriptor"]

@@ -119,10 +129,10 @@ func NewApp(ctx context.Context, configuration configuration.Configuration) *App
            if app.redis == nil {
                panic("redis configuration required to use for layerinfo cache")
            }
            app.registry = storage.NewRegistryWithDriver(app, app.driver, rediscache.NewRedisBlobDescriptorCacheProvider(app.redis))
            app.registry = storage.NewRegistryWithDriver(app, app.driver, rediscache.NewRedisBlobDescriptorCacheProvider(app.redis), deleteEnabled)
            ctxu.GetLogger(app).Infof("using redis blob descriptor cache")
        case "inmemory":
            app.registry = storage.NewRegistryWithDriver(app, app.driver, memorycache.NewInMemoryBlobDescriptorCacheProvider())
            app.registry = storage.NewRegistryWithDriver(app, app.driver, memorycache.NewInMemoryBlobDescriptorCacheProvider(), deleteEnabled)
            ctxu.GetLogger(app).Infof("using inmemory blob descriptor cache")
        default:
            if v != "" {

@@ -133,7 +143,7 @@ func NewApp(ctx context.Context, configuration configuration.Configuration) *App

    if app.registry == nil {
        // configure the registry if no cache section is available.
        app.registry = storage.NewRegistryWithDriver(app.Context, app.driver, nil)
        app.registry = storage.NewRegistryWithDriver(app.Context, app.driver, nil, deleteEnabled)
    }

    app.registry, err = applyRegistryMiddleware(app.registry, configuration.Middleware["registry"])

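The hunk above teaches NewApp to read an optional delete.enabled flag from the storage section of the configuration and thread it into NewRegistryWithDriver. As a rough sketch of the equivalent configuration built in code, mirroring the newTestEnv change earlier in this diff (the configuration import path is assumed):

package main

import "github.com/docker/distribution/configuration"

// newDeleteEnabledConfig returns a registry configuration using the in-memory
// storage driver with blob/manifest deletion switched on. NewApp inspects
// configuration.Storage["delete"]["enabled"] exactly as shown in the hunk above.
func newDeleteEnabledConfig() configuration.Configuration {
    return configuration.Configuration{
        Storage: configuration.Storage{
            "inmemory": configuration.Parameters{},
            "delete":   configuration.Parameters{"enabled": true},
        },
    }
}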
@@ -31,7 +31,7 @@ func TestAppDispatcher(t *testing.T) {
        Context: ctx,
        router:  v2.Router(),
        driver:  driver,
        registry: storage.NewRegistryWithDriver(ctx, driver, memorycache.NewInMemoryBlobDescriptorCacheProvider()),
        registry: storage.NewRegistryWithDriver(ctx, driver, memorycache.NewInMemoryBlobDescriptorCacheProvider(), true),
    }
    server := httptest.NewServer(app)
    router := v2.Router()

@@ -35,6 +35,7 @@ func blobDispatcher(ctx *Context, r *http.Request) http.Handler {
    return handlers.MethodHandler{
        "GET":    http.HandlerFunc(blobHandler.GetBlob),
        "HEAD":   http.HandlerFunc(blobHandler.GetBlob),
        "DELETE": http.HandlerFunc(blobHandler.DeleteBlob),
    }
}

@@ -66,3 +67,27 @@ func (bh *blobHandler) GetBlob(w http.ResponseWriter, r *http.Request) {
        return
    }
}

// DeleteBlob deletes a layer blob
func (bh *blobHandler) DeleteBlob(w http.ResponseWriter, r *http.Request) {
    context.GetLogger(bh).Debug("DeleteBlob")

    blobs := bh.Repository.Blobs(bh)
    err := blobs.Delete(bh, bh.Digest)
    if err != nil {
        switch err {
        case distribution.ErrBlobUnknown:
            w.WriteHeader(http.StatusNotFound)
            bh.Errors = append(bh.Errors, v2.ErrorCodeBlobUnknown)
        case distribution.ErrUnsupported:
            w.WriteHeader(http.StatusMethodNotAllowed)
            bh.Errors = append(bh.Errors, v2.ErrorCodeUnsupported)
        default:
            bh.Errors = append(bh.Errors, errcode.ErrorCodeUnknown)
        }
        return
    }

    w.Header().Set("Content-Length", "0")
    w.WriteHeader(http.StatusAccepted)
}

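At the HTTP layer, the new DeleteBlob handler above maps storage-level errors onto status codes: 202 Accepted on success, 404 Not Found for an unknown blob, and 405 Method Not Allowed when deletion is disabled. A small sketch of driving that endpoint directly with net/http; the URL layout follows the routes registered above, and the helper itself is hypothetical:

package main

import (
    "fmt"
    "net/http"
)

// deleteBlobHTTP issues a raw DELETE against /v2/<name>/blobs/<digest> and
// interprets the status codes produced by the DeleteBlob handler above.
func deleteBlobHTTP(baseURL, name, dgst string) error {
    req, err := http.NewRequest("DELETE", fmt.Sprintf("%s/v2/%s/blobs/%s", baseURL, name, dgst), nil)
    if err != nil {
        return err
    }
    resp, err := http.DefaultClient.Do(req)
    if err != nil {
        return err
    }
    defer resp.Body.Close()

    switch resp.StatusCode {
    case http.StatusAccepted:
        return nil // blob link removed
    case http.StatusNotFound:
        return fmt.Errorf("blob %s unknown to repository %s", dgst, name)
    case http.StatusMethodNotAllowed:
        return fmt.Errorf("deletion is disabled on this registry")
    default:
        return fmt.Errorf("unexpected status %d", resp.StatusCode)
    }
}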
@@ -186,16 +186,38 @@ func (imh *imageManifestHandler) PutImageManifest(w http.ResponseWriter, r *http
    w.WriteHeader(http.StatusAccepted)
}

// DeleteImageManifest removes the image with the given tag from the registry.
// DeleteImageManifest removes the manifest with the given digest from the registry.
func (imh *imageManifestHandler) DeleteImageManifest(w http.ResponseWriter, r *http.Request) {
    ctxu.GetLogger(imh).Debug("DeleteImageManifest")

    // TODO(stevvooe): Unfortunately, at this point, manifest deletes are
    // unsupported. There are issues with schema version 1 that make removing
    // tag index entries a serious problem in eventually consistent storage.
    // Once we work out schema version 2, the full deletion system will be
    // worked out and we can add support back.
    manifests, err := imh.Repository.Manifests(imh)
    if err != nil {
        imh.Errors = append(imh.Errors, err)
        return
    }

    err = manifests.Delete(imh.Digest)
    if err != nil {
        switch err {
        case digest.ErrDigestUnsupported:
        case digest.ErrDigestInvalidFormat:
            imh.Errors = append(imh.Errors, v2.ErrorCodeDigestInvalid)
            return
        case distribution.ErrBlobUnknown:
            imh.Errors = append(imh.Errors, v2.ErrorCodeManifestUnknown)
            w.WriteHeader(http.StatusNotFound)
            return
        case distribution.ErrUnsupported:
            imh.Errors = append(imh.Errors, v2.ErrorCodeUnsupported)
            w.WriteHeader(http.StatusMethodNotAllowed)
        default:
            imh.Errors = append(imh.Errors, errcode.ErrorCodeUnknown)
            w.WriteHeader(http.StatusBadRequest)
            return
        }
    }

    w.WriteHeader(http.StatusAccepted)
}

// digestManifest takes a digest of the given manifest. This belongs somewhere

@@ -21,13 +21,11 @@ import (
// error paths that might be seen during an upload.
func TestSimpleBlobUpload(t *testing.T) {
    randomDataReader, tarSumStr, err := testutil.CreateRandomTarFile()

    if err != nil {
        t.Fatalf("error creating random reader: %v", err)
    }

    dgst := digest.Digest(tarSumStr)

    if err != nil {
        t.Fatalf("error allocating upload store: %v", err)
    }

@@ -35,7 +33,7 @@ func TestSimpleBlobUpload(t *testing.T) {
    ctx := context.Background()
    imageName := "foo/bar"
    driver := inmemory.New()
    registry := NewRegistryWithDriver(ctx, driver, memory.NewInMemoryBlobDescriptorCacheProvider())
    registry := NewRegistryWithDriver(ctx, driver, memory.NewInMemoryBlobDescriptorCacheProvider(), true)
    repository, err := registry.Repository(ctx, imageName)
    if err != nil {
        t.Fatalf("unexpected error getting repo: %v", err)

@@ -139,6 +137,72 @@ func TestSimpleBlobUpload(t *testing.T) {
    if digest.NewDigest("sha256", h) != sha256Digest {
        t.Fatalf("unexpected digest from uploaded layer: %q != %q", digest.NewDigest("sha256", h), sha256Digest)
    }

    // Delete a blob
    err = bs.Delete(ctx, desc.Digest)
    if err != nil {
        t.Fatalf("Unexpected error deleting blob")
    }

    d, err := bs.Stat(ctx, desc.Digest)
    if err == nil {
        t.Fatalf("unexpected non-error stating deleted blob: %s", d)
    }

    switch err {
    case distribution.ErrBlobUnknown:
        break
    default:
        t.Errorf("Unexpected error type stat-ing deleted manifest: %#v", err)
    }

    _, err = bs.Open(ctx, desc.Digest)
    if err == nil {
        t.Fatalf("unexpected success opening deleted blob for read")
    }

    switch err {
    case distribution.ErrBlobUnknown:
        break
    default:
        t.Errorf("Unexpected error type getting deleted manifest: %#v", err)
    }

    // Re-upload the blob
    randomBlob, err := ioutil.ReadAll(randomDataReader)
    if err != nil {
        t.Fatalf("Error reading all of blob %s", err.Error())
    }
    expectedDigest, err := digest.FromBytes(randomBlob)
    if err != nil {
        t.Fatalf("Error getting digest from bytes: %s", err)
    }
    simpleUpload(t, bs, randomBlob, expectedDigest)

    d, err = bs.Stat(ctx, expectedDigest)
    if err != nil {
        t.Errorf("unexpected error stat-ing blob")
    }
    if d.Digest != expectedDigest {
        t.Errorf("Mismatching digest with restored blob")
    }

    _, err = bs.Open(ctx, expectedDigest)
    if err != nil {
        t.Errorf("Unexpected error opening blob")
    }

    // Reuse state to test delete with a delete-disabled registry
    registry = NewRegistryWithDriver(ctx, driver, memory.NewInMemoryBlobDescriptorCacheProvider(), false)
    repository, err = registry.Repository(ctx, imageName)
    if err != nil {
        t.Fatalf("unexpected error getting repo: %v", err)
    }
    bs = repository.Blobs(ctx)
    err = bs.Delete(ctx, desc.Digest)
    if err == nil {
        t.Errorf("Unexpected success deleting while disabled")
    }
}

// TestSimpleBlobRead just creates a simple blob file and ensures that basic

@@ -148,7 +212,7 @@ func TestSimpleBlobRead(t *testing.T) {
    ctx := context.Background()
    imageName := "foo/bar"
    driver := inmemory.New()
    registry := NewRegistryWithDriver(ctx, driver, memory.NewInMemoryBlobDescriptorCacheProvider())
    registry := NewRegistryWithDriver(ctx, driver, memory.NewInMemoryBlobDescriptorCacheProvider(), true)
    repository, err := registry.Repository(ctx, imageName)
    if err != nil {
        t.Fatalf("unexpected error getting repo: %v", err)

@@ -252,19 +316,24 @@ func TestLayerUploadZeroLength(t *testing.T) {
    ctx := context.Background()
    imageName := "foo/bar"
    driver := inmemory.New()
    registry := NewRegistryWithDriver(ctx, driver, memory.NewInMemoryBlobDescriptorCacheProvider())
    registry := NewRegistryWithDriver(ctx, driver, memory.NewInMemoryBlobDescriptorCacheProvider(), true)
    repository, err := registry.Repository(ctx, imageName)
    if err != nil {
        t.Fatalf("unexpected error getting repo: %v", err)
    }
    bs := repository.Blobs(ctx)

    simpleUpload(t, bs, []byte{}, digest.DigestSha256EmptyTar)
}

func simpleUpload(t *testing.T, bs distribution.BlobIngester, blob []byte, expectedDigest digest.Digest) {
    ctx := context.Background()
    wr, err := bs.Create(ctx)
    if err != nil {
        t.Fatalf("unexpected error starting upload: %v", err)
    }

    nn, err := io.Copy(wr, bytes.NewReader([]byte{}))
    nn, err := io.Copy(wr, bytes.NewReader(blob))
    if err != nil {
        t.Fatalf("error copying into blob writer: %v", err)
    }

@@ -273,12 +342,12 @@ func TestLayerUploadZeroLength(t *testing.T) {
        t.Fatalf("unexpected number of bytes copied: %v > 0", nn)
    }

    dgst, err := digest.FromReader(bytes.NewReader([]byte{}))
    dgst, err := digest.FromReader(bytes.NewReader(blob))
    if err != nil {
        t.Fatalf("error getting zero digest: %v", err)
        t.Fatalf("error getting digest: %v", err)
    }

    if dgst != digest.DigestSha256EmptyTar {
    if dgst != expectedDigest {
        // sanity check on zero digest
        t.Fatalf("digest not as expected: %v != %v", dgst, digest.DigestTarSumV1EmptyTar)
    }

@@ -7,7 +7,7 @@ import (
    "github.com/docker/distribution/registry/storage/driver"
)

// blobStore implements a the read side of the blob store interface over a
// blobStore implements the read side of the blob store interface over a
// driver without enforcing per-repository membership. This object is
// intentionally a leaky abstraction, providing utility methods that support
// creating and traversing backend links.

@@ -143,7 +143,7 @@ type blobStatter struct {
    pm *pathMapper
}

var _ distribution.BlobStatter = &blobStatter{}
var _ distribution.BlobDescriptorService = &blobStatter{}

// Stat implements BlobStatter.Stat by returning the descriptor for the blob
// in the main blob store. If this method returns successfully, there is

@@ -188,3 +188,11 @@ func (bs *blobStatter) Stat(ctx context.Context, dgst digest.Digest) (distributi
        Digest: dgst,
    }, nil
}

func (bs *blobStatter) Clear(ctx context.Context, dgst digest.Digest) error {
    return distribution.ErrUnsupported
}

func (bs *blobStatter) SetDescriptor(ctx context.Context, dgst digest.Digest, desc distribution.Descriptor) error {
    return distribution.ErrUnsupported
}

@@ -70,6 +70,11 @@ func (bw *blobWriter) Commit(ctx context.Context, desc distribution.Descriptor)
        return distribution.Descriptor{}, err
    }

    err = bw.blobStore.blobAccessController.SetDescriptor(ctx, canonical.Digest, canonical)
    if err != nil {
        return distribution.Descriptor{}, err
    }

    return canonical, nil
}

docs/storage/cache/cachedblobdescriptorstore.go (vendored): 27 changed lines

@@ -26,13 +26,13 @@ type MetricsTracker interface {

type cachedBlobStatter struct {
    cache   distribution.BlobDescriptorService
    backend distribution.BlobStatter
    backend distribution.BlobDescriptorService
    tracker MetricsTracker
}

// NewCachedBlobStatter creates a new statter which prefers a cache and
// falls back to a backend.
func NewCachedBlobStatter(cache distribution.BlobDescriptorService, backend distribution.BlobStatter) distribution.BlobStatter {
func NewCachedBlobStatter(cache distribution.BlobDescriptorService, backend distribution.BlobDescriptorService) distribution.BlobDescriptorService {
    return &cachedBlobStatter{
        cache:   cache,
        backend: backend,

@@ -41,7 +41,7 @@ func NewCachedBlobStatter(cache distribution.BlobDescriptorService, backend dist

// NewCachedBlobStatterWithMetrics creates a new statter which prefers a cache and
// falls back to a backend. Hits and misses will send to the tracker.
func NewCachedBlobStatterWithMetrics(cache distribution.BlobDescriptorService, backend distribution.BlobStatter, tracker MetricsTracker) distribution.BlobStatter {
func NewCachedBlobStatterWithMetrics(cache distribution.BlobDescriptorService, backend distribution.BlobDescriptorService, tracker MetricsTracker) distribution.BlobStatter {
    return &cachedBlobStatter{
        cache:   cache,
        backend: backend,

@@ -77,4 +77,25 @@ fallback:
    }

    return desc, err

}

func (cbds *cachedBlobStatter) Clear(ctx context.Context, dgst digest.Digest) error {
    err := cbds.cache.Clear(ctx, dgst)
    if err != nil {
        return err
    }

    err = cbds.backend.Clear(ctx, dgst)
    if err != nil {
        return err
    }
    return nil
}

func (cbds *cachedBlobStatter) SetDescriptor(ctx context.Context, dgst digest.Digest, desc distribution.Descriptor) error {
    if err := cbds.cache.SetDescriptor(ctx, dgst, desc); err != nil {
        context.GetLogger(ctx).Errorf("error adding descriptor %v to cache: %v", desc.Digest, err)
    }
    return nil
}

docs/storage/cache/memory/memory.go (vendored): 20 changed lines

@@ -44,6 +44,10 @@ func (imbdcp *inMemoryBlobDescriptorCacheProvider) Stat(ctx context.Context, dgs
    return imbdcp.global.Stat(ctx, dgst)
}

func (imbdcp *inMemoryBlobDescriptorCacheProvider) Clear(ctx context.Context, dgst digest.Digest) error {
    return imbdcp.global.Clear(ctx, dgst)
}

func (imbdcp *inMemoryBlobDescriptorCacheProvider) SetDescriptor(ctx context.Context, dgst digest.Digest, desc distribution.Descriptor) error {
    _, err := imbdcp.Stat(ctx, dgst)
    if err == distribution.ErrBlobUnknown {

@@ -80,6 +84,14 @@ func (rsimbdcp *repositoryScopedInMemoryBlobDescriptorCache) Stat(ctx context.Co
    return rsimbdcp.repository.Stat(ctx, dgst)
}

func (rsimbdcp *repositoryScopedInMemoryBlobDescriptorCache) Clear(ctx context.Context, dgst digest.Digest) error {
    if rsimbdcp.repository == nil {
        return distribution.ErrBlobUnknown
    }

    return rsimbdcp.repository.Clear(ctx, dgst)
}

func (rsimbdcp *repositoryScopedInMemoryBlobDescriptorCache) SetDescriptor(ctx context.Context, dgst digest.Digest, desc distribution.Descriptor) error {
    if rsimbdcp.repository == nil {
        // allocate map since we are setting it now.

@@ -133,6 +145,14 @@ func (mbdc *mapBlobDescriptorCache) Stat(ctx context.Context, dgst digest.Digest
    return desc, nil
}

func (mbdc *mapBlobDescriptorCache) Clear(ctx context.Context, dgst digest.Digest) error {
    mbdc.mu.Lock()
    defer mbdc.mu.Unlock()

    delete(mbdc.descriptors, dgst)
    return nil
}

func (mbdc *mapBlobDescriptorCache) SetDescriptor(ctx context.Context, dgst digest.Digest, desc distribution.Descriptor) error {
    if err := dgst.Validate(); err != nil {
        return err

docs/storage/cache/redis/redis.go (vendored): 45 changed lines

@@ -12,7 +12,7 @@ import (
)

// redisBlobStatService provides an implementation of
// BlobDescriptorCacheProvider based on redis. Blob descritors are stored in
// BlobDescriptorCacheProvider based on redis. Blob descriptors are stored in
// two parts. The first provide fast access to repository membership through a
// redis set for each repo. The second is a redis hash keyed by the digest of
// the layer, providing path, length and mediatype information. There is also

@@ -63,6 +63,27 @@ func (rbds *redisBlobDescriptorService) Stat(ctx context.Context, dgst digest.Di
    return rbds.stat(ctx, conn, dgst)
}

func (rbds *redisBlobDescriptorService) Clear(ctx context.Context, dgst digest.Digest) error {
    if err := dgst.Validate(); err != nil {
        return err
    }

    conn := rbds.pool.Get()
    defer conn.Close()

    // Not atomic in redis <= 2.3
    reply, err := conn.Do("HDEL", rbds.blobDescriptorHashKey(dgst), "digest", "length", "mediatype")
    if err != nil {
        return err
    }

    if reply == 0 {
        return distribution.ErrBlobUnknown
    }

    return nil
}

// stat provides an internal stat call that takes a connection parameter. This
// allows some internal management of the connection scope.
func (rbds *redisBlobDescriptorService) stat(ctx context.Context, conn redis.Conn, dgst digest.Digest) (distribution.Descriptor, error) {

@@ -170,6 +191,28 @@ func (rsrbds *repositoryScopedRedisBlobDescriptorService) Stat(ctx context.Conte
    return upstream, nil
}

// Clear removes the descriptor from the cache and forwards to the upstream descriptor store
func (rsrbds *repositoryScopedRedisBlobDescriptorService) Clear(ctx context.Context, dgst digest.Digest) error {
    if err := dgst.Validate(); err != nil {
        return err
    }

    conn := rsrbds.upstream.pool.Get()
    defer conn.Close()

    // Check membership to repository first
    member, err := redis.Bool(conn.Do("SISMEMBER", rsrbds.repositoryBlobSetKey(rsrbds.repo), dgst))
    if err != nil {
        return err
    }

    if !member {
        return distribution.ErrBlobUnknown
    }

    return rsrbds.upstream.Clear(ctx, dgst)
}

func (rsrbds *repositoryScopedRedisBlobDescriptorService) SetDescriptor(ctx context.Context, dgst digest.Digest, desc distribution.Descriptor) error {
    if err := dgst.Validate(); err != nil {
        return err

docs/storage/cache/suite.go (vendored): 37 changed lines

@@ -139,3 +139,40 @@ func checkBlobDescriptorCacheSetAndRead(t *testing.T, ctx context.Context, provi
        t.Fatalf("unexpected descriptor: %#v != %#v", desc, expected)
    }
}

func checkBlobDescriptorClear(t *testing.T, ctx context.Context, provider BlobDescriptorCacheProvider) {
    localDigest := digest.Digest("sha384:abc")
    expected := distribution.Descriptor{
        Digest:    "sha256:abc",
        Size:      10,
        MediaType: "application/octet-stream"}

    cache, err := provider.RepositoryScoped("foo/bar")
    if err != nil {
        t.Fatalf("unexpected error getting scoped cache: %v", err)
    }

    if err := cache.SetDescriptor(ctx, localDigest, expected); err != nil {
        t.Fatalf("error setting descriptor: %v", err)
    }

    desc, err := cache.Stat(ctx, localDigest)
    if err != nil {
        t.Fatalf("unexpected error statting fake2:abc: %v", err)
    }

    if expected != desc {
        t.Fatalf("unexpected descriptor: %#v != %#v", expected, desc)
    }

    err = cache.Clear(ctx, localDigest)
    if err != nil {
        t.Fatalf("unexpected error deleting descriptor")
    }

    nonExistantDigest := digest.Digest("sha256:aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa")
    err = cache.Clear(ctx, nonExistantDigest)
    if err == nil {
        t.Fatalf("expected error deleting unknown descriptor")
    }
}

@@ -22,7 +22,7 @@ func setupFS(t *testing.T) *setupEnv {
    d := inmemory.New()
    c := []byte("")
    ctx := context.Background()
    registry := NewRegistryWithDriver(ctx, d, memory.NewInMemoryBlobDescriptorCacheProvider())
    registry := NewRegistryWithDriver(ctx, d, memory.NewInMemoryBlobDescriptorCacheProvider(), false)
    rootpath, _ := defaultPathMapper.path(repositoriesRootPathSpec{})

    repos := []string{

@@ -17,9 +17,10 @@ import (
type linkedBlobStore struct {
    *blobStore
    blobServer distribution.BlobServer
    statter distribution.BlobStatter
    blobAccessController distribution.BlobDescriptorService
    repository distribution.Repository
    ctx        context.Context // only to be used where context can't come through method args
    deleteEnabled bool

    // linkPath allows one to control the repository blob link set to which
    // the blob store dispatches. This is required because manifest and layer

@@ -31,7 +32,7 @@ type linkedBlobStore struct {
var _ distribution.BlobStore = &linkedBlobStore{}

func (lbs *linkedBlobStore) Stat(ctx context.Context, dgst digest.Digest) (distribution.Descriptor, error) {
    return lbs.statter.Stat(ctx, dgst)
    return lbs.blobAccessController.Stat(ctx, dgst)
}

func (lbs *linkedBlobStore) Get(ctx context.Context, dgst digest.Digest) ([]byte, error) {

@@ -67,6 +68,10 @@ func (lbs *linkedBlobStore) ServeBlob(ctx context.Context, w http.ResponseWriter
}

func (lbs *linkedBlobStore) Put(ctx context.Context, mediaType string, p []byte) (distribution.Descriptor, error) {
    dgst, err := digest.FromBytes(p)
    if err != nil {
        return distribution.Descriptor{}, err
    }
    // Place the data in the blob store first.
    desc, err := lbs.blobStore.Put(ctx, mediaType, p)
    if err != nil {

@@ -74,6 +79,10 @@ func (lbs *linkedBlobStore) Put(ctx context.Context, mediaType string, p []byte)
        return distribution.Descriptor{}, err
    }

    if err := lbs.blobAccessController.SetDescriptor(ctx, dgst, desc); err != nil {
        return distribution.Descriptor{}, err
    }

    // TODO(stevvooe): Write out mediatype if incoming differs from what is
    // returned by Put above. Note that we should allow updates for a given
    // repository.

@@ -153,7 +162,26 @@ func (lbs *linkedBlobStore) Resume(ctx context.Context, id string) (distribution
    return lbs.newBlobUpload(ctx, id, path, startedAt)
}

// newLayerUpload allocates a new upload controller with the given state.
func (lbs *linkedBlobStore) Delete(ctx context.Context, dgst digest.Digest) error {
    if !lbs.deleteEnabled {
        return distribution.ErrUnsupported
    }

    // Ensure the blob is available for deletion
    _, err := lbs.blobAccessController.Stat(ctx, dgst)
    if err != nil {
        return err
    }

    err = lbs.blobAccessController.Clear(ctx, dgst)
    if err != nil {
        return err
    }

    return nil
}

// newBlobUpload allocates a new upload controller with the given state.
func (lbs *linkedBlobStore) newBlobUpload(ctx context.Context, uuid, path string, startedAt time.Time) (distribution.BlobWriter, error) {
    fw, err := newFileWriter(ctx, lbs.driver, path)
    if err != nil {

@@ -213,7 +241,7 @@ type linkedBlobStatter struct {
    linkPath func(pm *pathMapper, name string, dgst digest.Digest) (string, error)
}

var _ distribution.BlobStatter = &linkedBlobStatter{}
var _ distribution.BlobDescriptorService = &linkedBlobStatter{}

func (lbs *linkedBlobStatter) Stat(ctx context.Context, dgst digest.Digest) (distribution.Descriptor, error) {
    blobLinkPath, err := lbs.linkPath(lbs.pm, lbs.repository.Name(), dgst)

@@ -246,6 +274,20 @@ func (lbs *linkedBlobStatter) Stat(ctx context.Context, dgst digest.Digest) (dis
    return lbs.blobStore.statter.Stat(ctx, target)
}

func (lbs *linkedBlobStatter) Clear(ctx context.Context, dgst digest.Digest) error {
    blobLinkPath, err := lbs.linkPath(lbs.pm, lbs.repository.Name(), dgst)
    if err != nil {
        return err
    }

    return lbs.blobStore.driver.Delete(ctx, blobLinkPath)
}

func (lbs *linkedBlobStatter) SetDescriptor(ctx context.Context, dgst digest.Digest, desc distribution.Descriptor) error {
    // The canonical descriptor for a blob is set at the commit phase of upload
    return nil
}

// blobLinkPath provides the path to the blob link, also known as layers.
func blobLinkPath(pm *pathMapper, name string, dgst digest.Digest) (string, error) {
    return pm.path(layerLinkPathSpec{name: name, digest: dgst})

@@ -69,8 +69,8 @@ func (ms *manifestStore) Put(manifest *manifest.SignedManifest) error {

// Delete removes the revision of the specified manfiest.
func (ms *manifestStore) Delete(dgst digest.Digest) error {
    context.GetLogger(ms.ctx).Debug("(*manifestStore).Delete - unsupported")
    return fmt.Errorf("deletion of manifests not supported")
    context.GetLogger(ms.ctx).Debug("(*manifestStore).Delete")
    return ms.revisionStore.delete(ms.ctx, dgst)
}

func (ms *manifestStore) Tags() ([]string, error) {

@@ -29,8 +29,7 @@ type manifestStoreTestEnv struct {
func newManifestStoreTestEnv(t *testing.T, name, tag string) *manifestStoreTestEnv {
    ctx := context.Background()
    driver := inmemory.New()
    registry := NewRegistryWithDriver(ctx, driver, memory.NewInMemoryBlobDescriptorCacheProvider())

    registry := NewRegistryWithDriver(ctx, driver, memory.NewInMemoryBlobDescriptorCacheProvider(), true)
    repo, err := registry.Repository(ctx, name)
    if err != nil {
        t.Fatalf("unexpected error getting repo: %v", err)

@@ -156,6 +155,7 @@ func TestManifestStorage(t *testing.T) {
    }

    fetchedManifest, err := ms.GetByTag(env.tag)

    if err != nil {
        t.Fatalf("unexpected error fetching manifest: %v", err)
    }

@@ -296,11 +296,68 @@ func TestManifestStorage(t *testing.T) {
        }
    }

    // TODO(stevvooe): Currently, deletes are not supported due to some
    // complexity around managing tag indexes. We'll add this support back in
    // when the manifest format has settled. For now, we expect an error for
    // all deletes.
    if err := ms.Delete(dgst); err == nil {
    // Test deleting manifests
    err = ms.Delete(dgst)
    if err != nil {
        t.Fatalf("unexpected an error deleting manifest by digest: %v", err)
    }

    exists, err = ms.Exists(dgst)
    if err != nil {
        t.Fatalf("Error querying manifest existence")
    }
    if exists {
        t.Errorf("Deleted manifest should not exist")
    }

    deletedManifest, err := ms.Get(dgst)
    if err == nil {
        t.Errorf("Unexpected success getting deleted manifest")
    }
    switch err.(type) {
    case distribution.ErrManifestUnknownRevision:
        break
    default:
        t.Errorf("Unexpected error getting deleted manifest: %s", reflect.ValueOf(err).Type())
    }

    if deletedManifest != nil {
        t.Errorf("Deleted manifest get returned non-nil")
    }

    // Re-upload should restore manifest to a good state
    err = ms.Put(sm)
    if err != nil {
        t.Errorf("Error re-uploading deleted manifest")
    }

    exists, err = ms.Exists(dgst)
    if err != nil {
        t.Fatalf("Error querying manifest existence")
    }
    if !exists {
        t.Errorf("Restored manifest should exist")
    }

    deletedManifest, err = ms.Get(dgst)
    if err != nil {
        t.Errorf("Unexpected error getting manifest")
    }
    if deletedManifest == nil {
        t.Errorf("Deleted manifest get returned non-nil")
    }

    r := NewRegistryWithDriver(ctx, env.driver, memory.NewInMemoryBlobDescriptorCacheProvider(), false)
    repo, err := r.Repository(ctx, env.name)
    if err != nil {
        t.Fatalf("unexpected error getting repo: %v", err)
    }
    ms, err = repo.Manifests(ctx)
    if err != nil {
        t.Fatal(err)
    }
    err = ms.Delete(dgst)
    if err == nil {
        t.Errorf("Unexpected success deleting while disabled")
    }
}

@@ -15,15 +15,16 @@ type registry struct {
    blobServer distribution.BlobServer
    statter distribution.BlobStatter // global statter service.
    blobDescriptorCacheProvider cache.BlobDescriptorCacheProvider
    deleteEnabled bool
}

// NewRegistryWithDriver creates a new registry instance from the provided
// driver. The resulting registry may be shared by multiple goroutines but is
// cheap to allocate.
func NewRegistryWithDriver(ctx context.Context, driver storagedriver.StorageDriver, blobDescriptorCacheProvider cache.BlobDescriptorCacheProvider) distribution.Namespace {
func NewRegistryWithDriver(ctx context.Context, driver storagedriver.StorageDriver, blobDescriptorCacheProvider cache.BlobDescriptorCacheProvider, deleteEnabled bool) distribution.Namespace {

    // create global statter, with cache.
    var statter distribution.BlobStatter = &blobStatter{
    var statter distribution.BlobDescriptorService = &blobStatter{
        driver: driver,
        pm:     defaultPathMapper,
    }

@@ -46,6 +47,7 @@ func NewRegistryWithDriver(ctx context.Context, driver storagedriver.StorageDriv
            pathFn: bs.path,
        },
        blobDescriptorCacheProvider: blobDescriptorCacheProvider,
        deleteEnabled:               deleteEnabled,
    }
}

@@ -110,7 +112,8 @@ func (repo *repository) Manifests(ctx context.Context, options ...distribution.M
        ctx:        ctx,
        blobStore:  repo.blobStore,
        repository: repo,
        statter: &linkedBlobStatter{
        deleteEnabled: repo.registry.deleteEnabled,
        blobAccessController: &linkedBlobStatter{
            blobStore:  repo.blobStore,
            repository: repo,
            linkPath:   manifestRevisionLinkPath,

@@ -143,7 +146,7 @@ func (repo *repository) Manifests(ctx context.Context, options ...distribution.M
// may be context sensitive in the future. The instance should be used similar
// to a request local.
func (repo *repository) Blobs(ctx context.Context) distribution.BlobStore {
    var statter distribution.BlobStatter = &linkedBlobStatter{
    var statter distribution.BlobDescriptorService = &linkedBlobStatter{
        blobStore:  repo.blobStore,
        repository: repo,
        linkPath:   blobLinkPath,

@@ -156,13 +159,14 @@ func (repo *repository) Blobs(ctx context.Context) distribution.BlobStore {
    return &linkedBlobStore{
        blobStore:  repo.blobStore,
        blobServer: repo.blobServer,
        statter:    statter,
        blobAccessController: statter,
        repository: repo,
        ctx:        ctx,

        // TODO(stevvooe): linkPath limits this blob store to only layers.
        // This instance cannot be used for manifest checks.
        linkPath: blobLinkPath,
        deleteEnabled: repo.registry.deleteEnabled,
    }
}

@@ -17,19 +17,6 @@ type revisionStore struct {
    ctx context.Context
}

func newRevisionStore(ctx context.Context, repo *repository, blobStore *blobStore) *revisionStore {
    return &revisionStore{
        ctx:        ctx,
        repository: repo,
        blobStore: &linkedBlobStore{
            blobStore:  blobStore,
            repository: repo,
            ctx:        ctx,
            linkPath:   manifestRevisionLinkPath,
        },
    }
}

// get retrieves the manifest, keyed by revision digest.
func (rs *revisionStore) get(ctx context.Context, revision digest.Digest) (*manifest.SignedManifest, error) {
    // Ensure that this revision is available in this repository.

@@ -118,3 +105,7 @@ func (rs *revisionStore) put(ctx context.Context, sm *manifest.SignedManifest) (

    return revision, nil
}

func (rs *revisionStore) delete(ctx context.Context, revision digest.Digest) error {
    return rs.blobStore.Delete(ctx, revision)
}

@@ -115,8 +115,8 @@ func (s *signatureStore) Put(dgst digest.Digest, signatures ...[]byte) error {
    return nil
}

// namedBlobStore returns the namedBlobStore of the signatures for the
// manifest with the given digest. Effectively, each singature link path
// linkedBlobStore returns the namedBlobStore of the signatures for the
// manifest with the given digest. Effectively, each signature link path
// layout is a unique linked blob store.
func (s *signatureStore) linkedBlobStore(ctx context.Context, revision digest.Digest) *linkedBlobStore {
    linkpath := func(pm *pathMapper, name string, dgst digest.Digest) (string, error) {

@@ -131,7 +131,7 @@ func (s *signatureStore) linkedBlobStore(ctx context.Context, revision digest.Di
        ctx:        ctx,
        repository: s.repository,
        blobStore:  s.blobStore,
        statter: &linkedBlobStatter{
        blobAccessController: &linkedBlobStatter{
            blobStore:  s.blobStore,
            repository: s.repository,
            linkPath:   linkpath,