Add working shim exec driver for start

Still need to implement a working Wait() on the process using epoll
because of the non-blocking exit fifo

Signed-off-by: Michael Crosby <crosbymichael@gmail.com>
This commit is contained in:
Michael Crosby 2016-10-06 11:39:12 -07:00
parent c76f883ccd
commit 90e4f130c8
6 changed files with 223 additions and 76 deletions

View file

@ -69,12 +69,12 @@ func start(log *os.File) error {
// open the exit pipe // open the exit pipe
f, err := os.OpenFile("exit", syscall.O_WRONLY, 0) f, err := os.OpenFile("exit", syscall.O_WRONLY, 0)
if err != nil { if err != nil {
return err return fmt.Errorf("open exit fifo %s", err)
} }
defer f.Close() defer f.Close()
control, err := os.OpenFile("control", syscall.O_RDWR, 0) control, err := os.OpenFile("control", syscall.O_RDWR, 0)
if err != nil { if err != nil {
return err return fmt.Errorf("open control fifo %s", err)
} }
defer control.Close() defer control.Close()
p, err := newProcess(flag.Arg(0), flag.Arg(1), flag.Arg(2)) p, err := newProcess(flag.Arg(0), flag.Arg(1), flag.Arg(2))
@ -150,7 +150,7 @@ func start(log *os.File) error {
term.SetWinsize(p.console.Fd(), &ws) term.SetWinsize(p.console.Fd(), &ws)
case 2: case 2:
// signal // signal
if err := syscall.Kill(p.pid(), msg.Width); err != nil { if err := syscall.Kill(p.pid(), syscall.Signal(msg.Width)); err != nil {
writeMessage(log, "warn", err) writeMessage(log, "warn", err)
} }
} }

View file

@ -39,9 +39,9 @@ type checkpoint struct {
type processState struct { type processState struct {
specs.ProcessSpec specs.ProcessSpec
Exec bool `json:"exec"` Exec bool `json:"exec"`
Stdin string `json:"containerdStdin"` Stdin string `json:"stdin"`
Stdout string `json:"containerdStdout"` Stdout string `json:"stdout"`
Stderr string `json:"containerdStderr"` Stderr string `json:"stderr"`
RuntimeArgs []string `json:"runtimeArgs"` RuntimeArgs []string `json:"runtimeArgs"`
NoPivotRoot bool `json:"noPivotRoot"` NoPivotRoot bool `json:"noPivotRoot"`
CheckpointPath string `json:"checkpoint"` CheckpointPath string `json:"checkpoint"`
@ -74,7 +74,7 @@ func newProcess(id, bundle, runtimeName string) (*process, error) {
} }
s, err := loadProcess() s, err := loadProcess()
if err != nil { if err != nil {
return nil, err return nil, fmt.Errorf("load process from json %s", err)
} }
p.state = s p.state = s
if s.CheckpointPath != "" { if s.CheckpointPath != "" {
@ -86,7 +86,7 @@ func newProcess(id, bundle, runtimeName string) (*process, error) {
p.checkpointPath = s.CheckpointPath p.checkpointPath = s.CheckpointPath
} }
if err := p.openIO(); err != nil { if err := p.openIO(); err != nil {
return nil, err return nil, fmt.Errorf("open IO for container %s", err)
} }
return p, nil return p, nil
} }

View file

@ -1,21 +1,26 @@
package main package main
import ( import (
"flag"
"fmt" "fmt"
"os" "os"
"time"
"github.com/Sirupsen/logrus" "github.com/Sirupsen/logrus"
"github.com/docker/containerd/shim"
"github.com/docker/containerkit" "github.com/docker/containerkit"
"github.com/docker/containerkit/oci"
"github.com/docker/containerkit/osutils" "github.com/docker/containerkit/osutils"
specs "github.com/opencontainers/runtime-spec/specs-go" specs "github.com/opencontainers/runtime-spec/specs-go"
) )
func runContainer() error { func runContainer() error {
// create a new runc runtime that implements the ExecutionDriver interface // create a new runtime runtime that implements the ExecutionDriver interface
runc, err := oci.New(oci.Opts{ runtime, err := shim.New(shim.Opts{
Root: "/run/runc", Root: "/run/cshim/test",
Name: "runc", Name: "containerd-shim",
RuntimeName: "runc",
RuntimeRoot: "/run/runc",
Timeout: 5 * time.Second,
}) })
if err != nil { if err != nil {
return err return err
@ -23,14 +28,14 @@ func runContainer() error {
dockerContainer := &testConfig{} dockerContainer := &testConfig{}
// create a new container // create a new container
container, err := containerkit.NewContainer(dockerContainer, NewBindDriver(), runc) container, err := containerkit.NewContainer(dockerContainer, NewBindDriver(), runtime)
if err != nil { if err != nil {
return err return err
} }
// setup some stdio for our container // setup some stdio for our container
container.Stdin = os.Stdin container.Stdin = Stdin()
container.Stdout = os.Stdout container.Stdout = Stdout()
container.Stderr = os.Stderr container.Stderr = Stderr()
// go ahead and set the container in the create state and have it ready to start // go ahead and set the container in the create state and have it ready to start
logrus.Info("create container") logrus.Info("create container")
@ -44,6 +49,7 @@ func runContainer() error {
return err return err
} }
if exec {
// start 10 exec processes giving the go var i to exec to stdout // start 10 exec processes giving the go var i to exec to stdout
for i := 0; i < 10; i++ { for i := 0; i < 10; i++ {
process, err := container.NewProcess(&specs.Process{ process, err := container.NewProcess(&specs.Process{
@ -64,18 +70,19 @@ func runContainer() error {
if err := process.Start(); err != nil { if err := process.Start(); err != nil {
return err return err
} }
procStatus, err := process.Wait() procStatus, err := process.Wait()
if err != nil { if err != nil {
return err return err
} }
logrus.Infof("process %d returned with %d", i, procStatus) logrus.Infof("process %d returned with %d", i, procStatus)
} }
}
container, err = containerkit.LoadContainer(dockerContainer, runc) if load {
if err != nil { if container, err = containerkit.LoadContainer(dockerContainer, runtime); err != nil {
return err return err
} }
}
// wait for it to exit and get the exit status // wait for it to exit and get the exit status
logrus.Info("wait container") logrus.Info("wait container")
@ -93,8 +100,16 @@ func runContainer() error {
return nil return nil
} }
var (
exec bool
load bool
)
// "Hooks do optional work. Drivers do mandatory work" // "Hooks do optional work. Drivers do mandatory work"
func main() { func main() {
flag.BoolVar(&exec, "exec", false, "run the execs")
flag.BoolVar(&load, "load", false, "reload the container")
flag.Parse()
if err := osutils.SetSubreaper(1); err != nil { if err := osutils.SetSubreaper(1); err != nil {
logrus.Fatal(err) logrus.Fatal(err)
} }

View file

@ -1,8 +1,12 @@
package main package main
import ( import (
"os"
"path/filepath" "path/filepath"
"runtime" "runtime"
"syscall"
"golang.org/x/sys/unix"
"github.com/docker/containerkit" "github.com/docker/containerkit"
specs "github.com/opencontainers/runtime-spec/specs-go" specs "github.com/opencontainers/runtime-spec/specs-go"
@ -177,3 +181,48 @@ func (t *testConfig) Spec(m *containerkit.Mount) (*specs.Spec, error) {
}, },
}, nil }, nil
} }
func Stdin() *os.File {
abs, err := filepath.Abs("stdin")
if err != nil {
panic(err)
}
if err := unix.Mkfifo(abs, 0755); err != nil && !os.IsExist(err) {
panic(err)
}
f, err := os.OpenFile(abs, syscall.O_RDWR, 0)
if err != nil {
panic(err)
}
return f
}
func Stdout() *os.File {
abs, err := filepath.Abs("stdout")
if err != nil {
panic(err)
}
if err := unix.Mkfifo(abs, 0755); err != nil && !os.IsExist(err) {
panic(err)
}
f, err := os.OpenFile(abs, syscall.O_RDWR, 0)
if err != nil {
panic(err)
}
return f
}
func Stderr() *os.File {
abs, err := filepath.Abs("stderr")
if err != nil {
panic(err)
}
if err := unix.Mkfifo(abs, 0755); err != nil && !os.IsExist(err) {
panic(err)
}
f, err := os.OpenFile(abs, syscall.O_RDWR, 0)
if err != nil {
panic(err)
}
return f
}

View file

@ -32,6 +32,9 @@ var (
const UnknownStatus = 255 const UnknownStatus = 255
func newProcess(root string, noPivotRoot bool, checkpoint string, c *containerkit.Container, cmd *exec.Cmd) (*process, error) { func newProcess(root string, noPivotRoot bool, checkpoint string, c *containerkit.Container, cmd *exec.Cmd) (*process, error) {
if err := os.Mkdir(root, 0711); err != nil {
return nil, err
}
var ( var (
spec = c.Spec() spec = c.Spec()
stdin, stdout, stderr string stdin, stdout, stderr string
@ -98,7 +101,6 @@ func newProcess(root string, noPivotRoot bool, checkpoint string, c *containerki
return p, nil return p, nil
} }
// TODO: control and exit fifo
type process struct { type process struct {
root string root string
cmd *exec.Cmd cmd *exec.Cmd
@ -106,7 +108,7 @@ type process struct {
success bool success bool
startTime string startTime string
mu sync.Mutex mu sync.Mutex
containerPid int pid int
exit *os.File exit *os.File
control *os.File control *os.File
@ -129,6 +131,8 @@ type processState struct {
Checkpoint string `json:"checkpoint"` Checkpoint string `json:"checkpoint"`
NoPivotRoot bool `json:"noPivotRoot"` NoPivotRoot bool `json:"noPivotRoot"`
RuntimeArgs []string `json:"runtimeArgs"` RuntimeArgs []string `json:"runtimeArgs"`
Root string `json:"root"`
StartTime string `json:"startTime"`
// Stdin fifo filepath // Stdin fifo filepath
Stdin string `json:"stdin"` Stdin string `json:"stdin"`
// Stdout fifo filepath // Stdout fifo filepath
@ -148,6 +152,8 @@ func (p *process) MarshalJSON() ([]byte, error) {
Stdin: p.stdin, Stdin: p.stdin,
Stdout: p.stdout, Stdout: p.stdout,
Stderr: p.stderr, Stderr: p.stderr,
Root: p.root,
StartTime: p.startTime,
} }
return json.Marshal(ps) return json.Marshal(ps)
} }
@ -166,17 +172,35 @@ func (p *process) UnmarshalJSON(b []byte) error {
p.stdin = ps.Stdin p.stdin = ps.Stdin
p.stdout = ps.Stdout p.stdout = ps.Stdout
p.stderr = ps.Stderr p.stderr = ps.Stderr
// TODO: restore pid/exit/control and stuff ? p.root = ps.Root
p.startTime = ps.StartTime
pid, err := readPid(filepath.Join(p.root, "pid"))
if err != nil {
return err
}
p.pid = pid
exit, err := getExitPipe(filepath.Join(p.root, "exit"))
if err != nil {
return err
}
control, err := getControlPipe(filepath.Join(p.root, "control"))
if err != nil {
return err
}
p.exit, p.control = exit, control
return nil return nil
} }
func (p *process) Pid() int { func (p *process) Pid() int {
return p.containerPid return p.pid
}
func (p *process) FD() int {
return int(p.exit.Fd())
} }
func (p *process) Wait() (rst uint32, rerr error) { func (p *process) Wait() (rst uint32, rerr error) {
var b []byte if _, err := ioutil.ReadAll(p.exit); err != nil {
if _, err := p.exit.Read(b); err != nil {
return 255, err return 255, err
} }
data, err := ioutil.ReadFile(filepath.Join(p.root, "exitStatus")) data, err := ioutil.ReadFile(filepath.Join(p.root, "exitStatus"))
@ -227,15 +251,15 @@ func (p *process) checkExited() {
} }
if same, _ := p.same(); same && p.hasPid() { if same, _ := p.same(); same && p.hasPid() {
// The process changed its PR_SET_PDEATHSIG, so force kill it // The process changed its PR_SET_PDEATHSIG, so force kill it
logrus.Infof("containerd: (pid %v) has become an orphan, killing it", p.containerPid) logrus.Infof("containerd: (pid %v) has become an orphan, killing it", p.pid)
if err := unix.Kill(p.containerPid, syscall.SIGKILL); err != nil && err != syscall.ESRCH { if err := unix.Kill(p.pid, syscall.SIGKILL); err != nil && err != syscall.ESRCH {
logrus.Errorf("containerd: unable to SIGKILL (pid %v): %v", p.containerPid, err) logrus.Errorf("containerd: unable to SIGKILL (pid %v): %v", p.pid, err)
close(p.done) close(p.done)
return return
} }
// wait for the container process to exit // wait for the container process to exit
for { for {
if err := unix.Kill(p.containerPid, 0); err != nil { if err := unix.Kill(p.pid, 0); err != nil {
break break
} }
time.Sleep(5 * time.Millisecond) time.Sleep(5 * time.Millisecond)
@ -246,14 +270,14 @@ func (p *process) checkExited() {
func (p *process) hasPid() bool { func (p *process) hasPid() bool {
p.mu.Lock() p.mu.Lock()
r := p.containerPid > 0 r := p.pid > 0
p.mu.Unlock() p.mu.Unlock()
return r return r
} }
func (p *process) setPid(pid int) { func (p *process) setPid(pid int) {
p.mu.Lock() p.mu.Lock()
p.containerPid = pid p.pid = pid
p.mu.Unlock() p.mu.Unlock()
} }
@ -274,10 +298,18 @@ func (p *process) waitForCreate(timeout time.Duration) error {
p.setPid(resp.pid) p.setPid(resp.pid)
started, err := readProcessStartTime(resp.pid) started, err := readProcessStartTime(resp.pid)
if err != nil { if err != nil {
logrus.Warnf("containerd: unable to save starttime: %v", err) logrus.Warnf("shim: unable to save starttime: %v", err)
} }
// TODO: save start time to disk or process state file
p.startTime = started p.startTime = started
f, err := os.Create(filepath.Join(p.root, "process.json"))
if err != nil {
logrus.Warnf("shim: unable to save starttime: %v", err)
return nil
}
defer f.Close()
if err := json.NewEncoder(f).Encode(p); err != nil {
logrus.Warnf("shim: unable to save starttime: %v", err)
}
return nil return nil
case <-time.After(timeout): case <-time.After(timeout):
p.cmd.Process.Kill() p.cmd.Process.Kill()
@ -294,9 +326,9 @@ func (p *process) readContainerPid(r chan pidResponse) {
if os.IsNotExist(err) || err == errInvalidPidInt { if os.IsNotExist(err) || err == errInvalidPidInt {
if serr := checkErrorLogs(p.cmd, if serr := checkErrorLogs(p.cmd,
filepath.Join(p.root, "shim-log.json"), filepath.Join(p.root, "shim-log.json"),
filepath.Join(p.root, "log.json")); serr != nil { filepath.Join(p.root, "log.json")); serr != nil && !os.IsNotExist(serr) {
r <- pidResponse{ r <- pidResponse{
err: err, err: serr,
} }
break break
} }
@ -316,8 +348,8 @@ func (p *process) readContainerPid(r chan pidResponse) {
} }
func (p *process) handleSigkilledShim(rst uint32, rerr error) (uint32, error) { func (p *process) handleSigkilledShim(rst uint32, rerr error) (uint32, error) {
if err := unix.Kill(p.containerPid, 0); err == syscall.ESRCH { if err := unix.Kill(p.pid, 0); err == syscall.ESRCH {
logrus.Warnf("containerd: (pid %d) does not exist", p.containerPid) logrus.Warnf("containerd: (pid %d) does not exist", p.pid)
// The process died while containerd was down (probably of // The process died while containerd was down (probably of
// SIGKILL, but no way to be sure) // SIGKILL, but no way to be sure)
return UnknownStatus, writeExitStatus(filepath.Join(p.root, "exitStatus"), UnknownStatus) return UnknownStatus, writeExitStatus(filepath.Join(p.root, "exitStatus"), UnknownStatus)
@ -330,18 +362,18 @@ func (p *process) handleSigkilledShim(rst uint32, rerr error) (uint32, error) {
// without having to go through all this process again // without having to go through all this process again
return UnknownStatus, writeExitStatus(filepath.Join(p.root, "exitStatus"), UnknownStatus) return UnknownStatus, writeExitStatus(filepath.Join(p.root, "exitStatus"), UnknownStatus)
} }
ppid, err := readProcStatField(p.containerPid, 4) ppid, err := readProcStatField(p.pid, 4)
if err != nil { if err != nil {
return rst, fmt.Errorf("could not check process ppid: %v (%v)", err, rerr) return rst, fmt.Errorf("could not check process ppid: %v (%v)", err, rerr)
} }
if ppid == "1" { if ppid == "1" {
if err := unix.Kill(p.containerPid, syscall.SIGKILL); err != nil && err != syscall.ESRCH { if err := unix.Kill(p.pid, syscall.SIGKILL); err != nil && err != syscall.ESRCH {
return UnknownStatus, fmt.Errorf( return UnknownStatus, fmt.Errorf(
"containerd: unable to SIGKILL (pid %v): %v", p.containerPid, err) "containerd: unable to SIGKILL (pid %v): %v", p.pid, err)
} }
// wait for the process to die // wait for the process to die
for { for {
if err := unix.Kill(p.containerPid, 0); err == syscall.ESRCH { if err := unix.Kill(p.pid, 0); err == syscall.ESRCH {
break break
} }
time.Sleep(5 * time.Millisecond) time.Sleep(5 * time.Millisecond)
@ -415,7 +447,7 @@ func readProcStatField(pid int, field int) (string, error) {
func readPid(pidFile string) (int, error) { func readPid(pidFile string) (int, error) {
data, err := ioutil.ReadFile(pidFile) data, err := ioutil.ReadFile(pidFile)
if err != nil { if err != nil {
return -1, nil return -1, err
} }
i, err := strconv.Atoi(string(data)) i, err := strconv.Atoi(string(data))
if err != nil { if err != nil {

View file

@ -4,6 +4,7 @@ import (
"encoding/json" "encoding/json"
"errors" "errors"
"fmt" "fmt"
"io/ioutil"
"os" "os"
"os/exec" "os/exec"
"path/filepath" "path/filepath"
@ -40,16 +41,21 @@ type Opts struct {
Name string Name string
RuntimeName string RuntimeName string
RuntimeArgs []string RuntimeArgs []string
RuntimeRoot string
NoPivotRoot bool NoPivotRoot bool
Root string Root string
Timeout time.Duration Timeout time.Duration
} }
func New(opts Opts) (*Shim, error) { func New(opts Opts) (*Shim, error) {
if err := os.MkdirAll(filepath.Dir(opts.Root), 0711); err != nil {
return nil, err
}
if err := os.Mkdir(opts.Root, 0711); err != nil { if err := os.Mkdir(opts.Root, 0711); err != nil {
return nil, err return nil, err
} }
r, err := oci.New(oci.Opts{ r, err := oci.New(oci.Opts{
Root: opts.RuntimeRoot,
Name: opts.RuntimeName, Name: opts.RuntimeName,
Args: opts.RuntimeArgs, Args: opts.RuntimeArgs,
}) })
@ -85,7 +91,26 @@ func Load(root string) (*Shim, error) {
if err != nil { if err != nil {
return nil, err return nil, err
} }
// TODO: read processes into memory dirs, err := ioutil.ReadDir(root)
if err != nil {
return nil, err
}
for _, d := range dirs {
if !d.IsDir() {
continue
}
name := d.Name()
if f, err = os.Open(filepath.Join(root, name, "process.json")); err != nil {
return nil, err
}
var p process
err = json.NewDecoder(f).Decode(&p)
f.Close()
if err != nil {
return nil, err
}
s.processes[name] = &p
}
return &s, nil return &s, nil
} }
@ -152,10 +177,12 @@ func (s *Shim) UnmarshalJSON(b []byte) error {
return err return err
} }
s.runtime = r s.runtime = r
s.processes = make(map[string]*process)
return nil return nil
} }
func (s *Shim) Create(c *containerkit.Container) (containerkit.ProcessDelegate, error) { func (s *Shim) Create(c *containerkit.Container) (containerkit.ProcessDelegate, error) {
s.bundle = c.Path()
var ( var (
root = filepath.Join(s.root, "init") root = filepath.Join(s.root, "init")
cmd = s.command(c.ID(), c.Path(), s.runtime.Name()) cmd = s.command(c.ID(), c.Path(), s.runtime.Name())
@ -174,8 +201,15 @@ func (s *Shim) Create(c *containerkit.Container) (containerkit.ProcessDelegate,
s.pmu.Lock() s.pmu.Lock()
s.processes["init"] = p s.processes["init"] = p
s.pmu.Unlock() s.pmu.Unlock()
f, err := os.Create(filepath.Join(s.root, "state.json"))
if err != nil {
return nil, err
}
err = json.NewEncoder(f).Encode(s)
f.Close()
// ~TODO: oom and stats stuff here // ~TODO: oom and stats stuff here
return p, nil return p, err
} }
func (s *Shim) Start(c *containerkit.Container) error { func (s *Shim) Start(c *containerkit.Container) error {
@ -215,6 +249,23 @@ func (s *Shim) Start(c *containerkit.Container) error {
return nil return nil
} }
func (s *Shim) Delete(c *containerkit.Container) error {
if err := s.runtime.Delete(c); err != nil {
return err
}
return os.RemoveAll(s.root)
}
var errnotimpl = errors.New("NOT IMPL RIGHT NOW, CHILL")
func (s *Shim) Exec(c *containerkit.Container, p *containerkit.Process) (containerkit.ProcessDelegate, error) {
return nil, errnotimpl
}
func (s *Shim) Load(id string) (containerkit.ProcessDelegate, error) {
return nil, errnotimpl
}
func (s *Shim) getContainerInit(c *containerkit.Container) (*process, error) { func (s *Shim) getContainerInit(c *containerkit.Container) (*process, error) {
s.pmu.Lock() s.pmu.Lock()
p, ok := s.processes["init"] p, ok := s.processes["init"]