From 97c3c3847a61faccd0b832e4c4a772257e793c14 Mon Sep 17 00:00:00 2001 From: Michael Crosby Date: Thu, 22 Sep 2016 14:03:45 -0700 Subject: [PATCH] Refactor epoll monitor for generic use Signed-off-by: Michael Crosby --- monitor/monitor_linux.go | 102 ++++++++++++++++++++++ monitor/monitor_solaris.go | 29 +++++++ runtime/container.go | 7 +- runtime/process.go | 8 +- supervisor/add_process.go | 2 +- supervisor/monitor_linux.go | 132 ---------------------------- supervisor/monitor_solaris.go | 38 -------- supervisor/supervisor.go | 158 +++++++++------------------------- supervisor/utils.go | 101 ++++++++++++++++++++++ supervisor/worker.go | 8 +- 10 files changed, 288 insertions(+), 297 deletions(-) create mode 100644 monitor/monitor_linux.go create mode 100644 monitor/monitor_solaris.go delete mode 100644 supervisor/monitor_linux.go delete mode 100644 supervisor/monitor_solaris.go create mode 100644 supervisor/utils.go diff --git a/monitor/monitor_linux.go b/monitor/monitor_linux.go new file mode 100644 index 0000000..b0f451f --- /dev/null +++ b/monitor/monitor_linux.go @@ -0,0 +1,102 @@ +package monitor + +import ( + "sync" + "syscall" + + "github.com/Sirupsen/logrus" + "github.com/docker/containerd/archutils" +) + +type Monitorable interface { + FD() int +} + +type Flusher interface { + Flush() error +} + +// New returns a new process monitor that emits events whenever the +// state of the fd refering to a process changes +func New() (*Monitor, error) { + fd, err := archutils.EpollCreate1(0) + if err != nil { + return nil, err + } + return &Monitor{ + epollFd: fd, + receivers: make(map[int]Monitorable), + events: make(chan Monitorable, 1024), + }, nil +} + +type Monitor struct { + m sync.Mutex + receivers map[int]Monitorable + events chan Monitorable + epollFd int +} + +// Events returns a chan that receives a Monitorable when it's FD changes state +func (m *Monitor) Events() chan Monitorable { + return m.events +} + +// Add adds a process to the list of the one being monitored +func (m *Monitor) Add(ma Monitorable) error { + m.m.Lock() + defer m.m.Unlock() + fd := ma.FD() + event := syscall.EpollEvent{ + Fd: int32(fd), + Events: syscall.EPOLLHUP, + } + if err := archutils.EpollCtl(m.epollFd, syscall.EPOLL_CTL_ADD, fd, &event); err != nil { + return err + } + m.receivers[fd] = ma + return nil +} + +// Remove deletes the Monitorable type from the monitor so that +// no other events are generated +func (m *Monitor) Remove(ma Monitorable) error { + m.m.Lock() + defer m.m.Unlock() + fd := ma.FD() + delete(m.receivers, fd) + return syscall.EpollCtl(m.epollFd, syscall.EPOLL_CTL_DEL, fd, &syscall.EpollEvent{ + Events: syscall.EPOLLHUP, + Fd: int32(fd), + }) +} + +// Close cleans up resources allocated to the Monitor +func (m *Monitor) Close() error { + return syscall.Close(m.epollFd) +} + +func (m *Monitor) Run() { + var events [128]syscall.EpollEvent + for { + n, err := archutils.EpollWait(m.epollFd, events[:], -1) + if err != nil { + if err == syscall.EINTR { + continue + } + logrus.WithField("error", err).Fatal("containerd: epoll wait") + } + for i := 0; i < n; i++ { + fd := int(events[i].Fd) + m.m.Lock() + r := m.receivers[fd] + m.m.Unlock() + if f, ok := r.(Flusher); ok { + if err := f.Flush(); err != nil { + logrus.WithField("error", err).Fatal("containerd: flush event FD") + } + } + m.events <- r + } + } +} diff --git a/monitor/monitor_solaris.go b/monitor/monitor_solaris.go new file mode 100644 index 0000000..0888006 --- /dev/null +++ b/monitor/monitor_solaris.go @@ -0,0 +1,29 @@ +package monitor + +import "errors" + +func New() (*Monitor, error) { + return nil, errors.New("Monitor NewMonitor() not implemented on Solaris") +} + +type Monitor struct { +} + +func (m *Monitor) Events() chan Monitorable { + return nil +} + +func (m *Monitor) Add(Monitorable) error { + return errors.New("Monitor Add() not implemented on Solaris") +} + +func (m *Monitor) Remove(Monitorable) error { + return errors.New("Monitor Remove() not implemented on Solaris") +} + +func (m *Monitor) Close() error { + return errors.New("Monitor Close() not implemented on Solaris") +} + +func (m *Monitor) Run() { +} diff --git a/runtime/container.go b/runtime/container.go index c4a2f7e..ec3189f 100644 --- a/runtime/container.go +++ b/runtime/container.go @@ -67,7 +67,7 @@ type OOM interface { io.Closer FD() int ContainerID() string - Flush() + Flush() error Removed() bool } @@ -692,9 +692,10 @@ func (o *oom) FD() int { return o.eventfd } -func (o *oom) Flush() { +func (o *oom) Flush() error { buf := make([]byte, 8) - syscall.Read(o.eventfd, buf) + _, err := syscall.Read(o.eventfd, buf) + return err } func (o *oom) Removed() bool { diff --git a/runtime/process.go b/runtime/process.go index a56faa5..e2e340d 100644 --- a/runtime/process.go +++ b/runtime/process.go @@ -32,8 +32,8 @@ type Process interface { Start() error CloseStdin() error Resize(int, int) error - // ExitFD returns the fd the provides an event when the process exits - ExitFD() int + // FD returns the fd the provides an event when the process exits + FD() int // ExitStatus returns the exit status of the process or an error if it // has not exited ExitStatus() (uint32, error) @@ -213,8 +213,8 @@ func (p *process) SystemPid() int { return p.pid } -// ExitFD returns the fd of the exit pipe -func (p *process) ExitFD() int { +// FD returns the fd of the exit pipe +func (p *process) FD() int { return int(p.exitPipe.Fd()) } diff --git a/supervisor/add_process.go b/supervisor/add_process.go index e3c23f4..29d4e30 100644 --- a/supervisor/add_process.go +++ b/supervisor/add_process.go @@ -29,7 +29,7 @@ func (s *Supervisor) addProcess(t *AddProcessTask) error { if err != nil { return err } - if err := s.monitorProcess(process); err != nil { + if err := s.monitor.Add(process); err != nil { return err } t.StartResponse <- StartResponse{} diff --git a/supervisor/monitor_linux.go b/supervisor/monitor_linux.go deleted file mode 100644 index 8875a19..0000000 --- a/supervisor/monitor_linux.go +++ /dev/null @@ -1,132 +0,0 @@ -package supervisor - -import ( - "sync" - "syscall" - - "github.com/Sirupsen/logrus" - "github.com/docker/containerd/archutils" - "github.com/docker/containerd/runtime" -) - -// NewMonitor starts a new process monitor and returns it -func NewMonitor() (*Monitor, error) { - m := &Monitor{ - receivers: make(map[int]interface{}), - exits: make(chan runtime.Process, 1024), - ooms: make(chan string, 1024), - } - fd, err := archutils.EpollCreate1(0) - if err != nil { - return nil, err - } - m.epollFd = fd - go m.start() - return m, nil -} - -// Monitor represents a runtime.Process monitor -type Monitor struct { - m sync.Mutex - receivers map[int]interface{} - exits chan runtime.Process - ooms chan string - epollFd int -} - -// Exits returns the channel used to notify of a process exit -func (m *Monitor) Exits() chan runtime.Process { - return m.exits -} - -// OOMs returns the channel used to notify of a container exit due to OOM -func (m *Monitor) OOMs() chan string { - return m.ooms -} - -// Monitor adds a process to the list of the one being monitored -func (m *Monitor) Monitor(p runtime.Process) error { - m.m.Lock() - defer m.m.Unlock() - fd := p.ExitFD() - event := syscall.EpollEvent{ - Fd: int32(fd), - Events: syscall.EPOLLHUP, - } - if err := archutils.EpollCtl(m.epollFd, syscall.EPOLL_CTL_ADD, fd, &event); err != nil { - return err - } - m.receivers[fd] = p - return nil -} - -// MonitorOOM adds a container to the list of the ones monitored for OOM -func (m *Monitor) MonitorOOM(c runtime.Container) error { - m.m.Lock() - defer m.m.Unlock() - o, err := c.OOM() - if err != nil { - return err - } - fd := o.FD() - event := syscall.EpollEvent{ - Fd: int32(fd), - Events: syscall.EPOLLHUP | syscall.EPOLLIN, - } - if err := archutils.EpollCtl(m.epollFd, syscall.EPOLL_CTL_ADD, fd, &event); err != nil { - return err - } - m.receivers[fd] = o - return nil -} - -// Close cleans up resources allocated by NewMonitor() -func (m *Monitor) Close() error { - return syscall.Close(m.epollFd) -} - -func (m *Monitor) start() { - var events [128]syscall.EpollEvent - for { - n, err := archutils.EpollWait(m.epollFd, events[:], -1) - if err != nil { - if err == syscall.EINTR { - continue - } - logrus.WithField("error", err).Fatal("containerd: epoll wait") - } - // process events - for i := 0; i < n; i++ { - fd := int(events[i].Fd) - m.m.Lock() - r := m.receivers[fd] - switch t := r.(type) { - case runtime.Process: - if events[i].Events == syscall.EPOLLHUP { - delete(m.receivers, fd) - if err = syscall.EpollCtl(m.epollFd, syscall.EPOLL_CTL_DEL, fd, &syscall.EpollEvent{ - Events: syscall.EPOLLHUP, - Fd: int32(fd), - }); err != nil { - logrus.WithField("error", err).Error("containerd: epoll remove fd") - } - if err := t.Close(); err != nil { - logrus.WithField("error", err).Error("containerd: close process IO") - } - m.exits <- t - } - case runtime.OOM: - // always flush the event fd - t.Flush() - if t.Removed() { - delete(m.receivers, fd) - // epoll will remove the fd from its set after it has been closed - t.Close() - } else { - m.ooms <- t.ContainerID() - } - } - m.m.Unlock() - } - } -} diff --git a/supervisor/monitor_solaris.go b/supervisor/monitor_solaris.go deleted file mode 100644 index 6ad56ac..0000000 --- a/supervisor/monitor_solaris.go +++ /dev/null @@ -1,38 +0,0 @@ -package supervisor - -import ( - "errors" - - "github.com/docker/containerd/runtime" -) - -func NewMonitor() (*Monitor, error) { - return &Monitor{}, errors.New("Monitor NewMonitor() not implemented on Solaris") -} - -type Monitor struct { - ooms chan string -} - -func (m *Monitor) Exits() chan runtime.Process { - return nil -} - -func (m *Monitor) OOMs() chan string { - return m.ooms -} - -func (m *Monitor) Monitor(p runtime.Process) error { - return errors.New("Monitor Monitor() not implemented on Solaris") -} - -func (m *Monitor) MonitorOOM(c runtime.Container) error { - return errors.New("Monitor MonitorOOM() not implemented on Solaris") -} - -func (m *Monitor) Close() error { - return errors.New("Monitor Close() not implemented on Solaris") -} - -func (m *Monitor) start() { -} diff --git a/supervisor/supervisor.go b/supervisor/supervisor.go index 9d89c3e..fd7ee9a 100644 --- a/supervisor/supervisor.go +++ b/supervisor/supervisor.go @@ -1,15 +1,13 @@ package supervisor import ( - "encoding/json" - "io" "io/ioutil" "os" - "path/filepath" "sync" "time" "github.com/Sirupsen/logrus" + "github.com/docker/containerd/monitor" "github.com/docker/containerd/runtime" ) @@ -36,10 +34,11 @@ func New(c Config) (*Supervisor, error) { if err != nil { return nil, err } - monitor, err := NewMonitor() + m, err := monitor.New() if err != nil { return nil, err } + go m.Run() s := &Supervisor{ config: c, containers: make(map[string]*containerInfo), @@ -47,108 +46,18 @@ func New(c Config) (*Supervisor, error) { machine: machine, subscribers: make(map[chan Event]struct{}), tasks: make(chan Task, defaultBufferSize), - monitor: monitor, + monitor: m, } if err := setupEventLog(s, c.EventRetainCount); err != nil { return nil, err } - go s.exitHandler() - go s.oomHandler() + go s.monitorEventHandler() if err := s.restore(); err != nil { return nil, err } return s, nil } -type containerInfo struct { - container runtime.Container -} - -func setupEventLog(s *Supervisor, retainCount int) error { - if err := readEventLog(s); err != nil { - return err - } - logrus.WithField("count", len(s.eventLog)).Debug("containerd: read past events") - events := s.Events(time.Time{}, false, "") - return eventLogger(s, filepath.Join(s.config.StateDir, "events.log"), events, retainCount) -} - -func eventLogger(s *Supervisor, path string, events chan Event, retainCount int) error { - f, err := os.OpenFile(path, os.O_WRONLY|os.O_CREATE|os.O_APPEND|os.O_TRUNC, 0755) - if err != nil { - return err - } - go func() { - var ( - count = len(s.eventLog) - enc = json.NewEncoder(f) - ) - for e := range events { - // if we have a specified retain count make sure the truncate the event - // log if it grows past the specified number of events to keep. - if retainCount > 0 { - if count > retainCount { - logrus.Debug("truncating event log") - // close the log file - if f != nil { - f.Close() - } - slice := retainCount - 1 - l := len(s.eventLog) - if slice >= l { - slice = l - } - s.eventLock.Lock() - s.eventLog = s.eventLog[len(s.eventLog)-slice:] - s.eventLock.Unlock() - if f, err = os.OpenFile(path, os.O_WRONLY|os.O_CREATE|os.O_APPEND|os.O_TRUNC, 0755); err != nil { - logrus.WithField("error", err).Error("containerd: open event to journal") - continue - } - enc = json.NewEncoder(f) - count = 0 - for _, le := range s.eventLog { - if err := enc.Encode(le); err != nil { - logrus.WithField("error", err).Error("containerd: write event to journal") - } - } - } - } - s.eventLock.Lock() - s.eventLog = append(s.eventLog, e) - s.eventLock.Unlock() - count++ - if err := enc.Encode(e); err != nil { - logrus.WithField("error", err).Error("containerd: write event to journal") - } - } - }() - return nil -} - -func readEventLog(s *Supervisor) error { - f, err := os.Open(filepath.Join(s.config.StateDir, "events.log")) - if err != nil { - if os.IsNotExist(err) { - return nil - } - return err - } - defer f.Close() - dec := json.NewDecoder(f) - for { - var e Event - if err := dec.Decode(&e); err != nil { - if err == io.EOF { - break - } - return err - } - s.eventLog = append(s.eventLog, e) - } - return nil -} - // Supervisor represents a container supervisor type Supervisor struct { config Config @@ -160,7 +69,7 @@ type Supervisor struct { subscribers map[chan Event]struct{} machine Machine tasks chan Task - monitor *Monitor + monitor *monitor.Monitor eventLog []Event eventLock sync.Mutex } @@ -276,28 +185,39 @@ func (s *Supervisor) SendTask(evt Task) { s.tasks <- evt } -func (s *Supervisor) exitHandler() { - for p := range s.monitor.Exits() { - e := &ExitTask{ - Process: p, +func (s *Supervisor) monitorEventHandler() { + for e := range s.monitor.Events() { + switch t := e.(type) { + case runtime.Process: + if err := s.monitor.Remove(e); err != nil { + logrus.WithField("error", err).Error("containerd: remove process event FD from monitor") + } + if err := t.Close(); err != nil { + logrus.WithField("error", err).Error("containerd: close process event FD") + } + ev := &ExitTask{ + Process: t, + } + s.SendTask(ev) + case runtime.OOM: + if t.Removed() { + if err := s.monitor.Remove(e); err != nil { + logrus.WithField("error", err).Error("containerd: remove oom event FD from monitor") + } + if err := t.Close(); err != nil { + logrus.WithField("error", err).Error("containerd: close oom event FD") + } + // don't send an event on the close of this FD + continue + } + ev := &OOMTask{ + ID: t.ContainerID(), + } + s.SendTask(ev) } - s.SendTask(e) } } -func (s *Supervisor) oomHandler() { - for id := range s.monitor.OOMs() { - e := &OOMTask{ - ID: id, - } - s.SendTask(e) - } -} - -func (s *Supervisor) monitorProcess(p runtime.Process) error { - return s.monitor.Monitor(p) -} - func (s *Supervisor) restore() error { dirs, err := ioutil.ReadDir(s.config.StateDir) if err != nil { @@ -320,14 +240,18 @@ func (s *Supervisor) restore() error { s.containers[id] = &containerInfo{ container: container, } - if err := s.monitor.MonitorOOM(container); err != nil && err != runtime.ErrContainerExited { + oom, err := container.OOM() + if err != nil { + logrus.WithField("error", err).Error("containerd: get oom FD") + } + if err := s.monitor.Add(oom); err != nil && err != runtime.ErrContainerExited { logrus.WithField("error", err).Error("containerd: notify OOM events") } logrus.WithField("id", id).Debug("containerd: container restored") var exitedProcesses []runtime.Process for _, p := range processes { if p.State() == runtime.Running { - if err := s.monitorProcess(p); err != nil { + if err := s.monitor.Add(p); err != nil { return err } } else { diff --git a/supervisor/utils.go b/supervisor/utils.go new file mode 100644 index 0000000..138f873 --- /dev/null +++ b/supervisor/utils.go @@ -0,0 +1,101 @@ +package supervisor + +import ( + "encoding/json" + "io" + "os" + "path/filepath" + "time" + + "github.com/Sirupsen/logrus" + "github.com/docker/containerd/runtime" +) + +type containerInfo struct { + container runtime.Container +} + +func setupEventLog(s *Supervisor, retainCount int) error { + if err := readEventLog(s); err != nil { + return err + } + logrus.WithField("count", len(s.eventLog)).Debug("containerd: read past events") + events := s.Events(time.Time{}, false, "") + return eventLogger(s, filepath.Join(s.config.StateDir, "events.log"), events, retainCount) +} + +func eventLogger(s *Supervisor, path string, events chan Event, retainCount int) error { + f, err := os.OpenFile(path, os.O_WRONLY|os.O_CREATE|os.O_APPEND|os.O_TRUNC, 0755) + if err != nil { + return err + } + go func() { + var ( + count = len(s.eventLog) + enc = json.NewEncoder(f) + ) + for e := range events { + // if we have a specified retain count make sure the truncate the event + // log if it grows past the specified number of events to keep. + if retainCount > 0 { + if count > retainCount { + logrus.Debug("truncating event log") + // close the log file + if f != nil { + f.Close() + } + slice := retainCount - 1 + l := len(s.eventLog) + if slice >= l { + slice = l + } + s.eventLock.Lock() + s.eventLog = s.eventLog[len(s.eventLog)-slice:] + s.eventLock.Unlock() + if f, err = os.OpenFile(path, os.O_WRONLY|os.O_CREATE|os.O_APPEND|os.O_TRUNC, 0755); err != nil { + logrus.WithField("error", err).Error("containerd: open event to journal") + continue + } + enc = json.NewEncoder(f) + count = 0 + for _, le := range s.eventLog { + if err := enc.Encode(le); err != nil { + logrus.WithField("error", err).Error("containerd: write event to journal") + } + } + } + } + s.eventLock.Lock() + s.eventLog = append(s.eventLog, e) + s.eventLock.Unlock() + count++ + if err := enc.Encode(e); err != nil { + logrus.WithField("error", err).Error("containerd: write event to journal") + } + } + }() + return nil +} + +func readEventLog(s *Supervisor) error { + f, err := os.Open(filepath.Join(s.config.StateDir, "events.log")) + if err != nil { + if os.IsNotExist(err) { + return nil + } + return err + } + defer f.Close() + dec := json.NewDecoder(f) + for { + var e Event + if err := dec.Decode(&e); err != nil { + if err == io.EOF { + break + } + return err + } + s.eventLog = append(s.eventLog, e) + } + return nil +} diff --git a/supervisor/worker.go b/supervisor/worker.go index cfb6700..5e3b70a 100644 --- a/supervisor/worker.go +++ b/supervisor/worker.go @@ -55,12 +55,16 @@ func (w *worker) Start() { w.s.SendTask(evt) continue } - if err := w.s.monitor.MonitorOOM(t.Container); err != nil && err != runtime.ErrContainerExited { + oom, err := t.Container.OOM() + if err != nil { + logrus.WithField("error", err).Error("containerd: get oom FD") + } + if err := w.s.monitor.Add(oom); err != nil && err != runtime.ErrContainerExited { if process.State() != runtime.Stopped { logrus.WithField("error", err).Error("containerd: notify OOM events") } } - if err := w.s.monitorProcess(process); err != nil { + if err := w.s.monitor.Add(process); err != nil { logrus.WithField("error", err).Error("containerd: add process to monitor") t.Err <- err evt := &DeleteTask{