Invalidate the blob store descriptor caches when content is removed from from

the proxy.  Also, switch to reference in the scheduler API.

Signed-off-by: Richard Scothern <richard.scothern@gmail.com>
This commit is contained in:
Richard Scothern 2016-01-26 16:42:10 -08:00
parent a7740f5d0f
commit 3e570e59f1
7 changed files with 152 additions and 62 deletions

View file

@ -7,13 +7,12 @@ import (
"time"
"github.com/docker/distribution/context"
"github.com/docker/distribution/digest"
"github.com/docker/distribution/reference"
"github.com/docker/distribution/registry/storage/driver"
)
// onTTLExpiryFunc is called when a repository's TTL expires
type expiryFunc func(string) error
type expiryFunc func(reference.Reference) error
const (
entryTypeBlob = iota
@ -82,19 +81,20 @@ func (ttles *TTLExpirationScheduler) OnManifestExpire(f expiryFunc) {
}
// AddBlob schedules a blob cleanup after ttl expires
func (ttles *TTLExpirationScheduler) AddBlob(dgst digest.Digest, ttl time.Duration) error {
func (ttles *TTLExpirationScheduler) AddBlob(blobRef reference.Canonical, ttl time.Duration) error {
ttles.Lock()
defer ttles.Unlock()
if ttles.stopped {
return fmt.Errorf("scheduler not started")
}
ttles.add(dgst.String(), ttl, entryTypeBlob)
ttles.add(blobRef, ttl, entryTypeBlob)
return nil
}
// AddManifest schedules a manifest cleanup after ttl expires
func (ttles *TTLExpirationScheduler) AddManifest(repoName reference.Named, ttl time.Duration) error {
func (ttles *TTLExpirationScheduler) AddManifest(manifestRef reference.Canonical, ttl time.Duration) error {
ttles.Lock()
defer ttles.Unlock()
@ -102,7 +102,7 @@ func (ttles *TTLExpirationScheduler) AddManifest(repoName reference.Named, ttl t
return fmt.Errorf("scheduler not started")
}
ttles.add(repoName.Name(), ttl, entryTypeManifest)
ttles.add(manifestRef, ttl, entryTypeManifest)
return nil
}
@ -156,17 +156,17 @@ func (ttles *TTLExpirationScheduler) Start() error {
return nil
}
func (ttles *TTLExpirationScheduler) add(key string, ttl time.Duration, eType int) {
func (ttles *TTLExpirationScheduler) add(r reference.Reference, ttl time.Duration, eType int) {
entry := &schedulerEntry{
Key: key,
Key: r.String(),
Expiry: time.Now().Add(ttl),
EntryType: eType,
}
context.GetLogger(ttles.ctx).Infof("Adding new scheduler entry for %s with ttl=%s", entry.Key, entry.Expiry.Sub(time.Now()))
if oldEntry, present := ttles.entries[key]; present && oldEntry.timer != nil {
if oldEntry, present := ttles.entries[entry.Key]; present && oldEntry.timer != nil {
oldEntry.timer.Stop()
}
ttles.entries[key] = entry
ttles.entries[entry.Key] = entry
entry.timer = ttles.startTimer(entry, ttl)
ttles.indexDirty = true
}
@ -184,13 +184,18 @@ func (ttles *TTLExpirationScheduler) startTimer(entry *schedulerEntry, ttl time.
case entryTypeManifest:
f = ttles.onManifestExpire
default:
f = func(repoName string) error {
return fmt.Errorf("Unexpected scheduler entry type")
f = func(reference.Reference) error {
return fmt.Errorf("scheduler entry type")
}
}
if err := f(entry.Key); err != nil {
context.GetLogger(ttles.ctx).Errorf("Scheduler error returned from OnExpire(%s): %s", entry.Key, err)
ref, err := reference.Parse(entry.Key)
if err == nil {
if err := f(ref); err != nil {
context.GetLogger(ttles.ctx).Errorf("Scheduler error returned from OnExpire(%s): %s", entry.Key, err)
}
} else {
context.GetLogger(ttles.ctx).Errorf("Error unpacking reference: %s", err)
}
delete(ttles.entries, entry.Key)
@ -249,6 +254,5 @@ func (ttles *TTLExpirationScheduler) readState() error {
if err != nil {
return err
}
return nil
}

View file

@ -6,28 +6,49 @@ import (
"time"
"github.com/docker/distribution/context"
"github.com/docker/distribution/reference"
"github.com/docker/distribution/registry/storage/driver/inmemory"
)
func testRefs(t *testing.T) (reference.Reference, reference.Reference, reference.Reference) {
ref1, err := reference.Parse("testrepo@sha256:aaaaeaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa")
if err != nil {
t.Fatalf("could not parse reference: %v", err)
}
ref2, err := reference.Parse("testrepo@sha256:bbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbb")
if err != nil {
t.Fatalf("could not parse reference: %v", err)
}
ref3, err := reference.Parse("testrepo@sha256:cccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccc")
if err != nil {
t.Fatalf("could not parse reference: %v", err)
}
return ref1, ref2, ref3
}
func TestSchedule(t *testing.T) {
ref1, ref2, ref3 := testRefs(t)
timeUnit := time.Millisecond
remainingRepos := map[string]bool{
"testBlob1": true,
"testBlob2": true,
"ch00": true,
ref1.String(): true,
ref2.String(): true,
ref3.String(): true,
}
s := New(context.Background(), inmemory.New(), "/ttl")
deleteFunc := func(repoName string) error {
deleteFunc := func(repoName reference.Reference) error {
if len(remainingRepos) == 0 {
t.Fatalf("Incorrect expiry count")
}
_, ok := remainingRepos[repoName]
_, ok := remainingRepos[repoName.String()]
if !ok {
t.Fatalf("Trying to remove nonexistant repo: %s", repoName)
}
t.Log("removing", repoName)
delete(remainingRepos, repoName)
delete(remainingRepos, repoName.String())
return nil
}
@ -37,11 +58,11 @@ func TestSchedule(t *testing.T) {
t.Fatalf("Error starting ttlExpirationScheduler: %s", err)
}
s.add("testBlob1", 3*timeUnit, entryTypeBlob)
s.add("testBlob2", 1*timeUnit, entryTypeBlob)
s.add(ref1, 3*timeUnit, entryTypeBlob)
s.add(ref2, 1*timeUnit, entryTypeBlob)
func() {
s.add("ch00", 1*timeUnit, entryTypeBlob)
s.add(ref3, 1*timeUnit, entryTypeBlob)
}()
@ -53,33 +74,34 @@ func TestSchedule(t *testing.T) {
}
func TestRestoreOld(t *testing.T) {
ref1, ref2, _ := testRefs(t)
remainingRepos := map[string]bool{
"testBlob1": true,
"oldRepo": true,
ref1.String(): true,
ref2.String(): true,
}
deleteFunc := func(repoName string) error {
if repoName == "oldRepo" && len(remainingRepos) == 3 {
t.Errorf("oldRepo should be removed first")
deleteFunc := func(r reference.Reference) error {
if r.String() == ref1.String() && len(remainingRepos) == 2 {
t.Errorf("ref1 should be removed first")
}
_, ok := remainingRepos[repoName]
_, ok := remainingRepos[r.String()]
if !ok {
t.Fatalf("Trying to remove nonexistant repo: %s", repoName)
t.Fatalf("Trying to remove nonexistant repo: %s", r)
}
delete(remainingRepos, repoName)
delete(remainingRepos, r.String())
return nil
}
timeUnit := time.Millisecond
serialized, err := json.Marshal(&map[string]schedulerEntry{
"testBlob1": {
ref1.String(): {
Expiry: time.Now().Add(1 * timeUnit),
Key: "testBlob1",
Key: ref1.String(),
EntryType: 0,
},
"oldRepo": {
ref2.String(): {
Expiry: time.Now().Add(-3 * timeUnit), // TTL passed, should be removed first
Key: "oldRepo",
Key: ref2.String(),
EntryType: 0,
},
})
@ -108,13 +130,16 @@ func TestRestoreOld(t *testing.T) {
}
func TestStopRestore(t *testing.T) {
ref1, ref2, _ := testRefs(t)
timeUnit := time.Millisecond
remainingRepos := map[string]bool{
"testBlob1": true,
"testBlob2": true,
ref1.String(): true,
ref2.String(): true,
}
deleteFunc := func(repoName string) error {
delete(remainingRepos, repoName)
deleteFunc := func(r reference.Reference) error {
delete(remainingRepos, r.String())
return nil
}
@ -127,8 +152,8 @@ func TestStopRestore(t *testing.T) {
if err != nil {
t.Fatalf(err.Error())
}
s.add("testBlob1", 300*timeUnit, entryTypeBlob)
s.add("testBlob2", 100*timeUnit, entryTypeBlob)
s.add(ref1, 300*timeUnit, entryTypeBlob)
s.add(ref2, 100*timeUnit, entryTypeBlob)
// Start and stop before all operations complete
// state will be written to fs