Add event subscribers.

- Add exit event for exec processes.

Signed-off-by: David Calavera <david.calavera@gmail.com>
This commit is contained in:
David Calavera 2015-12-01 18:49:24 -05:00
parent 05f20c993d
commit e4a61633c5
4 changed files with 40 additions and 11 deletions

View File

@ -63,11 +63,7 @@ func (s *server) updateContainer(w http.ResponseWriter, r *http.Request) {
}
func (s *server) events(w http.ResponseWriter, r *http.Request) {
events, err := s.supervisor.Events()
if err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}
events := s.supervisor.Events()
enc := json.NewEncoder(w)
for evt := range events {
var v interface{}

View File

@ -11,6 +11,7 @@ import (
type EventType string
const (
ExecExitEventType EventType = "execExit"
ExitEventType EventType = "exit"
StartContainerEventType EventType = "startContainer"
DeleteEventType EventType = "deleteContainerEvent"

25
exit.go
View File

@ -6,17 +6,23 @@ type ExitEvent struct {
s *Supervisor
}
type ExecExitEvent struct {
s *Supervisor
}
func (h *ExitEvent) Handle(e *Event) error {
logrus.WithFields(logrus.Fields{"pid": e.Pid, "status": e.Status}).
Debug("containerd: process exited")
// is it the child process of a container
if container, ok := h.s.processes[e.Pid]; ok {
if err := container.RemoveProcess(e.Pid); err != nil {
logrus.WithField("error", err).Error("containerd: find container for pid")
}
delete(h.s.processes, e.Pid)
ne := NewEvent(ExecExitEventType)
ne.ID = container.ID()
ne.Pid = e.Pid
ne.Status = e.Status
h.s.SendEvent(ne)
return nil
}
// is it the main container's process
container, err := h.s.getContainerForPid(e.Pid)
if err != nil {
@ -31,3 +37,14 @@ func (h *ExitEvent) Handle(e *Event) error {
h.s.SendEvent(ne)
return nil
}
func (h *ExecExitEvent) Handle(e *Event) error {
// exec process: we remove this process without notifying the main event loop
container := h.s.processes[e.Pid]
if err := container.RemoveProcess(e.Pid); err != nil {
logrus.WithField("error", err).Error("containerd: find container for pid")
}
delete(h.s.processes, e.Pid)
h.s.NotifySubscribers(e)
return nil
}

View File

@ -36,6 +36,7 @@ func NewSupervisor(stateDir string, concurrency int) (*Supervisor, error) {
}
// register default event handlers
s.handlers = map[EventType]Handler{
ExecExitEventType: &ExecExitEvent{s},
ExitEventType: &ExitEvent{s},
StartContainerEventType: &StartEvent{s},
DeleteEventType: &DeleteEvent{s},
@ -63,16 +64,30 @@ type Supervisor struct {
events chan *Event
tasks chan *startTask
workerGroup sync.WaitGroup
subscribers map[subscriber]bool
}
type subscriber chan *Event
// need proper close logic for jobs and stuff so that sending to the channels dont panic
// but can complete jobs
func (s *Supervisor) Close() error {
//TODO: unsubscribe all channels
return s.journal.Close()
}
func (s *Supervisor) Events() (<-chan *Event, error) {
return nil, nil
func (s *Supervisor) Events() subscriber {
return subscriber(make(chan *Event))
}
func (s *Supervisor) Unsubscribe(sub subscriber) {
delete(s.subscribers, sub)
}
func (s *Supervisor) NotifySubscribers(e *Event) {
for sub := range s.subscribers {
sub <- e
}
}
// Start is a non-blocking call that runs the supervisor for monitoring contianer processes and