diff --git a/container.go b/container.go index ed37f6a..e06cc60 100644 --- a/container.go +++ b/container.go @@ -2,6 +2,7 @@ package containerd type Container interface { ID() string + Start() error Pid() (int, error) SetExited(status int) Delete() error diff --git a/containerd/daemon.go b/containerd/daemon.go index a3d83d5..5eca592 100644 --- a/containerd/daemon.go +++ b/containerd/daemon.go @@ -43,7 +43,7 @@ var DaemonCommand = cli.Command{ }() go metrics.Log(metrics.DefaultRegistry, 60*time.Second, l) } - if err := daemon(context.String("state-dir"), 20, context.Int("buffer-size")); err != nil { + if err := daemon(context.String("state-dir"), 10, context.Int("buffer-size")); err != nil { logrus.Fatal(err) } }, diff --git a/errors.go b/errors.go index 5f620b0..3632ad2 100644 --- a/errors.go +++ b/errors.go @@ -10,6 +10,7 @@ var ( ErrContainerExists = errors.New("containerd: container already exists") // Internal errors - errShutdown = errors.New("containerd: supervisor is shutdown") - errRootNotAbs = errors.New("containerd: rootfs path is not an absolute path") + errShutdown = errors.New("containerd: supervisor is shutdown") + errRootNotAbs = errors.New("containerd: rootfs path is not an absolute path") + errNoContainerForPid = errors.New("containerd: pid not registered for any container") ) diff --git a/event.go b/event.go index 3f495a6..8e0d9dd 100644 --- a/event.go +++ b/event.go @@ -18,15 +18,6 @@ func (e *ExitEvent) String() string { return "exit event" } -type StartedEvent struct { - ID string - Container Container -} - -func (s *StartedEvent) String() string { - return "started event" -} - type CreateContainerEvent struct { ID string BundlePath string diff --git a/jobs.go b/jobs.go deleted file mode 100644 index af445ca..0000000 --- a/jobs.go +++ /dev/null @@ -1,10 +0,0 @@ -package containerd - -type Job interface { -} - -type CreateJob struct { - ID string - BundlePath string - Err chan error -} diff --git a/runtime_linux.go b/runtime_linux.go index 0e6d138..da20d16 100644 --- a/runtime_linux.go +++ b/runtime_linux.go @@ -173,6 +173,10 @@ func (c *libcontainerContainer) Pid() (int, error) { return c.initProcess.Pid() } +func (c *libcontainerContainer) Start() error { + return c.c.Start(c.initProcess) +} + func (c *libcontainerContainer) SetExited(status int) { c.exitStatus = status // meh @@ -222,10 +226,6 @@ func (r *libcontainerRuntime) Create(id, bundlePath string) (Container, error) { c: container, initProcess: process, } - if err := container.Start(process); err != nil { - container.Destroy() - return nil, err - } return c, nil } diff --git a/supervisor.go b/supervisor.go index b43ae9c..5ec57ed 100644 --- a/supervisor.go +++ b/supervisor.go @@ -2,10 +2,10 @@ package containerd import ( "os" - "sync" "time" "github.com/Sirupsen/logrus" + "github.com/opencontainers/runc/libcontainer" "github.com/rcrowley/go-metrics" ) @@ -26,14 +26,8 @@ func NewSupervisor(stateDir string, concurrency int) (*Supervisor, error) { } s := &Supervisor{ stateDir: stateDir, - processes: make(map[int]Container), containers: make(map[string]Container), runtime: runtime, - jobs: make(chan Job, 1024), - } - for i := 0; i < concurrency; i++ { - s.workerGroup.Add(1) - go s.worker(i) } return s, nil } @@ -42,15 +36,11 @@ type Supervisor struct { // stateDir is the directory on the system to store container runtime state information. stateDir string - processes map[int]Container containers map[string]Container runtime Runtime events chan Event - jobs chan Job - - workerGroup sync.WaitGroup } // Start is a non-blocking call that runs the supervisor for monitoring contianer processes and @@ -71,35 +61,56 @@ func (s *Supervisor) Start(events chan Event) error { "pid": e.Pid, "status": e.Status, }).Debug("containerd: process exited") - if container, ok := s.processes[e.Pid]; ok { - container.SetExited(e.Status) - delete(s.processes, e.Pid) - delete(s.containers, container.ID()) - if err := container.Delete(); err != nil { - logrus.WithField("error", err).Error("containerd: deleting container") - } - } - case *StartedEvent: - s.containers[e.ID] = e.Container - pid, err := e.Container.Pid() + container, err := s.getContainerForPid(e.Pid) if err != nil { - logrus.WithField("error", err).Error("containerd: getting container pid") + if err != errNoContainerForPid { + logrus.WithField("error", err).Error("containerd: find container for pid") + } continue } - s.processes[pid] = e.Container - case *CreateContainerEvent: - j := &CreateJob{ - ID: e.ID, - BundlePath: e.BundlePath, - Err: e.Err, + container.SetExited(e.Status) + delete(s.containers, container.ID()) + if err := container.Delete(); err != nil { + logrus.WithField("error", err).Error("containerd: deleting container") } - s.jobs <- j + case *CreateContainerEvent: + start := time.Now() + container, err := s.runtime.Create(e.ID, e.BundlePath) + if err != nil { + e.Err <- err + continue + } + s.containers[e.ID] = container + if err := container.Start(); err != nil { + e.Err <- err + continue + } + e.Err <- nil + containerStartTimer.UpdateSince(start) } } }() return nil } +func (s *Supervisor) getContainerForPid(pid int) (Container, error) { + for _, container := range s.containers { + cpid, err := container.Pid() + if err != nil { + if lerr, ok := err.(libcontainer.Error); ok { + if lerr.Code() == libcontainer.ProcessNotExecuted { + continue + } + } + logrus.WithField("error", err).Error("containerd: get container pid") + } + if pid == cpid { + return container, nil + } + } + return nil, errNoContainerForPid +} + func (s *Supervisor) SendEvent(evt Event) { s.events <- evt } @@ -108,28 +119,3 @@ func (s *Supervisor) SendEvent(evt Event) { func (s *Supervisor) Stop() { } - -func (s *Supervisor) worker(id int) { - defer func() { - s.workerGroup.Done() - logrus.WithField("worker", id).Debug("containerd: worker finished") - }() - logrus.WithField("worker", id).Debug("containerd: starting worker") - for job := range s.jobs { - switch j := job.(type) { - case *CreateJob: - start := time.Now() - container, err := s.runtime.Create(j.ID, j.BundlePath) - if err != nil { - j.Err <- err - continue - } - s.SendEvent(&StartedEvent{ - ID: j.ID, - Container: container, - }) - j.Err <- nil - containerStartTimer.UpdateSince(start) - } - } -}