From 4f0c3850ba5906449b6408426206177c8a559d38 Mon Sep 17 00:00:00 2001 From: Alexander Morozov Date: Fri, 18 Dec 2015 12:17:53 -0800 Subject: [PATCH] Use chanotify in supervisor Signed-off-by: Alexander Morozov --- supervisor/oom.go | 80 ---------------------------------------- supervisor/supervisor.go | 15 +++++++- 2 files changed, 13 insertions(+), 82 deletions(-) delete mode 100644 supervisor/oom.go diff --git a/supervisor/oom.go b/supervisor/oom.go deleted file mode 100644 index 1455b35..0000000 --- a/supervisor/oom.go +++ /dev/null @@ -1,80 +0,0 @@ -package supervisor - -import ( - "reflect" - "sync" -) - -func newNotifier(s *Supervisor) *notifier { - n := ¬ifier{ - s: s, - channels: make(map[<-chan struct{}]string), - controller: make(chan struct{}), - } - go n.start() - return n -} - -type notifier struct { - m sync.Mutex - channels map[<-chan struct{}]string - controller chan struct{} - s *Supervisor -} - -func (n *notifier) start() { - for { - c := n.createCase() - i, _, ok := reflect.Select(c) - if i == 0 { - continue - } - if ok { - ch := c[i].Chan.Interface().(<-chan struct{}) - id := n.channels[ch] - e := NewEvent(OOMEventType) - e.ID = id - n.s.SendEvent(e) - continue - } - // the channel was closed and we should remove it - ch := c[i].Chan.Interface().(<-chan struct{}) - n.removeChan(ch) - } -} - -func (n *notifier) Add(ch <-chan struct{}, id string) { - n.m.Lock() - n.channels[ch] = id - n.m.Unlock() - // signal the main loop to break and add the new - // channels - n.controller <- struct{}{} -} - -func (n *notifier) createCase() []reflect.SelectCase { - var out []reflect.SelectCase - // add controller chan so that we can signal when we need to make - // changes in the select. The controller chan will always be at - // index 0 in the slice - out = append(out, reflect.SelectCase{ - Dir: reflect.SelectRecv, - Chan: reflect.ValueOf(n.controller), - }) - n.m.Lock() - for ch := range n.channels { - v := reflect.ValueOf(ch) - out = append(out, reflect.SelectCase{ - Dir: reflect.SelectRecv, - Chan: v, - }) - } - n.m.Unlock() - return out -} - -func (n *notifier) removeChan(ch <-chan struct{}) { - n.m.Lock() - delete(n.channels, ch) - n.m.Unlock() -} diff --git a/supervisor/supervisor.go b/supervisor/supervisor.go index df8bcfc..11fbd53 100644 --- a/supervisor/supervisor.go +++ b/supervisor/supervisor.go @@ -9,6 +9,7 @@ import ( "time" "github.com/Sirupsen/logrus" + "github.com/docker/containerd/chanotify" "github.com/docker/containerd/eventloop" "github.com/docker/containerd/runtime" "github.com/opencontainers/runc/libcontainer" @@ -45,7 +46,14 @@ func New(id, stateDir string, tasks chan *StartTask, oom bool) (*Supervisor, err el: eventloop.NewChanLoop(defaultBufferSize), } if oom { - s.notifier = newNotifier(s) + s.notifier = chanotify.New() + go func() { + for id := range s.notifier.Chan() { + e := NewEvent(OOMEventType) + e.ID = id + s.SendEvent(e) + } + }() } // register default event handlers s.handlers = map[EventType]Handler{ @@ -88,7 +96,7 @@ type Supervisor struct { machine Machine containerGroup sync.WaitGroup statsCollector *statsCollector - notifier *notifier + notifier *chanotify.Notifier el eventloop.EventLoop } @@ -124,6 +132,9 @@ func (s *Supervisor) Stop(sig chan os.Signal) { logrus.Debug("waiting for containers to exit") s.containerGroup.Wait() logrus.Debug("all containers exited") + if s.notifier != nil { + s.notifier.Close() + } // stop receiving signals and close the channel signal.Stop(sig) close(sig)