From e4a61633c59d95d02b760bbc6a671568628a29a7 Mon Sep 17 00:00:00 2001 From: David Calavera Date: Tue, 1 Dec 2015 18:49:24 -0500 Subject: [PATCH] Add event subscribers. - Add exit event for exec processes. Signed-off-by: David Calavera --- api/v1/server.go | 6 +----- event.go | 1 + exit.go | 25 +++++++++++++++++++++---- supervisor.go | 19 +++++++++++++++++-- 4 files changed, 40 insertions(+), 11 deletions(-) diff --git a/api/v1/server.go b/api/v1/server.go index 3f93e7c..9ffe0aa 100644 --- a/api/v1/server.go +++ b/api/v1/server.go @@ -63,11 +63,7 @@ func (s *server) updateContainer(w http.ResponseWriter, r *http.Request) { } func (s *server) events(w http.ResponseWriter, r *http.Request) { - events, err := s.supervisor.Events() - if err != nil { - http.Error(w, err.Error(), http.StatusInternalServerError) - return - } + events := s.supervisor.Events() enc := json.NewEncoder(w) for evt := range events { var v interface{} diff --git a/event.go b/event.go index e5dd12f..a47a6fa 100644 --- a/event.go +++ b/event.go @@ -11,6 +11,7 @@ import ( type EventType string const ( + ExecExitEventType EventType = "execExit" ExitEventType EventType = "exit" StartContainerEventType EventType = "startContainer" DeleteEventType EventType = "deleteContainerEvent" diff --git a/exit.go b/exit.go index b0a2a7c..25d0c33 100644 --- a/exit.go +++ b/exit.go @@ -6,17 +6,23 @@ type ExitEvent struct { s *Supervisor } +type ExecExitEvent struct { + s *Supervisor +} + func (h *ExitEvent) Handle(e *Event) error { logrus.WithFields(logrus.Fields{"pid": e.Pid, "status": e.Status}). Debug("containerd: process exited") // is it the child process of a container if container, ok := h.s.processes[e.Pid]; ok { - if err := container.RemoveProcess(e.Pid); err != nil { - logrus.WithField("error", err).Error("containerd: find container for pid") - } - delete(h.s.processes, e.Pid) + ne := NewEvent(ExecExitEventType) + ne.ID = container.ID() + ne.Pid = e.Pid + ne.Status = e.Status + h.s.SendEvent(ne) return nil } + // is it the main container's process container, err := h.s.getContainerForPid(e.Pid) if err != nil { @@ -31,3 +37,14 @@ func (h *ExitEvent) Handle(e *Event) error { h.s.SendEvent(ne) return nil } + +func (h *ExecExitEvent) Handle(e *Event) error { + // exec process: we remove this process without notifying the main event loop + container := h.s.processes[e.Pid] + if err := container.RemoveProcess(e.Pid); err != nil { + logrus.WithField("error", err).Error("containerd: find container for pid") + } + delete(h.s.processes, e.Pid) + h.s.NotifySubscribers(e) + return nil +} diff --git a/supervisor.go b/supervisor.go index c1717f9..bfc92c8 100644 --- a/supervisor.go +++ b/supervisor.go @@ -36,6 +36,7 @@ func NewSupervisor(stateDir string, concurrency int) (*Supervisor, error) { } // register default event handlers s.handlers = map[EventType]Handler{ + ExecExitEventType: &ExecExitEvent{s}, ExitEventType: &ExitEvent{s}, StartContainerEventType: &StartEvent{s}, DeleteEventType: &DeleteEvent{s}, @@ -63,16 +64,30 @@ type Supervisor struct { events chan *Event tasks chan *startTask workerGroup sync.WaitGroup + subscribers map[subscriber]bool } +type subscriber chan *Event + // need proper close logic for jobs and stuff so that sending to the channels dont panic // but can complete jobs func (s *Supervisor) Close() error { + //TODO: unsubscribe all channels return s.journal.Close() } -func (s *Supervisor) Events() (<-chan *Event, error) { - return nil, nil +func (s *Supervisor) Events() subscriber { + return subscriber(make(chan *Event)) +} + +func (s *Supervisor) Unsubscribe(sub subscriber) { + delete(s.subscribers, sub) +} + +func (s *Supervisor) NotifySubscribers(e *Event) { + for sub := range s.subscribers { + sub <- e + } } // Start is a non-blocking call that runs the supervisor for monitoring contianer processes and