From fed10a2c221a40efa8e7e6849f1b371a05d7e16c Mon Sep 17 00:00:00 2001 From: Alexander Morozov Date: Fri, 18 Dec 2015 08:48:32 -0800 Subject: [PATCH 1/2] Add chanotify package It allows to listen for notifications on bunch of channels. Signed-off-by: Alexander Morozov --- chanotify/chanotify.go | 97 +++++++++++++++++++++++++++++++++++++ chanotify/chanotify_test.go | 80 ++++++++++++++++++++++++++++++ 2 files changed, 177 insertions(+) create mode 100644 chanotify/chanotify.go create mode 100644 chanotify/chanotify_test.go diff --git a/chanotify/chanotify.go b/chanotify/chanotify.go new file mode 100644 index 0000000..75d376b --- /dev/null +++ b/chanotify/chanotify.go @@ -0,0 +1,97 @@ +package chanotify + +import ( + "reflect" + "sync" +) + +type dataPair struct { + id string + selectCase reflect.SelectCase +} + +// Notifier can effectively notify you about receiving from particular channels. +// It operates with pairs <-chan struct{} <-> string which is notification +// channel and its identificator respectively. +// Notification channel is <-chan struc{}, each send to which is spawn +// notification from Notifier, close doesn't spawn anything and removes channel +// from Notifier. +type Notifier struct { + c chan string + chMap map[<-chan struct{}]*dataPair + exit chan struct{} + m sync.Mutex +} + +// New returns already running *Notifier. +func New() *Notifier { + s := &Notifier{ + c: make(chan string), + chMap: make(map[<-chan struct{}]*dataPair), + exit: make(chan struct{}), + } + go s.start() + return s +} + +// Chan returns channel on which client listen for notifications. +// Ids of notifications is sent to that channel. +func (s *Notifier) Chan() <-chan string { + return s.c +} + +// Add adds new notification channel to Notifier. +func (s *Notifier) Add(ch <-chan struct{}, id string) { + s.m.Lock() + s.chMap[ch] = &dataPair{ + id: id, + selectCase: reflect.SelectCase{ + Dir: reflect.SelectRecv, + Chan: reflect.ValueOf(ch), + }, + } + s.m.Unlock() +} + +// Close stops Notifier to listen for any notifications and closes its +// "client-side" channel. +func (s *Notifier) Close() { + close(s.exit) +} + +func (s *Notifier) start() { + for { + c := s.createCase() + i, _, ok := reflect.Select(c) + if i == 0 { + // exit was closed, we can safely close output + close(s.c) + return + } + ch := c[i].Chan.Interface().(<-chan struct{}) + if ok { + s.c <- s.chMap[ch].id + continue + } + // the channel was closed and we should remove it + s.m.Lock() + delete(s.chMap, ch) + s.m.Unlock() + } +} + +func (s *Notifier) createCase() []reflect.SelectCase { + // put exit channel as 0 element of select + out := []reflect.SelectCase{ + reflect.SelectCase{ + Dir: reflect.SelectRecv, + Chan: reflect.ValueOf(s.exit), + }, + } + s.m.Lock() + for _, pair := range s.chMap { + out = append(out, pair.selectCase) + } + s.m.Unlock() + return out +} diff --git a/chanotify/chanotify_test.go b/chanotify/chanotify_test.go new file mode 100644 index 0000000..34afd75 --- /dev/null +++ b/chanotify/chanotify_test.go @@ -0,0 +1,80 @@ +package chanotify + +import ( + "strconv" + "sync" + "testing" + "time" +) + +func TestNotifier(t *testing.T) { + s := New() + ch1 := make(chan struct{}, 1) + ch2 := make(chan struct{}, 1) + s.Add(ch1, "1") + s.Add(ch2, "2") + s.m.Lock() + if len(s.chMap) != 2 { + t.Fatalf("expected 2 channels, got %d", len(s.chMap)) + } + s.m.Unlock() + ch1 <- struct{}{} + id1 := <-s.Chan() + if id1 != "1" { + t.Fatalf("1 should be spawned, got %s", id1) + } + ch2 <- struct{}{} + id2 := <-s.Chan() + if id2 != "2" { + t.Fatalf("2 should be spawned, got %s", id2) + } + close(ch1) + close(ch2) + time.Sleep(100 * time.Millisecond) + s.m.Lock() + if len(s.chMap) != 0 { + t.Fatalf("expected 0 channels, got %d", len(s.chMap)) + } + s.m.Unlock() +} + +func TestConcurrentNotifier(t *testing.T) { + s := New() + var chs []chan struct{} + for i := 0; i < 8; i++ { + ch := make(chan struct{}, 2) + s.Add(ch, strconv.Itoa(i)) + chs = append(chs, ch) + } + testCounter := make(map[string]int) + done := make(chan struct{}) + go func() { + for id := range s.Chan() { + testCounter[id]++ + } + close(done) + }() + var wg sync.WaitGroup + for _, ch := range chs { + wg.Add(1) + go func(ch chan struct{}) { + ch <- struct{}{} + ch <- struct{}{} + close(ch) + wg.Done() + }(ch) + } + wg.Wait() + // wait for notifications + time.Sleep(1 * time.Second) + s.Close() + <-done + if len(testCounter) != 8 { + t.Fatalf("expect to find exactly 8 distinct ids, got %d", len(testCounter)) + } + for id, c := range testCounter { + if c != 2 { + t.Fatalf("Expected to find exactly 2 id %s, but got %d", id, c) + } + } +} From 4f0c3850ba5906449b6408426206177c8a559d38 Mon Sep 17 00:00:00 2001 From: Alexander Morozov Date: Fri, 18 Dec 2015 12:17:53 -0800 Subject: [PATCH 2/2] Use chanotify in supervisor Signed-off-by: Alexander Morozov --- supervisor/oom.go | 80 ---------------------------------------- supervisor/supervisor.go | 15 +++++++- 2 files changed, 13 insertions(+), 82 deletions(-) delete mode 100644 supervisor/oom.go diff --git a/supervisor/oom.go b/supervisor/oom.go deleted file mode 100644 index 1455b35..0000000 --- a/supervisor/oom.go +++ /dev/null @@ -1,80 +0,0 @@ -package supervisor - -import ( - "reflect" - "sync" -) - -func newNotifier(s *Supervisor) *notifier { - n := ¬ifier{ - s: s, - channels: make(map[<-chan struct{}]string), - controller: make(chan struct{}), - } - go n.start() - return n -} - -type notifier struct { - m sync.Mutex - channels map[<-chan struct{}]string - controller chan struct{} - s *Supervisor -} - -func (n *notifier) start() { - for { - c := n.createCase() - i, _, ok := reflect.Select(c) - if i == 0 { - continue - } - if ok { - ch := c[i].Chan.Interface().(<-chan struct{}) - id := n.channels[ch] - e := NewEvent(OOMEventType) - e.ID = id - n.s.SendEvent(e) - continue - } - // the channel was closed and we should remove it - ch := c[i].Chan.Interface().(<-chan struct{}) - n.removeChan(ch) - } -} - -func (n *notifier) Add(ch <-chan struct{}, id string) { - n.m.Lock() - n.channels[ch] = id - n.m.Unlock() - // signal the main loop to break and add the new - // channels - n.controller <- struct{}{} -} - -func (n *notifier) createCase() []reflect.SelectCase { - var out []reflect.SelectCase - // add controller chan so that we can signal when we need to make - // changes in the select. The controller chan will always be at - // index 0 in the slice - out = append(out, reflect.SelectCase{ - Dir: reflect.SelectRecv, - Chan: reflect.ValueOf(n.controller), - }) - n.m.Lock() - for ch := range n.channels { - v := reflect.ValueOf(ch) - out = append(out, reflect.SelectCase{ - Dir: reflect.SelectRecv, - Chan: v, - }) - } - n.m.Unlock() - return out -} - -func (n *notifier) removeChan(ch <-chan struct{}) { - n.m.Lock() - delete(n.channels, ch) - n.m.Unlock() -} diff --git a/supervisor/supervisor.go b/supervisor/supervisor.go index df8bcfc..11fbd53 100644 --- a/supervisor/supervisor.go +++ b/supervisor/supervisor.go @@ -9,6 +9,7 @@ import ( "time" "github.com/Sirupsen/logrus" + "github.com/docker/containerd/chanotify" "github.com/docker/containerd/eventloop" "github.com/docker/containerd/runtime" "github.com/opencontainers/runc/libcontainer" @@ -45,7 +46,14 @@ func New(id, stateDir string, tasks chan *StartTask, oom bool) (*Supervisor, err el: eventloop.NewChanLoop(defaultBufferSize), } if oom { - s.notifier = newNotifier(s) + s.notifier = chanotify.New() + go func() { + for id := range s.notifier.Chan() { + e := NewEvent(OOMEventType) + e.ID = id + s.SendEvent(e) + } + }() } // register default event handlers s.handlers = map[EventType]Handler{ @@ -88,7 +96,7 @@ type Supervisor struct { machine Machine containerGroup sync.WaitGroup statsCollector *statsCollector - notifier *notifier + notifier *chanotify.Notifier el eventloop.EventLoop } @@ -124,6 +132,9 @@ func (s *Supervisor) Stop(sig chan os.Signal) { logrus.Debug("waiting for containers to exit") s.containerGroup.Wait() logrus.Debug("all containers exited") + if s.notifier != nil { + s.notifier.Close() + } // stop receiving signals and close the channel signal.Stop(sig) close(sig)