Merge pull request #1096 from aaronlehmann/simplify-proxy-scheduler

Simplify proxy scheduler
This commit is contained in:
Richard Scothern 2015-10-28 13:23:00 -07:00
commit 987a69dd05
2 changed files with 84 additions and 115 deletions

View file

@ -3,13 +3,14 @@ package scheduler
import ( import (
"encoding/json" "encoding/json"
"fmt" "fmt"
"sync"
"time" "time"
"github.com/docker/distribution/context" "github.com/docker/distribution/context"
"github.com/docker/distribution/registry/storage/driver" "github.com/docker/distribution/registry/storage/driver"
) )
// onTTLExpiryFunc is called when a repositories' TTL expires // onTTLExpiryFunc is called when a repository's TTL expires
type expiryFunc func(string) error type expiryFunc func(string) error
const ( const (
@ -23,14 +24,14 @@ type schedulerEntry struct {
Key string `json:"Key"` Key string `json:"Key"`
Expiry time.Time `json:"ExpiryData"` Expiry time.Time `json:"ExpiryData"`
EntryType int `json:"EntryType"` EntryType int `json:"EntryType"`
timer *time.Timer
} }
// New returns a new instance of the scheduler // New returns a new instance of the scheduler
func New(ctx context.Context, driver driver.StorageDriver, path string) *TTLExpirationScheduler { func New(ctx context.Context, driver driver.StorageDriver, path string) *TTLExpirationScheduler {
return &TTLExpirationScheduler{ return &TTLExpirationScheduler{
entries: make(map[string]schedulerEntry), entries: make(map[string]*schedulerEntry),
addChan: make(chan schedulerEntry),
stopChan: make(chan bool),
driver: driver, driver: driver,
pathToStateFile: path, pathToStateFile: path,
ctx: ctx, ctx: ctx,
@ -41,9 +42,9 @@ func New(ctx context.Context, driver driver.StorageDriver, path string) *TTLExpi
// TTLExpirationScheduler is a scheduler used to perform actions // TTLExpirationScheduler is a scheduler used to perform actions
// when TTLs expire // when TTLs expire
type TTLExpirationScheduler struct { type TTLExpirationScheduler struct {
entries map[string]schedulerEntry sync.Mutex
addChan chan schedulerEntry
stopChan chan bool entries map[string]*schedulerEntry
driver driver.StorageDriver driver driver.StorageDriver
ctx context.Context ctx context.Context
@ -55,24 +56,27 @@ type TTLExpirationScheduler struct {
onManifestExpire expiryFunc onManifestExpire expiryFunc
} }
// addChan allows more TTLs to be pushed to the scheduler
type addChan chan schedulerEntry
// stopChan allows the scheduler to be stopped - used for testing.
type stopChan chan bool
// OnBlobExpire is called when a scheduled blob's TTL expires // OnBlobExpire is called when a scheduled blob's TTL expires
func (ttles *TTLExpirationScheduler) OnBlobExpire(f expiryFunc) { func (ttles *TTLExpirationScheduler) OnBlobExpire(f expiryFunc) {
ttles.Lock()
defer ttles.Unlock()
ttles.onBlobExpire = f ttles.onBlobExpire = f
} }
// OnManifestExpire is called when a scheduled manifest's TTL expires // OnManifestExpire is called when a scheduled manifest's TTL expires
func (ttles *TTLExpirationScheduler) OnManifestExpire(f expiryFunc) { func (ttles *TTLExpirationScheduler) OnManifestExpire(f expiryFunc) {
ttles.Lock()
defer ttles.Unlock()
ttles.onManifestExpire = f ttles.onManifestExpire = f
} }
// AddBlob schedules a blob cleanup after ttl expires // AddBlob schedules a blob cleanup after ttl expires
func (ttles *TTLExpirationScheduler) AddBlob(dgst string, ttl time.Duration) error { func (ttles *TTLExpirationScheduler) AddBlob(dgst string, ttl time.Duration) error {
ttles.Lock()
defer ttles.Unlock()
if ttles.stopped { if ttles.stopped {
return fmt.Errorf("scheduler not started") return fmt.Errorf("scheduler not started")
} }
@ -82,6 +86,9 @@ func (ttles *TTLExpirationScheduler) AddBlob(dgst string, ttl time.Duration) err
// AddManifest schedules a manifest cleanup after ttl expires // AddManifest schedules a manifest cleanup after ttl expires
func (ttles *TTLExpirationScheduler) AddManifest(repoName string, ttl time.Duration) error { func (ttles *TTLExpirationScheduler) AddManifest(repoName string, ttl time.Duration) error {
ttles.Lock()
defer ttles.Unlock()
if ttles.stopped { if ttles.stopped {
return fmt.Errorf("scheduler not started") return fmt.Errorf("scheduler not started")
} }
@ -92,23 +99,9 @@ func (ttles *TTLExpirationScheduler) AddManifest(repoName string, ttl time.Durat
// Start starts the scheduler // Start starts the scheduler
func (ttles *TTLExpirationScheduler) Start() error { func (ttles *TTLExpirationScheduler) Start() error {
return ttles.start() ttles.Lock()
} defer ttles.Unlock()
func (ttles *TTLExpirationScheduler) add(key string, ttl time.Duration, eType int) {
entry := schedulerEntry{
Key: key,
Expiry: time.Now().Add(ttl),
EntryType: eType,
}
ttles.addChan <- entry
}
func (ttles *TTLExpirationScheduler) stop() {
ttles.stopChan <- true
}
func (ttles *TTLExpirationScheduler) start() error {
err := ttles.readState() err := ttles.readState()
if err != nil { if err != nil {
return err return err
@ -120,32 +113,41 @@ func (ttles *TTLExpirationScheduler) start() error {
context.GetLogger(ttles.ctx).Infof("Starting cached object TTL expiration scheduler...") context.GetLogger(ttles.ctx).Infof("Starting cached object TTL expiration scheduler...")
ttles.stopped = false ttles.stopped = false
go ttles.mainloop()
// Start timer for each deserialized entry
for _, entry := range ttles.entries {
entry.timer = ttles.startTimer(entry, entry.Expiry.Sub(time.Now()))
}
return nil return nil
} }
// mainloop uses a select statement to listen for events. Most of its time func (ttles *TTLExpirationScheduler) add(key string, ttl time.Duration, eType int) {
// is spent in waiting on a TTL to expire but can be interrupted when TTLs entry := &schedulerEntry{
// are added. Key: key,
func (ttles *TTLExpirationScheduler) mainloop() { Expiry: time.Now().Add(ttl),
for { EntryType: eType,
if ttles.stopped {
return
} }
context.GetLogger(ttles.ctx).Infof("Adding new scheduler entry for %s with ttl=%s", entry.Key, entry.Expiry.Sub(time.Now()))
nextEntry, ttl := nextExpiringEntry(ttles.entries) if oldEntry, present := ttles.entries[key]; present && oldEntry.timer != nil {
if len(ttles.entries) == 0 { oldEntry.timer.Stop()
context.GetLogger(ttles.ctx).Infof("scheduler mainloop(): Nothing to do, sleeping...")
} else {
context.GetLogger(ttles.ctx).Infof("scheduler mainloop(): Sleeping for %s until cleanup of %s", ttl, nextEntry.Key)
} }
ttles.entries[key] = entry
entry.timer = ttles.startTimer(entry, ttl)
if err := ttles.writeState(); err != nil {
context.GetLogger(ttles.ctx).Errorf("Error writing scheduler state: %s", err)
}
}
func (ttles *TTLExpirationScheduler) startTimer(entry *schedulerEntry, ttl time.Duration) *time.Timer {
return time.AfterFunc(ttl, func() {
ttles.Lock()
defer ttles.Unlock()
select {
case <-time.After(ttl):
var f expiryFunc var f expiryFunc
switch nextEntry.EntryType { switch entry.EntryType {
case entryTypeBlob: case entryTypeBlob:
f = ttles.onBlobExpire f = ttles.onBlobExpire
case entryTypeManifest: case entryTypeManifest:
@ -156,61 +158,30 @@ func (ttles *TTLExpirationScheduler) mainloop() {
} }
} }
if err := f(nextEntry.Key); err != nil { if err := f(entry.Key); err != nil {
context.GetLogger(ttles.ctx).Errorf("Scheduler error returned from OnExpire(%s): %s", nextEntry.Key, err) context.GetLogger(ttles.ctx).Errorf("Scheduler error returned from OnExpire(%s): %s", entry.Key, err)
} }
delete(ttles.entries, nextEntry.Key) delete(ttles.entries, entry.Key)
if err := ttles.writeState(); err != nil { if err := ttles.writeState(); err != nil {
context.GetLogger(ttles.ctx).Errorf("Error writing scheduler state: %s", err) context.GetLogger(ttles.ctx).Errorf("Error writing scheduler state: %s", err)
} }
case entry := <-ttles.addChan: })
context.GetLogger(ttles.ctx).Infof("Adding new scheduler entry for %s with ttl=%s", entry.Key, entry.Expiry.Sub(time.Now()))
ttles.entries[entry.Key] = entry
if err := ttles.writeState(); err != nil {
context.GetLogger(ttles.ctx).Errorf("Error writing scheduler state: %s", err)
}
break
case <-ttles.stopChan:
if err := ttles.writeState(); err != nil {
context.GetLogger(ttles.ctx).Errorf("Error writing scheduler state: %s", err)
}
ttles.stopped = true
}
}
} }
func nextExpiringEntry(entries map[string]schedulerEntry) (*schedulerEntry, time.Duration) { // Stop stops the scheduler.
if len(entries) == 0 { func (ttles *TTLExpirationScheduler) Stop() {
return nil, 24 * time.Hour ttles.Lock()
defer ttles.Unlock()
if err := ttles.writeState(); err != nil {
context.GetLogger(ttles.ctx).Errorf("Error writing scheduler state: %s", err)
} }
// todo:(richardscothern) this is a primitive o(n) algorithm for _, entry := range ttles.entries {
// but n will never be *that* big and it's all in memory. Investigate entry.timer.Stop()
// time.AfterFunc for heap based expiries
first := true
var nextEntry schedulerEntry
for _, entry := range entries {
if first {
nextEntry = entry
first = false
continue
} }
if entry.Expiry.Before(nextEntry.Expiry) { ttles.stopped = true
nextEntry = entry
}
}
// Dates may be from the past if the scheduler has
// been restarted, set their ttl to 0
if nextEntry.Expiry.Before(time.Now()) {
nextEntry.Expiry = time.Now()
return &nextEntry, 0
}
return &nextEntry, nextEntry.Expiry.Sub(time.Now())
} }
func (ttles *TTLExpirationScheduler) writeState() error { func (ttles *TTLExpirationScheduler) writeState() error {

View file

@ -2,7 +2,6 @@ package scheduler
import ( import (
"encoding/json" "encoding/json"
"fmt"
"testing" "testing"
"time" "time"
@ -27,13 +26,13 @@ func TestSchedule(t *testing.T) {
if !ok { if !ok {
t.Fatalf("Trying to remove nonexistant repo: %s", repoName) t.Fatalf("Trying to remove nonexistant repo: %s", repoName)
} }
fmt.Println("removing", repoName) t.Log("removing", repoName)
delete(remainingRepos, repoName) delete(remainingRepos, repoName)
return nil return nil
} }
s.onBlobExpire = deleteFunc s.onBlobExpire = deleteFunc
err := s.start() err := s.Start()
if err != nil { if err != nil {
t.Fatalf("Error starting ttlExpirationScheduler: %s", err) t.Fatalf("Error starting ttlExpirationScheduler: %s", err)
} }
@ -97,7 +96,7 @@ func TestRestoreOld(t *testing.T) {
} }
s := New(context.Background(), fs, "/ttl") s := New(context.Background(), fs, "/ttl")
s.onBlobExpire = deleteFunc s.onBlobExpire = deleteFunc
err = s.start() err = s.Start()
if err != nil { if err != nil {
t.Fatalf("Error starting ttlExpirationScheduler: %s", err) t.Fatalf("Error starting ttlExpirationScheduler: %s", err)
} }
@ -124,7 +123,7 @@ func TestStopRestore(t *testing.T) {
s := New(context.Background(), fs, pathToStateFile) s := New(context.Background(), fs, pathToStateFile)
s.onBlobExpire = deleteFunc s.onBlobExpire = deleteFunc
err := s.start() err := s.Start()
if err != nil { if err != nil {
t.Fatalf(err.Error()) t.Fatalf(err.Error())
} }
@ -133,13 +132,13 @@ func TestStopRestore(t *testing.T) {
// Start and stop before all operations complete // Start and stop before all operations complete
// state will be written to fs // state will be written to fs
s.stop() s.Stop()
time.Sleep(10 * time.Millisecond) time.Sleep(10 * time.Millisecond)
// v2 will restore state from fs // v2 will restore state from fs
s2 := New(context.Background(), fs, pathToStateFile) s2 := New(context.Background(), fs, pathToStateFile)
s2.onBlobExpire = deleteFunc s2.onBlobExpire = deleteFunc
err = s2.start() err = s2.Start()
if err != nil { if err != nil {
t.Fatalf("Error starting v2: %s", err.Error()) t.Fatalf("Error starting v2: %s", err.Error())
} }
@ -153,12 +152,11 @@ func TestStopRestore(t *testing.T) {
func TestDoubleStart(t *testing.T) { func TestDoubleStart(t *testing.T) {
s := New(context.Background(), inmemory.New(), "/ttl") s := New(context.Background(), inmemory.New(), "/ttl")
err := s.start() err := s.Start()
if err != nil { if err != nil {
t.Fatalf("Unable to start scheduler") t.Fatalf("Unable to start scheduler")
} }
fmt.Printf("%#v", s) err = s.Start()
err = s.start()
if err == nil { if err == nil {
t.Fatalf("Scheduler started twice without error") t.Fatalf("Scheduler started twice without error")
} }