From a861ae9d180d31e91a4ee9c257aa838b07215ae3 Mon Sep 17 00:00:00 2001 From: Michael Crosby Date: Thu, 6 Oct 2016 15:06:34 -0700 Subject: [PATCH] Fix monitor with process events Monitor was receiving multiple events for the process Signed-off-by: Michael Crosby --- containerd-shim/main.go | 5 +-- example/main.go | 63 ++++++++++++++++-------------- example/utils.go | 12 +++--- monitor/monitor_linux.go | 24 ++++++++++-- shim/process.go | 83 ++++++++++++++++++++++++++-------------- shim/shim.go | 69 +++++++++++++++++++++++++-------- 6 files changed, 172 insertions(+), 84 deletions(-) diff --git a/containerd-shim/main.go b/containerd-shim/main.go index 9372287..48e66fe 100644 --- a/containerd-shim/main.go +++ b/containerd-shim/main.go @@ -83,7 +83,7 @@ func start(log *os.File) error { } defer func() { if err := p.Close(); err != nil { - writeMessage(log, "warn", err) + writeMessage(log, "warn", fmt.Errorf("close stdio %s", err)) } }() if err := p.create(); err != nil { @@ -127,7 +127,6 @@ func start(log *os.File) error { // Wait for all the childs this process may have // created (needed for exec and init processes when // they join another pid namespace) - osutils.Reap(true) p.Wait() return nil } @@ -151,7 +150,7 @@ func start(log *os.File) error { case 2: // signal if err := syscall.Kill(p.pid(), syscall.Signal(msg.Width)); err != nil { - writeMessage(log, "warn", err) + writeMessage(log, "warn", fmt.Errorf("signal pid %d: %s", msg.Width, err)) } } } diff --git a/example/main.go b/example/main.go index b6798ac..24f4d81 100644 --- a/example/main.go +++ b/example/main.go @@ -3,7 +3,9 @@ package main import ( "flag" "fmt" + "io" "os" + "strconv" "time" "github.com/Sirupsen/logrus" @@ -33,9 +35,9 @@ func runContainer() error { return err } // setup some stdio for our container - container.Stdin = Stdin() - container.Stdout = Stdout() - container.Stderr = Stderr() + container.Stdin = Stdin("") + container.Stdout = Stdout("") + container.Stderr = Stderr("") // go ahead and set the container in the create state and have it ready to start logrus.Info("create container") @@ -49,33 +51,36 @@ func runContainer() error { return err } - if exec { - // start 10 exec processes giving the go var i to exec to stdout - for i := 0; i < 10; i++ { - process, err := container.NewProcess(&specs.Process{ - Args: []string{ - "echo", fmt.Sprintf("sup from itteration %d", i), - }, - Env: env, - Terminal: false, - Cwd: "/", - NoNewPrivileges: true, - Capabilities: caps, - }) + for i := 0; i < exec; i++ { + process, err := container.NewProcess(&specs.Process{ + Args: []string{ + "sh", "-c", + "echo " + fmt.Sprintf("sup from itteration %d", i), + }, + Env: env, + Terminal: false, + Cwd: "/", + NoNewPrivileges: true, + Capabilities: caps, + }) - process.Stdin = os.Stdin - process.Stdout = os.Stdout - process.Stderr = os.Stderr + process.Stdin = Stdin(strconv.Itoa(i)) + stdout := Stdout(strconv.Itoa(i)) - if err := process.Start(); err != nil { - return err - } - procStatus, err := process.Wait() - if err != nil { - return err - } - logrus.Infof("process %d returned with %d", i, procStatus) + stderr := Stderr(strconv.Itoa(i)) + go io.Copy(os.Stdout, stdout) + go io.Copy(os.Stdout, stderr) + process.Stdout = stdout + process.Stderr = stderr + + if err := process.Start(); err != nil { + return err } + procStatus, err := process.Wait() + if err != nil { + return err + } + logrus.Infof("process %d returned with %d", i, procStatus) } if load { @@ -101,13 +106,13 @@ func runContainer() error { } var ( - exec bool + exec int load bool ) // "Hooks do optional work. Drivers do mandatory work" func main() { - flag.BoolVar(&exec, "exec", false, "run the execs") + flag.IntVar(&exec, "exec", 0, "run n number of execs") flag.BoolVar(&load, "load", false, "reload the container") flag.Parse() if err := osutils.SetSubreaper(1); err != nil { diff --git a/example/utils.go b/example/utils.go index df7e49b..76ac1a8 100644 --- a/example/utils.go +++ b/example/utils.go @@ -182,8 +182,8 @@ func (t *testConfig) Spec(m *containerkit.Mount) (*specs.Spec, error) { }, nil } -func Stdin() *os.File { - abs, err := filepath.Abs("stdin") +func Stdin(n string) *os.File { + abs, err := filepath.Abs("stdin" + n) if err != nil { panic(err) } @@ -197,8 +197,8 @@ func Stdin() *os.File { return f } -func Stdout() *os.File { - abs, err := filepath.Abs("stdout") +func Stdout(n string) *os.File { + abs, err := filepath.Abs("stdout" + n) if err != nil { panic(err) } @@ -212,8 +212,8 @@ func Stdout() *os.File { return f } -func Stderr() *os.File { - abs, err := filepath.Abs("stderr") +func Stderr(n string) *os.File { + abs, err := filepath.Abs("stderr" + n) if err != nil { panic(err) } diff --git a/monitor/monitor_linux.go b/monitor/monitor_linux.go index 6a44a1f..022190d 100644 --- a/monitor/monitor_linux.go +++ b/monitor/monitor_linux.go @@ -1,6 +1,7 @@ package monitor import ( + "io" "sync" "syscall" @@ -10,6 +11,9 @@ import ( type Monitorable interface { FD() int + // Remove returns true if the monitorable should be removed + // from the event monitor under the lock of when the event was received + Remove() bool } type Flusher interface { @@ -63,6 +67,10 @@ func (m *Monitor) Add(ma Monitorable) error { func (m *Monitor) Remove(ma Monitorable) error { m.m.Lock() defer m.m.Unlock() + return m.remove(ma) +} + +func (m *Monitor) remove(ma Monitorable) error { fd := ma.FD() delete(m.receivers, fd) return syscall.EpollCtl(m.epollFd, syscall.EPOLL_CTL_DEL, fd, &syscall.EpollEvent{ @@ -84,18 +92,28 @@ func (m *Monitor) Run() { if err == syscall.EINTR { continue } - logrus.WithField("error", err).Fatal("containerd: epoll wait") + logrus.WithField("error", err).Fatal("shim: epoll wait") } for i := 0; i < n; i++ { fd := int(events[i].Fd) m.m.Lock() r := m.receivers[fd] - m.m.Unlock() if f, ok := r.(Flusher); ok { if err := f.Flush(); err != nil { - logrus.WithField("error", err).Fatal("containerd: flush event FD") + logrus.WithField("error", err).Fatal("shim: flush event FD") } } + if r.Remove() { + if err := m.remove(r); err != nil { + logrus.WithField("error", err).Fatal("shim: remove event FD") + } + } + if f, ok := r.(io.Closer); ok { + if err := f.Close(); err != nil { + logrus.WithField("error", err).Fatal("shim: close event FD") + } + } + m.m.Unlock() m.events <- r } } diff --git a/shim/process.go b/shim/process.go index 2e2a212..25d89eb 100644 --- a/shim/process.go +++ b/shim/process.go @@ -31,12 +31,22 @@ var ( const UnknownStatus = 255 -func newProcess(root string, noPivotRoot bool, checkpoint string, c *containerkit.Container, cmd *exec.Cmd) (*process, error) { - if err := os.Mkdir(root, 0711); err != nil { - return nil, err - } +type processOpts struct { + root string + noPivotRoot bool + checkpoint string + c *containerkit.Container + cmd *exec.Cmd + exec bool + spec specs.Process + stdin io.Reader + stdout io.Writer + stderr io.Writer +} + +func newProcess(opts processOpts) (*process, error) { var ( - spec = c.Spec() + spec = opts.c.Spec() stdin, stdout, stderr string ) uid, gid, err := getRootIDs(spec) @@ -49,15 +59,15 @@ func newProcess(root string, noPivotRoot bool, checkpoint string, c *containerki }{ { path: &stdin, - v: c.Stdin, + v: opts.stdin, }, { path: &stdout, - v: c.Stdout, + v: opts.stdout, }, { path: &stderr, - v: c.Stderr, + v: opts.stderr, }, } { p, err := getFifoPath(t.v) @@ -67,20 +77,20 @@ func newProcess(root string, noPivotRoot bool, checkpoint string, c *containerki *t.path = p } p := &process{ - root: root, - cmd: cmd, + root: opts.root, + cmd: opts.cmd, done: make(chan struct{}), - spec: spec.Process, - exec: false, + spec: opts.spec, + exec: opts.exec, rootUid: uid, rootGid: gid, - noPivotRoot: noPivotRoot, - checkpoint: checkpoint, + noPivotRoot: opts.noPivotRoot, + checkpoint: opts.checkpoint, stdin: stdin, stdout: stdout, stderr: stderr, } - f, err := os.Create(filepath.Join(root, "process.json")) + f, err := os.Create(filepath.Join(opts.root, "process.json")) if err != nil { return nil, err } @@ -89,11 +99,11 @@ func newProcess(root string, noPivotRoot bool, checkpoint string, c *containerki if err != nil { return nil, err } - exit, err := getExitPipe(filepath.Join(root, "exit")) + exit, err := getExitPipe(filepath.Join(opts.root, "exit")) if err != nil { return nil, err } - control, err := getControlPipe(filepath.Join(root, "control")) + control, err := getControlPipe(filepath.Join(opts.root, "control")) if err != nil { return nil, err } @@ -199,6 +209,14 @@ func (p *process) FD() int { return int(p.exit.Fd()) } +func (p *process) Close() error { + return p.exit.Close() +} + +func (p *process) Remove() bool { + return true +} + func (p *process) Wait() (rst uint32, rerr error) { <-p.done data, err := ioutil.ReadFile(filepath.Join(p.root, "exitStatus")) @@ -228,6 +246,8 @@ func (p *process) Signal(s os.Signal) error { // same checks if the process is the same process originally launched func (p *process) same() (bool, error) { /// for backwards compat assume true if it is not set + p.mu.Lock() + defer p.mu.Unlock() if p.startTime == "" { return true, nil } @@ -245,14 +265,19 @@ func (p *process) same() (bool, error) { func (p *process) checkExited() { err := p.cmd.Wait() if err == nil { + p.mu.Lock() + if p.success { + p.mu.Unlock() + return + } p.success = true + p.mu.Unlock() } if same, _ := p.same(); same && p.hasPid() { // The process changed its PR_SET_PDEATHSIG, so force kill it logrus.Infof("containerd: (pid %v) has become an orphan, killing it", p.pid) if err := unix.Kill(p.pid, syscall.SIGKILL); err != nil && err != syscall.ESRCH { logrus.Errorf("containerd: unable to SIGKILL (pid %v): %v", p.pid, err) - close(p.done) return } // wait for the container process to exit @@ -263,7 +288,6 @@ func (p *process) checkExited() { time.Sleep(5 * time.Millisecond) } } - close(p.done) } func (p *process) hasPid() bool { @@ -273,12 +297,6 @@ func (p *process) hasPid() bool { return r } -func (p *process) setPid(pid int) { - p.mu.Lock() - p.pid = pid - p.mu.Unlock() -} - type pidResponse struct { pid int err error @@ -293,21 +311,30 @@ func (p *process) waitForCreate(timeout time.Duration) error { if resp.err != nil { return resp.err } - p.setPid(resp.pid) + p.mu.Lock() + p.pid = resp.pid started, err := readProcessStartTime(resp.pid) if err != nil { + if os.IsNotExist(err) { + // process already exited + p.success = true + p.mu.Unlock() + return nil + } logrus.Warnf("shim: unable to save starttime: %v", err) } p.startTime = started f, err := os.Create(filepath.Join(p.root, "process.json")) if err != nil { - logrus.Warnf("shim: unable to save starttime: %v", err) + logrus.Warnf("shim: unable to create process.json: %v", err) + p.mu.Unlock() return nil } defer f.Close() if err := json.NewEncoder(f).Encode(p); err != nil { - logrus.Warnf("shim: unable to save starttime: %v", err) + logrus.Warnf("shim: unable to encode process: %v", err) } + p.mu.Unlock() return nil case <-time.After(timeout): p.cmd.Process.Kill() diff --git a/shim/shim.go b/shim/shim.go index bb3fea0..4bb15ec 100644 --- a/shim/shim.go +++ b/shim/shim.go @@ -12,7 +12,6 @@ import ( "syscall" "time" - "github.com/Sirupsen/logrus" "github.com/docker/containerd/monitor" "github.com/docker/containerd/oci" "github.com/docker/containerkit" @@ -104,6 +103,7 @@ func Load(root string) (*Shim, error) { return nil, err } s.m = m + go s.startMonitor() dirs, err := ioutil.ReadDir(root) if err != nil { return nil, err @@ -204,6 +204,9 @@ func (s *Shim) Create(c *containerkit.Container) (containerkit.ProcessDelegate, root = filepath.Join(s.root, "init") cmd = s.command(c.ID(), c.Path(), s.runtime.Name()) ) + if err := os.Mkdir(root, 0711); err != nil { + return nil, err + } // exec the shim inside the state directory setup with the process // information for what is being run cmd.Dir = root @@ -211,17 +214,23 @@ func (s *Shim) Create(c *containerkit.Container) (containerkit.ProcessDelegate, cmd.SysProcAttr = &syscall.SysProcAttr{ Setpgid: true, } - p, err := s.startCommand(c, cmd) + p, err := s.startCommand(processOpts{ + spec: c.Spec().Process, + root: root, + noPivotRoot: s.noPivotRoot, + checkpoint: s.checkpoint, + c: c, + cmd: cmd, + stdin: c.Stdin, + stdout: c.Stdout, + stderr: c.Stderr, + }) if err != nil { return nil, err } - if err := s.m.Add(p); err != nil { - return nil, err - } s.pmu.Lock() s.processes["init"] = p s.pmu.Unlock() - f, err := os.Create(filepath.Join(s.root, "state.json")) if err != nil { return nil, err @@ -276,10 +285,38 @@ func (s *Shim) Delete(c *containerkit.Container) error { return os.RemoveAll(s.root) } -var errnotimpl = errors.New("NOT IMPL RIGHT NOW, CHILL") - func (s *Shim) Exec(c *containerkit.Container, p *containerkit.Process) (containerkit.ProcessDelegate, error) { - return nil, errnotimpl + root, err := ioutil.TempDir(s.root, "") + if err != nil { + return nil, err + } + cmd := s.command(c.ID(), c.Path(), s.runtime.Name()) + // exec the shim inside the state directory setup with the process + // information for what is being run + cmd.Dir = root + // make sure the shim is in a new process group + cmd.SysProcAttr = &syscall.SysProcAttr{ + Setpgid: true, + } + sp, err := s.startCommand(processOpts{ + exec: true, + spec: *p.Spec(), + root: root, + noPivotRoot: s.noPivotRoot, + checkpoint: s.checkpoint, + c: c, + cmd: cmd, + stdin: p.Stdin, + stdout: p.Stdout, + stderr: p.Stderr, + }) + if err != nil { + return nil, err + } + s.pmu.Lock() + s.processes[filepath.Base(root)] = sp + s.pmu.Unlock() + return sp, nil } func (s *Shim) Load(id string) (containerkit.ProcessDelegate, error) { @@ -296,12 +333,15 @@ func (s *Shim) getContainerInit() (*process, error) { return p, nil } -func (s *Shim) startCommand(c *containerkit.Container, cmd *exec.Cmd) (*process, error) { - p, err := newProcess(filepath.Join(s.root, "init"), s.noPivotRoot, s.checkpoint, c, cmd) +func (s *Shim) startCommand(opts processOpts) (*process, error) { + p, err := newProcess(opts) if err != nil { return nil, err } - if err := cmd.Start(); err != nil { + if err := s.m.Add(p); err != nil { + return nil, err + } + if err := opts.cmd.Start(); err != nil { close(p.done) if checkShimNotFound(err) { return nil, fmt.Errorf("%s not install on system", s.name) @@ -323,12 +363,11 @@ func (s *Shim) command(args ...string) *exec.Cmd { } func (s *Shim) startMonitor() { + go s.m.Run() + defer s.m.Close() for m := range s.m.Events() { p := m.(*process) close(p.done) - if err := s.m.Remove(p); err != nil { - logrus.Error(err) - } } }