diff --git a/add_process.go b/add_process.go index 253eab0..862f289 100644 --- a/add_process.go +++ b/add_process.go @@ -9,15 +9,16 @@ type AddProcessEvent struct { // TODO: add this to worker for concurrent starts??? maybe not because of races where the container // could be stopped and removed... func (h *AddProcessEvent) Handle(e *Event) error { - container, ok := h.s.containers[e.ID] + ci, ok := h.s.containers[e.ID] if !ok { return ErrContainerNotFound } - p, io, err := h.s.runtime.StartProcess(container, *e.Process) + p, io, err := h.s.runtime.StartProcess(ci.container, *e.Process) if err != nil { return err } - if err := h.s.log(container.Path(), io); err != nil { + l, err := h.s.log(ci.container.Path(), io) + if err != nil { // log the error but continue with the other commands logrus.WithFields(logrus.Fields{ "error": err, @@ -27,6 +28,9 @@ func (h *AddProcessEvent) Handle(e *Event) error { if e.Pid, err = p.Pid(); err != nil { return err } - h.s.processes[e.Pid] = container + h.s.processes[e.Pid] = &containerInfo{ + container: ci.container, + logger: l, + } return nil } diff --git a/checkpoint.go b/checkpoint.go index 799b748..3fb7f0d 100644 --- a/checkpoint.go +++ b/checkpoint.go @@ -5,11 +5,11 @@ type CreateCheckpointEvent struct { } func (h *CreateCheckpointEvent) Handle(e *Event) error { - container, ok := h.s.containers[e.ID] + i, ok := h.s.containers[e.ID] if !ok { return ErrContainerNotFound } - return container.Checkpoint(*e.Checkpoint) + return i.container.Checkpoint(*e.Checkpoint) } type DeleteCheckpointEvent struct { @@ -17,9 +17,9 @@ type DeleteCheckpointEvent struct { } func (h *DeleteCheckpointEvent) Handle(e *Event) error { - container, ok := h.s.containers[e.ID] + i, ok := h.s.containers[e.ID] if !ok { return ErrContainerNotFound } - return container.DeleteCheckpoint(e.Checkpoint.Name) + return i.container.DeleteCheckpoint(e.Checkpoint.Name) } diff --git a/delete.go b/delete.go index 12d2595..075b5d4 100644 --- a/delete.go +++ b/delete.go @@ -10,10 +10,13 @@ type DeleteEvent struct { } func (h *DeleteEvent) Handle(e *Event) error { - if container, ok := h.s.containers[e.ID]; ok { - if err := h.deleteContainer(container); err != nil { + if i, ok := h.s.containers[e.ID]; ok { + if err := h.deleteContainer(i.container); err != nil { logrus.WithField("error", err).Error("containerd: deleting container") } + if err := i.logger.Close(); err != nil { + logrus.WithField("error", err).Error("containerd: close container logger") + } h.s.notifySubscribers(&Event{ Type: ExitEventType, ID: e.ID, diff --git a/exit.go b/exit.go index 6bb758f..a997f7f 100644 --- a/exit.go +++ b/exit.go @@ -10,9 +10,9 @@ func (h *ExitEvent) Handle(e *Event) error { logrus.WithFields(logrus.Fields{"pid": e.Pid, "status": e.Status}). Debug("containerd: process exited") // is it the child process of a container - if container, ok := h.s.processes[e.Pid]; ok { + if info, ok := h.s.processes[e.Pid]; ok { ne := NewEvent(ExecExitEventType) - ne.ID = container.ID() + ne.ID = info.container.ID() ne.Pid = e.Pid ne.Status = e.Status h.s.SendEvent(ne) @@ -42,10 +42,13 @@ type ExecExitEvent struct { func (h *ExecExitEvent) Handle(e *Event) error { // exec process: we remove this process without notifying the main event loop - container := h.s.processes[e.Pid] - if err := container.RemoveProcess(e.Pid); err != nil { + info := h.s.processes[e.Pid] + if err := info.container.RemoveProcess(e.Pid); err != nil { logrus.WithField("error", err).Error("containerd: find container for pid") } + if err := info.logger.Close(); err != nil { + logrus.WithField("error", err).Error("containerd: close process IO") + } delete(h.s.processes, e.Pid) h.s.notifySubscribers(e) return nil diff --git a/get_containers.go b/get_containers.go index 23bd449..f8d898e 100644 --- a/get_containers.go +++ b/get_containers.go @@ -5,8 +5,8 @@ type GetContainersEvent struct { } func (h *GetContainersEvent) Handle(e *Event) error { - for _, c := range h.s.containers { - e.Containers = append(e.Containers, c) + for _, i := range h.s.containers { + e.Containers = append(e.Containers, i.container) } return nil } diff --git a/log.go b/log.go index 1a84871..2dc96e7 100644 --- a/log.go +++ b/log.go @@ -24,6 +24,15 @@ func newLogger(i *logConfig) (*logger, error) { config: i, messages: make(chan *Message, DefaultBufferSize), } + f, err := os.OpenFile( + filepath.Join(l.config.BundlePath, "logs.json"), + os.O_CREATE|os.O_WRONLY|os.O_APPEND, + 0655, + ) + if err != nil { + return nil, err + } + l.f = f hout := &logHandler{ stream: "stdout", messages: l.messages, @@ -32,16 +41,14 @@ func newLogger(i *logConfig) (*logger, error) { stream: "stderr", messages: l.messages, } - l.wg.Add(2) go func() { - defer l.wg.Done() io.Copy(hout, i.Stdout) }() go func() { - defer l.wg.Done() io.Copy(herr, i.Stderr) }() - return l, l.start() + l.start() + return l, nil } type Message struct { @@ -71,27 +78,17 @@ func (h *logHandler) Write(b []byte) (int, error) { return len(b), nil } -func (l *logger) start() error { - f, err := os.OpenFile( - filepath.Join(l.config.BundlePath, "logs.json"), - os.O_CREATE|os.O_WRONLY|os.O_APPEND, - 0655, - ) - if err != nil { - return err - } - l.f = f +func (l *logger) start() { l.wg.Add(1) go func() { l.wg.Done() - enc := json.NewEncoder(f) + enc := json.NewEncoder(l.f) for m := range l.messages { if err := enc.Encode(m); err != nil { logrus.WithField("error", err).Error("write log message") } } }() - return nil } func (l *logger) Close() (err error) { diff --git a/signal.go b/signal.go index 5cdd1c4..77abc30 100644 --- a/signal.go +++ b/signal.go @@ -5,11 +5,11 @@ type SignalEvent struct { } func (h *SignalEvent) Handle(e *Event) error { - container, ok := h.s.containers[e.ID] + i, ok := h.s.containers[e.ID] if !ok { return ErrContainerNotFound } - processes, err := container.Processes() + processes, err := i.container.Processes() if err != nil { return err } diff --git a/start.go b/start.go index 4c725ad..f84542a 100644 --- a/start.go +++ b/start.go @@ -10,7 +10,9 @@ func (h *StartEvent) Handle(e *Event) error { return err } h.s.containerGroup.Add(1) - h.s.containers[e.ID] = container + h.s.containers[e.ID] = &containerInfo{ + container: container, + } ContainersCounter.Inc(1) task := &StartTask{ Err: e.Err, diff --git a/supervisor.go b/supervisor.go index b2761df..b73bc02 100644 --- a/supervisor.go +++ b/supervisor.go @@ -29,8 +29,8 @@ func NewSupervisor(id, stateDir string, tasks chan *StartTask) (*Supervisor, err } s := &Supervisor{ stateDir: stateDir, - containers: make(map[string]runtime.Container), - processes: make(map[int]runtime.Container), + containers: make(map[string]*containerInfo), + processes: make(map[int]*containerInfo), runtime: r, tasks: tasks, events: make(chan *Event, DefaultBufferSize), @@ -54,11 +54,16 @@ func NewSupervisor(id, stateDir string, tasks chan *StartTask) (*Supervisor, err return s, nil } +type containerInfo struct { + container runtime.Container + logger *logger +} + type Supervisor struct { // stateDir is the directory on the system to store container runtime state information. stateDir string - containers map[string]runtime.Container - processes map[int]runtime.Container + containers map[string]*containerInfo + processes map[int]*containerInfo handlers map[EventType]Handler runtime runtime.Runtime events chan *Event @@ -78,7 +83,8 @@ func (s *Supervisor) Stop(sig chan os.Signal) { // Close the tasks channel so that no new containers get started close(s.tasks) // send a SIGTERM to all containers - for id, c := range s.containers { + for id, i := range s.containers { + c := i.container logrus.WithField("id", id).Debug("sending TERM to container processes") procs, err := c.Processes() if err != nil { @@ -193,7 +199,8 @@ func (s *Supervisor) Machine() Machine { // getContainerForPid returns the container where the provided pid is the pid1 or main // process in the container func (s *Supervisor) getContainerForPid(pid int) (runtime.Container, error) { - for _, container := range s.containers { + for _, i := range s.containers { + container := i.container cpid, err := container.Pid() if err != nil { if lerr, ok := err.(libcontainer.Error); ok { @@ -215,16 +222,16 @@ func (s *Supervisor) SendEvent(evt *Event) { s.events <- evt } -func (s *Supervisor) log(path string, i *runtime.IO) error { +func (s *Supervisor) log(path string, i *runtime.IO) (*logger, error) { config := &logConfig{ BundlePath: path, Stdin: i.Stdin, Stdout: i.Stdout, Stderr: i.Stderr, } - // TODO: save logger to call close after its all done - if _, err := newLogger(config); err != nil { - return err + l, err := newLogger(config) + if err != nil { + return nil, err } - return nil + return l, nil } diff --git a/update.go b/update.go index f27cbc3..925715d 100644 --- a/update.go +++ b/update.go @@ -7,10 +7,11 @@ type UpdateEvent struct { } func (h *UpdateEvent) Handle(e *Event) error { - container, ok := h.s.containers[e.ID] + i, ok := h.s.containers[e.ID] if !ok { return ErrContainerNotFound } + container := i.container if e.State.Status != "" { switch e.State.Status { case runtime.Running: diff --git a/worker.go b/worker.go index c3ca8be..9f14d33 100644 --- a/worker.go +++ b/worker.go @@ -35,13 +35,15 @@ func (w *worker) Start() { for t := range w.s.tasks { started := time.Now() // start logging the container's stdio - if err := w.s.log(t.Container.Path(), t.IO); err != nil { + l, err := w.s.log(t.Container.Path(), t.IO) + if err != nil { evt := NewEvent(DeleteEventType) evt.ID = t.Container.ID() w.s.SendEvent(evt) t.Err <- err continue } + w.s.containers[t.Container.ID()].logger = l if t.Checkpoint != "" { if err := t.Container.Restore(t.Checkpoint); err != nil { evt := NewEvent(DeleteEventType)