Merge pull request #2648 from manishtomar/tag-deleted-event
add repo and tag deletion event
This commit is contained in:
commit
003aa051b4
8 changed files with 154 additions and 18 deletions
|
@ -108,6 +108,21 @@ func (b *bridge) BlobDeleted(repo reference.Named, dgst digest.Digest) error {
|
|||
return b.createBlobDeleteEventAndWrite(EventActionDelete, repo, dgst)
|
||||
}
|
||||
|
||||
func (b *bridge) TagDeleted(repo reference.Named, tag string) error {
|
||||
event := b.createEvent(EventActionDelete)
|
||||
event.Target.Repository = repo.Name()
|
||||
event.Target.Tag = tag
|
||||
|
||||
return b.sink.Write(*event)
|
||||
}
|
||||
|
||||
func (b *bridge) RepoDeleted(repo reference.Named) error {
|
||||
event := b.createEvent(EventActionDelete)
|
||||
event.Target.Repository = repo.Name()
|
||||
|
||||
return b.sink.Write(*event)
|
||||
}
|
||||
|
||||
func (b *bridge) createManifestEventAndWrite(action string, repo reference.Named, sm distribution.Manifest) error {
|
||||
manifestEvent, err := b.createManifestEvent(action, repo, sm)
|
||||
if err != nil {
|
||||
|
|
|
@ -97,6 +97,9 @@ func TestEventBridgeManifestPulledWithTag(t *testing.T) {
|
|||
func TestEventBridgeManifestDeleted(t *testing.T) {
|
||||
l := createTestEnv(t, testSinkFn(func(events ...Event) error {
|
||||
checkDeleted(t, EventActionDelete, events...)
|
||||
if events[0].Target.Digest != dgst {
|
||||
t.Fatalf("unexpected digest on event target: %q != %q", events[0].Target.Digest, dgst)
|
||||
}
|
||||
return nil
|
||||
}))
|
||||
|
||||
|
@ -106,6 +109,33 @@ func TestEventBridgeManifestDeleted(t *testing.T) {
|
|||
}
|
||||
}
|
||||
|
||||
func TestEventBridgeTagDeleted(t *testing.T) {
|
||||
l := createTestEnv(t, testSinkFn(func(events ...Event) error {
|
||||
checkDeleted(t, EventActionDelete, events...)
|
||||
if events[0].Target.Tag != m.Tag {
|
||||
t.Fatalf("unexpected tag on event target: %q != %q", events[0].Target.Tag, m.Tag)
|
||||
}
|
||||
return nil
|
||||
}))
|
||||
|
||||
repoRef, _ := reference.WithName(repo)
|
||||
if err := l.TagDeleted(repoRef, m.Tag); err != nil {
|
||||
t.Fatalf("unexpected error notifying tag deletion: %v", err)
|
||||
}
|
||||
}
|
||||
|
||||
func TestEventBridgeRepoDeleted(t *testing.T) {
|
||||
l := createTestEnv(t, testSinkFn(func(events ...Event) error {
|
||||
checkDeleted(t, EventActionDelete, events...)
|
||||
return nil
|
||||
}))
|
||||
|
||||
repoRef, _ := reference.WithName(repo)
|
||||
if err := l.RepoDeleted(repoRef); err != nil {
|
||||
t.Fatalf("unexpected error notifying repo deletion: %v", err)
|
||||
}
|
||||
}
|
||||
|
||||
func createTestEnv(t *testing.T, fn testSinkFn) Listener {
|
||||
pk, err := libtrust.GenerateECP256PrivateKey()
|
||||
if err != nil {
|
||||
|
@ -142,14 +172,9 @@ func checkDeleted(t *testing.T, action string, events ...Event) {
|
|||
t.Fatalf("request not equal: %#v != %#v", event.Actor, actor)
|
||||
}
|
||||
|
||||
if event.Target.Digest != dgst {
|
||||
t.Fatalf("unexpected digest on event target: %q != %q", event.Target.Digest, dgst)
|
||||
}
|
||||
|
||||
if event.Target.Repository != repo {
|
||||
t.Fatalf("unexpected repository: %q != %q", event.Target.Repository, repo)
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
func checkCommonManifest(t *testing.T, action string, events ...Event) {
|
||||
|
|
|
@ -5,6 +5,7 @@ import (
|
|||
"net/http"
|
||||
|
||||
"github.com/docker/distribution"
|
||||
|
||||
dcontext "github.com/docker/distribution/context"
|
||||
"github.com/docker/distribution/reference"
|
||||
"github.com/opencontainers/go-digest"
|
||||
|
@ -25,10 +26,17 @@ type BlobListener interface {
|
|||
BlobDeleted(repo reference.Named, desc digest.Digest) error
|
||||
}
|
||||
|
||||
// RepoListener provides repository methods that respond to repository lifecycle
|
||||
type RepoListener interface {
|
||||
TagDeleted(repo reference.Named, tag string) error
|
||||
RepoDeleted(repo reference.Named) error
|
||||
}
|
||||
|
||||
// Listener combines all repository events into a single interface.
|
||||
type Listener interface {
|
||||
ManifestListener
|
||||
BlobListener
|
||||
RepoListener
|
||||
}
|
||||
|
||||
type repositoryListener struct {
|
||||
|
@ -36,12 +44,28 @@ type repositoryListener struct {
|
|||
listener Listener
|
||||
}
|
||||
|
||||
type removerListener struct {
|
||||
distribution.RepositoryRemover
|
||||
listener Listener
|
||||
}
|
||||
|
||||
// Listen dispatches events on the repository to the listener.
|
||||
func Listen(repo distribution.Repository, listener Listener) distribution.Repository {
|
||||
func Listen(repo distribution.Repository, remover distribution.RepositoryRemover, listener Listener) (distribution.Repository, distribution.RepositoryRemover) {
|
||||
return &repositoryListener{
|
||||
Repository: repo,
|
||||
listener: listener,
|
||||
Repository: repo,
|
||||
listener: listener,
|
||||
}, &removerListener{
|
||||
RepositoryRemover: remover,
|
||||
listener: listener,
|
||||
}
|
||||
}
|
||||
|
||||
func (nl *removerListener) Remove(ctx context.Context, name reference.Named) error {
|
||||
err := nl.RepositoryRemover.Remove(ctx, name)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
return nl.listener.RepoDeleted(name)
|
||||
}
|
||||
|
||||
func (rl *repositoryListener) Manifests(ctx context.Context, options ...distribution.ManifestServiceOption) (distribution.ManifestService, error) {
|
||||
|
@ -214,3 +238,26 @@ func (bwl *blobWriterListener) Commit(ctx context.Context, desc distribution.Des
|
|||
|
||||
return committed, err
|
||||
}
|
||||
|
||||
type tagServiceListener struct {
|
||||
distribution.TagService
|
||||
parent *repositoryListener
|
||||
}
|
||||
|
||||
func (rl *repositoryListener) Tags(ctx context.Context) distribution.TagService {
|
||||
return &tagServiceListener{
|
||||
TagService: rl.Repository.Tags(ctx),
|
||||
parent: rl,
|
||||
}
|
||||
}
|
||||
|
||||
func (tagSL *tagServiceListener) Untag(ctx context.Context, tag string) error {
|
||||
if err := tagSL.TagService.Untag(ctx, tag); err != nil {
|
||||
return err
|
||||
}
|
||||
if err := tagSL.parent.listener.TagDeleted(tagSL.parent.Repository.Named(), tag); err != nil {
|
||||
dcontext.GetLogger(ctx).Errorf("error dispatching tag deleted to listener: %v", err)
|
||||
return err
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
|
|
@ -38,10 +38,15 @@ func TestListener(t *testing.T) {
|
|||
if err != nil {
|
||||
t.Fatalf("unexpected error getting repo: %v", err)
|
||||
}
|
||||
repository = Listen(repository, tl)
|
||||
|
||||
remover, ok := registry.(distribution.RepositoryRemover)
|
||||
if !ok {
|
||||
t.Fatal("registry does not implement RepositoryRemover")
|
||||
}
|
||||
repository, remover = Listen(repository, remover, tl)
|
||||
|
||||
// Now take the registry through a number of operations
|
||||
checkExerciseRepository(t, repository)
|
||||
checkExerciseRepository(t, repository, remover)
|
||||
|
||||
expectedOps := map[string]int{
|
||||
"manifest:push": 1,
|
||||
|
@ -50,12 +55,13 @@ func TestListener(t *testing.T) {
|
|||
"layer:push": 2,
|
||||
"layer:pull": 2,
|
||||
"layer:delete": 2,
|
||||
"tag:delete": 1,
|
||||
"repo:delete": 1,
|
||||
}
|
||||
|
||||
if !reflect.DeepEqual(tl.ops, expectedOps) {
|
||||
t.Fatalf("counts do not match:\n%v\n !=\n%v", tl.ops, expectedOps)
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
type testListener struct {
|
||||
|
@ -64,7 +70,6 @@ type testListener struct {
|
|||
|
||||
func (tl *testListener) ManifestPushed(repo reference.Named, m distribution.Manifest, options ...distribution.ManifestServiceOption) error {
|
||||
tl.ops["manifest:push"]++
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
|
@ -98,9 +103,19 @@ func (tl *testListener) BlobDeleted(repo reference.Named, d digest.Digest) error
|
|||
return nil
|
||||
}
|
||||
|
||||
func (tl *testListener) TagDeleted(repo reference.Named, tag string) error {
|
||||
tl.ops["tag:delete"]++
|
||||
return nil
|
||||
}
|
||||
|
||||
func (tl *testListener) RepoDeleted(repo reference.Named) error {
|
||||
tl.ops["repo:delete"]++
|
||||
return nil
|
||||
}
|
||||
|
||||
// checkExerciseRegistry takes the registry through all of its operations,
|
||||
// carrying out generic checks.
|
||||
func checkExerciseRepository(t *testing.T, repository distribution.Repository) {
|
||||
func checkExerciseRepository(t *testing.T, repository distribution.Repository, remover distribution.RepositoryRemover) {
|
||||
// TODO(stevvooe): This would be a nice testutil function. Basically, it
|
||||
// takes the registry through a common set of operations. This could be
|
||||
// used to make cross-cutting updates by changing internals that affect
|
||||
|
@ -200,6 +215,15 @@ func checkExerciseRepository(t *testing.T, repository distribution.Repository) {
|
|||
if err != nil {
|
||||
t.Fatalf("unexpected error deleting blob: %v", err)
|
||||
}
|
||||
}
|
||||
|
||||
err = repository.Tags(ctx).Untag(ctx, m.Tag)
|
||||
if err != nil {
|
||||
t.Fatalf("unexpected error deleting tag: %v", err)
|
||||
}
|
||||
|
||||
err = remover.Remove(ctx, repository.Named())
|
||||
if err != nil {
|
||||
t.Fatalf("unexpected error deleting repo: %v", err)
|
||||
}
|
||||
}
|
||||
|
|
|
@ -54,6 +54,11 @@ type RepositoryEnumerator interface {
|
|||
Enumerate(ctx context.Context, ingester func(string) error) error
|
||||
}
|
||||
|
||||
// RepositoryRemover removes given repository
|
||||
type RepositoryRemover interface {
|
||||
Remove(ctx context.Context, name reference.Named) error
|
||||
}
|
||||
|
||||
// ManifestServiceOption is a function argument for Manifest Service methods
|
||||
type ManifestServiceOption interface {
|
||||
Apply(ManifestService) error
|
||||
|
|
|
@ -58,10 +58,11 @@ type App struct {
|
|||
|
||||
Config *configuration.Configuration
|
||||
|
||||
router *mux.Router // main application router, configured with dispatchers
|
||||
driver storagedriver.StorageDriver // driver maintains the app global storage driver instance.
|
||||
registry distribution.Namespace // registry is the primary registry backend for the app instance.
|
||||
accessController auth.AccessController // main access controller for application
|
||||
router *mux.Router // main application router, configured with dispatchers
|
||||
driver storagedriver.StorageDriver // driver maintains the app global storage driver instance.
|
||||
registry distribution.Namespace // registry is the primary registry backend for the app instance.
|
||||
repoRemover distribution.RepositoryRemover // repoRemover provides ability to delete repos
|
||||
accessController auth.AccessController // main access controller for application
|
||||
|
||||
// httpHost is a parsed representation of the http.host parameter from
|
||||
// the configuration. Only the Scheme and Host fields are used.
|
||||
|
@ -320,6 +321,11 @@ func NewApp(ctx context.Context, config *configuration.Configuration) *App {
|
|||
app.isCache = true
|
||||
dcontext.GetLogger(app).Info("Registry configured as a proxy cache to ", config.Proxy.RemoteURL)
|
||||
}
|
||||
var ok bool
|
||||
app.repoRemover, ok = app.registry.(distribution.RepositoryRemover)
|
||||
if !ok {
|
||||
dcontext.GetLogger(app).Warnf("Registry does not implement RempositoryRemover. Will not be able to delete repos and tags")
|
||||
}
|
||||
|
||||
return app
|
||||
}
|
||||
|
@ -696,8 +702,9 @@ func (app *App) dispatcher(dispatch dispatchFunc) http.Handler {
|
|||
}
|
||||
|
||||
// assign and decorate the authorized repository with an event bridge.
|
||||
context.Repository = notifications.Listen(
|
||||
context.Repository, context.App.repoRemover = notifications.Listen(
|
||||
repository,
|
||||
context.App.repoRemover,
|
||||
app.eventBridge(context, r))
|
||||
|
||||
context.Repository, err = applyRepoMiddleware(app, context.Repository, app.Config.Middleware["repository"])
|
||||
|
|
|
@ -7,6 +7,7 @@ import (
|
|||
"path"
|
||||
"strings"
|
||||
|
||||
"github.com/docker/distribution/reference"
|
||||
"github.com/docker/distribution/registry/storage/driver"
|
||||
)
|
||||
|
||||
|
@ -70,6 +71,16 @@ func (reg *registry) Enumerate(ctx context.Context, ingester func(string) error)
|
|||
return err
|
||||
}
|
||||
|
||||
// Remove removes a repository from storage
|
||||
func (reg *registry) Remove(ctx context.Context, name reference.Named) error {
|
||||
root, err := pathFor(repositoriesRootPathSpec{})
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
repoDir := path.Join(root, name.Name())
|
||||
return reg.driver.Delete(ctx, repoDir)
|
||||
}
|
||||
|
||||
// lessPath returns true if one path a is less than path b.
|
||||
//
|
||||
// A component-wise comparison is done, rather than the lexical comparison of
|
||||
|
|
|
@ -23,6 +23,7 @@ type registry struct {
|
|||
schema1SigningKey libtrust.PrivateKey
|
||||
blobDescriptorServiceFactory distribution.BlobDescriptorServiceFactory
|
||||
manifestURLs manifestURLs
|
||||
driver storagedriver.StorageDriver
|
||||
}
|
||||
|
||||
// manifestURLs holds regular expressions for controlling manifest URL whitelisting
|
||||
|
@ -133,6 +134,7 @@ func NewRegistry(ctx context.Context, driver storagedriver.StorageDriver, option
|
|||
},
|
||||
statter: statter,
|
||||
resumableDigestEnabled: true,
|
||||
driver: driver,
|
||||
}
|
||||
|
||||
for _, option := range options {
|
||||
|
|
Loading…
Reference in a new issue