Refactor epoll monitor for generic use

Signed-off-by: Michael Crosby <crosbymichael@gmail.com>
This commit is contained in:
Michael Crosby 2016-09-22 14:03:45 -07:00
parent b6bf350a07
commit 97c3c3847a
10 changed files with 288 additions and 297 deletions

102
monitor/monitor_linux.go Normal file
View file

@ -0,0 +1,102 @@
package monitor
import (
"sync"
"syscall"
"github.com/Sirupsen/logrus"
"github.com/docker/containerd/archutils"
)
type Monitorable interface {
FD() int
}
type Flusher interface {
Flush() error
}
// New returns a new process monitor that emits events whenever the
// state of the fd refering to a process changes
func New() (*Monitor, error) {
fd, err := archutils.EpollCreate1(0)
if err != nil {
return nil, err
}
return &Monitor{
epollFd: fd,
receivers: make(map[int]Monitorable),
events: make(chan Monitorable, 1024),
}, nil
}
type Monitor struct {
m sync.Mutex
receivers map[int]Monitorable
events chan Monitorable
epollFd int
}
// Events returns a chan that receives a Monitorable when it's FD changes state
func (m *Monitor) Events() chan Monitorable {
return m.events
}
// Add adds a process to the list of the one being monitored
func (m *Monitor) Add(ma Monitorable) error {
m.m.Lock()
defer m.m.Unlock()
fd := ma.FD()
event := syscall.EpollEvent{
Fd: int32(fd),
Events: syscall.EPOLLHUP,
}
if err := archutils.EpollCtl(m.epollFd, syscall.EPOLL_CTL_ADD, fd, &event); err != nil {
return err
}
m.receivers[fd] = ma
return nil
}
// Remove deletes the Monitorable type from the monitor so that
// no other events are generated
func (m *Monitor) Remove(ma Monitorable) error {
m.m.Lock()
defer m.m.Unlock()
fd := ma.FD()
delete(m.receivers, fd)
return syscall.EpollCtl(m.epollFd, syscall.EPOLL_CTL_DEL, fd, &syscall.EpollEvent{
Events: syscall.EPOLLHUP,
Fd: int32(fd),
})
}
// Close cleans up resources allocated to the Monitor
func (m *Monitor) Close() error {
return syscall.Close(m.epollFd)
}
func (m *Monitor) Run() {
var events [128]syscall.EpollEvent
for {
n, err := archutils.EpollWait(m.epollFd, events[:], -1)
if err != nil {
if err == syscall.EINTR {
continue
}
logrus.WithField("error", err).Fatal("containerd: epoll wait")
}
for i := 0; i < n; i++ {
fd := int(events[i].Fd)
m.m.Lock()
r := m.receivers[fd]
m.m.Unlock()
if f, ok := r.(Flusher); ok {
if err := f.Flush(); err != nil {
logrus.WithField("error", err).Fatal("containerd: flush event FD")
}
}
m.events <- r
}
}
}

View file

@ -0,0 +1,29 @@
package monitor
import "errors"
func New() (*Monitor, error) {
return nil, errors.New("Monitor NewMonitor() not implemented on Solaris")
}
type Monitor struct {
}
func (m *Monitor) Events() chan Monitorable {
return nil
}
func (m *Monitor) Add(Monitorable) error {
return errors.New("Monitor Add() not implemented on Solaris")
}
func (m *Monitor) Remove(Monitorable) error {
return errors.New("Monitor Remove() not implemented on Solaris")
}
func (m *Monitor) Close() error {
return errors.New("Monitor Close() not implemented on Solaris")
}
func (m *Monitor) Run() {
}

View file

@ -67,7 +67,7 @@ type OOM interface {
io.Closer io.Closer
FD() int FD() int
ContainerID() string ContainerID() string
Flush() Flush() error
Removed() bool Removed() bool
} }
@ -692,9 +692,10 @@ func (o *oom) FD() int {
return o.eventfd return o.eventfd
} }
func (o *oom) Flush() { func (o *oom) Flush() error {
buf := make([]byte, 8) buf := make([]byte, 8)
syscall.Read(o.eventfd, buf) _, err := syscall.Read(o.eventfd, buf)
return err
} }
func (o *oom) Removed() bool { func (o *oom) Removed() bool {

View file

@ -32,8 +32,8 @@ type Process interface {
Start() error Start() error
CloseStdin() error CloseStdin() error
Resize(int, int) error Resize(int, int) error
// ExitFD returns the fd the provides an event when the process exits // FD returns the fd the provides an event when the process exits
ExitFD() int FD() int
// ExitStatus returns the exit status of the process or an error if it // ExitStatus returns the exit status of the process or an error if it
// has not exited // has not exited
ExitStatus() (uint32, error) ExitStatus() (uint32, error)
@ -213,8 +213,8 @@ func (p *process) SystemPid() int {
return p.pid return p.pid
} }
// ExitFD returns the fd of the exit pipe // FD returns the fd of the exit pipe
func (p *process) ExitFD() int { func (p *process) FD() int {
return int(p.exitPipe.Fd()) return int(p.exitPipe.Fd())
} }

View file

@ -29,7 +29,7 @@ func (s *Supervisor) addProcess(t *AddProcessTask) error {
if err != nil { if err != nil {
return err return err
} }
if err := s.monitorProcess(process); err != nil { if err := s.monitor.Add(process); err != nil {
return err return err
} }
t.StartResponse <- StartResponse{} t.StartResponse <- StartResponse{}

View file

@ -1,132 +0,0 @@
package supervisor
import (
"sync"
"syscall"
"github.com/Sirupsen/logrus"
"github.com/docker/containerd/archutils"
"github.com/docker/containerd/runtime"
)
// NewMonitor starts a new process monitor and returns it
func NewMonitor() (*Monitor, error) {
m := &Monitor{
receivers: make(map[int]interface{}),
exits: make(chan runtime.Process, 1024),
ooms: make(chan string, 1024),
}
fd, err := archutils.EpollCreate1(0)
if err != nil {
return nil, err
}
m.epollFd = fd
go m.start()
return m, nil
}
// Monitor represents a runtime.Process monitor
type Monitor struct {
m sync.Mutex
receivers map[int]interface{}
exits chan runtime.Process
ooms chan string
epollFd int
}
// Exits returns the channel used to notify of a process exit
func (m *Monitor) Exits() chan runtime.Process {
return m.exits
}
// OOMs returns the channel used to notify of a container exit due to OOM
func (m *Monitor) OOMs() chan string {
return m.ooms
}
// Monitor adds a process to the list of the one being monitored
func (m *Monitor) Monitor(p runtime.Process) error {
m.m.Lock()
defer m.m.Unlock()
fd := p.ExitFD()
event := syscall.EpollEvent{
Fd: int32(fd),
Events: syscall.EPOLLHUP,
}
if err := archutils.EpollCtl(m.epollFd, syscall.EPOLL_CTL_ADD, fd, &event); err != nil {
return err
}
m.receivers[fd] = p
return nil
}
// MonitorOOM adds a container to the list of the ones monitored for OOM
func (m *Monitor) MonitorOOM(c runtime.Container) error {
m.m.Lock()
defer m.m.Unlock()
o, err := c.OOM()
if err != nil {
return err
}
fd := o.FD()
event := syscall.EpollEvent{
Fd: int32(fd),
Events: syscall.EPOLLHUP | syscall.EPOLLIN,
}
if err := archutils.EpollCtl(m.epollFd, syscall.EPOLL_CTL_ADD, fd, &event); err != nil {
return err
}
m.receivers[fd] = o
return nil
}
// Close cleans up resources allocated by NewMonitor()
func (m *Monitor) Close() error {
return syscall.Close(m.epollFd)
}
func (m *Monitor) start() {
var events [128]syscall.EpollEvent
for {
n, err := archutils.EpollWait(m.epollFd, events[:], -1)
if err != nil {
if err == syscall.EINTR {
continue
}
logrus.WithField("error", err).Fatal("containerd: epoll wait")
}
// process events
for i := 0; i < n; i++ {
fd := int(events[i].Fd)
m.m.Lock()
r := m.receivers[fd]
switch t := r.(type) {
case runtime.Process:
if events[i].Events == syscall.EPOLLHUP {
delete(m.receivers, fd)
if err = syscall.EpollCtl(m.epollFd, syscall.EPOLL_CTL_DEL, fd, &syscall.EpollEvent{
Events: syscall.EPOLLHUP,
Fd: int32(fd),
}); err != nil {
logrus.WithField("error", err).Error("containerd: epoll remove fd")
}
if err := t.Close(); err != nil {
logrus.WithField("error", err).Error("containerd: close process IO")
}
m.exits <- t
}
case runtime.OOM:
// always flush the event fd
t.Flush()
if t.Removed() {
delete(m.receivers, fd)
// epoll will remove the fd from its set after it has been closed
t.Close()
} else {
m.ooms <- t.ContainerID()
}
}
m.m.Unlock()
}
}
}

View file

@ -1,38 +0,0 @@
package supervisor
import (
"errors"
"github.com/docker/containerd/runtime"
)
func NewMonitor() (*Monitor, error) {
return &Monitor{}, errors.New("Monitor NewMonitor() not implemented on Solaris")
}
type Monitor struct {
ooms chan string
}
func (m *Monitor) Exits() chan runtime.Process {
return nil
}
func (m *Monitor) OOMs() chan string {
return m.ooms
}
func (m *Monitor) Monitor(p runtime.Process) error {
return errors.New("Monitor Monitor() not implemented on Solaris")
}
func (m *Monitor) MonitorOOM(c runtime.Container) error {
return errors.New("Monitor MonitorOOM() not implemented on Solaris")
}
func (m *Monitor) Close() error {
return errors.New("Monitor Close() not implemented on Solaris")
}
func (m *Monitor) start() {
}

View file

@ -1,15 +1,13 @@
package supervisor package supervisor
import ( import (
"encoding/json"
"io"
"io/ioutil" "io/ioutil"
"os" "os"
"path/filepath"
"sync" "sync"
"time" "time"
"github.com/Sirupsen/logrus" "github.com/Sirupsen/logrus"
"github.com/docker/containerd/monitor"
"github.com/docker/containerd/runtime" "github.com/docker/containerd/runtime"
) )
@ -36,10 +34,11 @@ func New(c Config) (*Supervisor, error) {
if err != nil { if err != nil {
return nil, err return nil, err
} }
monitor, err := NewMonitor() m, err := monitor.New()
if err != nil { if err != nil {
return nil, err return nil, err
} }
go m.Run()
s := &Supervisor{ s := &Supervisor{
config: c, config: c,
containers: make(map[string]*containerInfo), containers: make(map[string]*containerInfo),
@ -47,108 +46,18 @@ func New(c Config) (*Supervisor, error) {
machine: machine, machine: machine,
subscribers: make(map[chan Event]struct{}), subscribers: make(map[chan Event]struct{}),
tasks: make(chan Task, defaultBufferSize), tasks: make(chan Task, defaultBufferSize),
monitor: monitor, monitor: m,
} }
if err := setupEventLog(s, c.EventRetainCount); err != nil { if err := setupEventLog(s, c.EventRetainCount); err != nil {
return nil, err return nil, err
} }
go s.exitHandler() go s.monitorEventHandler()
go s.oomHandler()
if err := s.restore(); err != nil { if err := s.restore(); err != nil {
return nil, err return nil, err
} }
return s, nil return s, nil
} }
type containerInfo struct {
container runtime.Container
}
func setupEventLog(s *Supervisor, retainCount int) error {
if err := readEventLog(s); err != nil {
return err
}
logrus.WithField("count", len(s.eventLog)).Debug("containerd: read past events")
events := s.Events(time.Time{}, false, "")
return eventLogger(s, filepath.Join(s.config.StateDir, "events.log"), events, retainCount)
}
func eventLogger(s *Supervisor, path string, events chan Event, retainCount int) error {
f, err := os.OpenFile(path, os.O_WRONLY|os.O_CREATE|os.O_APPEND|os.O_TRUNC, 0755)
if err != nil {
return err
}
go func() {
var (
count = len(s.eventLog)
enc = json.NewEncoder(f)
)
for e := range events {
// if we have a specified retain count make sure the truncate the event
// log if it grows past the specified number of events to keep.
if retainCount > 0 {
if count > retainCount {
logrus.Debug("truncating event log")
// close the log file
if f != nil {
f.Close()
}
slice := retainCount - 1
l := len(s.eventLog)
if slice >= l {
slice = l
}
s.eventLock.Lock()
s.eventLog = s.eventLog[len(s.eventLog)-slice:]
s.eventLock.Unlock()
if f, err = os.OpenFile(path, os.O_WRONLY|os.O_CREATE|os.O_APPEND|os.O_TRUNC, 0755); err != nil {
logrus.WithField("error", err).Error("containerd: open event to journal")
continue
}
enc = json.NewEncoder(f)
count = 0
for _, le := range s.eventLog {
if err := enc.Encode(le); err != nil {
logrus.WithField("error", err).Error("containerd: write event to journal")
}
}
}
}
s.eventLock.Lock()
s.eventLog = append(s.eventLog, e)
s.eventLock.Unlock()
count++
if err := enc.Encode(e); err != nil {
logrus.WithField("error", err).Error("containerd: write event to journal")
}
}
}()
return nil
}
func readEventLog(s *Supervisor) error {
f, err := os.Open(filepath.Join(s.config.StateDir, "events.log"))
if err != nil {
if os.IsNotExist(err) {
return nil
}
return err
}
defer f.Close()
dec := json.NewDecoder(f)
for {
var e Event
if err := dec.Decode(&e); err != nil {
if err == io.EOF {
break
}
return err
}
s.eventLog = append(s.eventLog, e)
}
return nil
}
// Supervisor represents a container supervisor // Supervisor represents a container supervisor
type Supervisor struct { type Supervisor struct {
config Config config Config
@ -160,7 +69,7 @@ type Supervisor struct {
subscribers map[chan Event]struct{} subscribers map[chan Event]struct{}
machine Machine machine Machine
tasks chan Task tasks chan Task
monitor *Monitor monitor *monitor.Monitor
eventLog []Event eventLog []Event
eventLock sync.Mutex eventLock sync.Mutex
} }
@ -276,26 +185,37 @@ func (s *Supervisor) SendTask(evt Task) {
s.tasks <- evt s.tasks <- evt
} }
func (s *Supervisor) exitHandler() { func (s *Supervisor) monitorEventHandler() {
for p := range s.monitor.Exits() { for e := range s.monitor.Events() {
e := &ExitTask{ switch t := e.(type) {
Process: p, case runtime.Process:
if err := s.monitor.Remove(e); err != nil {
logrus.WithField("error", err).Error("containerd: remove process event FD from monitor")
} }
s.SendTask(e) if err := t.Close(); err != nil {
logrus.WithField("error", err).Error("containerd: close process event FD")
} }
} ev := &ExitTask{
Process: t,
func (s *Supervisor) oomHandler() { }
for id := range s.monitor.OOMs() { s.SendTask(ev)
e := &OOMTask{ case runtime.OOM:
ID: id, if t.Removed() {
if err := s.monitor.Remove(e); err != nil {
logrus.WithField("error", err).Error("containerd: remove oom event FD from monitor")
}
if err := t.Close(); err != nil {
logrus.WithField("error", err).Error("containerd: close oom event FD")
}
// don't send an event on the close of this FD
continue
}
ev := &OOMTask{
ID: t.ContainerID(),
}
s.SendTask(ev)
} }
s.SendTask(e)
} }
}
func (s *Supervisor) monitorProcess(p runtime.Process) error {
return s.monitor.Monitor(p)
} }
func (s *Supervisor) restore() error { func (s *Supervisor) restore() error {
@ -320,14 +240,18 @@ func (s *Supervisor) restore() error {
s.containers[id] = &containerInfo{ s.containers[id] = &containerInfo{
container: container, container: container,
} }
if err := s.monitor.MonitorOOM(container); err != nil && err != runtime.ErrContainerExited { oom, err := container.OOM()
if err != nil {
logrus.WithField("error", err).Error("containerd: get oom FD")
}
if err := s.monitor.Add(oom); err != nil && err != runtime.ErrContainerExited {
logrus.WithField("error", err).Error("containerd: notify OOM events") logrus.WithField("error", err).Error("containerd: notify OOM events")
} }
logrus.WithField("id", id).Debug("containerd: container restored") logrus.WithField("id", id).Debug("containerd: container restored")
var exitedProcesses []runtime.Process var exitedProcesses []runtime.Process
for _, p := range processes { for _, p := range processes {
if p.State() == runtime.Running { if p.State() == runtime.Running {
if err := s.monitorProcess(p); err != nil { if err := s.monitor.Add(p); err != nil {
return err return err
} }
} else { } else {

101
supervisor/utils.go Normal file
View file

@ -0,0 +1,101 @@
package supervisor
import (
"encoding/json"
"io"
"os"
"path/filepath"
"time"
"github.com/Sirupsen/logrus"
"github.com/docker/containerd/runtime"
)
type containerInfo struct {
container runtime.Container
}
func setupEventLog(s *Supervisor, retainCount int) error {
if err := readEventLog(s); err != nil {
return err
}
logrus.WithField("count", len(s.eventLog)).Debug("containerd: read past events")
events := s.Events(time.Time{}, false, "")
return eventLogger(s, filepath.Join(s.config.StateDir, "events.log"), events, retainCount)
}
func eventLogger(s *Supervisor, path string, events chan Event, retainCount int) error {
f, err := os.OpenFile(path, os.O_WRONLY|os.O_CREATE|os.O_APPEND|os.O_TRUNC, 0755)
if err != nil {
return err
}
go func() {
var (
count = len(s.eventLog)
enc = json.NewEncoder(f)
)
for e := range events {
// if we have a specified retain count make sure the truncate the event
// log if it grows past the specified number of events to keep.
if retainCount > 0 {
if count > retainCount {
logrus.Debug("truncating event log")
// close the log file
if f != nil {
f.Close()
}
slice := retainCount - 1
l := len(s.eventLog)
if slice >= l {
slice = l
}
s.eventLock.Lock()
s.eventLog = s.eventLog[len(s.eventLog)-slice:]
s.eventLock.Unlock()
if f, err = os.OpenFile(path, os.O_WRONLY|os.O_CREATE|os.O_APPEND|os.O_TRUNC, 0755); err != nil {
logrus.WithField("error", err).Error("containerd: open event to journal")
continue
}
enc = json.NewEncoder(f)
count = 0
for _, le := range s.eventLog {
if err := enc.Encode(le); err != nil {
logrus.WithField("error", err).Error("containerd: write event to journal")
}
}
}
}
s.eventLock.Lock()
s.eventLog = append(s.eventLog, e)
s.eventLock.Unlock()
count++
if err := enc.Encode(e); err != nil {
logrus.WithField("error", err).Error("containerd: write event to journal")
}
}
}()
return nil
}
func readEventLog(s *Supervisor) error {
f, err := os.Open(filepath.Join(s.config.StateDir, "events.log"))
if err != nil {
if os.IsNotExist(err) {
return nil
}
return err
}
defer f.Close()
dec := json.NewDecoder(f)
for {
var e Event
if err := dec.Decode(&e); err != nil {
if err == io.EOF {
break
}
return err
}
s.eventLog = append(s.eventLog, e)
}
return nil
}

View file

@ -55,12 +55,16 @@ func (w *worker) Start() {
w.s.SendTask(evt) w.s.SendTask(evt)
continue continue
} }
if err := w.s.monitor.MonitorOOM(t.Container); err != nil && err != runtime.ErrContainerExited { oom, err := t.Container.OOM()
if err != nil {
logrus.WithField("error", err).Error("containerd: get oom FD")
}
if err := w.s.monitor.Add(oom); err != nil && err != runtime.ErrContainerExited {
if process.State() != runtime.Stopped { if process.State() != runtime.Stopped {
logrus.WithField("error", err).Error("containerd: notify OOM events") logrus.WithField("error", err).Error("containerd: notify OOM events")
} }
} }
if err := w.s.monitorProcess(process); err != nil { if err := w.s.monitor.Add(process); err != nil {
logrus.WithField("error", err).Error("containerd: add process to monitor") logrus.WithField("error", err).Error("containerd: add process to monitor")
t.Err <- err t.Err <- err
evt := &DeleteTask{ evt := &DeleteTask{