Simplify proxy scheduler
The proxy scheduler implemented its own timer state machine. It's simpler and more efficient to leverage the Go runtime's timer heap by using time.AfterFunc. This commit adds a time.Timer to each scheduler entry, and starts and stops those timers as necessary. Then the mainloop goroutine and its associated logic are not needed. Signed-off-by: Aaron Lehmann <aaron.lehmann@docker.com>
This commit is contained in:
parent
da206d64cb
commit
84595fc628
2 changed files with 84 additions and 115 deletions
|
@ -3,13 +3,14 @@ package scheduler
|
||||||
import (
|
import (
|
||||||
"encoding/json"
|
"encoding/json"
|
||||||
"fmt"
|
"fmt"
|
||||||
|
"sync"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
"github.com/docker/distribution/context"
|
"github.com/docker/distribution/context"
|
||||||
"github.com/docker/distribution/registry/storage/driver"
|
"github.com/docker/distribution/registry/storage/driver"
|
||||||
)
|
)
|
||||||
|
|
||||||
// onTTLExpiryFunc is called when a repositories' TTL expires
|
// onTTLExpiryFunc is called when a repository's TTL expires
|
||||||
type expiryFunc func(string) error
|
type expiryFunc func(string) error
|
||||||
|
|
||||||
const (
|
const (
|
||||||
|
@ -23,14 +24,14 @@ type schedulerEntry struct {
|
||||||
Key string `json:"Key"`
|
Key string `json:"Key"`
|
||||||
Expiry time.Time `json:"ExpiryData"`
|
Expiry time.Time `json:"ExpiryData"`
|
||||||
EntryType int `json:"EntryType"`
|
EntryType int `json:"EntryType"`
|
||||||
|
|
||||||
|
timer *time.Timer
|
||||||
}
|
}
|
||||||
|
|
||||||
// New returns a new instance of the scheduler
|
// New returns a new instance of the scheduler
|
||||||
func New(ctx context.Context, driver driver.StorageDriver, path string) *TTLExpirationScheduler {
|
func New(ctx context.Context, driver driver.StorageDriver, path string) *TTLExpirationScheduler {
|
||||||
return &TTLExpirationScheduler{
|
return &TTLExpirationScheduler{
|
||||||
entries: make(map[string]schedulerEntry),
|
entries: make(map[string]*schedulerEntry),
|
||||||
addChan: make(chan schedulerEntry),
|
|
||||||
stopChan: make(chan bool),
|
|
||||||
driver: driver,
|
driver: driver,
|
||||||
pathToStateFile: path,
|
pathToStateFile: path,
|
||||||
ctx: ctx,
|
ctx: ctx,
|
||||||
|
@ -41,9 +42,9 @@ func New(ctx context.Context, driver driver.StorageDriver, path string) *TTLExpi
|
||||||
// TTLExpirationScheduler is a scheduler used to perform actions
|
// TTLExpirationScheduler is a scheduler used to perform actions
|
||||||
// when TTLs expire
|
// when TTLs expire
|
||||||
type TTLExpirationScheduler struct {
|
type TTLExpirationScheduler struct {
|
||||||
entries map[string]schedulerEntry
|
sync.Mutex
|
||||||
addChan chan schedulerEntry
|
|
||||||
stopChan chan bool
|
entries map[string]*schedulerEntry
|
||||||
|
|
||||||
driver driver.StorageDriver
|
driver driver.StorageDriver
|
||||||
ctx context.Context
|
ctx context.Context
|
||||||
|
@ -55,24 +56,27 @@ type TTLExpirationScheduler struct {
|
||||||
onManifestExpire expiryFunc
|
onManifestExpire expiryFunc
|
||||||
}
|
}
|
||||||
|
|
||||||
// addChan allows more TTLs to be pushed to the scheduler
|
|
||||||
type addChan chan schedulerEntry
|
|
||||||
|
|
||||||
// stopChan allows the scheduler to be stopped - used for testing.
|
|
||||||
type stopChan chan bool
|
|
||||||
|
|
||||||
// OnBlobExpire is called when a scheduled blob's TTL expires
|
// OnBlobExpire is called when a scheduled blob's TTL expires
|
||||||
func (ttles *TTLExpirationScheduler) OnBlobExpire(f expiryFunc) {
|
func (ttles *TTLExpirationScheduler) OnBlobExpire(f expiryFunc) {
|
||||||
|
ttles.Lock()
|
||||||
|
defer ttles.Unlock()
|
||||||
|
|
||||||
ttles.onBlobExpire = f
|
ttles.onBlobExpire = f
|
||||||
}
|
}
|
||||||
|
|
||||||
// OnManifestExpire is called when a scheduled manifest's TTL expires
|
// OnManifestExpire is called when a scheduled manifest's TTL expires
|
||||||
func (ttles *TTLExpirationScheduler) OnManifestExpire(f expiryFunc) {
|
func (ttles *TTLExpirationScheduler) OnManifestExpire(f expiryFunc) {
|
||||||
|
ttles.Lock()
|
||||||
|
defer ttles.Unlock()
|
||||||
|
|
||||||
ttles.onManifestExpire = f
|
ttles.onManifestExpire = f
|
||||||
}
|
}
|
||||||
|
|
||||||
// AddBlob schedules a blob cleanup after ttl expires
|
// AddBlob schedules a blob cleanup after ttl expires
|
||||||
func (ttles *TTLExpirationScheduler) AddBlob(dgst string, ttl time.Duration) error {
|
func (ttles *TTLExpirationScheduler) AddBlob(dgst string, ttl time.Duration) error {
|
||||||
|
ttles.Lock()
|
||||||
|
defer ttles.Unlock()
|
||||||
|
|
||||||
if ttles.stopped {
|
if ttles.stopped {
|
||||||
return fmt.Errorf("scheduler not started")
|
return fmt.Errorf("scheduler not started")
|
||||||
}
|
}
|
||||||
|
@ -82,6 +86,9 @@ func (ttles *TTLExpirationScheduler) AddBlob(dgst string, ttl time.Duration) err
|
||||||
|
|
||||||
// AddManifest schedules a manifest cleanup after ttl expires
|
// AddManifest schedules a manifest cleanup after ttl expires
|
||||||
func (ttles *TTLExpirationScheduler) AddManifest(repoName string, ttl time.Duration) error {
|
func (ttles *TTLExpirationScheduler) AddManifest(repoName string, ttl time.Duration) error {
|
||||||
|
ttles.Lock()
|
||||||
|
defer ttles.Unlock()
|
||||||
|
|
||||||
if ttles.stopped {
|
if ttles.stopped {
|
||||||
return fmt.Errorf("scheduler not started")
|
return fmt.Errorf("scheduler not started")
|
||||||
}
|
}
|
||||||
|
@ -92,23 +99,9 @@ func (ttles *TTLExpirationScheduler) AddManifest(repoName string, ttl time.Durat
|
||||||
|
|
||||||
// Start starts the scheduler
|
// Start starts the scheduler
|
||||||
func (ttles *TTLExpirationScheduler) Start() error {
|
func (ttles *TTLExpirationScheduler) Start() error {
|
||||||
return ttles.start()
|
ttles.Lock()
|
||||||
}
|
defer ttles.Unlock()
|
||||||
|
|
||||||
func (ttles *TTLExpirationScheduler) add(key string, ttl time.Duration, eType int) {
|
|
||||||
entry := schedulerEntry{
|
|
||||||
Key: key,
|
|
||||||
Expiry: time.Now().Add(ttl),
|
|
||||||
EntryType: eType,
|
|
||||||
}
|
|
||||||
ttles.addChan <- entry
|
|
||||||
}
|
|
||||||
|
|
||||||
func (ttles *TTLExpirationScheduler) stop() {
|
|
||||||
ttles.stopChan <- true
|
|
||||||
}
|
|
||||||
|
|
||||||
func (ttles *TTLExpirationScheduler) start() error {
|
|
||||||
err := ttles.readState()
|
err := ttles.readState()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
|
@ -120,32 +113,41 @@ func (ttles *TTLExpirationScheduler) start() error {
|
||||||
|
|
||||||
context.GetLogger(ttles.ctx).Infof("Starting cached object TTL expiration scheduler...")
|
context.GetLogger(ttles.ctx).Infof("Starting cached object TTL expiration scheduler...")
|
||||||
ttles.stopped = false
|
ttles.stopped = false
|
||||||
go ttles.mainloop()
|
|
||||||
|
// Start timer for each deserialized entry
|
||||||
|
for _, entry := range ttles.entries {
|
||||||
|
entry.timer = ttles.startTimer(entry, entry.Expiry.Sub(time.Now()))
|
||||||
|
}
|
||||||
|
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// mainloop uses a select statement to listen for events. Most of its time
|
func (ttles *TTLExpirationScheduler) add(key string, ttl time.Duration, eType int) {
|
||||||
// is spent in waiting on a TTL to expire but can be interrupted when TTLs
|
entry := &schedulerEntry{
|
||||||
// are added.
|
Key: key,
|
||||||
func (ttles *TTLExpirationScheduler) mainloop() {
|
Expiry: time.Now().Add(ttl),
|
||||||
for {
|
EntryType: eType,
|
||||||
if ttles.stopped {
|
}
|
||||||
return
|
context.GetLogger(ttles.ctx).Infof("Adding new scheduler entry for %s with ttl=%s", entry.Key, entry.Expiry.Sub(time.Now()))
|
||||||
|
if oldEntry, present := ttles.entries[key]; present && oldEntry.timer != nil {
|
||||||
|
oldEntry.timer.Stop()
|
||||||
|
}
|
||||||
|
ttles.entries[key] = entry
|
||||||
|
entry.timer = ttles.startTimer(entry, ttl)
|
||||||
|
|
||||||
|
if err := ttles.writeState(); err != nil {
|
||||||
|
context.GetLogger(ttles.ctx).Errorf("Error writing scheduler state: %s", err)
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
nextEntry, ttl := nextExpiringEntry(ttles.entries)
|
func (ttles *TTLExpirationScheduler) startTimer(entry *schedulerEntry, ttl time.Duration) *time.Timer {
|
||||||
if len(ttles.entries) == 0 {
|
return time.AfterFunc(ttl, func() {
|
||||||
context.GetLogger(ttles.ctx).Infof("scheduler mainloop(): Nothing to do, sleeping...")
|
ttles.Lock()
|
||||||
} else {
|
defer ttles.Unlock()
|
||||||
context.GetLogger(ttles.ctx).Infof("scheduler mainloop(): Sleeping for %s until cleanup of %s", ttl, nextEntry.Key)
|
|
||||||
}
|
|
||||||
|
|
||||||
select {
|
|
||||||
case <-time.After(ttl):
|
|
||||||
var f expiryFunc
|
var f expiryFunc
|
||||||
|
|
||||||
switch nextEntry.EntryType {
|
switch entry.EntryType {
|
||||||
case entryTypeBlob:
|
case entryTypeBlob:
|
||||||
f = ttles.onBlobExpire
|
f = ttles.onBlobExpire
|
||||||
case entryTypeManifest:
|
case entryTypeManifest:
|
||||||
|
@ -156,62 +158,31 @@ func (ttles *TTLExpirationScheduler) mainloop() {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
if err := f(nextEntry.Key); err != nil {
|
if err := f(entry.Key); err != nil {
|
||||||
context.GetLogger(ttles.ctx).Errorf("Scheduler error returned from OnExpire(%s): %s", nextEntry.Key, err)
|
context.GetLogger(ttles.ctx).Errorf("Scheduler error returned from OnExpire(%s): %s", entry.Key, err)
|
||||||
}
|
}
|
||||||
|
|
||||||
delete(ttles.entries, nextEntry.Key)
|
delete(ttles.entries, entry.Key)
|
||||||
if err := ttles.writeState(); err != nil {
|
if err := ttles.writeState(); err != nil {
|
||||||
context.GetLogger(ttles.ctx).Errorf("Error writing scheduler state: %s", err)
|
context.GetLogger(ttles.ctx).Errorf("Error writing scheduler state: %s", err)
|
||||||
}
|
}
|
||||||
case entry := <-ttles.addChan:
|
})
|
||||||
context.GetLogger(ttles.ctx).Infof("Adding new scheduler entry for %s with ttl=%s", entry.Key, entry.Expiry.Sub(time.Now()))
|
|
||||||
ttles.entries[entry.Key] = entry
|
|
||||||
if err := ttles.writeState(); err != nil {
|
|
||||||
context.GetLogger(ttles.ctx).Errorf("Error writing scheduler state: %s", err)
|
|
||||||
}
|
}
|
||||||
break
|
|
||||||
|
|
||||||
case <-ttles.stopChan:
|
// Stop stops the scheduler.
|
||||||
|
func (ttles *TTLExpirationScheduler) Stop() {
|
||||||
|
ttles.Lock()
|
||||||
|
defer ttles.Unlock()
|
||||||
|
|
||||||
if err := ttles.writeState(); err != nil {
|
if err := ttles.writeState(); err != nil {
|
||||||
context.GetLogger(ttles.ctx).Errorf("Error writing scheduler state: %s", err)
|
context.GetLogger(ttles.ctx).Errorf("Error writing scheduler state: %s", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
for _, entry := range ttles.entries {
|
||||||
|
entry.timer.Stop()
|
||||||
|
}
|
||||||
ttles.stopped = true
|
ttles.stopped = true
|
||||||
}
|
}
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
func nextExpiringEntry(entries map[string]schedulerEntry) (*schedulerEntry, time.Duration) {
|
|
||||||
if len(entries) == 0 {
|
|
||||||
return nil, 24 * time.Hour
|
|
||||||
}
|
|
||||||
|
|
||||||
// todo:(richardscothern) this is a primitive o(n) algorithm
|
|
||||||
// but n will never be *that* big and it's all in memory. Investigate
|
|
||||||
// time.AfterFunc for heap based expiries
|
|
||||||
|
|
||||||
first := true
|
|
||||||
var nextEntry schedulerEntry
|
|
||||||
for _, entry := range entries {
|
|
||||||
if first {
|
|
||||||
nextEntry = entry
|
|
||||||
first = false
|
|
||||||
continue
|
|
||||||
}
|
|
||||||
if entry.Expiry.Before(nextEntry.Expiry) {
|
|
||||||
nextEntry = entry
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// Dates may be from the past if the scheduler has
|
|
||||||
// been restarted, set their ttl to 0
|
|
||||||
if nextEntry.Expiry.Before(time.Now()) {
|
|
||||||
nextEntry.Expiry = time.Now()
|
|
||||||
return &nextEntry, 0
|
|
||||||
}
|
|
||||||
|
|
||||||
return &nextEntry, nextEntry.Expiry.Sub(time.Now())
|
|
||||||
}
|
|
||||||
|
|
||||||
func (ttles *TTLExpirationScheduler) writeState() error {
|
func (ttles *TTLExpirationScheduler) writeState() error {
|
||||||
jsonBytes, err := json.Marshal(ttles.entries)
|
jsonBytes, err := json.Marshal(ttles.entries)
|
||||||
|
|
|
@ -2,7 +2,6 @@ package scheduler
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"encoding/json"
|
"encoding/json"
|
||||||
"fmt"
|
|
||||||
"testing"
|
"testing"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
|
@ -27,13 +26,13 @@ func TestSchedule(t *testing.T) {
|
||||||
if !ok {
|
if !ok {
|
||||||
t.Fatalf("Trying to remove nonexistant repo: %s", repoName)
|
t.Fatalf("Trying to remove nonexistant repo: %s", repoName)
|
||||||
}
|
}
|
||||||
fmt.Println("removing", repoName)
|
t.Log("removing", repoName)
|
||||||
delete(remainingRepos, repoName)
|
delete(remainingRepos, repoName)
|
||||||
|
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
s.onBlobExpire = deleteFunc
|
s.onBlobExpire = deleteFunc
|
||||||
err := s.start()
|
err := s.Start()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Fatalf("Error starting ttlExpirationScheduler: %s", err)
|
t.Fatalf("Error starting ttlExpirationScheduler: %s", err)
|
||||||
}
|
}
|
||||||
|
@ -97,7 +96,7 @@ func TestRestoreOld(t *testing.T) {
|
||||||
}
|
}
|
||||||
s := New(context.Background(), fs, "/ttl")
|
s := New(context.Background(), fs, "/ttl")
|
||||||
s.onBlobExpire = deleteFunc
|
s.onBlobExpire = deleteFunc
|
||||||
err = s.start()
|
err = s.Start()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Fatalf("Error starting ttlExpirationScheduler: %s", err)
|
t.Fatalf("Error starting ttlExpirationScheduler: %s", err)
|
||||||
}
|
}
|
||||||
|
@ -124,7 +123,7 @@ func TestStopRestore(t *testing.T) {
|
||||||
s := New(context.Background(), fs, pathToStateFile)
|
s := New(context.Background(), fs, pathToStateFile)
|
||||||
s.onBlobExpire = deleteFunc
|
s.onBlobExpire = deleteFunc
|
||||||
|
|
||||||
err := s.start()
|
err := s.Start()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Fatalf(err.Error())
|
t.Fatalf(err.Error())
|
||||||
}
|
}
|
||||||
|
@ -133,13 +132,13 @@ func TestStopRestore(t *testing.T) {
|
||||||
|
|
||||||
// Start and stop before all operations complete
|
// Start and stop before all operations complete
|
||||||
// state will be written to fs
|
// state will be written to fs
|
||||||
s.stop()
|
s.Stop()
|
||||||
time.Sleep(10 * time.Millisecond)
|
time.Sleep(10 * time.Millisecond)
|
||||||
|
|
||||||
// v2 will restore state from fs
|
// v2 will restore state from fs
|
||||||
s2 := New(context.Background(), fs, pathToStateFile)
|
s2 := New(context.Background(), fs, pathToStateFile)
|
||||||
s2.onBlobExpire = deleteFunc
|
s2.onBlobExpire = deleteFunc
|
||||||
err = s2.start()
|
err = s2.Start()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Fatalf("Error starting v2: %s", err.Error())
|
t.Fatalf("Error starting v2: %s", err.Error())
|
||||||
}
|
}
|
||||||
|
@ -153,12 +152,11 @@ func TestStopRestore(t *testing.T) {
|
||||||
|
|
||||||
func TestDoubleStart(t *testing.T) {
|
func TestDoubleStart(t *testing.T) {
|
||||||
s := New(context.Background(), inmemory.New(), "/ttl")
|
s := New(context.Background(), inmemory.New(), "/ttl")
|
||||||
err := s.start()
|
err := s.Start()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Fatalf("Unable to start scheduler")
|
t.Fatalf("Unable to start scheduler")
|
||||||
}
|
}
|
||||||
fmt.Printf("%#v", s)
|
err = s.Start()
|
||||||
err = s.start()
|
|
||||||
if err == nil {
|
if err == nil {
|
||||||
t.Fatalf("Scheduler started twice without error")
|
t.Fatalf("Scheduler started twice without error")
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in a new issue