diff --git a/add_process.go b/add_process.go new file mode 100644 index 0000000..85573be --- /dev/null +++ b/add_process.go @@ -0,0 +1,21 @@ +package containerd + +type AddProcessEvent struct { + s *Supervisor +} + +func (h *AddProcessEvent) Handle(e *Event) error { + container, ok := h.s.containers[e.ID] + if !ok { + return ErrContainerNotFound + } + p, err := h.s.runtime.StartProcess(container, *e.Process, e.Stdio) + if err != nil { + return err + } + if e.Pid, err = p.Pid(); err != nil { + return err + } + h.s.processes[e.Pid] = container + return nil +} diff --git a/delete.go b/delete.go new file mode 100644 index 0000000..9312549 --- /dev/null +++ b/delete.go @@ -0,0 +1,22 @@ +package containerd + +import "github.com/Sirupsen/logrus" + +type DeleteEvent struct { + s *Supervisor +} + +func (h *DeleteEvent) Handle(e *Event) error { + if container, ok := h.s.containers[e.ID]; ok { + if err := h.deleteContainer(container); err != nil { + logrus.WithField("error", err).Error("containerd: deleting container") + } + ContainersCounter.Dec(1) + } + return nil +} + +func (h *DeleteEvent) deleteContainer(container Container) error { + delete(h.s.containers, container.ID()) + return container.Delete() +} diff --git a/errors.go b/errors.go index 65bbaf5..d1cf2d0 100644 --- a/errors.go +++ b/errors.go @@ -10,6 +10,7 @@ var ( ErrContainerExists = errors.New("containerd: container already exists") ErrProcessNotFound = errors.New("containerd: processs not found for container") ErrUnknownContainerStatus = errors.New("containerd: unknown container status ") + ErrUnknownEvent = errors.New("containerd: unknown event type") // Internal errors errShutdown = errors.New("containerd: supervisor is shutdown") @@ -17,4 +18,9 @@ var ( errNoContainerForPid = errors.New("containerd: pid not registered for any container") errInvalidContainerType = errors.New("containerd: invalid container type for runtime") errNotChildProcess = errors.New("containerd: not a child process for container") + // internal error where the handler will defer to another for the final response + // + // TODO: we could probably do a typed error with another error channel for this to make it + // less like magic + errDeferedResponse = errors.New("containerd: defered response") ) diff --git a/event.go b/event.go index af1949f..93a2484 100644 --- a/event.go +++ b/event.go @@ -46,3 +46,7 @@ type Event struct { Containers []Container `json:"-"` Err chan error `json:"-"` } + +type Handler interface { + Handle(*Event) error +} diff --git a/exit.go b/exit.go new file mode 100644 index 0000000..b0a2a7c --- /dev/null +++ b/exit.go @@ -0,0 +1,33 @@ +package containerd + +import "github.com/Sirupsen/logrus" + +type ExitEvent struct { + s *Supervisor +} + +func (h *ExitEvent) Handle(e *Event) error { + logrus.WithFields(logrus.Fields{"pid": e.Pid, "status": e.Status}). + Debug("containerd: process exited") + // is it the child process of a container + if container, ok := h.s.processes[e.Pid]; ok { + if err := container.RemoveProcess(e.Pid); err != nil { + logrus.WithField("error", err).Error("containerd: find container for pid") + } + delete(h.s.processes, e.Pid) + return nil + } + // is it the main container's process + container, err := h.s.getContainerForPid(e.Pid) + if err != nil { + if err != errNoContainerForPid { + logrus.WithField("error", err).Error("containerd: find container for pid") + } + return nil + } + container.SetExited(e.Status) + ne := NewEvent(DeleteEventType) + ne.ID = container.ID() + h.s.SendEvent(ne) + return nil +} diff --git a/get_containers.go b/get_containers.go new file mode 100644 index 0000000..23bd449 --- /dev/null +++ b/get_containers.go @@ -0,0 +1,12 @@ +package containerd + +type GetContainersEvent struct { + s *Supervisor +} + +func (h *GetContainersEvent) Handle(e *Event) error { + for _, c := range h.s.containers { + e.Containers = append(e.Containers, c) + } + return nil +} diff --git a/signal.go b/signal.go new file mode 100644 index 0000000..5cdd1c4 --- /dev/null +++ b/signal.go @@ -0,0 +1,22 @@ +package containerd + +type SignalEvent struct { + s *Supervisor +} + +func (h *SignalEvent) Handle(e *Event) error { + container, ok := h.s.containers[e.ID] + if !ok { + return ErrContainerNotFound + } + processes, err := container.Processes() + if err != nil { + return err + } + for _, p := range processes { + if pid, err := p.Pid(); err == nil && pid == e.Pid { + return p.Signal(e.Signal) + } + } + return ErrProcessNotFound +} diff --git a/start.go b/start.go new file mode 100644 index 0000000..894927b --- /dev/null +++ b/start.go @@ -0,0 +1,19 @@ +package containerd + +type StartEvent struct { + s *Supervisor +} + +func (h *StartEvent) Handle(e *Event) error { + container, err := h.s.runtime.Create(e.ID, e.BundlePath, e.Stdio) + if err != nil { + return err + } + h.s.containers[e.ID] = container + ContainersCounter.Inc(1) + h.s.tasks <- &startTask{ + err: e.Err, + container: container, + } + return errDeferedResponse +} diff --git a/stats.go b/stats.go index 249fab2..41ae893 100644 --- a/stats.go +++ b/stats.go @@ -5,11 +5,13 @@ import "github.com/rcrowley/go-metrics" var ( ContainerStartTimer = metrics.NewTimer() ContainersCounter = metrics.NewCounter() + EventsCounter = metrics.NewCounter() ) func Metrics() map[string]interface{} { return map[string]interface{}{ "container-start-time": ContainerStartTimer, "containers": ContainersCounter, + "events": EventsCounter, } } diff --git a/supervisor.go b/supervisor.go index 5771712..4599792 100644 --- a/supervisor.go +++ b/supervisor.go @@ -33,6 +33,17 @@ func NewSupervisor(stateDir string, concurrency int) (*Supervisor, error) { tasks: make(chan *startTask, concurrency*100), journal: j, } + // register default event handlers + s.handlers = map[EventType]Handler{ + ExitEventType: &ExitEvent{s}, + StartContainerEventType: &StartEvent{s}, + DeleteEventType: &DeleteEvent{s}, + GetContainerEventType: &GetContainersEvent{s}, + SignalEventType: &SignalEvent{s}, + AddProcessEventType: &AddProcessEvent{s}, + UpdateContainerEventType: &UpdateEvent{s}, + } + // start the container workers for concurrent container starts for i := 0; i < concurrency; i++ { s.workerGroup.Add(1) go s.startContainerWorker(s.tasks) @@ -42,16 +53,12 @@ func NewSupervisor(stateDir string, concurrency int) (*Supervisor, error) { type Supervisor struct { // stateDir is the directory on the system to store container runtime state information. - stateDir string - - containers map[string]Container - - processes map[int]Container - - runtime Runtime - - journal *journal - + stateDir string + containers map[string]Container + processes map[int]Container + handlers map[EventType]Handler + runtime Runtime + journal *journal events chan *Event tasks chan *startTask workerGroup sync.WaitGroup @@ -82,113 +89,17 @@ func (s *Supervisor) Start(events chan *Event) error { runtime.LockOSThread() for e := range events { s.journal.write(e) - switch e.Type { - case ExitEventType: - logrus.WithFields(logrus.Fields{"pid": e.Pid, "status": e.Status}). - Debug("containerd: process exited") - // is it the child process of a container - if container, ok := s.processes[e.Pid]; ok { - if err := container.RemoveProcess(e.Pid); err != nil { - logrus.WithField("error", err).Error("containerd: find container for pid") - } - delete(s.processes, e.Pid) + h, ok := s.handlers[e.Type] + if !ok { + e.Err <- ErrUnknownEvent + continue + } + if err := h.Handle(e); err != nil { + if err != errDeferedResponse { + e.Err <- err close(e.Err) - continue - } - // is it the main container's process - container, err := s.getContainerForPid(e.Pid) - if err != nil { - if err != errNoContainerForPid { - logrus.WithField("error", err).Error("containerd: find container for pid") - } - continue - } - container.SetExited(e.Status) - ne := NewEvent(DeleteEventType) - ne.ID = container.ID() - s.SendEvent(ne) - case StartContainerEventType: - container, err := s.runtime.Create(e.ID, e.BundlePath, e.Stdio) - if err != nil { - e.Err <- err - continue - } - s.containers[e.ID] = container - ContainersCounter.Inc(1) - s.tasks <- &startTask{ - err: e.Err, - container: container, } continue - case DeleteEventType: - if container, ok := s.containers[e.ID]; ok { - if err := s.deleteContainer(container); err != nil { - logrus.WithField("error", err).Error("containerd: deleting container") - } - ContainersCounter.Dec(1) - } - case GetContainerEventType: - for _, c := range s.containers { - e.Containers = append(e.Containers, c) - } - case SignalEventType: - container, ok := s.containers[e.ID] - if !ok { - e.Err <- ErrContainerNotFound - continue - } - processes, err := container.Processes() - if err != nil { - e.Err <- err - continue - } - for _, p := range processes { - if pid, err := p.Pid(); err == nil && pid == e.Pid { - e.Err <- p.Signal(e.Signal) - continue - } - } - e.Err <- ErrProcessNotFound - continue - case AddProcessEventType: - container, ok := s.containers[e.ID] - if !ok { - e.Err <- ErrContainerNotFound - continue - } - p, err := s.runtime.StartProcess(container, *e.Process, e.Stdio) - if err != nil { - e.Err <- err - continue - } - if e.Pid, err = p.Pid(); err != nil { - e.Err <- err - continue - } - s.processes[e.Pid] = container - case UpdateContainerEventType: - container, ok := s.containers[e.ID] - if !ok { - e.Err <- ErrContainerNotFound - continue - } - if e.State.Status != "" { - switch e.State.Status { - case Running: - if err := container.Resume(); err != nil { - e.Err <- ErrUnknownContainerStatus - continue - } - case Paused: - if err := container.Pause(); err != nil { - e.Err <- ErrUnknownContainerStatus - continue - } - default: - e.Err <- ErrUnknownContainerStatus - continue - } - } } close(e.Err) } @@ -196,11 +107,6 @@ func (s *Supervisor) Start(events chan *Event) error { return nil } -func (s *Supervisor) deleteContainer(container Container) error { - delete(s.containers, container.ID()) - return container.Delete() -} - func (s *Supervisor) getContainerForPid(pid int) (Container, error) { for _, container := range s.containers { cpid, err := container.Pid() @@ -220,6 +126,7 @@ func (s *Supervisor) getContainerForPid(pid int) (Container, error) { } func (s *Supervisor) SendEvent(evt *Event) { + EventsCounter.Inc(1) s.events <- evt } diff --git a/update.go b/update.go new file mode 100644 index 0000000..f61bf91 --- /dev/null +++ b/update.go @@ -0,0 +1,27 @@ +package containerd + +type UpdateEvent struct { + s *Supervisor +} + +func (h *UpdateEvent) Handle(e *Event) error { + container, ok := h.s.containers[e.ID] + if !ok { + return ErrContainerNotFound + } + if e.State.Status != "" { + switch e.State.Status { + case Running: + if err := container.Resume(); err != nil { + return ErrUnknownContainerStatus + } + case Paused: + if err := container.Pause(); err != nil { + return ErrUnknownContainerStatus + } + default: + return ErrUnknownContainerStatus + } + } + return nil +}