Fix monitor with process events

Monitor was receiving multiple events for the process

Signed-off-by: Michael Crosby <crosbymichael@gmail.com>
This commit is contained in:
Michael Crosby 2016-10-06 15:06:34 -07:00
parent 1911191f94
commit a861ae9d18
6 changed files with 172 additions and 84 deletions

View file

@ -83,7 +83,7 @@ func start(log *os.File) error {
} }
defer func() { defer func() {
if err := p.Close(); err != nil { if err := p.Close(); err != nil {
writeMessage(log, "warn", err) writeMessage(log, "warn", fmt.Errorf("close stdio %s", err))
} }
}() }()
if err := p.create(); err != nil { if err := p.create(); err != nil {
@ -127,7 +127,6 @@ func start(log *os.File) error {
// Wait for all the childs this process may have // Wait for all the childs this process may have
// created (needed for exec and init processes when // created (needed for exec and init processes when
// they join another pid namespace) // they join another pid namespace)
osutils.Reap(true)
p.Wait() p.Wait()
return nil return nil
} }
@ -151,7 +150,7 @@ func start(log *os.File) error {
case 2: case 2:
// signal // signal
if err := syscall.Kill(p.pid(), syscall.Signal(msg.Width)); err != nil { if err := syscall.Kill(p.pid(), syscall.Signal(msg.Width)); err != nil {
writeMessage(log, "warn", err) writeMessage(log, "warn", fmt.Errorf("signal pid %d: %s", msg.Width, err))
} }
} }
} }

View file

@ -3,7 +3,9 @@ package main
import ( import (
"flag" "flag"
"fmt" "fmt"
"io"
"os" "os"
"strconv"
"time" "time"
"github.com/Sirupsen/logrus" "github.com/Sirupsen/logrus"
@ -33,9 +35,9 @@ func runContainer() error {
return err return err
} }
// setup some stdio for our container // setup some stdio for our container
container.Stdin = Stdin() container.Stdin = Stdin("")
container.Stdout = Stdout() container.Stdout = Stdout("")
container.Stderr = Stderr() container.Stderr = Stderr("")
// go ahead and set the container in the create state and have it ready to start // go ahead and set the container in the create state and have it ready to start
logrus.Info("create container") logrus.Info("create container")
@ -49,12 +51,11 @@ func runContainer() error {
return err return err
} }
if exec { for i := 0; i < exec; i++ {
// start 10 exec processes giving the go var i to exec to stdout
for i := 0; i < 10; i++ {
process, err := container.NewProcess(&specs.Process{ process, err := container.NewProcess(&specs.Process{
Args: []string{ Args: []string{
"echo", fmt.Sprintf("sup from itteration %d", i), "sh", "-c",
"echo " + fmt.Sprintf("sup from itteration %d", i),
}, },
Env: env, Env: env,
Terminal: false, Terminal: false,
@ -63,9 +64,14 @@ func runContainer() error {
Capabilities: caps, Capabilities: caps,
}) })
process.Stdin = os.Stdin process.Stdin = Stdin(strconv.Itoa(i))
process.Stdout = os.Stdout stdout := Stdout(strconv.Itoa(i))
process.Stderr = os.Stderr
stderr := Stderr(strconv.Itoa(i))
go io.Copy(os.Stdout, stdout)
go io.Copy(os.Stdout, stderr)
process.Stdout = stdout
process.Stderr = stderr
if err := process.Start(); err != nil { if err := process.Start(); err != nil {
return err return err
@ -76,7 +82,6 @@ func runContainer() error {
} }
logrus.Infof("process %d returned with %d", i, procStatus) logrus.Infof("process %d returned with %d", i, procStatus)
} }
}
if load { if load {
if container, err = containerkit.LoadContainer(dockerContainer, runtime); err != nil { if container, err = containerkit.LoadContainer(dockerContainer, runtime); err != nil {
@ -101,13 +106,13 @@ func runContainer() error {
} }
var ( var (
exec bool exec int
load bool load bool
) )
// "Hooks do optional work. Drivers do mandatory work" // "Hooks do optional work. Drivers do mandatory work"
func main() { func main() {
flag.BoolVar(&exec, "exec", false, "run the execs") flag.IntVar(&exec, "exec", 0, "run n number of execs")
flag.BoolVar(&load, "load", false, "reload the container") flag.BoolVar(&load, "load", false, "reload the container")
flag.Parse() flag.Parse()
if err := osutils.SetSubreaper(1); err != nil { if err := osutils.SetSubreaper(1); err != nil {

View file

@ -182,8 +182,8 @@ func (t *testConfig) Spec(m *containerkit.Mount) (*specs.Spec, error) {
}, nil }, nil
} }
func Stdin() *os.File { func Stdin(n string) *os.File {
abs, err := filepath.Abs("stdin") abs, err := filepath.Abs("stdin" + n)
if err != nil { if err != nil {
panic(err) panic(err)
} }
@ -197,8 +197,8 @@ func Stdin() *os.File {
return f return f
} }
func Stdout() *os.File { func Stdout(n string) *os.File {
abs, err := filepath.Abs("stdout") abs, err := filepath.Abs("stdout" + n)
if err != nil { if err != nil {
panic(err) panic(err)
} }
@ -212,8 +212,8 @@ func Stdout() *os.File {
return f return f
} }
func Stderr() *os.File { func Stderr(n string) *os.File {
abs, err := filepath.Abs("stderr") abs, err := filepath.Abs("stderr" + n)
if err != nil { if err != nil {
panic(err) panic(err)
} }

View file

@ -1,6 +1,7 @@
package monitor package monitor
import ( import (
"io"
"sync" "sync"
"syscall" "syscall"
@ -10,6 +11,9 @@ import (
type Monitorable interface { type Monitorable interface {
FD() int FD() int
// Remove returns true if the monitorable should be removed
// from the event monitor under the lock of when the event was received
Remove() bool
} }
type Flusher interface { type Flusher interface {
@ -63,6 +67,10 @@ func (m *Monitor) Add(ma Monitorable) error {
func (m *Monitor) Remove(ma Monitorable) error { func (m *Monitor) Remove(ma Monitorable) error {
m.m.Lock() m.m.Lock()
defer m.m.Unlock() defer m.m.Unlock()
return m.remove(ma)
}
func (m *Monitor) remove(ma Monitorable) error {
fd := ma.FD() fd := ma.FD()
delete(m.receivers, fd) delete(m.receivers, fd)
return syscall.EpollCtl(m.epollFd, syscall.EPOLL_CTL_DEL, fd, &syscall.EpollEvent{ return syscall.EpollCtl(m.epollFd, syscall.EPOLL_CTL_DEL, fd, &syscall.EpollEvent{
@ -84,18 +92,28 @@ func (m *Monitor) Run() {
if err == syscall.EINTR { if err == syscall.EINTR {
continue continue
} }
logrus.WithField("error", err).Fatal("containerd: epoll wait") logrus.WithField("error", err).Fatal("shim: epoll wait")
} }
for i := 0; i < n; i++ { for i := 0; i < n; i++ {
fd := int(events[i].Fd) fd := int(events[i].Fd)
m.m.Lock() m.m.Lock()
r := m.receivers[fd] r := m.receivers[fd]
m.m.Unlock()
if f, ok := r.(Flusher); ok { if f, ok := r.(Flusher); ok {
if err := f.Flush(); err != nil { if err := f.Flush(); err != nil {
logrus.WithField("error", err).Fatal("containerd: flush event FD") logrus.WithField("error", err).Fatal("shim: flush event FD")
} }
} }
if r.Remove() {
if err := m.remove(r); err != nil {
logrus.WithField("error", err).Fatal("shim: remove event FD")
}
}
if f, ok := r.(io.Closer); ok {
if err := f.Close(); err != nil {
logrus.WithField("error", err).Fatal("shim: close event FD")
}
}
m.m.Unlock()
m.events <- r m.events <- r
} }
} }

View file

@ -31,12 +31,22 @@ var (
const UnknownStatus = 255 const UnknownStatus = 255
func newProcess(root string, noPivotRoot bool, checkpoint string, c *containerkit.Container, cmd *exec.Cmd) (*process, error) { type processOpts struct {
if err := os.Mkdir(root, 0711); err != nil { root string
return nil, err noPivotRoot bool
checkpoint string
c *containerkit.Container
cmd *exec.Cmd
exec bool
spec specs.Process
stdin io.Reader
stdout io.Writer
stderr io.Writer
} }
func newProcess(opts processOpts) (*process, error) {
var ( var (
spec = c.Spec() spec = opts.c.Spec()
stdin, stdout, stderr string stdin, stdout, stderr string
) )
uid, gid, err := getRootIDs(spec) uid, gid, err := getRootIDs(spec)
@ -49,15 +59,15 @@ func newProcess(root string, noPivotRoot bool, checkpoint string, c *containerki
}{ }{
{ {
path: &stdin, path: &stdin,
v: c.Stdin, v: opts.stdin,
}, },
{ {
path: &stdout, path: &stdout,
v: c.Stdout, v: opts.stdout,
}, },
{ {
path: &stderr, path: &stderr,
v: c.Stderr, v: opts.stderr,
}, },
} { } {
p, err := getFifoPath(t.v) p, err := getFifoPath(t.v)
@ -67,20 +77,20 @@ func newProcess(root string, noPivotRoot bool, checkpoint string, c *containerki
*t.path = p *t.path = p
} }
p := &process{ p := &process{
root: root, root: opts.root,
cmd: cmd, cmd: opts.cmd,
done: make(chan struct{}), done: make(chan struct{}),
spec: spec.Process, spec: opts.spec,
exec: false, exec: opts.exec,
rootUid: uid, rootUid: uid,
rootGid: gid, rootGid: gid,
noPivotRoot: noPivotRoot, noPivotRoot: opts.noPivotRoot,
checkpoint: checkpoint, checkpoint: opts.checkpoint,
stdin: stdin, stdin: stdin,
stdout: stdout, stdout: stdout,
stderr: stderr, stderr: stderr,
} }
f, err := os.Create(filepath.Join(root, "process.json")) f, err := os.Create(filepath.Join(opts.root, "process.json"))
if err != nil { if err != nil {
return nil, err return nil, err
} }
@ -89,11 +99,11 @@ func newProcess(root string, noPivotRoot bool, checkpoint string, c *containerki
if err != nil { if err != nil {
return nil, err return nil, err
} }
exit, err := getExitPipe(filepath.Join(root, "exit")) exit, err := getExitPipe(filepath.Join(opts.root, "exit"))
if err != nil { if err != nil {
return nil, err return nil, err
} }
control, err := getControlPipe(filepath.Join(root, "control")) control, err := getControlPipe(filepath.Join(opts.root, "control"))
if err != nil { if err != nil {
return nil, err return nil, err
} }
@ -199,6 +209,14 @@ func (p *process) FD() int {
return int(p.exit.Fd()) return int(p.exit.Fd())
} }
// Close closes the process's exit handle (p.exit) and returns whatever
// error that close reports.
func (p *process) Close() error {
return p.exit.Close()
}
// Remove reports whether this process should be dropped from the event
// monitor once an event for it has been received. It always returns true.
func (p *process) Remove() bool {
return true
}
func (p *process) Wait() (rst uint32, rerr error) { func (p *process) Wait() (rst uint32, rerr error) {
<-p.done <-p.done
data, err := ioutil.ReadFile(filepath.Join(p.root, "exitStatus")) data, err := ioutil.ReadFile(filepath.Join(p.root, "exitStatus"))
@ -228,6 +246,8 @@ func (p *process) Signal(s os.Signal) error {
// same checks if the process is the same process originally launched // same checks if the process is the same process originally launched
func (p *process) same() (bool, error) { func (p *process) same() (bool, error) {
/// for backwards compat assume true if it is not set /// for backwards compat assume true if it is not set
p.mu.Lock()
defer p.mu.Unlock()
if p.startTime == "" { if p.startTime == "" {
return true, nil return true, nil
} }
@ -245,14 +265,19 @@ func (p *process) same() (bool, error) {
func (p *process) checkExited() { func (p *process) checkExited() {
err := p.cmd.Wait() err := p.cmd.Wait()
if err == nil { if err == nil {
p.mu.Lock()
if p.success {
p.mu.Unlock()
return
}
p.success = true p.success = true
p.mu.Unlock()
} }
if same, _ := p.same(); same && p.hasPid() { if same, _ := p.same(); same && p.hasPid() {
// The process changed its PR_SET_PDEATHSIG, so force kill it // The process changed its PR_SET_PDEATHSIG, so force kill it
logrus.Infof("containerd: (pid %v) has become an orphan, killing it", p.pid) logrus.Infof("containerd: (pid %v) has become an orphan, killing it", p.pid)
if err := unix.Kill(p.pid, syscall.SIGKILL); err != nil && err != syscall.ESRCH { if err := unix.Kill(p.pid, syscall.SIGKILL); err != nil && err != syscall.ESRCH {
logrus.Errorf("containerd: unable to SIGKILL (pid %v): %v", p.pid, err) logrus.Errorf("containerd: unable to SIGKILL (pid %v): %v", p.pid, err)
close(p.done)
return return
} }
// wait for the container process to exit // wait for the container process to exit
@ -263,7 +288,6 @@ func (p *process) checkExited() {
time.Sleep(5 * time.Millisecond) time.Sleep(5 * time.Millisecond)
} }
} }
close(p.done)
} }
func (p *process) hasPid() bool { func (p *process) hasPid() bool {
@ -273,12 +297,6 @@ func (p *process) hasPid() bool {
return r return r
} }
func (p *process) setPid(pid int) {
p.mu.Lock()
p.pid = pid
p.mu.Unlock()
}
type pidResponse struct { type pidResponse struct {
pid int pid int
err error err error
@ -293,21 +311,30 @@ func (p *process) waitForCreate(timeout time.Duration) error {
if resp.err != nil { if resp.err != nil {
return resp.err return resp.err
} }
p.setPid(resp.pid) p.mu.Lock()
p.pid = resp.pid
started, err := readProcessStartTime(resp.pid) started, err := readProcessStartTime(resp.pid)
if err != nil { if err != nil {
if os.IsNotExist(err) {
// process already exited
p.success = true
p.mu.Unlock()
return nil
}
logrus.Warnf("shim: unable to save starttime: %v", err) logrus.Warnf("shim: unable to save starttime: %v", err)
} }
p.startTime = started p.startTime = started
f, err := os.Create(filepath.Join(p.root, "process.json")) f, err := os.Create(filepath.Join(p.root, "process.json"))
if err != nil { if err != nil {
logrus.Warnf("shim: unable to save starttime: %v", err) logrus.Warnf("shim: unable to create process.json: %v", err)
p.mu.Unlock()
return nil return nil
} }
defer f.Close() defer f.Close()
if err := json.NewEncoder(f).Encode(p); err != nil { if err := json.NewEncoder(f).Encode(p); err != nil {
logrus.Warnf("shim: unable to save starttime: %v", err) logrus.Warnf("shim: unable to encode process: %v", err)
} }
p.mu.Unlock()
return nil return nil
case <-time.After(timeout): case <-time.After(timeout):
p.cmd.Process.Kill() p.cmd.Process.Kill()

View file

@ -12,7 +12,6 @@ import (
"syscall" "syscall"
"time" "time"
"github.com/Sirupsen/logrus"
"github.com/docker/containerd/monitor" "github.com/docker/containerd/monitor"
"github.com/docker/containerd/oci" "github.com/docker/containerd/oci"
"github.com/docker/containerkit" "github.com/docker/containerkit"
@ -104,6 +103,7 @@ func Load(root string) (*Shim, error) {
return nil, err return nil, err
} }
s.m = m s.m = m
go s.startMonitor()
dirs, err := ioutil.ReadDir(root) dirs, err := ioutil.ReadDir(root)
if err != nil { if err != nil {
return nil, err return nil, err
@ -204,6 +204,9 @@ func (s *Shim) Create(c *containerkit.Container) (containerkit.ProcessDelegate,
root = filepath.Join(s.root, "init") root = filepath.Join(s.root, "init")
cmd = s.command(c.ID(), c.Path(), s.runtime.Name()) cmd = s.command(c.ID(), c.Path(), s.runtime.Name())
) )
if err := os.Mkdir(root, 0711); err != nil {
return nil, err
}
// exec the shim inside the state directory setup with the process // exec the shim inside the state directory setup with the process
// information for what is being run // information for what is being run
cmd.Dir = root cmd.Dir = root
@ -211,17 +214,23 @@ func (s *Shim) Create(c *containerkit.Container) (containerkit.ProcessDelegate,
cmd.SysProcAttr = &syscall.SysProcAttr{ cmd.SysProcAttr = &syscall.SysProcAttr{
Setpgid: true, Setpgid: true,
} }
p, err := s.startCommand(c, cmd) p, err := s.startCommand(processOpts{
spec: c.Spec().Process,
root: root,
noPivotRoot: s.noPivotRoot,
checkpoint: s.checkpoint,
c: c,
cmd: cmd,
stdin: c.Stdin,
stdout: c.Stdout,
stderr: c.Stderr,
})
if err != nil { if err != nil {
return nil, err return nil, err
} }
if err := s.m.Add(p); err != nil {
return nil, err
}
s.pmu.Lock() s.pmu.Lock()
s.processes["init"] = p s.processes["init"] = p
s.pmu.Unlock() s.pmu.Unlock()
f, err := os.Create(filepath.Join(s.root, "state.json")) f, err := os.Create(filepath.Join(s.root, "state.json"))
if err != nil { if err != nil {
return nil, err return nil, err
@ -276,10 +285,38 @@ func (s *Shim) Delete(c *containerkit.Container) error {
return os.RemoveAll(s.root) return os.RemoveAll(s.root)
} }
var errnotimpl = errors.New("NOT IMPL RIGHT NOW, CHILL")
func (s *Shim) Exec(c *containerkit.Container, p *containerkit.Process) (containerkit.ProcessDelegate, error) { func (s *Shim) Exec(c *containerkit.Container, p *containerkit.Process) (containerkit.ProcessDelegate, error) {
return nil, errnotimpl root, err := ioutil.TempDir(s.root, "")
if err != nil {
return nil, err
}
cmd := s.command(c.ID(), c.Path(), s.runtime.Name())
// exec the shim inside the state directory setup with the process
// information for what is being run
cmd.Dir = root
// make sure the shim is in a new process group
cmd.SysProcAttr = &syscall.SysProcAttr{
Setpgid: true,
}
sp, err := s.startCommand(processOpts{
exec: true,
spec: *p.Spec(),
root: root,
noPivotRoot: s.noPivotRoot,
checkpoint: s.checkpoint,
c: c,
cmd: cmd,
stdin: p.Stdin,
stdout: p.Stdout,
stderr: p.Stderr,
})
if err != nil {
return nil, err
}
s.pmu.Lock()
s.processes[filepath.Base(root)] = sp
s.pmu.Unlock()
return sp, nil
} }
func (s *Shim) Load(id string) (containerkit.ProcessDelegate, error) { func (s *Shim) Load(id string) (containerkit.ProcessDelegate, error) {
@ -296,12 +333,15 @@ func (s *Shim) getContainerInit() (*process, error) {
return p, nil return p, nil
} }
func (s *Shim) startCommand(c *containerkit.Container, cmd *exec.Cmd) (*process, error) { func (s *Shim) startCommand(opts processOpts) (*process, error) {
p, err := newProcess(filepath.Join(s.root, "init"), s.noPivotRoot, s.checkpoint, c, cmd) p, err := newProcess(opts)
if err != nil { if err != nil {
return nil, err return nil, err
} }
if err := cmd.Start(); err != nil { if err := s.m.Add(p); err != nil {
return nil, err
}
if err := opts.cmd.Start(); err != nil {
close(p.done) close(p.done)
if checkShimNotFound(err) { if checkShimNotFound(err) {
return nil, fmt.Errorf("%s not install on system", s.name) return nil, fmt.Errorf("%s not install on system", s.name)
@ -323,12 +363,11 @@ func (s *Shim) command(args ...string) *exec.Cmd {
} }
func (s *Shim) startMonitor() { func (s *Shim) startMonitor() {
go s.m.Run()
defer s.m.Close()
for m := range s.m.Events() { for m := range s.m.Events() {
p := m.(*process) p := m.(*process)
close(p.done) close(p.done)
if err := s.m.Remove(p); err != nil {
logrus.Error(err)
}
} }
} }