Remove execution/executors

Signed-off-by: Kenfe-Mickael Laventure <mickael.laventure@gmail.com>
This commit is contained in:
Kenfe-Mickael Laventure 2017-02-06 08:38:11 -08:00
parent 6f9eda1134
commit 8fbdf1c0d7
3 changed files with 0 additions and 872 deletions

View file

@ -1,429 +0,0 @@
package shim
import (
"context"
"encoding/json"
"io/ioutil"
"os"
"os/exec"
"path/filepath"
"strconv"
"sync"
"syscall"
"github.com/Sirupsen/logrus"
"github.com/docker/containerd/execution"
"github.com/docker/containerd/log"
"github.com/pkg/errors"
"golang.org/x/sys/unix"
runc "github.com/crosbymichael/go-runc"
starttime "github.com/opencontainers/runc/libcontainer/system"
)
type newProcessOpts struct {
shimBinary string
runtime string
runtimeArgs []string
container *execution.Container
exec bool
stateDir string
execution.StartProcessOpts
}
func validateNewProcessOpts(o newProcessOpts) error {
if o.shimBinary == "" {
return errors.New("shim binary not specified")
}
if o.runtime == "" {
return errors.New("runtime not specified")
}
if o.container == nil {
return errors.New("container not specified")
}
if o.container.ID() == "" {
return errors.New("container id not specified")
}
if o.container.Bundle() == "" {
return errors.New("bundle not specified")
}
if o.stateDir == "" {
return errors.New("state dir not specified")
}
return nil
}
func newProcess(ctx context.Context, o newProcessOpts) (p *process, err error) {
if err = validateNewProcessOpts(o); err != nil {
return
}
p = &process{
id: o.ID,
stateDir: o.stateDir,
exitChan: make(chan struct{}),
ctx: ctx,
}
defer func() {
if err != nil {
p.cleanup()
p = nil
}
}()
if err = os.Mkdir(o.stateDir, 0700); err != nil {
err = errors.Wrap(err, "failed to create process state dir")
return
}
p.exitPipe, p.controlPipe, err = getControlPipes(o.stateDir)
if err != nil {
return
}
cmd, err := newShimProcess(o)
if err != nil {
return
}
defer func() {
if err != nil {
cmd.Process.Kill()
cmd.Wait()
}
}()
abortCh := make(chan syscall.WaitStatus, 1)
go func() {
var shimStatus syscall.WaitStatus
if err := cmd.Wait(); err != nil {
shimStatus = execution.UnknownStatusCode
} else {
shimStatus = cmd.ProcessState.Sys().(syscall.WaitStatus)
}
abortCh <- shimStatus
close(abortCh)
}()
p.pid, p.startTime, p.status, err = waitUntilReady(ctx, abortCh, o.stateDir)
if err != nil {
return
}
return
}
func loadProcess(ctx context.Context, stateDir, id string) (p *process, err error) {
p = &process{
id: id,
stateDir: stateDir,
exitChan: make(chan struct{}),
status: execution.Running,
ctx: ctx,
}
defer func() {
if err != nil {
p.cleanup()
p = nil
}
}()
p.pid, err = getPidFromFile(filepath.Join(stateDir, pidFilename))
if err != nil {
err = errors.Wrap(err, "failed to read pid")
return
}
p.startTime, err = getStartTimeFromFile(filepath.Join(stateDir, startTimeFilename))
if err != nil {
return
}
path := filepath.Join(stateDir, exitPipeFilename)
p.exitPipe, err = os.OpenFile(path, syscall.O_RDONLY|syscall.O_NONBLOCK, 0)
if err != nil {
err = errors.Wrapf(err, "failed to open exit pipe")
return
}
path = filepath.Join(stateDir, controlPipeFilename)
p.controlPipe, err = os.OpenFile(path, syscall.O_RDWR|syscall.O_NONBLOCK, 0)
if err != nil {
err = errors.Wrapf(err, "failed to open control pipe")
return
}
markAsStopped := func(p *process) (*process, error) {
p.setStatus(execution.Stopped)
return p, nil
}
if err = syscall.Kill(int(p.pid), 0); err != nil {
if err == syscall.ESRCH {
return markAsStopped(p)
}
err = errors.Wrapf(err, "failed to check if process is still alive")
return
}
cstime, err := starttime.GetProcessStartTime(int(p.pid))
if err != nil {
if os.IsNotExist(err) {
return markAsStopped(p)
}
err = errors.Wrapf(err, "failed retrieve current process start time")
return
}
if p.startTime != cstime {
return markAsStopped(p)
}
return
}
type process struct {
stateDir string
id string
pid int64
exitChan chan struct{}
exitPipe *os.File
controlPipe *os.File
startTime string
status execution.Status
ctx context.Context
mu sync.Mutex
}
func (p *process) ID() string {
return p.id
}
func (p *process) Pid() int64 {
return p.pid
}
func (p *process) Wait() (uint32, error) {
<-p.exitChan
log.G(p.ctx).WithFields(logrus.Fields{"process-id": p.ID(), "pid": p.pid}).
Debugf("wait is over")
// Cleanup those fds
p.exitPipe.Close()
p.controlPipe.Close()
// If the container process is still alive, it means the shim crashed
// and the child process had updated it PDEATHSIG to something
// else than SIGKILL. Or that epollCtl failed
if p.isAlive() {
err := syscall.Kill(int(p.pid), syscall.SIGKILL)
if err != nil {
return execution.UnknownStatusCode, errors.Wrap(err, "failed to kill process")
}
return uint32(128 + int(syscall.SIGKILL)), nil
}
data, err := ioutil.ReadFile(filepath.Join(p.stateDir, exitStatusFilename))
if err != nil {
return execution.UnknownStatusCode, errors.Wrap(err, "failed to read process exit status")
}
if len(data) == 0 {
return execution.UnknownStatusCode, errors.New(execution.ErrProcessNotExited.Error())
}
status, err := strconv.Atoi(string(data))
if err != nil {
return execution.UnknownStatusCode, errors.Wrapf(err, "failed to parse exit status")
}
p.setStatus(execution.Stopped)
return uint32(status), nil
}
func (p *process) Signal(sig os.Signal) error {
err := syscall.Kill(int(p.pid), sig.(syscall.Signal))
if err != nil {
return errors.Wrap(err, "failed to signal process")
}
return nil
}
func (p *process) Status() execution.Status {
p.mu.Lock()
s := p.status
p.mu.Unlock()
return s
}
func (p *process) setStatus(s execution.Status) {
p.mu.Lock()
p.status = s
p.mu.Unlock()
}
func (p *process) isAlive() bool {
if err := syscall.Kill(int(p.pid), 0); err != nil {
if err == syscall.ESRCH {
return false
}
log.G(p.ctx).WithFields(logrus.Fields{"process-id": p.ID(), "pid": p.pid}).
Warnf("kill(0) failed: %v", err)
return false
}
// check that we have the same starttime
stime, err := starttime.GetProcessStartTime(int(p.pid))
if err != nil {
if os.IsNotExist(err) {
return false
}
log.G(p.ctx).WithFields(logrus.Fields{"process-id": p.ID(), "pid": p.pid}).
Warnf("failed to get process start time: %v", err)
return false
}
if p.startTime != stime {
return false
}
return true
}
func (p *process) cleanup() {
for _, f := range []*os.File{p.exitPipe, p.controlPipe} {
if f != nil {
f.Close()
}
}
if err := os.RemoveAll(p.stateDir); err != nil {
log.G(p.ctx).Warnf("failed to remove process state dir: %v", err)
}
}
func waitUntilReady(ctx context.Context, abortCh chan syscall.WaitStatus, root string) (pid int64, stime string, status execution.Status, err error) {
status = execution.Unknown
for {
select {
case <-ctx.Done():
return
case wait := <-abortCh:
if wait.Signaled() {
err = errors.Errorf("shim died prematurely: %v", wait.Signal())
return
}
err = errors.Errorf("shim exited prematurely with exit code %v", wait.ExitStatus())
return
default:
}
pid, err = getPidFromFile(filepath.Join(root, pidFilename))
if err == nil {
break
} else if !os.IsNotExist(err) {
return
}
}
status = execution.Created
stime, err = starttime.GetProcessStartTime(int(pid))
switch {
case os.IsNotExist(err):
status = execution.Stopped
case err != nil:
return
default:
var b []byte
path := filepath.Join(root, startTimeFilename)
b, err = ioutil.ReadFile(path)
switch {
case os.IsNotExist(err):
err = ioutil.WriteFile(path, []byte(stime), 0600)
if err != nil {
return
}
case err != nil:
err = errors.Wrapf(err, "failed to get start time for pid %d", pid)
return
case string(b) != stime:
status = execution.Stopped
}
}
return pid, stime, status, nil
}
func newShimProcess(o newProcessOpts) (*exec.Cmd, error) {
cmd := exec.Command(o.shimBinary, o.container.ID(), o.container.Bundle(), o.runtime)
cmd.Dir = o.stateDir
cmd.SysProcAttr = &syscall.SysProcAttr{
Setpgid: true,
}
state := processState{
Process: o.Spec,
Exec: o.exec,
Stdin: o.Stdin,
Stdout: o.Stdout,
Stderr: o.Stderr,
RuntimeArgs: o.runtimeArgs,
NoPivotRoot: false,
CheckpointPath: "",
RootUID: int(o.Spec.User.UID),
RootGID: int(o.Spec.User.GID),
}
f, err := os.Create(filepath.Join(o.stateDir, "process.json"))
if err != nil {
return nil, errors.Wrapf(err, "failed to create shim's process.json for container %s", o.container.ID())
}
defer f.Close()
if err := json.NewEncoder(f).Encode(state); err != nil {
return nil, errors.Wrapf(err, "failed to create shim's processState for container %s", o.container.ID())
}
if err := cmd.Start(); err != nil {
return nil, errors.Wrapf(err, "failed to start shim for container %s", o.container.ID())
}
return cmd, nil
}
func getControlPipes(root string) (exitPipe *os.File, controlPipe *os.File, err error) {
path := filepath.Join(root, exitPipeFilename)
if err = unix.Mkfifo(path, 0700); err != nil {
err = errors.Wrap(err, "failed to create shim exit fifo")
return
}
if exitPipe, err = os.OpenFile(path, syscall.O_RDONLY|syscall.O_NONBLOCK, 0); err != nil {
err = errors.Wrap(err, "failed to open shim exit fifo")
return
}
path = filepath.Join(root, controlPipeFilename)
if err = unix.Mkfifo(path, 0700); err != nil {
err = errors.Wrap(err, "failed to create shim control fifo")
return
}
if controlPipe, err = os.OpenFile(path, syscall.O_RDWR|syscall.O_NONBLOCK, 0); err != nil {
err = errors.Wrap(err, "failed to open shim control fifo")
return
}
return
}
func getPidFromFile(path string) (int64, error) {
pid, err := runc.ReadPidFile(path)
if err != nil {
return -1, err
}
return int64(pid), nil
}
func getStartTimeFromFile(path string) (string, error) {
stime, err := ioutil.ReadFile(path)
if err != nil {
return "", errors.Wrapf(err, "failed to read start time")
}
return string(stime), nil
}

View file

@ -1,431 +0,0 @@
package shim
import (
"bytes"
"context"
"encoding/json"
"io/ioutil"
"os"
"os/exec"
"path/filepath"
"sync"
"syscall"
"github.com/Sirupsen/logrus"
"github.com/docker/containerd/execution"
"github.com/docker/containerd/log"
"github.com/opencontainers/runtime-spec/specs-go"
"github.com/pkg/errors"
)
const (
DefaultShimBinary = "containerd-shim"
pidFilename = "pid"
startTimeFilename = "starttime"
exitPipeFilename = "exit"
controlPipeFilename = "control"
exitStatusFilename = "exitStatus"
)
func New(ctx context.Context, root, shim, runtime string, runtimeArgs []string) (*ShimRuntime, error) {
fd, err := syscall.EpollCreate1(0)
if err != nil {
return nil, errors.Wrap(err, "epollcreate1 failed")
}
s := &ShimRuntime{
ctx: ctx,
epollFd: fd,
root: root,
binaryName: shim,
runtime: runtime,
runtimeArgs: runtimeArgs,
exitChannels: make(map[int]*process),
containers: make(map[string]*execution.Container),
}
s.loadContainers()
go s.monitor()
return s, nil
}
type ShimRuntime struct {
ctx context.Context
mutex sync.Mutex
exitChannels map[int]*process
containers map[string]*execution.Container
epollFd int
root string
binaryName string
runtime string
runtimeArgs []string
}
type ProcessOpts struct {
Bundle string
Terminal bool
Stdin string
Stdout string
Stderr string
}
type processState struct {
specs.Process
Exec bool `json:"exec"`
Stdin string `json:"containerdStdin"`
Stdout string `json:"containerdStdout"`
Stderr string `json:"containerdStderr"`
RuntimeArgs []string `json:"runtimeArgs"`
NoPivotRoot bool `json:"noPivotRoot"`
CheckpointPath string `json:"checkpoint"`
RootUID int `json:"rootUID"`
RootGID int `json:"rootGID"`
}
func (s *ShimRuntime) Create(ctx context.Context, id string, o execution.CreateOpts) (*execution.Container, error) {
log.G(s.ctx).WithFields(logrus.Fields{"container-id": id, "options": o}).Debug("Create()")
if s.getContainer(id) != nil {
return nil, execution.ErrContainerExists
}
containerCtx := log.WithModule(log.WithModule(ctx, "container"), id)
container, err := execution.NewContainer(containerCtx, filepath.Join(s.root, id), id, o.Bundle)
if err != nil {
return nil, err
}
defer func() {
if err != nil {
container.Cleanup()
}
}()
// extract Process spec from bundle's config.json
var spec specs.Spec
f, err := os.Open(filepath.Join(o.Bundle, "config.json"))
if err != nil {
return nil, errors.Wrap(err, "failed to open config.json")
}
defer f.Close()
if err := json.NewDecoder(f).Decode(&spec); err != nil {
return nil, errors.Wrap(err, "failed to decode container OCI specs")
}
processOpts := newProcessOpts{
shimBinary: s.binaryName,
runtime: s.runtime,
runtimeArgs: s.runtimeArgs,
container: container,
exec: false,
stateDir: container.ProcessStateDir(execution.InitProcessID),
StartProcessOpts: execution.StartProcessOpts{
ID: execution.InitProcessID,
Spec: spec.Process,
Console: o.Console,
Stdin: o.Stdin,
Stdout: o.Stdout,
Stderr: o.Stderr,
},
}
processCtx := log.WithModule(log.WithModule(containerCtx, "process"), execution.InitProcessID)
process, err := newProcess(processCtx, processOpts)
if err != nil {
return nil, err
}
s.monitorProcess(process)
container.AddProcess(process)
s.addContainer(container)
return container, nil
}
func (s *ShimRuntime) Start(ctx context.Context, c *execution.Container) error {
log.G(s.ctx).WithFields(logrus.Fields{"container": c}).Debug("Start()")
cmd := exec.CommandContext(ctx, s.runtime, append(s.runtimeArgs, "start", c.ID())...)
out, err := cmd.CombinedOutput()
if err != nil {
return errors.Wrapf(err, "'%s start' failed with output: %v", s.runtime, string(out))
}
return nil
}
func (s *ShimRuntime) List(ctx context.Context) ([]*execution.Container, error) {
log.G(s.ctx).Debug("List()")
containers := make([]*execution.Container, 0)
s.mutex.Lock()
for _, c := range s.containers {
containers = append(containers, c)
}
s.mutex.Unlock()
return containers, nil
}
func (s *ShimRuntime) Load(ctx context.Context, id string) (*execution.Container, error) {
log.G(s.ctx).WithFields(logrus.Fields{"container-id": id}).Debug("Start()")
s.mutex.Lock()
c, ok := s.containers[id]
s.mutex.Unlock()
if !ok {
return nil, errors.New(execution.ErrContainerNotFound.Error())
}
return c, nil
}
func (s *ShimRuntime) Delete(ctx context.Context, c *execution.Container) error {
log.G(s.ctx).WithFields(logrus.Fields{"container": c}).Debug("Delete()")
if c.Status() != execution.Stopped {
return errors.Errorf("cannot delete a container in the '%s' state", c.Status())
}
c.Cleanup()
s.removeContainer(c)
return nil
}
func (s *ShimRuntime) Pause(ctx context.Context, c *execution.Container) error {
log.G(s.ctx).WithFields(logrus.Fields{"container": c}).Debug("Pause()")
cmd := exec.CommandContext(ctx, s.runtime, append(s.runtimeArgs, "pause", c.ID())...)
out, err := cmd.CombinedOutput()
if err != nil {
return errors.Wrapf(err, "'%s pause' failed with output: %v", s.runtime, string(out))
}
return nil
}
func (s *ShimRuntime) Resume(ctx context.Context, c *execution.Container) error {
log.G(s.ctx).WithFields(logrus.Fields{"container": c}).Debug("Resume()")
cmd := exec.CommandContext(ctx, s.runtime, append(s.runtimeArgs, "resume", c.ID())...)
out, err := cmd.CombinedOutput()
if err != nil {
return errors.Wrapf(err, "'%s resume' failed with output: %v", s.runtime, string(out))
}
return nil
}
func (s *ShimRuntime) StartProcess(ctx context.Context, c *execution.Container, o execution.StartProcessOpts) (p execution.Process, err error) {
log.G(s.ctx).WithFields(logrus.Fields{"container": c, "options": o}).Debug("StartProcess()")
processOpts := newProcessOpts{
shimBinary: s.binaryName,
runtime: s.runtime,
runtimeArgs: s.runtimeArgs,
container: c,
exec: true,
stateDir: c.ProcessStateDir(o.ID),
StartProcessOpts: o,
}
processCtx := log.WithModule(log.WithModule(c.Context(), "process"), execution.InitProcessID)
process, err := newProcess(processCtx, processOpts)
if err != nil {
return nil, err
}
process.status = execution.Running
s.monitorProcess(process)
c.AddProcess(process)
return process, nil
}
func (s *ShimRuntime) SignalProcess(ctx context.Context, c *execution.Container, id string, sig os.Signal) error {
log.G(s.ctx).WithFields(logrus.Fields{"container": c, "process-id": id, "signal": sig}).
Debug("SignalProcess()")
process := c.GetProcess(id)
if process == nil {
return errors.Errorf("container %s has no process named %s", c.ID(), id)
}
err := syscall.Kill(int(process.Pid()), sig.(syscall.Signal))
if err != nil {
return errors.Wrapf(err, "failed to send %v signal to container %s process %v", sig, c.ID(), process.Pid())
}
return err
}
func (s *ShimRuntime) DeleteProcess(ctx context.Context, c *execution.Container, id string) error {
log.G(s.ctx).WithFields(logrus.Fields{"container": c, "process-id": id}).
Debug("DeleteProcess()")
if p := c.GetProcess(id); p != nil {
p.(*process).cleanup()
return c.RemoveProcess(id)
}
return errors.Errorf("container %s has no process named %s", c.ID(), id)
}
func (s *ShimRuntime) monitor() {
var events [128]syscall.EpollEvent
for {
n, err := syscall.EpollWait(s.epollFd, events[:], -1)
if err != nil {
if err == syscall.EINTR {
continue
}
log.G(s.ctx).Error("epollwait failed:", err)
}
for i := 0; i < n; i++ {
fd := int(events[i].Fd)
s.mutex.Lock()
p := s.exitChannels[fd]
delete(s.exitChannels, fd)
s.mutex.Unlock()
if err = syscall.EpollCtl(s.epollFd, syscall.EPOLL_CTL_DEL, fd, &syscall.EpollEvent{
Events: syscall.EPOLLHUP,
Fd: int32(fd),
}); err != nil {
log.G(s.ctx).Error("epollctl deletion failed:", err)
}
close(p.exitChan)
}
}
}
func (s *ShimRuntime) addContainer(c *execution.Container) {
s.mutex.Lock()
s.containers[c.ID()] = c
s.mutex.Unlock()
}
func (s *ShimRuntime) removeContainer(c *execution.Container) {
s.mutex.Lock()
delete(s.containers, c.ID())
s.mutex.Unlock()
}
func (s *ShimRuntime) getContainer(id string) *execution.Container {
s.mutex.Lock()
c := s.containers[id]
s.mutex.Unlock()
return c
}
// monitorProcess adds a process to the list of monitored process if
// we fail to do so, we closed the exitChan channel used by Wait().
// Since service always call on Wait() for generating "exit" events,
// this will ensure the process gets killed
func (s *ShimRuntime) monitorProcess(p *process) {
if p.status == execution.Stopped {
close(p.exitChan)
return
}
fd := int(p.exitPipe.Fd())
event := syscall.EpollEvent{
Fd: int32(fd),
Events: syscall.EPOLLHUP,
}
s.mutex.Lock()
s.exitChannels[fd] = p
s.mutex.Unlock()
if err := syscall.EpollCtl(s.epollFd, syscall.EPOLL_CTL_ADD, fd, &event); err != nil {
s.mutex.Lock()
delete(s.exitChannels, fd)
s.mutex.Unlock()
close(p.exitChan)
return
}
// TODO: take care of the OOM handler
}
func (s *ShimRuntime) unmonitorProcess(p *process) {
s.mutex.Lock()
for fd, proc := range s.exitChannels {
if proc == p {
delete(s.exitChannels, fd)
break
}
}
s.mutex.Unlock()
}
func (s *ShimRuntime) loadContainers() {
cs, err := ioutil.ReadDir(s.root)
if err != nil {
log.G(s.ctx).WithField("statedir", s.root).
Warn("failed to load containers, state dir cannot be listed:", err)
return
}
for _, c := range cs {
if !c.IsDir() {
continue
}
stateDir := filepath.Join(s.root, c.Name())
containerCtx := log.WithModule(log.WithModule(s.ctx, "container"), c.Name())
container, err := execution.LoadContainer(containerCtx, stateDir, c.Name())
if err != nil {
log.G(s.ctx).WithField("container-id", c.Name()).Warn(err)
continue
}
processDirs, err := container.ProcessesStateDir()
if err != nil {
log.G(s.ctx).WithField("container-id", c.Name()).Warn(err)
continue
}
for processID, processStateDir := range processDirs {
processCtx := log.WithModule(log.WithModule(containerCtx, "process"), processID)
var p *process
p, err = loadProcess(processCtx, processStateDir, processID)
if err != nil {
log.G(s.ctx).WithFields(logrus.Fields{"container-id": c.Name(), "process": processID}).Warn(err)
break
}
if processID == execution.InitProcessID && p.status == execution.Running {
p.status = s.loadContainerStatus(container.ID())
}
container.AddProcess(p)
}
// if successfull, add the container to our list
if err == nil {
for _, p := range container.Processes() {
s.monitorProcess(p.(*process))
}
s.addContainer(container)
log.G(s.ctx).Infof("restored container %s", container.ID())
}
}
}
func (s *ShimRuntime) loadContainerStatus(id string) execution.Status {
cmd := exec.Command(s.runtime, append(s.runtimeArgs, "state", id)...)
out, err := cmd.CombinedOutput()
if err != nil {
return execution.Unknown
}
var st struct{ Status string }
if err := json.NewDecoder(bytes.NewReader(out)).Decode(&st); err != nil {
return execution.Unknown
}
return execution.Status(st.Status)
}

View file

@ -1,12 +0,0 @@
package execution
type Supervisor struct {
}
type waiter interface {
Wait() (uint32, error)
}
func (s *Supervisor) Monitor(w waiter, cb func(uint32, error)) {
go cb(w.Wait())
}