Merge pull request #106 from docker/event-switch

Remove eventloop package and use type switch
This commit is contained in:
Michael Crosby 2016-02-22 11:02:41 -08:00
commit 67ad55ad56
15 changed files with 265 additions and 359 deletions

View file

@ -33,7 +33,7 @@ func (s *apiServer) CreateContainer(ctx context.Context, c *types.CreateContaine
if c.BundlePath == "" { if c.BundlePath == "" {
return nil, errors.New("empty bundle path") return nil, errors.New("empty bundle path")
} }
e := supervisor.NewTask(supervisor.StartContainerTaskType) e := &supervisor.StartTask{}
e.ID = c.Id e.ID = c.Id
e.BundlePath = c.BundlePath e.BundlePath = c.BundlePath
e.Stdin = c.Stdin e.Stdin = c.Stdin
@ -47,7 +47,7 @@ func (s *apiServer) CreateContainer(ctx context.Context, c *types.CreateContaine
} }
} }
s.sv.SendTask(e) s.sv.SendTask(e)
if err := <-e.Err; err != nil { if err := <-e.ErrorCh(); err != nil {
return nil, err return nil, err
} }
r := <-e.StartResponse r := <-e.StartResponse
@ -61,12 +61,12 @@ func (s *apiServer) CreateContainer(ctx context.Context, c *types.CreateContaine
} }
func (s *apiServer) Signal(ctx context.Context, r *types.SignalRequest) (*types.SignalResponse, error) { func (s *apiServer) Signal(ctx context.Context, r *types.SignalRequest) (*types.SignalResponse, error) {
e := supervisor.NewTask(supervisor.SignalTaskType) e := &supervisor.SignalTask{}
e.ID = r.Id e.ID = r.Id
e.Pid = r.Pid e.PID = r.Pid
e.Signal = syscall.Signal(int(r.Signal)) e.Signal = syscall.Signal(int(r.Signal))
s.sv.SendTask(e) s.sv.SendTask(e)
if err := <-e.Err; err != nil { if err := <-e.ErrorCh(); err != nil {
return nil, err return nil, err
} }
return &types.SignalResponse{}, nil return &types.SignalResponse{}, nil
@ -90,16 +90,16 @@ func (s *apiServer) AddProcess(ctx context.Context, r *types.AddProcessRequest)
if r.Pid == "" { if r.Pid == "" {
return nil, fmt.Errorf("process id cannot be empty") return nil, fmt.Errorf("process id cannot be empty")
} }
e := supervisor.NewTask(supervisor.AddProcessTaskType) e := &supervisor.AddProcessTask{}
e.ID = r.Id e.ID = r.Id
e.Pid = r.Pid e.PID = r.Pid
e.ProcessSpec = process e.ProcessSpec = process
e.Stdin = r.Stdin e.Stdin = r.Stdin
e.Stdout = r.Stdout e.Stdout = r.Stdout
e.Stderr = r.Stderr e.Stderr = r.Stderr
e.StartResponse = make(chan supervisor.StartResponse, 1) e.StartResponse = make(chan supervisor.StartResponse, 1)
s.sv.SendTask(e) s.sv.SendTask(e)
if err := <-e.Err; err != nil { if err := <-e.ErrorCh(); err != nil {
return nil, err return nil, err
} }
<-e.StartResponse <-e.StartResponse
@ -107,7 +107,7 @@ func (s *apiServer) AddProcess(ctx context.Context, r *types.AddProcessRequest)
} }
func (s *apiServer) CreateCheckpoint(ctx context.Context, r *types.CreateCheckpointRequest) (*types.CreateCheckpointResponse, error) { func (s *apiServer) CreateCheckpoint(ctx context.Context, r *types.CreateCheckpointRequest) (*types.CreateCheckpointResponse, error) {
e := supervisor.NewTask(supervisor.CreateCheckpointTaskType) e := &supervisor.CreateCheckpointTask{}
e.ID = r.Id e.ID = r.Id
e.Checkpoint = &runtime.Checkpoint{ e.Checkpoint = &runtime.Checkpoint{
Name: r.Checkpoint.Name, Name: r.Checkpoint.Name,
@ -117,7 +117,7 @@ func (s *apiServer) CreateCheckpoint(ctx context.Context, r *types.CreateCheckpo
Shell: r.Checkpoint.Shell, Shell: r.Checkpoint.Shell,
} }
s.sv.SendTask(e) s.sv.SendTask(e)
if err := <-e.Err; err != nil { if err := <-e.ErrorCh(); err != nil {
return nil, err return nil, err
} }
return &types.CreateCheckpointResponse{}, nil return &types.CreateCheckpointResponse{}, nil
@ -127,22 +127,22 @@ func (s *apiServer) DeleteCheckpoint(ctx context.Context, r *types.DeleteCheckpo
if r.Name == "" { if r.Name == "" {
return nil, errors.New("checkpoint name cannot be empty") return nil, errors.New("checkpoint name cannot be empty")
} }
e := supervisor.NewTask(supervisor.DeleteCheckpointTaskType) e := &supervisor.DeleteCheckpointTask{}
e.ID = r.Id e.ID = r.Id
e.Checkpoint = &runtime.Checkpoint{ e.Checkpoint = &runtime.Checkpoint{
Name: r.Name, Name: r.Name,
} }
s.sv.SendTask(e) s.sv.SendTask(e)
if err := <-e.Err; err != nil { if err := <-e.ErrorCh(); err != nil {
return nil, err return nil, err
} }
return &types.DeleteCheckpointResponse{}, nil return &types.DeleteCheckpointResponse{}, nil
} }
func (s *apiServer) ListCheckpoint(ctx context.Context, r *types.ListCheckpointRequest) (*types.ListCheckpointResponse, error) { func (s *apiServer) ListCheckpoint(ctx context.Context, r *types.ListCheckpointRequest) (*types.ListCheckpointResponse, error) {
e := supervisor.NewTask(supervisor.GetContainerTaskType) e := &supervisor.GetContainersTask{}
s.sv.SendTask(e) s.sv.SendTask(e)
if err := <-e.Err; err != nil { if err := <-e.ErrorCh(); err != nil {
return nil, err return nil, err
} }
var container runtime.Container var container runtime.Container
@ -174,10 +174,10 @@ func (s *apiServer) ListCheckpoint(ctx context.Context, r *types.ListCheckpointR
} }
func (s *apiServer) State(ctx context.Context, r *types.StateRequest) (*types.StateResponse, error) { func (s *apiServer) State(ctx context.Context, r *types.StateRequest) (*types.StateResponse, error) {
e := supervisor.NewTask(supervisor.GetContainerTaskType) e := &supervisor.GetContainersTask{}
e.ID = r.Id e.ID = r.Id
s.sv.SendTask(e) s.sv.SendTask(e)
if err := <-e.Err; err != nil { if err := <-e.ErrorCh(); err != nil {
return nil, err return nil, err
} }
m := s.sv.Machine() m := s.sv.Machine()
@ -248,25 +248,25 @@ func toUint32(its []int) []uint32 {
} }
func (s *apiServer) UpdateContainer(ctx context.Context, r *types.UpdateContainerRequest) (*types.UpdateContainerResponse, error) { func (s *apiServer) UpdateContainer(ctx context.Context, r *types.UpdateContainerRequest) (*types.UpdateContainerResponse, error) {
e := supervisor.NewTask(supervisor.UpdateContainerTaskType) e := &supervisor.UpdateTask{}
e.ID = r.Id e.ID = r.Id
e.State = runtime.State(r.Status) e.State = runtime.State(r.Status)
s.sv.SendTask(e) s.sv.SendTask(e)
if err := <-e.Err; err != nil { if err := <-e.ErrorCh(); err != nil {
return nil, err return nil, err
} }
return &types.UpdateContainerResponse{}, nil return &types.UpdateContainerResponse{}, nil
} }
func (s *apiServer) UpdateProcess(ctx context.Context, r *types.UpdateProcessRequest) (*types.UpdateProcessResponse, error) { func (s *apiServer) UpdateProcess(ctx context.Context, r *types.UpdateProcessRequest) (*types.UpdateProcessResponse, error) {
e := supervisor.NewTask(supervisor.UpdateProcessTaskType) e := &supervisor.UpdateProcessTask{}
e.ID = r.Id e.ID = r.Id
e.Pid = r.Pid e.PID = r.Pid
e.Height = int(r.Height) e.Height = int(r.Height)
e.Width = int(r.Width) e.Width = int(r.Width)
e.CloseStdin = r.CloseStdin e.CloseStdin = r.CloseStdin
s.sv.SendTask(e) s.sv.SendTask(e)
if err := <-e.Err; err != nil { if err := <-e.ErrorCh(); err != nil {
return nil, err return nil, err
} }
return &types.UpdateProcessResponse{}, nil return &types.UpdateProcessResponse{}, nil
@ -284,7 +284,7 @@ func (s *apiServer) Events(r *types.EventsRequest, stream types.API_EventsServer
Id: e.ID, Id: e.ID,
Type: e.Type, Type: e.Type,
Timestamp: uint64(e.Timestamp.Unix()), Timestamp: uint64(e.Timestamp.Unix()),
Pid: e.Pid, Pid: e.PID,
Status: uint32(e.Status), Status: uint32(e.Status),
}); err != nil { }); err != nil {
return err return err
@ -294,11 +294,11 @@ func (s *apiServer) Events(r *types.EventsRequest, stream types.API_EventsServer
} }
func (s *apiServer) Stats(ctx context.Context, r *types.StatsRequest) (*types.StatsResponse, error) { func (s *apiServer) Stats(ctx context.Context, r *types.StatsRequest) (*types.StatsResponse, error) {
e := supervisor.NewTask(supervisor.StatsTaskType) e := &supervisor.StatsTask{}
e.ID = r.Id e.ID = r.Id
e.Stat = make(chan *runtime.Stat, 1) e.Stat = make(chan *runtime.Stat, 1)
s.sv.SendTask(e) s.sv.SendTask(e)
if err := <-e.Err; err != nil { if err := <-e.ErrorCh(); err != nil {
return nil, err return nil, err
} }
stats := <-e.Stat stats := <-e.Stat

View file

@ -1,46 +0,0 @@
package eventloop
import "sync"
// Event is receiving notification from loop with Handle() call.
type Event interface {
Handle()
}
// EventLoop is interface for event loops.
// Start starting events processing
// Send adding event to loop
type EventLoop interface {
Start() error
Send(Event) error
}
// ChanLoop is implementation of EventLoop based on channels.
type ChanLoop struct {
events chan Event
once sync.Once
}
// NewChanLoop returns ChanLoop with internal channel buffer set to q.
func NewChanLoop(q int) EventLoop {
return &ChanLoop{
events: make(chan Event, q),
}
}
// Start starting to read events from channel in separate goroutines.
// All calls after first is no-op.
func (el *ChanLoop) Start() error {
go el.once.Do(func() {
for ev := range el.events {
ev.Handle()
}
})
return nil
}
// Send sends event to channel. Will block if buffer is full.
func (el *ChanLoop) Send(ev Event) error {
el.events <- ev
return nil
}

View file

@ -1,73 +0,0 @@
package eventloop
import (
"sync"
"testing"
"time"
)
type racyEvent struct {
m map[int]struct{}
wg *sync.WaitGroup
}
func (e *racyEvent) Handle() {
e.m[0] = struct{}{}
e.wg.Done()
}
func simulateRacyEvents(el EventLoop) {
wg := &sync.WaitGroup{}
raceMap := make(map[int]struct{})
var evs []*racyEvent
for i := 0; i < 1024; i++ {
wg.Add(1)
evs = append(evs, &racyEvent{m: raceMap, wg: wg})
}
for _, ev := range evs {
el.Send(ev)
}
wg.Wait()
}
// run with -race
func TestChanRace(t *testing.T) {
e := NewChanLoop(1024)
e.Start()
simulateRacyEvents(e)
}
// run with -race
func TestChanStartTwiceRace(t *testing.T) {
e := NewChanLoop(1024)
e.Start()
e.Start()
simulateRacyEvents(e)
}
type testEvent struct {
wg *sync.WaitGroup
}
func (e *testEvent) Handle() {
e.wg.Done()
}
func TestChanEventSpawn(t *testing.T) {
e := NewChanLoop(1024)
e.Start()
wg := &sync.WaitGroup{}
wg.Add(2)
e.Send(&testEvent{wg: wg})
e.Send(&testEvent{wg: wg})
waitCh := make(chan struct{})
go func() {
wg.Wait()
close(waitCh)
}()
select {
case <-waitCh:
case <-time.After(1 * time.Second):
t.Fatal("Events was not handled in loop")
}
}

View file

@ -4,34 +4,40 @@ import (
"time" "time"
"github.com/docker/containerd/runtime" "github.com/docker/containerd/runtime"
"github.com/opencontainers/specs"
) )
type AddProcessTask struct { type AddProcessTask struct {
s *Supervisor baseTask
ID string
PID string
Stdout string
Stderr string
Stdin string
ProcessSpec *specs.Process
StartResponse chan StartResponse
} }
// TODO: add this to worker for concurrent starts??? maybe not because of races where the container func (s *Supervisor) addProcess(t *AddProcessTask) error {
// could be stopped and removed...
func (h *AddProcessTask) Handle(e *Task) error {
start := time.Now() start := time.Now()
ci, ok := h.s.containers[e.ID] ci, ok := s.containers[t.ID]
if !ok { if !ok {
return ErrContainerNotFound return ErrContainerNotFound
} }
process, err := ci.container.Exec(e.Pid, *e.ProcessSpec, runtime.NewStdio(e.Stdin, e.Stdout, e.Stderr)) process, err := ci.container.Exec(t.PID, *t.ProcessSpec, runtime.NewStdio(t.Stdin, t.Stdout, t.Stderr))
if err != nil { if err != nil {
return err return err
} }
if err := h.s.monitorProcess(process); err != nil { if err := s.monitorProcess(process); err != nil {
return err return err
} }
ExecProcessTimer.UpdateSince(start) ExecProcessTimer.UpdateSince(start)
e.StartResponse <- StartResponse{} t.StartResponse <- StartResponse{}
h.s.notifySubscribers(Event{ s.notifySubscribers(Event{
Timestamp: time.Now(), Timestamp: time.Now(),
Type: "start-process", Type: "start-process",
Pid: e.Pid, PID: t.PID,
ID: e.ID, ID: t.ID,
}) })
return nil return nil
} }

View file

@ -1,25 +1,31 @@
package supervisor package supervisor
import "github.com/docker/containerd/runtime"
type CreateCheckpointTask struct { type CreateCheckpointTask struct {
s *Supervisor baseTask
ID string
Checkpoint *runtime.Checkpoint
} }
func (h *CreateCheckpointTask) Handle(e *Task) error { func (s *Supervisor) createCheckpoint(t *CreateCheckpointTask) error {
i, ok := h.s.containers[e.ID] i, ok := s.containers[t.ID]
if !ok { if !ok {
return ErrContainerNotFound return ErrContainerNotFound
} }
return i.container.Checkpoint(*e.Checkpoint) return i.container.Checkpoint(*t.Checkpoint)
} }
type DeleteCheckpointTask struct { type DeleteCheckpointTask struct {
s *Supervisor baseTask
ID string
Checkpoint *runtime.Checkpoint
} }
func (h *DeleteCheckpointTask) Handle(e *Task) error { func (s *Supervisor) deleteCheckpoint(t *DeleteCheckpointTask) error {
i, ok := h.s.containers[e.ID] i, ok := s.containers[t.ID]
if !ok { if !ok {
return ErrContainerNotFound return ErrContainerNotFound
} }
return i.container.DeleteCheckpoint(e.Checkpoint.Name) return i.container.DeleteCheckpoint(t.Checkpoint.Name)
} }

View file

@ -7,31 +7,39 @@ import (
) )
type StartTask struct { type StartTask struct {
s *Supervisor baseTask
ID string
BundlePath string
Stdout string
Stderr string
Stdin string
StartResponse chan StartResponse
Checkpoint *runtime.Checkpoint
Labels []string
} }
func (h *StartTask) Handle(e *Task) error { func (s *Supervisor) start(t *StartTask) error {
start := time.Now() start := time.Now()
container, err := runtime.New(h.s.stateDir, e.ID, e.BundlePath, e.Labels) container, err := runtime.New(s.stateDir, t.ID, t.BundlePath, t.Labels)
if err != nil { if err != nil {
return err return err
} }
h.s.containers[e.ID] = &containerInfo{ s.containers[t.ID] = &containerInfo{
container: container, container: container,
} }
ContainersCounter.Inc(1) ContainersCounter.Inc(1)
task := &startTask{ task := &startTask{
Err: e.Err, Err: t.ErrorCh(),
Container: container, Container: container,
StartResponse: e.StartResponse, StartResponse: t.StartResponse,
Stdin: e.Stdin, Stdin: t.Stdin,
Stdout: e.Stdout, Stdout: t.Stdout,
Stderr: e.Stderr, Stderr: t.Stderr,
} }
if e.Checkpoint != nil { if t.Checkpoint != nil {
task.Checkpoint = e.Checkpoint.Name task.Checkpoint = t.Checkpoint.Name
} }
h.s.tasks <- task s.startTasks <- task
ContainerCreateTimer.UpdateSince(start) ContainerCreateTimer.UpdateSince(start)
return errDeferedResponse return errDeferedResponse
} }

View file

@ -8,21 +8,24 @@ import (
) )
type DeleteTask struct { type DeleteTask struct {
s *Supervisor baseTask
ID string
Status int
PID string
} }
func (h *DeleteTask) Handle(e *Task) error { func (s *Supervisor) delete(t *DeleteTask) error {
if i, ok := h.s.containers[e.ID]; ok { if i, ok := s.containers[t.ID]; ok {
start := time.Now() start := time.Now()
if err := h.deleteContainer(i.container); err != nil { if err := s.deleteContainer(i.container); err != nil {
logrus.WithField("error", err).Error("containerd: deleting container") logrus.WithField("error", err).Error("containerd: deleting container")
} }
h.s.notifySubscribers(Event{ s.notifySubscribers(Event{
Type: "exit", Type: "exit",
Timestamp: time.Now(), Timestamp: time.Now(),
ID: e.ID, ID: t.ID,
Status: e.Status, Status: t.Status,
Pid: e.Pid, PID: t.PID,
}) })
ContainersCounter.Dec(1) ContainersCounter.Dec(1)
ContainerDeleteTimer.UpdateSince(start) ContainerDeleteTimer.UpdateSince(start)
@ -30,7 +33,7 @@ func (h *DeleteTask) Handle(e *Task) error {
return nil return nil
} }
func (h *DeleteTask) deleteContainer(container runtime.Container) error { func (s *Supervisor) deleteContainer(container runtime.Container) error {
delete(h.s.containers, container.ID()) delete(s.containers, container.ID())
return container.Delete() return container.Delete()
} }

View file

@ -8,12 +8,13 @@ import (
) )
type ExitTask struct { type ExitTask struct {
s *Supervisor baseTask
Process runtime.Process
} }
func (h *ExitTask) Handle(e *Task) error { func (s *Supervisor) exit(t *ExitTask) error {
start := time.Now() start := time.Now()
proc := e.Process proc := t.Process
status, err := proc.ExitStatus() status, err := proc.ExitStatus()
if err != nil { if err != nil {
logrus.WithField("error", err).Error("containerd: get exit status") logrus.WithField("error", err).Error("containerd: get exit status")
@ -23,21 +24,22 @@ func (h *ExitTask) Handle(e *Task) error {
// if the process is the the init process of the container then // if the process is the the init process of the container then
// fire a separate event for this process // fire a separate event for this process
if proc.ID() != runtime.InitProcessID { if proc.ID() != runtime.InitProcessID {
ne := NewTask(ExecExitTaskType) ne := &ExecExitTask{
ne.ID = proc.Container().ID() ID: proc.Container().ID(),
ne.Pid = proc.ID() PID: proc.ID(),
ne.Status = status Status: status,
ne.Process = proc Process: proc,
h.s.SendTask(ne) }
s.SendTask(ne)
return nil return nil
} }
container := proc.Container() container := proc.Container()
ne := NewTask(DeleteTaskType) ne := &DeleteTask{
ne.ID = container.ID() ID: container.ID(),
ne.Status = status Status: status,
ne.Pid = proc.ID() PID: proc.ID(),
h.s.SendTask(ne) }
s.SendTask(ne)
ExitProcessTimer.UpdateSince(start) ExitProcessTimer.UpdateSince(start)
@ -45,21 +47,25 @@ func (h *ExitTask) Handle(e *Task) error {
} }
type ExecExitTask struct { type ExecExitTask struct {
s *Supervisor baseTask
ID string
PID string
Status int
Process runtime.Process
} }
func (h *ExecExitTask) Handle(e *Task) error { func (s *Supervisor) execExit(t *ExecExitTask) error {
container := e.Process.Container() container := t.Process.Container()
// exec process: we remove this process without notifying the main event loop // exec process: we remove this process without notifying the main event loop
if err := container.RemoveProcess(e.Pid); err != nil { if err := container.RemoveProcess(t.PID); err != nil {
logrus.WithField("error", err).Error("containerd: find container for pid") logrus.WithField("error", err).Error("containerd: find container for pid")
} }
h.s.notifySubscribers(Event{ s.notifySubscribers(Event{
Timestamp: time.Now(), Timestamp: time.Now(),
ID: e.ID, ID: t.ID,
Type: "exit", Type: "exit",
Pid: e.Pid, PID: t.PID,
Status: e.Status, Status: t.Status,
}) })
return nil return nil
} }

View file

@ -1,20 +1,24 @@
package supervisor package supervisor
import "github.com/docker/containerd/runtime"
type GetContainersTask struct { type GetContainersTask struct {
s *Supervisor baseTask
ID string
Containers []runtime.Container
} }
func (h *GetContainersTask) Handle(e *Task) error { func (s *Supervisor) getContainers(t *GetContainersTask) error {
if e.ID != "" { if t.ID != "" {
ci := h.s.containers[e.ID] ci := s.containers[t.ID]
if ci == nil { if ci == nil {
return ErrContainerNotFound return ErrContainerNotFound
} }
e.Containers = append(e.Containers, ci.container) t.Containers = append(t.Containers, ci.container)
return nil return nil
} }
for _, i := range h.s.containers { for _, i := range s.containers {
e.Containers = append(e.Containers, i.container) t.Containers = append(t.Containers, i.container)
} }
return nil return nil
} }

View file

@ -1,11 +1,16 @@
package supervisor package supervisor
import "os"
type SignalTask struct { type SignalTask struct {
s *Supervisor baseTask
ID string
PID string
Signal os.Signal
} }
func (h *SignalTask) Handle(e *Task) error { func (s *Supervisor) signal(t *SignalTask) error {
i, ok := h.s.containers[e.ID] i, ok := s.containers[t.ID]
if !ok { if !ok {
return ErrContainerNotFound return ErrContainerNotFound
} }
@ -14,8 +19,8 @@ func (h *SignalTask) Handle(e *Task) error {
return err return err
} }
for _, p := range processes { for _, p := range processes {
if p.ID() == e.Pid { if p.ID() == t.PID {
return p.Signal(e.Signal) return p.Signal(t.Signal)
} }
} }
return ErrProcessNotFound return ErrProcessNotFound

View file

@ -1,14 +1,21 @@
package supervisor package supervisor
import "time" import (
"time"
"github.com/docker/containerd/runtime"
)
type StatsTask struct { type StatsTask struct {
s *Supervisor baseTask
ID string
Stat chan *runtime.Stat
Err chan error
} }
func (h *StatsTask) Handle(e *Task) error { func (s *Supervisor) stats(t *StatsTask) error {
start := time.Now() start := time.Now()
i, ok := h.s.containers[e.ID] i, ok := s.containers[t.ID]
if !ok { if !ok {
return ErrContainerNotFound return ErrContainerNotFound
} }
@ -16,12 +23,12 @@ func (h *StatsTask) Handle(e *Task) error {
go func() { go func() {
s, err := i.container.Stats() s, err := i.container.Stats()
if err != nil { if err != nil {
e.Err <- err t.Err <- err
return return
} }
e.Err <- nil t.Err <- nil
e.Stat <- s t.Stat <- s
ContainerStatsTimer.UpdateSince(start) ContainerStatsTimer.UpdateSince(start)
}() }()
return errDeferedResponse return nil
} }

View file

@ -12,7 +12,6 @@ import (
"github.com/Sirupsen/logrus" "github.com/Sirupsen/logrus"
"github.com/docker/containerd/chanotify" "github.com/docker/containerd/chanotify"
"github.com/docker/containerd/eventloop"
"github.com/docker/containerd/runtime" "github.com/docker/containerd/runtime"
) )
@ -22,7 +21,7 @@ const (
// New returns an initialized Process supervisor. // New returns an initialized Process supervisor.
func New(stateDir string, oom bool) (*Supervisor, error) { func New(stateDir string, oom bool) (*Supervisor, error) {
tasks := make(chan *startTask, 10) startTasks := make(chan *startTask, 10)
if err := os.MkdirAll(stateDir, 0755); err != nil { if err := os.MkdirAll(stateDir, 0755); err != nil {
return nil, err return nil, err
} }
@ -37,10 +36,10 @@ func New(stateDir string, oom bool) (*Supervisor, error) {
s := &Supervisor{ s := &Supervisor{
stateDir: stateDir, stateDir: stateDir,
containers: make(map[string]*containerInfo), containers: make(map[string]*containerInfo),
tasks: tasks, startTasks: startTasks,
machine: machine, machine: machine,
subscribers: make(map[chan Event]struct{}), subscribers: make(map[chan Event]struct{}),
el: eventloop.NewChanLoop(defaultBufferSize), tasks: make(chan Task, defaultBufferSize),
monitor: monitor, monitor: monitor,
} }
if err := setupEventLog(s); err != nil { if err := setupEventLog(s); err != nil {
@ -48,28 +47,16 @@ func New(stateDir string, oom bool) (*Supervisor, error) {
} }
if oom { if oom {
s.notifier = chanotify.New() s.notifier = chanotify.New()
go func() {
for id := range s.notifier.Chan() { /*
e := NewTask(OOMTaskType) go func() {
e.ID = id.(string) for id := range s.notifier.Chan() {
s.SendTask(e) e := NewTask(OOMTaskType)
} e.ID = id.(string)
}() s.SendTask(e)
} }
// register default event handlers }()
s.handlers = map[TaskType]Handler{ */
ExecExitTaskType: &ExecExitTask{s},
ExitTaskType: &ExitTask{s},
StartContainerTaskType: &StartTask{s},
DeleteTaskType: &DeleteTask{s},
GetContainerTaskType: &GetContainersTask{s},
SignalTaskType: &SignalTask{s},
AddProcessTaskType: &AddProcessTask{s},
UpdateContainerTaskType: &UpdateTask{s},
CreateCheckpointTaskType: &CreateCheckpointTask{s},
DeleteCheckpointTaskType: &DeleteCheckpointTask{s},
StatsTaskType: &StatsTask{s},
UpdateProcessTaskType: &UpdateProcessTask{s},
} }
go s.exitHandler() go s.exitHandler()
if err := s.restore(); err != nil { if err := s.restore(); err != nil {
@ -131,26 +118,24 @@ type Supervisor struct {
// stateDir is the directory on the system to store container runtime state information. // stateDir is the directory on the system to store container runtime state information.
stateDir string stateDir string
containers map[string]*containerInfo containers map[string]*containerInfo
handlers map[TaskType]Handler startTasks chan *startTask
events chan *Task
tasks chan *startTask
// we need a lock around the subscribers map only because additions and deletions from // we need a lock around the subscribers map only because additions and deletions from
// the map are via the API so we cannot really control the concurrency // the map are via the API so we cannot really control the concurrency
subscriberLock sync.RWMutex subscriberLock sync.RWMutex
subscribers map[chan Event]struct{} subscribers map[chan Event]struct{}
machine Machine machine Machine
notifier *chanotify.Notifier notifier *chanotify.Notifier
el eventloop.EventLoop tasks chan Task
monitor *Monitor monitor *Monitor
eventLog []Event eventLog []Event
} }
// Stop closes all tasks and sends a SIGTERM to each container's pid1 then waits for they to // Stop closes all startTasks and sends a SIGTERM to each container's pid1 then waits for they to
// terminate. After it has handled all the SIGCHILD events it will close the signals chan // terminate. After it has handled all the SIGCHILD events it will close the signals chan
// and exit. Stop is a non-blocking call and will return after the containers have been signaled // and exit. Stop is a non-blocking call and will return after the containers have been signaled
func (s *Supervisor) Stop() { func (s *Supervisor) Stop() {
// Close the tasks channel so that no new containers get started // Close the startTasks channel so that no new containers get started
close(s.tasks) close(s.startTasks)
} }
// Close closes any open files in the supervisor but expects that Stop has been // Close closes any open files in the supervisor but expects that Stop has been
@ -163,7 +148,7 @@ type Event struct {
ID string `json:"id"` ID string `json:"id"`
Type string `json:"type"` Type string `json:"type"`
Timestamp time.Time `json:"timestamp"` Timestamp time.Time `json:"timestamp"`
Pid string `json:"pid,omitempty"` PID string `json:"pid,omitempty"`
Status int `json:"status,omitempty"` Status int `json:"status,omitempty"`
} }
@ -218,7 +203,48 @@ func (s *Supervisor) notifySubscribers(e Event) {
// state of the Supervisor // state of the Supervisor
func (s *Supervisor) Start() error { func (s *Supervisor) Start() error {
logrus.WithField("stateDir", s.stateDir).Debug("containerd: supervisor running") logrus.WithField("stateDir", s.stateDir).Debug("containerd: supervisor running")
return s.el.Start() go func() {
for i := range s.tasks {
s.handleTask(i)
}
}()
return nil
}
func (s *Supervisor) handleTask(i Task) {
var err error
switch t := i.(type) {
case *AddProcessTask:
err = s.addProcess(t)
case *CreateCheckpointTask:
err = s.createCheckpoint(t)
case *DeleteCheckpointTask:
err = s.deleteCheckpoint(t)
case *StartTask:
err = s.start(t)
case *DeleteTask:
err = s.delete(t)
case *ExitTask:
err = s.exit(t)
case *ExecExitTask:
err = s.execExit(t)
case *GetContainersTask:
err = s.getContainers(t)
case *SignalTask:
err = s.signal(t)
case *StatsTask:
err = s.stats(t)
case *UpdateTask:
err = s.updateContainer(t)
case *UpdateProcessTask:
err = s.updateProcess(t)
default:
err = ErrUnknownTask
}
if err != errDeferedResponse {
i.ErrorCh() <- err
close(i.ErrorCh())
}
} }
// Machine returns the machine information for which the // Machine returns the machine information for which the
@ -228,15 +254,16 @@ func (s *Supervisor) Machine() Machine {
} }
// SendTask sends the provided event the the supervisors main event loop // SendTask sends the provided event the the supervisors main event loop
func (s *Supervisor) SendTask(evt *Task) { func (s *Supervisor) SendTask(evt Task) {
TasksCounter.Inc(1) TasksCounter.Inc(1)
s.el.Send(&commonTask{data: evt, sv: s}) s.tasks <- evt
} }
func (s *Supervisor) exitHandler() { func (s *Supervisor) exitHandler() {
for p := range s.monitor.Exits() { for p := range s.monitor.Exits() {
e := NewTask(ExitTaskType) e := &ExitTask{
e.Process = p Process: p,
}
s.SendTask(e) s.SendTask(e)
} }
} }
@ -283,8 +310,9 @@ func (s *Supervisor) restore() error {
// exit events // exit events
sort.Sort(&processSorter{exitedProcesses}) sort.Sort(&processSorter{exitedProcesses})
for _, p := range exitedProcesses { for _, p := range exitedProcesses {
e := NewTask(ExitTaskType) e := &ExitTask{
e.Process = p Process: p,
}
s.SendTask(e) s.SendTask(e)
} }
} }

View file

@ -1,89 +1,33 @@
package supervisor package supervisor
import ( import (
"os" "sync"
"time"
"github.com/docker/containerd/runtime" "github.com/docker/containerd/runtime"
"github.com/opencontainers/specs"
) )
type TaskType string // StartResponse is the response containing a started container
const (
ExecExitTaskType TaskType = "execExit"
ExitTaskType TaskType = "exit"
StartContainerTaskType TaskType = "startContainer"
DeleteTaskType TaskType = "deleteContainerEvent"
GetContainerTaskType TaskType = "getContainer"
SignalTaskType TaskType = "signal"
AddProcessTaskType TaskType = "addProcess"
UpdateContainerTaskType TaskType = "updateContainer"
UpdateProcessTaskType TaskType = "updateProcess"
CreateCheckpointTaskType TaskType = "createCheckpoint"
DeleteCheckpointTaskType TaskType = "deleteCheckpoint"
StatsTaskType TaskType = "events"
OOMTaskType TaskType = "oom"
)
func NewTask(t TaskType) *Task {
return &Task{
Type: t,
Timestamp: time.Now(),
Err: make(chan error, 1),
}
}
type StartResponse struct { type StartResponse struct {
Container runtime.Container Container runtime.Container
} }
type Task struct { // Task executes an action returning an error chan with either nil or
Type TaskType // the error from executing the task
Timestamp time.Time type Task interface {
ID string // ErrorCh returns a channel used to report and error from an async task
BundlePath string ErrorCh() chan error
Stdout string
Stderr string
Stdin string
Console string
Pid string
Status int
Signal os.Signal
Process runtime.Process
State runtime.State
ProcessSpec *specs.Process
Containers []runtime.Container
Checkpoint *runtime.Checkpoint
Err chan error
StartResponse chan StartResponse
Stat chan *runtime.Stat
CloseStdin bool
ResizeTty bool
Width int
Height int
Labels []string
} }
type Handler interface { type baseTask struct {
Handle(*Task) error errCh chan error
mu sync.Mutex
} }
type commonTask struct { func (t *baseTask) ErrorCh() chan error {
data *Task t.mu.Lock()
sv *Supervisor defer t.mu.Unlock()
} if t.errCh == nil {
t.errCh = make(chan error, 1)
func (e *commonTask) Handle() {
h, ok := e.sv.handlers[e.data.Type]
if !ok {
e.data.Err <- ErrUnknownTask
return
}
err := h.Handle(e.data)
if err != errDeferedResponse {
e.data.Err <- err
close(e.data.Err)
return
} }
return t.errCh
} }

View file

@ -7,23 +7,25 @@ import (
) )
type UpdateTask struct { type UpdateTask struct {
s *Supervisor baseTask
ID string
State runtime.State
} }
func (h *UpdateTask) Handle(e *Task) error { func (s *Supervisor) updateContainer(t *UpdateTask) error {
i, ok := h.s.containers[e.ID] i, ok := s.containers[t.ID]
if !ok { if !ok {
return ErrContainerNotFound return ErrContainerNotFound
} }
container := i.container container := i.container
if e.State != "" { if t.State != "" {
switch e.State { switch t.State {
case runtime.Running: case runtime.Running:
if err := container.Resume(); err != nil { if err := container.Resume(); err != nil {
return ErrUnknownContainerStatus return ErrUnknownContainerStatus
} }
h.s.notifySubscribers(Event{ s.notifySubscribers(Event{
ID: e.ID, ID: t.ID,
Type: "resume", Type: "resume",
Timestamp: time.Now(), Timestamp: time.Now(),
}) })
@ -31,8 +33,8 @@ func (h *UpdateTask) Handle(e *Task) error {
if err := container.Pause(); err != nil { if err := container.Pause(); err != nil {
return ErrUnknownContainerStatus return ErrUnknownContainerStatus
} }
h.s.notifySubscribers(Event{ s.notifySubscribers(Event{
ID: e.ID, ID: t.ID,
Type: "pause", Type: "pause",
Timestamp: time.Now(), Timestamp: time.Now(),
}) })
@ -44,11 +46,16 @@ func (h *UpdateTask) Handle(e *Task) error {
} }
type UpdateProcessTask struct { type UpdateProcessTask struct {
s *Supervisor baseTask
ID string
PID string
CloseStdin bool
Width int
Height int
} }
func (h *UpdateProcessTask) Handle(e *Task) error { func (s *Supervisor) updateProcess(t *UpdateProcessTask) error {
i, ok := h.s.containers[e.ID] i, ok := s.containers[t.ID]
if !ok { if !ok {
return ErrContainerNotFound return ErrContainerNotFound
} }
@ -58,7 +65,7 @@ func (h *UpdateProcessTask) Handle(e *Task) error {
} }
var process runtime.Process var process runtime.Process
for _, p := range processes { for _, p := range processes {
if p.ID() == e.Pid { if p.ID() == t.PID {
process = p process = p
break break
} }
@ -66,13 +73,13 @@ func (h *UpdateProcessTask) Handle(e *Task) error {
if process == nil { if process == nil {
return ErrProcessNotFound return ErrProcessNotFound
} }
if e.CloseStdin { if t.CloseStdin {
if err := process.CloseStdin(); err != nil { if err := process.CloseStdin(); err != nil {
return err return err
} }
} }
if e.Width > 0 || e.Height > 0 { if t.Width > 0 || t.Height > 0 {
if err := process.Resize(e.Width, e.Height); err != nil { if err := process.Resize(t.Width, t.Height); err != nil {
return err return err
} }
} }

View file

@ -36,12 +36,13 @@ type worker struct {
func (w *worker) Start() { func (w *worker) Start() {
defer w.wg.Done() defer w.wg.Done()
for t := range w.s.tasks { for t := range w.s.startTasks {
started := time.Now() started := time.Now()
process, err := t.Container.Start(t.Checkpoint, runtime.NewStdio(t.Stdin, t.Stdout, t.Stderr)) process, err := t.Container.Start(t.Checkpoint, runtime.NewStdio(t.Stdin, t.Stdout, t.Stderr))
if err != nil { if err != nil {
evt := NewTask(DeleteTaskType) evt := &DeleteTask{
evt.ID = t.Container.ID() ID: t.Container.ID(),
}
w.s.SendTask(evt) w.s.SendTask(evt)
t.Err <- err t.Err <- err
continue continue