1b8cc65462
Truncate the event log on disk and in memory so that it does not grow forever. This is mainly used for higher levels to receive past events if they miss any. Signed-off-by: Michael Crosby <crosbymichael@gmail.com>
347 lines
8.8 KiB
Go
347 lines
8.8 KiB
Go
package supervisor
|
|
|
|
import (
|
|
"encoding/json"
|
|
"io"
|
|
"io/ioutil"
|
|
"os"
|
|
"path/filepath"
|
|
"sync"
|
|
"time"
|
|
|
|
"github.com/Sirupsen/logrus"
|
|
"github.com/docker/containerd/runtime"
|
|
)
|
|
|
|
// defaultBufferSize is the capacity of the supervisor's task queue and of
// every subscriber event channel.
const defaultBufferSize = 2048
|
|
|
|
// New returns an initialized Process supervisor.
|
|
func New(stateDir string, runtimeName, shimName string, runtimeArgs []string, timeout time.Duration, retainCount int) (*Supervisor, error) {
|
|
startTasks := make(chan *startTask, 10)
|
|
if err := os.MkdirAll(stateDir, 0755); err != nil {
|
|
return nil, err
|
|
}
|
|
machine, err := CollectMachineInformation()
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
monitor, err := NewMonitor()
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
s := &Supervisor{
|
|
stateDir: stateDir,
|
|
containers: make(map[string]*containerInfo),
|
|
startTasks: startTasks,
|
|
machine: machine,
|
|
subscribers: make(map[chan Event]struct{}),
|
|
tasks: make(chan Task, defaultBufferSize),
|
|
monitor: monitor,
|
|
runtime: runtimeName,
|
|
runtimeArgs: runtimeArgs,
|
|
shim: shimName,
|
|
timeout: timeout,
|
|
}
|
|
if err := setupEventLog(s, retainCount); err != nil {
|
|
return nil, err
|
|
}
|
|
go s.exitHandler()
|
|
go s.oomHandler()
|
|
if err := s.restore(); err != nil {
|
|
return nil, err
|
|
}
|
|
return s, nil
|
|
}
|
|
|
|
// containerInfo is the supervisor's bookkeeping entry for one container; the
// Supervisor stores these in its containers map, keyed by container id.
type containerInfo struct {
	// container is the runtime container this entry tracks.
	container runtime.Container
}
|
|
|
|
func setupEventLog(s *Supervisor, retainCount int) error {
|
|
if err := readEventLog(s); err != nil {
|
|
return err
|
|
}
|
|
logrus.WithField("count", len(s.eventLog)).Debug("containerd: read past events")
|
|
events := s.Events(time.Time{})
|
|
return eventLogger(s, filepath.Join(s.stateDir, "events.log"), events, retainCount)
|
|
}
|
|
|
|
func eventLogger(s *Supervisor, path string, events chan Event, retainCount int) error {
|
|
f, err := os.OpenFile(path, os.O_WRONLY|os.O_CREATE|os.O_APPEND|os.O_TRUNC, 0755)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
go func() {
|
|
var (
|
|
count = len(s.eventLog)
|
|
enc = json.NewEncoder(f)
|
|
)
|
|
for e := range events {
|
|
// if we have a specified retain count make sure the truncate the event
|
|
// log if it grows past the specified number of events to keep.
|
|
if retainCount > 0 {
|
|
if count > retainCount {
|
|
logrus.Debug("truncating event log")
|
|
// close the log file
|
|
if f != nil {
|
|
f.Close()
|
|
}
|
|
slice := retainCount - 1
|
|
l := len(s.eventLog)
|
|
if slice >= l {
|
|
slice = l
|
|
}
|
|
s.eventLock.Lock()
|
|
s.eventLog = s.eventLog[len(s.eventLog)-slice:]
|
|
s.eventLock.Unlock()
|
|
if f, err = os.OpenFile(path, os.O_WRONLY|os.O_CREATE|os.O_APPEND|os.O_TRUNC, 0755); err != nil {
|
|
logrus.WithField("error", err).Error("containerd: open event to journal")
|
|
continue
|
|
}
|
|
enc = json.NewEncoder(f)
|
|
count = 0
|
|
for _, le := range s.eventLog {
|
|
if err := enc.Encode(le); err != nil {
|
|
logrus.WithField("error", err).Error("containerd: write event to journal")
|
|
}
|
|
}
|
|
}
|
|
}
|
|
s.eventLock.Lock()
|
|
s.eventLog = append(s.eventLog, e)
|
|
s.eventLock.Unlock()
|
|
count++
|
|
if err := enc.Encode(e); err != nil {
|
|
logrus.WithField("error", err).Error("containerd: write event to journal")
|
|
}
|
|
}
|
|
}()
|
|
return nil
|
|
}
|
|
|
|
func readEventLog(s *Supervisor) error {
|
|
f, err := os.Open(filepath.Join(s.stateDir, "events.log"))
|
|
if err != nil {
|
|
if os.IsNotExist(err) {
|
|
return nil
|
|
}
|
|
return err
|
|
}
|
|
defer f.Close()
|
|
dec := json.NewDecoder(f)
|
|
for {
|
|
var e Event
|
|
if err := dec.Decode(&e); err != nil {
|
|
if err == io.EOF {
|
|
break
|
|
}
|
|
return err
|
|
}
|
|
s.eventLog = append(s.eventLog, e)
|
|
}
|
|
return nil
|
|
}
|
|
|
|
// Supervisor holds the state used to restore and monitor containers, to queue
// tasks for the main event loop and to fan events out to subscribers.
type Supervisor struct {
	// stateDir is the directory on the system to store container runtime state information.
	stateDir string
	// name of the OCI compatible runtime used to execute containers
	runtime string
	// runtimeArgs is the runtimeArgs value given to New; presumably extra
	// arguments for the runtime — confirm at the call sites.
	runtimeArgs []string
	// shim is the shimName value given to New; presumably the shim binary
	// used to launch containers — confirm at the call sites.
	shim string
	// containers maps a container id to its bookkeeping entry.
	containers map[string]*containerInfo
	// startTasks is a buffered channel of start tasks; Stop closes it.
	startTasks chan *startTask
	// we need a lock around the subscribers map only because additions and deletions from
	// the map are via the API so we cannot really control the concurrency
	subscriberLock sync.RWMutex
	// subscribers is the set of channels that notifySubscribers sends to.
	subscribers map[chan Event]struct{}
	// machine is the information collected by CollectMachineInformation in New.
	machine Machine
	// tasks is the queue filled by SendTask and drained by the loop started in Start.
	tasks chan Task
	// monitor is the source of the exit and OOM notifications that
	// exitHandler and oomHandler turn into tasks.
	monitor *Monitor
	// eventLog holds past events: those read from events.log at startup plus
	// the ones appended by the event logger.
	eventLog []Event
	// eventLock is held around reads and updates of eventLog by the event
	// logger and by Events.
	eventLock sync.Mutex
	// timeout is the timeout value given to New; its use is not visible in this file.
	timeout time.Duration
}
|
|
|
|
// Stop closes the startTasks channel so that no new containers get started.
// It is a non-blocking call.
//
// NOTE(review): this comment used to say that Stop also sends a SIGTERM to
// each container's pid1 and waits for the resulting SIGCHLD events before
// closing the signals chan. Nothing in this method does that — confirm
// whether it happens elsewhere.
//
// Calling Stop a second time panics, because the channel is already closed.
func (s *Supervisor) Stop() {
	// Close the startTasks channel so that no new containers get started
	close(s.startTasks)
}
|
|
|
|
// Close is meant to close any open files in the supervisor and expects that
// Stop has been called so that no more containers are started.
//
// It currently releases nothing and always returns nil.
func (s *Supervisor) Close() error {
	return nil
}
|
|
|
|
// Event is a single supervisor event as delivered to subscribers and as
// stored, JSON encoded, in the event log.
type Event struct {
	// ID is encoded as "id"; presumably the container id — confirm with the producers.
	ID string `json:"id"`
	// Type names the kind of event. Events uses the type "live" for the
	// marker it sends after replaying past events.
	Type string `json:"type"`
	// Timestamp is compared against the from argument of Events to select
	// the past events to replay.
	Timestamp time.Time `json:"timestamp"`
	// PID is encoded as "pid" and left out of the JSON when empty.
	PID string `json:"pid,omitempty"`
	// Status is encoded as "status"; because of omitempty a value of 0 is
	// left out of the JSON.
	Status int `json:"status,omitempty"`
}
|
|
|
|
// Events returns an event channel that external consumers can use to receive updates
|
|
// on container events
|
|
func (s *Supervisor) Events(from time.Time) chan Event {
|
|
s.subscriberLock.Lock()
|
|
defer s.subscriberLock.Unlock()
|
|
c := make(chan Event, defaultBufferSize)
|
|
EventSubscriberCounter.Inc(1)
|
|
s.subscribers[c] = struct{}{}
|
|
if !from.IsZero() {
|
|
// replay old event
|
|
s.eventLock.Lock()
|
|
past := s.eventLog[:]
|
|
s.eventLock.Unlock()
|
|
for _, e := range past {
|
|
if e.Timestamp.After(from) {
|
|
c <- e
|
|
}
|
|
}
|
|
// Notify the client that from now on it's live events
|
|
c <- Event{
|
|
Type: "live",
|
|
Timestamp: time.Now(),
|
|
}
|
|
}
|
|
return c
|
|
}
|
|
|
|
// Unsubscribe removes the provided channel from receiving any more events
|
|
func (s *Supervisor) Unsubscribe(sub chan Event) {
|
|
s.subscriberLock.Lock()
|
|
defer s.subscriberLock.Unlock()
|
|
delete(s.subscribers, sub)
|
|
close(sub)
|
|
EventSubscriberCounter.Dec(1)
|
|
}
|
|
|
|
// notifySubscribers will send the provided event to the external subscribers
|
|
// of the events channel
|
|
func (s *Supervisor) notifySubscribers(e Event) {
|
|
s.subscriberLock.RLock()
|
|
defer s.subscriberLock.RUnlock()
|
|
for sub := range s.subscribers {
|
|
// do a non-blocking send for the channel
|
|
select {
|
|
case sub <- e:
|
|
default:
|
|
logrus.WithField("event", e.Type).Warn("containerd: event not sent to subscriber")
|
|
}
|
|
}
|
|
}
|
|
|
|
// Start is a non-blocking call that runs the supervisor for monitoring contianer processes and
|
|
// executing new containers.
|
|
//
|
|
// This event loop is the only thing that is allowed to modify state of containers and processes
|
|
// therefore it is save to do operations in the handlers that modify state of the system or
|
|
// state of the Supervisor
|
|
func (s *Supervisor) Start() error {
|
|
logrus.WithFields(logrus.Fields{
|
|
"stateDir": s.stateDir,
|
|
"runtime": s.runtime,
|
|
"runtimeArgs": s.runtimeArgs,
|
|
"memory": s.machine.Memory,
|
|
"cpus": s.machine.Cpus,
|
|
}).Debug("containerd: supervisor running")
|
|
go func() {
|
|
for i := range s.tasks {
|
|
s.handleTask(i)
|
|
}
|
|
}()
|
|
return nil
|
|
}
|
|
|
|
// Machine returns the information about the machine on which the
// supervisor is executing, as collected when the supervisor was created.
func (s *Supervisor) Machine() Machine {
	return s.machine
}
|
|
|
|
// SendTask sends the provided task to the supervisor's main event loop.
// It increments TasksCounter and then queues the task; the call blocks while
// the tasks queue is full.
func (s *Supervisor) SendTask(evt Task) {
	TasksCounter.Inc(1)
	s.tasks <- evt
}
|
|
|
|
func (s *Supervisor) exitHandler() {
|
|
for p := range s.monitor.Exits() {
|
|
e := &ExitTask{
|
|
Process: p,
|
|
}
|
|
s.SendTask(e)
|
|
}
|
|
}
|
|
|
|
func (s *Supervisor) oomHandler() {
|
|
for id := range s.monitor.OOMs() {
|
|
e := &OOMTask{
|
|
ID: id,
|
|
}
|
|
s.SendTask(e)
|
|
}
|
|
}
|
|
|
|
// monitorProcess hands p to the supervisor's monitor and returns the
// monitor's error unchanged. Presumably this is what makes the process's
// exit show up on the monitor's Exits channel — confirm in Monitor.
func (s *Supervisor) monitorProcess(p runtime.Process) error {
	return s.monitor.Monitor(p)
}
|
|
|
|
func (s *Supervisor) restore() error {
|
|
dirs, err := ioutil.ReadDir(s.stateDir)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
for _, d := range dirs {
|
|
if !d.IsDir() {
|
|
continue
|
|
}
|
|
id := d.Name()
|
|
container, err := runtime.Load(s.stateDir, id)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
processes, err := container.Processes()
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
ContainersCounter.Inc(1)
|
|
s.containers[id] = &containerInfo{
|
|
container: container,
|
|
}
|
|
if err := s.monitor.MonitorOOM(container); err != nil && err != runtime.ErrContainerExited {
|
|
logrus.WithField("error", err).Error("containerd: notify OOM events")
|
|
}
|
|
logrus.WithField("id", id).Debug("containerd: container restored")
|
|
var exitedProcesses []runtime.Process
|
|
for _, p := range processes {
|
|
if p.State() == runtime.Running {
|
|
if err := s.monitorProcess(p); err != nil {
|
|
return err
|
|
}
|
|
} else {
|
|
exitedProcesses = append(exitedProcesses, p)
|
|
}
|
|
}
|
|
if len(exitedProcesses) > 0 {
|
|
// sort processes so that init is fired last because that is how the kernel sends the
|
|
// exit events
|
|
sortProcesses(exitedProcesses)
|
|
for _, p := range exitedProcesses {
|
|
e := &ExitTask{
|
|
Process: p,
|
|
}
|
|
s.SendTask(e)
|
|
}
|
|
}
|
|
}
|
|
return nil
|
|
}
|