Add oom support to events
Signed-off-by: Michael Crosby <crosbymichael@gmail.com>
This commit is contained in:
parent
70a8c1ec3f
commit
500ca74f38
10 changed files with 201 additions and 256 deletions
|
@ -10,8 +10,9 @@ import (
|
|||
|
||||
func NewMonitor() (*Monitor, error) {
|
||||
m := &Monitor{
|
||||
processes: make(map[int]runtime.Process),
|
||||
receivers: make(map[int]interface{}),
|
||||
exits: make(chan runtime.Process, 1024),
|
||||
ooms: make(chan string, 1024),
|
||||
}
|
||||
fd, err := syscall.EpollCreate1(0)
|
||||
if err != nil {
|
||||
|
@ -24,8 +25,9 @@ func NewMonitor() (*Monitor, error) {
|
|||
|
||||
// Monitor tracks file descriptors registered with one epoll instance and
// reports what happens on them through the exits and ooms channels.
type Monitor struct {
|
||||
// m is held by Monitor and MonitorOOM while a descriptor is registered, and
// by the event loop while it looks up or removes an entry.
m sync.Mutex
|
||||
// processes is keyed by file descriptor.
// NOTE(review): this view interleaves removed and added diff lines; processes
// looks like the field that receivers supersedes — confirm against the real file.
processes map[int]runtime.Process
|
||||
// receivers maps a registered file descriptor to its owner; the visible code
// stores runtime.Process and runtime.OOM values in it.
receivers map[int]interface{}
|
||||
// exits receives a process once its descriptor reports EPOLLHUP.
exits chan runtime.Process
|
||||
// ooms receives a container ID for each OOM notification that is not a removal.
ooms chan string
|
||||
// epollFd is the epoll descriptor handed to syscall.EpollCtl.
epollFd int
|
||||
}
|
||||
|
||||
|
@ -33,6 +35,10 @@ func (m *Monitor) Exits() chan runtime.Process {
|
|||
return m.exits
|
||||
}
|
||||
|
||||
// OOMs returns the channel on which the event loop sends a container's ID
// when that container's OOM descriptor fires.
func (m *Monitor) OOMs() chan string {
|
||||
return m.ooms
|
||||
}
|
||||
|
||||
func (m *Monitor) Monitor(p runtime.Process) error {
|
||||
m.m.Lock()
|
||||
defer m.m.Unlock()
|
||||
|
@ -45,7 +51,27 @@ func (m *Monitor) Monitor(p runtime.Process) error {
|
|||
return err
|
||||
}
|
||||
EpollFdCounter.Inc(1)
|
||||
m.processes[fd] = p
|
||||
m.receivers[fd] = p
|
||||
return nil
|
||||
}
|
||||
|
||||
func (m *Monitor) MonitorOOM(c runtime.Container) error {
|
||||
m.m.Lock()
|
||||
defer m.m.Unlock()
|
||||
o, err := c.OOM()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
fd := o.FD()
|
||||
event := syscall.EpollEvent{
|
||||
Fd: int32(fd),
|
||||
Events: syscall.EPOLLHUP | syscall.EPOLLIN,
|
||||
}
|
||||
if err := syscall.EpollCtl(m.epollFd, syscall.EPOLL_CTL_ADD, fd, &event); err != nil {
|
||||
return err
|
||||
}
|
||||
EpollFdCounter.Inc(1)
|
||||
m.receivers[fd] = o
|
||||
return nil
|
||||
}
|
||||
|
||||
|
@ -65,24 +91,38 @@ func (m *Monitor) start() {
|
|||
}
|
||||
// process events
|
||||
for i := 0; i < n; i++ {
|
||||
if events[i].Events == syscall.EPOLLHUP {
|
||||
fd := int(events[i].Fd)
|
||||
m.m.Lock()
|
||||
proc := m.processes[fd]
|
||||
delete(m.processes, fd)
|
||||
if err = syscall.EpollCtl(m.epollFd, syscall.EPOLL_CTL_DEL, fd, &syscall.EpollEvent{
|
||||
Events: syscall.EPOLLHUP,
|
||||
Fd: int32(fd),
|
||||
}); err != nil {
|
||||
logrus.WithField("error", err).Fatal("containerd: epoll remove fd")
|
||||
fd := int(events[i].Fd)
|
||||
m.m.Lock()
|
||||
r := m.receivers[fd]
|
||||
switch t := r.(type) {
|
||||
case runtime.Process:
|
||||
if events[i].Events == syscall.EPOLLHUP {
|
||||
delete(m.receivers, fd)
|
||||
if err = syscall.EpollCtl(m.epollFd, syscall.EPOLL_CTL_DEL, fd, &syscall.EpollEvent{
|
||||
Events: syscall.EPOLLHUP,
|
||||
Fd: int32(fd),
|
||||
}); err != nil {
|
||||
logrus.WithField("error", err).Error("containerd: epoll remove fd")
|
||||
}
|
||||
if err := t.Close(); err != nil {
|
||||
logrus.WithField("error", err).Error("containerd: close process IO")
|
||||
}
|
||||
EpollFdCounter.Dec(1)
|
||||
m.exits <- t
|
||||
}
|
||||
EpollFdCounter.Dec(1)
|
||||
if err := proc.Close(); err != nil {
|
||||
logrus.WithField("error", err).Error("containerd: close process IO")
|
||||
case runtime.OOM:
|
||||
// always flush the event fd
|
||||
t.Flush()
|
||||
if t.Removed() {
|
||||
delete(m.receivers, fd)
|
||||
// epoll will remove the fd from its set after it has been closed
|
||||
t.Close()
|
||||
EpollFdCounter.Dec(1)
|
||||
} else {
|
||||
m.ooms <- t.ContainerID()
|
||||
}
|
||||
m.m.Unlock()
|
||||
m.exits <- proc
|
||||
}
|
||||
m.m.Unlock()
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
22
supervisor/oom.go
Normal file
22
supervisor/oom.go
Normal file
|
@ -0,0 +1,22 @@
|
|||
package supervisor
|
||||
|
||||
import (
|
||||
"time"
|
||||
|
||||
"github.com/Sirupsen/logrus"
|
||||
)
|
||||
|
||||
// OOMTask is the task submitted for an OOM notification; handleTask
// dispatches it to Supervisor.oom.
type OOMTask struct {
|
||||
baseTask
|
||||
// ID is the container ID; oomHandler fills it from the monitor's OOM channel.
ID string
|
||||
}
|
||||
|
||||
func (s *Supervisor) oom(t *OOMTask) error {
|
||||
logrus.WithField("id", t.ID).Debug("containerd: container oom")
|
||||
s.notifySubscribers(Event{
|
||||
Timestamp: time.Now(),
|
||||
ID: t.ID,
|
||||
Type: "oom",
|
||||
})
|
||||
return nil
|
||||
}
|
|
@ -10,7 +10,6 @@ import (
|
|||
"time"
|
||||
|
||||
"github.com/Sirupsen/logrus"
|
||||
"github.com/docker/containerd/chanotify"
|
||||
"github.com/docker/containerd/runtime"
|
||||
)
|
||||
|
||||
|
@ -45,20 +44,8 @@ func New(stateDir string, oom bool, runtimeName string) (*Supervisor, error) {
|
|||
if err := setupEventLog(s); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
if oom {
|
||||
s.notifier = chanotify.New()
|
||||
|
||||
/*
|
||||
go func() {
|
||||
for id := range s.notifier.Chan() {
|
||||
e := NewTask(OOMTaskType)
|
||||
e.ID = id.(string)
|
||||
s.SendTask(e)
|
||||
}
|
||||
}()
|
||||
*/
|
||||
}
|
||||
go s.exitHandler()
|
||||
go s.oomHandler()
|
||||
if err := s.restore(); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
@ -126,7 +113,6 @@ type Supervisor struct {
|
|||
subscriberLock sync.RWMutex
|
||||
subscribers map[chan Event]struct{}
|
||||
machine Machine
|
||||
notifier *chanotify.Notifier
|
||||
tasks chan Task
|
||||
monitor *Monitor
|
||||
eventLog []Event
|
||||
|
@ -244,6 +230,15 @@ func (s *Supervisor) exitHandler() {
|
|||
}
|
||||
}
|
||||
|
||||
func (s *Supervisor) oomHandler() {
|
||||
for id := range s.monitor.OOMs() {
|
||||
e := &OOMTask{
|
||||
ID: id,
|
||||
}
|
||||
s.SendTask(e)
|
||||
}
|
||||
}
|
||||
|
||||
// monitorProcess registers p with the supervisor's monitor by delegating to
// Monitor.Monitor and returns that call's error.
func (s *Supervisor) monitorProcess(p runtime.Process) error {
|
||||
return s.monitor.Monitor(p)
|
||||
}
|
||||
|
@ -266,10 +261,14 @@ func (s *Supervisor) restore() error {
|
|||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
ContainersCounter.Inc(1)
|
||||
s.containers[id] = &containerInfo{
|
||||
container: container,
|
||||
}
|
||||
if err := s.monitor.MonitorOOM(container); err != nil && err != runtime.ErrContainerExited {
|
||||
logrus.WithField("error", err).Error("containerd: notify OOM events")
|
||||
}
|
||||
logrus.WithField("id", id).Debug("containerd: container restored")
|
||||
var exitedProcesses []runtime.Process
|
||||
for _, p := range processes {
|
||||
|
|
|
@ -27,6 +27,8 @@ func (s *Supervisor) handleTask(i Task) {
|
|||
err = s.updateContainer(t)
|
||||
case *UpdateProcessTask:
|
||||
err = s.updateProcess(t)
|
||||
case *OOMTask:
|
||||
err = s.oom(t)
|
||||
default:
|
||||
err = ErrUnknownTask
|
||||
}
|
||||
|
|
|
@ -51,16 +51,9 @@ func (w *worker) Start() {
|
|||
w.s.SendTask(evt)
|
||||
continue
|
||||
}
|
||||
/*
|
||||
if w.s.notifier != nil {
|
||||
n, err := t.Container.OOM()
|
||||
if err != nil {
|
||||
logrus.WithField("error", err).Error("containerd: notify OOM events")
|
||||
} else {
|
||||
w.s.notifier.Add(n, t.Container.ID())
|
||||
}
|
||||
}
|
||||
*/
|
||||
if err := w.s.monitor.MonitorOOM(t.Container); err != nil && err != runtime.ErrContainerExited {
|
||||
logrus.WithField("error", err).Error("containerd: notify OOM events")
|
||||
}
|
||||
if err := w.s.monitorProcess(process); err != nil {
|
||||
logrus.WithField("error", err).Error("containerd: add process to monitor")
|
||||
}
|
||||
|
|
Loading…
Add table
Add a link
Reference in a new issue