Merge pull request #1404 from RichardScothern/delete-events
Send manifest and blob delete events to the notifications subsystem.
This commit is contained in:
commit
3f97c258a6
5 changed files with 109 additions and 25 deletions
|
@ -104,6 +104,17 @@ manifest:
|
||||||
}
|
}
|
||||||
```
|
```
|
||||||
|
|
||||||
|
The target struct of events which are sent when manifests and blobs are deleted
|
||||||
|
will contain a subset of the data contained in Get and Put events. Specifically,
|
||||||
|
only the digest and repository will be sent.
|
||||||
|
|
||||||
|
```json
|
||||||
|
"target": {
|
||||||
|
"digest": "sha256:d89e1bee20d9cb344674e213b581f14fbd8e70274ecf9d10c514bab78a307845",
|
||||||
|
"repository": "library/test"
|
||||||
|
},
|
||||||
|
```
|
||||||
|
|
||||||
> __NOTE:__ As of version 2.1, the `length` field for event targets
|
> __NOTE:__ As of version 2.1, the `length` field for event targets
|
||||||
> is being deprecated for the `size` field, bringing the target in line with
|
> is being deprecated for the `size` field, bringing the target in line with
|
||||||
> common nomenclature. Both will continue to be set for the foreseeable
|
> common nomenclature. Both will continue to be set for the foreseeable
|
||||||
|
|
|
@ -6,6 +6,7 @@ import (
|
||||||
|
|
||||||
"github.com/docker/distribution"
|
"github.com/docker/distribution"
|
||||||
"github.com/docker/distribution/context"
|
"github.com/docker/distribution/context"
|
||||||
|
"github.com/docker/distribution/digest"
|
||||||
"github.com/docker/distribution/reference"
|
"github.com/docker/distribution/reference"
|
||||||
"github.com/docker/distribution/uuid"
|
"github.com/docker/distribution/uuid"
|
||||||
)
|
)
|
||||||
|
@ -60,8 +61,8 @@ func (b *bridge) ManifestPulled(repo reference.Named, sm distribution.Manifest)
|
||||||
return b.createManifestEventAndWrite(EventActionPull, repo, sm)
|
return b.createManifestEventAndWrite(EventActionPull, repo, sm)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (b *bridge) ManifestDeleted(repo reference.Named, sm distribution.Manifest) error {
|
func (b *bridge) ManifestDeleted(repo reference.Named, dgst digest.Digest) error {
|
||||||
return b.createManifestEventAndWrite(EventActionDelete, repo, sm)
|
return b.createManifestDeleteEventAndWrite(EventActionDelete, repo, dgst)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (b *bridge) BlobPushed(repo reference.Named, desc distribution.Descriptor) error {
|
func (b *bridge) BlobPushed(repo reference.Named, desc distribution.Descriptor) error {
|
||||||
|
@ -81,8 +82,8 @@ func (b *bridge) BlobMounted(repo reference.Named, desc distribution.Descriptor,
|
||||||
return b.sink.Write(*event)
|
return b.sink.Write(*event)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (b *bridge) BlobDeleted(repo reference.Named, desc distribution.Descriptor) error {
|
func (b *bridge) BlobDeleted(repo reference.Named, dgst digest.Digest) error {
|
||||||
return b.createBlobEventAndWrite(EventActionDelete, repo, desc)
|
return b.createBlobDeleteEventAndWrite(EventActionDelete, repo, dgst)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (b *bridge) createManifestEventAndWrite(action string, repo reference.Named, sm distribution.Manifest) error {
|
func (b *bridge) createManifestEventAndWrite(action string, repo reference.Named, sm distribution.Manifest) error {
|
||||||
|
@ -94,6 +95,14 @@ func (b *bridge) createManifestEventAndWrite(action string, repo reference.Named
|
||||||
return b.sink.Write(*manifestEvent)
|
return b.sink.Write(*manifestEvent)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (b *bridge) createManifestDeleteEventAndWrite(action string, repo reference.Named, dgst digest.Digest) error {
|
||||||
|
event := b.createEvent(action)
|
||||||
|
event.Target.Repository = repo.Name()
|
||||||
|
event.Target.Digest = dgst
|
||||||
|
|
||||||
|
return b.sink.Write(*event)
|
||||||
|
}
|
||||||
|
|
||||||
func (b *bridge) createManifestEvent(action string, repo reference.Named, sm distribution.Manifest) (*Event, error) {
|
func (b *bridge) createManifestEvent(action string, repo reference.Named, sm distribution.Manifest) (*Event, error) {
|
||||||
event := b.createEvent(action)
|
event := b.createEvent(action)
|
||||||
event.Target.Repository = repo.Name()
|
event.Target.Repository = repo.Name()
|
||||||
|
@ -127,6 +136,14 @@ func (b *bridge) createManifestEvent(action string, repo reference.Named, sm dis
|
||||||
return event, nil
|
return event, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (b *bridge) createBlobDeleteEventAndWrite(action string, repo reference.Named, dgst digest.Digest) error {
|
||||||
|
event := b.createEvent(action)
|
||||||
|
event.Target.Digest = dgst
|
||||||
|
event.Target.Repository = repo.Name()
|
||||||
|
|
||||||
|
return b.sink.Write(*event)
|
||||||
|
}
|
||||||
|
|
||||||
func (b *bridge) createBlobEventAndWrite(action string, repo reference.Named, desc distribution.Descriptor) error {
|
func (b *bridge) createBlobEventAndWrite(action string, repo reference.Named, desc distribution.Descriptor) error {
|
||||||
event, err := b.createBlobEvent(action, repo, desc)
|
event, err := b.createBlobEvent(action, repo, desc)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
|
|
@ -63,13 +63,12 @@ func TestEventBridgeManifestPushed(t *testing.T) {
|
||||||
|
|
||||||
func TestEventBridgeManifestDeleted(t *testing.T) {
|
func TestEventBridgeManifestDeleted(t *testing.T) {
|
||||||
l := createTestEnv(t, testSinkFn(func(events ...Event) error {
|
l := createTestEnv(t, testSinkFn(func(events ...Event) error {
|
||||||
checkCommonManifest(t, EventActionDelete, events...)
|
checkDeleted(t, EventActionDelete, events...)
|
||||||
|
|
||||||
return nil
|
return nil
|
||||||
}))
|
}))
|
||||||
|
|
||||||
repoRef, _ := reference.ParseNamed(repo)
|
repoRef, _ := reference.ParseNamed(repo)
|
||||||
if err := l.ManifestDeleted(repoRef, sm); err != nil {
|
if err := l.ManifestDeleted(repoRef, dgst); err != nil {
|
||||||
t.Fatalf("unexpected error notifying manifest pull: %v", err)
|
t.Fatalf("unexpected error notifying manifest pull: %v", err)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -91,6 +90,35 @@ func createTestEnv(t *testing.T, fn testSinkFn) Listener {
|
||||||
return NewBridge(ub, source, actor, request, fn)
|
return NewBridge(ub, source, actor, request, fn)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func checkDeleted(t *testing.T, action string, events ...Event) {
|
||||||
|
if len(events) != 1 {
|
||||||
|
t.Fatalf("unexpected number of events: %v != 1", len(events))
|
||||||
|
}
|
||||||
|
|
||||||
|
event := events[0]
|
||||||
|
|
||||||
|
if event.Source != source {
|
||||||
|
t.Fatalf("source not equal: %#v != %#v", event.Source, source)
|
||||||
|
}
|
||||||
|
|
||||||
|
if event.Request != request {
|
||||||
|
t.Fatalf("request not equal: %#v != %#v", event.Request, request)
|
||||||
|
}
|
||||||
|
|
||||||
|
if event.Actor != actor {
|
||||||
|
t.Fatalf("request not equal: %#v != %#v", event.Actor, actor)
|
||||||
|
}
|
||||||
|
|
||||||
|
if event.Target.Digest != dgst {
|
||||||
|
t.Fatalf("unexpected digest on event target: %q != %q", event.Target.Digest, dgst)
|
||||||
|
}
|
||||||
|
|
||||||
|
if event.Target.Repository != repo {
|
||||||
|
t.Fatalf("unexpected repository: %q != %q", event.Target.Repository, repo)
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
func checkCommonManifest(t *testing.T, action string, events ...Event) {
|
func checkCommonManifest(t *testing.T, action string, events ...Event) {
|
||||||
checkCommon(t, events...)
|
checkCommon(t, events...)
|
||||||
|
|
||||||
|
|
|
@ -14,11 +14,7 @@ import (
|
||||||
type ManifestListener interface {
|
type ManifestListener interface {
|
||||||
ManifestPushed(repo reference.Named, sm distribution.Manifest) error
|
ManifestPushed(repo reference.Named, sm distribution.Manifest) error
|
||||||
ManifestPulled(repo reference.Named, sm distribution.Manifest) error
|
ManifestPulled(repo reference.Named, sm distribution.Manifest) error
|
||||||
|
ManifestDeleted(repo reference.Named, dgst digest.Digest) error
|
||||||
// TODO(stevvooe): Please note that delete support is still a little shaky
|
|
||||||
// and we'll need to propagate these in the future.
|
|
||||||
|
|
||||||
ManifestDeleted(repo reference.Named, sm distribution.Manifest) error
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// BlobListener describes a listener that can respond to layer related events.
|
// BlobListener describes a listener that can respond to layer related events.
|
||||||
|
@ -26,11 +22,7 @@ type BlobListener interface {
|
||||||
BlobPushed(repo reference.Named, desc distribution.Descriptor) error
|
BlobPushed(repo reference.Named, desc distribution.Descriptor) error
|
||||||
BlobPulled(repo reference.Named, desc distribution.Descriptor) error
|
BlobPulled(repo reference.Named, desc distribution.Descriptor) error
|
||||||
BlobMounted(repo reference.Named, desc distribution.Descriptor, fromRepo reference.Named) error
|
BlobMounted(repo reference.Named, desc distribution.Descriptor, fromRepo reference.Named) error
|
||||||
|
BlobDeleted(repo reference.Named, desc digest.Digest) error
|
||||||
// TODO(stevvooe): Please note that delete support is still a little shaky
|
|
||||||
// and we'll need to propagate these in the future.
|
|
||||||
|
|
||||||
BlobDeleted(repo reference.Named, desc distribution.Descriptor) error
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// Listener combines all repository events into a single interface.
|
// Listener combines all repository events into a single interface.
|
||||||
|
@ -75,6 +67,17 @@ type manifestServiceListener struct {
|
||||||
parent *repositoryListener
|
parent *repositoryListener
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (msl *manifestServiceListener) Delete(ctx context.Context, dgst digest.Digest) error {
|
||||||
|
err := msl.ManifestService.Delete(ctx, dgst)
|
||||||
|
if err == nil {
|
||||||
|
if err := msl.parent.listener.ManifestDeleted(msl.parent.Repository.Named(), dgst); err != nil {
|
||||||
|
logrus.Errorf("error dispatching manifest delete to listener: %v", err)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
func (msl *manifestServiceListener) Get(ctx context.Context, dgst digest.Digest, options ...distribution.ManifestServiceOption) (distribution.Manifest, error) {
|
func (msl *manifestServiceListener) Get(ctx context.Context, dgst digest.Digest, options ...distribution.ManifestServiceOption) (distribution.Manifest, error) {
|
||||||
sm, err := msl.ManifestService.Get(ctx, dgst)
|
sm, err := msl.ManifestService.Get(ctx, dgst)
|
||||||
if err == nil {
|
if err == nil {
|
||||||
|
@ -173,6 +176,17 @@ func (bsl *blobServiceListener) Create(ctx context.Context, options ...distribut
|
||||||
return bsl.decorateWriter(wr), err
|
return bsl.decorateWriter(wr), err
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (bsl *blobServiceListener) Delete(ctx context.Context, dgst digest.Digest) error {
|
||||||
|
err := bsl.BlobStore.Delete(ctx, dgst)
|
||||||
|
if err == nil {
|
||||||
|
if err := bsl.parent.listener.BlobDeleted(bsl.parent.Repository.Named(), dgst); err != nil {
|
||||||
|
context.GetLogger(ctx).Errorf("error dispatching layer delete to listener: %v", err)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
func (bsl *blobServiceListener) Resume(ctx context.Context, id string) (distribution.BlobWriter, error) {
|
func (bsl *blobServiceListener) Resume(ctx context.Context, id string) (distribution.BlobWriter, error) {
|
||||||
wr, err := bsl.BlobStore.Resume(ctx, id)
|
wr, err := bsl.BlobStore.Resume(ctx, id)
|
||||||
return bsl.decorateWriter(wr), err
|
return bsl.decorateWriter(wr), err
|
||||||
|
|
|
@ -39,12 +39,12 @@ func TestListener(t *testing.T) {
|
||||||
checkExerciseRepository(t, repository)
|
checkExerciseRepository(t, repository)
|
||||||
|
|
||||||
expectedOps := map[string]int{
|
expectedOps := map[string]int{
|
||||||
"manifest:push": 1,
|
"manifest:push": 1,
|
||||||
"manifest:pull": 1,
|
"manifest:pull": 1,
|
||||||
// "manifest:delete": 0, // deletes not supported for now
|
"manifest:delete": 1,
|
||||||
"layer:push": 2,
|
"layer:push": 2,
|
||||||
"layer:pull": 2,
|
"layer:pull": 2,
|
||||||
// "layer:delete": 0, // deletes not supported for now
|
"layer:delete": 2, // deletes not supported for now
|
||||||
}
|
}
|
||||||
|
|
||||||
if !reflect.DeepEqual(tl.ops, expectedOps) {
|
if !reflect.DeepEqual(tl.ops, expectedOps) {
|
||||||
|
@ -68,7 +68,7 @@ func (tl *testListener) ManifestPulled(repo reference.Named, m distribution.Mani
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (tl *testListener) ManifestDeleted(repo reference.Named, m distribution.Manifest) error {
|
func (tl *testListener) ManifestDeleted(repo reference.Named, d digest.Digest) error {
|
||||||
tl.ops["manifest:delete"]++
|
tl.ops["manifest:delete"]++
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
@ -88,7 +88,7 @@ func (tl *testListener) BlobMounted(repo reference.Named, desc distribution.Desc
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (tl *testListener) BlobDeleted(repo reference.Named, desc distribution.Descriptor) error {
|
func (tl *testListener) BlobDeleted(repo reference.Named, d digest.Digest) error {
|
||||||
tl.ops["layer:delete"]++
|
tl.ops["layer:delete"]++
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
@ -113,6 +113,7 @@ func checkExerciseRepository(t *testing.T, repository distribution.Repository) {
|
||||||
Tag: tag,
|
Tag: tag,
|
||||||
}
|
}
|
||||||
|
|
||||||
|
var blobDigests []digest.Digest
|
||||||
blobs := repository.Blobs(ctx)
|
blobs := repository.Blobs(ctx)
|
||||||
for i := 0; i < 2; i++ {
|
for i := 0; i < 2; i++ {
|
||||||
rs, ds, err := testutil.CreateRandomTarFile()
|
rs, ds, err := testutil.CreateRandomTarFile()
|
||||||
|
@ -120,6 +121,7 @@ func checkExerciseRepository(t *testing.T, repository distribution.Repository) {
|
||||||
t.Fatalf("error creating test layer: %v", err)
|
t.Fatalf("error creating test layer: %v", err)
|
||||||
}
|
}
|
||||||
dgst := digest.Digest(ds)
|
dgst := digest.Digest(ds)
|
||||||
|
blobDigests = append(blobDigests, dgst)
|
||||||
|
|
||||||
wr, err := blobs.Create(ctx)
|
wr, err := blobs.Create(ctx)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
@ -183,4 +185,16 @@ func checkExerciseRepository(t *testing.T, repository distribution.Repository) {
|
||||||
t.Fatalf("unexpected error fetching manifest: %v", err)
|
t.Fatalf("unexpected error fetching manifest: %v", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
err = manifests.Delete(ctx, dgst)
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("unexpected error deleting blob: %v", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
for _, d := range blobDigests {
|
||||||
|
err = blobs.Delete(ctx, d)
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("unexpected error deleting blob: %v", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in a new issue