Merge pull request #77 from rakyll/master

Remove reflect from chanotify and fix a deadlock case
This commit is contained in:
Michael Crosby 2016-01-21 11:53:45 -08:00
commit e5ea745aa4
2 changed files with 43 additions and 67 deletions

View file

@ -1,15 +1,9 @@
package chanotify package chanotify
import ( import (
"reflect"
"sync" "sync"
) )
type dataPair struct {
id string
selectCase reflect.SelectCase
}
// Notifier can effectively notify you about receiving from particular channels. // Notifier can effectively notify you about receiving from particular channels.
// It operates with pairs <-chan struct{} <-> string which is notification // It operates with pairs <-chan struct{} <-> string which is notification
// channel and its identificator respectively. // channel and its identificator respectively.
@ -18,84 +12,66 @@ type dataPair struct {
// from Notifier. // from Notifier.
type Notifier struct { type Notifier struct {
c chan string c chan string
chMap map[<-chan struct{}]*dataPair
ctrl chan struct{} m sync.Mutex // guards doneCh
m sync.Mutex doneCh map[string]chan struct{}
} }
// New returns already running *Notifier. // New returns a new *Notifier.
func New() *Notifier { func New() *Notifier {
s := &Notifier{ s := &Notifier{
c: make(chan string), c: make(chan string),
chMap: make(map[<-chan struct{}]*dataPair), doneCh: make(map[string]chan struct{}),
ctrl: make(chan struct{}),
} }
go s.start()
return s return s
} }
// Chan returns channel on which client listen for notifications. // Chan returns channel on which client listen for notifications.
// Ids of notifications is sent to that channel. // IDs of notifications is sent to the returned channel.
func (s *Notifier) Chan() <-chan string { func (s *Notifier) Chan() <-chan string {
return s.c return s.c
} }
func (s *Notifier) killWorker(id string, done chan struct{}) {
s.m.Lock()
delete(s.doneCh, id)
s.m.Unlock()
}
// Add adds new notification channel to Notifier. // Add adds new notification channel to Notifier.
func (s *Notifier) Add(ch <-chan struct{}, id string) { func (s *Notifier) Add(ch <-chan struct{}, id string) {
done := make(chan struct{})
s.m.Lock() s.m.Lock()
s.chMap[ch] = &dataPair{ s.doneCh[id] = done
id: id,
selectCase: reflect.SelectCase{
Dir: reflect.SelectRecv,
Chan: reflect.ValueOf(ch),
},
}
s.m.Unlock() s.m.Unlock()
s.ctrl <- struct{}{}
}
// Close stops Notifier to listen for any notifications and closes its go func(ch <-chan struct{}, id string, done chan struct{}) {
// "client-side" channel.
func (s *Notifier) Close() {
close(s.ctrl)
}
func (s *Notifier) start() {
for { for {
c := s.createCase() select {
i, _, ok := reflect.Select(c) case _, ok := <-ch:
if i == 0 { if !ok {
if ok { // If the channel is closed, we don't need the goroutine
continue // or the done channel mechanism running anymore.
} s.killWorker(id, done)
// ctrl was closed, we can safely close output
close(s.c)
return return
} }
ch := c[i].Chan.Interface().(<-chan struct{}) s.c <- id
if ok { case <-done:
s.c <- s.chMap[ch].id // We don't need this goroutine running anymore, return.
continue s.killWorker(id, done)
return
} }
// the channel was closed and we should remove it
s.m.Lock()
delete(s.chMap, ch)
s.m.Unlock()
} }
}(ch, id, done)
} }
func (s *Notifier) createCase() []reflect.SelectCase { // Close closes the notifier and releases its underlying resources.
// put ctrl channel as 0 element of select func (s *Notifier) Close() {
out := []reflect.SelectCase{
reflect.SelectCase{
Dir: reflect.SelectRecv,
Chan: reflect.ValueOf(s.ctrl),
},
}
s.m.Lock() s.m.Lock()
for _, pair := range s.chMap { defer s.m.Unlock()
out = append(out, pair.selectCase) for _, done := range s.doneCh {
close(done)
} }
s.m.Unlock() close(s.c)
return out // TODO(jbd): Don't allow Add after Close returns.
} }

View file

@ -14,8 +14,8 @@ func TestNotifier(t *testing.T) {
s.Add(ch1, "1") s.Add(ch1, "1")
s.Add(ch2, "2") s.Add(ch2, "2")
s.m.Lock() s.m.Lock()
if len(s.chMap) != 2 { if len(s.doneCh) != 2 {
t.Fatalf("expected 2 channels, got %d", len(s.chMap)) t.Fatalf("expected 2 channels, got %d", len(s.doneCh))
} }
s.m.Unlock() s.m.Unlock()
ch1 <- struct{}{} ch1 <- struct{}{}
@ -32,8 +32,8 @@ func TestNotifier(t *testing.T) {
close(ch2) close(ch2)
time.Sleep(100 * time.Millisecond) time.Sleep(100 * time.Millisecond)
s.m.Lock() s.m.Lock()
if len(s.chMap) != 0 { if len(s.doneCh) != 0 {
t.Fatalf("expected 0 channels, got %d", len(s.chMap)) t.Fatalf("expected 0 channels, got %d", len(s.doneCh))
} }
s.m.Unlock() s.m.Unlock()
} }