From 8fbdf1c0d7dbcebe0f3122b4bf35da16eb6adabc Mon Sep 17 00:00:00 2001 From: Kenfe-Mickael Laventure Date: Mon, 6 Feb 2017 08:38:11 -0800 Subject: [PATCH] Remove execution/executors Signed-off-by: Kenfe-Mickael Laventure --- execution/executors/shim/process.go | 429 --------------------------- execution/executors/shim/shim.go | 431 ---------------------------- execution/supervisor.go | 12 - 3 files changed, 872 deletions(-) delete mode 100644 execution/executors/shim/process.go delete mode 100644 execution/executors/shim/shim.go delete mode 100644 execution/supervisor.go diff --git a/execution/executors/shim/process.go b/execution/executors/shim/process.go deleted file mode 100644 index d777735..0000000 --- a/execution/executors/shim/process.go +++ /dev/null @@ -1,429 +0,0 @@ -package shim - -import ( - "context" - "encoding/json" - "io/ioutil" - "os" - "os/exec" - "path/filepath" - "strconv" - "sync" - "syscall" - - "github.com/Sirupsen/logrus" - "github.com/docker/containerd/execution" - "github.com/docker/containerd/log" - "github.com/pkg/errors" - "golang.org/x/sys/unix" - - runc "github.com/crosbymichael/go-runc" - starttime "github.com/opencontainers/runc/libcontainer/system" -) - -type newProcessOpts struct { - shimBinary string - runtime string - runtimeArgs []string - container *execution.Container - exec bool - stateDir string - execution.StartProcessOpts -} - -func validateNewProcessOpts(o newProcessOpts) error { - if o.shimBinary == "" { - return errors.New("shim binary not specified") - } - if o.runtime == "" { - return errors.New("runtime not specified") - } - if o.container == nil { - return errors.New("container not specified") - } - if o.container.ID() == "" { - return errors.New("container id not specified") - } - if o.container.Bundle() == "" { - return errors.New("bundle not specified") - } - if o.stateDir == "" { - return errors.New("state dir not specified") - } - return nil -} - -func newProcess(ctx context.Context, o newProcessOpts) (p *process, err error) { - if err = validateNewProcessOpts(o); err != nil { - return - } - p = &process{ - id: o.ID, - stateDir: o.stateDir, - exitChan: make(chan struct{}), - ctx: ctx, - } - defer func() { - if err != nil { - p.cleanup() - p = nil - } - }() - - if err = os.Mkdir(o.stateDir, 0700); err != nil { - err = errors.Wrap(err, "failed to create process state dir") - return - } - - p.exitPipe, p.controlPipe, err = getControlPipes(o.stateDir) - if err != nil { - return - } - - cmd, err := newShimProcess(o) - if err != nil { - return - } - defer func() { - if err != nil { - cmd.Process.Kill() - cmd.Wait() - } - }() - - abortCh := make(chan syscall.WaitStatus, 1) - go func() { - var shimStatus syscall.WaitStatus - if err := cmd.Wait(); err != nil { - shimStatus = execution.UnknownStatusCode - } else { - shimStatus = cmd.ProcessState.Sys().(syscall.WaitStatus) - } - abortCh <- shimStatus - close(abortCh) - }() - - p.pid, p.startTime, p.status, err = waitUntilReady(ctx, abortCh, o.stateDir) - if err != nil { - return - } - - return -} - -func loadProcess(ctx context.Context, stateDir, id string) (p *process, err error) { - p = &process{ - id: id, - stateDir: stateDir, - exitChan: make(chan struct{}), - status: execution.Running, - ctx: ctx, - } - defer func() { - if err != nil { - p.cleanup() - p = nil - } - }() - - p.pid, err = getPidFromFile(filepath.Join(stateDir, pidFilename)) - if err != nil { - err = errors.Wrap(err, "failed to read pid") - return - } - - p.startTime, err = getStartTimeFromFile(filepath.Join(stateDir, startTimeFilename)) - if err != nil { - return - } - - path := filepath.Join(stateDir, exitPipeFilename) - p.exitPipe, err = os.OpenFile(path, syscall.O_RDONLY|syscall.O_NONBLOCK, 0) - if err != nil { - err = errors.Wrapf(err, "failed to open exit pipe") - return - } - - path = filepath.Join(stateDir, controlPipeFilename) - p.controlPipe, err = os.OpenFile(path, syscall.O_RDWR|syscall.O_NONBLOCK, 0) - if err != nil { - err = errors.Wrapf(err, "failed to open control pipe") - return - } - - markAsStopped := func(p *process) (*process, error) { - p.setStatus(execution.Stopped) - return p, nil - } - - if err = syscall.Kill(int(p.pid), 0); err != nil { - if err == syscall.ESRCH { - return markAsStopped(p) - } - err = errors.Wrapf(err, "failed to check if process is still alive") - return - } - - cstime, err := starttime.GetProcessStartTime(int(p.pid)) - if err != nil { - if os.IsNotExist(err) { - return markAsStopped(p) - } - err = errors.Wrapf(err, "failed retrieve current process start time") - return - } - - if p.startTime != cstime { - return markAsStopped(p) - } - - return -} - -type process struct { - stateDir string - id string - pid int64 - exitChan chan struct{} - exitPipe *os.File - controlPipe *os.File - startTime string - status execution.Status - ctx context.Context - mu sync.Mutex -} - -func (p *process) ID() string { - return p.id -} - -func (p *process) Pid() int64 { - return p.pid -} - -func (p *process) Wait() (uint32, error) { - <-p.exitChan - - log.G(p.ctx).WithFields(logrus.Fields{"process-id": p.ID(), "pid": p.pid}). - Debugf("wait is over") - - // Cleanup those fds - p.exitPipe.Close() - p.controlPipe.Close() - - // If the container process is still alive, it means the shim crashed - // and the child process had updated it PDEATHSIG to something - // else than SIGKILL. Or that epollCtl failed - if p.isAlive() { - err := syscall.Kill(int(p.pid), syscall.SIGKILL) - if err != nil { - return execution.UnknownStatusCode, errors.Wrap(err, "failed to kill process") - } - - return uint32(128 + int(syscall.SIGKILL)), nil - } - - data, err := ioutil.ReadFile(filepath.Join(p.stateDir, exitStatusFilename)) - if err != nil { - return execution.UnknownStatusCode, errors.Wrap(err, "failed to read process exit status") - } - - if len(data) == 0 { - return execution.UnknownStatusCode, errors.New(execution.ErrProcessNotExited.Error()) - } - - status, err := strconv.Atoi(string(data)) - if err != nil { - return execution.UnknownStatusCode, errors.Wrapf(err, "failed to parse exit status") - } - - p.setStatus(execution.Stopped) - return uint32(status), nil -} - -func (p *process) Signal(sig os.Signal) error { - err := syscall.Kill(int(p.pid), sig.(syscall.Signal)) - if err != nil { - return errors.Wrap(err, "failed to signal process") - } - return nil -} - -func (p *process) Status() execution.Status { - p.mu.Lock() - s := p.status - p.mu.Unlock() - return s -} - -func (p *process) setStatus(s execution.Status) { - p.mu.Lock() - p.status = s - p.mu.Unlock() -} - -func (p *process) isAlive() bool { - if err := syscall.Kill(int(p.pid), 0); err != nil { - if err == syscall.ESRCH { - return false - } - log.G(p.ctx).WithFields(logrus.Fields{"process-id": p.ID(), "pid": p.pid}). - Warnf("kill(0) failed: %v", err) - return false - } - - // check that we have the same starttime - stime, err := starttime.GetProcessStartTime(int(p.pid)) - if err != nil { - if os.IsNotExist(err) { - return false - } - log.G(p.ctx).WithFields(logrus.Fields{"process-id": p.ID(), "pid": p.pid}). - Warnf("failed to get process start time: %v", err) - return false - } - - if p.startTime != stime { - return false - } - - return true -} - -func (p *process) cleanup() { - for _, f := range []*os.File{p.exitPipe, p.controlPipe} { - if f != nil { - f.Close() - } - } - - if err := os.RemoveAll(p.stateDir); err != nil { - log.G(p.ctx).Warnf("failed to remove process state dir: %v", err) - } -} - -func waitUntilReady(ctx context.Context, abortCh chan syscall.WaitStatus, root string) (pid int64, stime string, status execution.Status, err error) { - status = execution.Unknown - for { - select { - case <-ctx.Done(): - return - case wait := <-abortCh: - if wait.Signaled() { - err = errors.Errorf("shim died prematurely: %v", wait.Signal()) - return - } - err = errors.Errorf("shim exited prematurely with exit code %v", wait.ExitStatus()) - return - default: - } - pid, err = getPidFromFile(filepath.Join(root, pidFilename)) - if err == nil { - break - } else if !os.IsNotExist(err) { - return - } - } - status = execution.Created - stime, err = starttime.GetProcessStartTime(int(pid)) - switch { - case os.IsNotExist(err): - status = execution.Stopped - case err != nil: - return - default: - var b []byte - path := filepath.Join(root, startTimeFilename) - b, err = ioutil.ReadFile(path) - switch { - case os.IsNotExist(err): - err = ioutil.WriteFile(path, []byte(stime), 0600) - if err != nil { - return - } - case err != nil: - err = errors.Wrapf(err, "failed to get start time for pid %d", pid) - return - case string(b) != stime: - status = execution.Stopped - } - } - - return pid, stime, status, nil -} - -func newShimProcess(o newProcessOpts) (*exec.Cmd, error) { - cmd := exec.Command(o.shimBinary, o.container.ID(), o.container.Bundle(), o.runtime) - cmd.Dir = o.stateDir - cmd.SysProcAttr = &syscall.SysProcAttr{ - Setpgid: true, - } - - state := processState{ - Process: o.Spec, - Exec: o.exec, - Stdin: o.Stdin, - Stdout: o.Stdout, - Stderr: o.Stderr, - RuntimeArgs: o.runtimeArgs, - NoPivotRoot: false, - CheckpointPath: "", - RootUID: int(o.Spec.User.UID), - RootGID: int(o.Spec.User.GID), - } - - f, err := os.Create(filepath.Join(o.stateDir, "process.json")) - if err != nil { - return nil, errors.Wrapf(err, "failed to create shim's process.json for container %s", o.container.ID()) - } - defer f.Close() - if err := json.NewEncoder(f).Encode(state); err != nil { - return nil, errors.Wrapf(err, "failed to create shim's processState for container %s", o.container.ID()) - } - - if err := cmd.Start(); err != nil { - return nil, errors.Wrapf(err, "failed to start shim for container %s", o.container.ID()) - } - - return cmd, nil -} - -func getControlPipes(root string) (exitPipe *os.File, controlPipe *os.File, err error) { - path := filepath.Join(root, exitPipeFilename) - if err = unix.Mkfifo(path, 0700); err != nil { - err = errors.Wrap(err, "failed to create shim exit fifo") - return - } - if exitPipe, err = os.OpenFile(path, syscall.O_RDONLY|syscall.O_NONBLOCK, 0); err != nil { - err = errors.Wrap(err, "failed to open shim exit fifo") - return - } - - path = filepath.Join(root, controlPipeFilename) - if err = unix.Mkfifo(path, 0700); err != nil { - err = errors.Wrap(err, "failed to create shim control fifo") - return - } - if controlPipe, err = os.OpenFile(path, syscall.O_RDWR|syscall.O_NONBLOCK, 0); err != nil { - err = errors.Wrap(err, "failed to open shim control fifo") - return - } - - return -} - -func getPidFromFile(path string) (int64, error) { - pid, err := runc.ReadPidFile(path) - if err != nil { - return -1, err - } - return int64(pid), nil -} - -func getStartTimeFromFile(path string) (string, error) { - stime, err := ioutil.ReadFile(path) - if err != nil { - return "", errors.Wrapf(err, "failed to read start time") - } - return string(stime), nil -} diff --git a/execution/executors/shim/shim.go b/execution/executors/shim/shim.go deleted file mode 100644 index 8bc060e..0000000 --- a/execution/executors/shim/shim.go +++ /dev/null @@ -1,431 +0,0 @@ -package shim - -import ( - "bytes" - "context" - "encoding/json" - "io/ioutil" - "os" - "os/exec" - "path/filepath" - "sync" - "syscall" - - "github.com/Sirupsen/logrus" - "github.com/docker/containerd/execution" - "github.com/docker/containerd/log" - "github.com/opencontainers/runtime-spec/specs-go" - "github.com/pkg/errors" -) - -const ( - DefaultShimBinary = "containerd-shim" - - pidFilename = "pid" - startTimeFilename = "starttime" - exitPipeFilename = "exit" - controlPipeFilename = "control" - exitStatusFilename = "exitStatus" -) - -func New(ctx context.Context, root, shim, runtime string, runtimeArgs []string) (*ShimRuntime, error) { - fd, err := syscall.EpollCreate1(0) - if err != nil { - return nil, errors.Wrap(err, "epollcreate1 failed") - } - s := &ShimRuntime{ - ctx: ctx, - epollFd: fd, - root: root, - binaryName: shim, - runtime: runtime, - runtimeArgs: runtimeArgs, - exitChannels: make(map[int]*process), - containers: make(map[string]*execution.Container), - } - - s.loadContainers() - - go s.monitor() - - return s, nil -} - -type ShimRuntime struct { - ctx context.Context - - mutex sync.Mutex - exitChannels map[int]*process - containers map[string]*execution.Container - - epollFd int - root string - binaryName string - runtime string - runtimeArgs []string -} - -type ProcessOpts struct { - Bundle string - Terminal bool - Stdin string - Stdout string - Stderr string -} - -type processState struct { - specs.Process - Exec bool `json:"exec"` - Stdin string `json:"containerdStdin"` - Stdout string `json:"containerdStdout"` - Stderr string `json:"containerdStderr"` - RuntimeArgs []string `json:"runtimeArgs"` - NoPivotRoot bool `json:"noPivotRoot"` - CheckpointPath string `json:"checkpoint"` - RootUID int `json:"rootUID"` - RootGID int `json:"rootGID"` -} - -func (s *ShimRuntime) Create(ctx context.Context, id string, o execution.CreateOpts) (*execution.Container, error) { - log.G(s.ctx).WithFields(logrus.Fields{"container-id": id, "options": o}).Debug("Create()") - - if s.getContainer(id) != nil { - return nil, execution.ErrContainerExists - } - - containerCtx := log.WithModule(log.WithModule(ctx, "container"), id) - container, err := execution.NewContainer(containerCtx, filepath.Join(s.root, id), id, o.Bundle) - if err != nil { - return nil, err - } - defer func() { - if err != nil { - container.Cleanup() - } - }() - - // extract Process spec from bundle's config.json - var spec specs.Spec - f, err := os.Open(filepath.Join(o.Bundle, "config.json")) - if err != nil { - return nil, errors.Wrap(err, "failed to open config.json") - } - defer f.Close() - if err := json.NewDecoder(f).Decode(&spec); err != nil { - return nil, errors.Wrap(err, "failed to decode container OCI specs") - } - - processOpts := newProcessOpts{ - shimBinary: s.binaryName, - runtime: s.runtime, - runtimeArgs: s.runtimeArgs, - container: container, - exec: false, - stateDir: container.ProcessStateDir(execution.InitProcessID), - StartProcessOpts: execution.StartProcessOpts{ - ID: execution.InitProcessID, - Spec: spec.Process, - Console: o.Console, - Stdin: o.Stdin, - Stdout: o.Stdout, - Stderr: o.Stderr, - }, - } - - processCtx := log.WithModule(log.WithModule(containerCtx, "process"), execution.InitProcessID) - process, err := newProcess(processCtx, processOpts) - if err != nil { - return nil, err - } - - s.monitorProcess(process) - container.AddProcess(process) - - s.addContainer(container) - - return container, nil -} - -func (s *ShimRuntime) Start(ctx context.Context, c *execution.Container) error { - log.G(s.ctx).WithFields(logrus.Fields{"container": c}).Debug("Start()") - - cmd := exec.CommandContext(ctx, s.runtime, append(s.runtimeArgs, "start", c.ID())...) - out, err := cmd.CombinedOutput() - if err != nil { - return errors.Wrapf(err, "'%s start' failed with output: %v", s.runtime, string(out)) - } - return nil -} - -func (s *ShimRuntime) List(ctx context.Context) ([]*execution.Container, error) { - log.G(s.ctx).Debug("List()") - - containers := make([]*execution.Container, 0) - s.mutex.Lock() - for _, c := range s.containers { - containers = append(containers, c) - } - s.mutex.Unlock() - - return containers, nil -} - -func (s *ShimRuntime) Load(ctx context.Context, id string) (*execution.Container, error) { - log.G(s.ctx).WithFields(logrus.Fields{"container-id": id}).Debug("Start()") - - s.mutex.Lock() - c, ok := s.containers[id] - s.mutex.Unlock() - - if !ok { - return nil, errors.New(execution.ErrContainerNotFound.Error()) - } - - return c, nil -} - -func (s *ShimRuntime) Delete(ctx context.Context, c *execution.Container) error { - log.G(s.ctx).WithFields(logrus.Fields{"container": c}).Debug("Delete()") - - if c.Status() != execution.Stopped { - return errors.Errorf("cannot delete a container in the '%s' state", c.Status()) - } - - c.Cleanup() - s.removeContainer(c) - return nil -} - -func (s *ShimRuntime) Pause(ctx context.Context, c *execution.Container) error { - log.G(s.ctx).WithFields(logrus.Fields{"container": c}).Debug("Pause()") - - cmd := exec.CommandContext(ctx, s.runtime, append(s.runtimeArgs, "pause", c.ID())...) - out, err := cmd.CombinedOutput() - if err != nil { - return errors.Wrapf(err, "'%s pause' failed with output: %v", s.runtime, string(out)) - } - return nil -} - -func (s *ShimRuntime) Resume(ctx context.Context, c *execution.Container) error { - log.G(s.ctx).WithFields(logrus.Fields{"container": c}).Debug("Resume()") - - cmd := exec.CommandContext(ctx, s.runtime, append(s.runtimeArgs, "resume", c.ID())...) - out, err := cmd.CombinedOutput() - if err != nil { - return errors.Wrapf(err, "'%s resume' failed with output: %v", s.runtime, string(out)) - } - return nil -} - -func (s *ShimRuntime) StartProcess(ctx context.Context, c *execution.Container, o execution.StartProcessOpts) (p execution.Process, err error) { - log.G(s.ctx).WithFields(logrus.Fields{"container": c, "options": o}).Debug("StartProcess()") - - processOpts := newProcessOpts{ - shimBinary: s.binaryName, - runtime: s.runtime, - runtimeArgs: s.runtimeArgs, - container: c, - exec: true, - stateDir: c.ProcessStateDir(o.ID), - StartProcessOpts: o, - } - - processCtx := log.WithModule(log.WithModule(c.Context(), "process"), execution.InitProcessID) - process, err := newProcess(processCtx, processOpts) - if err != nil { - return nil, err - } - - process.status = execution.Running - s.monitorProcess(process) - - c.AddProcess(process) - return process, nil -} - -func (s *ShimRuntime) SignalProcess(ctx context.Context, c *execution.Container, id string, sig os.Signal) error { - log.G(s.ctx).WithFields(logrus.Fields{"container": c, "process-id": id, "signal": sig}). - Debug("SignalProcess()") - - process := c.GetProcess(id) - if process == nil { - return errors.Errorf("container %s has no process named %s", c.ID(), id) - } - err := syscall.Kill(int(process.Pid()), sig.(syscall.Signal)) - if err != nil { - return errors.Wrapf(err, "failed to send %v signal to container %s process %v", sig, c.ID(), process.Pid()) - } - return err -} - -func (s *ShimRuntime) DeleteProcess(ctx context.Context, c *execution.Container, id string) error { - log.G(s.ctx).WithFields(logrus.Fields{"container": c, "process-id": id}). - Debug("DeleteProcess()") - - if p := c.GetProcess(id); p != nil { - p.(*process).cleanup() - - return c.RemoveProcess(id) - } - - return errors.Errorf("container %s has no process named %s", c.ID(), id) -} - -func (s *ShimRuntime) monitor() { - var events [128]syscall.EpollEvent - for { - n, err := syscall.EpollWait(s.epollFd, events[:], -1) - if err != nil { - if err == syscall.EINTR { - continue - } - log.G(s.ctx).Error("epollwait failed:", err) - } - - for i := 0; i < n; i++ { - fd := int(events[i].Fd) - - s.mutex.Lock() - p := s.exitChannels[fd] - delete(s.exitChannels, fd) - s.mutex.Unlock() - - if err = syscall.EpollCtl(s.epollFd, syscall.EPOLL_CTL_DEL, fd, &syscall.EpollEvent{ - Events: syscall.EPOLLHUP, - Fd: int32(fd), - }); err != nil { - log.G(s.ctx).Error("epollctl deletion failed:", err) - } - - close(p.exitChan) - } - } -} - -func (s *ShimRuntime) addContainer(c *execution.Container) { - s.mutex.Lock() - s.containers[c.ID()] = c - s.mutex.Unlock() -} - -func (s *ShimRuntime) removeContainer(c *execution.Container) { - s.mutex.Lock() - delete(s.containers, c.ID()) - s.mutex.Unlock() -} - -func (s *ShimRuntime) getContainer(id string) *execution.Container { - s.mutex.Lock() - c := s.containers[id] - s.mutex.Unlock() - - return c -} - -// monitorProcess adds a process to the list of monitored process if -// we fail to do so, we closed the exitChan channel used by Wait(). -// Since service always call on Wait() for generating "exit" events, -// this will ensure the process gets killed -func (s *ShimRuntime) monitorProcess(p *process) { - if p.status == execution.Stopped { - close(p.exitChan) - return - } - - fd := int(p.exitPipe.Fd()) - event := syscall.EpollEvent{ - Fd: int32(fd), - Events: syscall.EPOLLHUP, - } - s.mutex.Lock() - s.exitChannels[fd] = p - s.mutex.Unlock() - if err := syscall.EpollCtl(s.epollFd, syscall.EPOLL_CTL_ADD, fd, &event); err != nil { - s.mutex.Lock() - delete(s.exitChannels, fd) - s.mutex.Unlock() - close(p.exitChan) - return - } - - // TODO: take care of the OOM handler -} - -func (s *ShimRuntime) unmonitorProcess(p *process) { - s.mutex.Lock() - for fd, proc := range s.exitChannels { - if proc == p { - delete(s.exitChannels, fd) - break - } - } - s.mutex.Unlock() -} - -func (s *ShimRuntime) loadContainers() { - cs, err := ioutil.ReadDir(s.root) - if err != nil { - log.G(s.ctx).WithField("statedir", s.root). - Warn("failed to load containers, state dir cannot be listed:", err) - return - } - - for _, c := range cs { - if !c.IsDir() { - continue - } - - stateDir := filepath.Join(s.root, c.Name()) - containerCtx := log.WithModule(log.WithModule(s.ctx, "container"), c.Name()) - container, err := execution.LoadContainer(containerCtx, stateDir, c.Name()) - if err != nil { - log.G(s.ctx).WithField("container-id", c.Name()).Warn(err) - continue - } - - processDirs, err := container.ProcessesStateDir() - if err != nil { - log.G(s.ctx).WithField("container-id", c.Name()).Warn(err) - continue - } - - for processID, processStateDir := range processDirs { - processCtx := log.WithModule(log.WithModule(containerCtx, "process"), processID) - var p *process - p, err = loadProcess(processCtx, processStateDir, processID) - if err != nil { - log.G(s.ctx).WithFields(logrus.Fields{"container-id": c.Name(), "process": processID}).Warn(err) - break - } - if processID == execution.InitProcessID && p.status == execution.Running { - p.status = s.loadContainerStatus(container.ID()) - } - container.AddProcess(p) - } - - // if successfull, add the container to our list - if err == nil { - for _, p := range container.Processes() { - s.monitorProcess(p.(*process)) - } - s.addContainer(container) - log.G(s.ctx).Infof("restored container %s", container.ID()) - } - } -} - -func (s *ShimRuntime) loadContainerStatus(id string) execution.Status { - cmd := exec.Command(s.runtime, append(s.runtimeArgs, "state", id)...) - out, err := cmd.CombinedOutput() - if err != nil { - return execution.Unknown - } - - var st struct{ Status string } - if err := json.NewDecoder(bytes.NewReader(out)).Decode(&st); err != nil { - return execution.Unknown - } - - return execution.Status(st.Status) -} diff --git a/execution/supervisor.go b/execution/supervisor.go deleted file mode 100644 index 35b9347..0000000 --- a/execution/supervisor.go +++ /dev/null @@ -1,12 +0,0 @@ -package execution - -type Supervisor struct { -} - -type waiter interface { - Wait() (uint32, error) -} - -func (s *Supervisor) Monitor(w waiter, cb func(uint32, error)) { - go cb(w.Wait()) -}