From df48983fe76262e68240004384fcbc86257a82af Mon Sep 17 00:00:00 2001 From: Michael Crosby Date: Thu, 9 Mar 2017 16:07:35 -0800 Subject: [PATCH] Add reaper code for daemon Signed-off-by: Michael Crosby --- cmd/containerd/main.go | 7 ++- linux/runtime.go | 2 +- linux/shim.go | 5 +-- reaper/reaper.go | 100 +++++++++++++++++++++++++++++++++++++++++ 4 files changed, 109 insertions(+), 5 deletions(-) create mode 100644 reaper/reaper.go diff --git a/cmd/containerd/main.go b/cmd/containerd/main.go index 8e2a234..69d44d0 100644 --- a/cmd/containerd/main.go +++ b/cmd/containerd/main.go @@ -21,6 +21,7 @@ import ( "github.com/docker/containerd/content" "github.com/docker/containerd/log" "github.com/docker/containerd/plugin" + "github.com/docker/containerd/reaper" "github.com/docker/containerd/snapshot" "github.com/docker/containerd/utils" metrics "github.com/docker/go-metrics" @@ -83,7 +84,7 @@ func main() { // start the signal handler as soon as we can to make sure that // we don't miss any signals during boot signals := make(chan os.Signal, 2048) - signal.Notify(signals, syscall.SIGTERM, syscall.SIGINT, syscall.SIGUSR1) + signal.Notify(signals, syscall.SIGTERM, syscall.SIGINT, syscall.SIGUSR1, syscall.SIGCHLD) log.G(global).Info("starting containerd boot...") // load all plugins into containerd @@ -363,6 +364,10 @@ func handleSignals(signals chan os.Signal, server *grpc.Server) error { for s := range signals { log.G(global).WithField("signal", s).Debug("received signal") switch s { + case syscall.SIGCHLD: + if err := reaper.Reap(); err != nil { + log.G(global).WithError(err).Error("reap containerd processes") + } default: server.Stop() return nil diff --git a/linux/runtime.go b/linux/runtime.go index 9901e85..00a97d1 100644 --- a/linux/runtime.go +++ b/linux/runtime.go @@ -38,7 +38,7 @@ type Config struct { Runtime string `toml:"runtime"` } -func New(ic *containerd.InitContext) (interface{}, error) { +func New(ic *plugin.InitContext) (interface{}, error) { path := filepath.Join(ic.State, runtimeName) if err := os.MkdirAll(path, 0700); err != nil { return nil, err diff --git a/linux/shim.go b/linux/shim.go index 62cb032..a011796 100644 --- a/linux/shim.go +++ b/linux/shim.go @@ -14,6 +14,7 @@ import ( "google.golang.org/grpc/grpclog" "github.com/docker/containerd/api/services/shim" + "github.com/docker/containerd/reaper" "github.com/docker/containerd/utils" "github.com/pkg/errors" ) @@ -41,11 +42,9 @@ func newShim(path string) (shim.ShimClient, error) { Cloneflags: syscall.CLONE_NEWNS, Setpgid: true, } - if err := cmd.Start(); err != nil { + if err := reaper.Default.Start(cmd); err != nil { return nil, errors.Wrapf(err, "failed to start shim") } - // since we are currently the parent go ahead and make sure we wait on the shim - go cmd.Wait() return connectShim(socket) } diff --git a/reaper/reaper.go b/reaper/reaper.go new file mode 100644 index 0000000..dc4394d --- /dev/null +++ b/reaper/reaper.go @@ -0,0 +1,100 @@ +package reaper + +import ( + "bytes" + "fmt" + "os/exec" + "sync" + + "github.com/docker/containerd/utils" +) + +// Reap should be called when the process receives an SIGCHLD. Reap will reap +// all exited processes and close their wait channels +func Reap() error { + exits, err := utils.Reap(false) + for _, e := range exits { + Default.mu.Lock() + c, ok := Default.cmds[e.Pid] + Default.mu.Unlock() + if !ok { + continue + } + // after we get an exit, call wait on the go process to make sure all + // pipes are closed and finalizers are run on the process + c.c.Wait() + c.exitCh <- e.Status + Default.mu.Lock() + delete(Default.cmds, e.Pid) + Default.mu.Unlock() + } + return err +} + +var Default = &Monitor{ + cmds: make(map[int]*cmd), +} + +type Monitor struct { + mu sync.Mutex + cmds map[int]*cmd +} + +func (m *Monitor) Output(c *exec.Cmd) ([]byte, error) { + var b bytes.Buffer + c.Stdout = &b + if err := m.Run(c); err != nil { + return nil, err + } + return b.Bytes(), nil +} + +func (m *Monitor) CombinedOutput(c *exec.Cmd) ([]byte, error) { + var b bytes.Buffer + c.Stdout = &b + c.Stderr = &b + if err := m.Run(c); err != nil { + return nil, err + } + return b.Bytes(), nil +} + +// Start starts the command a registers the process with the reaper +func (m *Monitor) Start(c *exec.Cmd) error { + rc := &cmd{ + c: c, + exitCh: make(chan int, 1), + } + // make sure we register the command first before starting the process + m.mu.Lock() + // start the process + if err := rc.c.Start(); err != nil { + m.mu.Unlock() + return err + } + m.cmds[rc.c.Process.Pid] = rc + m.mu.Unlock() + return nil +} + +// Run runs and waits for the command to finish +func (m *Monitor) Run(c *exec.Cmd) error { + if err := m.Start(c); err != nil { + return err + } + _, err := m.Wait(c) + return err +} + +func (m *Monitor) Wait(c *exec.Cmd) (int, error) { + rc, ok := m.cmds[c.Process.Pid] + if !ok { + return 255, fmt.Errorf("process does not exist") + } + return <-rc.exitCh, nil +} + +type cmd struct { + c *exec.Cmd + exitCh chan int +}