058eea362a
Signed-off-by: Michael Crosby <crosbymichael@gmail.com>
132 lines
3.1 KiB
Go
132 lines
3.1 KiB
Go
package supervisor
|
|
|
|
import (
|
|
"sync"
|
|
"syscall"
|
|
|
|
"github.com/Sirupsen/logrus"
|
|
"github.com/docker/containerd/archutils"
|
|
"github.com/docker/containerd/runtime"
|
|
)
|
|
|
|
// NewMonitor starts a new process monitor and returns it
|
|
func NewMonitor() (*Monitor, error) {
|
|
m := &Monitor{
|
|
receivers: make(map[int]interface{}),
|
|
exits: make(chan runtime.Process, 1024),
|
|
ooms: make(chan string, 1024),
|
|
}
|
|
fd, err := archutils.EpollCreate1(0)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
m.epollFd = fd
|
|
go m.start()
|
|
return m, nil
|
|
}
|
|
|
|
// Monitor represents a runtime.Process monitor
|
|
type Monitor struct {
|
|
m sync.Mutex
|
|
receivers map[int]interface{}
|
|
exits chan runtime.Process
|
|
ooms chan string
|
|
epollFd int
|
|
}
|
|
|
|
// Exits returns the channel used to notify of a process exit
|
|
func (m *Monitor) Exits() chan runtime.Process {
|
|
return m.exits
|
|
}
|
|
|
|
// OOMs returns the channel used to notify of a container exit due to OOM
|
|
func (m *Monitor) OOMs() chan string {
|
|
return m.ooms
|
|
}
|
|
|
|
// Monitor adds a process to the list of the one being monitored
|
|
func (m *Monitor) Monitor(p runtime.Process) error {
|
|
m.m.Lock()
|
|
defer m.m.Unlock()
|
|
fd := p.ExitFD()
|
|
event := syscall.EpollEvent{
|
|
Fd: int32(fd),
|
|
Events: syscall.EPOLLHUP,
|
|
}
|
|
if err := archutils.EpollCtl(m.epollFd, syscall.EPOLL_CTL_ADD, fd, &event); err != nil {
|
|
return err
|
|
}
|
|
m.receivers[fd] = p
|
|
return nil
|
|
}
|
|
|
|
// MonitorOOM adds a container to the list of the ones monitored for OOM
|
|
func (m *Monitor) MonitorOOM(c runtime.Container) error {
|
|
m.m.Lock()
|
|
defer m.m.Unlock()
|
|
o, err := c.OOM()
|
|
if err != nil {
|
|
return err
|
|
}
|
|
fd := o.FD()
|
|
event := syscall.EpollEvent{
|
|
Fd: int32(fd),
|
|
Events: syscall.EPOLLHUP | syscall.EPOLLIN,
|
|
}
|
|
if err := archutils.EpollCtl(m.epollFd, syscall.EPOLL_CTL_ADD, fd, &event); err != nil {
|
|
return err
|
|
}
|
|
m.receivers[fd] = o
|
|
return nil
|
|
}
|
|
|
|
// Close cleans up resources allocated by NewMonitor()
|
|
func (m *Monitor) Close() error {
|
|
return syscall.Close(m.epollFd)
|
|
}
|
|
|
|
func (m *Monitor) start() {
|
|
var events [128]syscall.EpollEvent
|
|
for {
|
|
n, err := archutils.EpollWait(m.epollFd, events[:], -1)
|
|
if err != nil {
|
|
if err == syscall.EINTR {
|
|
continue
|
|
}
|
|
logrus.WithField("error", err).Fatal("containerd: epoll wait")
|
|
}
|
|
// process events
|
|
for i := 0; i < n; i++ {
|
|
fd := int(events[i].Fd)
|
|
m.m.Lock()
|
|
r := m.receivers[fd]
|
|
switch t := r.(type) {
|
|
case runtime.Process:
|
|
if events[i].Events == syscall.EPOLLHUP {
|
|
delete(m.receivers, fd)
|
|
if err = syscall.EpollCtl(m.epollFd, syscall.EPOLL_CTL_DEL, fd, &syscall.EpollEvent{
|
|
Events: syscall.EPOLLHUP,
|
|
Fd: int32(fd),
|
|
}); err != nil {
|
|
logrus.WithField("error", err).Error("containerd: epoll remove fd")
|
|
}
|
|
if err := t.Close(); err != nil {
|
|
logrus.WithField("error", err).Error("containerd: close process IO")
|
|
}
|
|
m.exits <- t
|
|
}
|
|
case runtime.OOM:
|
|
// always flush the event fd
|
|
t.Flush()
|
|
if t.Removed() {
|
|
delete(m.receivers, fd)
|
|
// epoll will remove the fd from its set after it has been closed
|
|
t.Close()
|
|
} else {
|
|
m.ooms <- t.ContainerID()
|
|
}
|
|
}
|
|
m.m.Unlock()
|
|
}
|
|
}
|
|
}
|