Use eventloop package in containerd
It's first attempt to decouple code into independent modules for more effective unit-testing. Signed-off-by: Alexander Morozov <lk4d4@docker.com>
This commit is contained in:
parent
7332e6e847
commit
79591b2ae1
2 changed files with 25 additions and 25 deletions
19
event.go
19
event.go
|
@ -63,3 +63,22 @@ type Event struct {
|
|||
type Handler interface {
|
||||
Handle(*Event) error
|
||||
}
|
||||
|
||||
type commonEvent struct {
|
||||
data *Event
|
||||
sv *Supervisor
|
||||
}
|
||||
|
||||
func (e *commonEvent) Handle() {
|
||||
h, ok := e.sv.handlers[e.data.Type]
|
||||
if !ok {
|
||||
e.data.Err <- ErrUnknownEvent
|
||||
return
|
||||
}
|
||||
err := h.Handle(e.data)
|
||||
if err != errDeferedResponse {
|
||||
e.data.Err <- err
|
||||
close(e.data.Err)
|
||||
return
|
||||
}
|
||||
}
|
||||
|
|
|
@ -4,12 +4,12 @@ import (
|
|||
"os"
|
||||
"os/signal"
|
||||
"path/filepath"
|
||||
goruntime "runtime"
|
||||
"sync"
|
||||
"syscall"
|
||||
"time"
|
||||
|
||||
"github.com/Sirupsen/logrus"
|
||||
"github.com/docker/containerd/eventloop"
|
||||
"github.com/docker/containerd/runtime"
|
||||
"github.com/opencontainers/runc/libcontainer"
|
||||
)
|
||||
|
@ -36,10 +36,10 @@ func NewSupervisor(id, stateDir string, tasks chan *StartTask, oom bool) (*Super
|
|||
processes: make(map[int]*containerInfo),
|
||||
runtime: r,
|
||||
tasks: tasks,
|
||||
events: make(chan *Event, DefaultBufferSize),
|
||||
machine: machine,
|
||||
subscribers: make(map[chan *Event]struct{}),
|
||||
statsCollector: newStatsCollector(statsInterval),
|
||||
el: eventloop.NewChanLoop(DefaultBufferSize),
|
||||
}
|
||||
if oom {
|
||||
s.notifier = newNotifier(s)
|
||||
|
@ -86,6 +86,7 @@ type Supervisor struct {
|
|||
containerGroup sync.WaitGroup
|
||||
statsCollector *statsCollector
|
||||
notifier *notifier
|
||||
el eventloop.EventLoop
|
||||
}
|
||||
|
||||
// Stop closes all tasks and sends a SIGTERM to each container's pid1 then waits for they to
|
||||
|
@ -174,32 +175,11 @@ func (s *Supervisor) notifySubscribers(e *Event) {
|
|||
// therefore it is save to do operations in the handlers that modify state of the system or
|
||||
// state of the Supervisor
|
||||
func (s *Supervisor) Start() error {
|
||||
go func() {
|
||||
// allocate an entire thread to this goroutine for the main event loop
|
||||
// so that nothing else is scheduled over the top of it.
|
||||
goruntime.LockOSThread()
|
||||
for e := range s.events {
|
||||
EventsCounter.Inc(1)
|
||||
h, ok := s.handlers[e.Type]
|
||||
if !ok {
|
||||
e.Err <- ErrUnknownEvent
|
||||
continue
|
||||
}
|
||||
if err := h.Handle(e); err != nil {
|
||||
if err != errDeferedResponse {
|
||||
e.Err <- err
|
||||
close(e.Err)
|
||||
}
|
||||
continue
|
||||
}
|
||||
close(e.Err)
|
||||
}
|
||||
}()
|
||||
logrus.WithFields(logrus.Fields{
|
||||
"runtime": s.runtime.Type(),
|
||||
"stateDir": s.stateDir,
|
||||
}).Debug("Supervisor started")
|
||||
return nil
|
||||
return s.el.Start()
|
||||
}
|
||||
|
||||
// Machine returns the machine information for which the
|
||||
|
@ -231,7 +211,8 @@ func (s *Supervisor) getContainerForPid(pid int) (runtime.Container, error) {
|
|||
|
||||
// SendEvent sends the provided event the the supervisors main event loop
|
||||
func (s *Supervisor) SendEvent(evt *Event) {
|
||||
s.events <- evt
|
||||
EventsCounter.Inc(1)
|
||||
s.el.Send(&commonEvent{data: evt, sv: s})
|
||||
}
|
||||
|
||||
func (s *Supervisor) copyIO(stdin, stdout, stderr string, i *runtime.IO) (*copier, error) {
|
||||
|
|
Loading…
Reference in a new issue