Merge pull request #33 from LK4D4/event_loop

Event loop package
This commit is contained in:
Michael Crosby 2015-12-16 14:07:17 -08:00
commit 9bddc01b78
4 changed files with 149 additions and 25 deletions

View file

@ -63,3 +63,22 @@ type Event struct {
type Handler interface { type Handler interface {
Handle(*Event) error Handle(*Event) error
} }
type commonEvent struct {
data *Event
sv *Supervisor
}
func (e *commonEvent) Handle() {
h, ok := e.sv.handlers[e.data.Type]
if !ok {
e.data.Err <- ErrUnknownEvent
return
}
err := h.Handle(e.data)
if err != errDeferedResponse {
e.data.Err <- err
close(e.data.Err)
return
}
}

51
eventloop/eventloop.go Normal file
View file

@ -0,0 +1,51 @@
package eventloop
import (
"runtime"
"sync"
)
// Event is receiving notification from loop with Handle() call.
type Event interface {
Handle()
}
// EventLoop is interface for event loops.
// Start starting events processing
// Send adding event to loop
type EventLoop interface {
Start() error
Send(Event) error
}
// ChanLoop is implementation of EventLoop based on channels.
type ChanLoop struct {
events chan Event
once sync.Once
}
// NewChanLoop returns ChanLoop with internal channel buffer set to q.
func NewChanLoop(q int) EventLoop {
return &ChanLoop{
events: make(chan Event, q),
}
}
// Start starting to read events from channel in separate goroutines.
// All calls after first is no-op.
func (el *ChanLoop) Start() error {
go el.once.Do(func() {
// allocate whole OS thread, so nothing can get scheduled over eventloop
runtime.LockOSThread()
for ev := range el.events {
ev.Handle()
}
})
return nil
}
// Send sends event to channel. Will block if buffer is full.
func (el *ChanLoop) Send(ev Event) error {
el.events <- ev
return nil
}

View file

@ -0,0 +1,73 @@
package eventloop
import (
"sync"
"testing"
"time"
)
type racyEvent struct {
m map[int]struct{}
wg *sync.WaitGroup
}
func (e *racyEvent) Handle() {
e.m[0] = struct{}{}
e.wg.Done()
}
func simulateRacyEvents(el EventLoop) {
wg := &sync.WaitGroup{}
raceMap := make(map[int]struct{})
var evs []*racyEvent
for i := 0; i < 1024; i++ {
wg.Add(1)
evs = append(evs, &racyEvent{m: raceMap, wg: wg})
}
for _, ev := range evs {
el.Send(ev)
}
wg.Wait()
}
// run with -race
func TestChanRace(t *testing.T) {
e := NewChanLoop(1024)
e.Start()
simulateRacyEvents(e)
}
// run with -race
func TestChanStartTwiceRace(t *testing.T) {
e := NewChanLoop(1024)
e.Start()
e.Start()
simulateRacyEvents(e)
}
type testEvent struct {
wg *sync.WaitGroup
}
func (e *testEvent) Handle() {
e.wg.Done()
}
func TestChanEventSpawn(t *testing.T) {
e := NewChanLoop(1024)
e.Start()
wg := &sync.WaitGroup{}
wg.Add(2)
e.Send(&testEvent{wg: wg})
e.Send(&testEvent{wg: wg})
waitCh := make(chan struct{})
go func() {
wg.Wait()
close(waitCh)
}()
select {
case <-waitCh:
case <-time.After(1 * time.Second):
t.Fatal("Events was not handled in loop")
}
}

View file

@ -4,12 +4,12 @@ import (
"os" "os"
"os/signal" "os/signal"
"path/filepath" "path/filepath"
goruntime "runtime"
"sync" "sync"
"syscall" "syscall"
"time" "time"
"github.com/Sirupsen/logrus" "github.com/Sirupsen/logrus"
"github.com/docker/containerd/eventloop"
"github.com/docker/containerd/runtime" "github.com/docker/containerd/runtime"
"github.com/opencontainers/runc/libcontainer" "github.com/opencontainers/runc/libcontainer"
) )
@ -36,10 +36,10 @@ func NewSupervisor(id, stateDir string, tasks chan *StartTask, oom bool) (*Super
processes: make(map[int]*containerInfo), processes: make(map[int]*containerInfo),
runtime: r, runtime: r,
tasks: tasks, tasks: tasks,
events: make(chan *Event, DefaultBufferSize),
machine: machine, machine: machine,
subscribers: make(map[chan *Event]struct{}), subscribers: make(map[chan *Event]struct{}),
statsCollector: newStatsCollector(statsInterval), statsCollector: newStatsCollector(statsInterval),
el: eventloop.NewChanLoop(DefaultBufferSize),
} }
if oom { if oom {
s.notifier = newNotifier(s) s.notifier = newNotifier(s)
@ -86,6 +86,7 @@ type Supervisor struct {
containerGroup sync.WaitGroup containerGroup sync.WaitGroup
statsCollector *statsCollector statsCollector *statsCollector
notifier *notifier notifier *notifier
el eventloop.EventLoop
} }
// Stop closes all tasks and sends a SIGTERM to each container's pid1 then waits for they to // Stop closes all tasks and sends a SIGTERM to each container's pid1 then waits for they to
@ -174,32 +175,11 @@ func (s *Supervisor) notifySubscribers(e *Event) {
// therefore it is save to do operations in the handlers that modify state of the system or // therefore it is save to do operations in the handlers that modify state of the system or
// state of the Supervisor // state of the Supervisor
func (s *Supervisor) Start() error { func (s *Supervisor) Start() error {
go func() {
// allocate an entire thread to this goroutine for the main event loop
// so that nothing else is scheduled over the top of it.
goruntime.LockOSThread()
for e := range s.events {
EventsCounter.Inc(1)
h, ok := s.handlers[e.Type]
if !ok {
e.Err <- ErrUnknownEvent
continue
}
if err := h.Handle(e); err != nil {
if err != errDeferedResponse {
e.Err <- err
close(e.Err)
}
continue
}
close(e.Err)
}
}()
logrus.WithFields(logrus.Fields{ logrus.WithFields(logrus.Fields{
"runtime": s.runtime.Type(), "runtime": s.runtime.Type(),
"stateDir": s.stateDir, "stateDir": s.stateDir,
}).Debug("Supervisor started") }).Debug("Supervisor started")
return nil return s.el.Start()
} }
// Machine returns the machine information for which the // Machine returns the machine information for which the
@ -231,7 +211,8 @@ func (s *Supervisor) getContainerForPid(pid int) (runtime.Container, error) {
// SendEvent sends the provided event the the supervisors main event loop // SendEvent sends the provided event the the supervisors main event loop
func (s *Supervisor) SendEvent(evt *Event) { func (s *Supervisor) SendEvent(evt *Event) {
s.events <- evt EventsCounter.Inc(1)
s.el.Send(&commonEvent{data: evt, sv: s})
} }
func (s *Supervisor) copyIO(stdin, stdout, stderr string, i *runtime.IO) (*copier, error) { func (s *Supervisor) copyIO(stdin, stdout, stderr string, i *runtime.IO) (*copier, error) {