Use chanotify in supervisor

Signed-off-by: Alexander Morozov <lk4d4@docker.com>
This commit is contained in:
Alexander Morozov 2015-12-18 12:17:53 -08:00
parent fed10a2c22
commit 4f0c3850ba
2 changed files with 13 additions and 82 deletions

View file

@ -1,80 +0,0 @@
package supervisor
import (
"reflect"
"sync"
)
func newNotifier(s *Supervisor) *notifier {
n := &notifier{
s: s,
channels: make(map[<-chan struct{}]string),
controller: make(chan struct{}),
}
go n.start()
return n
}
type notifier struct {
m sync.Mutex
channels map[<-chan struct{}]string
controller chan struct{}
s *Supervisor
}
func (n *notifier) start() {
for {
c := n.createCase()
i, _, ok := reflect.Select(c)
if i == 0 {
continue
}
if ok {
ch := c[i].Chan.Interface().(<-chan struct{})
id := n.channels[ch]
e := NewEvent(OOMEventType)
e.ID = id
n.s.SendEvent(e)
continue
}
// the channel was closed and we should remove it
ch := c[i].Chan.Interface().(<-chan struct{})
n.removeChan(ch)
}
}
func (n *notifier) Add(ch <-chan struct{}, id string) {
n.m.Lock()
n.channels[ch] = id
n.m.Unlock()
// signal the main loop to break and add the new
// channels
n.controller <- struct{}{}
}
func (n *notifier) createCase() []reflect.SelectCase {
var out []reflect.SelectCase
// add controller chan so that we can signal when we need to make
// changes in the select. The controller chan will always be at
// index 0 in the slice
out = append(out, reflect.SelectCase{
Dir: reflect.SelectRecv,
Chan: reflect.ValueOf(n.controller),
})
n.m.Lock()
for ch := range n.channels {
v := reflect.ValueOf(ch)
out = append(out, reflect.SelectCase{
Dir: reflect.SelectRecv,
Chan: v,
})
}
n.m.Unlock()
return out
}
func (n *notifier) removeChan(ch <-chan struct{}) {
n.m.Lock()
delete(n.channels, ch)
n.m.Unlock()
}

View file

@ -9,6 +9,7 @@ import (
"time" "time"
"github.com/Sirupsen/logrus" "github.com/Sirupsen/logrus"
"github.com/docker/containerd/chanotify"
"github.com/docker/containerd/eventloop" "github.com/docker/containerd/eventloop"
"github.com/docker/containerd/runtime" "github.com/docker/containerd/runtime"
"github.com/opencontainers/runc/libcontainer" "github.com/opencontainers/runc/libcontainer"
@ -45,7 +46,14 @@ func New(id, stateDir string, tasks chan *StartTask, oom bool) (*Supervisor, err
el: eventloop.NewChanLoop(defaultBufferSize), el: eventloop.NewChanLoop(defaultBufferSize),
} }
if oom { if oom {
s.notifier = newNotifier(s) s.notifier = chanotify.New()
go func() {
for id := range s.notifier.Chan() {
e := NewEvent(OOMEventType)
e.ID = id
s.SendEvent(e)
}
}()
} }
// register default event handlers // register default event handlers
s.handlers = map[EventType]Handler{ s.handlers = map[EventType]Handler{
@ -88,7 +96,7 @@ type Supervisor struct {
machine Machine machine Machine
containerGroup sync.WaitGroup containerGroup sync.WaitGroup
statsCollector *statsCollector statsCollector *statsCollector
notifier *notifier notifier *chanotify.Notifier
el eventloop.EventLoop el eventloop.EventLoop
} }
@ -124,6 +132,9 @@ func (s *Supervisor) Stop(sig chan os.Signal) {
logrus.Debug("waiting for containers to exit") logrus.Debug("waiting for containers to exit")
s.containerGroup.Wait() s.containerGroup.Wait()
logrus.Debug("all containers exited") logrus.Debug("all containers exited")
if s.notifier != nil {
s.notifier.Close()
}
// stop receiving signals and close the channel // stop receiving signals and close the channel
signal.Stop(sig) signal.Stop(sig)
close(sig) close(sig)