Remove reflect from chanotify and fix a deadlock case.

With the change, the read rate from (*Notifier).Chan()
is 10-20 times faster and more consistent.

Consider the following program:

  s := chanotify.New()
  for i := 0; i < n; i++ {
    ch := make(chan struct{}, 1)
    s.Add(ch, fmt.Sprintf("%d", i))
    go func(ch chan struct{}) {
      time.Sleep(time.Second) // because your original code has a deadlock case.
      ch <- struct{}{}
    }(ch)
  }

  avgs := make([]int64, n)
  go func() {
    for i := 0; i < n; i++ {
      start := time.Now()
      <-s.Chan()
      avgs[i] = time.Now().Sub(start).Nanoseconds()
    }
  }()

  time.Sleep(10 * time.Second)
  s.Close()

  fmt.Println(avgs)

The output without the change; ignore the first value:

[1000469322 739 100492 200 75412 77733 316 88873 695 137905 244 72197 196 84444 175 80858 169 125514 165 73509 885 739963 248 72569 169 90094 159 110571 68954 143 145616 148 83563 149 86154 132 82722 154 79740 170 86688 143 97033 158 87126 187 69839 125 100043 148 72633 133 80690 149 41841 113509 346 134876 247 80720 153473 414352 293 103906 276 140524 233 88041 236 123732 242 89870 238 105342 213 110773 319 121004 228 89237 793 94458 235 604864 400129 412 82639 598 72319 178 64423 157 35779 44536 235 55411 129 46051 29032 231]

The output with the change; ignore the first value:

[999893266 3189 2514 2257 2119 2252 2780 3402 2689 2916 2218 2385 4459 495 309 1289 578 4432 590 421 387 335 1156 272 1566 4933 1271 537 391 792 373 329 411 527 1764 782 322 1044 718 533 405 1183 337 230 1827 848 575 692 321 3514 504 491 772 1952 575 2931 1754 1279 781 403 1137 451 953 914 369 387 289 1796 473 1237 798 816 1215 690 495 389 403 1235 373 364 515 509 321 349 319 41810 27359 2582 2055 2177 2486 2181 1903 2207 2207 2005 1746 1802 1864 2169]

The change also fixes the deadlock case pointed in the program above.

Signed-off-by: Burcu Dogan <jbd@google.com>
This commit is contained in:
Burcu Dogan 2016-01-20 13:42:29 -08:00
parent fbb69b2fa0
commit 0cd5c21a50
2 changed files with 43 additions and 67 deletions

View file

@ -1,15 +1,9 @@
package chanotify package chanotify
import ( import (
"reflect"
"sync" "sync"
) )
type dataPair struct {
id string
selectCase reflect.SelectCase
}
// Notifier can effectively notify you about receiving from particular channels. // Notifier can effectively notify you about receiving from particular channels.
// It operates with pairs <-chan struct{} <-> string which is notification // It operates with pairs <-chan struct{} <-> string which is notification
// channel and its identificator respectively. // channel and its identificator respectively.
@ -17,85 +11,67 @@ type dataPair struct {
// notification from Notifier, close doesn't spawn anything and removes channel // notification from Notifier, close doesn't spawn anything and removes channel
// from Notifier. // from Notifier.
type Notifier struct { type Notifier struct {
c chan string c chan string
chMap map[<-chan struct{}]*dataPair
ctrl chan struct{} m sync.Mutex // guards doneCh
m sync.Mutex doneCh map[string]chan struct{}
} }
// New returns already running *Notifier. // New returns a new *Notifier.
func New() *Notifier { func New() *Notifier {
s := &Notifier{ s := &Notifier{
c: make(chan string), c: make(chan string),
chMap: make(map[<-chan struct{}]*dataPair), doneCh: make(map[string]chan struct{}),
ctrl: make(chan struct{}),
} }
go s.start()
return s return s
} }
// Chan returns channel on which client listen for notifications. // Chan returns channel on which client listen for notifications.
// Ids of notifications is sent to that channel. // IDs of notifications is sent to the returned channel.
func (s *Notifier) Chan() <-chan string { func (s *Notifier) Chan() <-chan string {
return s.c return s.c
} }
func (s *Notifier) killWorker(id string, done chan struct{}) {
s.m.Lock()
delete(s.doneCh, id)
s.m.Unlock()
}
// Add adds new notification channel to Notifier. // Add adds new notification channel to Notifier.
func (s *Notifier) Add(ch <-chan struct{}, id string) { func (s *Notifier) Add(ch <-chan struct{}, id string) {
done := make(chan struct{})
s.m.Lock() s.m.Lock()
s.chMap[ch] = &dataPair{ s.doneCh[id] = done
id: id,
selectCase: reflect.SelectCase{
Dir: reflect.SelectRecv,
Chan: reflect.ValueOf(ch),
},
}
s.m.Unlock() s.m.Unlock()
s.ctrl <- struct{}{}
}
// Close stops Notifier to listen for any notifications and closes its go func(ch <-chan struct{}, id string, done chan struct{}) {
// "client-side" channel. for {
func (s *Notifier) Close() { select {
close(s.ctrl) case _, ok := <-ch:
} if !ok {
// If the channel is closed, we don't need the goroutine
func (s *Notifier) start() { // or the done channel mechanism running anymore.
for { s.killWorker(id, done)
c := s.createCase() return
i, _, ok := reflect.Select(c) }
if i == 0 { s.c <- id
if ok { case <-done:
continue // We don't need this goroutine running anymore, return.
s.killWorker(id, done)
return
} }
// ctrl was closed, we can safely close output
close(s.c)
return
} }
ch := c[i].Chan.Interface().(<-chan struct{}) }(ch, id, done)
if ok {
s.c <- s.chMap[ch].id
continue
}
// the channel was closed and we should remove it
s.m.Lock()
delete(s.chMap, ch)
s.m.Unlock()
}
} }
func (s *Notifier) createCase() []reflect.SelectCase { // Close closes the notifier and releases its underlying resources.
// put ctrl channel as 0 element of select func (s *Notifier) Close() {
out := []reflect.SelectCase{
reflect.SelectCase{
Dir: reflect.SelectRecv,
Chan: reflect.ValueOf(s.ctrl),
},
}
s.m.Lock() s.m.Lock()
for _, pair := range s.chMap { defer s.m.Unlock()
out = append(out, pair.selectCase) for _, done := range s.doneCh {
close(done)
} }
s.m.Unlock() close(s.c)
return out // TODO(jbd): Don't allow Add after Close returns.
} }

View file

@ -14,8 +14,8 @@ func TestNotifier(t *testing.T) {
s.Add(ch1, "1") s.Add(ch1, "1")
s.Add(ch2, "2") s.Add(ch2, "2")
s.m.Lock() s.m.Lock()
if len(s.chMap) != 2 { if len(s.doneCh) != 2 {
t.Fatalf("expected 2 channels, got %d", len(s.chMap)) t.Fatalf("expected 2 channels, got %d", len(s.doneCh))
} }
s.m.Unlock() s.m.Unlock()
ch1 <- struct{}{} ch1 <- struct{}{}
@ -32,8 +32,8 @@ func TestNotifier(t *testing.T) {
close(ch2) close(ch2)
time.Sleep(100 * time.Millisecond) time.Sleep(100 * time.Millisecond)
s.m.Lock() s.m.Lock()
if len(s.chMap) != 0 { if len(s.doneCh) != 0 {
t.Fatalf("expected 0 channels, got %d", len(s.chMap)) t.Fatalf("expected 0 channels, got %d", len(s.doneCh))
} }
s.m.Unlock() s.m.Unlock()
} }