Merge pull request #462 from crosbymichael/shim

shim: GRPC service
This commit is contained in:
Kenfe-Mickaël Laventure 2017-01-26 16:11:45 -08:00 committed by GitHub
commit 24c2810899
40 changed files with 6741 additions and 846 deletions

View file

@ -1,82 +0,0 @@
// +build !solaris
package main
import (
"fmt"
"os"
"syscall"
"unsafe"
)
// NewConsole returns an initialized console that can be used within a container by copying bytes
// from the master side to the slave that is attached as the tty for the container's init process.
func newConsole(uid, gid int) (*os.File, string, error) {
master, err := os.OpenFile("/dev/ptmx", syscall.O_RDWR|syscall.O_NOCTTY|syscall.O_CLOEXEC, 0)
if err != nil {
return nil, "", err
}
if err = saneTerminal(master); err != nil {
return nil, "", err
}
console, err := ptsname(master)
if err != nil {
return nil, "", err
}
if err := unlockpt(master); err != nil {
return nil, "", err
}
if err := os.Chmod(console, 0600); err != nil {
return nil, "", err
}
if err := os.Chown(console, uid, gid); err != nil {
return nil, "", err
}
return master, console, nil
}
// saneTerminal sets the necessary tty_ioctl(4)s to ensure that a pty pair
// created by us acts normally. In particular, a not-very-well-known default of
// Linux unix98 ptys is that they have +onlcr by default. While this isn't a
// problem for terminal emulators, because we relay data from the terminal we
// also relay that funky line discipline.
func saneTerminal(terminal *os.File) error {
// Go doesn't have a wrapper for any of the termios ioctls.
var termios syscall.Termios
if err := ioctl(terminal.Fd(), syscall.TCGETS, uintptr(unsafe.Pointer(&termios))); err != nil {
return fmt.Errorf("ioctl(tty, tcgets): %s", err.Error())
}
// Set -onlcr so we don't have to deal with \r.
termios.Oflag &^= syscall.ONLCR
if err := ioctl(terminal.Fd(), syscall.TCSETS, uintptr(unsafe.Pointer(&termios))); err != nil {
return fmt.Errorf("ioctl(tty, tcsets): %s", err.Error())
}
return nil
}
func ioctl(fd uintptr, flag, data uintptr) error {
if _, _, err := syscall.Syscall(syscall.SYS_IOCTL, fd, flag, data); err != 0 {
return err
}
return nil
}
// unlockpt unlocks the slave pseudoterminal device corresponding to the master pseudoterminal referred to by f.
// unlockpt should be called before opening the slave side of a pty.
func unlockpt(f *os.File) error {
var u int32
return ioctl(f.Fd(), syscall.TIOCSPTLCK, uintptr(unsafe.Pointer(&u)))
}
// ptsname retrieves the name of the first available pts for the given master.
func ptsname(f *os.File) (string, error) {
var n int32
if err := ioctl(f.Fd(), syscall.TIOCGPTN, uintptr(unsafe.Pointer(&n))); err != nil {
return "", err
}
return fmt.Sprintf("/dev/pts/%d", n), nil
}

View file

@ -1,14 +0,0 @@
// +build solaris
package main
import (
"errors"
"os"
)
// NewConsole returns an initialized console that can be used within a container by copying bytes
// from the master side to the slave that is attached as the tty for the container's init process.
func newConsole(uid, gid int) (*os.File, string, error) {
return nil, "", errors.New("newConsole not implemented on Solaris")
}

View file

@ -1,210 +1,129 @@
package main
import (
"flag"
"fmt"
"os"
"os/signal"
"path/filepath"
"runtime"
"strings"
"syscall"
"google.golang.org/grpc"
"github.com/Sirupsen/logrus"
"github.com/docker/containerd"
apishim "github.com/docker/containerd/api/shim"
"github.com/docker/containerd/shim"
"github.com/docker/containerd/sys"
"github.com/docker/docker/pkg/term"
"github.com/docker/containerd/utils"
"github.com/urfave/cli"
)
func writeMessage(f *os.File, level string, err error) {
fmt.Fprintf(f, `{"level": "%s","msg": "%s"}`, level, err)
f.Sync()
}
const usage = `
__ _ __ __ _
_________ ____ / /_____ _(_)___ ___ _________/ / _____/ /_ (_)___ ___
/ ___/ __ \/ __ \/ __/ __ ` + "`" + `/ / __ \/ _ \/ ___/ __ /_____/ ___/ __ \/ / __ ` + "`" + `__ \
/ /__/ /_/ / / / / /_/ /_/ / / / / / __/ / / /_/ /_____(__ ) / / / / / / / / /
\___/\____/_/ /_/\__/\__,_/_/_/ /_/\___/_/ \__,_/ /____/_/ /_/_/_/ /_/ /_/
shim for container lifecycle and reconnection
`
type controlMessage struct {
Type int
Width int
Height int
}
// containerd-shim is a small shim that sits in front of a runtime implementation
// that allows it to be reparented to init and handle reattach from the caller.
//
// the cwd of the shim should be the path to the state directory where the shim
// can locate fifos and other information.
// Arg0: id of the container
// Arg1: bundle path
// Arg2: runtime binary
func main() {
flag.Parse()
cwd, err := os.Getwd()
if err != nil {
panic(err)
app := cli.NewApp()
app.Name = "containerd-shim"
app.Version = containerd.Version
app.Usage = usage
app.Flags = []cli.Flag{
cli.BoolFlag{
Name: "debug",
Usage: "enable debug output in logs",
},
}
f, err := os.OpenFile(filepath.Join(cwd, "shim-log.json"), os.O_CREATE|os.O_WRONLY|os.O_APPEND|os.O_SYNC, 0666)
if err != nil {
panic(err)
}
if err := start(f); err != nil {
// this means that the runtime failed starting the container and will have the
// proper error messages in the runtime log so we should to treat this as a
// shim failure because the sim executed properly
if err == errRuntime {
f.Close()
return
app.Before = func(context *cli.Context) error {
if context.GlobalBool("debug") {
logrus.SetLevel(logrus.DebugLevel)
}
// log the error instead of writing to stderr because the shim will have
// /dev/null as it's stdio because it is supposed to be reparented to system
// init and will not have anyone to read from it
writeMessage(f, "error", err)
f.Close()
return nil
}
app.Action = func(context *cli.Context) error {
// start handling signals as soon as possible so that things are properly reaped
// or if runtime exits before we hit the handler
signals, err := setupSignals()
if err != nil {
return err
}
var (
server = grpc.NewServer()
sv = shim.NewService()
)
logrus.Debug("registering grpc server")
apishim.RegisterShimServer(server, sv)
if err := serve(server, "shim.sock"); err != nil {
return err
}
return handleSignals(signals, server, sv)
}
if err := app.Run(os.Args); err != nil {
fmt.Fprintf(os.Stderr, "containerd-shim: %s\n", err)
os.Exit(1)
}
}
func start(log *os.File) error {
// start handling signals as soon as possible so that things are properly reaped
// or if runtime exits before we hit the handler
// setupSignals creates a new signal handler for all signals and sets the shim as a
// sub-reaper so that the container processes are reparented
func setupSignals() (chan os.Signal, error) {
signals := make(chan os.Signal, 2048)
signal.Notify(signals)
// set the shim as the subreaper for all orphaned processes created by the container
if err := sys.SetSubreaper(1); err != nil {
return err
return nil, err
}
// open the exit pipe
f, err := os.OpenFile("exit", syscall.O_WRONLY, 0)
return signals, nil
}
// serve serves the grpc API over a unix socket at the provided path
// this function does not block
func serve(server *grpc.Server, path string) error {
l, err := utils.CreateUnixSocket(path)
if err != nil {
return err
}
defer f.Close()
control, err := os.OpenFile("control", syscall.O_RDWR, 0)
if err != nil {
return err
}
defer control.Close()
p, err := newProcess(flag.Arg(0), flag.Arg(1), flag.Arg(2))
if err != nil {
return err
}
defer func() {
if err := p.Close(); err != nil {
writeMessage(log, "warn", err)
}
}()
if err := p.create(); err != nil {
p.delete()
return err
}
msgC := make(chan controlMessage, 32)
logrus.WithField("socket", path).Debug("serving api on unix socket")
go func() {
for {
var m controlMessage
if _, err := fmt.Fscanf(control, "%d %d %d\n", &m.Type, &m.Width, &m.Height); err != nil {
continue
}
msgC <- m
defer l.Close()
if err := server.Serve(l); err != nil &&
!strings.Contains(err.Error(), "use of closed network connection") {
l.Close()
logrus.WithError(err).Fatal("containerd-shim: GRPC server failure")
}
}()
if runtime.GOOS == "solaris" {
return nil
}
var exitShim bool
for !exitShim {
select {
case s := <-signals:
switch s {
case syscall.SIGCHLD:
exits, _ := Reap(false)
for _, e := range exits {
// check to see if runtime is one of the processes that has exited
if e.Pid == p.pid() {
exitShim = true
writeInt("exitStatus", e.Status)
}
}
}
case msg := <-msgC:
switch msg.Type {
case 0:
// close stdin
if p.stdinCloser != nil {
p.stdinCloser.Close()
}
case 1:
if p.console == nil {
continue
}
ws := term.Winsize{
Width: uint16(msg.Width),
Height: uint16(msg.Height),
}
term.SetWinsize(p.console.Fd(), &ws)
}
}
}
// runtime has exited so the shim can also exit
// kill all processes in the container incase it was not running in
// its own PID namespace
p.killAll()
// wait for all the processes and IO to finish
p.Wait()
// delete the container from the runtime
p.delete()
// the close of the exit fifo will happen when the shim exits
return nil
}
func writeInt(path string, i int) error {
f, err := os.Create(path)
if err != nil {
return err
}
defer f.Close()
_, err = fmt.Fprintf(f, "%d", i)
return err
}
// Exit is the wait4 information from an exited process
type Exit struct {
Pid int
Status int
}
// Reap reaps all child processes for the calling process and returns their
// exit information
func Reap(wait bool) (exits []Exit, err error) {
var (
ws syscall.WaitStatus
rus syscall.Rusage
)
flag := syscall.WNOHANG
if wait {
flag = 0
}
for {
pid, err := syscall.Wait4(-1, &ws, flag, &rus)
if err != nil {
if err == syscall.ECHILD {
return exits, nil
func handleSignals(signals chan os.Signal, server *grpc.Server, service *shim.Service) error {
for s := range signals {
logrus.WithField("signal", s).Debug("received signal")
switch s {
case syscall.SIGCHLD:
exits, err := utils.Reap(false)
if err != nil {
logrus.WithError(err).Error("reap exit status")
}
return exits, err
for _, e := range exits {
logrus.WithFields(logrus.Fields{
"status": e.Status,
"pid": e.Pid,
}).Debug("process exited")
if err := service.ProcessExit(e); err != nil {
return err
}
}
case syscall.SIGTERM, syscall.SIGINT:
// TODO: should we forward signals to the processes if they are still running?
// i.e. machine reboot
server.Stop()
return nil
}
if pid <= 0 {
return exits, nil
}
exits = append(exits, Exit{
Pid: pid,
Status: exitStatus(ws),
})
}
}
const exitSignalOffset = 128
// exitStatus returns the correct exit status for a process based on if it
// was signaled or exited cleanly
func exitStatus(status syscall.WaitStatus) int {
if status.Signaled() {
return exitSignalOffset + int(status.Signal())
}
return status.ExitStatus()
return nil
}

View file

@ -1,295 +0,0 @@
package main
import (
"encoding/json"
"errors"
"fmt"
"io"
"io/ioutil"
"os"
"os/exec"
"path/filepath"
"runtime"
"strconv"
"sync"
"syscall"
"time"
)
var errRuntime = errors.New("shim: runtime execution error")
type checkpoint struct {
// Timestamp is the time that checkpoint happened
Created time.Time `json:"created"`
// Name is the name of the checkpoint
Name string `json:"name"`
// TCP checkpoints open tcp connections
TCP bool `json:"tcp"`
// UnixSockets persists unix sockets in the checkpoint
UnixSockets bool `json:"unixSockets"`
// Shell persists tty sessions in the checkpoint
Shell bool `json:"shell"`
// Exit exits the container after the checkpoint is finished
Exit bool `json:"exit"`
// EmptyNS tells CRIU not to restore a particular namespace
EmptyNS []string `json:"emptyNS,omitempty"`
}
type processState struct {
Terminal bool `json:"terminal"`
Exec bool `json:"exec"`
Stdin string `json:"containerdStdin"`
Stdout string `json:"containerdStdout"`
Stderr string `json:"containerdStderr"`
RuntimeArgs []string `json:"runtimeArgs"`
NoPivotRoot bool `json:"noPivotRoot"`
CheckpointPath string `json:"checkpoint"`
RootUID int `json:"rootUID"`
RootGID int `json:"rootGID"`
}
type process struct {
sync.WaitGroup
id string
bundle string
stdio *stdio
exec bool
containerPid int
checkpoint *checkpoint
checkpointPath string
shimIO *IO
stdinCloser io.Closer
console *os.File
consolePath string
state *processState
runtime string
}
func newProcess(id, bundle, runtimeName string) (*process, error) {
p := &process{
id: id,
bundle: bundle,
runtime: runtimeName,
}
s, err := loadProcess()
if err != nil {
return nil, err
}
p.state = s
if s.CheckpointPath != "" {
cpt, err := loadCheckpoint(s.CheckpointPath)
if err != nil {
return nil, err
}
p.checkpoint = cpt
p.checkpointPath = s.CheckpointPath
}
if err := p.openIO(); err != nil {
return nil, err
}
return p, nil
}
func loadProcess() (*processState, error) {
f, err := os.Open("process.json")
if err != nil {
return nil, err
}
defer f.Close()
var s processState
if err := json.NewDecoder(f).Decode(&s); err != nil {
return nil, err
}
return &s, nil
}
func loadCheckpoint(checkpointPath string) (*checkpoint, error) {
f, err := os.Open(filepath.Join(checkpointPath, "config.json"))
if err != nil {
return nil, err
}
defer f.Close()
var cpt checkpoint
if err := json.NewDecoder(f).Decode(&cpt); err != nil {
return nil, err
}
return &cpt, nil
}
func (p *process) create() error {
cwd, err := os.Getwd()
if err != nil {
return err
}
logPath := filepath.Join(cwd, "log.json")
args := append([]string{
"--log", logPath,
"--log-format", "json",
}, p.state.RuntimeArgs...)
if p.state.Exec {
args = append(args, "exec",
"-d",
"--process", filepath.Join(cwd, "process.json"),
"--console", p.consolePath,
)
} else if p.checkpoint != nil {
args = append(args, "restore",
"-d",
"--image-path", p.checkpointPath,
"--work-path", filepath.Join(p.checkpointPath, "criu.work", "restore-"+time.Now().Format(time.RFC3339)),
)
add := func(flags ...string) {
args = append(args, flags...)
}
if p.checkpoint.Shell {
add("--shell-job")
}
if p.checkpoint.TCP {
add("--tcp-established")
}
if p.checkpoint.UnixSockets {
add("--ext-unix-sk")
}
if p.state.NoPivotRoot {
add("--no-pivot")
}
for _, ns := range p.checkpoint.EmptyNS {
add("--empty-ns", ns)
}
} else {
args = append(args, "create",
"--bundle", p.bundle,
"--console", p.consolePath,
)
if p.state.NoPivotRoot {
args = append(args, "--no-pivot")
}
}
args = append(args,
"--pid-file", filepath.Join(cwd, "pid"),
p.id,
)
cmd := exec.Command(p.runtime, args...)
cmd.Dir = p.bundle
cmd.Stdin = p.stdio.stdin
cmd.Stdout = p.stdio.stdout
cmd.Stderr = p.stdio.stderr
// Call out to setPDeathSig to set SysProcAttr as elements are platform specific
cmd.SysProcAttr = setPDeathSig()
if err := cmd.Start(); err != nil {
if exErr, ok := err.(*exec.Error); ok {
if exErr.Err == exec.ErrNotFound || exErr.Err == os.ErrNotExist {
return fmt.Errorf("%s not installed on system", p.runtime)
}
}
return err
}
if runtime.GOOS != "solaris" {
// Since current logic dictates that we need a pid at the end of p.create
// we need to call runtime start as well on Solaris hence we need the
// pipes to stay open.
p.stdio.stdout.Close()
p.stdio.stderr.Close()
}
if err := cmd.Wait(); err != nil {
if _, ok := err.(*exec.ExitError); ok {
return errRuntime
}
return err
}
data, err := ioutil.ReadFile("pid")
if err != nil {
return err
}
pid, err := strconv.Atoi(string(data))
if err != nil {
return err
}
p.containerPid = pid
return nil
}
func (p *process) pid() int {
return p.containerPid
}
func (p *process) delete() error {
if !p.state.Exec {
cmd := exec.Command(p.runtime, append(p.state.RuntimeArgs, "delete", p.id)...)
cmd.SysProcAttr = setPDeathSig()
out, err := cmd.CombinedOutput()
if err != nil {
return fmt.Errorf("%s: %v", out, err)
}
}
return nil
}
// IO holds all 3 standard io Reader/Writer (stdin,stdout,stderr)
type IO struct {
Stdin io.WriteCloser
Stdout io.ReadCloser
Stderr io.ReadCloser
}
func (p *process) initializeIO(rootuid int) (i *IO, err error) {
var fds []uintptr
i = &IO{}
// cleanup in case of an error
defer func() {
if err != nil {
for _, fd := range fds {
syscall.Close(int(fd))
}
}
}()
// STDIN
r, w, err := os.Pipe()
if err != nil {
return nil, err
}
fds = append(fds, r.Fd(), w.Fd())
p.stdio.stdin, i.Stdin = r, w
// STDOUT
if r, w, err = os.Pipe(); err != nil {
return nil, err
}
fds = append(fds, r.Fd(), w.Fd())
p.stdio.stdout, i.Stdout = w, r
// STDERR
if r, w, err = os.Pipe(); err != nil {
return nil, err
}
fds = append(fds, r.Fd(), w.Fd())
p.stdio.stderr, i.Stderr = w, r
// change ownership of the pipes in case we are in a user namespace
for _, fd := range fds {
if err := syscall.Fchown(int(fd), rootuid, rootuid); err != nil {
return nil, err
}
}
return i, nil
}
func (p *process) Close() error {
return p.stdio.Close()
}
type stdio struct {
stdin *os.File
stdout *os.File
stderr *os.File
}
func (s *stdio) Close() error {
err := s.stdin.Close()
if oerr := s.stdout.Close(); err == nil {
err = oerr
}
if oerr := s.stderr.Close(); err == nil {
err = oerr
}
return err
}

View file

@ -1,131 +0,0 @@
// +build !solaris
package main
import (
"fmt"
"io"
"os/exec"
"syscall"
"time"
"github.com/tonistiigi/fifo"
"golang.org/x/net/context"
)
// setPDeathSig sets the parent death signal to SIGKILL so that if the
// shim dies the container process also dies.
func setPDeathSig() *syscall.SysProcAttr {
return &syscall.SysProcAttr{
Pdeathsig: syscall.SIGKILL,
}
}
// openIO opens the pre-created fifo's for use with the container
// in RDWR so that they remain open if the other side stops listening
func (p *process) openIO() error {
p.stdio = &stdio{}
var (
uid = p.state.RootUID
gid = p.state.RootGID
)
ctx, _ := context.WithTimeout(context.Background(), 15*time.Second)
stdinCloser, err := fifo.OpenFifo(ctx, p.state.Stdin, syscall.O_WRONLY|syscall.O_NONBLOCK, 0)
if err != nil {
return err
}
p.stdinCloser = stdinCloser
if p.state.Terminal {
master, console, err := newConsole(uid, gid)
if err != nil {
return err
}
p.console = master
p.consolePath = console
stdin, err := fifo.OpenFifo(ctx, p.state.Stdin, syscall.O_RDONLY, 0)
if err != nil {
return err
}
go io.Copy(master, stdin)
stdoutw, err := fifo.OpenFifo(ctx, p.state.Stdout, syscall.O_WRONLY, 0)
if err != nil {
return err
}
stdoutr, err := fifo.OpenFifo(ctx, p.state.Stdout, syscall.O_RDONLY, 0)
if err != nil {
return err
}
p.Add(1)
go func() {
io.Copy(stdoutw, master)
master.Close()
stdoutr.Close()
stdoutw.Close()
p.Done()
}()
return nil
}
i, err := p.initializeIO(uid)
if err != nil {
return err
}
p.shimIO = i
// non-tty
for name, dest := range map[string]func(wc io.WriteCloser, rc io.Closer){
p.state.Stdout: func(wc io.WriteCloser, rc io.Closer) {
p.Add(1)
go func() {
io.Copy(wc, i.Stdout)
p.Done()
wc.Close()
rc.Close()
}()
},
p.state.Stderr: func(wc io.WriteCloser, rc io.Closer) {
p.Add(1)
go func() {
io.Copy(wc, i.Stderr)
p.Done()
wc.Close()
rc.Close()
}()
},
} {
fw, err := fifo.OpenFifo(ctx, name, syscall.O_WRONLY, 0)
if err != nil {
return fmt.Errorf("containerd-shim: opening %s failed: %s", name, err)
}
fr, err := fifo.OpenFifo(ctx, name, syscall.O_RDONLY, 0)
if err != nil {
return fmt.Errorf("containerd-shim: opening %s failed: %s", name, err)
}
dest(fw, fr)
}
f, err := fifo.OpenFifo(ctx, p.state.Stdin, syscall.O_RDONLY, 0)
if err != nil {
return fmt.Errorf("containerd-shim: opening %s failed: %s", p.state.Stdin, err)
}
go func() {
io.Copy(i.Stdin, f)
i.Stdin.Close()
f.Close()
}()
return nil
}
func (p *process) killAll() error {
if !p.state.Exec {
cmd := exec.Command(p.runtime, append(p.state.RuntimeArgs, "kill", "--all", p.id, "SIGKILL")...)
cmd.SysProcAttr = setPDeathSig()
out, err := cmd.CombinedOutput()
if err != nil {
return fmt.Errorf("%s: %v", out, err)
}
}
return nil
}

View file

@ -1,70 +0,0 @@
// +build solaris
package main
import (
"io"
"os"
"syscall"
)
// setPDeathSig is a no-op on Solaris as Pdeathsig is not defined.
func setPDeathSig() *syscall.SysProcAttr {
return nil
}
// TODO: Update to using fifo's package in openIO. Need to
// 1. Merge and vendor changes in the package to use sys/unix.
// 2. Figure out why context.Background is timing out.
// openIO opens the pre-created fifo's for use with the container
// in RDWR so that they remain open if the other side stops listening
func (p *process) openIO() error {
p.stdio = &stdio{}
var (
uid = p.state.RootUID
)
i, err := p.initializeIO(uid)
if err != nil {
return err
}
p.shimIO = i
// Both tty and non-tty mode are handled by the runtime using
// the following pipes
for name, dest := range map[string]func(f *os.File){
p.state.Stdout: func(f *os.File) {
p.Add(1)
go func() {
io.Copy(f, i.Stdout)
p.Done()
}()
},
p.state.Stderr: func(f *os.File) {
p.Add(1)
go func() {
io.Copy(f, i.Stderr)
p.Done()
}()
},
} {
f, err := os.OpenFile(name, syscall.O_RDWR, 0)
if err != nil {
return err
}
dest(f)
}
f, err := os.OpenFile(p.state.Stdin, syscall.O_RDONLY, 0)
if err != nil {
return err
}
go func() {
io.Copy(i.Stdin, f)
i.Stdin.Close()
}()
return nil
}
func (p *process) killAll() error {
return nil
}

View file

@ -23,6 +23,7 @@ import (
"github.com/docker/containerd/execution"
"github.com/docker/containerd/execution/executors/shim"
"github.com/docker/containerd/log"
"github.com/docker/containerd/utils"
metrics "github.com/docker/go-metrics"
"github.com/urfave/cli"
@ -30,11 +31,7 @@ import (
stand "github.com/nats-io/nats-streaming-server/server"
)
func main() {
app := cli.NewApp()
app.Name = "containerd"
app.Version = containerd.Version
app.Usage = `
const usage = `
__ _ __
_________ ____ / /_____ _(_)___ ___ _________/ /
/ ___/ __ \/ __ \/ __/ __ ` + "`" + `/ / __ \/ _ \/ ___/ __ /
@ -43,6 +40,12 @@ func main() {
high performance container runtime
`
func main() {
app := cli.NewApp()
app.Name = "containerd"
app.Version = containerd.Version
app.Usage = usage
app.Flags = []cli.Flag{
cli.BoolFlag{
Name: "debug",
@ -98,7 +101,7 @@ high performance container runtime
if path == "" {
return fmt.Errorf("--socket path cannot be empty")
}
l, err := createUnixSocket(path)
l, err := utils.CreateUnixSocket(path)
if err != nil {
return err
}
@ -171,16 +174,6 @@ high performance container runtime
}
}
func createUnixSocket(path string) (net.Listener, error) {
if err := os.MkdirAll(filepath.Dir(path), 0660); err != nil {
return nil, err
}
if err := syscall.Unlink(path); err != nil && !os.IsNotExist(err) {
return nil, err
}
return net.Listen("unix", path)
}
func serveMetrics(address string) {
m := http.NewServeMux()
m.Handle("/metrics", metrics.Handler())

View file

@ -40,6 +40,7 @@ containerd client
deleteCommand,
listCommand,
inspectCommand,
shimCommand,
}
app.Before = func(context *cli.Context) error {
if context.GlobalBool("debug") {
@ -48,7 +49,7 @@ containerd client
return nil
}
if err := app.Run(os.Args); err != nil {
fmt.Fprintf(os.Stderr, "containerd: %s\n", err)
fmt.Fprintf(os.Stderr, "ctr: %s\n", err)
os.Exit(1)
}
}

296
cmd/ctr/shim.go Normal file
View file

@ -0,0 +1,296 @@
package main
import (
"bytes"
"encoding/json"
"fmt"
"io/ioutil"
"log"
"net"
"os"
"strconv"
"time"
gocontext "context"
"google.golang.org/grpc"
"google.golang.org/grpc/grpclog"
"github.com/Sirupsen/logrus"
"github.com/crosbymichael/console"
"github.com/docker/containerd/api/shim"
"github.com/urfave/cli"
)
var fifoFlags = []cli.Flag{
cli.StringFlag{
Name: "stdin",
Usage: "specify the path to the stdin fifo",
},
cli.StringFlag{
Name: "stdout",
Usage: "specify the path to the stdout fifo",
},
cli.StringFlag{
Name: "stderr",
Usage: "specify the path to the stderr fifo",
},
cli.BoolFlag{
Name: "tty,t",
Usage: "enable tty support",
},
}
var shimCommand = cli.Command{
Name: "shim",
Usage: "interact with a shim directly",
Subcommands: []cli.Command{
shimCreateCommand,
shimStartCommand,
shimDeleteCommand,
shimEventsCommand,
shimStateCommand,
shimExecCommand,
},
}
var shimCreateCommand = cli.Command{
Name: "create",
Usage: "create a container with a shim",
Flags: append(fifoFlags,
cli.StringFlag{
Name: "bundle",
Usage: "bundle path for the container",
},
cli.StringFlag{
Name: "runtime",
Value: "runc",
Usage: "runtime to use for the container",
},
cli.BoolFlag{
Name: "attach,a",
Usage: "stay attached to the container and open the fifos",
},
),
Action: func(context *cli.Context) error {
id := context.Args().First()
if id == "" {
return fmt.Errorf("container id must be provided")
}
service, err := getShimService()
if err != nil {
return err
}
tty := context.Bool("tty")
wg, err := prepareStdio(context.String("stdin"), context.String("stdout"), context.String("stderr"), tty)
if err != nil {
return err
}
r, err := service.Create(gocontext.Background(), &shim.CreateRequest{
ID: id,
Bundle: context.String("bundle"),
Runtime: context.String("runtime"),
Stdin: context.String("stdin"),
Stdout: context.String("stdout"),
Stderr: context.String("stderr"),
Terminal: tty,
})
if err != nil {
return err
}
fmt.Printf("container created with id %s and pid %d\n", id, r.Pid)
if context.Bool("attach") {
if tty {
current := console.Current()
defer current.Reset()
if err := current.SetRaw(); err != nil {
return err
}
size, err := current.Size()
if err != nil {
return err
}
if _, err := service.Pty(gocontext.Background(), &shim.PtyRequest{
Pid: r.Pid,
Width: uint32(size.Width),
Height: uint32(size.Height),
}); err != nil {
return err
}
}
wg.Wait()
}
return nil
},
}
var shimStartCommand = cli.Command{
Name: "start",
Usage: "start a container with a shim",
Action: func(context *cli.Context) error {
service, err := getShimService()
if err != nil {
return err
}
_, err = service.Start(gocontext.Background(), &shim.StartRequest{})
return err
},
}
var shimDeleteCommand = cli.Command{
Name: "delete",
Usage: "delete a container with a shim",
Action: func(context *cli.Context) error {
service, err := getShimService()
if err != nil {
return err
}
pid, err := strconv.Atoi(context.Args().First())
if err != nil {
return err
}
r, err := service.Delete(gocontext.Background(), &shim.DeleteRequest{
Pid: uint32(pid),
})
if err != nil {
return err
}
fmt.Printf("container deleted and returned exit status %d\n", r.ExitStatus)
return nil
},
}
var shimStateCommand = cli.Command{
Name: "state",
Usage: "get the state of all the processes of the shim",
Action: func(context *cli.Context) error {
service, err := getShimService()
if err != nil {
return err
}
r, err := service.State(gocontext.Background(), &shim.StateRequest{})
if err != nil {
return err
}
data, err := json.Marshal(r)
if err != nil {
return err
}
buf := bytes.NewBuffer(nil)
if err := json.Indent(buf, data, " ", " "); err != nil {
return err
}
buf.WriteTo(os.Stdout)
return nil
},
}
var shimExecCommand = cli.Command{
Name: "exec",
Usage: "exec a new process in the shim's container",
Flags: append(fifoFlags,
cli.BoolFlag{
Name: "attach,a",
Usage: "stay attached to the container and open the fifos",
},
cli.StringSliceFlag{
Name: "env,e",
Usage: "add environment vars",
Value: &cli.StringSlice{},
},
cli.StringFlag{
Name: "cwd",
Usage: "current working directory",
},
),
Action: func(context *cli.Context) error {
service, err := getShimService()
if err != nil {
return err
}
tty := context.Bool("tty")
wg, err := prepareStdio(context.String("stdin"), context.String("stdout"), context.String("stderr"), tty)
if err != nil {
return err
}
rq := &shim.ExecRequest{
Args: []string(context.Args()),
Env: context.StringSlice("env"),
Cwd: context.String("cwd"),
Stdin: context.String("stdin"),
Stdout: context.String("stdout"),
Stderr: context.String("stderr"),
Terminal: tty,
}
r, err := service.Exec(gocontext.Background(), rq)
if err != nil {
return err
}
fmt.Printf("exec running with pid %d\n", r.Pid)
if context.Bool("attach") {
logrus.Info("attaching")
if tty {
current := console.Current()
defer current.Reset()
if err := current.SetRaw(); err != nil {
return err
}
size, err := current.Size()
if err != nil {
return err
}
if _, err := service.Pty(gocontext.Background(), &shim.PtyRequest{
Pid: r.Pid,
Width: uint32(size.Width),
Height: uint32(size.Height),
}); err != nil {
return err
}
}
wg.Wait()
}
return nil
},
}
var shimEventsCommand = cli.Command{
Name: "events",
Usage: "get events for a shim",
Action: func(context *cli.Context) error {
service, err := getShimService()
if err != nil {
return err
}
events, err := service.Events(gocontext.Background(), &shim.EventsRequest{})
if err != nil {
return err
}
for {
e, err := events.Recv()
if err != nil {
return err
}
fmt.Printf("type=%s id=%s pid=%d status=%d\n", e.Type, e.ID, e.Pid, e.ExitStatus)
}
return nil
},
}
func getShimService() (shim.ShimClient, error) {
bindSocket := "shim.sock"
// reset the logger for grpc to log to dev/null so that it does not mess with our stdio
grpclog.SetLogger(log.New(ioutil.Discard, "", log.LstdFlags))
dialOpts := []grpc.DialOption{grpc.WithInsecure(), grpc.WithTimeout(100 * time.Second)}
dialOpts = append(dialOpts,
grpc.WithDialer(func(addr string, timeout time.Duration) (net.Conn, error) {
return net.DialTimeout("unix", bindSocket, timeout)
},
))
conn, err := grpc.Dial(fmt.Sprintf("unix://%s", bindSocket), dialOpts...)
if err != nil {
return nil, err
}
return shim.NewShimClient(conn), nil
}

View file

@ -14,6 +14,7 @@ import (
gocontext "context"
"github.com/Sirupsen/logrus"
"github.com/docker/containerd/api/execution"
"github.com/tonistiigi/fifo"
"github.com/urfave/cli"
@ -38,6 +39,7 @@ func prepareStdio(stdin, stdout, stderr string, console bool) (*sync.WaitGroup,
}(f)
go func(w io.WriteCloser) {
io.Copy(w, os.Stdin)
logrus.Info("stdin copy finished")
w.Close()
}(f)
@ -54,6 +56,7 @@ func prepareStdio(stdin, stdout, stderr string, console bool) (*sync.WaitGroup,
go func(r io.ReadCloser) {
io.Copy(os.Stdout, r)
r.Close()
logrus.Info("stdout copy finished")
wg.Done()
}(f)
@ -71,6 +74,7 @@ func prepareStdio(stdin, stdout, stderr string, console bool) (*sync.WaitGroup,
go func(r io.ReadCloser) {
io.Copy(os.Stderr, r)
r.Close()
logrus.Info("stderr copy finished")
wg.Done()
}(f)
}
@ -115,7 +119,6 @@ func getTempDir(id string) (string, error) {
if err != nil {
return "", err
}
tmpDir, err := ioutil.TempDir(filepath.Join(os.TempDir(), "ctr"), fmt.Sprintf("%s-", id))
if err != nil {
return "", err