Merge pull request #54 from LK4D4/chanotify_package

Chanotify package
This commit is contained in:
Michael Crosby 2015-12-18 15:44:39 -08:00
commit d08b005ca9
4 changed files with 190 additions and 82 deletions

97
chanotify/chanotify.go Normal file
View file

@ -0,0 +1,97 @@
package chanotify
import (
"reflect"
"sync"
)
type dataPair struct {
id string
selectCase reflect.SelectCase
}
// Notifier can effectively notify you about receiving from particular channels.
// It operates with pairs <-chan struct{} <-> string which is notification
// channel and its identificator respectively.
// Notification channel is <-chan struc{}, each send to which is spawn
// notification from Notifier, close doesn't spawn anything and removes channel
// from Notifier.
type Notifier struct {
c chan string
chMap map[<-chan struct{}]*dataPair
exit chan struct{}
m sync.Mutex
}
// New returns already running *Notifier.
func New() *Notifier {
s := &Notifier{
c: make(chan string),
chMap: make(map[<-chan struct{}]*dataPair),
exit: make(chan struct{}),
}
go s.start()
return s
}
// Chan returns channel on which client listen for notifications.
// Ids of notifications is sent to that channel.
func (s *Notifier) Chan() <-chan string {
return s.c
}
// Add adds new notification channel to Notifier.
func (s *Notifier) Add(ch <-chan struct{}, id string) {
s.m.Lock()
s.chMap[ch] = &dataPair{
id: id,
selectCase: reflect.SelectCase{
Dir: reflect.SelectRecv,
Chan: reflect.ValueOf(ch),
},
}
s.m.Unlock()
}
// Close stops Notifier to listen for any notifications and closes its
// "client-side" channel.
func (s *Notifier) Close() {
close(s.exit)
}
func (s *Notifier) start() {
for {
c := s.createCase()
i, _, ok := reflect.Select(c)
if i == 0 {
// exit was closed, we can safely close output
close(s.c)
return
}
ch := c[i].Chan.Interface().(<-chan struct{})
if ok {
s.c <- s.chMap[ch].id
continue
}
// the channel was closed and we should remove it
s.m.Lock()
delete(s.chMap, ch)
s.m.Unlock()
}
}
func (s *Notifier) createCase() []reflect.SelectCase {
// put exit channel as 0 element of select
out := []reflect.SelectCase{
reflect.SelectCase{
Dir: reflect.SelectRecv,
Chan: reflect.ValueOf(s.exit),
},
}
s.m.Lock()
for _, pair := range s.chMap {
out = append(out, pair.selectCase)
}
s.m.Unlock()
return out
}

View file

@ -0,0 +1,80 @@
package chanotify
import (
"strconv"
"sync"
"testing"
"time"
)
func TestNotifier(t *testing.T) {
s := New()
ch1 := make(chan struct{}, 1)
ch2 := make(chan struct{}, 1)
s.Add(ch1, "1")
s.Add(ch2, "2")
s.m.Lock()
if len(s.chMap) != 2 {
t.Fatalf("expected 2 channels, got %d", len(s.chMap))
}
s.m.Unlock()
ch1 <- struct{}{}
id1 := <-s.Chan()
if id1 != "1" {
t.Fatalf("1 should be spawned, got %s", id1)
}
ch2 <- struct{}{}
id2 := <-s.Chan()
if id2 != "2" {
t.Fatalf("2 should be spawned, got %s", id2)
}
close(ch1)
close(ch2)
time.Sleep(100 * time.Millisecond)
s.m.Lock()
if len(s.chMap) != 0 {
t.Fatalf("expected 0 channels, got %d", len(s.chMap))
}
s.m.Unlock()
}
func TestConcurrentNotifier(t *testing.T) {
s := New()
var chs []chan struct{}
for i := 0; i < 8; i++ {
ch := make(chan struct{}, 2)
s.Add(ch, strconv.Itoa(i))
chs = append(chs, ch)
}
testCounter := make(map[string]int)
done := make(chan struct{})
go func() {
for id := range s.Chan() {
testCounter[id]++
}
close(done)
}()
var wg sync.WaitGroup
for _, ch := range chs {
wg.Add(1)
go func(ch chan struct{}) {
ch <- struct{}{}
ch <- struct{}{}
close(ch)
wg.Done()
}(ch)
}
wg.Wait()
// wait for notifications
time.Sleep(1 * time.Second)
s.Close()
<-done
if len(testCounter) != 8 {
t.Fatalf("expect to find exactly 8 distinct ids, got %d", len(testCounter))
}
for id, c := range testCounter {
if c != 2 {
t.Fatalf("Expected to find exactly 2 id %s, but got %d", id, c)
}
}
}

View file

@ -1,80 +0,0 @@
package supervisor
import (
"reflect"
"sync"
)
func newNotifier(s *Supervisor) *notifier {
n := &notifier{
s: s,
channels: make(map[<-chan struct{}]string),
controller: make(chan struct{}),
}
go n.start()
return n
}
type notifier struct {
m sync.Mutex
channels map[<-chan struct{}]string
controller chan struct{}
s *Supervisor
}
func (n *notifier) start() {
for {
c := n.createCase()
i, _, ok := reflect.Select(c)
if i == 0 {
continue
}
if ok {
ch := c[i].Chan.Interface().(<-chan struct{})
id := n.channels[ch]
e := NewEvent(OOMEventType)
e.ID = id
n.s.SendEvent(e)
continue
}
// the channel was closed and we should remove it
ch := c[i].Chan.Interface().(<-chan struct{})
n.removeChan(ch)
}
}
func (n *notifier) Add(ch <-chan struct{}, id string) {
n.m.Lock()
n.channels[ch] = id
n.m.Unlock()
// signal the main loop to break and add the new
// channels
n.controller <- struct{}{}
}
func (n *notifier) createCase() []reflect.SelectCase {
var out []reflect.SelectCase
// add controller chan so that we can signal when we need to make
// changes in the select. The controller chan will always be at
// index 0 in the slice
out = append(out, reflect.SelectCase{
Dir: reflect.SelectRecv,
Chan: reflect.ValueOf(n.controller),
})
n.m.Lock()
for ch := range n.channels {
v := reflect.ValueOf(ch)
out = append(out, reflect.SelectCase{
Dir: reflect.SelectRecv,
Chan: v,
})
}
n.m.Unlock()
return out
}
func (n *notifier) removeChan(ch <-chan struct{}) {
n.m.Lock()
delete(n.channels, ch)
n.m.Unlock()
}

View file

@ -9,6 +9,7 @@ import (
"time"
"github.com/Sirupsen/logrus"
"github.com/docker/containerd/chanotify"
"github.com/docker/containerd/eventloop"
"github.com/docker/containerd/runtime"
"github.com/opencontainers/runc/libcontainer"
@ -45,7 +46,14 @@ func New(id, stateDir string, tasks chan *StartTask, oom bool) (*Supervisor, err
el: eventloop.NewChanLoop(defaultBufferSize),
}
if oom {
s.notifier = newNotifier(s)
s.notifier = chanotify.New()
go func() {
for id := range s.notifier.Chan() {
e := NewEvent(OOMEventType)
e.ID = id
s.SendEvent(e)
}
}()
}
// register default event handlers
s.handlers = map[EventType]Handler{
@ -88,7 +96,7 @@ type Supervisor struct {
machine Machine
containerGroup sync.WaitGroup
statsCollector *statsCollector
notifier *notifier
notifier *chanotify.Notifier
el eventloop.EventLoop
}
@ -124,6 +132,9 @@ func (s *Supervisor) Stop(sig chan os.Signal) {
logrus.Debug("waiting for containers to exit")
s.containerGroup.Wait()
logrus.Debug("all containers exited")
if s.notifier != nil {
s.notifier.Close()
}
// stop receiving signals and close the channel
signal.Stop(sig)
close(sig)