diff --git a/event.go b/event.go index c21ef6c..540dac2 100644 --- a/event.go +++ b/event.go @@ -63,3 +63,22 @@ type Event struct { type Handler interface { Handle(*Event) error } + +type commonEvent struct { + data *Event + sv *Supervisor +} + +func (e *commonEvent) Handle() { + h, ok := e.sv.handlers[e.data.Type] + if !ok { + e.data.Err <- ErrUnknownEvent + return + } + err := h.Handle(e.data) + if err != errDeferedResponse { + e.data.Err <- err + close(e.data.Err) + return + } +} diff --git a/supervisor.go b/supervisor.go index 2ac9a8c..9406899 100644 --- a/supervisor.go +++ b/supervisor.go @@ -4,12 +4,12 @@ import ( "os" "os/signal" "path/filepath" - goruntime "runtime" "sync" "syscall" "time" "github.com/Sirupsen/logrus" + "github.com/docker/containerd/eventloop" "github.com/docker/containerd/runtime" "github.com/opencontainers/runc/libcontainer" ) @@ -36,10 +36,10 @@ func NewSupervisor(id, stateDir string, tasks chan *StartTask, oom bool) (*Super processes: make(map[int]*containerInfo), runtime: r, tasks: tasks, - events: make(chan *Event, DefaultBufferSize), machine: machine, subscribers: make(map[chan *Event]struct{}), statsCollector: newStatsCollector(statsInterval), + el: eventloop.NewChanLoop(DefaultBufferSize), } if oom { s.notifier = newNotifier(s) @@ -86,6 +86,7 @@ type Supervisor struct { containerGroup sync.WaitGroup statsCollector *statsCollector notifier *notifier + el eventloop.EventLoop } // Stop closes all tasks and sends a SIGTERM to each container's pid1 then waits for they to @@ -174,32 +175,11 @@ func (s *Supervisor) notifySubscribers(e *Event) { // therefore it is save to do operations in the handlers that modify state of the system or // state of the Supervisor func (s *Supervisor) Start() error { - go func() { - // allocate an entire thread to this goroutine for the main event loop - // so that nothing else is scheduled over the top of it. - goruntime.LockOSThread() - for e := range s.events { - EventsCounter.Inc(1) - h, ok := s.handlers[e.Type] - if !ok { - e.Err <- ErrUnknownEvent - continue - } - if err := h.Handle(e); err != nil { - if err != errDeferedResponse { - e.Err <- err - close(e.Err) - } - continue - } - close(e.Err) - } - }() logrus.WithFields(logrus.Fields{ "runtime": s.runtime.Type(), "stateDir": s.stateDir, }).Debug("Supervisor started") - return nil + return s.el.Start() } // Machine returns the machine information for which the @@ -231,7 +211,8 @@ func (s *Supervisor) getContainerForPid(pid int) (runtime.Container, error) { // SendEvent sends the provided event the the supervisors main event loop func (s *Supervisor) SendEvent(evt *Event) { - s.events <- evt + EventsCounter.Inc(1) + s.el.Send(&commonEvent{data: evt, sv: s}) } func (s *Supervisor) copyIO(stdin, stdout, stderr string, i *runtime.IO) (*copier, error) {