diff --git a/chanotify/chanotify.go b/chanotify/chanotify.go index 8ab4bf0..932bfc1 100644 --- a/chanotify/chanotify.go +++ b/chanotify/chanotify.go @@ -11,41 +11,41 @@ import ( // notification from Notifier, close doesn't spawn anything and removes channel // from Notifier. type Notifier struct { - c chan string + c chan interface{} m sync.Mutex // guards doneCh - doneCh map[string]chan struct{} + doneCh map[interface{}]chan struct{} } // New returns a new *Notifier. func New() *Notifier { s := &Notifier{ - c: make(chan string), - doneCh: make(map[string]chan struct{}), + c: make(chan interface{}), + doneCh: make(map[interface{}]chan struct{}), } return s } // Chan returns channel on which client listen for notifications. // IDs of notifications is sent to the returned channel. -func (n *Notifier) Chan() <-chan string { +func (n *Notifier) Chan() <-chan interface{} { return n.c } -func (n *Notifier) killWorker(id string, done chan struct{}) { +func (n *Notifier) killWorker(id interface{}, done chan struct{}) { n.m.Lock() delete(n.doneCh, id) n.m.Unlock() } // Add adds new notification channel to Notifier. -func (n *Notifier) Add(ch <-chan struct{}, id string) { +func (n *Notifier) Add(id interface{}, ch <-chan struct{}) { done := make(chan struct{}) n.m.Lock() n.doneCh[id] = done n.m.Unlock() - go func(ch <-chan struct{}, id string, done chan struct{}) { + go func(ch <-chan struct{}, id interface{}, done chan struct{}) { for { select { case _, ok := <-ch: diff --git a/chanotify/chanotify_test.go b/chanotify/chanotify_test.go index c88fc3e..e4cad95 100644 --- a/chanotify/chanotify_test.go +++ b/chanotify/chanotify_test.go @@ -11,8 +11,8 @@ func TestNotifier(t *testing.T) { s := New() ch1 := make(chan struct{}, 1) ch2 := make(chan struct{}, 1) - s.Add(ch1, "1") - s.Add(ch2, "2") + s.Add("1", ch1) + s.Add("2", ch2) s.m.Lock() if len(s.doneCh) != 2 { t.Fatalf("expected 2 channels, got %d", len(s.doneCh)) @@ -20,12 +20,12 @@ func TestNotifier(t *testing.T) { s.m.Unlock() ch1 <- struct{}{} id1 := <-s.Chan() - if id1 != "1" { + if id1.(string) != "1" { t.Fatalf("1 should be spawned, got %s", id1) } ch2 <- struct{}{} id2 := <-s.Chan() - if id2 != "2" { + if id2.(string) != "2" { t.Fatalf("2 should be spawned, got %s", id2) } close(ch1) @@ -43,10 +43,10 @@ func TestConcurrentNotifier(t *testing.T) { var chs []chan struct{} for i := 0; i < 8; i++ { ch := make(chan struct{}, 2) - s.Add(ch, strconv.Itoa(i)) + s.Add(strconv.Itoa(i), ch) chs = append(chs, ch) } - testCounter := make(map[string]int) + testCounter := make(map[interface{}]int) done := make(chan struct{}) go func() { for id := range s.Chan() { @@ -85,7 +85,7 @@ func TestAddToBlocked(t *testing.T) { go func() { // give some time to start first select time.Sleep(1 * time.Second) - s.Add(ch, "1") + s.Add("1", ch) ch <- struct{}{} }() val := <-s.Chan() diff --git a/supervisor/supervisor.go b/supervisor/supervisor.go index 11fbd53..5c54a44 100644 --- a/supervisor/supervisor.go +++ b/supervisor/supervisor.go @@ -50,7 +50,7 @@ func New(id, stateDir string, tasks chan *StartTask, oom bool) (*Supervisor, err go func() { for id := range s.notifier.Chan() { e := NewEvent(OOMEventType) - e.ID = id + e.ID = id.(string) s.SendEvent(e) } }() diff --git a/supervisor/worker.go b/supervisor/worker.go index d6ef78d..4acb8d2 100644 --- a/supervisor/worker.go +++ b/supervisor/worker.go @@ -74,7 +74,7 @@ func (w *worker) Start() { if err != nil { logrus.WithField("error", err).Error("containerd: notify OOM events") } else { - w.s.notifier.Add(n, t.Container.ID()) + w.s.notifier.Add(t.Container.ID(), n) } } ContainerStartTimer.UpdateSince(started)