From c787ecada3842e688fcbb92cb7eb89254e3edda4 Mon Sep 17 00:00:00 2001 From: Michael Crosby Date: Fri, 6 Nov 2015 16:44:52 -0800 Subject: [PATCH] Fix concurrency issues --- containerd/daemon.go | 3 +-- event.go | 8 +++++++ supervisor.go | 52 ++++++++++++++++++++++++++++++++++++-------- 3 files changed, 52 insertions(+), 11 deletions(-) diff --git a/containerd/daemon.go b/containerd/daemon.go index 5bd8aed..4a9f029 100644 --- a/containerd/daemon.go +++ b/containerd/daemon.go @@ -69,7 +69,6 @@ func startSignalHandler(supervisor *containerd.Supervisor, bufferSize int) { signals := make(chan os.Signal, bufferSize) signal.Notify(signals) for s := range signals { - logrus.WithField("signal", s).Debug("containerd: received signal") switch s { case syscall.SIGTERM, syscall.SIGINT, syscall.SIGSTOP: os.Exit(0) @@ -96,7 +95,7 @@ func reap() (exits []*containerd.ExitEvent, err error) { if err == syscall.ECHILD { return exits, nil } - return nil, err + return exits, err } if pid <= 0 { return exits, nil diff --git a/event.go b/event.go index a70bdb2..7c634e2 100644 --- a/event.go +++ b/event.go @@ -27,3 +27,11 @@ type StartContainerEvent struct { func (c *StartContainerEvent) String() string { return "create container" } + +type ContainerStartErrorEvent struct { + ID string +} + +func (c *ContainerStartErrorEvent) String() string { + return "container start error" +} diff --git a/supervisor.go b/supervisor.go index c9b4041..1e9adba 100644 --- a/supervisor.go +++ b/supervisor.go @@ -2,6 +2,7 @@ package containerd import ( "os" + "sync" "github.com/Sirupsen/logrus" "github.com/opencontainers/runc/libcontainer" @@ -21,6 +22,11 @@ func NewSupervisor(stateDir string, concurrency int) (*Supervisor, error) { stateDir: stateDir, containers: make(map[string]Container), runtime: runtime, + tasks: make(chan *startTask, concurrency*100), + } + for i := 0; i < concurrency; i++ { + s.workerGroup.Add(1) + go s.startContainerWorker(s.tasks) } return s, nil } @@ -29,12 +35,13 @@ type Supervisor struct { // stateDir is the directory on the system to store container runtime state information. stateDir string - processes []Process containers map[string]Container runtime Runtime - events chan Event + events chan Event + tasks chan *startTask + workerGroup sync.WaitGroup } // Start is a non-blocking call that runs the supervisor for monitoring contianer processes and @@ -48,7 +55,6 @@ func (s *Supervisor) Start(events chan Event) error { s.events = events go func() { for evt := range events { - logrus.WithField("event", evt).Debug("containerd: processing event") switch e := evt.(type) { case *ExitEvent: logrus.WithFields(logrus.Fields{"pid": e.Pid, "status": e.Status}). @@ -61,8 +67,7 @@ func (s *Supervisor) Start(events chan Event) error { continue } container.SetExited(e.Status) - delete(s.containers, container.ID()) - if err := container.Delete(); err != nil { + if err := s.deleteContainer(container); err != nil { logrus.WithField("error", err).Error("containerd: deleting container") } case *StartContainerEvent: @@ -72,17 +77,27 @@ func (s *Supervisor) Start(events chan Event) error { continue } s.containers[e.ID] = container - if err := container.Start(); err != nil { - e.Err <- err - continue + s.tasks <- &startTask{ + err: e.Err, + container: container, + } + case *ContainerStartErrorEvent: + if container, ok := s.containers[e.ID]; ok { + if err := s.deleteContainer(container); err != nil { + logrus.WithField("error", err).Error("containerd: deleting container") + } } - e.Err <- nil } } }() return nil } +func (s *Supervisor) deleteContainer(container Container) error { + delete(s.containers, container.ID()) + return container.Delete() +} + func (s *Supervisor) getContainerForPid(pid int) (Container, error) { for _, container := range s.containers { cpid, err := container.Pid() @@ -104,3 +119,22 @@ func (s *Supervisor) getContainerForPid(pid int) (Container, error) { func (s *Supervisor) SendEvent(evt Event) { s.events <- evt } + +type startTask struct { + container Container + err chan error +} + +func (s *Supervisor) startContainerWorker(tasks chan *startTask) { + defer s.workerGroup.Done() + for t := range tasks { + if err := t.container.Start(); err != nil { + s.SendEvent(&ContainerStartErrorEvent{ + ID: t.container.ID(), + }) + t.err <- err + continue + } + t.err <- nil + } +}