Notification should send digest URL in event target
Previously, the most accurate reference for a manifest was the tag url. After adding pull by digest, all event notifications should refer directly to the digest url. This ensures that event uniquely identifies the target of the notification. Testing has been added for manifest pull events to check that this doesn't change. In addition, the listener interface has been refactored to only use the repository name, rather than the full repository object. Signed-off-by: Stephen J Day <stephen.day@docker.com>
This commit is contained in:
		
							parent
							
								
									1f015478a0
								
							
						
					
					
						commit
						a2d4f51aa4
					
				
					 4 changed files with 200 additions and 37 deletions
				
			
		|  | @ -53,31 +53,31 @@ func NewRequestRecord(id string, r *http.Request) RequestRecord { | |||
| 	} | ||||
| } | ||||
| 
 | ||||
| func (b *bridge) ManifestPushed(repo distribution.Repository, sm *manifest.SignedManifest) error { | ||||
| func (b *bridge) ManifestPushed(repo string, sm *manifest.SignedManifest) error { | ||||
| 	return b.createManifestEventAndWrite(EventActionPush, repo, sm) | ||||
| } | ||||
| 
 | ||||
| func (b *bridge) ManifestPulled(repo distribution.Repository, sm *manifest.SignedManifest) error { | ||||
| func (b *bridge) ManifestPulled(repo string, sm *manifest.SignedManifest) error { | ||||
| 	return b.createManifestEventAndWrite(EventActionPull, repo, sm) | ||||
| } | ||||
| 
 | ||||
| func (b *bridge) ManifestDeleted(repo distribution.Repository, sm *manifest.SignedManifest) error { | ||||
| func (b *bridge) ManifestDeleted(repo string, sm *manifest.SignedManifest) error { | ||||
| 	return b.createManifestEventAndWrite(EventActionDelete, repo, sm) | ||||
| } | ||||
| 
 | ||||
| func (b *bridge) BlobPushed(repo distribution.Repository, desc distribution.Descriptor) error { | ||||
| func (b *bridge) BlobPushed(repo string, desc distribution.Descriptor) error { | ||||
| 	return b.createBlobEventAndWrite(EventActionPush, repo, desc) | ||||
| } | ||||
| 
 | ||||
| func (b *bridge) BlobPulled(repo distribution.Repository, desc distribution.Descriptor) error { | ||||
| func (b *bridge) BlobPulled(repo string, desc distribution.Descriptor) error { | ||||
| 	return b.createBlobEventAndWrite(EventActionPull, repo, desc) | ||||
| } | ||||
| 
 | ||||
| func (b *bridge) BlobDeleted(repo distribution.Repository, desc distribution.Descriptor) error { | ||||
| func (b *bridge) BlobDeleted(repo string, desc distribution.Descriptor) error { | ||||
| 	return b.createBlobEventAndWrite(EventActionDelete, repo, desc) | ||||
| } | ||||
| 
 | ||||
| func (b *bridge) createManifestEventAndWrite(action string, repo distribution.Repository, sm *manifest.SignedManifest) error { | ||||
| func (b *bridge) createManifestEventAndWrite(action string, repo string, sm *manifest.SignedManifest) error { | ||||
| 	manifestEvent, err := b.createManifestEvent(action, repo, sm) | ||||
| 	if err != nil { | ||||
| 		return err | ||||
|  | @ -86,10 +86,10 @@ func (b *bridge) createManifestEventAndWrite(action string, repo distribution.Re | |||
| 	return b.sink.Write(*manifestEvent) | ||||
| } | ||||
| 
 | ||||
| func (b *bridge) createManifestEvent(action string, repo distribution.Repository, sm *manifest.SignedManifest) (*Event, error) { | ||||
| func (b *bridge) createManifestEvent(action string, repo string, sm *manifest.SignedManifest) (*Event, error) { | ||||
| 	event := b.createEvent(action) | ||||
| 	event.Target.MediaType = manifest.ManifestMediaType | ||||
| 	event.Target.Repository = repo.Name() | ||||
| 	event.Target.Repository = repo | ||||
| 
 | ||||
| 	p, err := sm.Payload() | ||||
| 	if err != nil { | ||||
|  | @ -97,15 +97,12 @@ func (b *bridge) createManifestEvent(action string, repo distribution.Repository | |||
| 	} | ||||
| 
 | ||||
| 	event.Target.Length = int64(len(p)) | ||||
| 
 | ||||
| 	event.Target.Digest, err = digest.FromBytes(p) | ||||
| 	if err != nil { | ||||
| 		return nil, err | ||||
| 	} | ||||
| 
 | ||||
| 	// TODO(stevvooe): Currently, the is the "tag" url: once the digest url is | ||||
| 	// implemented, this should be replaced. | ||||
| 	event.Target.URL, err = b.ub.BuildManifestURL(sm.Name, sm.Tag) | ||||
| 	event.Target.URL, err = b.ub.BuildManifestURL(sm.Name, event.Target.Digest.String()) | ||||
| 	if err != nil { | ||||
| 		return nil, err | ||||
| 	} | ||||
|  | @ -113,7 +110,7 @@ func (b *bridge) createManifestEvent(action string, repo distribution.Repository | |||
| 	return event, nil | ||||
| } | ||||
| 
 | ||||
| func (b *bridge) createBlobEventAndWrite(action string, repo distribution.Repository, desc distribution.Descriptor) error { | ||||
| func (b *bridge) createBlobEventAndWrite(action string, repo string, desc distribution.Descriptor) error { | ||||
| 	event, err := b.createBlobEvent(action, repo, desc) | ||||
| 	if err != nil { | ||||
| 		return err | ||||
|  | @ -122,13 +119,13 @@ func (b *bridge) createBlobEventAndWrite(action string, repo distribution.Reposi | |||
| 	return b.sink.Write(*event) | ||||
| } | ||||
| 
 | ||||
| func (b *bridge) createBlobEvent(action string, repo distribution.Repository, desc distribution.Descriptor) (*Event, error) { | ||||
| func (b *bridge) createBlobEvent(action string, repo string, desc distribution.Descriptor) (*Event, error) { | ||||
| 	event := b.createEvent(action) | ||||
| 	event.Target.Descriptor = desc | ||||
| 	event.Target.Repository = repo.Name() | ||||
| 	event.Target.Repository = repo | ||||
| 
 | ||||
| 	var err error | ||||
| 	event.Target.URL, err = b.ub.BuildBlobURL(repo.Name(), desc.Digest) | ||||
| 	event.Target.URL, err = b.ub.BuildBlobURL(repo, desc.Digest) | ||||
| 	if err != nil { | ||||
| 		return nil, err | ||||
| 	} | ||||
|  |  | |||
							
								
								
									
										166
									
								
								notifications/bridge_test.go
									
										
									
									
									
										Normal file
									
								
							
							
						
						
									
										166
									
								
								notifications/bridge_test.go
									
										
									
									
									
										Normal file
									
								
							|  | @ -0,0 +1,166 @@ | |||
| package notifications | ||||
| 
 | ||||
| import ( | ||||
| 	"testing" | ||||
| 
 | ||||
| 	"github.com/docker/distribution/digest" | ||||
| 
 | ||||
| 	"github.com/docker/libtrust" | ||||
| 
 | ||||
| 	"github.com/docker/distribution/manifest" | ||||
| 
 | ||||
| 	"github.com/docker/distribution/registry/api/v2" | ||||
| 	"github.com/docker/distribution/uuid" | ||||
| ) | ||||
| 
 | ||||
| var ( | ||||
| 	// common environment for expected manifest events. | ||||
| 
 | ||||
| 	repo   = "test/repo" | ||||
| 	source = SourceRecord{ | ||||
| 		Addr:       "remote.test", | ||||
| 		InstanceID: uuid.Generate().String(), | ||||
| 	} | ||||
| 	ub = mustUB(v2.NewURLBuilderFromString("http://test.example.com/")) | ||||
| 
 | ||||
| 	actor = ActorRecord{ | ||||
| 		Name: "test", | ||||
| 	} | ||||
| 	request = RequestRecord{} | ||||
| 	m       = manifest.Manifest{ | ||||
| 		Name: repo, | ||||
| 		Tag:  "latest", | ||||
| 	} | ||||
| 
 | ||||
| 	sm      *manifest.SignedManifest | ||||
| 	payload []byte | ||||
| 	dgst    digest.Digest | ||||
| ) | ||||
| 
 | ||||
| func TestEventBridgeManifestPulled(t *testing.T) { | ||||
| 
 | ||||
| 	l := createTestEnv(t, testSinkFn(func(events ...Event) error { | ||||
| 		checkCommonManifest(t, EventActionPull, events...) | ||||
| 
 | ||||
| 		return nil | ||||
| 	})) | ||||
| 
 | ||||
| 	if err := l.ManifestPulled(repo, sm); err != nil { | ||||
| 		t.Fatalf("unexpected error notifying manifest pull: %v", err) | ||||
| 	} | ||||
| } | ||||
| 
 | ||||
| func TestEventBridgeManifestPushed(t *testing.T) { | ||||
| 	l := createTestEnv(t, testSinkFn(func(events ...Event) error { | ||||
| 		checkCommonManifest(t, EventActionPush, events...) | ||||
| 
 | ||||
| 		return nil | ||||
| 	})) | ||||
| 
 | ||||
| 	if err := l.ManifestPushed(repo, sm); err != nil { | ||||
| 		t.Fatalf("unexpected error notifying manifest pull: %v", err) | ||||
| 	} | ||||
| } | ||||
| 
 | ||||
| func TestEventBridgeManifestDeleted(t *testing.T) { | ||||
| 	l := createTestEnv(t, testSinkFn(func(events ...Event) error { | ||||
| 		checkCommonManifest(t, EventActionDelete, events...) | ||||
| 
 | ||||
| 		return nil | ||||
| 	})) | ||||
| 
 | ||||
| 	if err := l.ManifestDeleted(repo, sm); err != nil { | ||||
| 		t.Fatalf("unexpected error notifying manifest pull: %v", err) | ||||
| 	} | ||||
| } | ||||
| 
 | ||||
| func createTestEnv(t *testing.T, fn testSinkFn) Listener { | ||||
| 	pk, err := libtrust.GenerateECP256PrivateKey() | ||||
| 	if err != nil { | ||||
| 		t.Fatalf("error generating private key: %v", err) | ||||
| 	} | ||||
| 
 | ||||
| 	sm, err = manifest.Sign(&m, pk) | ||||
| 	if err != nil { | ||||
| 		t.Fatalf("error signing manifest: %v", err) | ||||
| 	} | ||||
| 
 | ||||
| 	payload, err = sm.Payload() | ||||
| 	if err != nil { | ||||
| 		t.Fatalf("error getting manifest payload: %v", err) | ||||
| 	} | ||||
| 
 | ||||
| 	dgst, err = digest.FromBytes(payload) | ||||
| 	if err != nil { | ||||
| 		t.Fatalf("error digesting manifest payload: %v", err) | ||||
| 	} | ||||
| 
 | ||||
| 	return NewBridge(ub, source, actor, request, fn) | ||||
| } | ||||
| 
 | ||||
| func checkCommonManifest(t *testing.T, action string, events ...Event) { | ||||
| 	checkCommon(t, events...) | ||||
| 
 | ||||
| 	event := events[0] | ||||
| 	if event.Action != action { | ||||
| 		t.Fatalf("unexpected event action: %q != %q", event.Action, action) | ||||
| 	} | ||||
| 
 | ||||
| 	u, err := ub.BuildManifestURL(repo, dgst.String()) | ||||
| 	if err != nil { | ||||
| 		t.Fatalf("error building expected url: %v", err) | ||||
| 	} | ||||
| 
 | ||||
| 	if event.Target.URL != u { | ||||
| 		t.Fatalf("incorrect url passed: %q != %q", event.Target.URL, u) | ||||
| 	} | ||||
| } | ||||
| 
 | ||||
| func checkCommon(t *testing.T, events ...Event) { | ||||
| 	if len(events) != 1 { | ||||
| 		t.Fatalf("unexpected number of events: %v != 1", len(events)) | ||||
| 	} | ||||
| 
 | ||||
| 	event := events[0] | ||||
| 
 | ||||
| 	if event.Source != source { | ||||
| 		t.Fatalf("source not equal: %#v != %#v", event.Source, source) | ||||
| 	} | ||||
| 
 | ||||
| 	if event.Request != request { | ||||
| 		t.Fatalf("request not equal: %#v != %#v", event.Request, request) | ||||
| 	} | ||||
| 
 | ||||
| 	if event.Actor != actor { | ||||
| 		t.Fatalf("request not equal: %#v != %#v", event.Actor, actor) | ||||
| 	} | ||||
| 
 | ||||
| 	if event.Target.Digest != dgst { | ||||
| 		t.Fatalf("unexpected digest on event target: %q != %q", event.Target.Digest, dgst) | ||||
| 	} | ||||
| 
 | ||||
| 	if event.Target.Length != int64(len(payload)) { | ||||
| 		t.Fatalf("unexpected target length: %v != %v", event.Target.Length, len(payload)) | ||||
| 	} | ||||
| 
 | ||||
| 	if event.Target.Repository != repo { | ||||
| 		t.Fatalf("unexpected repository: %q != %q", event.Target.Repository, repo) | ||||
| 	} | ||||
| 
 | ||||
| } | ||||
| 
 | ||||
| type testSinkFn func(events ...Event) error | ||||
| 
 | ||||
| func (tsf testSinkFn) Write(events ...Event) error { | ||||
| 	return tsf(events...) | ||||
| } | ||||
| 
 | ||||
| func (tsf testSinkFn) Close() error { return nil } | ||||
| 
 | ||||
| func mustUB(ub *v2.URLBuilder, err error) *v2.URLBuilder { | ||||
| 	if err != nil { | ||||
| 		panic(err) | ||||
| 	} | ||||
| 
 | ||||
| 	return ub | ||||
| } | ||||
|  | @ -12,24 +12,24 @@ import ( | |||
| 
 | ||||
| // ManifestListener describes a set of methods for listening to events related to manifests. | ||||
| type ManifestListener interface { | ||||
| 	ManifestPushed(repo distribution.Repository, sm *manifest.SignedManifest) error | ||||
| 	ManifestPulled(repo distribution.Repository, sm *manifest.SignedManifest) error | ||||
| 	ManifestPushed(repo string, sm *manifest.SignedManifest) error | ||||
| 	ManifestPulled(repo string, sm *manifest.SignedManifest) error | ||||
| 
 | ||||
| 	// TODO(stevvooe): Please note that delete support is still a little shaky | ||||
| 	// and we'll need to propagate these in the future. | ||||
| 
 | ||||
| 	ManifestDeleted(repo distribution.Repository, sm *manifest.SignedManifest) error | ||||
| 	ManifestDeleted(repo string, sm *manifest.SignedManifest) error | ||||
| } | ||||
| 
 | ||||
| // BlobListener describes a listener that can respond to layer related events. | ||||
| type BlobListener interface { | ||||
| 	BlobPushed(repo distribution.Repository, desc distribution.Descriptor) error | ||||
| 	BlobPulled(repo distribution.Repository, desc distribution.Descriptor) error | ||||
| 	BlobPushed(repo string, desc distribution.Descriptor) error | ||||
| 	BlobPulled(repo string, desc distribution.Descriptor) error | ||||
| 
 | ||||
| 	// TODO(stevvooe): Please note that delete support is still a little shaky | ||||
| 	// and we'll need to propagate these in the future. | ||||
| 
 | ||||
| 	BlobDeleted(repo distribution.Repository, desc distribution.Descriptor) error | ||||
| 	BlobDeleted(repo string, desc distribution.Descriptor) error | ||||
| } | ||||
| 
 | ||||
| // Listener combines all repository events into a single interface. | ||||
|  | @ -73,7 +73,7 @@ type manifestServiceListener struct { | |||
| func (msl *manifestServiceListener) Get(dgst digest.Digest) (*manifest.SignedManifest, error) { | ||||
| 	sm, err := msl.ManifestService.Get(dgst) | ||||
| 	if err == nil { | ||||
| 		if err := msl.parent.listener.ManifestPulled(msl.parent.Repository, sm); err != nil { | ||||
| 		if err := msl.parent.listener.ManifestPulled(msl.parent.Repository.Name(), sm); err != nil { | ||||
| 			logrus.Errorf("error dispatching manifest pull to listener: %v", err) | ||||
| 		} | ||||
| 	} | ||||
|  | @ -85,7 +85,7 @@ func (msl *manifestServiceListener) Put(sm *manifest.SignedManifest) error { | |||
| 	err := msl.ManifestService.Put(sm) | ||||
| 
 | ||||
| 	if err == nil { | ||||
| 		if err := msl.parent.listener.ManifestPushed(msl.parent.Repository, sm); err != nil { | ||||
| 		if err := msl.parent.listener.ManifestPushed(msl.parent.Repository.Name(), sm); err != nil { | ||||
| 			logrus.Errorf("error dispatching manifest push to listener: %v", err) | ||||
| 		} | ||||
| 	} | ||||
|  | @ -96,7 +96,7 @@ func (msl *manifestServiceListener) Put(sm *manifest.SignedManifest) error { | |||
| func (msl *manifestServiceListener) GetByTag(tag string) (*manifest.SignedManifest, error) { | ||||
| 	sm, err := msl.ManifestService.GetByTag(tag) | ||||
| 	if err == nil { | ||||
| 		if err := msl.parent.listener.ManifestPulled(msl.parent.Repository, sm); err != nil { | ||||
| 		if err := msl.parent.listener.ManifestPulled(msl.parent.Repository.Name(), sm); err != nil { | ||||
| 			logrus.Errorf("error dispatching manifest pull to listener: %v", err) | ||||
| 		} | ||||
| 	} | ||||
|  | @ -117,7 +117,7 @@ func (bsl *blobServiceListener) Get(ctx context.Context, dgst digest.Digest) ([] | |||
| 		if desc, err := bsl.Stat(ctx, dgst); err != nil { | ||||
| 			context.GetLogger(ctx).Errorf("error resolving descriptor in ServeBlob listener: %v", err) | ||||
| 		} else { | ||||
| 			if err := bsl.parent.listener.BlobPulled(bsl.parent.Repository, desc); err != nil { | ||||
| 			if err := bsl.parent.listener.BlobPulled(bsl.parent.Repository.Name(), desc); err != nil { | ||||
| 				context.GetLogger(ctx).Errorf("error dispatching layer pull to listener: %v", err) | ||||
| 			} | ||||
| 		} | ||||
|  | @ -132,7 +132,7 @@ func (bsl *blobServiceListener) Open(ctx context.Context, dgst digest.Digest) (d | |||
| 		if desc, err := bsl.Stat(ctx, dgst); err != nil { | ||||
| 			context.GetLogger(ctx).Errorf("error resolving descriptor in ServeBlob listener: %v", err) | ||||
| 		} else { | ||||
| 			if err := bsl.parent.listener.BlobPulled(bsl.parent.Repository, desc); err != nil { | ||||
| 			if err := bsl.parent.listener.BlobPulled(bsl.parent.Repository.Name(), desc); err != nil { | ||||
| 				context.GetLogger(ctx).Errorf("error dispatching layer pull to listener: %v", err) | ||||
| 			} | ||||
| 		} | ||||
|  | @ -147,7 +147,7 @@ func (bsl *blobServiceListener) ServeBlob(ctx context.Context, w http.ResponseWr | |||
| 		if desc, err := bsl.Stat(ctx, dgst); err != nil { | ||||
| 			context.GetLogger(ctx).Errorf("error resolving descriptor in ServeBlob listener: %v", err) | ||||
| 		} else { | ||||
| 			if err := bsl.parent.listener.BlobPulled(bsl.parent.Repository, desc); err != nil { | ||||
| 			if err := bsl.parent.listener.BlobPulled(bsl.parent.Repository.Name(), desc); err != nil { | ||||
| 				context.GetLogger(ctx).Errorf("error dispatching layer pull to listener: %v", err) | ||||
| 			} | ||||
| 		} | ||||
|  | @ -159,7 +159,7 @@ func (bsl *blobServiceListener) ServeBlob(ctx context.Context, w http.ResponseWr | |||
| func (bsl *blobServiceListener) Put(ctx context.Context, mediaType string, p []byte) (distribution.Descriptor, error) { | ||||
| 	desc, err := bsl.BlobStore.Put(ctx, mediaType, p) | ||||
| 	if err == nil { | ||||
| 		if err := bsl.parent.listener.BlobPushed(bsl.parent.Repository, desc); err != nil { | ||||
| 		if err := bsl.parent.listener.BlobPushed(bsl.parent.Repository.Name(), desc); err != nil { | ||||
| 			context.GetLogger(ctx).Errorf("error dispatching layer pull to listener: %v", err) | ||||
| 		} | ||||
| 	} | ||||
|  | @ -192,7 +192,7 @@ type blobWriterListener struct { | |||
| func (bwl *blobWriterListener) Commit(ctx context.Context, desc distribution.Descriptor) (distribution.Descriptor, error) { | ||||
| 	committed, err := bwl.BlobWriter.Commit(ctx, desc) | ||||
| 	if err == nil { | ||||
| 		if err := bwl.parent.parent.listener.BlobPushed(bwl.parent.parent.Repository, committed); err != nil { | ||||
| 		if err := bwl.parent.parent.listener.BlobPushed(bwl.parent.parent.Repository.Name(), committed); err != nil { | ||||
| 			context.GetLogger(ctx).Errorf("error dispatching blob push to listener: %v", err) | ||||
| 		} | ||||
| 	} | ||||
|  |  | |||
|  | @ -51,33 +51,33 @@ type testListener struct { | |||
| 	ops map[string]int | ||||
| } | ||||
| 
 | ||||
| func (tl *testListener) ManifestPushed(repo distribution.Repository, sm *manifest.SignedManifest) error { | ||||
| func (tl *testListener) ManifestPushed(repo string, sm *manifest.SignedManifest) error { | ||||
| 	tl.ops["manifest:push"]++ | ||||
| 
 | ||||
| 	return nil | ||||
| } | ||||
| 
 | ||||
| func (tl *testListener) ManifestPulled(repo distribution.Repository, sm *manifest.SignedManifest) error { | ||||
| func (tl *testListener) ManifestPulled(repo string, sm *manifest.SignedManifest) error { | ||||
| 	tl.ops["manifest:pull"]++ | ||||
| 	return nil | ||||
| } | ||||
| 
 | ||||
| func (tl *testListener) ManifestDeleted(repo distribution.Repository, sm *manifest.SignedManifest) error { | ||||
| func (tl *testListener) ManifestDeleted(repo string, sm *manifest.SignedManifest) error { | ||||
| 	tl.ops["manifest:delete"]++ | ||||
| 	return nil | ||||
| } | ||||
| 
 | ||||
| func (tl *testListener) BlobPushed(repo distribution.Repository, desc distribution.Descriptor) error { | ||||
| func (tl *testListener) BlobPushed(repo string, desc distribution.Descriptor) error { | ||||
| 	tl.ops["layer:push"]++ | ||||
| 	return nil | ||||
| } | ||||
| 
 | ||||
| func (tl *testListener) BlobPulled(repo distribution.Repository, desc distribution.Descriptor) error { | ||||
| func (tl *testListener) BlobPulled(repo string, desc distribution.Descriptor) error { | ||||
| 	tl.ops["layer:pull"]++ | ||||
| 	return nil | ||||
| } | ||||
| 
 | ||||
| func (tl *testListener) BlobDeleted(repo distribution.Repository, desc distribution.Descriptor) error { | ||||
| func (tl *testListener) BlobDeleted(repo string, desc distribution.Descriptor) error { | ||||
| 	tl.ops["layer:delete"]++ | ||||
| 	return nil | ||||
| } | ||||
|  |  | |||
		Loading…
	
	Add table
		Add a link
		
	
		Reference in a new issue