Add shim for reattach of processes

Signed-off-by: Michael Crosby <crosbymichael@gmail.com>

Remove runtime files from containerd

Signed-off-by: Michael Crosby <crosbymichael@gmail.com>

Update supervisor for orphaned containers

Signed-off-by: Michael Crosby <crosbymichael@gmail.com>

Remove ctr/container.go back to rpc calls

Signed-off-by: Michael Crosby <crosbymichael@gmail.com>

Add attach to loaded container

Signed-off-by: Michael Crosby <crosbymichael@gmail.com>

Add monitor based on epoll for process exits

Signed-off-by: Michael Crosby <crosbymichael@gmail.com>

Convert pids in containerd to string

This is so that we no longer care about linux or system level pids and
processes in containerd have user defined process id(pid) kinda like the
exec process ids that docker has today.

Signed-off-by: Michael Crosby <crosbymichael@gmail.com>

Add reaper back to containerd

Signed-off-by: Michael Crosby <crosbymichael@gmail.com>

Implement list containers with new process model

Signed-off-by: Michael Crosby <crosbymichael@gmail.com>

Implement restore of processes

Signed-off-by: Michael Crosby <crosbymichael@gmail.com>

Add NONBLOCK to exit fifo open

Signed-off-by: Michael Crosby <crosbymichael@gmail.com>

Implement tty reattach

Signed-off-by: Michael Crosby <crosbymichael@gmail.com>

Fix race in exit pipe creation

Signed-off-by: Michael Crosby <crosbymichael@gmail.com>

Add delete to shim

Signed-off-by: Michael Crosby <crosbymichael@gmail.com>

Update shim to use pid-file and not stdout

Signed-off-by: Michael Crosby <crosbymichael@gmail.com>
This commit is contained in:
Michael Crosby 2016-01-06 13:32:46 -08:00
parent 8d1f71c3d7
commit fe38efda50
33 changed files with 1210 additions and 1836 deletions

View file

@ -1,11 +1,5 @@
package supervisor
import (
"time"
"github.com/Sirupsen/logrus"
)
type AddProcessEvent struct {
s *Supervisor
}
@ -13,30 +7,32 @@ type AddProcessEvent struct {
// TODO: add this to worker for concurrent starts??? maybe not because of races where the container
// could be stopped and removed...
func (h *AddProcessEvent) Handle(e *Event) error {
start := time.Now()
ci, ok := h.s.containers[e.ID]
if !ok {
return ErrContainerNotFound
}
p, io, err := h.s.runtime.StartProcess(ci.container, *e.Process, e.Console)
if err != nil {
return err
}
if e.Pid, err = p.Pid(); err != nil {
return err
}
h.s.processes[e.Pid] = &containerInfo{
container: ci.container,
}
l, err := h.s.copyIO(e.Stdin, e.Stdout, e.Stderr, io)
if err != nil {
// log the error but continue with the other commands
logrus.WithFields(logrus.Fields{
"error": err,
"id": e.ID,
}).Error("log stdio")
}
h.s.processes[e.Pid].copier = l
ExecProcessTimer.UpdateSince(start)
/*
start := time.Now()
ci, ok := h.s.containers[e.ID]
if !ok {
return ErrContainerNotFound
}
p, io, err := h.s.runtime.StartProcess(ci.container, *e.Process, e.Console)
if err != nil {
return err
}
if e.Pid, err = p.Pid(); err != nil {
return err
}
h.s.processes[e.Pid] = &containerInfo{
container: ci.container,
}
l, err := h.s.copyIO(e.Stdin, e.Stdout, e.Stderr, io)
if err != nil {
// log the error but continue with the other commands
logrus.WithFields(logrus.Fields{
"error": err,
"id": e.ID,
}).Error("log stdio")
}
h.s.processes[e.Pid].copier = l
ExecProcessTimer.UpdateSince(start)
*/
return nil
}

View file

@ -5,11 +5,14 @@ type CreateCheckpointEvent struct {
}
func (h *CreateCheckpointEvent) Handle(e *Event) error {
i, ok := h.s.containers[e.ID]
if !ok {
return ErrContainerNotFound
}
return i.container.Checkpoint(*e.Checkpoint)
/*
i, ok := h.s.containers[e.ID]
if !ok {
return ErrContainerNotFound
}
*/
return nil
// return i.container.Checkpoint(*e.Checkpoint)
}
type DeleteCheckpointEvent struct {
@ -17,9 +20,12 @@ type DeleteCheckpointEvent struct {
}
func (h *DeleteCheckpointEvent) Handle(e *Event) error {
i, ok := h.s.containers[e.ID]
if !ok {
return ErrContainerNotFound
}
return i.container.DeleteCheckpoint(e.Checkpoint.Name)
/*
i, ok := h.s.containers[e.ID]
if !ok {
return ErrContainerNotFound
}
*/
return nil
// return i.container.DeleteCheckpoint(e.Checkpoint.Name)
}

View file

@ -1,6 +1,10 @@
package supervisor
import "time"
import (
"time"
"github.com/docker/containerd/runtime"
)
type StartEvent struct {
s *Supervisor
@ -8,22 +12,17 @@ type StartEvent struct {
func (h *StartEvent) Handle(e *Event) error {
start := time.Now()
container, io, err := h.s.runtime.Create(e.ID, e.BundlePath, e.Console)
container, err := runtime.New(h.s.stateDir, e.ID, e.BundlePath)
if err != nil {
return err
}
h.s.containerGroup.Add(1)
h.s.containers[e.ID] = &containerInfo{
container: container,
}
ContainersCounter.Inc(1)
task := &StartTask{
Err: e.Err,
IO: io,
Container: container,
Stdin: e.Stdin,
Stdout: e.Stdout,
Stderr: e.Stderr,
StartResponse: e.StartResponse,
}
if e.Checkpoint != nil {

View file

@ -29,7 +29,6 @@ func (h *DeleteEvent) Handle(e *Event) error {
Pid: e.Pid,
})
ContainersCounter.Dec(1)
h.s.containerGroup.Done()
ContainerDeleteTimer.UpdateSince(start)
}
return nil

View file

@ -36,7 +36,9 @@ func NewEvent(t EventType) *Event {
}
type StartResponse struct {
Pid int
Stdin string
Stdout string
Stderr string
}
type Event struct {
@ -48,11 +50,12 @@ type Event struct {
Stderr string
Stdin string
Console string
Pid int
Pid string
Status int
Signal os.Signal
Process *specs.Process
Process runtime.Process
State runtime.State
ProcessSpec *specs.Process
Containers []runtime.Container
Checkpoint *runtime.Checkpoint
Err chan error

View file

@ -4,6 +4,7 @@ import (
"time"
"github.com/Sirupsen/logrus"
"github.com/docker/containerd/runtime"
)
type ExitEvent struct {
@ -12,36 +13,36 @@ type ExitEvent struct {
func (h *ExitEvent) Handle(e *Event) error {
start := time.Now()
logrus.WithFields(logrus.Fields{"pid": e.Pid, "status": e.Status}).
Debug("containerd: process exited")
// is it the child process of a container
if info, ok := h.s.processes[e.Pid]; ok {
ne := NewEvent(ExecExitEventType)
ne.ID = info.container.ID()
ne.Pid = e.Pid
ne.Status = e.Status
h.s.SendEvent(ne)
return nil
}
// is it the main container's process
container, err := h.s.getContainerForPid(e.Pid)
proc := e.Process
status, err := proc.ExitStatus()
if err != nil {
if err != errNoContainerForPid {
logrus.WithField("error", err).Error("containerd: find containers main pid")
}
logrus.WithField("error", err).Error("containerd: get exit status")
}
logrus.WithFields(logrus.Fields{"pid": proc.ID(), "status": status}).Debug("containerd: process exited")
// if the process is the the init process of the container then
// fire a separate event for this process
if proc.ID() != runtime.InitProcessID {
ne := NewEvent(ExecExitEventType)
ne.ID = proc.Container().ID()
ne.Status = status
h.s.SendEvent(ne)
return nil
}
container.SetExited(e.Status)
container := proc.Container()
ne := NewEvent(DeleteEventType)
ne.ID = container.ID()
ne.Pid = e.Pid
ne.Status = e.Status
ne.Status = status
ne.Pid = proc.ID()
h.s.SendEvent(ne)
// remove stats collection for container
stopCollect := NewEvent(StopStatsEventType)
stopCollect.ID = container.ID()
h.s.SendEvent(stopCollect)
ExitProcessTimer.UpdateSince(start)
return nil
}
@ -51,14 +52,16 @@ type ExecExitEvent struct {
func (h *ExecExitEvent) Handle(e *Event) error {
// exec process: we remove this process without notifying the main event loop
info := h.s.processes[e.Pid]
if err := info.container.RemoveProcess(e.Pid); err != nil {
logrus.WithField("error", err).Error("containerd: find container for pid")
}
if err := info.copier.Close(); err != nil {
logrus.WithField("error", err).Error("containerd: close process IO")
}
delete(h.s.processes, e.Pid)
h.s.notifySubscribers(e)
/*
info := h.s.processes[e.Pid]
if err := info.container.RemoveProcess(e.Pid); err != nil {
logrus.WithField("error", err).Error("containerd: find container for pid")
}
if err := info.copier.Close(); err != nil {
logrus.WithField("error", err).Error("containerd: close process IO")
}
delete(h.s.processes, e.Pid)
h.s.notifySubscribers(e)
*/
return nil
}

View file

@ -3,15 +3,12 @@ package supervisor
import "github.com/cloudfoundry/gosigar"
type Machine struct {
ID string
Cpus int
Memory int64
}
func CollectMachineInformation(id string) (Machine, error) {
m := Machine{
ID: id,
}
func CollectMachineInformation() (Machine, error) {
m := Machine{}
cpu := sigar.CpuList{}
if err := cpu.Get(); err != nil {
return m, err

86
supervisor/monitor.go Normal file
View file

@ -0,0 +1,86 @@
package supervisor
import (
"sync"
"syscall"
"github.com/Sirupsen/logrus"
"github.com/docker/containerd/runtime"
)
func NewMonitor() (*Monitor, error) {
m := &Monitor{
processes: make(map[int]runtime.Process),
exits: make(chan runtime.Process, 1024),
}
fd, err := syscall.EpollCreate1(0)
if err != nil {
return nil, err
}
m.epollFd = fd
go m.start()
return m, nil
}
type Monitor struct {
m sync.Mutex
processes map[int]runtime.Process
exits chan runtime.Process
epollFd int
}
func (m *Monitor) Exits() chan runtime.Process {
return m.exits
}
func (m *Monitor) Monitor(p runtime.Process) error {
m.m.Lock()
defer m.m.Unlock()
fd := p.ExitFD()
event := syscall.EpollEvent{
Fd: int32(fd),
Events: syscall.EPOLLHUP,
}
if err := syscall.EpollCtl(m.epollFd, syscall.EPOLL_CTL_ADD, fd, &event); err != nil {
return err
}
m.processes[fd] = p
return nil
}
func (m *Monitor) Close() error {
return syscall.Close(m.epollFd)
}
func (m *Monitor) start() {
var events [128]syscall.EpollEvent
for {
n, err := syscall.EpollWait(m.epollFd, events[:], -1)
if err != nil {
if err == syscall.EINTR {
continue
}
logrus.WithField("error", err).Fatal("containerd: epoll wait")
}
// process events
for i := 0; i < n; i++ {
if events[i].Events == syscall.EPOLLHUP {
fd := int(events[i].Fd)
m.m.Lock()
proc := m.processes[fd]
delete(m.processes, fd)
if err = syscall.EpollCtl(m.epollFd, syscall.EPOLL_CTL_DEL, fd, &syscall.EpollEvent{
Events: syscall.EPOLLHUP,
Fd: int32(fd),
}); err != nil {
logrus.WithField("error", err).Fatal("containerd: epoll remove fd")
}
if err := proc.Close(); err != nil {
logrus.WithField("error", err).Error("containerd: close process IO")
}
m.m.Unlock()
m.exits <- proc
}
}
}
}

View file

@ -5,18 +5,20 @@ type SignalEvent struct {
}
func (h *SignalEvent) Handle(e *Event) error {
i, ok := h.s.containers[e.ID]
if !ok {
return ErrContainerNotFound
}
processes, err := i.container.Processes()
if err != nil {
return err
}
for _, p := range processes {
if pid, err := p.Pid(); err == nil && pid == e.Pid {
return p.Signal(e.Signal)
/*
i, ok := h.s.containers[e.ID]
if !ok {
return ErrContainerNotFound
}
}
processes, err := i.container.Processes()
if err != nil {
return err
}
for _, p := range processes {
if pid, err := p.Pid(); err == nil && pid == e.Pid {
return p.Signal(e.Signal)
}
}
*/
return ErrProcessNotFound
}

View file

@ -9,7 +9,6 @@ import (
"sync"
"time"
"github.com/Sirupsen/logrus"
"github.com/docker/containerd/api/grpc/types"
"github.com/docker/containerd/runtime"
"github.com/docker/docker/pkg/pubsub"
@ -179,15 +178,17 @@ func (s *statsCollector) run() {
continue
}
for _, pair := range pairs {
stats, err := pair.ct.Stats()
if err != nil {
logrus.Errorf("Error getting stats for container ID %s", pair.ct.ID())
continue
}
/*
for _, pair := range pairs {
stats, err := pair.ct.Stats()
if err != nil {
logrus.Errorf("Error getting stats for container ID %s", pair.ct.ID())
continue
}
pair.pub.Publish(convertToPb(stats))
}
pair.pub.Publish(convertToPb(stats))
}
*/
}
}

View file

@ -1,18 +1,16 @@
package supervisor
import (
"io/ioutil"
"os"
"os/signal"
"path/filepath"
"sync"
"syscall"
"time"
"github.com/Sirupsen/logrus"
"github.com/docker/containerd/chanotify"
"github.com/docker/containerd/eventloop"
"github.com/docker/containerd/runtime"
"github.com/opencontainers/runc/libcontainer"
)
const (
@ -21,29 +19,27 @@ const (
)
// New returns an initialized Process supervisor.
func New(id, stateDir string, tasks chan *StartTask, oom bool) (*Supervisor, error) {
func New(stateDir string, tasks chan *StartTask, oom bool) (*Supervisor, error) {
if err := os.MkdirAll(stateDir, 0755); err != nil {
return nil, err
}
// register counters
r, err := newRuntime(filepath.Join(stateDir, id))
machine, err := CollectMachineInformation()
if err != nil {
return nil, err
}
machine, err := CollectMachineInformation(id)
monitor, err := NewMonitor()
if err != nil {
return nil, err
}
s := &Supervisor{
stateDir: stateDir,
containers: make(map[string]*containerInfo),
processes: make(map[int]*containerInfo),
runtime: r,
tasks: tasks,
machine: machine,
subscribers: make(map[chan *Event]struct{}),
statsCollector: newStatsCollector(statsInterval),
el: eventloop.NewChanLoop(defaultBufferSize),
monitor: monitor,
}
if oom {
s.notifier = chanotify.New()
@ -71,7 +67,10 @@ func New(id, stateDir string, tasks chan *StartTask, oom bool) (*Supervisor, err
UnsubscribeStatsEventType: &UnsubscribeStatsEvent{s},
StopStatsEventType: &StopStatsEvent{s},
}
// start the container workers for concurrent container starts
go s.exitHandler()
if err := s.restore(); err != nil {
return nil, err
}
return s, nil
}
@ -84,9 +83,7 @@ type Supervisor struct {
// stateDir is the directory on the system to store container runtime state information.
stateDir string
containers map[string]*containerInfo
processes map[int]*containerInfo
handlers map[EventType]Handler
runtime runtime.Runtime
events chan *Event
tasks chan *StartTask
// we need a lock around the subscribers map only because additions and deletions from
@ -94,51 +91,18 @@ type Supervisor struct {
subscriberLock sync.RWMutex
subscribers map[chan *Event]struct{}
machine Machine
containerGroup sync.WaitGroup
statsCollector *statsCollector
notifier *chanotify.Notifier
el eventloop.EventLoop
monitor *Monitor
}
// Stop closes all tasks and sends a SIGTERM to each container's pid1 then waits for they to
// terminate. After it has handled all the SIGCHILD events it will close the signals chan
// and exit. Stop is a non-blocking call and will return after the containers have been signaled
func (s *Supervisor) Stop(sig chan os.Signal) {
func (s *Supervisor) Stop() {
// Close the tasks channel so that no new containers get started
close(s.tasks)
// send a SIGTERM to all containers
for id, i := range s.containers {
c := i.container
logrus.WithField("id", id).Debug("sending TERM to container processes")
procs, err := c.Processes()
if err != nil {
logrus.WithField("id", id).Warn("get container processes")
continue
}
if len(procs) == 0 {
continue
}
mainProc := procs[0]
if err := mainProc.Signal(syscall.SIGTERM); err != nil {
pid, _ := mainProc.Pid()
logrus.WithFields(logrus.Fields{
"id": id,
"pid": pid,
"error": err,
}).Error("send SIGTERM to process")
}
}
go func() {
logrus.Debug("waiting for containers to exit")
s.containerGroup.Wait()
logrus.Debug("all containers exited")
if s.notifier != nil {
s.notifier.Close()
}
// stop receiving signals and close the channel
signal.Stop(sig)
close(sig)
}()
}
// Close closes any open files in the supervisor but expects that Stop has been
@ -190,7 +154,6 @@ func (s *Supervisor) notifySubscribers(e *Event) {
// state of the Supervisor
func (s *Supervisor) Start() error {
logrus.WithFields(logrus.Fields{
"runtime": s.runtime.Type(),
"stateDir": s.stateDir,
}).Debug("Supervisor started")
return s.el.Start()
@ -202,45 +165,60 @@ func (s *Supervisor) Machine() Machine {
return s.machine
}
// getContainerForPid returns the container where the provided pid is the pid1 or main
// process in the container
func (s *Supervisor) getContainerForPid(pid int) (runtime.Container, error) {
for _, i := range s.containers {
container := i.container
cpid, err := container.Pid()
if err != nil {
if lerr, ok := err.(libcontainer.Error); ok {
if lerr.Code() == libcontainer.ProcessNotExecuted {
continue
}
}
logrus.WithField("error", err).Error("containerd: get container pid")
}
if pid == cpid {
return container, nil
}
}
return nil, errNoContainerForPid
}
// SendEvent sends the provided event the the supervisors main event loop
func (s *Supervisor) SendEvent(evt *Event) {
EventsCounter.Inc(1)
s.el.Send(&commonEvent{data: evt, sv: s})
}
func (s *Supervisor) copyIO(stdin, stdout, stderr string, i *runtime.IO) (*copier, error) {
config := &ioConfig{
Stdin: i.Stdin,
Stdout: i.Stdout,
Stderr: i.Stderr,
StdoutPath: stdout,
StderrPath: stderr,
StdinPath: stdin,
func (s *Supervisor) exitHandler() {
for p := range s.monitor.Exits() {
e := NewEvent(ExitEventType)
e.Process = p
s.SendEvent(e)
}
l, err := newCopier(config)
if err != nil {
return nil, err
}
return l, nil
}
func (s *Supervisor) monitorProcess(p runtime.Process) error {
return s.monitor.Monitor(p)
}
func (s *Supervisor) restore() error {
dirs, err := ioutil.ReadDir(s.stateDir)
if err != nil {
return err
}
for _, d := range dirs {
if !d.IsDir() {
continue
}
id := d.Name()
container, err := runtime.Load(s.stateDir, id)
if err != nil {
if err == runtime.ErrContainerExited {
logrus.WithField("id", id).Debug("containerd: container exited while away")
// TODO: fire events to do the removal
if err := os.RemoveAll(filepath.Join(s.stateDir, id)); err != nil {
logrus.WithField("error", err).Warn("containerd: remove container state")
}
continue
}
return err
}
processes, err := container.Processes()
if err != nil {
return err
}
ContainersCounter.Inc(1)
s.containers[id] = &containerInfo{
container: container,
}
logrus.WithField("id", id).Debug("containerd: container restored")
for _, p := range processes {
if err := s.monitorProcess(p); err != nil {
return err
}
}
}
return nil
}

View file

@ -1,12 +0,0 @@
// +build libcontainer
package supervisor
import (
"github.com/docker/containerd/linux"
"github.com/docker/containerd/runtime"
)
func newRuntime(stateDir string) (runtime.Runtime, error) {
return linux.NewRuntime(stateDir)
}

View file

@ -1,12 +0,0 @@
// +build runc
package supervisor
import (
"github.com/docker/containerd/runc"
"github.com/docker/containerd/runtime"
)
func newRuntime(stateDir string) (runtime.Runtime, error) {
return runc.NewRuntime(stateDir)
}

View file

@ -1,13 +0,0 @@
// +build !libcontainer,!runc
package supervisor
import (
"errors"
"github.com/docker/containerd/runtime"
)
func newRuntime(stateDir string) (runtime.Runtime, error) {
return nil, errors.New("unsupported platform")
}

View file

@ -27,15 +27,17 @@ func (h *UpdateEvent) Handle(e *Event) error {
}
}
if e.Signal != nil {
// signal the pid1/main process of the container
processes, err := container.Processes()
if err != nil {
return err
}
if len(processes) == 0 {
return ErrProcessNotFound
}
return processes[0].Signal(e.Signal)
/*
// signal the pid1/main process of the container
processes, err := container.Processes()
if err != nil {
return err
}
if len(processes) == 0 {
return ErrProcessNotFound
}
return processes[0].Signal(e.Signal)
*/
}
return nil
}

View file

@ -38,26 +38,24 @@ type worker struct {
func (w *worker) Start() {
defer w.wg.Done()
for t := range w.s.tasks {
started := time.Now()
l, err := w.s.copyIO(t.Stdin, t.Stdout, t.Stderr, t.IO)
if err != nil {
evt := NewEvent(DeleteEventType)
evt.ID = t.Container.ID()
w.s.SendEvent(evt)
t.Err <- err
continue
}
w.s.containers[t.Container.ID()].copier = l
var (
err error
process runtime.Process
started = time.Now()
)
if t.Checkpoint != "" {
if err := t.Container.Restore(t.Checkpoint); err != nil {
evt := NewEvent(DeleteEventType)
evt.ID = t.Container.ID()
w.s.SendEvent(evt)
t.Err <- err
continue
}
/*
if err := t.Container.Restore(t.Checkpoint); err != nil {
evt := NewEvent(DeleteEventType)
evt.ID = t.Container.ID()
w.s.SendEvent(evt)
t.Err <- err
continue
}
*/
} else {
if err := t.Container.Start(); err != nil {
process, err = t.Container.Start()
if err != nil {
evt := NewEvent(DeleteEventType)
evt.ID = t.Container.ID()
w.s.SendEvent(evt)
@ -65,22 +63,25 @@ func (w *worker) Start() {
continue
}
}
pid, err := t.Container.Pid()
if err != nil {
logrus.WithField("error", err).Error("containerd: get container main pid")
}
if w.s.notifier != nil {
n, err := t.Container.OOM()
if err != nil {
logrus.WithField("error", err).Error("containerd: notify OOM events")
} else {
w.s.notifier.Add(t.Container.ID(), n)
}
/*
if w.s.notifier != nil {
n, err := t.Container.OOM()
if err != nil {
logrus.WithField("error", err).Error("containerd: notify OOM events")
} else {
w.s.notifier.Add(n, t.Container.ID())
}
}
*/
if err := w.s.monitorProcess(process); err != nil {
logrus.WithField("error", err).Error("containerd: add process to monitor")
}
ContainerStartTimer.UpdateSince(started)
t.Err <- nil
t.StartResponse <- StartResponse{
Pid: pid,
Stdin: process.Stdin(),
Stdout: process.Stdout(),
Stderr: process.Stderr(),
}
}
}