Add eventloop package
Signed-off-by: Alexander Morozov <lk4d4@docker.com>
This commit is contained in:
parent
c35cf680b0
commit
7332e6e847
2 changed files with 124 additions and 0 deletions
51
eventloop/eventloop.go
Normal file
51
eventloop/eventloop.go
Normal file
|
@ -0,0 +1,51 @@
|
|||
package eventloop
|
||||
|
||||
import (
|
||||
"runtime"
|
||||
"sync"
|
||||
)
|
||||
|
||||
// Event is receiving notification from loop with Handle() call.
|
||||
type Event interface {
|
||||
Handle()
|
||||
}
|
||||
|
||||
// EventLoop is interface for event loops.
|
||||
// Start starting events processing
|
||||
// Send adding event to loop
|
||||
type EventLoop interface {
|
||||
Start() error
|
||||
Send(Event) error
|
||||
}
|
||||
|
||||
// ChanLoop is implementation of EventLoop based on channels.
|
||||
type ChanLoop struct {
|
||||
events chan Event
|
||||
once sync.Once
|
||||
}
|
||||
|
||||
// NewChanLoop returns ChanLoop with internal channel buffer set to q.
|
||||
func NewChanLoop(q int) EventLoop {
|
||||
return &ChanLoop{
|
||||
events: make(chan Event, q),
|
||||
}
|
||||
}
|
||||
|
||||
// Start starting to read events from channel in separate goroutines.
|
||||
// All calls after first is no-op.
|
||||
func (el *ChanLoop) Start() error {
|
||||
go el.once.Do(func() {
|
||||
// allocate whole OS thread, so nothing can get scheduled over eventloop
|
||||
runtime.LockOSThread()
|
||||
for ev := range el.events {
|
||||
ev.Handle()
|
||||
}
|
||||
})
|
||||
return nil
|
||||
}
|
||||
|
||||
// Send sends event to channel. Will block if buffer is full.
|
||||
func (el *ChanLoop) Send(ev Event) error {
|
||||
el.events <- ev
|
||||
return nil
|
||||
}
|
73
eventloop/eventloop_test.go
Normal file
73
eventloop/eventloop_test.go
Normal file
|
@ -0,0 +1,73 @@
|
|||
package eventloop
|
||||
|
||||
import (
|
||||
"sync"
|
||||
"testing"
|
||||
"time"
|
||||
)
|
||||
|
||||
type racyEvent struct {
|
||||
m map[int]struct{}
|
||||
wg *sync.WaitGroup
|
||||
}
|
||||
|
||||
func (e *racyEvent) Handle() {
|
||||
e.m[0] = struct{}{}
|
||||
e.wg.Done()
|
||||
}
|
||||
|
||||
func simulateRacyEvents(el EventLoop) {
|
||||
wg := &sync.WaitGroup{}
|
||||
raceMap := make(map[int]struct{})
|
||||
var evs []*racyEvent
|
||||
for i := 0; i < 1024; i++ {
|
||||
wg.Add(1)
|
||||
evs = append(evs, &racyEvent{m: raceMap, wg: wg})
|
||||
}
|
||||
for _, ev := range evs {
|
||||
el.Send(ev)
|
||||
}
|
||||
wg.Wait()
|
||||
}
|
||||
|
||||
// run with -race
|
||||
func TestChanRace(t *testing.T) {
|
||||
e := NewChanLoop(1024)
|
||||
e.Start()
|
||||
simulateRacyEvents(e)
|
||||
}
|
||||
|
||||
// run with -race
|
||||
func TestChanStartTwiceRace(t *testing.T) {
|
||||
e := NewChanLoop(1024)
|
||||
e.Start()
|
||||
e.Start()
|
||||
simulateRacyEvents(e)
|
||||
}
|
||||
|
||||
type testEvent struct {
|
||||
wg *sync.WaitGroup
|
||||
}
|
||||
|
||||
func (e *testEvent) Handle() {
|
||||
e.wg.Done()
|
||||
}
|
||||
|
||||
func TestChanEventSpawn(t *testing.T) {
|
||||
e := NewChanLoop(1024)
|
||||
e.Start()
|
||||
wg := &sync.WaitGroup{}
|
||||
wg.Add(2)
|
||||
e.Send(&testEvent{wg: wg})
|
||||
e.Send(&testEvent{wg: wg})
|
||||
waitCh := make(chan struct{})
|
||||
go func() {
|
||||
wg.Wait()
|
||||
close(waitCh)
|
||||
}()
|
||||
select {
|
||||
case <-waitCh:
|
||||
case <-time.After(1 * time.Second):
|
||||
t.Fatal("Events was not handled in loop")
|
||||
}
|
||||
}
|
Loading…
Reference in a new issue