Merge pull request #616 from crosbymichael/runtime-opts

Runtime configs and global reaper
This commit is contained in:
Phil Estes 2017-03-10 14:25:19 -05:00 committed by GitHub
commit 7b06baa1f2
11 changed files with 246 additions and 97 deletions

View file

@ -11,11 +11,12 @@ import (
"google.golang.org/grpc"
"github.com/Sirupsen/logrus"
runc "github.com/crosbymichael/go-runc"
"github.com/docker/containerd"
shimapi "github.com/docker/containerd/api/services/shim"
"github.com/docker/containerd/linux/shim"
"github.com/docker/containerd/reaper"
"github.com/docker/containerd/sys"
"github.com/docker/containerd/utils"
"github.com/urfave/cli"
)
@ -78,6 +79,9 @@ func main() {
func setupSignals() (chan os.Signal, error) {
signals := make(chan os.Signal, 2048)
signal.Notify(signals)
// make sure runc is setup to use the monitor
// for waiting on processes
runc.Monitor = reaper.Default
// set the shim as the subreaper for all orphaned processes created by the container
if err := sys.SetSubreaper(1); err != nil {
return nil, err
@ -108,7 +112,7 @@ func handleSignals(signals chan os.Signal, server *grpc.Server, service *shim.Se
logrus.WithField("signal", s).Debug("received signal")
switch s {
case syscall.SIGCHLD:
exits, err := utils.Reap(false)
exits, err := reaper.Reap()
if err != nil {
logrus.WithError(err).Error("reap exit status")
}

View file

@ -44,6 +44,8 @@ type config struct {
Snapshotter string `toml:"snapshotter"`
// Plugins provides plugin specific configuration for the initialization of a plugin
Plugins map[string]toml.Primitive `toml:"plugins"`
// Enable containerd as a subreaper
Subreaper bool `toml:"subreaper"`
md toml.MetaData
}
@ -58,6 +60,8 @@ func (c *config) decodePlugin(name string, v interface{}) error {
type grpcConfig struct {
Socket string `toml:"socket"`
Uid int `toml:"uid"`
Gid int `toml:"gid"`
}
type debug struct {

View file

@ -21,7 +21,9 @@ import (
"github.com/docker/containerd/content"
"github.com/docker/containerd/log"
"github.com/docker/containerd/plugin"
"github.com/docker/containerd/reaper"
"github.com/docker/containerd/snapshot"
"github.com/docker/containerd/sys"
"github.com/docker/containerd/utils"
metrics "github.com/docker/go-metrics"
"github.com/pkg/errors"
@ -83,7 +85,13 @@ func main() {
// start the signal handler as soon as we can to make sure that
// we don't miss any signals during boot
signals := make(chan os.Signal, 2048)
signal.Notify(signals, syscall.SIGTERM, syscall.SIGINT, syscall.SIGUSR1)
signal.Notify(signals, syscall.SIGTERM, syscall.SIGINT, syscall.SIGUSR1, syscall.SIGCHLD)
if conf.Subreaper {
log.G(global).Info("setting subreaper...")
if err := sys.SetSubreaper(1); err != nil {
return err
}
}
log.G(global).Info("starting containerd boot...")
// load all plugins into containerd
@ -330,6 +338,9 @@ func serveGRPC(server *grpc.Server) error {
if err != nil {
return err
}
if err := os.Chown(path, conf.GRPC.Uid, conf.GRPC.Gid); err != nil {
return err
}
go func() {
defer l.Close()
if err := server.Serve(l); err != nil {
@ -360,6 +371,10 @@ func handleSignals(signals chan os.Signal, server *grpc.Server) error {
for s := range signals {
log.G(global).WithField("signal", s).Debug("received signal")
switch s {
case syscall.SIGCHLD:
if _, err := reaper.Reap(); err != nil {
log.G(global).WithError(err).Error("reap containerd processes")
}
default:
server.Stop()
return nil

View file

@ -193,11 +193,15 @@ var runCommand = cli.Command{
if err != nil {
return err
}
abs, err := filepath.Abs(context.String("rootfs"))
if err != nil {
return err
}
// for ctr right now just do a bind mount
rootfs := []*mount.Mount{
{
Type: "bind",
Source: context.String("rootfs"),
Source: abs,
Options: []string{
"rw",
"rbind",

View file

@ -22,23 +22,35 @@ import (
const (
runtimeName = "linux"
configFilename = "config.json"
defaultRuntime = "runc"
)
func init() {
plugin.Register(runtimeName, &plugin.Registration{
Type: plugin.RuntimePlugin,
Init: New,
Type: plugin.RuntimePlugin,
Init: New,
Config: &Config{},
})
}
type Config struct {
// Runtime is a path or name of an OCI runtime used by the shim
Runtime string `toml:"runtime"`
}
func New(ic *plugin.InitContext) (interface{}, error) {
path := filepath.Join(ic.State, runtimeName)
if err := os.MkdirAll(path, 0700); err != nil {
return nil, err
}
cfg := ic.Config.(*Config)
if cfg.Runtime == "" {
cfg.Runtime = defaultRuntime
}
c, cancel := context.WithCancel(ic.Context)
return &Runtime{
root: path,
runtime: cfg.Runtime,
events: make(chan *containerd.Event, 2048),
eventsContext: c,
eventsCancel: cancel,
@ -46,7 +58,8 @@ func New(ic *plugin.InitContext) (interface{}, error) {
}
type Runtime struct {
root string
root string
runtime string
events chan *containerd.Event
eventsContext context.Context
@ -70,7 +83,7 @@ func (r *Runtime) Create(ctx context.Context, id string, opts containerd.CreateO
sopts := &shim.CreateRequest{
ID: id,
Bundle: path,
Runtime: "runc",
Runtime: r.runtime,
Stdin: opts.IO.Stdin,
Stdout: opts.IO.Stdout,
Stderr: opts.IO.Stderr,

View file

@ -14,6 +14,7 @@ import (
"google.golang.org/grpc/grpclog"
"github.com/docker/containerd/api/services/shim"
"github.com/docker/containerd/reaper"
"github.com/docker/containerd/utils"
"github.com/pkg/errors"
)
@ -41,11 +42,9 @@ func newShim(path string) (shim.ShimClient, error) {
Cloneflags: syscall.CLONE_NEWNS,
Setpgid: true,
}
if err := cmd.Start(); err != nil {
if err := reaper.Default.Start(cmd); err != nil {
return nil, errors.Wrapf(err, "failed to start shim")
}
// since we are currently the parent go ahead and make sure we wait on the shim
go cmd.Wait()
return connectShim(socket)
}

101
reaper/reaper.go Normal file
View file

@ -0,0 +1,101 @@
package reaper
import (
"bytes"
"fmt"
"os/exec"
"sync"
"github.com/docker/containerd/utils"
)
// Reap should be called when the process receives an SIGCHLD. Reap will reap
// all exited processes and close their wait channels
func Reap() ([]utils.Exit, error) {
exits, err := utils.Reap(false)
for _, e := range exits {
Default.mu.Lock()
c, ok := Default.cmds[e.Pid]
Default.mu.Unlock()
if !ok {
continue
}
// after we get an exit, call wait on the go process to make sure all
// pipes are closed and finalizers are run on the process
c.c.Wait()
c.exitCh <- e.Status
Default.mu.Lock()
delete(Default.cmds, e.Pid)
Default.mu.Unlock()
}
return exits, err
}
var Default = &Monitor{
cmds: make(map[int]*cmd),
}
type Monitor struct {
mu sync.Mutex
cmds map[int]*cmd
}
func (m *Monitor) Output(c *exec.Cmd) ([]byte, error) {
var b bytes.Buffer
c.Stdout = &b
if err := m.Run(c); err != nil {
return nil, err
}
return b.Bytes(), nil
}
func (m *Monitor) CombinedOutput(c *exec.Cmd) ([]byte, error) {
var b bytes.Buffer
c.Stdout = &b
c.Stderr = &b
if err := m.Run(c); err != nil {
return nil, err
}
return b.Bytes(), nil
}
// Start starts the command a registers the process with the reaper
func (m *Monitor) Start(c *exec.Cmd) error {
rc := &cmd{
c: c,
exitCh: make(chan int, 1),
}
m.mu.Lock()
// start the process
if err := rc.c.Start(); err != nil {
m.mu.Unlock()
return err
}
m.cmds[rc.c.Process.Pid] = rc
m.mu.Unlock()
return nil
}
// Run runs and waits for the command to finish
func (m *Monitor) Run(c *exec.Cmd) error {
if err := m.Start(c); err != nil {
return err
}
_, err := m.Wait(c)
return err
}
func (m *Monitor) Wait(c *exec.Cmd) (int, error) {
m.mu.Lock()
rc, ok := m.cmds[c.Process.Pid]
m.mu.Unlock()
if !ok {
return 255, fmt.Errorf("process does not exist")
}
return <-rc.exitCh, nil
}
type cmd struct {
c *exec.Cmd
exitCh chan int
}

View file

@ -1,64 +1,32 @@
# go-runc client for runc; master as of 01/20/2017
github.com/crosbymichael/go-runc 706de6f422f397fb70b8c98f9b8c8eab2de32ae2
# console pkg;
github.com/crosbymichael/go-runc bd9aef7cf4402a3a8728e3ef83dcca6a5a1be899
github.com/crosbymichael/console 4bf9d88357031b516b3794a2594b6d060a29c59c
# go-metrics client to prometheus; master as of 12/16/2016
github.com/docker/go-metrics 0f35294225552d968a13f9c5bc71a3fa44b2eb87
# prometheus client; latest release as of 12/16/2016
github.com/prometheus/client_golang v0.8.0
# prometheus client model; master as of 12/16/2016
github.com/prometheus/client_model fa8ad6fec33561be4280a8f0514318c79d7f6cb6
# prometheus common library; master as of 12/16/2016
github.com/prometheus/common 195bde7883f7c39ea62b0d92ab7359b5327065cb
# prometheus procfs; master as of 12/16/2016
github.com/prometheus/procfs fcdb11ccb4389efb1b210b7ffb623ab71c5fdd60
# beorn7/perks; master as of 12/16/2016
github.com/beorn7/perks 4c0e84591b9aa9e6dcfdf3e020114cd81f89d5f9
# matttproud/golang_protobuf_extensions; latest tagged release as of 12/16/2016
github.com/matttproud/golang_protobuf_extensions v1.0.0
# go-units from Docker; latest release as of 12/16/2016
github.com/docker/go-units v0.3.1
# gogo/protobuf - master as of 2/15/2016 (latest tagged release doesn't have needed change)
github.com/gogo/protobuf d2e1ade2d719b78fe5b061b4c18a9f7111b5bdc8
# golang support for protobufs - master as of 12/16/2016
github.com/golang/protobuf 8ee79997227bf9b34611aee7946ae64735e6fd93
# runc, latest release as of 12/16/2016
github.com/opencontainers/runc ce450bcc6c135cae93ee2a99d41a308c179ff6dc
# OCI runtime spec, latest release as of 12/16/2016
github.com/opencontainers/runtime-spec v1.0.0-rc3
# logrus, latest release as of 12/16/2016
github.com/Sirupsen/logrus v0.11.0
# go-btrfs from stevvooe; master as of 1/11/2017
github.com/stevvooe/go-btrfs 8539a1d04898663b8eda14982e24b74e7a12388e
# testify go testing support; latest release as of 12/16/2016
github.com/stretchr/testify v1.1.4
# go-spew (required by testify, and also by ctr); latest release as of 1/12/2017
github.com/davecgh/go-spew v1.1.0
# go-difflib (required by testify); latest release as of 1/12/2017
github.com/pmezard/go-difflib v1.0.0
# Go pkg for handling fifos; master as of 12/16/2016
github.com/tonistiigi/fifo fe870ccf293940774c2b44e23f6c71fff8f7547d
# client application library; latest release as of 12/16/2016
github.com/urfave/cli v1.19.1
# extended Golang net package; upstream reported githash as of 12/16/2016
golang.org/x/net 8b4af36cd21a1f85a7484b49feb7c79363106d8e
# Go gRPC support; latest release as of 12/16/2016
google.golang.org/grpc v1.0.5
# pkg/errors; latest release as of 12/16/2016
github.com/pkg/errors v0.8.0
# lockfile; master as of 1/12/2017
github.com/nightlyone/lockfile 1d49c987357a327b5b03aa84cbddd582c328615d
# docker (for docker/pkg/archive, which is required by snapshot); latest experimental release as of 1/12/2017
github.com/docker/docker v1.13.0-rc6
# go-digest; master as of 1/12/2017
github.com/opencontainers/go-digest 21dfd564fd89c944783d00d069f33e3e7123c448
# sys/unix; master as of 1/12/2017
golang.org/x/sys/unix d75a52659825e75fff6158388dddc6a5b04f9ba5
# image-spec master as of 1/17/2017
github.com/opencontainers/image-spec a431dbcf6a74fca2e0e040b819a836dbe3fb23ca
# continuity master as of 2/1/2017
github.com/stevvooe/continuity 1530f13d23b34e2ccaf33881fefecc7e28e3577b
# sync master as of 12/5/2016
golang.org/x/sync 450f422ab23cf9881c94e2db30cac0eb1b7cf80c
github.com/BurntSushi/toml v0.2.0-21-g9906417

50
vendor/github.com/crosbymichael/go-runc/monitor.go generated vendored Normal file
View file

@ -0,0 +1,50 @@
package runc
import (
"os/exec"
"syscall"
)
var Monitor ProcessMonitor = &defaultMonitor{}
// ProcessMonitor is an interface for process monitoring
//
// It allows daemons using go-runc to have a SIGCHLD handler
// to handle exits without introducing races between the handler
// and go's exec.Cmd
// These methods should match the methods exposed by exec.Cmd to provide
// a consistent experience for the caller
type ProcessMonitor interface {
Output(*exec.Cmd) ([]byte, error)
CombinedOutput(*exec.Cmd) ([]byte, error)
Run(*exec.Cmd) error
Start(*exec.Cmd) error
Wait(*exec.Cmd) (int, error)
}
type defaultMonitor struct {
}
func (m *defaultMonitor) Output(c *exec.Cmd) ([]byte, error) {
return c.Output()
}
func (m *defaultMonitor) CombinedOutput(c *exec.Cmd) ([]byte, error) {
return c.CombinedOutput()
}
func (m *defaultMonitor) Run(c *exec.Cmd) error {
return c.Run()
}
func (m *defaultMonitor) Start(c *exec.Cmd) error {
return c.Start()
}
func (m *defaultMonitor) Wait(c *exec.Cmd) (int, error) {
status, err := c.Process.Wait()
if err != nil {
return -1, err
}
return status.Sys().(syscall.WaitStatus).ExitStatus(), nil
}

View file

@ -40,7 +40,7 @@ type Runc struct {
// List returns all containers created inside the provided runc root directory
func (r *Runc) List(context context.Context) ([]*Container, error) {
data, err := r.command(context, "list", "--format=json").Output()
data, err := Monitor.Output(r.command(context, "list", "--format=json"))
if err != nil {
return nil, err
}
@ -53,7 +53,7 @@ func (r *Runc) List(context context.Context) ([]*Container, error) {
// State returns the state for the container provided by id
func (r *Runc) State(context context.Context, id string) (*Container, error) {
data, err := r.command(context, "state", id).CombinedOutput()
data, err := Monitor.CombinedOutput(r.command(context, "state", id))
if err != nil {
return nil, fmt.Errorf("%s: %s", err, data)
}
@ -72,6 +72,7 @@ type CreateOpts struct {
Detach bool
NoPivot bool
NoNewKeyring bool
ExtraFiles []*os.File
}
func (o *CreateOpts) args() (out []string, err error) {
@ -94,6 +95,9 @@ func (o *CreateOpts) args() (out []string, err error) {
if o.Detach {
out = append(out, "--detach")
}
if o.ExtraFiles != nil {
out = append(out, "--preserve-fds", strconv.Itoa(len(o.ExtraFiles)))
}
return out, nil
}
@ -111,14 +115,16 @@ func (r *Runc) Create(context context.Context, id, bundle string, opts *CreateOp
if opts != nil && opts.IO != nil {
opts.Set(cmd)
}
cmd.ExtraFiles = opts.ExtraFiles
if cmd.Stdout == nil && cmd.Stderr == nil {
data, err := cmd.CombinedOutput()
data, err := Monitor.CombinedOutput(cmd)
if err != nil {
return fmt.Errorf("%s: %s", err, data)
}
return nil
}
if err := cmd.Start(); err != nil {
if err := Monitor.Start(cmd); err != nil {
return err
}
if opts != nil && opts.IO != nil {
@ -128,36 +134,26 @@ func (r *Runc) Create(context context.Context, id, bundle string, opts *CreateOp
}
}
}
return cmd.Wait()
_, err := Monitor.Wait(cmd)
return err
}
// Start will start an already created container
func (r *Runc) Start(context context.Context, id string) error {
return runOrError(r.command(context, "start", id))
return r.runOrError(r.command(context, "start", id))
}
type ExecOpts struct {
IO
PidFile string
Uid int
Gid int
Cwd string
Tty bool
ConsoleSocket *ConsoleSocket
Detach bool
}
func (o *ExecOpts) args() (out []string, err error) {
out = append(out, "--user", fmt.Sprintf("%d:%d", o.Uid, o.Gid))
if o.Tty {
out = append(out, "--tty")
}
if o.ConsoleSocket != nil {
out = append(out, "--console-socket", o.ConsoleSocket.Path())
}
if o.Cwd != "" {
out = append(out, "--cwd", o.Cwd)
}
if o.Detach {
out = append(out, "--detach")
}
@ -174,7 +170,7 @@ func (o *ExecOpts) args() (out []string, err error) {
// Exec executres and additional process inside the container based on a full
// OCI Process specification
func (r *Runc) Exec(context context.Context, id string, spec specs.Process, opts *ExecOpts) error {
f, err := ioutil.TempFile("", "-process")
f, err := ioutil.TempFile("", "runc-process")
if err != nil {
return err
}
@ -197,13 +193,13 @@ func (r *Runc) Exec(context context.Context, id string, spec specs.Process, opts
opts.Set(cmd)
}
if cmd.Stdout == nil && cmd.Stderr == nil {
data, err := cmd.CombinedOutput()
data, err := Monitor.CombinedOutput(cmd)
if err != nil {
return fmt.Errorf("%s: %s", err, data)
}
return nil
}
if err := cmd.Start(); err != nil {
if err := Monitor.Start(cmd); err != nil {
return err
}
if opts != nil && opts.IO != nil {
@ -213,7 +209,8 @@ func (r *Runc) Exec(context context.Context, id string, spec specs.Process, opts
}
}
}
return cmd.Wait()
_, err = Monitor.Wait(cmd)
return err
}
// Run runs the create, start, delete lifecycle of the container
@ -231,19 +228,15 @@ func (r *Runc) Run(context context.Context, id, bundle string, opts *CreateOpts)
if opts != nil {
opts.Set(cmd)
}
if err := cmd.Start(); err != nil {
if err := Monitor.Start(cmd); err != nil {
return -1, err
}
status, err := cmd.Process.Wait()
if err != nil {
return -1, err
}
return status.Sys().(syscall.WaitStatus).ExitStatus(), nil
return Monitor.Wait(cmd)
}
// Delete deletes the container
func (r *Runc) Delete(context context.Context, id string) error {
return runOrError(r.command(context, "delete", id))
return r.runOrError(r.command(context, "delete", id))
}
// KillOpts specifies options for killing a container and its processes
@ -266,7 +259,7 @@ func (r *Runc) Kill(context context.Context, id string, sig int, opts *KillOpts)
if opts != nil {
args = append(args, opts.args()...)
}
return runOrError(r.command(context, append(args, id, strconv.Itoa(sig))...))
return r.runOrError(r.command(context, append(args, id, strconv.Itoa(sig))...))
}
// Stats return the stats for a container like cpu, memory, and io
@ -278,9 +271,9 @@ func (r *Runc) Stats(context context.Context, id string) (*Stats, error) {
}
defer func() {
rd.Close()
cmd.Wait()
Monitor.Wait(cmd)
}()
if err := cmd.Start(); err != nil {
if err := Monitor.Start(cmd); err != nil {
return nil, err
}
var e Event
@ -297,7 +290,7 @@ func (r *Runc) Events(context context.Context, id string, interval time.Duration
if err != nil {
return nil, err
}
if err := cmd.Start(); err != nil {
if err := Monitor.Start(cmd); err != nil {
rd.Close()
return nil, err
}
@ -309,7 +302,7 @@ func (r *Runc) Events(context context.Context, id string, interval time.Duration
defer func() {
close(c)
rd.Close()
cmd.Wait()
Monitor.Wait(cmd)
}()
for {
var e Event
@ -330,17 +323,17 @@ func (r *Runc) Events(context context.Context, id string, interval time.Duration
// Pause the container with the provided id
func (r *Runc) Pause(context context.Context, id string) error {
return runOrError(r.command(context, "pause", id))
return r.runOrError(r.command(context, "pause", id))
}
// Resume the container with the provided id
func (r *Runc) Resume(context context.Context, id string) error {
return runOrError(r.command(context, "resume", id))
return r.runOrError(r.command(context, "resume", id))
}
// Ps lists all the processes inside the container returning their pids
func (r *Runc) Ps(context context.Context, id string) ([]int, error) {
data, err := r.command(context, "ps", "--format", "json", id).CombinedOutput()
data, err := Monitor.CombinedOutput(r.command(context, "ps", "--format", "json", id))
if err != nil {
return nil, fmt.Errorf("%s: %s", err, data)
}
@ -380,3 +373,18 @@ func (r *Runc) command(context context.Context, args ...string) *exec.Cmd {
}
return cmd
}
// runOrError will run the provided command. If an error is
// encountered and neither Stdout or Stderr was set the error and the
// stderr of the command will be returned in the format of <error>:
// <stderr>
func (r *Runc) runOrError(cmd *exec.Cmd) error {
if cmd.Stdout != nil || cmd.Stderr != nil {
return Monitor.Run(cmd)
}
data, err := Monitor.CombinedOutput(cmd)
if err != nil {
return fmt.Errorf("%s: %s", err, data)
}
return nil
}

View file

@ -1,9 +1,7 @@
package runc
import (
"fmt"
"io/ioutil"
"os/exec"
"strconv"
"syscall"
)
@ -18,21 +16,6 @@ func ReadPidFile(path string) (int, error) {
return strconv.Atoi(string(data))
}
// runOrError will run the provided command. If an error is
// encountered and neither Stdout or Stderr was set the error and the
// stderr of the command will be returned in the format of <error>:
// <stderr>
func runOrError(cmd *exec.Cmd) error {
if cmd.Stdout != nil || cmd.Stderr != nil {
return cmd.Run()
}
data, err := cmd.CombinedOutput()
if err != nil {
return fmt.Errorf("%s: %s", err, data)
}
return nil
}
const exitSignalOffset = 128
// exitStatus returns the correct exit status for a process based on if it