diff --git a/chanotify/chanotify.go b/chanotify/chanotify.go deleted file mode 100644 index 940cb99..0000000 --- a/chanotify/chanotify.go +++ /dev/null @@ -1,94 +0,0 @@ -package chanotify - -import ( - "errors" - "sync" -) - -// Notifier can effectively notify you about receiving from particular channels. -// It operates with pairs <-chan struct{} <-> string which is notification -// channel and its identificator respectively. -// Notification channel is <-chan struc{}, each send to which is spawn -// notification from Notifier, close doesn't spawn anything and removes channel -// from Notifier. -type Notifier struct { - c chan interface{} - - m sync.Mutex // guards doneCh - doneCh map[interface{}]chan struct{} - closed bool -} - -// New returns a new notifier. A notifier must be closed by -// calling, (*Notifier).Close, once it is no longer in use. -func New() *Notifier { - s := &Notifier{ - c: make(chan interface{}), - doneCh: make(map[interface{}]chan struct{}), - } - return s -} - -// Chan returns channel on which client listen for notifications. -// IDs of notifications is sent to the returned channel. -func (n *Notifier) Chan() <-chan interface{} { - return n.c -} - -// Add adds new notification channel to the notifier. -// Multiple registrations of the same ID is not allowed. -func (n *Notifier) Add(id interface{}, ch <-chan struct{}) error { - n.m.Lock() - defer n.m.Unlock() - - if n.closed { - return errors.New("notifier closed; cannot add the channel on the notifier") - } - if _, ok := n.doneCh[id]; ok { - return errors.New("cannot register duplicate key") - } - - done := make(chan struct{}) - n.doneCh[id] = done - - n.startWorker(ch, id, done) - return nil -} - -func (n *Notifier) killWorker(id interface{}, done chan struct{}) { - n.m.Lock() - delete(n.doneCh, id) - n.m.Unlock() -} - -func (n *Notifier) startWorker(ch <-chan struct{}, id interface{}, done chan struct{}) { - go func() { - for { - select { - case _, ok := <-ch: - if !ok { - // If the channel is closed, we don't need the goroutine - // or the done channel mechanism running anymore. - n.killWorker(id, done) - return - } - n.c <- id - case <-done: - // We don't need this goroutine running anymore, return. - n.killWorker(id, done) - return - } - } - }() -} - -// Close closes the notifier and releases its underlying resources. -func (n *Notifier) Close() { - n.m.Lock() - defer n.m.Unlock() - for _, done := range n.doneCh { - close(done) - } - close(n.c) - n.closed = true -} diff --git a/chanotify/chanotify_test.go b/chanotify/chanotify_test.go deleted file mode 100644 index ef770bb..0000000 --- a/chanotify/chanotify_test.go +++ /dev/null @@ -1,117 +0,0 @@ -package chanotify - -import ( - "sync" - "testing" - "time" -) - -func TestNotifier(t *testing.T) { - s := New() - ch1 := make(chan struct{}, 1) - ch2 := make(chan struct{}, 1) - id1 := "1" - id2 := "2" - - if err := s.Add(id1, ch1); err != nil { - t.Fatal(err) - } - if err := s.Add(id2, ch2); err != nil { - t.Fatal(err) - } - s.m.Lock() - if len(s.doneCh) != 2 { - t.Fatalf("want 2 channels, got %d", len(s.doneCh)) - } - s.m.Unlock() - ch1 <- struct{}{} - if got, want := <-s.Chan(), id1; got != want { - t.Fatalf("got %v; want %v", got, want) - } - ch2 <- struct{}{} - if got, want := <-s.Chan(), id2; got != want { - t.Fatalf("got %v; want %v", got, want) - } - close(ch1) - close(ch2) - time.Sleep(100 * time.Millisecond) - s.m.Lock() - if len(s.doneCh) != 0 { - t.Fatalf("want 0 channels, got %d", len(s.doneCh)) - } - s.m.Unlock() -} - -func TestConcurrentNotifier(t *testing.T) { - s := New() - var chs []chan struct{} - for i := 0; i < 8; i++ { - ch := make(chan struct{}, 2) - if err := s.Add(i, ch); err != nil { - t.Fatal(err) - } - chs = append(chs, ch) - } - testCounter := make(map[interface{}]int) - done := make(chan struct{}) - go func() { - for id := range s.Chan() { - testCounter[id]++ - } - close(done) - }() - var wg sync.WaitGroup - for _, ch := range chs { - wg.Add(1) - go func(ch chan struct{}) { - ch <- struct{}{} - ch <- struct{}{} - close(ch) - wg.Done() - }(ch) - } - wg.Wait() - // wait for notifications - time.Sleep(1 * time.Second) - s.Close() - <-done - if len(testCounter) != 8 { - t.Fatalf("expect to find exactly 8 distinct ids, got %d", len(testCounter)) - } - for id, c := range testCounter { - if c != 2 { - t.Fatalf("Expected to find exactly 2 id %s, but got %d", id, c) - } - } -} - -func TestAddToBlocked(t *testing.T) { - s := New() - ch := make(chan struct{}, 1) - id := 1 - go func() { - // give some time to start first select - time.Sleep(1 * time.Second) - if err := s.Add(id, ch); err != nil { - t.Fatal(err) - } - ch <- struct{}{} - }() - if got, want := <-s.Chan(), id; got != want { - t.Fatalf("got %v; want %v", got, want) - } -} - -func TestAddDuplicate(t *testing.T) { - s := New() - ch1 := make(chan struct{}, 1) - ch2 := make(chan struct{}, 1) - - if err := s.Add(1, ch1); err != nil { - t.Fatalf("cannot add; err = %v", err) - } - - if err := s.Add(1, ch2); err == nil { - t.Fatalf("duplicate keys are not allowed; but Add succeeded") - } -} diff --git a/runtime/container.go b/runtime/container.go index f43ed56..e34805e 100644 --- a/runtime/container.go +++ b/runtime/container.go @@ -2,6 +2,7 @@ package runtime import ( "encoding/json" + "io" "io/ioutil" "os" "path/filepath" @@ -46,7 +47,15 @@ type Container interface { // Name or path of the OCI compliant runtime used to execute the container Runtime() string // OOM signals the channel if the container received an OOM notification - // OOM() (<-chan struct{}, error) + OOM() (OOM, error) +} + +type OOM interface { + io.Closer + FD() int + ContainerID() string + Flush() + Removed() bool } type Stdio struct { @@ -159,8 +168,8 @@ type container struct { bundle string runtime string processes map[string]*process - stdio Stdio labels []string + oomFds []int } func (c *container) ID() string { diff --git a/runtime/container_linux.go b/runtime/container_linux.go index ae660f8..77c8d39 100644 --- a/runtime/container_linux.go +++ b/runtime/container_linux.go @@ -2,6 +2,7 @@ package runtime import ( "encoding/json" + "fmt" "io/ioutil" "os" "os/exec" @@ -240,6 +241,59 @@ func (c *container) Stats() (*Stat, error) { }, nil } +func (c *container) OOM() (OOM, error) { + container, err := c.getLibctContainer() + if err != nil { + if lerr, ok := err.(libcontainer.Error); ok { + // with oom registration sometimes the container can run, exit, and be destroyed + // faster than we can get the state back so we can just ignore this + if lerr.Code() == libcontainer.ContainerNotExists { + return nil, ErrContainerExited + } + } + return nil, err + } + state, err := container.State() + if err != nil { + return nil, err + } + memoryPath := state.CgroupPaths["memory"] + return c.getMemeoryEventFD(memoryPath) +} + +func (c *container) getMemeoryEventFD(root string) (*oom, error) { + f, err := os.Open(filepath.Join(root, "memory.oom_control")) + if err != nil { + return nil, err + } + fd, _, serr := syscall.RawSyscall(syscall.SYS_EVENTFD2, 0, syscall.FD_CLOEXEC, 0) + if serr != 0 { + f.Close() + return nil, serr + } + if err := c.writeEventFD(root, int(f.Fd()), int(fd)); err != nil { + syscall.Close(int(fd)) + f.Close() + return nil, err + } + return &oom{ + root: root, + id: c.id, + eventfd: int(fd), + control: f, + }, nil +} + +func (c *container) writeEventFD(root string, cfd, efd int) error { + f, err := os.OpenFile(filepath.Join(root, "cgroup.event_control"), os.O_WRONLY, 0) + if err != nil { + return err + } + defer f.Close() + _, err = f.WriteString(fmt.Sprintf("%d %d", efd, cfd)) + return err +} + func waitForStart(p *process, cmd *exec.Cmd) error { for i := 0; i < 50; i++ { if _, err := p.getPidFromFile(); err != nil { @@ -270,3 +324,36 @@ func isAlive(cmd *exec.Cmd) (bool, error) { } return true, nil } + +type oom struct { + id string + root string + control *os.File + eventfd int +} + +func (o *oom) ContainerID() string { + return o.id +} + +func (o *oom) FD() int { + return o.eventfd +} + +func (o *oom) Flush() { + buf := make([]byte, 8) + syscall.Read(o.eventfd, buf) +} + +func (o *oom) Removed() bool { + _, err := os.Lstat(filepath.Join(o.root, "cgroup.event_control")) + return os.IsNotExist(err) +} + +func (o *oom) Close() error { + err := syscall.Close(o.eventfd) + if cerr := o.control.Close(); err == nil { + err = cerr + } + return err +} diff --git a/runtime/container_windows.go b/runtime/container_windows.go index d13dfa3..47b6b49 100644 --- a/runtime/container_windows.go +++ b/runtime/container_windows.go @@ -60,3 +60,7 @@ func (c *container) Pids() ([]int, error) { func (c *container) Stats() (*Stat, error) { return nil, errors.New("Stats not yet implemented on Windows") } + +func (c *container) OOM() (OOM, error) { + return nil, errors.New("OOM not yet implemented on Windows") +} diff --git a/supervisor/monitor_linux.go b/supervisor/monitor_linux.go index 3a3046b..adf4ffd 100644 --- a/supervisor/monitor_linux.go +++ b/supervisor/monitor_linux.go @@ -10,8 +10,9 @@ import ( func NewMonitor() (*Monitor, error) { m := &Monitor{ - processes: make(map[int]runtime.Process), + receivers: make(map[int]interface{}), exits: make(chan runtime.Process, 1024), + ooms: make(chan string, 1024), } fd, err := syscall.EpollCreate1(0) if err != nil { @@ -24,8 +25,9 @@ func NewMonitor() (*Monitor, error) { type Monitor struct { m sync.Mutex - processes map[int]runtime.Process + receivers map[int]interface{} exits chan runtime.Process + ooms chan string epollFd int } @@ -33,6 +35,10 @@ func (m *Monitor) Exits() chan runtime.Process { return m.exits } +func (m *Monitor) OOMs() chan string { + return m.ooms +} + func (m *Monitor) Monitor(p runtime.Process) error { m.m.Lock() defer m.m.Unlock() @@ -45,7 +51,27 @@ func (m *Monitor) Monitor(p runtime.Process) error { return err } EpollFdCounter.Inc(1) - m.processes[fd] = p + m.receivers[fd] = p + return nil +} + +func (m *Monitor) MonitorOOM(c runtime.Container) error { + m.m.Lock() + defer m.m.Unlock() + o, err := c.OOM() + if err != nil { + return err + } + fd := o.FD() + event := syscall.EpollEvent{ + Fd: int32(fd), + Events: syscall.EPOLLHUP | syscall.EPOLLIN, + } + if err := syscall.EpollCtl(m.epollFd, syscall.EPOLL_CTL_ADD, fd, &event); err != nil { + return err + } + EpollFdCounter.Inc(1) + m.receivers[fd] = o return nil } @@ -65,24 +91,38 @@ func (m *Monitor) start() { } // process events for i := 0; i < n; i++ { - if events[i].Events == syscall.EPOLLHUP { - fd := int(events[i].Fd) - m.m.Lock() - proc := m.processes[fd] - delete(m.processes, fd) - if err = syscall.EpollCtl(m.epollFd, syscall.EPOLL_CTL_DEL, fd, &syscall.EpollEvent{ - Events: syscall.EPOLLHUP, - Fd: int32(fd), - }); err != nil { - logrus.WithField("error", err).Fatal("containerd: epoll remove fd") + fd := int(events[i].Fd) + m.m.Lock() + r := m.receivers[fd] + switch t := r.(type) { + case runtime.Process: + if events[i].Events == syscall.EPOLLHUP { + delete(m.receivers, fd) + if err = syscall.EpollCtl(m.epollFd, syscall.EPOLL_CTL_DEL, fd, &syscall.EpollEvent{ + Events: syscall.EPOLLHUP, + Fd: int32(fd), + }); err != nil { + logrus.WithField("error", err).Error("containerd: epoll remove fd") + } + if err := t.Close(); err != nil { + logrus.WithField("error", err).Error("containerd: close process IO") + } + EpollFdCounter.Dec(1) + m.exits <- t } - EpollFdCounter.Dec(1) - if err := proc.Close(); err != nil { - logrus.WithField("error", err).Error("containerd: close process IO") + case runtime.OOM: + // always flush the event fd + t.Flush() + if t.Removed() { + delete(m.receivers, fd) + // epoll will remove the fd from its set after it has been closed + t.Close() + EpollFdCounter.Dec(1) + } else { + m.ooms <- t.ContainerID() } - m.m.Unlock() - m.exits <- proc } + m.m.Unlock() } } } diff --git a/supervisor/oom.go b/supervisor/oom.go new file mode 100644 index 0000000..58eb094 --- /dev/null +++ b/supervisor/oom.go @@ -0,0 +1,22 @@ +package supervisor + +import ( + "time" + + "github.com/Sirupsen/logrus" +) + +type OOMTask struct { + baseTask + ID string +} + +func (s *Supervisor) oom(t *OOMTask) error { + logrus.WithField("id", t.ID).Debug("containerd: container oom") + s.notifySubscribers(Event{ + Timestamp: time.Now(), + ID: t.ID, + Type: "oom", + }) + return nil +} diff --git a/supervisor/supervisor.go b/supervisor/supervisor.go index 1d42525..b0ec844 100644 --- a/supervisor/supervisor.go +++ b/supervisor/supervisor.go @@ -10,7 +10,6 @@ import ( "time" "github.com/Sirupsen/logrus" - "github.com/docker/containerd/chanotify" "github.com/docker/containerd/runtime" ) @@ -45,20 +44,8 @@ func New(stateDir string, oom bool, runtimeName string) (*Supervisor, error) { if err := setupEventLog(s); err != nil { return nil, err } - if oom { - s.notifier = chanotify.New() - - /* - go func() { - for id := range s.notifier.Chan() { - e := NewTask(OOMTaskType) - e.ID = id.(string) - s.SendTask(e) - } - }() - */ - } go s.exitHandler() + go s.oomHandler() if err := s.restore(); err != nil { return nil, err } @@ -126,7 +113,6 @@ type Supervisor struct { subscriberLock sync.RWMutex subscribers map[chan Event]struct{} machine Machine - notifier *chanotify.Notifier tasks chan Task monitor *Monitor eventLog []Event @@ -244,6 +230,15 @@ func (s *Supervisor) exitHandler() { } } +func (s *Supervisor) oomHandler() { + for id := range s.monitor.OOMs() { + e := &OOMTask{ + ID: id, + } + s.SendTask(e) + } +} + func (s *Supervisor) monitorProcess(p runtime.Process) error { return s.monitor.Monitor(p) } @@ -266,10 +261,14 @@ func (s *Supervisor) restore() error { if err != nil { return err } + ContainersCounter.Inc(1) s.containers[id] = &containerInfo{ container: container, } + if err := s.monitor.MonitorOOM(container); err != nil && err != runtime.ErrContainerExited { + logrus.WithField("error", err).Error("containerd: notify OOM events") + } logrus.WithField("id", id).Debug("containerd: container restored") var exitedProcesses []runtime.Process for _, p := range processes { diff --git a/supervisor/supervisor_linux.go b/supervisor/supervisor_linux.go index 775fd17..95a3776 100644 --- a/supervisor/supervisor_linux.go +++ b/supervisor/supervisor_linux.go @@ -27,6 +27,8 @@ func (s *Supervisor) handleTask(i Task) { err = s.updateContainer(t) case *UpdateProcessTask: err = s.updateProcess(t) + case *OOMTask: + err = s.oom(t) default: err = ErrUnknownTask } diff --git a/supervisor/worker.go b/supervisor/worker.go index b6e4dbd..fec1815 100644 --- a/supervisor/worker.go +++ b/supervisor/worker.go @@ -51,16 +51,9 @@ func (w *worker) Start() { w.s.SendTask(evt) continue } - /* - if w.s.notifier != nil { - n, err := t.Container.OOM() - if err != nil { - logrus.WithField("error", err).Error("containerd: notify OOM events") - } else { - w.s.notifier.Add(n, t.Container.ID()) - } - } - */ + if err := w.s.monitor.MonitorOOM(t.Container); err != nil && err != runtime.ErrContainerExited { + logrus.WithField("error", err).Error("containerd: notify OOM events") + } if err := w.s.monitorProcess(process); err != nil { logrus.WithField("error", err).Error("containerd: add process to monitor") }