diff --git a/api/grpc/server/server.go b/api/grpc/server/server.go index 61685f6..fe9aac0 100644 --- a/api/grpc/server/server.go +++ b/api/grpc/server/server.go @@ -33,7 +33,7 @@ func (s *apiServer) CreateContainer(ctx context.Context, c *types.CreateContaine if c.BundlePath == "" { return nil, errors.New("empty bundle path") } - e := supervisor.NewTask(supervisor.StartContainerTaskType) + e := &supervisor.StartTask{} e.ID = c.Id e.BundlePath = c.BundlePath e.Stdin = c.Stdin @@ -47,7 +47,7 @@ func (s *apiServer) CreateContainer(ctx context.Context, c *types.CreateContaine } } s.sv.SendTask(e) - if err := <-e.Err; err != nil { + if err := <-e.ErrorCh(); err != nil { return nil, err } r := <-e.StartResponse @@ -61,12 +61,12 @@ func (s *apiServer) CreateContainer(ctx context.Context, c *types.CreateContaine } func (s *apiServer) Signal(ctx context.Context, r *types.SignalRequest) (*types.SignalResponse, error) { - e := supervisor.NewTask(supervisor.SignalTaskType) + e := &supervisor.SignalTask{} e.ID = r.Id - e.Pid = r.Pid + e.PID = r.Pid e.Signal = syscall.Signal(int(r.Signal)) s.sv.SendTask(e) - if err := <-e.Err; err != nil { + if err := <-e.ErrorCh(); err != nil { return nil, err } return &types.SignalResponse{}, nil @@ -90,16 +90,16 @@ func (s *apiServer) AddProcess(ctx context.Context, r *types.AddProcessRequest) if r.Pid == "" { return nil, fmt.Errorf("process id cannot be empty") } - e := supervisor.NewTask(supervisor.AddProcessTaskType) + e := &supervisor.AddProcessTask{} e.ID = r.Id - e.Pid = r.Pid + e.PID = r.Pid e.ProcessSpec = process e.Stdin = r.Stdin e.Stdout = r.Stdout e.Stderr = r.Stderr e.StartResponse = make(chan supervisor.StartResponse, 1) s.sv.SendTask(e) - if err := <-e.Err; err != nil { + if err := <-e.ErrorCh(); err != nil { return nil, err } <-e.StartResponse @@ -107,7 +107,7 @@ func (s *apiServer) AddProcess(ctx context.Context, r *types.AddProcessRequest) } func (s *apiServer) CreateCheckpoint(ctx context.Context, r *types.CreateCheckpointRequest) (*types.CreateCheckpointResponse, error) { - e := supervisor.NewTask(supervisor.CreateCheckpointTaskType) + e := &supervisor.CreateCheckpointTask{} e.ID = r.Id e.Checkpoint = &runtime.Checkpoint{ Name: r.Checkpoint.Name, @@ -117,7 +117,7 @@ func (s *apiServer) CreateCheckpoint(ctx context.Context, r *types.CreateCheckpo Shell: r.Checkpoint.Shell, } s.sv.SendTask(e) - if err := <-e.Err; err != nil { + if err := <-e.ErrorCh(); err != nil { return nil, err } return &types.CreateCheckpointResponse{}, nil @@ -127,22 +127,22 @@ func (s *apiServer) DeleteCheckpoint(ctx context.Context, r *types.DeleteCheckpo if r.Name == "" { return nil, errors.New("checkpoint name cannot be empty") } - e := supervisor.NewTask(supervisor.DeleteCheckpointTaskType) + e := &supervisor.DeleteCheckpointTask{} e.ID = r.Id e.Checkpoint = &runtime.Checkpoint{ Name: r.Name, } s.sv.SendTask(e) - if err := <-e.Err; err != nil { + if err := <-e.ErrorCh(); err != nil { return nil, err } return &types.DeleteCheckpointResponse{}, nil } func (s *apiServer) ListCheckpoint(ctx context.Context, r *types.ListCheckpointRequest) (*types.ListCheckpointResponse, error) { - e := supervisor.NewTask(supervisor.GetContainerTaskType) + e := &supervisor.GetContainersTask{} s.sv.SendTask(e) - if err := <-e.Err; err != nil { + if err := <-e.ErrorCh(); err != nil { return nil, err } var container runtime.Container @@ -174,10 +174,10 @@ func (s *apiServer) ListCheckpoint(ctx context.Context, r *types.ListCheckpointR } func (s *apiServer) State(ctx context.Context, r *types.StateRequest) (*types.StateResponse, error) { - e := supervisor.NewTask(supervisor.GetContainerTaskType) + e := &supervisor.GetContainersTask{} e.ID = r.Id s.sv.SendTask(e) - if err := <-e.Err; err != nil { + if err := <-e.ErrorCh(); err != nil { return nil, err } m := s.sv.Machine() @@ -248,25 +248,25 @@ func toUint32(its []int) []uint32 { } func (s *apiServer) UpdateContainer(ctx context.Context, r *types.UpdateContainerRequest) (*types.UpdateContainerResponse, error) { - e := supervisor.NewTask(supervisor.UpdateContainerTaskType) + e := &supervisor.UpdateTask{} e.ID = r.Id e.State = runtime.State(r.Status) s.sv.SendTask(e) - if err := <-e.Err; err != nil { + if err := <-e.ErrorCh(); err != nil { return nil, err } return &types.UpdateContainerResponse{}, nil } func (s *apiServer) UpdateProcess(ctx context.Context, r *types.UpdateProcessRequest) (*types.UpdateProcessResponse, error) { - e := supervisor.NewTask(supervisor.UpdateProcessTaskType) + e := &supervisor.UpdateProcessTask{} e.ID = r.Id - e.Pid = r.Pid + e.PID = r.Pid e.Height = int(r.Height) e.Width = int(r.Width) e.CloseStdin = r.CloseStdin s.sv.SendTask(e) - if err := <-e.Err; err != nil { + if err := <-e.ErrorCh(); err != nil { return nil, err } return &types.UpdateProcessResponse{}, nil @@ -284,7 +284,7 @@ func (s *apiServer) Events(r *types.EventsRequest, stream types.API_EventsServer Id: e.ID, Type: e.Type, Timestamp: uint64(e.Timestamp.Unix()), - Pid: e.Pid, + Pid: e.PID, Status: uint32(e.Status), }); err != nil { return err @@ -294,11 +294,11 @@ func (s *apiServer) Events(r *types.EventsRequest, stream types.API_EventsServer } func (s *apiServer) Stats(ctx context.Context, r *types.StatsRequest) (*types.StatsResponse, error) { - e := supervisor.NewTask(supervisor.StatsTaskType) + e := &supervisor.StatsTask{} e.ID = r.Id e.Stat = make(chan *runtime.Stat, 1) s.sv.SendTask(e) - if err := <-e.Err; err != nil { + if err := <-e.ErrorCh(); err != nil { return nil, err } stats := <-e.Stat diff --git a/eventloop/eventloop.go b/eventloop/eventloop.go deleted file mode 100644 index 0c26495..0000000 --- a/eventloop/eventloop.go +++ /dev/null @@ -1,46 +0,0 @@ -package eventloop - -import "sync" - -// Event is receiving notification from loop with Handle() call. -type Event interface { - Handle() -} - -// EventLoop is interface for event loops. -// Start starting events processing -// Send adding event to loop -type EventLoop interface { - Start() error - Send(Event) error -} - -// ChanLoop is implementation of EventLoop based on channels. -type ChanLoop struct { - events chan Event - once sync.Once -} - -// NewChanLoop returns ChanLoop with internal channel buffer set to q. -func NewChanLoop(q int) EventLoop { - return &ChanLoop{ - events: make(chan Event, q), - } -} - -// Start starting to read events from channel in separate goroutines. -// All calls after first is no-op. -func (el *ChanLoop) Start() error { - go el.once.Do(func() { - for ev := range el.events { - ev.Handle() - } - }) - return nil -} - -// Send sends event to channel. Will block if buffer is full. -func (el *ChanLoop) Send(ev Event) error { - el.events <- ev - return nil -} diff --git a/eventloop/eventloop_test.go b/eventloop/eventloop_test.go deleted file mode 100644 index a4fda47..0000000 --- a/eventloop/eventloop_test.go +++ /dev/null @@ -1,73 +0,0 @@ -package eventloop - -import ( - "sync" - "testing" - "time" -) - -type racyEvent struct { - m map[int]struct{} - wg *sync.WaitGroup -} - -func (e *racyEvent) Handle() { - e.m[0] = struct{}{} - e.wg.Done() -} - -func simulateRacyEvents(el EventLoop) { - wg := &sync.WaitGroup{} - raceMap := make(map[int]struct{}) - var evs []*racyEvent - for i := 0; i < 1024; i++ { - wg.Add(1) - evs = append(evs, &racyEvent{m: raceMap, wg: wg}) - } - for _, ev := range evs { - el.Send(ev) - } - wg.Wait() -} - -// run with -race -func TestChanRace(t *testing.T) { - e := NewChanLoop(1024) - e.Start() - simulateRacyEvents(e) -} - -// run with -race -func TestChanStartTwiceRace(t *testing.T) { - e := NewChanLoop(1024) - e.Start() - e.Start() - simulateRacyEvents(e) -} - -type testEvent struct { - wg *sync.WaitGroup -} - -func (e *testEvent) Handle() { - e.wg.Done() -} - -func TestChanEventSpawn(t *testing.T) { - e := NewChanLoop(1024) - e.Start() - wg := &sync.WaitGroup{} - wg.Add(2) - e.Send(&testEvent{wg: wg}) - e.Send(&testEvent{wg: wg}) - waitCh := make(chan struct{}) - go func() { - wg.Wait() - close(waitCh) - }() - select { - case <-waitCh: - case <-time.After(1 * time.Second): - t.Fatal("Events was not handled in loop") - } -} diff --git a/supervisor/add_process.go b/supervisor/add_process.go index 6801a21..e07cac2 100644 --- a/supervisor/add_process.go +++ b/supervisor/add_process.go @@ -4,34 +4,40 @@ import ( "time" "github.com/docker/containerd/runtime" + "github.com/opencontainers/specs" ) type AddProcessTask struct { - s *Supervisor + baseTask + ID string + PID string + Stdout string + Stderr string + Stdin string + ProcessSpec *specs.Process + StartResponse chan StartResponse } -// TODO: add this to worker for concurrent starts??? maybe not because of races where the container -// could be stopped and removed... -func (h *AddProcessTask) Handle(e *Task) error { +func (s *Supervisor) addProcess(t *AddProcessTask) error { start := time.Now() - ci, ok := h.s.containers[e.ID] + ci, ok := s.containers[t.ID] if !ok { return ErrContainerNotFound } - process, err := ci.container.Exec(e.Pid, *e.ProcessSpec, runtime.NewStdio(e.Stdin, e.Stdout, e.Stderr)) + process, err := ci.container.Exec(t.PID, *t.ProcessSpec, runtime.NewStdio(t.Stdin, t.Stdout, t.Stderr)) if err != nil { return err } - if err := h.s.monitorProcess(process); err != nil { + if err := s.monitorProcess(process); err != nil { return err } ExecProcessTimer.UpdateSince(start) - e.StartResponse <- StartResponse{} - h.s.notifySubscribers(Event{ + t.StartResponse <- StartResponse{} + s.notifySubscribers(Event{ Timestamp: time.Now(), Type: "start-process", - Pid: e.Pid, - ID: e.ID, + PID: t.PID, + ID: t.ID, }) return nil } diff --git a/supervisor/checkpoint.go b/supervisor/checkpoint.go index 53311eb..640f8be 100644 --- a/supervisor/checkpoint.go +++ b/supervisor/checkpoint.go @@ -1,25 +1,31 @@ package supervisor +import "github.com/docker/containerd/runtime" + type CreateCheckpointTask struct { - s *Supervisor + baseTask + ID string + Checkpoint *runtime.Checkpoint } -func (h *CreateCheckpointTask) Handle(e *Task) error { - i, ok := h.s.containers[e.ID] +func (s *Supervisor) createCheckpoint(t *CreateCheckpointTask) error { + i, ok := s.containers[t.ID] if !ok { return ErrContainerNotFound } - return i.container.Checkpoint(*e.Checkpoint) + return i.container.Checkpoint(*t.Checkpoint) } type DeleteCheckpointTask struct { - s *Supervisor + baseTask + ID string + Checkpoint *runtime.Checkpoint } -func (h *DeleteCheckpointTask) Handle(e *Task) error { - i, ok := h.s.containers[e.ID] +func (s *Supervisor) deleteCheckpoint(t *DeleteCheckpointTask) error { + i, ok := s.containers[t.ID] if !ok { return ErrContainerNotFound } - return i.container.DeleteCheckpoint(e.Checkpoint.Name) + return i.container.DeleteCheckpoint(t.Checkpoint.Name) } diff --git a/supervisor/create.go b/supervisor/create.go index a65dea8..165d161 100644 --- a/supervisor/create.go +++ b/supervisor/create.go @@ -7,31 +7,39 @@ import ( ) type StartTask struct { - s *Supervisor + baseTask + ID string + BundlePath string + Stdout string + Stderr string + Stdin string + StartResponse chan StartResponse + Checkpoint *runtime.Checkpoint + Labels []string } -func (h *StartTask) Handle(e *Task) error { +func (s *Supervisor) start(t *StartTask) error { start := time.Now() - container, err := runtime.New(h.s.stateDir, e.ID, e.BundlePath, e.Labels) + container, err := runtime.New(s.stateDir, t.ID, t.BundlePath, t.Labels) if err != nil { return err } - h.s.containers[e.ID] = &containerInfo{ + s.containers[t.ID] = &containerInfo{ container: container, } ContainersCounter.Inc(1) task := &startTask{ - Err: e.Err, + Err: t.ErrorCh(), Container: container, - StartResponse: e.StartResponse, - Stdin: e.Stdin, - Stdout: e.Stdout, - Stderr: e.Stderr, + StartResponse: t.StartResponse, + Stdin: t.Stdin, + Stdout: t.Stdout, + Stderr: t.Stderr, } - if e.Checkpoint != nil { - task.Checkpoint = e.Checkpoint.Name + if t.Checkpoint != nil { + task.Checkpoint = t.Checkpoint.Name } - h.s.tasks <- task + s.startTasks <- task ContainerCreateTimer.UpdateSince(start) return errDeferedResponse } diff --git a/supervisor/delete.go b/supervisor/delete.go index 1a11060..0ff348c 100644 --- a/supervisor/delete.go +++ b/supervisor/delete.go @@ -8,21 +8,24 @@ import ( ) type DeleteTask struct { - s *Supervisor + baseTask + ID string + Status int + PID string } -func (h *DeleteTask) Handle(e *Task) error { - if i, ok := h.s.containers[e.ID]; ok { +func (s *Supervisor) delete(t *DeleteTask) error { + if i, ok := s.containers[t.ID]; ok { start := time.Now() - if err := h.deleteContainer(i.container); err != nil { + if err := s.deleteContainer(i.container); err != nil { logrus.WithField("error", err).Error("containerd: deleting container") } - h.s.notifySubscribers(Event{ + s.notifySubscribers(Event{ Type: "exit", Timestamp: time.Now(), - ID: e.ID, - Status: e.Status, - Pid: e.Pid, + ID: t.ID, + Status: t.Status, + PID: t.PID, }) ContainersCounter.Dec(1) ContainerDeleteTimer.UpdateSince(start) @@ -30,7 +33,7 @@ func (h *DeleteTask) Handle(e *Task) error { return nil } -func (h *DeleteTask) deleteContainer(container runtime.Container) error { - delete(h.s.containers, container.ID()) +func (s *Supervisor) deleteContainer(container runtime.Container) error { + delete(s.containers, container.ID()) return container.Delete() } diff --git a/supervisor/exit.go b/supervisor/exit.go index d0673ec..5e179cf 100644 --- a/supervisor/exit.go +++ b/supervisor/exit.go @@ -8,12 +8,13 @@ import ( ) type ExitTask struct { - s *Supervisor + baseTask + Process runtime.Process } -func (h *ExitTask) Handle(e *Task) error { +func (s *Supervisor) exit(t *ExitTask) error { start := time.Now() - proc := e.Process + proc := t.Process status, err := proc.ExitStatus() if err != nil { logrus.WithField("error", err).Error("containerd: get exit status") @@ -23,21 +24,22 @@ func (h *ExitTask) Handle(e *Task) error { // if the process is the the init process of the container then // fire a separate event for this process if proc.ID() != runtime.InitProcessID { - ne := NewTask(ExecExitTaskType) - ne.ID = proc.Container().ID() - ne.Pid = proc.ID() - ne.Status = status - ne.Process = proc - h.s.SendTask(ne) - + ne := &ExecExitTask{ + ID: proc.Container().ID(), + PID: proc.ID(), + Status: status, + Process: proc, + } + s.SendTask(ne) return nil } container := proc.Container() - ne := NewTask(DeleteTaskType) - ne.ID = container.ID() - ne.Status = status - ne.Pid = proc.ID() - h.s.SendTask(ne) + ne := &DeleteTask{ + ID: container.ID(), + Status: status, + PID: proc.ID(), + } + s.SendTask(ne) ExitProcessTimer.UpdateSince(start) @@ -45,21 +47,25 @@ func (h *ExitTask) Handle(e *Task) error { } type ExecExitTask struct { - s *Supervisor + baseTask + ID string + PID string + Status int + Process runtime.Process } -func (h *ExecExitTask) Handle(e *Task) error { - container := e.Process.Container() +func (s *Supervisor) execExit(t *ExecExitTask) error { + container := t.Process.Container() // exec process: we remove this process without notifying the main event loop - if err := container.RemoveProcess(e.Pid); err != nil { + if err := container.RemoveProcess(t.PID); err != nil { logrus.WithField("error", err).Error("containerd: find container for pid") } - h.s.notifySubscribers(Event{ + s.notifySubscribers(Event{ Timestamp: time.Now(), - ID: e.ID, + ID: t.ID, Type: "exit", - Pid: e.Pid, - Status: e.Status, + PID: t.PID, + Status: t.Status, }) return nil } diff --git a/supervisor/get_containers.go b/supervisor/get_containers.go index 64acc3f..3e24783 100644 --- a/supervisor/get_containers.go +++ b/supervisor/get_containers.go @@ -1,20 +1,24 @@ package supervisor +import "github.com/docker/containerd/runtime" + type GetContainersTask struct { - s *Supervisor + baseTask + ID string + Containers []runtime.Container } -func (h *GetContainersTask) Handle(e *Task) error { - if e.ID != "" { - ci := h.s.containers[e.ID] +func (s *Supervisor) getContainers(t *GetContainersTask) error { + if t.ID != "" { + ci := s.containers[t.ID] if ci == nil { return ErrContainerNotFound } - e.Containers = append(e.Containers, ci.container) + t.Containers = append(t.Containers, ci.container) return nil } - for _, i := range h.s.containers { - e.Containers = append(e.Containers, i.container) + for _, i := range s.containers { + t.Containers = append(t.Containers, i.container) } return nil } diff --git a/supervisor/signal.go b/supervisor/signal.go index c88a916..0705fc5 100644 --- a/supervisor/signal.go +++ b/supervisor/signal.go @@ -1,11 +1,16 @@ package supervisor +import "os" + type SignalTask struct { - s *Supervisor + baseTask + ID string + PID string + Signal os.Signal } -func (h *SignalTask) Handle(e *Task) error { - i, ok := h.s.containers[e.ID] +func (s *Supervisor) signal(t *SignalTask) error { + i, ok := s.containers[t.ID] if !ok { return ErrContainerNotFound } @@ -14,8 +19,8 @@ func (h *SignalTask) Handle(e *Task) error { return err } for _, p := range processes { - if p.ID() == e.Pid { - return p.Signal(e.Signal) + if p.ID() == t.PID { + return p.Signal(t.Signal) } } return ErrProcessNotFound diff --git a/supervisor/stats.go b/supervisor/stats.go index 6cbbfed..fa68321 100644 --- a/supervisor/stats.go +++ b/supervisor/stats.go @@ -1,14 +1,21 @@ package supervisor -import "time" +import ( + "time" + + "github.com/docker/containerd/runtime" +) type StatsTask struct { - s *Supervisor + baseTask + ID string + Stat chan *runtime.Stat + Err chan error } -func (h *StatsTask) Handle(e *Task) error { +func (s *Supervisor) stats(t *StatsTask) error { start := time.Now() - i, ok := h.s.containers[e.ID] + i, ok := s.containers[t.ID] if !ok { return ErrContainerNotFound } @@ -16,12 +23,12 @@ func (h *StatsTask) Handle(e *Task) error { go func() { s, err := i.container.Stats() if err != nil { - e.Err <- err + t.Err <- err return } - e.Err <- nil - e.Stat <- s + t.Err <- nil + t.Stat <- s ContainerStatsTimer.UpdateSince(start) }() - return errDeferedResponse + return nil } diff --git a/supervisor/supervisor.go b/supervisor/supervisor.go index 75c22b7..1d1ca98 100644 --- a/supervisor/supervisor.go +++ b/supervisor/supervisor.go @@ -12,7 +12,6 @@ import ( "github.com/Sirupsen/logrus" "github.com/docker/containerd/chanotify" - "github.com/docker/containerd/eventloop" "github.com/docker/containerd/runtime" ) @@ -22,7 +21,7 @@ const ( // New returns an initialized Process supervisor. func New(stateDir string, oom bool) (*Supervisor, error) { - tasks := make(chan *startTask, 10) + startTasks := make(chan *startTask, 10) if err := os.MkdirAll(stateDir, 0755); err != nil { return nil, err } @@ -37,10 +36,10 @@ func New(stateDir string, oom bool) (*Supervisor, error) { s := &Supervisor{ stateDir: stateDir, containers: make(map[string]*containerInfo), - tasks: tasks, + startTasks: startTasks, machine: machine, subscribers: make(map[chan Event]struct{}), - el: eventloop.NewChanLoop(defaultBufferSize), + tasks: make(chan Task, defaultBufferSize), monitor: monitor, } if err := setupEventLog(s); err != nil { @@ -48,28 +47,16 @@ func New(stateDir string, oom bool) (*Supervisor, error) { } if oom { s.notifier = chanotify.New() - go func() { - for id := range s.notifier.Chan() { - e := NewTask(OOMTaskType) - e.ID = id.(string) - s.SendTask(e) - } - }() - } - // register default event handlers - s.handlers = map[TaskType]Handler{ - ExecExitTaskType: &ExecExitTask{s}, - ExitTaskType: &ExitTask{s}, - StartContainerTaskType: &StartTask{s}, - DeleteTaskType: &DeleteTask{s}, - GetContainerTaskType: &GetContainersTask{s}, - SignalTaskType: &SignalTask{s}, - AddProcessTaskType: &AddProcessTask{s}, - UpdateContainerTaskType: &UpdateTask{s}, - CreateCheckpointTaskType: &CreateCheckpointTask{s}, - DeleteCheckpointTaskType: &DeleteCheckpointTask{s}, - StatsTaskType: &StatsTask{s}, - UpdateProcessTaskType: &UpdateProcessTask{s}, + + /* + go func() { + for id := range s.notifier.Chan() { + e := NewTask(OOMTaskType) + e.ID = id.(string) + s.SendTask(e) + } + }() + */ } go s.exitHandler() if err := s.restore(); err != nil { @@ -131,26 +118,24 @@ type Supervisor struct { // stateDir is the directory on the system to store container runtime state information. stateDir string containers map[string]*containerInfo - handlers map[TaskType]Handler - events chan *Task - tasks chan *startTask + startTasks chan *startTask // we need a lock around the subscribers map only because additions and deletions from // the map are via the API so we cannot really control the concurrency subscriberLock sync.RWMutex subscribers map[chan Event]struct{} machine Machine notifier *chanotify.Notifier - el eventloop.EventLoop + tasks chan Task monitor *Monitor eventLog []Event } -// Stop closes all tasks and sends a SIGTERM to each container's pid1 then waits for they to +// Stop closes all startTasks and sends a SIGTERM to each container's pid1 then waits for they to // terminate. After it has handled all the SIGCHILD events it will close the signals chan // and exit. Stop is a non-blocking call and will return after the containers have been signaled func (s *Supervisor) Stop() { - // Close the tasks channel so that no new containers get started - close(s.tasks) + // Close the startTasks channel so that no new containers get started + close(s.startTasks) } // Close closes any open files in the supervisor but expects that Stop has been @@ -163,7 +148,7 @@ type Event struct { ID string `json:"id"` Type string `json:"type"` Timestamp time.Time `json:"timestamp"` - Pid string `json:"pid,omitempty"` + PID string `json:"pid,omitempty"` Status int `json:"status,omitempty"` } @@ -218,7 +203,48 @@ func (s *Supervisor) notifySubscribers(e Event) { // state of the Supervisor func (s *Supervisor) Start() error { logrus.WithField("stateDir", s.stateDir).Debug("containerd: supervisor running") - return s.el.Start() + go func() { + for i := range s.tasks { + s.handleTask(i) + } + }() + return nil +} + +func (s *Supervisor) handleTask(i Task) { + var err error + switch t := i.(type) { + case *AddProcessTask: + err = s.addProcess(t) + case *CreateCheckpointTask: + err = s.createCheckpoint(t) + case *DeleteCheckpointTask: + err = s.deleteCheckpoint(t) + case *StartTask: + err = s.start(t) + case *DeleteTask: + err = s.delete(t) + case *ExitTask: + err = s.exit(t) + case *ExecExitTask: + err = s.execExit(t) + case *GetContainersTask: + err = s.getContainers(t) + case *SignalTask: + err = s.signal(t) + case *StatsTask: + err = s.stats(t) + case *UpdateTask: + err = s.updateContainer(t) + case *UpdateProcessTask: + err = s.updateProcess(t) + default: + err = ErrUnknownTask + } + if err != errDeferedResponse { + i.ErrorCh() <- err + close(i.ErrorCh()) + } } // Machine returns the machine information for which the @@ -228,15 +254,16 @@ func (s *Supervisor) Machine() Machine { } // SendTask sends the provided event the the supervisors main event loop -func (s *Supervisor) SendTask(evt *Task) { +func (s *Supervisor) SendTask(evt Task) { TasksCounter.Inc(1) - s.el.Send(&commonTask{data: evt, sv: s}) + s.tasks <- evt } func (s *Supervisor) exitHandler() { for p := range s.monitor.Exits() { - e := NewTask(ExitTaskType) - e.Process = p + e := &ExitTask{ + Process: p, + } s.SendTask(e) } } @@ -283,8 +310,9 @@ func (s *Supervisor) restore() error { // exit events sort.Sort(&processSorter{exitedProcesses}) for _, p := range exitedProcesses { - e := NewTask(ExitTaskType) - e.Process = p + e := &ExitTask{ + Process: p, + } s.SendTask(e) } } diff --git a/supervisor/task.go b/supervisor/task.go index 2844e77..4d6d1c5 100644 --- a/supervisor/task.go +++ b/supervisor/task.go @@ -1,89 +1,33 @@ package supervisor import ( - "os" - "time" + "sync" "github.com/docker/containerd/runtime" - "github.com/opencontainers/specs" ) -type TaskType string - -const ( - ExecExitTaskType TaskType = "execExit" - ExitTaskType TaskType = "exit" - StartContainerTaskType TaskType = "startContainer" - DeleteTaskType TaskType = "deleteContainerEvent" - GetContainerTaskType TaskType = "getContainer" - SignalTaskType TaskType = "signal" - AddProcessTaskType TaskType = "addProcess" - UpdateContainerTaskType TaskType = "updateContainer" - UpdateProcessTaskType TaskType = "updateProcess" - CreateCheckpointTaskType TaskType = "createCheckpoint" - DeleteCheckpointTaskType TaskType = "deleteCheckpoint" - StatsTaskType TaskType = "events" - OOMTaskType TaskType = "oom" -) - -func NewTask(t TaskType) *Task { - return &Task{ - Type: t, - Timestamp: time.Now(), - Err: make(chan error, 1), - } -} - +// StartResponse is the response containing a started container type StartResponse struct { Container runtime.Container } -type Task struct { - Type TaskType - Timestamp time.Time - ID string - BundlePath string - Stdout string - Stderr string - Stdin string - Console string - Pid string - Status int - Signal os.Signal - Process runtime.Process - State runtime.State - ProcessSpec *specs.Process - Containers []runtime.Container - Checkpoint *runtime.Checkpoint - Err chan error - StartResponse chan StartResponse - Stat chan *runtime.Stat - CloseStdin bool - ResizeTty bool - Width int - Height int - Labels []string +// Task executes an action returning an error chan with either nil or +// the error from executing the task +type Task interface { + // ErrorCh returns a channel used to report and error from an async task + ErrorCh() chan error } -type Handler interface { - Handle(*Task) error +type baseTask struct { + errCh chan error + mu sync.Mutex } -type commonTask struct { - data *Task - sv *Supervisor -} - -func (e *commonTask) Handle() { - h, ok := e.sv.handlers[e.data.Type] - if !ok { - e.data.Err <- ErrUnknownTask - return - } - err := h.Handle(e.data) - if err != errDeferedResponse { - e.data.Err <- err - close(e.data.Err) - return +func (t *baseTask) ErrorCh() chan error { + t.mu.Lock() + defer t.mu.Unlock() + if t.errCh == nil { + t.errCh = make(chan error, 1) } + return t.errCh } diff --git a/supervisor/update.go b/supervisor/update.go index f4a178e..4ef2360 100644 --- a/supervisor/update.go +++ b/supervisor/update.go @@ -7,23 +7,25 @@ import ( ) type UpdateTask struct { - s *Supervisor + baseTask + ID string + State runtime.State } -func (h *UpdateTask) Handle(e *Task) error { - i, ok := h.s.containers[e.ID] +func (s *Supervisor) updateContainer(t *UpdateTask) error { + i, ok := s.containers[t.ID] if !ok { return ErrContainerNotFound } container := i.container - if e.State != "" { - switch e.State { + if t.State != "" { + switch t.State { case runtime.Running: if err := container.Resume(); err != nil { return ErrUnknownContainerStatus } - h.s.notifySubscribers(Event{ - ID: e.ID, + s.notifySubscribers(Event{ + ID: t.ID, Type: "resume", Timestamp: time.Now(), }) @@ -31,8 +33,8 @@ func (h *UpdateTask) Handle(e *Task) error { if err := container.Pause(); err != nil { return ErrUnknownContainerStatus } - h.s.notifySubscribers(Event{ - ID: e.ID, + s.notifySubscribers(Event{ + ID: t.ID, Type: "pause", Timestamp: time.Now(), }) @@ -44,11 +46,16 @@ func (h *UpdateTask) Handle(e *Task) error { } type UpdateProcessTask struct { - s *Supervisor + baseTask + ID string + PID string + CloseStdin bool + Width int + Height int } -func (h *UpdateProcessTask) Handle(e *Task) error { - i, ok := h.s.containers[e.ID] +func (s *Supervisor) updateProcess(t *UpdateProcessTask) error { + i, ok := s.containers[t.ID] if !ok { return ErrContainerNotFound } @@ -58,7 +65,7 @@ func (h *UpdateProcessTask) Handle(e *Task) error { } var process runtime.Process for _, p := range processes { - if p.ID() == e.Pid { + if p.ID() == t.PID { process = p break } @@ -66,13 +73,13 @@ func (h *UpdateProcessTask) Handle(e *Task) error { if process == nil { return ErrProcessNotFound } - if e.CloseStdin { + if t.CloseStdin { if err := process.CloseStdin(); err != nil { return err } } - if e.Width > 0 || e.Height > 0 { - if err := process.Resize(e.Width, e.Height); err != nil { + if t.Width > 0 || t.Height > 0 { + if err := process.Resize(t.Width, t.Height); err != nil { return err } } diff --git a/supervisor/worker.go b/supervisor/worker.go index a481c73..d08fdb4 100644 --- a/supervisor/worker.go +++ b/supervisor/worker.go @@ -36,12 +36,13 @@ type worker struct { func (w *worker) Start() { defer w.wg.Done() - for t := range w.s.tasks { + for t := range w.s.startTasks { started := time.Now() process, err := t.Container.Start(t.Checkpoint, runtime.NewStdio(t.Stdin, t.Stdout, t.Stderr)) if err != nil { - evt := NewTask(DeleteTaskType) - evt.ID = t.Container.ID() + evt := &DeleteTask{ + ID: t.Container.ID(), + } w.s.SendTask(evt) t.Err <- err continue