Move task workers to new type

Signed-off-by: Michael Crosby <crosbymichael@gmail.com>
This commit is contained in:
Michael Crosby 2015-12-02 17:42:28 -08:00
parent b344f0a1c6
commit 5eac8891ed
4 changed files with 61 additions and 36 deletions

View File

@ -6,6 +6,7 @@ import (
"os"
"os/signal"
"runtime"
"sync"
"syscall"
"time"
@ -77,10 +78,17 @@ func main() {
}
func daemon(stateDir string, concurrency, bufferSize int) error {
supervisor, err := containerd.NewSupervisor(stateDir, concurrency)
tasks := make(chan *containerd.StartTask, concurrency*100)
supervisor, err := containerd.NewSupervisor(stateDir, tasks)
if err != nil {
return err
}
wg := &sync.WaitGroup{}
for i := 0; i < concurrency; i++ {
wg.Add(1)
w := containerd.NewWorker(supervisor, wg)
go w.Start()
}
events := make(chan *containerd.Event, bufferSize)
// start the signal handler in the background.
go startSignalHandler(supervisor, bufferSize)

View File

@ -11,9 +11,9 @@ func (h *StartEvent) Handle(e *Event) error {
}
h.s.containers[e.ID] = container
ContainersCounter.Inc(1)
h.s.tasks <- &startTask{
err: e.Err,
container: container,
h.s.tasks <- &StartTask{
Err: e.Err,
Container: container,
}
return errDeferedResponse
}

View File

@ -4,8 +4,6 @@ import (
"os"
"path/filepath"
goruntime "runtime"
"sync"
"time"
"github.com/Sirupsen/logrus"
"github.com/docker/containerd/runtime"
@ -13,7 +11,7 @@ import (
)
// NewSupervisor returns an initialized Process supervisor.
func NewSupervisor(stateDir string, concurrency int) (*Supervisor, error) {
func NewSupervisor(stateDir string, tasks chan *StartTask) (*Supervisor, error) {
if err := os.MkdirAll(stateDir, 0755); err != nil {
return nil, err
}
@ -31,8 +29,8 @@ func NewSupervisor(stateDir string, concurrency int) (*Supervisor, error) {
containers: make(map[string]runtime.Container),
processes: make(map[int]runtime.Container),
runtime: r,
tasks: make(chan *startTask, concurrency*100),
journal: j,
tasks: tasks,
}
// register default event handlers
s.handlers = map[EventType]Handler{
@ -46,10 +44,6 @@ func NewSupervisor(stateDir string, concurrency int) (*Supervisor, error) {
UpdateContainerEventType: &UpdateEvent{s},
}
// start the container workers for concurrent container starts
for i := 0; i < concurrency; i++ {
s.workerGroup.Add(1)
go s.startContainerWorker(s.tasks)
}
return s, nil
}
@ -62,8 +56,7 @@ type Supervisor struct {
runtime runtime.Runtime
journal *journal
events chan *Event
tasks chan *startTask
workerGroup sync.WaitGroup
tasks chan *StartTask
subscribers map[subscriber]bool
}
@ -104,6 +97,7 @@ func (s *Supervisor) Start(events chan *Event) error {
// so that nothing else is scheduled over the top of it.
goruntime.LockOSThread()
for e := range events {
EventsCounter.Inc(1)
s.journal.write(e)
h, ok := s.handlers[e.Type]
if !ok {
@ -142,27 +136,5 @@ func (s *Supervisor) getContainerForPid(pid int) (runtime.Container, error) {
}
func (s *Supervisor) SendEvent(evt *Event) {
EventsCounter.Inc(1)
s.events <- evt
}
type startTask struct {
container runtime.Container
err chan error
}
func (s *Supervisor) startContainerWorker(tasks chan *startTask) {
defer s.workerGroup.Done()
for t := range tasks {
started := time.Now()
if err := t.container.Start(); err != nil {
e := NewEvent(StartContainerEventType)
e.ID = t.container.ID()
s.SendEvent(e)
t.err <- err
continue
}
ContainerStartTimer.UpdateSince(started)
t.err <- nil
}
}

45
worker.go Normal file
View File

@ -0,0 +1,45 @@
package containerd
import (
"sync"
"time"
"github.com/docker/containerd/runtime"
)
type Worker interface {
Start()
}
type StartTask struct {
Container runtime.Container
Err chan error
}
func NewWorker(s *Supervisor, wg *sync.WaitGroup) Worker {
return &worker{
s: s,
wg: wg,
}
}
type worker struct {
wg *sync.WaitGroup
s *Supervisor
}
func (w *worker) Start() {
defer w.wg.Done()
for t := range w.s.tasks {
started := time.Now()
if err := t.Container.Start(); err != nil {
evt := NewEvent(DeleteEventType)
evt.ID = t.Container.ID()
w.s.SendEvent(evt)
t.Err <- err
continue
}
ContainerStartTimer.UpdateSince(started)
t.Err <- nil
}
}