diff --git a/cmd/containerd/main.go b/cmd/containerd/main.go index 20ccca6..605e0f3 100644 --- a/cmd/containerd/main.go +++ b/cmd/containerd/main.go @@ -21,6 +21,7 @@ import ( "github.com/docker/containerd/events" "github.com/docker/containerd/execution" "github.com/docker/containerd/execution/executors/oci" + "github.com/docker/containerd/execution/executors/shim" "github.com/docker/containerd/log" metrics "github.com/docker/go-metrics" "github.com/sirupsen/logrus" @@ -103,6 +104,16 @@ high performance container runtime return err } + // Get events publisher + nec, err := getNATSPublisher(context) + if err != nil { + return err + } + defer nec.Close() + ctx := log.WithModule(gocontext.Background(), "containerd") + ctx = log.WithModule(ctx, "execution") + ctx = events.WithPoster(ctx, events.GetNATSPoster(nec)) + var ( executor execution.Executor runtime = context.GlobalString("runtime") @@ -113,20 +124,20 @@ high performance container runtime if err != nil { return err } + case "shim": + root := filepath.Join(context.GlobalString("root"), "shim") + err = os.Mkdir(root, 0700) + if err != nil && !os.IsExist(err) { + return err + } + executor, err = shim.New(log.WithModule(ctx, "shim"), root, "containerd-shim", "runc", nil) + if err != nil { + return err + } default: return fmt.Errorf("oci: runtime %q not implemented", runtime) } - // Get events publisher - nec, err := getNATSPublisher(context) - if err != nil { - return err - } - defer nec.Close() - - ctx := log.WithModule(gocontext.Background(), "containerd") - ctx = log.WithModule(ctx, "execution") - ctx = events.WithPoster(ctx, events.GetNATSPoster(nec)) execService, err := execution.New(ctx, executor) if err != nil { return err diff --git a/execution/container.go b/execution/container.go index c9c2db0..89feb68 100644 --- a/execution/container.go +++ b/execution/container.go @@ -16,12 +16,11 @@ func NewContainer(stateRoot, id, bundle string) (*Container, error) { }, nil } -func LoadContainer(dir StateDir, id, bundle string, status Status, initPid int64) *Container { +func LoadContainer(dir StateDir, id, bundle string, status Status) *Container { return &Container{ id: id, stateDir: dir, bundle: bundle, - initPid: initPid, status: status, processes: make(map[string]Process), } diff --git a/execution/error.go b/execution/error.go index 5d06fc5..f7aa271 100644 --- a/execution/error.go +++ b/execution/error.go @@ -3,5 +3,8 @@ package execution import "fmt" var ( - ErrProcessNotFound = fmt.Errorf("process not found") + ErrProcessNotFound = fmt.Errorf("process not found") + ErrProcessNotExited = fmt.Errorf("process has not exited") + ErrContainerNotFound = fmt.Errorf("container not found") + ErrContainerExists = fmt.Errorf("container already exists") ) diff --git a/execution/executor.go b/execution/executor.go index 8ee68f4..c01eafb 100644 --- a/execution/executor.go +++ b/execution/executor.go @@ -28,7 +28,6 @@ type Executor interface { Create(ctx context.Context, id string, o CreateOpts) (*Container, error) Pause(context.Context, *Container) error Resume(context.Context, *Container) error - Status(context.Context, *Container) (Status, error) List(context.Context) ([]*Container, error) Load(ctx context.Context, id string) (*Container, error) Delete(context.Context, *Container) error diff --git a/execution/executors/oci/oci.go b/execution/executors/oci/oci.go index d35f272..620df49 100644 --- a/execution/executors/oci/oci.go +++ b/execution/executors/oci/oci.go @@ -122,7 +122,6 @@ func (r *OCIRuntime) load(runcC *runc.Container) (*execution.Container, error) { runcC.ID, runcC.Bundle, execution.Status(runcC.Status), - int64(runcC.Pid), ) dirs, err := container.StateDir().Processes() diff --git a/execution/executors/shim/process.go b/execution/executors/shim/process.go new file mode 100644 index 0000000..f659a57 --- /dev/null +++ b/execution/executors/shim/process.go @@ -0,0 +1,381 @@ +package shim + +import ( + "context" + "encoding/json" + "io/ioutil" + "os" + "os/exec" + "path/filepath" + "strconv" + "sync" + "syscall" + + "github.com/docker/containerd/execution" + "github.com/docker/containerd/log" + "github.com/pkg/errors" + "github.com/sirupsen/logrus" + "golang.org/x/sys/unix" + + runc "github.com/crosbymichael/go-runc" + starttime "github.com/opencontainers/runc/libcontainer/system" +) + +type newProcessOpts struct { + shimBinary string + runtime string + runtimeArgs []string + container *execution.Container + exec bool + execution.StartProcessOpts +} + +func newProcess(ctx context.Context, o newProcessOpts) (*process, error) { + procStateDir, err := o.container.StateDir().NewProcess(o.ID) + if err != nil { + return nil, err + } + + exitPipe, controlPipe, err := getControlPipes(procStateDir) + if err != nil { + return nil, err + } + defer func() { + if err != nil { + exitPipe.Close() + controlPipe.Close() + } + }() + + cmd, err := newShim(o, procStateDir) + if err != nil { + return nil, err + } + defer func() { + if err != nil { + cmd.Process.Kill() + cmd.Wait() + } + }() + + abortCh := make(chan syscall.WaitStatus, 1) + go func() { + var shimStatus syscall.WaitStatus + if err := cmd.Wait(); err != nil { + shimStatus = execution.UnknownStatusCode + } else { + shimStatus = cmd.ProcessState.Sys().(syscall.WaitStatus) + } + abortCh <- shimStatus + close(abortCh) + }() + + process := &process{ + root: procStateDir, + id: o.ID, + exitChan: make(chan struct{}), + exitPipe: exitPipe, + controlPipe: controlPipe, + } + + pid, stime, status, err := waitForPid(ctx, abortCh, procStateDir) + if err != nil { + return nil, err + } + process.pid = int64(pid) + process.status = status + process.startTime = stime + + return process, nil +} + +func loadProcess(root, id string) (*process, error) { + pid, err := runc.ReadPidFile(filepath.Join(root, pidFilename)) + if err != nil { + return nil, err + } + + stime, err := ioutil.ReadFile(filepath.Join(root, startTimeFilename)) + if err != nil { + return nil, err + } + + path := filepath.Join(root, exitPipeFilename) + exitPipe, err := os.OpenFile(path, syscall.O_RDONLY|syscall.O_NONBLOCK, 0) + if err != nil { + return nil, err + } + defer func() { + if err != nil { + exitPipe.Close() + } + }() + + path = filepath.Join(root, controlPipeFilename) + controlPipe, err := os.OpenFile(path, syscall.O_RDWR|syscall.O_NONBLOCK, 0) + if err != nil { + return nil, err + } + defer func() { + if err != nil { + controlPipe.Close() + } + }() + + p := &process{ + root: root, + id: id, + pid: int64(pid), + exitChan: make(chan struct{}), + exitPipe: exitPipe, + controlPipe: controlPipe, + startTime: string(stime), + // TODO: status may need to be stored on disk to handle + // Created state for init (i.e. a Start is needed to run the + // container) + status: execution.Running, + } + + markAsStopped := func(p *process) (*process, error) { + p.setStatus(execution.Stopped) + return p, nil + } + + if err = syscall.Kill(pid, 0); err != nil { + if err == syscall.ESRCH { + return markAsStopped(p) + } + return nil, err + } + + cstime, err := starttime.GetProcessStartTime(pid) + if err != nil { + if os.IsNotExist(err) { + return markAsStopped(p) + } + return nil, err + } + + if p.startTime != cstime { + return markAsStopped(p) + } + + return p, nil +} + +type process struct { + root string + id string + pid int64 + exitChan chan struct{} + exitPipe *os.File + controlPipe *os.File + startTime string + status execution.Status + ctx context.Context + mu sync.Mutex +} + +func (p *process) ID() string { + return p.id +} + +func (p *process) Pid() int64 { + return p.pid +} + +func (p *process) Wait() (uint32, error) { + <-p.exitChan + + log.G(p.ctx).WithFields(logrus.Fields{"process-id": p.ID(), "pid": p.pid}). + Debugf("wait is over") + + // Cleanup those fds + p.exitPipe.Close() + p.controlPipe.Close() + + // If the container process is still alive, it means the shim crashed + // and the child process had updated it PDEATHSIG to something + // else than SIGKILL. Or that epollCtl failed + if p.isAlive() { + err := syscall.Kill(int(p.pid), syscall.SIGKILL) + if err != nil { + return execution.UnknownStatusCode, errors.Wrap(err, "failed to kill process") + } + + return uint32(128 + int(syscall.SIGKILL)), nil + } + + data, err := ioutil.ReadFile(filepath.Join(p.root, exitStatusFilename)) + if err != nil { + return execution.UnknownStatusCode, errors.Wrap(err, "failed to read process exit status") + } + + if len(data) == 0 { + return execution.UnknownStatusCode, errors.New(execution.ErrProcessNotExited.Error()) + } + + status, err := strconv.Atoi(string(data)) + if err != nil { + return execution.UnknownStatusCode, errors.Wrapf(err, "failed to parse exit status") + } + + p.setStatus(execution.Stopped) + return uint32(status), nil +} + +func (p *process) Signal(sig os.Signal) error { + err := syscall.Kill(int(p.pid), sig.(syscall.Signal)) + if err != nil { + return errors.Wrap(err, "failed to signal process") + } + return nil +} + +func (p *process) Status() execution.Status { + p.mu.Lock() + s := p.status + p.mu.Unlock() + return s +} + +func (p *process) setStatus(s execution.Status) { + p.mu.Lock() + p.status = s + p.mu.Unlock() +} + +func (p *process) isAlive() bool { + if err := syscall.Kill(int(p.pid), 0); err != nil { + if err == syscall.ESRCH { + return false + } + log.G(p.ctx).WithFields(logrus.Fields{"process-id": p.ID(), "pid": p.pid}). + Warnf("kill(0) failed: %v", err) + return false + } + + // check that we have the same startttime + stime, err := starttime.GetProcessStartTime(int(p.pid)) + if err != nil { + if os.IsNotExist(err) { + return false + } + log.G(p.ctx).WithFields(logrus.Fields{"process-id": p.ID(), "pid": p.pid}). + Warnf("failed to get process start time: %v", err) + return false + } + + if p.startTime != stime { + return false + } + + return true +} + +func waitForPid(ctx context.Context, abortCh chan syscall.WaitStatus, root string) (pid int, stime string, status execution.Status, err error) { + status = execution.Unknown + for { + select { + case <-ctx.Done(): + return + case wait := <-abortCh: + if wait.Signaled() { + err = errors.Errorf("shim died prematurarily: %v", wait.Signal()) + return + } + err = errors.Errorf("shim exited prematurarily with exit code %v", wait.ExitStatus()) + return + default: + } + pid, err = runc.ReadPidFile(filepath.Join(root, pidFilename)) + if err == nil { + break + } else if !os.IsNotExist(err) { + return + } + } + status = execution.Created + stime, err = starttime.GetProcessStartTime(pid) + switch { + case os.IsNotExist(err): + status = execution.Stopped + case err != nil: + return + default: + var b []byte + path := filepath.Join(root, startTimeFilename) + b, err = ioutil.ReadFile(path) + switch { + case os.IsNotExist(err): + err = ioutil.WriteFile(path, []byte(stime), 0600) + if err != nil { + return + } + case err != nil: + err = errors.Wrapf(err, "failed to get start time for pid %d", pid) + return + case string(b) != stime: + status = execution.Stopped + } + } + + return pid, stime, status, nil +} + +func newShim(o newProcessOpts, workDir string) (*exec.Cmd, error) { + cmd := exec.Command(o.shimBinary, o.container.ID(), o.container.Bundle(), o.runtime) + cmd.Dir = workDir + cmd.SysProcAttr = &syscall.SysProcAttr{ + Setpgid: true, + } + + state := processState{ + Process: o.Spec, + Exec: o.exec, + Stdin: o.Stdin, + Stdout: o.Stdout, + Stderr: o.Stderr, + RuntimeArgs: o.runtimeArgs, + NoPivotRoot: false, + CheckpointPath: "", + RootUID: int(o.Spec.User.UID), + RootGID: int(o.Spec.User.GID), + } + + f, err := os.Create(filepath.Join(workDir, "process.json")) + if err != nil { + return nil, errors.Wrapf(err, "failed to create shim's process.json for container %s", o.container.ID()) + } + defer f.Close() + + if err := json.NewEncoder(f).Encode(state); err != nil { + return nil, errors.Wrapf(err, "failed to create shim's processState for container %s", o.container.ID()) + } + + if err := cmd.Start(); err != nil { + return nil, errors.Wrapf(err, "failed to start shim for container %s", o.container.ID()) + } + + return cmd, nil +} + +func getControlPipes(root string) (exitPipe *os.File, controlPipe *os.File, err error) { + path := filepath.Join(root, exitPipeFilename) + if err = unix.Mkfifo(path, 0700); err != nil { + return exitPipe, controlPipe, errors.Wrap(err, "failed to create shim exit fifo") + } + if exitPipe, err = os.OpenFile(path, syscall.O_RDONLY|syscall.O_NONBLOCK, 0); err != nil { + return exitPipe, controlPipe, errors.Wrap(err, "failed to open shim exit fifo") + } + + path = filepath.Join(root, controlPipeFilename) + if err = unix.Mkfifo(path, 0700); err != nil { + return exitPipe, controlPipe, errors.Wrap(err, "failed to create shim control fifo") + } + if controlPipe, err = os.OpenFile(path, syscall.O_RDWR|syscall.O_NONBLOCK, 0); err != nil { + return exitPipe, controlPipe, errors.Wrap(err, "failed to open shim control fifo") + } + + return exitPipe, controlPipe, nil +} diff --git a/execution/executors/shim/shim.go b/execution/executors/shim/shim.go new file mode 100644 index 0000000..fccbf30 --- /dev/null +++ b/execution/executors/shim/shim.go @@ -0,0 +1,420 @@ +package shim + +import ( + "context" + "encoding/json" + "io/ioutil" + "os" + "os/exec" + "path/filepath" + "sync" + "syscall" + + "github.com/docker/containerd/execution" + "github.com/docker/containerd/log" + "github.com/opencontainers/runtime-spec/specs-go" + "github.com/pkg/errors" + "github.com/sirupsen/logrus" +) + +const ( + DefaultShimBinary = "containerd-shim" + + pidFilename = "pid" + startTimeFilename = "starttime" + exitPipeFilename = "exit" + controlPipeFilename = "control" + initProcessID = "init" + exitStatusFilename = "exitStatus" +) + +func New(ctx context.Context, root, shim, runtime string, runtimeArgs []string) (*ShimRuntime, error) { + fd, err := syscall.EpollCreate1(0) + if err != nil { + return nil, errors.Wrap(err, "epollcreate1 failed") + } + s := &ShimRuntime{ + ctx: ctx, + epollFd: fd, + root: root, + binaryName: shim, + runtime: runtime, + runtimeArgs: runtimeArgs, + exitChannels: make(map[int]*process), + containers: make(map[string]*execution.Container), + } + + s.loadContainers() + + go s.monitor() + + return s, nil +} + +type ShimRuntime struct { + ctx context.Context + + mutex sync.Mutex + exitChannels map[int]*process + containers map[string]*execution.Container + + epollFd int + root string + binaryName string + runtime string + runtimeArgs []string +} + +type ProcessOpts struct { + Bundle string + Terminal bool + Stdin string + Stdout string + Stderr string +} + +type processState struct { + specs.Process + Exec bool `json:"exec"` + Stdin string `json:"containerdStdin"` + Stdout string `json:"containerdStdout"` + Stderr string `json:"containerdStderr"` + RuntimeArgs []string `json:"runtimeArgs"` + NoPivotRoot bool `json:"noPivotRoot"` + CheckpointPath string `json:"checkpoint"` + RootUID int `json:"rootUID"` + RootGID int `json:"rootGID"` +} + +func (s *ShimRuntime) Create(ctx context.Context, id string, o execution.CreateOpts) (*execution.Container, error) { + log.G(s.ctx).WithFields(logrus.Fields{"container-id": id, "options": o}).Debug("Create()") + + if s.getContainer(id) != nil { + return nil, execution.ErrContainerExists + } + + container, err := execution.NewContainer(s.root, id, o.Bundle) + if err != nil { + return nil, err + } + defer func() { + if err != nil { + container.StateDir().Delete() + } + }() + + err = ioutil.WriteFile(filepath.Join(string(container.StateDir()), "bundle"), []byte(o.Bundle), 0600) + if err != nil { + return nil, errors.Wrap(err, "failed to save bundle path to disk") + } + + // extract Process spec from bundle's config.json + var spec specs.Spec + f, err := os.Open(filepath.Join(o.Bundle, "config.json")) + if err != nil { + return nil, errors.Wrap(err, "failed to open config.json") + } + defer f.Close() + if err := json.NewDecoder(f).Decode(&spec); err != nil { + return nil, errors.Wrap(err, "failed to decode container OCI specs") + } + + processOpts := newProcessOpts{ + shimBinary: s.binaryName, + runtime: s.runtime, + runtimeArgs: s.runtimeArgs, + container: container, + exec: false, + StartProcessOpts: execution.StartProcessOpts{ + ID: initProcessID, + Spec: spec.Process, + Console: o.Console, + Stdin: o.Stdin, + Stdout: o.Stdout, + Stderr: o.Stderr, + }, + } + + process, err := newProcess(ctx, processOpts) + if err != nil { + return nil, err + } + process.ctx = log.WithModule(log.WithModule(s.ctx, "container"), id) + + s.monitorProcess(process) + container.AddProcess(process, true) + + s.addContainer(container) + + return container, nil +} + +func (s *ShimRuntime) Start(ctx context.Context, c *execution.Container) error { + log.G(s.ctx).WithFields(logrus.Fields{"container": c}).Debug("Start()") + + cmd := exec.CommandContext(ctx, s.runtime, append(s.runtimeArgs, "start", c.ID())...) + out, err := cmd.CombinedOutput() + if err != nil { + return errors.Wrapf(err, "'%s start' failed with output: %v", s.runtime, string(out)) + } + return nil +} + +func (s *ShimRuntime) List(ctx context.Context) ([]*execution.Container, error) { + log.G(s.ctx).Debug("List()") + + containers := make([]*execution.Container, 0) + s.mutex.Lock() + for _, c := range s.containers { + containers = append(containers, c) + } + s.mutex.Unlock() + + return containers, nil +} + +func (s *ShimRuntime) Load(ctx context.Context, id string) (*execution.Container, error) { + log.G(s.ctx).WithFields(logrus.Fields{"container-id": id}).Debug("Start()") + + s.mutex.Lock() + c, ok := s.containers[id] + s.mutex.Unlock() + + if !ok { + return nil, errors.New(execution.ErrContainerNotFound.Error()) + } + + return c, nil +} + +func (s *ShimRuntime) Delete(ctx context.Context, c *execution.Container) error { + log.G(s.ctx).WithFields(logrus.Fields{"container": c}).Debug("Delete()") + + if c.Status() != execution.Stopped { + return errors.Errorf("cannot delete a container in the '%s' state", c.Status()) + } + + c.StateDir().Delete() + s.removeContainer(c) + return nil +} + +func (s *ShimRuntime) Pause(ctx context.Context, c *execution.Container) error { + log.G(s.ctx).WithFields(logrus.Fields{"container": c}).Debug("Pause()") + + cmd := exec.CommandContext(ctx, s.runtime, append(s.runtimeArgs, "pause", c.ID())...) + out, err := cmd.CombinedOutput() + if err != nil { + return errors.Wrapf(err, "'%s pause' failed with output: %v", s.runtime, string(out)) + } + return nil +} + +func (s *ShimRuntime) Resume(ctx context.Context, c *execution.Container) error { + log.G(s.ctx).WithFields(logrus.Fields{"container": c}).Debug("Resume()") + + cmd := exec.CommandContext(ctx, s.runtime, append(s.runtimeArgs, "resume", c.ID())...) + out, err := cmd.CombinedOutput() + if err != nil { + return errors.Wrapf(err, "'%s resume' failed with output: %v", s.runtime, string(out)) + } + return nil +} + +func (s *ShimRuntime) StartProcess(ctx context.Context, c *execution.Container, o execution.StartProcessOpts) (p execution.Process, err error) { + log.G(s.ctx).WithFields(logrus.Fields{"container": c, "options": o}).Debug("StartProcess()") + + processOpts := newProcessOpts{ + shimBinary: s.binaryName, + runtime: s.runtime, + runtimeArgs: s.runtimeArgs, + container: c, + exec: true, + StartProcessOpts: o, + } + process, err := newProcess(ctx, processOpts) + if err != nil { + return nil, err + } + + process.status = execution.Running + s.monitorProcess(process) + + c.AddProcess(process, false) + return process, nil +} + +func (s *ShimRuntime) SignalProcess(ctx context.Context, c *execution.Container, id string, sig os.Signal) error { + log.G(s.ctx).WithFields(logrus.Fields{"container": c, "process-id": id, "signal": sig}). + Debug("SignalProcess()") + + process := c.GetProcess(id) + if process == nil { + return errors.Errorf("no such process %s", id) + } + err := syscall.Kill(int(process.Pid()), sig.(syscall.Signal)) + if err != nil { + return errors.Wrapf(err, "failed to send %v signal to process %v", sig, process.Pid()) + } + return err +} + +func (s *ShimRuntime) DeleteProcess(ctx context.Context, c *execution.Container, id string) error { + log.G(s.ctx).WithFields(logrus.Fields{"container": c, "process-id": id}). + Debug("DeleteProcess()") + + c.RemoveProcess(id) + return c.StateDir().DeleteProcess(id) +} + +// +// +// + +func (s *ShimRuntime) monitor() { + var events [128]syscall.EpollEvent + for { + n, err := syscall.EpollWait(s.epollFd, events[:], -1) + if err != nil { + if err == syscall.EINTR { + continue + } + log.G(s.ctx).Error("epollwait failed:", err) + } + + for i := 0; i < n; i++ { + fd := int(events[i].Fd) + + s.mutex.Lock() + p := s.exitChannels[fd] + delete(s.exitChannels, fd) + s.mutex.Unlock() + + if err = syscall.EpollCtl(s.epollFd, syscall.EPOLL_CTL_DEL, fd, &syscall.EpollEvent{ + Events: syscall.EPOLLHUP, + Fd: int32(fd), + }); err != nil { + log.G(s.ctx).Error("epollctl deletion failed:", err) + } + + close(p.exitChan) + } + } +} + +func (s *ShimRuntime) addContainer(c *execution.Container) { + s.mutex.Lock() + s.containers[c.ID()] = c + s.mutex.Unlock() +} + +func (s *ShimRuntime) removeContainer(c *execution.Container) { + s.mutex.Lock() + delete(s.containers, c.ID()) + s.mutex.Unlock() +} + +func (s *ShimRuntime) getContainer(id string) *execution.Container { + s.mutex.Lock() + c := s.containers[id] + s.mutex.Unlock() + + return c +} + +// monitorProcess adds a process to the list of monitored process if +// we fail to do so, we closed the exitChan channel used by Wait(). +// Since service always call on Wait() for generating "exit" events, +// this will ensure the process gets killed +func (s *ShimRuntime) monitorProcess(p *process) { + if p.status == execution.Stopped { + close(p.exitChan) + return + } + + fd := int(p.exitPipe.Fd()) + event := syscall.EpollEvent{ + Fd: int32(fd), + Events: syscall.EPOLLHUP, + } + s.mutex.Lock() + s.exitChannels[fd] = p + s.mutex.Unlock() + if err := syscall.EpollCtl(s.epollFd, syscall.EPOLL_CTL_ADD, fd, &event); err != nil { + s.mutex.Lock() + delete(s.exitChannels, fd) + s.mutex.Unlock() + close(p.exitChan) + return + } + + // TODO: take care of the OOM handler +} + +func (s *ShimRuntime) unmonitorProcess(p *process) { + s.mutex.Lock() + for fd, proc := range s.exitChannels { + if proc == p { + delete(s.exitChannels, fd) + break + } + } + s.mutex.Unlock() +} + +func (s *ShimRuntime) loadContainers() { + cs, err := ioutil.ReadDir(s.root) + if err != nil { + log.G(s.ctx).WithField("statedir", s.root). + Warn("failed to load containers, state dir cannot be listed:", err) + return + } + + for _, c := range cs { + if !c.IsDir() { + continue + } + + stateDir, err := execution.LoadStateDir(s.root, c.Name()) + if err != nil { + // We should never fail the above call unless someone + // delete the directory while we're loading + log.G(s.ctx).WithFields(logrus.Fields{"container": c.Name(), "statedir": s.root}). + Warn("failed to load container statedir:", err) + continue + } + bundle, err := ioutil.ReadFile(filepath.Join(string(stateDir), "bundle")) + if err != nil { + log.G(s.ctx).WithField("container", c.Name()). + Warn("failed to load container bundle path:", err) + continue + } + + container := execution.LoadContainer(stateDir, c.Name(), string(bundle), execution.Unknown) + s.addContainer(container) + + processDirs, err := stateDir.Processes() + if err != nil { + log.G(s.ctx).WithField("container", c.Name()). + Warn("failed to retrieve container processes:", err) + continue + } + + for _, procStateRoot := range processDirs { + id := filepath.Base(procStateRoot) + proc, err := loadProcess(procStateRoot, id) + if err != nil { + log.G(s.ctx).WithFields(logrus.Fields{"container": c.Name(), "process": id}). + Warn("failed to load process:", err) + s.removeContainer(container) + for _, p := range container.Processes() { + s.unmonitorProcess(p.(*process)) + } + break + } + proc.ctx = log.WithModule(log.WithModule(s.ctx, "container"), container.ID()) + container.AddProcess(proc, proc.ID() == initProcessID) + s.monitorProcess(proc) + } + } +} diff --git a/execution/statedir.go b/execution/statedir.go index 9856008..9a7fe2a 100644 --- a/execution/statedir.go +++ b/execution/statedir.go @@ -4,32 +4,46 @@ import ( "io/ioutil" "os" "path/filepath" + + "github.com/pkg/errors" ) const processesDirName = "processes" type StateDir string +func LoadStateDir(root, id string) (StateDir, error) { + path := filepath.Join(root, id) + if _, err := os.Stat(path); err != nil { + return "", errors.Wrap(err, "could not find container statedir") + } + return StateDir(path), nil +} + func NewStateDir(root, id string) (StateDir, error) { path := filepath.Join(root, id) if err := os.Mkdir(path, 0700); err != nil { - return "", err + return "", errors.Wrap(err, "could not create container statedir") } if err := os.Mkdir(StateDir(path).processesDir(), 0700); err != nil { os.RemoveAll(path) - return "", err + return "", errors.Wrap(err, "could not create processes statedir") } return StateDir(path), nil } func (s StateDir) Delete() error { - return os.RemoveAll(string(s)) + err := os.RemoveAll(string(s)) + if err != nil { + return errors.Wrapf(err, "failed to remove statedir %s", string(s)) + } + return nil } func (s StateDir) NewProcess(id string) (dir string, err error) { dir = filepath.Join(s.processesDir(), id) if err = os.Mkdir(dir, 0700); err != nil { - return "", err + return "", errors.Wrap(err, "could not create process statedir") } return dir, nil @@ -40,14 +54,18 @@ func (s StateDir) ProcessDir(id string) string { } func (s StateDir) DeleteProcess(id string) error { - return os.RemoveAll(filepath.Join(s.processesDir(), id)) + err := os.RemoveAll(filepath.Join(s.processesDir(), id)) + if err != nil { + return errors.Wrapf(err, "failed to remove process %d statedir", id) + } + return nil } func (s StateDir) Processes() ([]string, error) { procsDir := s.processesDir() dirs, err := ioutil.ReadDir(procsDir) if err != nil { - return nil, err + return nil, errors.Wrap(err, "could not list processes statedir") } paths := make([]string, 0) diff --git a/execution/status.go b/execution/status.go index f3bc83b..74c9262 100644 --- a/execution/status.go +++ b/execution/status.go @@ -8,6 +8,7 @@ const ( Running Status = "running" Stopped Status = "stopped" Deleted Status = "deleted" + Unknown Status = "unknown" UnknownStatusCode = 255 )