execution: add shim runtime
Signed-off-by: Kenfe-Mickael Laventure <mickael.laventure@gmail.com>
This commit is contained in:
parent
37a113f9fb
commit
5b40adf9af
9 changed files with 852 additions and 21 deletions
|
@ -21,6 +21,7 @@ import (
|
||||||
"github.com/docker/containerd/events"
|
"github.com/docker/containerd/events"
|
||||||
"github.com/docker/containerd/execution"
|
"github.com/docker/containerd/execution"
|
||||||
"github.com/docker/containerd/execution/executors/oci"
|
"github.com/docker/containerd/execution/executors/oci"
|
||||||
|
"github.com/docker/containerd/execution/executors/shim"
|
||||||
"github.com/docker/containerd/log"
|
"github.com/docker/containerd/log"
|
||||||
metrics "github.com/docker/go-metrics"
|
metrics "github.com/docker/go-metrics"
|
||||||
"github.com/sirupsen/logrus"
|
"github.com/sirupsen/logrus"
|
||||||
|
@ -103,6 +104,16 @@ high performance container runtime
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Get events publisher
|
||||||
|
nec, err := getNATSPublisher(context)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
defer nec.Close()
|
||||||
|
ctx := log.WithModule(gocontext.Background(), "containerd")
|
||||||
|
ctx = log.WithModule(ctx, "execution")
|
||||||
|
ctx = events.WithPoster(ctx, events.GetNATSPoster(nec))
|
||||||
|
|
||||||
var (
|
var (
|
||||||
executor execution.Executor
|
executor execution.Executor
|
||||||
runtime = context.GlobalString("runtime")
|
runtime = context.GlobalString("runtime")
|
||||||
|
@ -113,20 +124,20 @@ high performance container runtime
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
case "shim":
|
||||||
|
root := filepath.Join(context.GlobalString("root"), "shim")
|
||||||
|
err = os.Mkdir(root, 0700)
|
||||||
|
if err != nil && !os.IsExist(err) {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
executor, err = shim.New(log.WithModule(ctx, "shim"), root, "containerd-shim", "runc", nil)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
default:
|
default:
|
||||||
return fmt.Errorf("oci: runtime %q not implemented", runtime)
|
return fmt.Errorf("oci: runtime %q not implemented", runtime)
|
||||||
}
|
}
|
||||||
|
|
||||||
// Get events publisher
|
|
||||||
nec, err := getNATSPublisher(context)
|
|
||||||
if err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
defer nec.Close()
|
|
||||||
|
|
||||||
ctx := log.WithModule(gocontext.Background(), "containerd")
|
|
||||||
ctx = log.WithModule(ctx, "execution")
|
|
||||||
ctx = events.WithPoster(ctx, events.GetNATSPoster(nec))
|
|
||||||
execService, err := execution.New(ctx, executor)
|
execService, err := execution.New(ctx, executor)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
|
|
|
@ -16,12 +16,11 @@ func NewContainer(stateRoot, id, bundle string) (*Container, error) {
|
||||||
}, nil
|
}, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func LoadContainer(dir StateDir, id, bundle string, status Status, initPid int64) *Container {
|
func LoadContainer(dir StateDir, id, bundle string, status Status) *Container {
|
||||||
return &Container{
|
return &Container{
|
||||||
id: id,
|
id: id,
|
||||||
stateDir: dir,
|
stateDir: dir,
|
||||||
bundle: bundle,
|
bundle: bundle,
|
||||||
initPid: initPid,
|
|
||||||
status: status,
|
status: status,
|
||||||
processes: make(map[string]Process),
|
processes: make(map[string]Process),
|
||||||
}
|
}
|
||||||
|
|
|
@ -3,5 +3,8 @@ package execution
|
||||||
import "fmt"
|
import "fmt"
|
||||||
|
|
||||||
var (
|
var (
|
||||||
ErrProcessNotFound = fmt.Errorf("process not found")
|
ErrProcessNotFound = fmt.Errorf("process not found")
|
||||||
|
ErrProcessNotExited = fmt.Errorf("process has not exited")
|
||||||
|
ErrContainerNotFound = fmt.Errorf("container not found")
|
||||||
|
ErrContainerExists = fmt.Errorf("container already exists")
|
||||||
)
|
)
|
||||||
|
|
|
@ -28,7 +28,6 @@ type Executor interface {
|
||||||
Create(ctx context.Context, id string, o CreateOpts) (*Container, error)
|
Create(ctx context.Context, id string, o CreateOpts) (*Container, error)
|
||||||
Pause(context.Context, *Container) error
|
Pause(context.Context, *Container) error
|
||||||
Resume(context.Context, *Container) error
|
Resume(context.Context, *Container) error
|
||||||
Status(context.Context, *Container) (Status, error)
|
|
||||||
List(context.Context) ([]*Container, error)
|
List(context.Context) ([]*Container, error)
|
||||||
Load(ctx context.Context, id string) (*Container, error)
|
Load(ctx context.Context, id string) (*Container, error)
|
||||||
Delete(context.Context, *Container) error
|
Delete(context.Context, *Container) error
|
||||||
|
|
|
@ -122,7 +122,6 @@ func (r *OCIRuntime) load(runcC *runc.Container) (*execution.Container, error) {
|
||||||
runcC.ID,
|
runcC.ID,
|
||||||
runcC.Bundle,
|
runcC.Bundle,
|
||||||
execution.Status(runcC.Status),
|
execution.Status(runcC.Status),
|
||||||
int64(runcC.Pid),
|
|
||||||
)
|
)
|
||||||
|
|
||||||
dirs, err := container.StateDir().Processes()
|
dirs, err := container.StateDir().Processes()
|
||||||
|
|
381
execution/executors/shim/process.go
Normal file
381
execution/executors/shim/process.go
Normal file
|
@ -0,0 +1,381 @@
|
||||||
|
package shim
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
"encoding/json"
|
||||||
|
"io/ioutil"
|
||||||
|
"os"
|
||||||
|
"os/exec"
|
||||||
|
"path/filepath"
|
||||||
|
"strconv"
|
||||||
|
"sync"
|
||||||
|
"syscall"
|
||||||
|
|
||||||
|
"github.com/docker/containerd/execution"
|
||||||
|
"github.com/docker/containerd/log"
|
||||||
|
"github.com/pkg/errors"
|
||||||
|
"github.com/sirupsen/logrus"
|
||||||
|
"golang.org/x/sys/unix"
|
||||||
|
|
||||||
|
runc "github.com/crosbymichael/go-runc"
|
||||||
|
starttime "github.com/opencontainers/runc/libcontainer/system"
|
||||||
|
)
|
||||||
|
|
||||||
|
type newProcessOpts struct {
|
||||||
|
shimBinary string
|
||||||
|
runtime string
|
||||||
|
runtimeArgs []string
|
||||||
|
container *execution.Container
|
||||||
|
exec bool
|
||||||
|
execution.StartProcessOpts
|
||||||
|
}
|
||||||
|
|
||||||
|
func newProcess(ctx context.Context, o newProcessOpts) (*process, error) {
|
||||||
|
procStateDir, err := o.container.StateDir().NewProcess(o.ID)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
exitPipe, controlPipe, err := getControlPipes(procStateDir)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
defer func() {
|
||||||
|
if err != nil {
|
||||||
|
exitPipe.Close()
|
||||||
|
controlPipe.Close()
|
||||||
|
}
|
||||||
|
}()
|
||||||
|
|
||||||
|
cmd, err := newShim(o, procStateDir)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
defer func() {
|
||||||
|
if err != nil {
|
||||||
|
cmd.Process.Kill()
|
||||||
|
cmd.Wait()
|
||||||
|
}
|
||||||
|
}()
|
||||||
|
|
||||||
|
abortCh := make(chan syscall.WaitStatus, 1)
|
||||||
|
go func() {
|
||||||
|
var shimStatus syscall.WaitStatus
|
||||||
|
if err := cmd.Wait(); err != nil {
|
||||||
|
shimStatus = execution.UnknownStatusCode
|
||||||
|
} else {
|
||||||
|
shimStatus = cmd.ProcessState.Sys().(syscall.WaitStatus)
|
||||||
|
}
|
||||||
|
abortCh <- shimStatus
|
||||||
|
close(abortCh)
|
||||||
|
}()
|
||||||
|
|
||||||
|
process := &process{
|
||||||
|
root: procStateDir,
|
||||||
|
id: o.ID,
|
||||||
|
exitChan: make(chan struct{}),
|
||||||
|
exitPipe: exitPipe,
|
||||||
|
controlPipe: controlPipe,
|
||||||
|
}
|
||||||
|
|
||||||
|
pid, stime, status, err := waitForPid(ctx, abortCh, procStateDir)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
process.pid = int64(pid)
|
||||||
|
process.status = status
|
||||||
|
process.startTime = stime
|
||||||
|
|
||||||
|
return process, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func loadProcess(root, id string) (*process, error) {
|
||||||
|
pid, err := runc.ReadPidFile(filepath.Join(root, pidFilename))
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
stime, err := ioutil.ReadFile(filepath.Join(root, startTimeFilename))
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
path := filepath.Join(root, exitPipeFilename)
|
||||||
|
exitPipe, err := os.OpenFile(path, syscall.O_RDONLY|syscall.O_NONBLOCK, 0)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
defer func() {
|
||||||
|
if err != nil {
|
||||||
|
exitPipe.Close()
|
||||||
|
}
|
||||||
|
}()
|
||||||
|
|
||||||
|
path = filepath.Join(root, controlPipeFilename)
|
||||||
|
controlPipe, err := os.OpenFile(path, syscall.O_RDWR|syscall.O_NONBLOCK, 0)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
defer func() {
|
||||||
|
if err != nil {
|
||||||
|
controlPipe.Close()
|
||||||
|
}
|
||||||
|
}()
|
||||||
|
|
||||||
|
p := &process{
|
||||||
|
root: root,
|
||||||
|
id: id,
|
||||||
|
pid: int64(pid),
|
||||||
|
exitChan: make(chan struct{}),
|
||||||
|
exitPipe: exitPipe,
|
||||||
|
controlPipe: controlPipe,
|
||||||
|
startTime: string(stime),
|
||||||
|
// TODO: status may need to be stored on disk to handle
|
||||||
|
// Created state for init (i.e. a Start is needed to run the
|
||||||
|
// container)
|
||||||
|
status: execution.Running,
|
||||||
|
}
|
||||||
|
|
||||||
|
markAsStopped := func(p *process) (*process, error) {
|
||||||
|
p.setStatus(execution.Stopped)
|
||||||
|
return p, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
if err = syscall.Kill(pid, 0); err != nil {
|
||||||
|
if err == syscall.ESRCH {
|
||||||
|
return markAsStopped(p)
|
||||||
|
}
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
cstime, err := starttime.GetProcessStartTime(pid)
|
||||||
|
if err != nil {
|
||||||
|
if os.IsNotExist(err) {
|
||||||
|
return markAsStopped(p)
|
||||||
|
}
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
if p.startTime != cstime {
|
||||||
|
return markAsStopped(p)
|
||||||
|
}
|
||||||
|
|
||||||
|
return p, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
type process struct {
|
||||||
|
root string
|
||||||
|
id string
|
||||||
|
pid int64
|
||||||
|
exitChan chan struct{}
|
||||||
|
exitPipe *os.File
|
||||||
|
controlPipe *os.File
|
||||||
|
startTime string
|
||||||
|
status execution.Status
|
||||||
|
ctx context.Context
|
||||||
|
mu sync.Mutex
|
||||||
|
}
|
||||||
|
|
||||||
|
func (p *process) ID() string {
|
||||||
|
return p.id
|
||||||
|
}
|
||||||
|
|
||||||
|
func (p *process) Pid() int64 {
|
||||||
|
return p.pid
|
||||||
|
}
|
||||||
|
|
||||||
|
func (p *process) Wait() (uint32, error) {
|
||||||
|
<-p.exitChan
|
||||||
|
|
||||||
|
log.G(p.ctx).WithFields(logrus.Fields{"process-id": p.ID(), "pid": p.pid}).
|
||||||
|
Debugf("wait is over")
|
||||||
|
|
||||||
|
// Cleanup those fds
|
||||||
|
p.exitPipe.Close()
|
||||||
|
p.controlPipe.Close()
|
||||||
|
|
||||||
|
// If the container process is still alive, it means the shim crashed
|
||||||
|
// and the child process had updated it PDEATHSIG to something
|
||||||
|
// else than SIGKILL. Or that epollCtl failed
|
||||||
|
if p.isAlive() {
|
||||||
|
err := syscall.Kill(int(p.pid), syscall.SIGKILL)
|
||||||
|
if err != nil {
|
||||||
|
return execution.UnknownStatusCode, errors.Wrap(err, "failed to kill process")
|
||||||
|
}
|
||||||
|
|
||||||
|
return uint32(128 + int(syscall.SIGKILL)), nil
|
||||||
|
}
|
||||||
|
|
||||||
|
data, err := ioutil.ReadFile(filepath.Join(p.root, exitStatusFilename))
|
||||||
|
if err != nil {
|
||||||
|
return execution.UnknownStatusCode, errors.Wrap(err, "failed to read process exit status")
|
||||||
|
}
|
||||||
|
|
||||||
|
if len(data) == 0 {
|
||||||
|
return execution.UnknownStatusCode, errors.New(execution.ErrProcessNotExited.Error())
|
||||||
|
}
|
||||||
|
|
||||||
|
status, err := strconv.Atoi(string(data))
|
||||||
|
if err != nil {
|
||||||
|
return execution.UnknownStatusCode, errors.Wrapf(err, "failed to parse exit status")
|
||||||
|
}
|
||||||
|
|
||||||
|
p.setStatus(execution.Stopped)
|
||||||
|
return uint32(status), nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (p *process) Signal(sig os.Signal) error {
|
||||||
|
err := syscall.Kill(int(p.pid), sig.(syscall.Signal))
|
||||||
|
if err != nil {
|
||||||
|
return errors.Wrap(err, "failed to signal process")
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (p *process) Status() execution.Status {
|
||||||
|
p.mu.Lock()
|
||||||
|
s := p.status
|
||||||
|
p.mu.Unlock()
|
||||||
|
return s
|
||||||
|
}
|
||||||
|
|
||||||
|
func (p *process) setStatus(s execution.Status) {
|
||||||
|
p.mu.Lock()
|
||||||
|
p.status = s
|
||||||
|
p.mu.Unlock()
|
||||||
|
}
|
||||||
|
|
||||||
|
func (p *process) isAlive() bool {
|
||||||
|
if err := syscall.Kill(int(p.pid), 0); err != nil {
|
||||||
|
if err == syscall.ESRCH {
|
||||||
|
return false
|
||||||
|
}
|
||||||
|
log.G(p.ctx).WithFields(logrus.Fields{"process-id": p.ID(), "pid": p.pid}).
|
||||||
|
Warnf("kill(0) failed: %v", err)
|
||||||
|
return false
|
||||||
|
}
|
||||||
|
|
||||||
|
// check that we have the same startttime
|
||||||
|
stime, err := starttime.GetProcessStartTime(int(p.pid))
|
||||||
|
if err != nil {
|
||||||
|
if os.IsNotExist(err) {
|
||||||
|
return false
|
||||||
|
}
|
||||||
|
log.G(p.ctx).WithFields(logrus.Fields{"process-id": p.ID(), "pid": p.pid}).
|
||||||
|
Warnf("failed to get process start time: %v", err)
|
||||||
|
return false
|
||||||
|
}
|
||||||
|
|
||||||
|
if p.startTime != stime {
|
||||||
|
return false
|
||||||
|
}
|
||||||
|
|
||||||
|
return true
|
||||||
|
}
|
||||||
|
|
||||||
|
func waitForPid(ctx context.Context, abortCh chan syscall.WaitStatus, root string) (pid int, stime string, status execution.Status, err error) {
|
||||||
|
status = execution.Unknown
|
||||||
|
for {
|
||||||
|
select {
|
||||||
|
case <-ctx.Done():
|
||||||
|
return
|
||||||
|
case wait := <-abortCh:
|
||||||
|
if wait.Signaled() {
|
||||||
|
err = errors.Errorf("shim died prematurarily: %v", wait.Signal())
|
||||||
|
return
|
||||||
|
}
|
||||||
|
err = errors.Errorf("shim exited prematurarily with exit code %v", wait.ExitStatus())
|
||||||
|
return
|
||||||
|
default:
|
||||||
|
}
|
||||||
|
pid, err = runc.ReadPidFile(filepath.Join(root, pidFilename))
|
||||||
|
if err == nil {
|
||||||
|
break
|
||||||
|
} else if !os.IsNotExist(err) {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
}
|
||||||
|
status = execution.Created
|
||||||
|
stime, err = starttime.GetProcessStartTime(pid)
|
||||||
|
switch {
|
||||||
|
case os.IsNotExist(err):
|
||||||
|
status = execution.Stopped
|
||||||
|
case err != nil:
|
||||||
|
return
|
||||||
|
default:
|
||||||
|
var b []byte
|
||||||
|
path := filepath.Join(root, startTimeFilename)
|
||||||
|
b, err = ioutil.ReadFile(path)
|
||||||
|
switch {
|
||||||
|
case os.IsNotExist(err):
|
||||||
|
err = ioutil.WriteFile(path, []byte(stime), 0600)
|
||||||
|
if err != nil {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
case err != nil:
|
||||||
|
err = errors.Wrapf(err, "failed to get start time for pid %d", pid)
|
||||||
|
return
|
||||||
|
case string(b) != stime:
|
||||||
|
status = execution.Stopped
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return pid, stime, status, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func newShim(o newProcessOpts, workDir string) (*exec.Cmd, error) {
|
||||||
|
cmd := exec.Command(o.shimBinary, o.container.ID(), o.container.Bundle(), o.runtime)
|
||||||
|
cmd.Dir = workDir
|
||||||
|
cmd.SysProcAttr = &syscall.SysProcAttr{
|
||||||
|
Setpgid: true,
|
||||||
|
}
|
||||||
|
|
||||||
|
state := processState{
|
||||||
|
Process: o.Spec,
|
||||||
|
Exec: o.exec,
|
||||||
|
Stdin: o.Stdin,
|
||||||
|
Stdout: o.Stdout,
|
||||||
|
Stderr: o.Stderr,
|
||||||
|
RuntimeArgs: o.runtimeArgs,
|
||||||
|
NoPivotRoot: false,
|
||||||
|
CheckpointPath: "",
|
||||||
|
RootUID: int(o.Spec.User.UID),
|
||||||
|
RootGID: int(o.Spec.User.GID),
|
||||||
|
}
|
||||||
|
|
||||||
|
f, err := os.Create(filepath.Join(workDir, "process.json"))
|
||||||
|
if err != nil {
|
||||||
|
return nil, errors.Wrapf(err, "failed to create shim's process.json for container %s", o.container.ID())
|
||||||
|
}
|
||||||
|
defer f.Close()
|
||||||
|
|
||||||
|
if err := json.NewEncoder(f).Encode(state); err != nil {
|
||||||
|
return nil, errors.Wrapf(err, "failed to create shim's processState for container %s", o.container.ID())
|
||||||
|
}
|
||||||
|
|
||||||
|
if err := cmd.Start(); err != nil {
|
||||||
|
return nil, errors.Wrapf(err, "failed to start shim for container %s", o.container.ID())
|
||||||
|
}
|
||||||
|
|
||||||
|
return cmd, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func getControlPipes(root string) (exitPipe *os.File, controlPipe *os.File, err error) {
|
||||||
|
path := filepath.Join(root, exitPipeFilename)
|
||||||
|
if err = unix.Mkfifo(path, 0700); err != nil {
|
||||||
|
return exitPipe, controlPipe, errors.Wrap(err, "failed to create shim exit fifo")
|
||||||
|
}
|
||||||
|
if exitPipe, err = os.OpenFile(path, syscall.O_RDONLY|syscall.O_NONBLOCK, 0); err != nil {
|
||||||
|
return exitPipe, controlPipe, errors.Wrap(err, "failed to open shim exit fifo")
|
||||||
|
}
|
||||||
|
|
||||||
|
path = filepath.Join(root, controlPipeFilename)
|
||||||
|
if err = unix.Mkfifo(path, 0700); err != nil {
|
||||||
|
return exitPipe, controlPipe, errors.Wrap(err, "failed to create shim control fifo")
|
||||||
|
}
|
||||||
|
if controlPipe, err = os.OpenFile(path, syscall.O_RDWR|syscall.O_NONBLOCK, 0); err != nil {
|
||||||
|
return exitPipe, controlPipe, errors.Wrap(err, "failed to open shim control fifo")
|
||||||
|
}
|
||||||
|
|
||||||
|
return exitPipe, controlPipe, nil
|
||||||
|
}
|
420
execution/executors/shim/shim.go
Normal file
420
execution/executors/shim/shim.go
Normal file
|
@ -0,0 +1,420 @@
|
||||||
|
package shim
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
"encoding/json"
|
||||||
|
"io/ioutil"
|
||||||
|
"os"
|
||||||
|
"os/exec"
|
||||||
|
"path/filepath"
|
||||||
|
"sync"
|
||||||
|
"syscall"
|
||||||
|
|
||||||
|
"github.com/docker/containerd/execution"
|
||||||
|
"github.com/docker/containerd/log"
|
||||||
|
"github.com/opencontainers/runtime-spec/specs-go"
|
||||||
|
"github.com/pkg/errors"
|
||||||
|
"github.com/sirupsen/logrus"
|
||||||
|
)
|
||||||
|
|
||||||
|
const (
|
||||||
|
DefaultShimBinary = "containerd-shim"
|
||||||
|
|
||||||
|
pidFilename = "pid"
|
||||||
|
startTimeFilename = "starttime"
|
||||||
|
exitPipeFilename = "exit"
|
||||||
|
controlPipeFilename = "control"
|
||||||
|
initProcessID = "init"
|
||||||
|
exitStatusFilename = "exitStatus"
|
||||||
|
)
|
||||||
|
|
||||||
|
func New(ctx context.Context, root, shim, runtime string, runtimeArgs []string) (*ShimRuntime, error) {
|
||||||
|
fd, err := syscall.EpollCreate1(0)
|
||||||
|
if err != nil {
|
||||||
|
return nil, errors.Wrap(err, "epollcreate1 failed")
|
||||||
|
}
|
||||||
|
s := &ShimRuntime{
|
||||||
|
ctx: ctx,
|
||||||
|
epollFd: fd,
|
||||||
|
root: root,
|
||||||
|
binaryName: shim,
|
||||||
|
runtime: runtime,
|
||||||
|
runtimeArgs: runtimeArgs,
|
||||||
|
exitChannels: make(map[int]*process),
|
||||||
|
containers: make(map[string]*execution.Container),
|
||||||
|
}
|
||||||
|
|
||||||
|
s.loadContainers()
|
||||||
|
|
||||||
|
go s.monitor()
|
||||||
|
|
||||||
|
return s, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
type ShimRuntime struct {
|
||||||
|
ctx context.Context
|
||||||
|
|
||||||
|
mutex sync.Mutex
|
||||||
|
exitChannels map[int]*process
|
||||||
|
containers map[string]*execution.Container
|
||||||
|
|
||||||
|
epollFd int
|
||||||
|
root string
|
||||||
|
binaryName string
|
||||||
|
runtime string
|
||||||
|
runtimeArgs []string
|
||||||
|
}
|
||||||
|
|
||||||
|
type ProcessOpts struct {
|
||||||
|
Bundle string
|
||||||
|
Terminal bool
|
||||||
|
Stdin string
|
||||||
|
Stdout string
|
||||||
|
Stderr string
|
||||||
|
}
|
||||||
|
|
||||||
|
type processState struct {
|
||||||
|
specs.Process
|
||||||
|
Exec bool `json:"exec"`
|
||||||
|
Stdin string `json:"containerdStdin"`
|
||||||
|
Stdout string `json:"containerdStdout"`
|
||||||
|
Stderr string `json:"containerdStderr"`
|
||||||
|
RuntimeArgs []string `json:"runtimeArgs"`
|
||||||
|
NoPivotRoot bool `json:"noPivotRoot"`
|
||||||
|
CheckpointPath string `json:"checkpoint"`
|
||||||
|
RootUID int `json:"rootUID"`
|
||||||
|
RootGID int `json:"rootGID"`
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *ShimRuntime) Create(ctx context.Context, id string, o execution.CreateOpts) (*execution.Container, error) {
|
||||||
|
log.G(s.ctx).WithFields(logrus.Fields{"container-id": id, "options": o}).Debug("Create()")
|
||||||
|
|
||||||
|
if s.getContainer(id) != nil {
|
||||||
|
return nil, execution.ErrContainerExists
|
||||||
|
}
|
||||||
|
|
||||||
|
container, err := execution.NewContainer(s.root, id, o.Bundle)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
defer func() {
|
||||||
|
if err != nil {
|
||||||
|
container.StateDir().Delete()
|
||||||
|
}
|
||||||
|
}()
|
||||||
|
|
||||||
|
err = ioutil.WriteFile(filepath.Join(string(container.StateDir()), "bundle"), []byte(o.Bundle), 0600)
|
||||||
|
if err != nil {
|
||||||
|
return nil, errors.Wrap(err, "failed to save bundle path to disk")
|
||||||
|
}
|
||||||
|
|
||||||
|
// extract Process spec from bundle's config.json
|
||||||
|
var spec specs.Spec
|
||||||
|
f, err := os.Open(filepath.Join(o.Bundle, "config.json"))
|
||||||
|
if err != nil {
|
||||||
|
return nil, errors.Wrap(err, "failed to open config.json")
|
||||||
|
}
|
||||||
|
defer f.Close()
|
||||||
|
if err := json.NewDecoder(f).Decode(&spec); err != nil {
|
||||||
|
return nil, errors.Wrap(err, "failed to decode container OCI specs")
|
||||||
|
}
|
||||||
|
|
||||||
|
processOpts := newProcessOpts{
|
||||||
|
shimBinary: s.binaryName,
|
||||||
|
runtime: s.runtime,
|
||||||
|
runtimeArgs: s.runtimeArgs,
|
||||||
|
container: container,
|
||||||
|
exec: false,
|
||||||
|
StartProcessOpts: execution.StartProcessOpts{
|
||||||
|
ID: initProcessID,
|
||||||
|
Spec: spec.Process,
|
||||||
|
Console: o.Console,
|
||||||
|
Stdin: o.Stdin,
|
||||||
|
Stdout: o.Stdout,
|
||||||
|
Stderr: o.Stderr,
|
||||||
|
},
|
||||||
|
}
|
||||||
|
|
||||||
|
process, err := newProcess(ctx, processOpts)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
process.ctx = log.WithModule(log.WithModule(s.ctx, "container"), id)
|
||||||
|
|
||||||
|
s.monitorProcess(process)
|
||||||
|
container.AddProcess(process, true)
|
||||||
|
|
||||||
|
s.addContainer(container)
|
||||||
|
|
||||||
|
return container, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *ShimRuntime) Start(ctx context.Context, c *execution.Container) error {
|
||||||
|
log.G(s.ctx).WithFields(logrus.Fields{"container": c}).Debug("Start()")
|
||||||
|
|
||||||
|
cmd := exec.CommandContext(ctx, s.runtime, append(s.runtimeArgs, "start", c.ID())...)
|
||||||
|
out, err := cmd.CombinedOutput()
|
||||||
|
if err != nil {
|
||||||
|
return errors.Wrapf(err, "'%s start' failed with output: %v", s.runtime, string(out))
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *ShimRuntime) List(ctx context.Context) ([]*execution.Container, error) {
|
||||||
|
log.G(s.ctx).Debug("List()")
|
||||||
|
|
||||||
|
containers := make([]*execution.Container, 0)
|
||||||
|
s.mutex.Lock()
|
||||||
|
for _, c := range s.containers {
|
||||||
|
containers = append(containers, c)
|
||||||
|
}
|
||||||
|
s.mutex.Unlock()
|
||||||
|
|
||||||
|
return containers, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *ShimRuntime) Load(ctx context.Context, id string) (*execution.Container, error) {
|
||||||
|
log.G(s.ctx).WithFields(logrus.Fields{"container-id": id}).Debug("Start()")
|
||||||
|
|
||||||
|
s.mutex.Lock()
|
||||||
|
c, ok := s.containers[id]
|
||||||
|
s.mutex.Unlock()
|
||||||
|
|
||||||
|
if !ok {
|
||||||
|
return nil, errors.New(execution.ErrContainerNotFound.Error())
|
||||||
|
}
|
||||||
|
|
||||||
|
return c, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *ShimRuntime) Delete(ctx context.Context, c *execution.Container) error {
|
||||||
|
log.G(s.ctx).WithFields(logrus.Fields{"container": c}).Debug("Delete()")
|
||||||
|
|
||||||
|
if c.Status() != execution.Stopped {
|
||||||
|
return errors.Errorf("cannot delete a container in the '%s' state", c.Status())
|
||||||
|
}
|
||||||
|
|
||||||
|
c.StateDir().Delete()
|
||||||
|
s.removeContainer(c)
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *ShimRuntime) Pause(ctx context.Context, c *execution.Container) error {
|
||||||
|
log.G(s.ctx).WithFields(logrus.Fields{"container": c}).Debug("Pause()")
|
||||||
|
|
||||||
|
cmd := exec.CommandContext(ctx, s.runtime, append(s.runtimeArgs, "pause", c.ID())...)
|
||||||
|
out, err := cmd.CombinedOutput()
|
||||||
|
if err != nil {
|
||||||
|
return errors.Wrapf(err, "'%s pause' failed with output: %v", s.runtime, string(out))
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *ShimRuntime) Resume(ctx context.Context, c *execution.Container) error {
|
||||||
|
log.G(s.ctx).WithFields(logrus.Fields{"container": c}).Debug("Resume()")
|
||||||
|
|
||||||
|
cmd := exec.CommandContext(ctx, s.runtime, append(s.runtimeArgs, "resume", c.ID())...)
|
||||||
|
out, err := cmd.CombinedOutput()
|
||||||
|
if err != nil {
|
||||||
|
return errors.Wrapf(err, "'%s resume' failed with output: %v", s.runtime, string(out))
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *ShimRuntime) StartProcess(ctx context.Context, c *execution.Container, o execution.StartProcessOpts) (p execution.Process, err error) {
|
||||||
|
log.G(s.ctx).WithFields(logrus.Fields{"container": c, "options": o}).Debug("StartProcess()")
|
||||||
|
|
||||||
|
processOpts := newProcessOpts{
|
||||||
|
shimBinary: s.binaryName,
|
||||||
|
runtime: s.runtime,
|
||||||
|
runtimeArgs: s.runtimeArgs,
|
||||||
|
container: c,
|
||||||
|
exec: true,
|
||||||
|
StartProcessOpts: o,
|
||||||
|
}
|
||||||
|
process, err := newProcess(ctx, processOpts)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
process.status = execution.Running
|
||||||
|
s.monitorProcess(process)
|
||||||
|
|
||||||
|
c.AddProcess(process, false)
|
||||||
|
return process, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *ShimRuntime) SignalProcess(ctx context.Context, c *execution.Container, id string, sig os.Signal) error {
|
||||||
|
log.G(s.ctx).WithFields(logrus.Fields{"container": c, "process-id": id, "signal": sig}).
|
||||||
|
Debug("SignalProcess()")
|
||||||
|
|
||||||
|
process := c.GetProcess(id)
|
||||||
|
if process == nil {
|
||||||
|
return errors.Errorf("no such process %s", id)
|
||||||
|
}
|
||||||
|
err := syscall.Kill(int(process.Pid()), sig.(syscall.Signal))
|
||||||
|
if err != nil {
|
||||||
|
return errors.Wrapf(err, "failed to send %v signal to process %v", sig, process.Pid())
|
||||||
|
}
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *ShimRuntime) DeleteProcess(ctx context.Context, c *execution.Container, id string) error {
|
||||||
|
log.G(s.ctx).WithFields(logrus.Fields{"container": c, "process-id": id}).
|
||||||
|
Debug("DeleteProcess()")
|
||||||
|
|
||||||
|
c.RemoveProcess(id)
|
||||||
|
return c.StateDir().DeleteProcess(id)
|
||||||
|
}
|
||||||
|
|
||||||
|
//
|
||||||
|
//
|
||||||
|
//
|
||||||
|
|
||||||
|
func (s *ShimRuntime) monitor() {
|
||||||
|
var events [128]syscall.EpollEvent
|
||||||
|
for {
|
||||||
|
n, err := syscall.EpollWait(s.epollFd, events[:], -1)
|
||||||
|
if err != nil {
|
||||||
|
if err == syscall.EINTR {
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
log.G(s.ctx).Error("epollwait failed:", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
for i := 0; i < n; i++ {
|
||||||
|
fd := int(events[i].Fd)
|
||||||
|
|
||||||
|
s.mutex.Lock()
|
||||||
|
p := s.exitChannels[fd]
|
||||||
|
delete(s.exitChannels, fd)
|
||||||
|
s.mutex.Unlock()
|
||||||
|
|
||||||
|
if err = syscall.EpollCtl(s.epollFd, syscall.EPOLL_CTL_DEL, fd, &syscall.EpollEvent{
|
||||||
|
Events: syscall.EPOLLHUP,
|
||||||
|
Fd: int32(fd),
|
||||||
|
}); err != nil {
|
||||||
|
log.G(s.ctx).Error("epollctl deletion failed:", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
close(p.exitChan)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *ShimRuntime) addContainer(c *execution.Container) {
|
||||||
|
s.mutex.Lock()
|
||||||
|
s.containers[c.ID()] = c
|
||||||
|
s.mutex.Unlock()
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *ShimRuntime) removeContainer(c *execution.Container) {
|
||||||
|
s.mutex.Lock()
|
||||||
|
delete(s.containers, c.ID())
|
||||||
|
s.mutex.Unlock()
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *ShimRuntime) getContainer(id string) *execution.Container {
|
||||||
|
s.mutex.Lock()
|
||||||
|
c := s.containers[id]
|
||||||
|
s.mutex.Unlock()
|
||||||
|
|
||||||
|
return c
|
||||||
|
}
|
||||||
|
|
||||||
|
// monitorProcess adds a process to the list of monitored process if
|
||||||
|
// we fail to do so, we closed the exitChan channel used by Wait().
|
||||||
|
// Since service always call on Wait() for generating "exit" events,
|
||||||
|
// this will ensure the process gets killed
|
||||||
|
func (s *ShimRuntime) monitorProcess(p *process) {
|
||||||
|
if p.status == execution.Stopped {
|
||||||
|
close(p.exitChan)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
fd := int(p.exitPipe.Fd())
|
||||||
|
event := syscall.EpollEvent{
|
||||||
|
Fd: int32(fd),
|
||||||
|
Events: syscall.EPOLLHUP,
|
||||||
|
}
|
||||||
|
s.mutex.Lock()
|
||||||
|
s.exitChannels[fd] = p
|
||||||
|
s.mutex.Unlock()
|
||||||
|
if err := syscall.EpollCtl(s.epollFd, syscall.EPOLL_CTL_ADD, fd, &event); err != nil {
|
||||||
|
s.mutex.Lock()
|
||||||
|
delete(s.exitChannels, fd)
|
||||||
|
s.mutex.Unlock()
|
||||||
|
close(p.exitChan)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
// TODO: take care of the OOM handler
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *ShimRuntime) unmonitorProcess(p *process) {
|
||||||
|
s.mutex.Lock()
|
||||||
|
for fd, proc := range s.exitChannels {
|
||||||
|
if proc == p {
|
||||||
|
delete(s.exitChannels, fd)
|
||||||
|
break
|
||||||
|
}
|
||||||
|
}
|
||||||
|
s.mutex.Unlock()
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *ShimRuntime) loadContainers() {
|
||||||
|
cs, err := ioutil.ReadDir(s.root)
|
||||||
|
if err != nil {
|
||||||
|
log.G(s.ctx).WithField("statedir", s.root).
|
||||||
|
Warn("failed to load containers, state dir cannot be listed:", err)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
for _, c := range cs {
|
||||||
|
if !c.IsDir() {
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
|
||||||
|
stateDir, err := execution.LoadStateDir(s.root, c.Name())
|
||||||
|
if err != nil {
|
||||||
|
// We should never fail the above call unless someone
|
||||||
|
// delete the directory while we're loading
|
||||||
|
log.G(s.ctx).WithFields(logrus.Fields{"container": c.Name(), "statedir": s.root}).
|
||||||
|
Warn("failed to load container statedir:", err)
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
bundle, err := ioutil.ReadFile(filepath.Join(string(stateDir), "bundle"))
|
||||||
|
if err != nil {
|
||||||
|
log.G(s.ctx).WithField("container", c.Name()).
|
||||||
|
Warn("failed to load container bundle path:", err)
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
|
||||||
|
container := execution.LoadContainer(stateDir, c.Name(), string(bundle), execution.Unknown)
|
||||||
|
s.addContainer(container)
|
||||||
|
|
||||||
|
processDirs, err := stateDir.Processes()
|
||||||
|
if err != nil {
|
||||||
|
log.G(s.ctx).WithField("container", c.Name()).
|
||||||
|
Warn("failed to retrieve container processes:", err)
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
|
||||||
|
for _, procStateRoot := range processDirs {
|
||||||
|
id := filepath.Base(procStateRoot)
|
||||||
|
proc, err := loadProcess(procStateRoot, id)
|
||||||
|
if err != nil {
|
||||||
|
log.G(s.ctx).WithFields(logrus.Fields{"container": c.Name(), "process": id}).
|
||||||
|
Warn("failed to load process:", err)
|
||||||
|
s.removeContainer(container)
|
||||||
|
for _, p := range container.Processes() {
|
||||||
|
s.unmonitorProcess(p.(*process))
|
||||||
|
}
|
||||||
|
break
|
||||||
|
}
|
||||||
|
proc.ctx = log.WithModule(log.WithModule(s.ctx, "container"), container.ID())
|
||||||
|
container.AddProcess(proc, proc.ID() == initProcessID)
|
||||||
|
s.monitorProcess(proc)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
|
@ -4,32 +4,46 @@ import (
|
||||||
"io/ioutil"
|
"io/ioutil"
|
||||||
"os"
|
"os"
|
||||||
"path/filepath"
|
"path/filepath"
|
||||||
|
|
||||||
|
"github.com/pkg/errors"
|
||||||
)
|
)
|
||||||
|
|
||||||
const processesDirName = "processes"
|
const processesDirName = "processes"
|
||||||
|
|
||||||
type StateDir string
|
type StateDir string
|
||||||
|
|
||||||
|
func LoadStateDir(root, id string) (StateDir, error) {
|
||||||
|
path := filepath.Join(root, id)
|
||||||
|
if _, err := os.Stat(path); err != nil {
|
||||||
|
return "", errors.Wrap(err, "could not find container statedir")
|
||||||
|
}
|
||||||
|
return StateDir(path), nil
|
||||||
|
}
|
||||||
|
|
||||||
func NewStateDir(root, id string) (StateDir, error) {
|
func NewStateDir(root, id string) (StateDir, error) {
|
||||||
path := filepath.Join(root, id)
|
path := filepath.Join(root, id)
|
||||||
if err := os.Mkdir(path, 0700); err != nil {
|
if err := os.Mkdir(path, 0700); err != nil {
|
||||||
return "", err
|
return "", errors.Wrap(err, "could not create container statedir")
|
||||||
}
|
}
|
||||||
if err := os.Mkdir(StateDir(path).processesDir(), 0700); err != nil {
|
if err := os.Mkdir(StateDir(path).processesDir(), 0700); err != nil {
|
||||||
os.RemoveAll(path)
|
os.RemoveAll(path)
|
||||||
return "", err
|
return "", errors.Wrap(err, "could not create processes statedir")
|
||||||
}
|
}
|
||||||
return StateDir(path), nil
|
return StateDir(path), nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s StateDir) Delete() error {
|
func (s StateDir) Delete() error {
|
||||||
return os.RemoveAll(string(s))
|
err := os.RemoveAll(string(s))
|
||||||
|
if err != nil {
|
||||||
|
return errors.Wrapf(err, "failed to remove statedir %s", string(s))
|
||||||
|
}
|
||||||
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s StateDir) NewProcess(id string) (dir string, err error) {
|
func (s StateDir) NewProcess(id string) (dir string, err error) {
|
||||||
dir = filepath.Join(s.processesDir(), id)
|
dir = filepath.Join(s.processesDir(), id)
|
||||||
if err = os.Mkdir(dir, 0700); err != nil {
|
if err = os.Mkdir(dir, 0700); err != nil {
|
||||||
return "", err
|
return "", errors.Wrap(err, "could not create process statedir")
|
||||||
}
|
}
|
||||||
|
|
||||||
return dir, nil
|
return dir, nil
|
||||||
|
@ -40,14 +54,18 @@ func (s StateDir) ProcessDir(id string) string {
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s StateDir) DeleteProcess(id string) error {
|
func (s StateDir) DeleteProcess(id string) error {
|
||||||
return os.RemoveAll(filepath.Join(s.processesDir(), id))
|
err := os.RemoveAll(filepath.Join(s.processesDir(), id))
|
||||||
|
if err != nil {
|
||||||
|
return errors.Wrapf(err, "failed to remove process %d statedir", id)
|
||||||
|
}
|
||||||
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s StateDir) Processes() ([]string, error) {
|
func (s StateDir) Processes() ([]string, error) {
|
||||||
procsDir := s.processesDir()
|
procsDir := s.processesDir()
|
||||||
dirs, err := ioutil.ReadDir(procsDir)
|
dirs, err := ioutil.ReadDir(procsDir)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, errors.Wrap(err, "could not list processes statedir")
|
||||||
}
|
}
|
||||||
|
|
||||||
paths := make([]string, 0)
|
paths := make([]string, 0)
|
||||||
|
|
|
@ -8,6 +8,7 @@ const (
|
||||||
Running Status = "running"
|
Running Status = "running"
|
||||||
Stopped Status = "stopped"
|
Stopped Status = "stopped"
|
||||||
Deleted Status = "deleted"
|
Deleted Status = "deleted"
|
||||||
|
Unknown Status = "unknown"
|
||||||
|
|
||||||
UnknownStatusCode = 255
|
UnknownStatusCode = 255
|
||||||
)
|
)
|
||||||
|
|
Loading…
Reference in a new issue