Remove containerd files

Signed-off-by: Michael Crosby <crosbymichael@gmail.com>
This commit is contained in:
Michael Crosby 2016-11-07 13:10:09 -08:00
parent 992fdbfd76
commit e115b52ce2
74 changed files with 0 additions and 9757 deletions

View file

@ -1,43 +0,0 @@
package supervisor
import (
"time"
"github.com/docker/containerd/runtime"
"github.com/docker/containerd/specs"
)
// AddProcessTask holds everything necessary to add a process to a
// container
type AddProcessTask struct {
baseTask
ID string
PID string
Stdout string
Stderr string
Stdin string
ProcessSpec *specs.ProcessSpec
StartResponse chan StartResponse
}
func (s *Supervisor) addProcess(t *AddProcessTask) error {
ci, ok := s.containers[t.ID]
if !ok {
return ErrContainerNotFound
}
process, err := ci.container.Exec(t.PID, *t.ProcessSpec, runtime.NewStdio(t.Stdin, t.Stdout, t.Stderr))
if err != nil {
return err
}
if err := s.monitor.Add(process); err != nil {
return err
}
t.StartResponse <- StartResponse{}
s.notifySubscribers(Event{
Timestamp: time.Now(),
Type: StateStartProcess,
PID: t.PID,
ID: t.ID,
})
return nil
}

View file

@ -1,37 +0,0 @@
// +build !windows
package supervisor
import "github.com/docker/containerd/runtime"
// CreateCheckpointTask holds needed parameters to create a new checkpoint
type CreateCheckpointTask struct {
baseTask
ID string
CheckpointDir string
Checkpoint *runtime.Checkpoint
}
func (s *Supervisor) createCheckpoint(t *CreateCheckpointTask) error {
i, ok := s.containers[t.ID]
if !ok {
return ErrContainerNotFound
}
return i.container.Checkpoint(*t.Checkpoint, t.CheckpointDir)
}
// DeleteCheckpointTask holds needed parameters to delete a checkpoint
type DeleteCheckpointTask struct {
baseTask
ID string
CheckpointDir string
Checkpoint *runtime.Checkpoint
}
func (s *Supervisor) deleteCheckpoint(t *DeleteCheckpointTask) error {
i, ok := s.containers[t.ID]
if !ok {
return ErrContainerNotFound
}
return i.container.DeleteCheckpoint(t.Checkpoint.Name, t.CheckpointDir)
}

View file

@ -1,63 +0,0 @@
package supervisor
import (
"path/filepath"
"github.com/docker/containerd/runtime"
)
// StartTask holds needed parameters to create a new container
type StartTask struct {
baseTask
ID string
BundlePath string
Stdout string
Stderr string
Stdin string
StartResponse chan StartResponse
Labels []string
NoPivotRoot bool
Checkpoint *runtime.Checkpoint
CheckpointDir string
Runtime string
RuntimeArgs []string
}
func (s *Supervisor) start(t *StartTask) error {
rt := s.config.Runtime
rtArgs := s.config.RuntimeArgs
if t.Runtime != "" {
rt = t.Runtime
rtArgs = t.RuntimeArgs
}
container, err := runtime.New(runtime.ContainerOpts{
Root: s.config.StateDir,
ID: t.ID,
Bundle: t.BundlePath,
Runtime: rt,
RuntimeArgs: rtArgs,
Shim: s.config.ShimName,
Labels: t.Labels,
NoPivotRoot: t.NoPivotRoot,
Timeout: s.config.Timeout,
})
if err != nil {
return err
}
s.containers[t.ID] = &containerInfo{
container: container,
}
task := &startTask{
Err: t.ErrorCh(),
Container: container,
StartResponse: t.StartResponse,
Stdin: t.Stdin,
Stdout: t.Stdout,
Stderr: t.Stderr,
}
if t.Checkpoint != nil {
task.CheckpointPath = filepath.Join(t.CheckpointDir, t.Checkpoint.Name)
}
s.startTasks <- task
return errDeferredResponse
}

View file

@ -1,8 +0,0 @@
package supervisor
type platformStartTask struct {
}
// Checkpoint not supported on Solaris
func (task *startTask) setTaskCheckpoint(t *StartTask) {
}

View file

@ -1,44 +0,0 @@
package supervisor
import (
"time"
"github.com/Sirupsen/logrus"
"github.com/docker/containerd/runtime"
)
// DeleteTask holds needed parameters to remove a container
type DeleteTask struct {
baseTask
ID string
Status uint32
PID string
NoEvent bool
Process runtime.Process
}
func (s *Supervisor) delete(t *DeleteTask) error {
if i, ok := s.containers[t.ID]; ok {
if err := s.deleteContainer(i.container); err != nil {
logrus.WithField("error", err).Error("containerd: deleting container")
}
if t.Process != nil {
t.Process.Wait()
}
if !t.NoEvent {
s.notifySubscribers(Event{
Type: StateExit,
Timestamp: time.Now(),
ID: t.ID,
Status: t.Status,
PID: t.PID,
})
}
}
return nil
}
func (s *Supervisor) deleteContainer(container runtime.Container) error {
delete(s.containers, container.ID())
return container.Delete()
}

View file

@ -1,28 +0,0 @@
package supervisor
import "errors"
var (
// ErrContainerNotFound is returned when the container ID passed
// for a given operation is invalid
ErrContainerNotFound = errors.New("containerd: container not found")
// ErrProcessNotFound is returned when the process ID passed for
// a given operation is invalid
ErrProcessNotFound = errors.New("containerd: process not found for container")
// ErrUnknownContainerStatus is returned when the container status
// cannot be determined
ErrUnknownContainerStatus = errors.New("containerd: unknown container status ")
// ErrUnknownTask is returned when an unknown Task type is
// scheduled (should never happen).
ErrUnknownTask = errors.New("containerd: unknown task type")
// Internal errors
errShutdown = errors.New("containerd: supervisor is shutdown")
errRootNotAbs = errors.New("containerd: rootfs path is not an absolute path")
errNoContainerForPid = errors.New("containerd: pid not registered for any container")
// internal error where the handler will defer to another for the final response
//
// TODO: we could probably do a typed error with another error channel for this to make it
// less like magic
errDeferredResponse = errors.New("containerd: deferred response")
)

View file

@ -1,87 +0,0 @@
package supervisor
import (
"time"
"github.com/Sirupsen/logrus"
"github.com/docker/containerd/runtime"
)
// ExitTask holds needed parameters to execute the exit task
type ExitTask struct {
baseTask
Process runtime.Process
}
func (s *Supervisor) exit(t *ExitTask) error {
proc := t.Process
status, err := proc.ExitStatus()
if err != nil {
logrus.WithFields(logrus.Fields{
"error": err,
"pid": proc.ID(),
"id": proc.Container().ID(),
"systemPid": proc.SystemPid(),
}).Error("containerd: get exit status")
}
logrus.WithFields(logrus.Fields{
"pid": proc.ID(),
"status": status,
"id": proc.Container().ID(),
"systemPid": proc.SystemPid(),
}).Debug("containerd: process exited")
// if the process is the the init process of the container then
// fire a separate event for this process
if proc.ID() != runtime.InitProcessID {
ne := &ExecExitTask{
ID: proc.Container().ID(),
PID: proc.ID(),
Status: status,
Process: proc,
}
s.execExit(ne)
return nil
}
container := proc.Container()
ne := &DeleteTask{
ID: container.ID(),
Status: status,
PID: proc.ID(),
Process: proc,
}
s.delete(ne)
return nil
}
// ExecExitTask holds needed parameters to execute the exec exit task
type ExecExitTask struct {
baseTask
ID string
PID string
Status uint32
Process runtime.Process
}
func (s *Supervisor) execExit(t *ExecExitTask) error {
container := t.Process.Container()
// exec process: we remove this process without notifying the main event loop
if err := container.RemoveProcess(t.PID); err != nil {
logrus.WithField("error", err).Error("containerd: find container for pid")
}
// If the exec spawned children which are still using its IO
// waiting here will block until they die or close their IO
// descriptors.
// Hence, we use a go routine to avoid block all other operations
go func() {
t.Process.Wait()
s.notifySubscribers(Event{
Timestamp: time.Now(),
ID: t.ID,
Type: StateExit,
PID: t.PID,
Status: t.Status,
})
}()
return nil
}

View file

@ -1,47 +0,0 @@
package supervisor
import "github.com/docker/containerd/runtime"
// GetContainersTask holds needed parameters to retrieve a list of
// containers
type GetContainersTask struct {
baseTask
ID string
GetState func(c runtime.Container) (interface{}, error)
Containers []runtime.Container
States []interface{}
}
func (s *Supervisor) getContainers(t *GetContainersTask) error {
if t.ID != "" {
ci, ok := s.containers[t.ID]
if !ok {
return ErrContainerNotFound
}
t.Containers = append(t.Containers, ci.container)
if t.GetState != nil {
st, err := t.GetState(ci.container)
if err != nil {
return err
}
t.States = append(t.States, st)
}
return nil
}
for _, ci := range s.containers {
t.Containers = append(t.Containers, ci.container)
if t.GetState != nil {
st, err := t.GetState(ci.container)
if err != nil {
return err
}
t.States = append(t.States, st)
}
}
return nil
}

View file

@ -1,28 +0,0 @@
// +build !solaris
package supervisor
import "github.com/cloudfoundry/gosigar"
// Machine holds the current machine cpu count and ram size
type Machine struct {
Cpus int
Memory int64
}
// CollectMachineInformation returns information regarding the current
// machine (e.g. CPU count, RAM amount)
func CollectMachineInformation() (Machine, error) {
m := Machine{}
cpu := sigar.CpuList{}
if err := cpu.Get(); err != nil {
return m, err
}
m.Cpus = len(cpu.List)
mem := sigar.Mem{}
if err := mem.Get(); err != nil {
return m, err
}
m.Memory = int64(mem.Total / 1024 / 1024)
return m, nil
}

View file

@ -1,15 +0,0 @@
package supervisor
import (
"errors"
)
type Machine struct {
Cpus int
Memory int64
}
func CollectMachineInformation() (Machine, error) {
m := Machine{}
return m, errors.New("supervisor CollectMachineInformation not implemented on Solaris")
}

View file

@ -1,23 +0,0 @@
package supervisor
import (
"time"
"github.com/Sirupsen/logrus"
)
// OOMTask holds needed parameters to report a container OOM
type OOMTask struct {
baseTask
ID string
}
func (s *Supervisor) oom(t *OOMTask) error {
logrus.WithField("id", t.ID).Debug("containerd: container oom")
s.notifySubscribers(Event{
Timestamp: time.Now(),
ID: t.ID,
Type: StateOOM,
})
return nil
}

View file

@ -1,28 +0,0 @@
package supervisor
import "os"
// SignalTask holds needed parameters to signal a container
type SignalTask struct {
baseTask
ID string
PID string
Signal os.Signal
}
func (s *Supervisor) signal(t *SignalTask) error {
i, ok := s.containers[t.ID]
if !ok {
return ErrContainerNotFound
}
processes, err := i.container.Processes()
if err != nil {
return err
}
for _, p := range processes {
if p.ID() == t.PID {
return p.Signal(t.Signal)
}
}
return ErrProcessNotFound
}

View file

@ -1,27 +0,0 @@
package supervisor
import (
"sort"
"github.com/docker/containerd/runtime"
)
func sortProcesses(p []runtime.Process) {
sort.Sort(&processSorter{p})
}
type processSorter struct {
processes []runtime.Process
}
func (s *processSorter) Len() int {
return len(s.processes)
}
func (s *processSorter) Swap(i, j int) {
s.processes[i], s.processes[j] = s.processes[j], s.processes[i]
}
func (s *processSorter) Less(i, j int) bool {
return s.processes[j].ID() == "init"
}

View file

@ -1,89 +0,0 @@
package supervisor
import (
"flag"
"os"
"sort"
"testing"
"github.com/docker/containerd/runtime"
"github.com/docker/containerd/specs"
)
var (
runtimeTool = flag.String("runtime", "runc", "Runtime to use for this test")
)
type testProcess struct {
id string
}
func (p *testProcess) ID() string {
return p.id
}
func (p *testProcess) Start() error {
return nil
}
func (p *testProcess) CloseStdin() error {
return nil
}
func (p *testProcess) Resize(w, h int) error {
return nil
}
func (p *testProcess) Stdio() runtime.Stdio {
return runtime.Stdio{}
}
func (p *testProcess) SystemPid() int {
return -1
}
func (p *testProcess) ExitFD() int {
return -1
}
func (p *testProcess) ExitStatus() (uint32, error) {
return runtime.UnknownStatus, nil
}
func (p *testProcess) Container() runtime.Container {
return nil
}
func (p *testProcess) Spec() specs.ProcessSpec {
return specs.ProcessSpec{}
}
func (p *testProcess) Signal(os.Signal) error {
return nil
}
func (p *testProcess) Close() error {
return nil
}
func (p *testProcess) State() runtime.State {
return runtime.Running
}
func (p *testProcess) Wait() {
}
func TestSortProcesses(t *testing.T) {
p := []runtime.Process{
&testProcess{"ls"},
&testProcess{"other"},
&testProcess{"init"},
&testProcess{"other2"},
}
s := &processSorter{p}
sort.Sort(s)
if id := p[len(p)-1].ID(); id != "init" {
t.Fatalf("expected init but received %q", id)
}
}

View file

@ -1,28 +0,0 @@
package supervisor
import "github.com/docker/containerd/runtime"
// StatsTask holds needed parameters to retrieve a container statistics
type StatsTask struct {
baseTask
ID string
Stat chan *runtime.Stat
}
func (s *Supervisor) stats(t *StatsTask) error {
i, ok := s.containers[t.ID]
if !ok {
return ErrContainerNotFound
}
// TODO: use workers for this
go func() {
s, err := i.container.Stats()
if err != nil {
t.ErrorCh() <- err
return
}
t.ErrorCh() <- nil
t.Stat <- s
}()
return errDeferredResponse
}

View file

@ -1,310 +0,0 @@
package supervisor
import (
"io/ioutil"
"os"
"sync"
"time"
"github.com/Sirupsen/logrus"
"github.com/docker/containerd/monitor"
"github.com/docker/containerd/runtime"
)
const (
defaultBufferSize = 2048 // size of queue in eventloop
)
type Config struct {
StateDir string
Runtime string
ShimName string
RuntimeArgs []string
Timeout time.Duration
EventRetainCount int
}
// New returns an initialized Process supervisor.
func New(c Config) (*Supervisor, error) {
startTasks := make(chan *startTask, 10)
if err := os.MkdirAll(c.StateDir, 0755); err != nil {
return nil, err
}
machine, err := CollectMachineInformation()
if err != nil {
return nil, err
}
m, err := monitor.New()
if err != nil {
return nil, err
}
go m.Run()
s := &Supervisor{
config: c,
containers: make(map[string]*containerInfo),
startTasks: startTasks,
machine: machine,
subscribers: make(map[chan Event]struct{}),
tasks: make(chan Task, defaultBufferSize),
monitor: m,
}
if err := setupEventLog(s, c.EventRetainCount); err != nil {
return nil, err
}
go s.monitorEventHandler()
if err := s.restore(); err != nil {
return nil, err
}
return s, nil
}
// Supervisor represents a container supervisor
type Supervisor struct {
config Config
containers map[string]*containerInfo
startTasks chan *startTask
// we need a lock around the subscribers map only because additions and deletions from
// the map are via the API so we cannot really control the concurrency
subscriberLock sync.RWMutex
subscribers map[chan Event]struct{}
machine Machine
tasks chan Task
monitor *monitor.Monitor
eventLog []Event
eventLock sync.Mutex
}
// Stop closes all startTasks and sends a SIGTERM to each container's pid1 then waits for they to
// terminate. After it has handled all the SIGCHILD events it will close the signals chan
// and exit. Stop is a non-blocking call and will return after the containers have been signaled
func (s *Supervisor) Stop() {
// Close the startTasks channel so that no new containers get started
close(s.startTasks)
}
// Close closes any open files in the supervisor but expects that Stop has been
// callsed so that no more containers are started.
func (s *Supervisor) Close() error {
return nil
}
// Event represents a container event
type Event struct {
ID string `json:"id"`
Type string `json:"type"`
Timestamp time.Time `json:"timestamp"`
PID string `json:"pid,omitempty"`
Status uint32 `json:"status,omitempty"`
}
// Events returns an event channel that external consumers can use to receive updates
// on container events
func (s *Supervisor) Events(from time.Time, storedOnly bool, id string) chan Event {
c := make(chan Event, defaultBufferSize)
if storedOnly {
defer s.Unsubscribe(c)
}
s.subscriberLock.Lock()
defer s.subscriberLock.Unlock()
if !from.IsZero() {
// replay old event
s.eventLock.Lock()
past := s.eventLog[:]
s.eventLock.Unlock()
for _, e := range past {
if e.Timestamp.After(from) {
if id == "" || e.ID == id {
c <- e
}
}
}
}
if storedOnly {
close(c)
} else {
s.subscribers[c] = struct{}{}
}
return c
}
// Unsubscribe removes the provided channel from receiving any more events
func (s *Supervisor) Unsubscribe(sub chan Event) {
s.subscriberLock.Lock()
defer s.subscriberLock.Unlock()
if _, ok := s.subscribers[sub]; ok {
delete(s.subscribers, sub)
close(sub)
}
}
// notifySubscribers will send the provided event to the external subscribers
// of the events channel
func (s *Supervisor) notifySubscribers(e Event) {
s.subscriberLock.RLock()
defer s.subscriberLock.RUnlock()
for sub := range s.subscribers {
// do a non-blocking send for the channel
select {
case sub <- e:
default:
logrus.WithField("event", e.Type).Warn("containerd: event not sent to subscriber")
}
}
}
// Start is a non-blocking call that runs the supervisor for monitoring contianer processes and
// executing new containers.
//
// This event loop is the only thing that is allowed to modify state of containers and processes
// therefore it is save to do operations in the handlers that modify state of the system or
// state of the Supervisor
func (s *Supervisor) Start() error {
logrus.WithFields(logrus.Fields{
"stateDir": s.config.StateDir,
"runtime": s.config.Runtime,
"runtimeArgs": s.config.RuntimeArgs,
"memory": s.machine.Memory,
"cpus": s.machine.Cpus,
}).Debug("containerd: supervisor running")
go func() {
for i := range s.tasks {
s.handleTask(i)
}
}()
return nil
}
// Machine returns the machine information for which the
// supervisor is executing on.
func (s *Supervisor) Machine() Machine {
return s.machine
}
// SendTask sends the provided event the the supervisors main event loop
func (s *Supervisor) SendTask(evt Task) {
s.tasks <- evt
}
func (s *Supervisor) monitorEventHandler() {
for e := range s.monitor.Events() {
switch t := e.(type) {
case runtime.Process:
if err := s.monitor.Remove(e); err != nil {
logrus.WithField("error", err).Error("containerd: remove process event FD from monitor")
}
if err := t.Close(); err != nil {
logrus.WithField("error", err).Error("containerd: close process event FD")
}
ev := &ExitTask{
Process: t,
}
s.SendTask(ev)
case runtime.OOM:
if t.Removed() {
if err := s.monitor.Remove(e); err != nil {
logrus.WithField("error", err).Error("containerd: remove oom event FD from monitor")
}
if err := t.Close(); err != nil {
logrus.WithField("error", err).Error("containerd: close oom event FD")
}
// don't send an event on the close of this FD
continue
}
ev := &OOMTask{
ID: t.ContainerID(),
}
s.SendTask(ev)
}
}
}
func (s *Supervisor) restore() error {
dirs, err := ioutil.ReadDir(s.config.StateDir)
if err != nil {
return err
}
for _, d := range dirs {
if !d.IsDir() {
continue
}
id := d.Name()
container, err := runtime.Load(s.config.StateDir, id, s.config.ShimName, s.config.Timeout)
if err != nil {
return err
}
processes, err := container.Processes()
if err != nil {
return err
}
s.containers[id] = &containerInfo{
container: container,
}
oom, err := container.OOM()
if err != nil {
logrus.WithField("error", err).Error("containerd: get oom FD")
}
if err := s.monitor.Add(oom); err != nil && err != runtime.ErrContainerExited {
logrus.WithField("error", err).Error("containerd: notify OOM events")
}
logrus.WithField("id", id).Debug("containerd: container restored")
var exitedProcesses []runtime.Process
for _, p := range processes {
if p.State() == runtime.Running {
if err := s.monitor.Add(p); err != nil {
return err
}
} else {
exitedProcesses = append(exitedProcesses, p)
}
}
if len(exitedProcesses) > 0 {
// sort processes so that init is fired last because that is how the kernel sends the
// exit events
sortProcesses(exitedProcesses)
for _, p := range exitedProcesses {
e := &ExitTask{
Process: p,
}
s.SendTask(e)
}
}
}
return nil
}
func (s *Supervisor) handleTask(i Task) {
var err error
switch t := i.(type) {
case *AddProcessTask:
err = s.addProcess(t)
case *CreateCheckpointTask:
err = s.createCheckpoint(t)
case *DeleteCheckpointTask:
err = s.deleteCheckpoint(t)
case *StartTask:
err = s.start(t)
case *DeleteTask:
err = s.delete(t)
case *ExitTask:
err = s.exit(t)
case *GetContainersTask:
err = s.getContainers(t)
case *SignalTask:
err = s.signal(t)
case *StatsTask:
err = s.stats(t)
case *UpdateTask:
err = s.updateContainer(t)
case *UpdateProcessTask:
err = s.updateProcess(t)
case *OOMTask:
err = s.oom(t)
default:
err = ErrUnknownTask
}
if err != errDeferredResponse {
i.ErrorCh() <- err
close(i.ErrorCh())
}
}

View file

@ -1,33 +0,0 @@
package supervisor
import (
"sync"
"github.com/docker/containerd/runtime"
)
// StartResponse is the response containing a started container
type StartResponse struct {
Container runtime.Container
}
// Task executes an action returning an error chan with either nil or
// the error from executing the task
type Task interface {
// ErrorCh returns a channel used to report and error from an async task
ErrorCh() chan error
}
type baseTask struct {
errCh chan error
mu sync.Mutex
}
func (t *baseTask) ErrorCh() chan error {
t.mu.Lock()
defer t.mu.Unlock()
if t.errCh == nil {
t.errCh = make(chan error, 1)
}
return t.errCh
}

View file

@ -1,12 +0,0 @@
package supervisor
// State constants used in Event types
const (
StateStart = "start-container"
StatePause = "pause"
StateResume = "resume"
StateExit = "exit"
StateStartProcess = "start-process"
StateOOM = "oom"
StateLive = "live"
)

View file

@ -1,95 +0,0 @@
package supervisor
import (
"time"
"github.com/docker/containerd/runtime"
)
// UpdateTask holds needed parameters to update a container resource constraints
type UpdateTask struct {
baseTask
ID string
State runtime.State
Resources *runtime.Resource
}
func (s *Supervisor) updateContainer(t *UpdateTask) error {
i, ok := s.containers[t.ID]
if !ok {
return ErrContainerNotFound
}
container := i.container
if t.State != "" {
switch t.State {
case runtime.Running:
if err := container.Resume(); err != nil {
return err
}
s.notifySubscribers(Event{
ID: t.ID,
Type: StateResume,
Timestamp: time.Now(),
})
case runtime.Paused:
if err := container.Pause(); err != nil {
return err
}
s.notifySubscribers(Event{
ID: t.ID,
Type: StatePause,
Timestamp: time.Now(),
})
default:
return ErrUnknownContainerStatus
}
return nil
}
if t.Resources != nil {
return container.UpdateResources(t.Resources)
}
return nil
}
// UpdateProcessTask holds needed parameters to update a container
// process terminal size or close its stdin
type UpdateProcessTask struct {
baseTask
ID string
PID string
CloseStdin bool
Width int
Height int
}
func (s *Supervisor) updateProcess(t *UpdateProcessTask) error {
i, ok := s.containers[t.ID]
if !ok {
return ErrContainerNotFound
}
processes, err := i.container.Processes()
if err != nil {
return err
}
var process runtime.Process
for _, p := range processes {
if p.ID() == t.PID {
process = p
break
}
}
if process == nil {
return ErrProcessNotFound
}
if t.CloseStdin {
if err := process.CloseStdin(); err != nil {
return err
}
}
if t.Width > 0 || t.Height > 0 {
if err := process.Resize(t.Width, t.Height); err != nil {
return err
}
}
return nil
}

View file

@ -1,101 +0,0 @@
package supervisor
import (
"encoding/json"
"io"
"os"
"path/filepath"
"time"
"github.com/Sirupsen/logrus"
"github.com/docker/containerd/runtime"
)
type containerInfo struct {
container runtime.Container
}
func setupEventLog(s *Supervisor, retainCount int) error {
if err := readEventLog(s); err != nil {
return err
}
logrus.WithField("count", len(s.eventLog)).Debug("containerd: read past events")
events := s.Events(time.Time{}, false, "")
return eventLogger(s, filepath.Join(s.config.StateDir, "events.log"), events, retainCount)
}
func eventLogger(s *Supervisor, path string, events chan Event, retainCount int) error {
f, err := os.OpenFile(path, os.O_WRONLY|os.O_CREATE|os.O_APPEND|os.O_TRUNC, 0755)
if err != nil {
return err
}
go func() {
var (
count = len(s.eventLog)
enc = json.NewEncoder(f)
)
for e := range events {
// if we have a specified retain count make sure the truncate the event
// log if it grows past the specified number of events to keep.
if retainCount > 0 {
if count > retainCount {
logrus.Debug("truncating event log")
// close the log file
if f != nil {
f.Close()
}
slice := retainCount - 1
l := len(s.eventLog)
if slice >= l {
slice = l
}
s.eventLock.Lock()
s.eventLog = s.eventLog[len(s.eventLog)-slice:]
s.eventLock.Unlock()
if f, err = os.OpenFile(path, os.O_WRONLY|os.O_CREATE|os.O_APPEND|os.O_TRUNC, 0755); err != nil {
logrus.WithField("error", err).Error("containerd: open event to journal")
continue
}
enc = json.NewEncoder(f)
count = 0
for _, le := range s.eventLog {
if err := enc.Encode(le); err != nil {
logrus.WithField("error", err).Error("containerd: write event to journal")
}
}
}
}
s.eventLock.Lock()
s.eventLog = append(s.eventLog, e)
s.eventLock.Unlock()
count++
if err := enc.Encode(e); err != nil {
logrus.WithField("error", err).Error("containerd: write event to journal")
}
}
}()
return nil
}
func readEventLog(s *Supervisor) error {
f, err := os.Open(filepath.Join(s.config.StateDir, "events.log"))
if err != nil {
if os.IsNotExist(err) {
return nil
}
return err
}
defer f.Close()
dec := json.NewDecoder(f)
for {
var e Event
if err := dec.Decode(&e); err != nil {
if err == io.EOF {
break
}
return err
}
s.eventLog = append(s.eventLog, e)
}
return nil
}

View file

@ -1,103 +0,0 @@
package supervisor
import (
"sync"
"time"
"github.com/Sirupsen/logrus"
"github.com/docker/containerd/runtime"
)
// Worker interface
type Worker interface {
Start()
}
type startTask struct {
Container runtime.Container
CheckpointPath string
Stdin string
Stdout string
Stderr string
Err chan error
StartResponse chan StartResponse
}
// NewWorker return a new initialized worker
func NewWorker(s *Supervisor, wg *sync.WaitGroup) Worker {
return &worker{
s: s,
wg: wg,
}
}
type worker struct {
wg *sync.WaitGroup
s *Supervisor
}
// Start runs a loop in charge of starting new containers
func (w *worker) Start() {
defer w.wg.Done()
for t := range w.s.startTasks {
process, err := t.Container.Start(t.CheckpointPath, runtime.NewStdio(t.Stdin, t.Stdout, t.Stderr))
if err != nil {
logrus.WithFields(logrus.Fields{
"error": err,
"id": t.Container.ID(),
}).Error("containerd: start container")
t.Err <- err
evt := &DeleteTask{
ID: t.Container.ID(),
NoEvent: true,
Process: process,
}
w.s.SendTask(evt)
continue
}
oom, err := t.Container.OOM()
if err != nil {
logrus.WithField("error", err).Error("containerd: get oom FD")
}
if err := w.s.monitor.Add(oom); err != nil && err != runtime.ErrContainerExited {
if process.State() != runtime.Stopped {
logrus.WithField("error", err).Error("containerd: notify OOM events")
}
}
if err := w.s.monitor.Add(process); err != nil {
logrus.WithField("error", err).Error("containerd: add process to monitor")
t.Err <- err
evt := &DeleteTask{
ID: t.Container.ID(),
NoEvent: true,
Process: process,
}
w.s.SendTask(evt)
continue
}
// only call process start if we aren't restoring from a checkpoint
// if we have restored from a checkpoint then the process is already started
if t.CheckpointPath == "" {
if err := process.Start(); err != nil {
logrus.WithField("error", err).Error("containerd: start init process")
t.Err <- err
evt := &DeleteTask{
ID: t.Container.ID(),
NoEvent: true,
Process: process,
}
w.s.SendTask(evt)
continue
}
}
t.Err <- nil
t.StartResponse <- StartResponse{
Container: t.Container,
}
w.s.notifySubscribers(Event{
Timestamp: time.Now(),
ID: t.Container.ID(),
Type: StateStart,
})
}
}