Remove eventloop package

Signed-off-by: Michael Crosby <crosbymichael@gmail.com>
This commit is contained in:
Michael Crosby 2016-02-17 10:55:54 -08:00
parent 4de168877b
commit 4e05bf491a
15 changed files with 265 additions and 359 deletions

View file

@ -12,7 +12,6 @@ import (
"github.com/Sirupsen/logrus"
"github.com/docker/containerd/chanotify"
"github.com/docker/containerd/eventloop"
"github.com/docker/containerd/runtime"
)
@ -22,7 +21,7 @@ const (
// New returns an initialized Process supervisor.
func New(stateDir string, oom bool) (*Supervisor, error) {
tasks := make(chan *startTask, 10)
startTasks := make(chan *startTask, 10)
if err := os.MkdirAll(stateDir, 0755); err != nil {
return nil, err
}
@ -37,10 +36,10 @@ func New(stateDir string, oom bool) (*Supervisor, error) {
s := &Supervisor{
stateDir: stateDir,
containers: make(map[string]*containerInfo),
tasks: tasks,
startTasks: startTasks,
machine: machine,
subscribers: make(map[chan Event]struct{}),
el: eventloop.NewChanLoop(defaultBufferSize),
tasks: make(chan Task, defaultBufferSize),
monitor: monitor,
}
if err := setupEventLog(s); err != nil {
@ -48,28 +47,16 @@ func New(stateDir string, oom bool) (*Supervisor, error) {
}
if oom {
s.notifier = chanotify.New()
go func() {
for id := range s.notifier.Chan() {
e := NewTask(OOMTaskType)
e.ID = id.(string)
s.SendTask(e)
}
}()
}
// register default event handlers
s.handlers = map[TaskType]Handler{
ExecExitTaskType: &ExecExitTask{s},
ExitTaskType: &ExitTask{s},
StartContainerTaskType: &StartTask{s},
DeleteTaskType: &DeleteTask{s},
GetContainerTaskType: &GetContainersTask{s},
SignalTaskType: &SignalTask{s},
AddProcessTaskType: &AddProcessTask{s},
UpdateContainerTaskType: &UpdateTask{s},
CreateCheckpointTaskType: &CreateCheckpointTask{s},
DeleteCheckpointTaskType: &DeleteCheckpointTask{s},
StatsTaskType: &StatsTask{s},
UpdateProcessTaskType: &UpdateProcessTask{s},
/*
go func() {
for id := range s.notifier.Chan() {
e := NewTask(OOMTaskType)
e.ID = id.(string)
s.SendTask(e)
}
}()
*/
}
go s.exitHandler()
if err := s.restore(); err != nil {
@ -131,26 +118,24 @@ type Supervisor struct {
// stateDir is the directory on the system to store container runtime state information.
stateDir string
containers map[string]*containerInfo
handlers map[TaskType]Handler
events chan *Task
tasks chan *startTask
startTasks chan *startTask
// we need a lock around the subscribers map only because additions and deletions from
// the map are via the API so we cannot really control the concurrency
subscriberLock sync.RWMutex
subscribers map[chan Event]struct{}
machine Machine
notifier *chanotify.Notifier
el eventloop.EventLoop
tasks chan Task
monitor *Monitor
eventLog []Event
}
// Stop closes all tasks and sends a SIGTERM to each container's pid1 then waits for they to
// Stop closes all startTasks and sends a SIGTERM to each container's pid1 then waits for they to
// terminate. After it has handled all the SIGCHILD events it will close the signals chan
// and exit. Stop is a non-blocking call and will return after the containers have been signaled
func (s *Supervisor) Stop() {
// Close the tasks channel so that no new containers get started
close(s.tasks)
// Close the startTasks channel so that no new containers get started
close(s.startTasks)
}
// Close closes any open files in the supervisor but expects that Stop has been
@ -163,7 +148,7 @@ type Event struct {
ID string `json:"id"`
Type string `json:"type"`
Timestamp time.Time `json:"timestamp"`
Pid string `json:"pid,omitempty"`
PID string `json:"pid,omitempty"`
Status int `json:"status,omitempty"`
}
@ -218,7 +203,48 @@ func (s *Supervisor) notifySubscribers(e Event) {
// state of the Supervisor
func (s *Supervisor) Start() error {
logrus.WithField("stateDir", s.stateDir).Debug("containerd: supervisor running")
return s.el.Start()
go func() {
for i := range s.tasks {
s.handleTask(i)
}
}()
return nil
}
func (s *Supervisor) handleTask(i Task) {
var err error
switch t := i.(type) {
case *AddProcessTask:
err = s.addProcess(t)
case *CreateCheckpointTask:
err = s.createCheckpoint(t)
case *DeleteCheckpointTask:
err = s.deleteCheckpoint(t)
case *StartTask:
err = s.start(t)
case *DeleteTask:
err = s.delete(t)
case *ExitTask:
err = s.exit(t)
case *ExecExitTask:
err = s.execExit(t)
case *GetContainersTask:
err = s.getContainers(t)
case *SignalTask:
err = s.signal(t)
case *StatsTask:
err = s.stats(t)
case *UpdateTask:
err = s.updateContainer(t)
case *UpdateProcessTask:
err = s.updateProcess(t)
default:
err = ErrUnknownTask
}
if err != errDeferedResponse {
i.ErrorCh() <- err
close(i.ErrorCh())
}
}
// Machine returns the machine information for which the
@ -228,15 +254,16 @@ func (s *Supervisor) Machine() Machine {
}
// SendTask sends the provided event the the supervisors main event loop
func (s *Supervisor) SendTask(evt *Task) {
func (s *Supervisor) SendTask(evt Task) {
TasksCounter.Inc(1)
s.el.Send(&commonTask{data: evt, sv: s})
s.tasks <- evt
}
func (s *Supervisor) exitHandler() {
for p := range s.monitor.Exits() {
e := NewTask(ExitTaskType)
e.Process = p
e := &ExitTask{
Process: p,
}
s.SendTask(e)
}
}
@ -283,8 +310,9 @@ func (s *Supervisor) restore() error {
// exit events
sort.Sort(&processSorter{exitedProcesses})
for _, p := range exitedProcesses {
e := NewTask(ExitTaskType)
e.Process = p
e := &ExitTask{
Process: p,
}
s.SendTask(e)
}
}