From 5eac8891ed3c73be4cf91ab9c4b035faea6ead94 Mon Sep 17 00:00:00 2001 From: Michael Crosby Date: Wed, 2 Dec 2015 17:42:28 -0800 Subject: [PATCH] Move task workers to new type Signed-off-by: Michael Crosby --- containerd/main.go | 10 +++++++++- start.go | 6 +++--- supervisor.go | 36 ++++-------------------------------- worker.go | 45 +++++++++++++++++++++++++++++++++++++++++++++ 4 files changed, 61 insertions(+), 36 deletions(-) create mode 100644 worker.go diff --git a/containerd/main.go b/containerd/main.go index d7be927..353efe8 100644 --- a/containerd/main.go +++ b/containerd/main.go @@ -6,6 +6,7 @@ import ( "os" "os/signal" "runtime" + "sync" "syscall" "time" @@ -77,10 +78,17 @@ func main() { } func daemon(stateDir string, concurrency, bufferSize int) error { - supervisor, err := containerd.NewSupervisor(stateDir, concurrency) + tasks := make(chan *containerd.StartTask, concurrency*100) + supervisor, err := containerd.NewSupervisor(stateDir, tasks) if err != nil { return err } + wg := &sync.WaitGroup{} + for i := 0; i < concurrency; i++ { + wg.Add(1) + w := containerd.NewWorker(supervisor, wg) + go w.Start() + } events := make(chan *containerd.Event, bufferSize) // start the signal handler in the background. go startSignalHandler(supervisor, bufferSize) diff --git a/start.go b/start.go index 894927b..ed85cd6 100644 --- a/start.go +++ b/start.go @@ -11,9 +11,9 @@ func (h *StartEvent) Handle(e *Event) error { } h.s.containers[e.ID] = container ContainersCounter.Inc(1) - h.s.tasks <- &startTask{ - err: e.Err, - container: container, + h.s.tasks <- &StartTask{ + Err: e.Err, + Container: container, } return errDeferedResponse } diff --git a/supervisor.go b/supervisor.go index bfc92c8..532ad3f 100644 --- a/supervisor.go +++ b/supervisor.go @@ -4,8 +4,6 @@ import ( "os" "path/filepath" goruntime "runtime" - "sync" - "time" "github.com/Sirupsen/logrus" "github.com/docker/containerd/runtime" @@ -13,7 +11,7 @@ import ( ) // NewSupervisor returns an initialized Process supervisor. -func NewSupervisor(stateDir string, concurrency int) (*Supervisor, error) { +func NewSupervisor(stateDir string, tasks chan *StartTask) (*Supervisor, error) { if err := os.MkdirAll(stateDir, 0755); err != nil { return nil, err } @@ -31,8 +29,8 @@ func NewSupervisor(stateDir string, concurrency int) (*Supervisor, error) { containers: make(map[string]runtime.Container), processes: make(map[int]runtime.Container), runtime: r, - tasks: make(chan *startTask, concurrency*100), journal: j, + tasks: tasks, } // register default event handlers s.handlers = map[EventType]Handler{ @@ -46,10 +44,6 @@ func NewSupervisor(stateDir string, concurrency int) (*Supervisor, error) { UpdateContainerEventType: &UpdateEvent{s}, } // start the container workers for concurrent container starts - for i := 0; i < concurrency; i++ { - s.workerGroup.Add(1) - go s.startContainerWorker(s.tasks) - } return s, nil } @@ -62,8 +56,7 @@ type Supervisor struct { runtime runtime.Runtime journal *journal events chan *Event - tasks chan *startTask - workerGroup sync.WaitGroup + tasks chan *StartTask subscribers map[subscriber]bool } @@ -104,6 +97,7 @@ func (s *Supervisor) Start(events chan *Event) error { // so that nothing else is scheduled over the top of it. goruntime.LockOSThread() for e := range events { + EventsCounter.Inc(1) s.journal.write(e) h, ok := s.handlers[e.Type] if !ok { @@ -142,27 +136,5 @@ func (s *Supervisor) getContainerForPid(pid int) (runtime.Container, error) { } func (s *Supervisor) SendEvent(evt *Event) { - EventsCounter.Inc(1) s.events <- evt } - -type startTask struct { - container runtime.Container - err chan error -} - -func (s *Supervisor) startContainerWorker(tasks chan *startTask) { - defer s.workerGroup.Done() - for t := range tasks { - started := time.Now() - if err := t.container.Start(); err != nil { - e := NewEvent(StartContainerEventType) - e.ID = t.container.ID() - s.SendEvent(e) - t.err <- err - continue - } - ContainerStartTimer.UpdateSince(started) - t.err <- nil - } -} diff --git a/worker.go b/worker.go new file mode 100644 index 0000000..eae402b --- /dev/null +++ b/worker.go @@ -0,0 +1,45 @@ +package containerd + +import ( + "sync" + "time" + + "github.com/docker/containerd/runtime" +) + +type Worker interface { + Start() +} + +type StartTask struct { + Container runtime.Container + Err chan error +} + +func NewWorker(s *Supervisor, wg *sync.WaitGroup) Worker { + return &worker{ + s: s, + wg: wg, + } +} + +type worker struct { + wg *sync.WaitGroup + s *Supervisor +} + +func (w *worker) Start() { + defer w.wg.Done() + for t := range w.s.tasks { + started := time.Now() + if err := t.Container.Start(); err != nil { + evt := NewEvent(DeleteEventType) + evt.ID = t.Container.ID() + w.s.SendEvent(evt) + t.Err <- err + continue + } + ContainerStartTimer.UpdateSince(started) + t.Err <- nil + } +}