From 7332e6e847266772e2e4710ae3594f69805fdd76 Mon Sep 17 00:00:00 2001 From: Alexander Morozov Date: Wed, 16 Dec 2015 13:53:03 -0800 Subject: [PATCH] Add eventloop package Signed-off-by: Alexander Morozov --- eventloop/eventloop.go | 51 ++++++++++++++++++++++++++ eventloop/eventloop_test.go | 73 +++++++++++++++++++++++++++++++++++++ 2 files changed, 124 insertions(+) create mode 100644 eventloop/eventloop.go create mode 100644 eventloop/eventloop_test.go diff --git a/eventloop/eventloop.go b/eventloop/eventloop.go new file mode 100644 index 0000000..7a80a27 --- /dev/null +++ b/eventloop/eventloop.go @@ -0,0 +1,51 @@ +package eventloop + +import ( + "runtime" + "sync" +) + +// Event is receiving notification from loop with Handle() call. +type Event interface { + Handle() +} + +// EventLoop is interface for event loops. +// Start starting events processing +// Send adding event to loop +type EventLoop interface { + Start() error + Send(Event) error +} + +// ChanLoop is implementation of EventLoop based on channels. +type ChanLoop struct { + events chan Event + once sync.Once +} + +// NewChanLoop returns ChanLoop with internal channel buffer set to q. +func NewChanLoop(q int) EventLoop { + return &ChanLoop{ + events: make(chan Event, q), + } +} + +// Start starting to read events from channel in separate goroutines. +// All calls after first is no-op. +func (el *ChanLoop) Start() error { + go el.once.Do(func() { + // allocate whole OS thread, so nothing can get scheduled over eventloop + runtime.LockOSThread() + for ev := range el.events { + ev.Handle() + } + }) + return nil +} + +// Send sends event to channel. Will block if buffer is full. +func (el *ChanLoop) Send(ev Event) error { + el.events <- ev + return nil +} diff --git a/eventloop/eventloop_test.go b/eventloop/eventloop_test.go new file mode 100644 index 0000000..a4fda47 --- /dev/null +++ b/eventloop/eventloop_test.go @@ -0,0 +1,73 @@ +package eventloop + +import ( + "sync" + "testing" + "time" +) + +type racyEvent struct { + m map[int]struct{} + wg *sync.WaitGroup +} + +func (e *racyEvent) Handle() { + e.m[0] = struct{}{} + e.wg.Done() +} + +func simulateRacyEvents(el EventLoop) { + wg := &sync.WaitGroup{} + raceMap := make(map[int]struct{}) + var evs []*racyEvent + for i := 0; i < 1024; i++ { + wg.Add(1) + evs = append(evs, &racyEvent{m: raceMap, wg: wg}) + } + for _, ev := range evs { + el.Send(ev) + } + wg.Wait() +} + +// run with -race +func TestChanRace(t *testing.T) { + e := NewChanLoop(1024) + e.Start() + simulateRacyEvents(e) +} + +// run with -race +func TestChanStartTwiceRace(t *testing.T) { + e := NewChanLoop(1024) + e.Start() + e.Start() + simulateRacyEvents(e) +} + +type testEvent struct { + wg *sync.WaitGroup +} + +func (e *testEvent) Handle() { + e.wg.Done() +} + +func TestChanEventSpawn(t *testing.T) { + e := NewChanLoop(1024) + e.Start() + wg := &sync.WaitGroup{} + wg.Add(2) + e.Send(&testEvent{wg: wg}) + e.Send(&testEvent{wg: wg}) + waitCh := make(chan struct{}) + go func() { + wg.Wait() + close(waitCh) + }() + select { + case <-waitCh: + case <-time.After(1 * time.Second): + t.Fatal("Events was not handled in loop") + } +}