From 3f2d9d19bfab51ff07f940408a197882034120b3 Mon Sep 17 00:00:00 2001 From: Kenfe-Mickael Laventure Date: Wed, 18 Jan 2017 14:38:32 -0800 Subject: [PATCH 1/2] execution: remove oci executor Signed-off-by: Kenfe-Mickael Laventure --- cmd/containerd/main.go | 8 +- execution/executors/oci/console.go | 54 ------- execution/executors/oci/io.go | 53 ------ execution/executors/oci/oci.go | 251 ----------------------------- execution/executors/oci/process.go | 102 ------------ 5 files changed, 1 insertion(+), 467 deletions(-) delete mode 100644 execution/executors/oci/console.go delete mode 100644 execution/executors/oci/io.go delete mode 100644 execution/executors/oci/oci.go delete mode 100644 execution/executors/oci/process.go diff --git a/cmd/containerd/main.go b/cmd/containerd/main.go index 605e0f3..775b364 100644 --- a/cmd/containerd/main.go +++ b/cmd/containerd/main.go @@ -20,7 +20,6 @@ import ( api "github.com/docker/containerd/api/execution" "github.com/docker/containerd/events" "github.com/docker/containerd/execution" - "github.com/docker/containerd/execution/executors/oci" "github.com/docker/containerd/execution/executors/shim" "github.com/docker/containerd/log" metrics "github.com/docker/go-metrics" @@ -57,7 +56,7 @@ high performance container runtime cli.StringFlag{ Name: "runtime", Usage: "runtime for execution", - Value: "runc", + Value: "shim", }, cli.StringFlag{ Name: "socket, s", @@ -119,11 +118,6 @@ high performance container runtime runtime = context.GlobalString("runtime") ) switch runtime { - case "runc": - executor, err = oci.New(context.GlobalString("root")) - if err != nil { - return err - } case "shim": root := filepath.Join(context.GlobalString("root"), "shim") err = os.Mkdir(root, 0700) diff --git a/execution/executors/oci/console.go b/execution/executors/oci/console.go deleted file mode 100644 index 8d34c53..0000000 --- a/execution/executors/oci/console.go +++ /dev/null @@ -1,54 +0,0 @@ -package oci - -import ( - "fmt" - "os" - "syscall" - "unsafe" -) - -// newConsole returns an initialized console that can be used within a container by copying bytes -// from the master side to the slave that is attached as the tty for the container's init process. -func newConsole(uid, gid int) (*os.File, string, error) { - master, err := os.OpenFile("/dev/ptmx", syscall.O_RDWR|syscall.O_NOCTTY|syscall.O_CLOEXEC, 0) - if err != nil { - return nil, "", err - } - console, err := ptsname(master) - if err != nil { - return nil, "", err - } - if err := unlockpt(master); err != nil { - return nil, "", err - } - if err := os.Chmod(console, 0600); err != nil { - return nil, "", err - } - if err := os.Chown(console, uid, gid); err != nil { - return nil, "", err - } - return master, console, nil -} - -func ioctl(fd uintptr, flag, data uintptr) error { - if _, _, err := syscall.Syscall(syscall.SYS_IOCTL, fd, flag, data); err != 0 { - return err - } - return nil -} - -// unlockpt unlocks the slave pseudoterminal device corresponding to the master pseudoterminal referred to by f. -// unlockpt should be called before opening the slave side of a pty. -func unlockpt(f *os.File) error { - var u int32 - return ioctl(f.Fd(), syscall.TIOCSPTLCK, uintptr(unsafe.Pointer(&u))) -} - -// ptsname retrieves the name of the first available pts for the given master. -func ptsname(f *os.File) (string, error) { - var n int32 - if err := ioctl(f.Fd(), syscall.TIOCGPTN, uintptr(unsafe.Pointer(&n))); err != nil { - return "", err - } - return fmt.Sprintf("/dev/pts/%d", n), nil -} diff --git a/execution/executors/oci/io.go b/execution/executors/oci/io.go deleted file mode 100644 index 03b70de..0000000 --- a/execution/executors/oci/io.go +++ /dev/null @@ -1,53 +0,0 @@ -package oci - -import ( - "io" - "os" - - "github.com/crosbymichael/go-runc" -) - -type OIO struct { - master *os.File // master holds a fd to the created pty if any - console string // console holds the path to the slave linked to master - rio runc.IO // rio holds the open fifos for stdios -} - -func newOIO(stdin, stdout, stderr string, console bool) (o OIO, err error) { - defer func() { - if err != nil { - o.cleanup() - } - }() - - if o.rio.Stdin, err = os.OpenFile(stdin, os.O_RDONLY, 0); err != nil { - return - } - if o.rio.Stdout, err = os.OpenFile(stdout, os.O_WRONLY, 0); err != nil { - return - } - if o.rio.Stderr, err = os.OpenFile(stderr, os.O_WRONLY, 0); err != nil { - return - } - - if console { - o.master, o.console, err = newConsole(0, 0) - if err != nil { - return - } - go io.Copy(o.master, o.rio.Stdin) - go func() { - io.Copy(o.rio.Stdout, o.master) - o.master.Close() - }() - } - - return -} - -func (o OIO) cleanup() { - if o.master != nil { - o.master.Close() - } - o.rio.Close() -} diff --git a/execution/executors/oci/oci.go b/execution/executors/oci/oci.go deleted file mode 100644 index 620df49..0000000 --- a/execution/executors/oci/oci.go +++ /dev/null @@ -1,251 +0,0 @@ -package oci - -import ( - "context" - "errors" - "fmt" - "os" - "path/filepath" - "syscall" - - "github.com/crosbymichael/go-runc" - "github.com/docker/containerd/execution" - "github.com/docker/containerd/sys" -) - -const ( - initProcessID = "init" -) - -const ( - PidFilename = "pid" - StartTimeFilename = "starttime" -) - -var ( - ErrRootEmpty = errors.New("oci: runtime root cannot be an empty string") -) - -func New(root string) (*OCIRuntime, error) { - err := sys.SetSubreaper(1) - if err != nil { - return nil, err - } - go func() { - syscall.Wait4(-1, nil, 0, nil) - }() - return &OCIRuntime{ - root: root, - runc: &runc.Runc{ - Root: filepath.Join(root, "runc"), - }, - ios: make(map[string]OIO), - }, nil -} - -type OCIRuntime struct { - root string - runc *runc.Runc - ios map[string]OIO // ios tracks created process io for cleanup purpose on delete -} - -func (r *OCIRuntime) Create(ctx context.Context, id string, o execution.CreateOpts) (container *execution.Container, err error) { - if o.Bundle == "" { - return nil, errors.New("bundle path cannot be an empty string") - } - oio, err := newOIO(o.Stdin, o.Stdout, o.Stderr, o.Console) - if err != nil { - return nil, err - } - defer func() { - if err != nil { - oio.cleanup() - } - }() - - if container, err = execution.NewContainer(r.root, id, o.Bundle); err != nil { - return nil, err - } - defer func(c *execution.Container) { - if err != nil { - c.StateDir().Delete() - } - }(container) - - initStateDir, err := container.StateDir().NewProcess(initProcessID) - if err != nil { - return nil, err - } - pidFile := filepath.Join(initStateDir, PidFilename) - err = r.runc.Create(ctx, id, o.Bundle, &runc.CreateOpts{ - PidFile: pidFile, - Console: oio.console, - IO: oio.rio, - }) - if err != nil { - return nil, err - } - defer func() { - if err != nil { - r.runc.Kill(ctx, id, int(syscall.SIGKILL)) - r.runc.Delete(ctx, id) - } - }() - - process, err := newProcess(initProcessID, initStateDir, execution.Created) - if err != nil { - return nil, err - } - - container.AddProcess(process, true) - - r.ios[id] = oio - - return container, nil -} - -func (r *OCIRuntime) Start(ctx context.Context, c *execution.Container) error { - return r.runc.Start(ctx, c.ID()) -} - -func (r *OCIRuntime) Status(ctx context.Context, c *execution.Container) (execution.Status, error) { - state, err := r.runc.State(ctx, c.ID()) - if err != nil { - return "", err - } - return execution.Status(state.Status), nil -} - -func (r *OCIRuntime) load(runcC *runc.Container) (*execution.Container, error) { - container := execution.LoadContainer( - execution.StateDir(filepath.Join(r.root, runcC.ID)), - runcC.ID, - runcC.Bundle, - execution.Status(runcC.Status), - ) - - dirs, err := container.StateDir().Processes() - if err != nil { - return nil, err - } - for _, d := range dirs { - process, err := newProcess(filepath.Base(d), d, execution.Running) - if err != nil { - return nil, err - } - container.AddProcess(process, process.Pid() == int64(runcC.Pid)) - } - - return container, nil -} - -func (r *OCIRuntime) List(ctx context.Context) ([]*execution.Container, error) { - runcCs, err := r.runc.List(ctx) - if err != nil { - return nil, err - } - - var containers []*execution.Container - for _, c := range runcCs { - select { - case <-ctx.Done(): - return nil, ctx.Err() - default: - container, err := r.load(c) - if err != nil { - return nil, err - } - containers = append(containers, container) - } - } - - return containers, nil -} - -func (r *OCIRuntime) Load(ctx context.Context, id string) (*execution.Container, error) { - runcC, err := r.runc.State(ctx, id) - if err != nil { - return nil, err - } - - return r.load(runcC) -} - -func (r *OCIRuntime) Delete(ctx context.Context, c *execution.Container) error { - id := c.ID() - if err := r.runc.Delete(ctx, id); err != nil { - return err - } - c.StateDir().Delete() - r.ios[id].cleanup() - delete(r.ios, id) - return nil -} - -func (r *OCIRuntime) Pause(ctx context.Context, c *execution.Container) error { - return r.runc.Pause(ctx, c.ID()) -} - -func (r *OCIRuntime) Resume(ctx context.Context, c *execution.Container) error { - return r.runc.Resume(ctx, c.ID()) -} - -func (r *OCIRuntime) StartProcess(ctx context.Context, c *execution.Container, o execution.StartProcessOpts) (p execution.Process, err error) { - oio, err := newOIO(o.Stdin, o.Stdout, o.Stderr, o.Console) - if err != nil { - return nil, err - } - defer func() { - if err != nil { - oio.cleanup() - } - }() - - procStateDir, err := c.StateDir().NewProcess(o.ID) - if err != nil { - return nil, err - } - defer func() { - if err != nil { - c.StateDir().DeleteProcess(o.ID) - } - }() - - pidFile := filepath.Join(procStateDir, PidFilename) - if err := r.runc.Exec(ctx, c.ID(), o.Spec, &runc.ExecOpts{ - PidFile: pidFile, - Detach: false, - Console: oio.console, - Cwd: o.Spec.Cwd, - IO: oio.rio, - }); err != nil { - return nil, err - } - - process, err := newProcess(o.ID, procStateDir, execution.Running) - if err != nil { - return nil, err - } - - c.AddProcess(process, false) - - r.ios[fmt.Sprintf("%s-%s", c.ID(), process.ID())] = oio - - return process, nil -} - -func (r *OCIRuntime) SignalProcess(ctx context.Context, c *execution.Container, id string, sig os.Signal) error { - process := c.GetProcess(id) - if process == nil { - return fmt.Errorf("Make a Process Not Found error") - } - return syscall.Kill(int(process.Pid()), sig.(syscall.Signal)) -} - -func (r *OCIRuntime) DeleteProcess(ctx context.Context, c *execution.Container, id string) error { - ioID := fmt.Sprintf("%s-%s", c.ID(), id) - r.ios[ioID].cleanup() - delete(r.ios, ioID) - c.RemoveProcess(id) - return c.StateDir().DeleteProcess(id) -} diff --git a/execution/executors/oci/process.go b/execution/executors/oci/process.go deleted file mode 100644 index 6e6095d..0000000 --- a/execution/executors/oci/process.go +++ /dev/null @@ -1,102 +0,0 @@ -package oci - -import ( - "fmt" - "io/ioutil" - "os" - "path/filepath" - "syscall" - - "github.com/crosbymichael/go-runc" - "github.com/docker/containerd/execution" - starttime "github.com/opencontainers/runc/libcontainer/system" -) - -func newProcess(id, stateDir string, status execution.Status) (execution.Process, error) { - pid, err := runc.ReadPidFile(filepath.Join(stateDir, PidFilename)) - if err != nil { - return nil, err - } - if err := syscall.Kill(pid, 0); err != nil { - if err == syscall.ESRCH { - status = execution.Stopped - } else { - return nil, err - } - } - if status != execution.Stopped { - stime, err := starttime.GetProcessStartTime(pid) - switch { - case os.IsNotExist(err): - status = execution.Stopped - case err != nil: - return nil, err - default: - b, err := ioutil.ReadFile(filepath.Join(stateDir, StartTimeFilename)) - switch { - case os.IsNotExist(err): - err = ioutil.WriteFile(filepath.Join(stateDir, StartTimeFilename), []byte(stime), 0600) - if err != nil { - return nil, err - } - case err != nil: - return nil, err - case string(b) != stime: - status = execution.Stopped - } - } - } - return &process{ - id: id, - pid: pid, - status: status, - exitCode: execution.UnknownStatusCode, - }, nil -} - -type process struct { - id string - pid int - status execution.Status - exitCode uint32 -} - -func (p *process) ID() string { - return p.id -} - -func (p *process) Pid() int64 { - return int64(p.pid) -} - -func (p *process) Wait() (uint32, error) { - if p.status != execution.Stopped { - var wstatus syscall.WaitStatus - _, err := syscall.Wait4(p.pid, &wstatus, 0, nil) - if err != nil { - // This process doesn't belong to us - p.exitCode = execution.UnknownStatusCode - return p.exitCode, nil - } - // TODO: implement kill-all if we are the init pid? - p.status = execution.Stopped - p.exitCode = uint32(wstatus.ExitStatus()) - } - return p.exitCode, nil - -} - -func (p *process) Signal(s os.Signal) error { - if p.status != execution.Stopped { - sig, ok := s.(syscall.Signal) - if !ok { - return fmt.Errorf("invalid signal %v", s) - } - return syscall.Kill(p.pid, sig) - } - return execution.ErrProcessNotFound -} - -func (p *process) Status() execution.Status { - return p.status -} From bd6057c8e1b9885ed6a72ca18f31e733819c6beb Mon Sep 17 00:00:00 2001 From: Kenfe-Mickael Laventure Date: Wed, 18 Jan 2017 20:24:53 -0800 Subject: [PATCH 2/2] execution: remove statedir type Signed-off-by: Kenfe-Mickael Laventure --- execution/container.go | 196 ++++++++++++++++++++-------- execution/executors/shim/process.go | 188 ++++++++++++++------------ execution/executors/shim/shim.go | 114 ++++++++-------- execution/statedir.go | 82 ------------ execution/status.go | 1 + 5 files changed, 308 insertions(+), 273 deletions(-) delete mode 100644 execution/statedir.go diff --git a/execution/container.go b/execution/container.go index 89feb68..c425540 100644 --- a/execution/container.go +++ b/execution/container.go @@ -1,91 +1,179 @@ package execution -import "fmt" +import ( + "context" + "io/ioutil" + "os" + "path/filepath" + "sync" -func NewContainer(stateRoot, id, bundle string) (*Container, error) { - stateDir, err := NewStateDir(stateRoot, id) - if err != nil { - return nil, err - } - return &Container{ + "github.com/docker/containerd/log" + "github.com/pkg/errors" +) + +const ( + InitProcessID = "init" + processesDirName = "processes" + bundleFileName = "bundle" +) + +func LoadContainer(ctx context.Context, stateDir, id string) (c *Container, err error) { + c = &Container{ id: id, - bundle: bundle, stateDir: stateDir, - status: Created, - processes: make(map[string]Process), - }, nil + processes: make(map[string]Process, 1), + ctx: ctx, + status: Unknown, + } + + data, err := ioutil.ReadFile(filepath.Join(stateDir, bundleFileName)) + if err != nil { + err = errors.Wrapf(err, "failed to read bundle path") + return + } + c.bundle = string(data) + + return } -func LoadContainer(dir StateDir, id, bundle string, status Status) *Container { - return &Container{ +func NewContainer(ctx context.Context, stateDir, id, bundle string) (c *Container, err error) { + c = &Container{ id: id, - stateDir: dir, + stateDir: stateDir, bundle: bundle, - status: status, - processes: make(map[string]Process), + processes: make(map[string]Process, 1), + status: Created, + ctx: ctx, } + defer func() { + if err != nil { + c.Cleanup() + c = nil + } + }() + + if err = os.Mkdir(stateDir, 0700); err != nil { + err = errors.Wrap(err, "failed to create container state dir") + return + } + + bundleFile := filepath.Join(stateDir, bundleFileName) + if err = ioutil.WriteFile(bundleFile, []byte(bundle), 0600); err != nil { + err = errors.Wrap(err, "failed to store bundle path") + return + } + + processesDir := filepath.Join(stateDir, processesDirName) + if err = os.Mkdir(processesDir, 0700); err != nil { + err = errors.Wrap(err, "failed to create processes statedir") + return + } + + return } type Container struct { - id string - bundle string - stateDir StateDir - initPid int64 - status Status - + id string + stateDir string + bundle string processes map[string]Process + status Status + + ctx context.Context + mu sync.Mutex } func (c *Container) ID() string { return c.id } -func (c *Container) Status() Status { - for _, p := range c.processes { - if p.Pid() == c.initPid { - c.status = p.Status() - break - } - } - return c.status -} - func (c *Container) Bundle() string { return c.bundle } -func (c *Container) StateDir() StateDir { - return c.stateDir -} - func (c *Container) Wait() (uint32, error) { - for _, p := range c.processes { - if p.Pid() == c.initPid { - return p.Wait() - } - } - return 0, fmt.Errorf("no init process") + initProcess := c.GetProcess(InitProcessID) + return initProcess.Wait() } -func (c *Container) AddProcess(p Process, isInit bool) { - if isInit { - c.initPid = p.Pid() - } +func (c *Container) Status() Status { + initProcess := c.GetProcess(InitProcessID) + return initProcess.Status() +} + +func (c *Container) AddProcess(p Process) { + c.mu.Lock() c.processes[p.ID()] = p + c.mu.Unlock() +} + +func (c *Container) RemoveProcess(id string) error { + if _, ok := c.processes[id]; !ok { + return errors.Errorf("no such process %s", id) + } + + c.mu.Lock() + delete(c.processes, id) + c.mu.Unlock() + + processStateDir := filepath.Join(c.stateDir, processesDirName, id) + err := os.RemoveAll(processStateDir) + if err != nil { + return errors.Wrap(err, "failed to remove process state dir") + } + + return nil } func (c *Container) GetProcess(id string) Process { + c.mu.Lock() + defer c.mu.Unlock() return c.processes[id] } -func (c *Container) RemoveProcess(id string) { - delete(c.processes, id) +func (c *Container) Processes() []Process { + var procs []Process + + c.mu.Lock() + for _, p := range c.processes { + procs = append(procs, p) + } + c.mu.Unlock() + + return procs } -func (c *Container) Processes() []Process { - var out []Process - for _, p := range c.processes { - out = append(out, p) - } - return out +// ProcessStateDir returns the path of the state dir for a given +// process id. The process doesn't have to exist for this to succeed. +func (c *Container) ProcessStateDir(id string) string { + return filepath.Join(c.stateDir, processesDirName, id) +} + +// ProcessesStateDir returns a map matching process ids to their state +// directory +func (c *Container) ProcessesStateDir() (map[string]string, error) { + root := filepath.Join(c.stateDir, processesDirName) + dirs, err := ioutil.ReadDir(root) + if err != nil { + return nil, errors.Wrapf(err, "failed to list processes state dirs") + } + + procs := make(map[string]string, 1) + for _, d := range dirs { + if d.IsDir() { + procs[d.Name()] = filepath.Join(root, d.Name()) + } + } + + return procs, nil +} + +func (c *Container) Cleanup() { + if err := os.RemoveAll(c.stateDir); err != nil { + log.G(c.ctx).Warnf("failed to remove container state dir: %v", err) + } +} + +func (c *Container) Context() context.Context { + return c.ctx } diff --git a/execution/executors/shim/process.go b/execution/executors/shim/process.go index b1045cc..2e16cab 100644 --- a/execution/executors/shim/process.go +++ b/execution/executors/shim/process.go @@ -27,34 +27,37 @@ type newProcessOpts struct { runtimeArgs []string container *execution.Container exec bool + stateDir string execution.StartProcessOpts } -func newProcess(ctx context.Context, o newProcessOpts) (*process, error) { - procStateDir, err := o.container.StateDir().NewProcess(o.ID) - if err != nil { - return nil, err +func newProcess(ctx context.Context, o newProcessOpts) (p *process, err error) { + p = &process{ + id: o.ID, + stateDir: o.stateDir, + exitChan: make(chan struct{}), + ctx: ctx, } defer func() { if err != nil { - o.container.StateDir().DeleteProcess(o.ID) + p.cleanup() + p = nil } }() - exitPipe, controlPipe, err := getControlPipes(procStateDir) - if err != nil { - return nil, err + if err = os.Mkdir(o.stateDir, 0700); err != nil { + err = errors.Wrap(err, "failed to create process state dir") + return } - defer func() { - if err != nil { - exitPipe.Close() - controlPipe.Close() - } - }() - cmd, err := newShim(o, procStateDir) + p.exitPipe, p.controlPipe, err = getControlPipes(o.stateDir) if err != nil { - return nil, err + return + } + + cmd, err := newShimProcess(o) + if err != nil { + return } defer func() { if err != nil { @@ -75,70 +78,52 @@ func newProcess(ctx context.Context, o newProcessOpts) (*process, error) { close(abortCh) }() - process := &process{ - root: procStateDir, - id: o.ID, - exitChan: make(chan struct{}), - exitPipe: exitPipe, - controlPipe: controlPipe, - } - - pid, stime, status, err := waitForPid(ctx, abortCh, procStateDir) + p.pid, p.startTime, p.status, err = waitUntilReady(ctx, abortCh, o.stateDir) if err != nil { - return nil, err + return } - process.pid = int64(pid) - process.status = status - process.startTime = stime - return process, nil + return } -func loadProcess(root, id string) (*process, error) { - pid, err := runc.ReadPidFile(filepath.Join(root, pidFilename)) - if err != nil { - return nil, err - } - - stime, err := ioutil.ReadFile(filepath.Join(root, startTimeFilename)) - if err != nil { - return nil, err - } - - path := filepath.Join(root, exitPipeFilename) - exitPipe, err := os.OpenFile(path, syscall.O_RDONLY|syscall.O_NONBLOCK, 0) - if err != nil { - return nil, err +func loadProcess(ctx context.Context, stateDir, id string) (p *process, err error) { + p = &process{ + id: id, + stateDir: stateDir, + exitChan: make(chan struct{}), + status: execution.Running, + ctx: ctx, } defer func() { if err != nil { - exitPipe.Close() + p.cleanup() + p = nil } }() - path = filepath.Join(root, controlPipeFilename) - controlPipe, err := os.OpenFile(path, syscall.O_RDWR|syscall.O_NONBLOCK, 0) + p.pid, err = getPidFromFile(filepath.Join(stateDir, pidFilename)) if err != nil { - return nil, err + err = errors.Wrap(err, "failed to read pid") + return } - defer func() { - if err != nil { - controlPipe.Close() - } - }() - p := &process{ - root: root, - id: id, - pid: int64(pid), - exitChan: make(chan struct{}), - exitPipe: exitPipe, - controlPipe: controlPipe, - startTime: string(stime), - // TODO: status may need to be stored on disk to handle - // Created state for init (i.e. a Start is needed to run the - // container) - status: execution.Running, + p.startTime, err = getStartTimeFromFile(filepath.Join(stateDir, startTimeFilename)) + if err != nil { + return + } + + path := filepath.Join(stateDir, exitPipeFilename) + p.exitPipe, err = os.OpenFile(path, syscall.O_RDONLY|syscall.O_NONBLOCK, 0) + if err != nil { + err = errors.Wrapf(err, "failed to open exit pipe") + return + } + + path = filepath.Join(stateDir, controlPipeFilename) + p.controlPipe, err = os.OpenFile(path, syscall.O_RDWR|syscall.O_NONBLOCK, 0) + if err != nil { + err = errors.Wrapf(err, "failed to open control pipe") + return } markAsStopped := func(p *process) (*process, error) { @@ -146,30 +131,32 @@ func loadProcess(root, id string) (*process, error) { return p, nil } - if err = syscall.Kill(pid, 0); err != nil { + if err = syscall.Kill(int(p.pid), 0); err != nil { if err == syscall.ESRCH { return markAsStopped(p) } - return nil, err + err = errors.Wrapf(err, "failed to check if process is still alive") + return } - cstime, err := starttime.GetProcessStartTime(pid) + cstime, err := starttime.GetProcessStartTime(int(p.pid)) if err != nil { if os.IsNotExist(err) { return markAsStopped(p) } - return nil, err + err = errors.Wrapf(err, "failed retrieve current process start time") + return } if p.startTime != cstime { return markAsStopped(p) } - return p, nil + return } type process struct { - root string + stateDir string id string pid int64 exitChan chan struct{} @@ -211,7 +198,7 @@ func (p *process) Wait() (uint32, error) { return uint32(128 + int(syscall.SIGKILL)), nil } - data, err := ioutil.ReadFile(filepath.Join(p.root, exitStatusFilename)) + data, err := ioutil.ReadFile(filepath.Join(p.stateDir, exitStatusFilename)) if err != nil { return execution.UnknownStatusCode, errors.Wrap(err, "failed to read process exit status") } @@ -278,7 +265,19 @@ func (p *process) isAlive() bool { return true } -func waitForPid(ctx context.Context, abortCh chan syscall.WaitStatus, root string) (pid int, stime string, status execution.Status, err error) { +func (p *process) cleanup() { + for _, f := range []*os.File{p.exitPipe, p.controlPipe} { + if f != nil { + f.Close() + } + } + + if err := os.RemoveAll(p.stateDir); err != nil { + log.G(p.ctx).Warnf("failed to remove process state dir: %v", err) + } +} + +func waitUntilReady(ctx context.Context, abortCh chan syscall.WaitStatus, root string) (pid int64, stime string, status execution.Status, err error) { status = execution.Unknown for { select { @@ -293,7 +292,7 @@ func waitForPid(ctx context.Context, abortCh chan syscall.WaitStatus, root strin return default: } - pid, err = runc.ReadPidFile(filepath.Join(root, pidFilename)) + pid, err = getPidFromFile(filepath.Join(root, pidFilename)) if err == nil { break } else if !os.IsNotExist(err) { @@ -301,7 +300,7 @@ func waitForPid(ctx context.Context, abortCh chan syscall.WaitStatus, root strin } } status = execution.Created - stime, err = starttime.GetProcessStartTime(pid) + stime, err = starttime.GetProcessStartTime(int(pid)) switch { case os.IsNotExist(err): status = execution.Stopped @@ -328,9 +327,9 @@ func waitForPid(ctx context.Context, abortCh chan syscall.WaitStatus, root strin return pid, stime, status, nil } -func newShim(o newProcessOpts, workDir string) (*exec.Cmd, error) { +func newShimProcess(o newProcessOpts) (*exec.Cmd, error) { cmd := exec.Command(o.shimBinary, o.container.ID(), o.container.Bundle(), o.runtime) - cmd.Dir = workDir + cmd.Dir = o.stateDir cmd.SysProcAttr = &syscall.SysProcAttr{ Setpgid: true, } @@ -348,12 +347,11 @@ func newShim(o newProcessOpts, workDir string) (*exec.Cmd, error) { RootGID: int(o.Spec.User.GID), } - f, err := os.Create(filepath.Join(workDir, "process.json")) + f, err := os.Create(filepath.Join(o.stateDir, "process.json")) if err != nil { return nil, errors.Wrapf(err, "failed to create shim's process.json for container %s", o.container.ID()) } defer f.Close() - if err := json.NewEncoder(f).Encode(state); err != nil { return nil, errors.Wrapf(err, "failed to create shim's processState for container %s", o.container.ID()) } @@ -368,19 +366,39 @@ func newShim(o newProcessOpts, workDir string) (*exec.Cmd, error) { func getControlPipes(root string) (exitPipe *os.File, controlPipe *os.File, err error) { path := filepath.Join(root, exitPipeFilename) if err = unix.Mkfifo(path, 0700); err != nil { - return exitPipe, controlPipe, errors.Wrap(err, "failed to create shim exit fifo") + err = errors.Wrap(err, "failed to create shim exit fifo") + return } if exitPipe, err = os.OpenFile(path, syscall.O_RDONLY|syscall.O_NONBLOCK, 0); err != nil { - return exitPipe, controlPipe, errors.Wrap(err, "failed to open shim exit fifo") + err = errors.Wrap(err, "failed to open shim exit fifo") + return } path = filepath.Join(root, controlPipeFilename) if err = unix.Mkfifo(path, 0700); err != nil { - return exitPipe, controlPipe, errors.Wrap(err, "failed to create shim control fifo") + err = errors.Wrap(err, "failed to create shim control fifo") + return } if controlPipe, err = os.OpenFile(path, syscall.O_RDWR|syscall.O_NONBLOCK, 0); err != nil { - return exitPipe, controlPipe, errors.Wrap(err, "failed to open shim control fifo") + err = errors.Wrap(err, "failed to open shim control fifo") + return } - return exitPipe, controlPipe, nil + return +} + +func getPidFromFile(path string) (int64, error) { + pid, err := runc.ReadPidFile(path) + if err != nil { + return -1, err + } + return int64(pid), nil +} + +func getStartTimeFromFile(path string) (string, error) { + stime, err := ioutil.ReadFile(path) + if err != nil { + return "", errors.Wrapf(err, "failed to read start time") + } + return string(stime), nil } diff --git a/execution/executors/shim/shim.go b/execution/executors/shim/shim.go index fccbf30..8271c83 100644 --- a/execution/executors/shim/shim.go +++ b/execution/executors/shim/shim.go @@ -1,6 +1,7 @@ package shim import ( + "bytes" "context" "encoding/json" "io/ioutil" @@ -24,7 +25,6 @@ const ( startTimeFilename = "starttime" exitPipeFilename = "exit" controlPipeFilename = "control" - initProcessID = "init" exitStatusFilename = "exitStatus" ) @@ -93,21 +93,17 @@ func (s *ShimRuntime) Create(ctx context.Context, id string, o execution.CreateO return nil, execution.ErrContainerExists } - container, err := execution.NewContainer(s.root, id, o.Bundle) + containerCtx := log.WithModule(log.WithModule(ctx, "container"), id) + container, err := execution.NewContainer(containerCtx, filepath.Join(s.root, id), id, o.Bundle) if err != nil { return nil, err } defer func() { if err != nil { - container.StateDir().Delete() + container.Cleanup() } }() - err = ioutil.WriteFile(filepath.Join(string(container.StateDir()), "bundle"), []byte(o.Bundle), 0600) - if err != nil { - return nil, errors.Wrap(err, "failed to save bundle path to disk") - } - // extract Process spec from bundle's config.json var spec specs.Spec f, err := os.Open(filepath.Join(o.Bundle, "config.json")) @@ -125,8 +121,9 @@ func (s *ShimRuntime) Create(ctx context.Context, id string, o execution.CreateO runtimeArgs: s.runtimeArgs, container: container, exec: false, + stateDir: container.ProcessStateDir(execution.InitProcessID), StartProcessOpts: execution.StartProcessOpts{ - ID: initProcessID, + ID: execution.InitProcessID, Spec: spec.Process, Console: o.Console, Stdin: o.Stdin, @@ -135,14 +132,14 @@ func (s *ShimRuntime) Create(ctx context.Context, id string, o execution.CreateO }, } - process, err := newProcess(ctx, processOpts) + processCtx := log.WithModule(log.WithModule(containerCtx, "process"), execution.InitProcessID) + process, err := newProcess(processCtx, processOpts) if err != nil { return nil, err } - process.ctx = log.WithModule(log.WithModule(s.ctx, "container"), id) s.monitorProcess(process) - container.AddProcess(process, true) + container.AddProcess(process) s.addContainer(container) @@ -194,7 +191,7 @@ func (s *ShimRuntime) Delete(ctx context.Context, c *execution.Container) error return errors.Errorf("cannot delete a container in the '%s' state", c.Status()) } - c.StateDir().Delete() + c.Cleanup() s.removeContainer(c) return nil } @@ -232,7 +229,9 @@ func (s *ShimRuntime) StartProcess(ctx context.Context, c *execution.Container, exec: true, StartProcessOpts: o, } - process, err := newProcess(ctx, processOpts) + + processCtx := log.WithModule(log.WithModule(c.Context(), "process"), execution.InitProcessID) + process, err := newProcess(processCtx, processOpts) if err != nil { return nil, err } @@ -240,7 +239,7 @@ func (s *ShimRuntime) StartProcess(ctx context.Context, c *execution.Container, process.status = execution.Running s.monitorProcess(process) - c.AddProcess(process, false) + c.AddProcess(process) return process, nil } @@ -250,11 +249,11 @@ func (s *ShimRuntime) SignalProcess(ctx context.Context, c *execution.Container, process := c.GetProcess(id) if process == nil { - return errors.Errorf("no such process %s", id) + return errors.Errorf("container %s has no process named %s", c.ID(), id) } err := syscall.Kill(int(process.Pid()), sig.(syscall.Signal)) if err != nil { - return errors.Wrapf(err, "failed to send %v signal to process %v", sig, process.Pid()) + return errors.Wrapf(err, "failed to send %v signal to container %s process %v", sig, c.ID(), process.Pid()) } return err } @@ -263,13 +262,14 @@ func (s *ShimRuntime) DeleteProcess(ctx context.Context, c *execution.Container, log.G(s.ctx).WithFields(logrus.Fields{"container": c, "process-id": id}). Debug("DeleteProcess()") - c.RemoveProcess(id) - return c.StateDir().DeleteProcess(id) -} + if p := c.GetProcess(id); p != nil { + p.(*process).cleanup() -// -// -// + return c.RemoveProcess(id) + } + + return errors.Errorf("container %s has no process named %s", c.ID(), id) +} func (s *ShimRuntime) monitor() { var events [128]syscall.EpollEvent @@ -375,46 +375,56 @@ func (s *ShimRuntime) loadContainers() { continue } - stateDir, err := execution.LoadStateDir(s.root, c.Name()) + stateDir := filepath.Join(s.root, c.Name()) + containerCtx := log.WithModule(log.WithModule(s.ctx, "container"), c.Name()) + container, err := execution.LoadContainer(containerCtx, stateDir, c.Name()) if err != nil { - // We should never fail the above call unless someone - // delete the directory while we're loading - log.G(s.ctx).WithFields(logrus.Fields{"container": c.Name(), "statedir": s.root}). - Warn("failed to load container statedir:", err) - continue - } - bundle, err := ioutil.ReadFile(filepath.Join(string(stateDir), "bundle")) - if err != nil { - log.G(s.ctx).WithField("container", c.Name()). - Warn("failed to load container bundle path:", err) + log.G(s.ctx).WithField("container-id", c.Name()).Warn(err) continue } - container := execution.LoadContainer(stateDir, c.Name(), string(bundle), execution.Unknown) - s.addContainer(container) - - processDirs, err := stateDir.Processes() + processDirs, err := container.ProcessesStateDir() if err != nil { - log.G(s.ctx).WithField("container", c.Name()). - Warn("failed to retrieve container processes:", err) + log.G(s.ctx).WithField("container-id", c.Name()).Warn(err) continue } - for _, procStateRoot := range processDirs { - id := filepath.Base(procStateRoot) - proc, err := loadProcess(procStateRoot, id) + for processID, processStateDir := range processDirs { + processCtx := log.WithModule(log.WithModule(containerCtx, "process"), processID) + var p *process + p, err = loadProcess(processCtx, processStateDir, processID) if err != nil { - log.G(s.ctx).WithFields(logrus.Fields{"container": c.Name(), "process": id}). - Warn("failed to load process:", err) - s.removeContainer(container) - for _, p := range container.Processes() { - s.unmonitorProcess(p.(*process)) - } + log.G(s.ctx).WithFields(logrus.Fields{"container-id": c.Name(), "process": processID}).Warn(err) break } - proc.ctx = log.WithModule(log.WithModule(s.ctx, "container"), container.ID()) - container.AddProcess(proc, proc.ID() == initProcessID) - s.monitorProcess(proc) + if processID == execution.InitProcessID && p.status == execution.Running { + p.status = s.loadContainerStatus(container.ID()) + } + container.AddProcess(p) + } + + // if successfull, add the container to our list + if err == nil { + for _, p := range container.Processes() { + s.monitorProcess(p.(*process)) + } + s.addContainer(container) + log.G(s.ctx).Infof("restored container %s", container.ID()) } } } + +func (s *ShimRuntime) loadContainerStatus(id string) execution.Status { + cmd := exec.Command(s.runtime, append(s.runtimeArgs, "state", id)...) + out, err := cmd.CombinedOutput() + if err != nil { + return execution.Unknown + } + + var st struct{ Status string } + if err := json.NewDecoder(bytes.NewReader(out)).Decode(&st); err != nil { + return execution.Unknown + } + + return execution.Status(st.Status) +} diff --git a/execution/statedir.go b/execution/statedir.go deleted file mode 100644 index 9a7fe2a..0000000 --- a/execution/statedir.go +++ /dev/null @@ -1,82 +0,0 @@ -package execution - -import ( - "io/ioutil" - "os" - "path/filepath" - - "github.com/pkg/errors" -) - -const processesDirName = "processes" - -type StateDir string - -func LoadStateDir(root, id string) (StateDir, error) { - path := filepath.Join(root, id) - if _, err := os.Stat(path); err != nil { - return "", errors.Wrap(err, "could not find container statedir") - } - return StateDir(path), nil -} - -func NewStateDir(root, id string) (StateDir, error) { - path := filepath.Join(root, id) - if err := os.Mkdir(path, 0700); err != nil { - return "", errors.Wrap(err, "could not create container statedir") - } - if err := os.Mkdir(StateDir(path).processesDir(), 0700); err != nil { - os.RemoveAll(path) - return "", errors.Wrap(err, "could not create processes statedir") - } - return StateDir(path), nil -} - -func (s StateDir) Delete() error { - err := os.RemoveAll(string(s)) - if err != nil { - return errors.Wrapf(err, "failed to remove statedir %s", string(s)) - } - return nil -} - -func (s StateDir) NewProcess(id string) (dir string, err error) { - dir = filepath.Join(s.processesDir(), id) - if err = os.Mkdir(dir, 0700); err != nil { - return "", errors.Wrap(err, "could not create process statedir") - } - - return dir, nil -} - -func (s StateDir) ProcessDir(id string) string { - return filepath.Join(s.processesDir(), id) -} - -func (s StateDir) DeleteProcess(id string) error { - err := os.RemoveAll(filepath.Join(s.processesDir(), id)) - if err != nil { - return errors.Wrapf(err, "failed to remove process %d statedir", id) - } - return nil -} - -func (s StateDir) Processes() ([]string, error) { - procsDir := s.processesDir() - dirs, err := ioutil.ReadDir(procsDir) - if err != nil { - return nil, errors.Wrap(err, "could not list processes statedir") - } - - paths := make([]string, 0) - for _, d := range dirs { - if d.IsDir() { - paths = append(paths, filepath.Join(procsDir, d.Name())) - } - } - return paths, nil -} - -func (s StateDir) processesDir() string { - return filepath.Join(string(s), processesDirName) -} diff --git a/execution/status.go b/execution/status.go index 74c9262..afec1bf 100644 --- a/execution/status.go +++ b/execution/status.go @@ -5,6 +5,7 @@ type Status string const ( Created Status = "created" Paused Status = "paused" + Pausing Status = "pausing" Running Status = "running" Stopped Status = "stopped" Deleted Status = "deleted"