From 0cd5c21a50fd2a25f11352ec50c6b38540a37a0a Mon Sep 17 00:00:00 2001 From: Burcu Dogan Date: Wed, 20 Jan 2016 13:42:29 -0800 Subject: [PATCH] Remove reflect from chanotify and fix a deadlock case. With the change, the read rate from (*Notifier).Chan() is 10-20 times faster and more consistent. Consider the following program: s := chanotify.New() for i := 0; i < n; i++ { ch := make(chan struct{}, 1) s.Add(ch, fmt.Sprintf("%d", i)) go func(ch chan struct{}) { time.Sleep(time.Second) // because your original code has a deadlock case. ch <- struct{}{} }(ch) } avgs := make([]int64, n) go func() { for i := 0; i < n; i++ { start := time.Now() <-s.Chan() avgs[i] = time.Now().Sub(start).Nanoseconds() } }() time.Sleep(10 * time.Second) s.Close() fmt.Println(avgs) The output without the change; ignore the first value: [1000469322 739 100492 200 75412 77733 316 88873 695 137905 244 72197 196 84444 175 80858 169 125514 165 73509 885 739963 248 72569 169 90094 159 110571 68954 143 145616 148 83563 149 86154 132 82722 154 79740 170 86688 143 97033 158 87126 187 69839 125 100043 148 72633 133 80690 149 41841 113509 346 134876 247 80720 153473 414352 293 103906 276 140524 233 88041 236 123732 242 89870 238 105342 213 110773 319 121004 228 89237 793 94458 235 604864 400129 412 82639 598 72319 178 64423 157 35779 44536 235 55411 129 46051 29032 231] The output with the change; ignore the first value: [999893266 3189 2514 2257 2119 2252 2780 3402 2689 2916 2218 2385 4459 495 309 1289 578 4432 590 421 387 335 1156 272 1566 4933 1271 537 391 792 373 329 411 527 1764 782 322 1044 718 533 405 1183 337 230 1827 848 575 692 321 3514 504 491 772 1952 575 2931 1754 1279 781 403 1137 451 953 914 369 387 289 1796 473 1237 798 816 1215 690 495 389 403 1235 373 364 515 509 321 349 319 41810 27359 2582 2055 2177 2486 2181 1903 2207 2207 2005 1746 1802 1864 2169] The change also fixes the deadlock case pointed in the program above. Signed-off-by: Burcu Dogan --- chanotify/chanotify.go | 102 ++++++++++++++---------------------- chanotify/chanotify_test.go | 8 +-- 2 files changed, 43 insertions(+), 67 deletions(-) diff --git a/chanotify/chanotify.go b/chanotify/chanotify.go index 182b8a3..75cea31 100644 --- a/chanotify/chanotify.go +++ b/chanotify/chanotify.go @@ -1,15 +1,9 @@ package chanotify import ( - "reflect" "sync" ) -type dataPair struct { - id string - selectCase reflect.SelectCase -} - // Notifier can effectively notify you about receiving from particular channels. // It operates with pairs <-chan struct{} <-> string which is notification // channel and its identificator respectively. @@ -17,85 +11,67 @@ type dataPair struct { // notification from Notifier, close doesn't spawn anything and removes channel // from Notifier. type Notifier struct { - c chan string - chMap map[<-chan struct{}]*dataPair - ctrl chan struct{} - m sync.Mutex + c chan string + + m sync.Mutex // guards doneCh + doneCh map[string]chan struct{} } -// New returns already running *Notifier. +// New returns a new *Notifier. func New() *Notifier { s := &Notifier{ - c: make(chan string), - chMap: make(map[<-chan struct{}]*dataPair), - ctrl: make(chan struct{}), + c: make(chan string), + doneCh: make(map[string]chan struct{}), } - go s.start() return s } // Chan returns channel on which client listen for notifications. -// Ids of notifications is sent to that channel. +// IDs of notifications is sent to the returned channel. func (s *Notifier) Chan() <-chan string { return s.c } +func (s *Notifier) killWorker(id string, done chan struct{}) { + s.m.Lock() + delete(s.doneCh, id) + s.m.Unlock() +} + // Add adds new notification channel to Notifier. func (s *Notifier) Add(ch <-chan struct{}, id string) { + done := make(chan struct{}) s.m.Lock() - s.chMap[ch] = &dataPair{ - id: id, - selectCase: reflect.SelectCase{ - Dir: reflect.SelectRecv, - Chan: reflect.ValueOf(ch), - }, - } + s.doneCh[id] = done s.m.Unlock() - s.ctrl <- struct{}{} -} -// Close stops Notifier to listen for any notifications and closes its -// "client-side" channel. -func (s *Notifier) Close() { - close(s.ctrl) -} - -func (s *Notifier) start() { - for { - c := s.createCase() - i, _, ok := reflect.Select(c) - if i == 0 { - if ok { - continue + go func(ch <-chan struct{}, id string, done chan struct{}) { + for { + select { + case _, ok := <-ch: + if !ok { + // If the channel is closed, we don't need the goroutine + // or the done channel mechanism running anymore. + s.killWorker(id, done) + return + } + s.c <- id + case <-done: + // We don't need this goroutine running anymore, return. + s.killWorker(id, done) + return } - // ctrl was closed, we can safely close output - close(s.c) - return } - ch := c[i].Chan.Interface().(<-chan struct{}) - if ok { - s.c <- s.chMap[ch].id - continue - } - // the channel was closed and we should remove it - s.m.Lock() - delete(s.chMap, ch) - s.m.Unlock() - } + }(ch, id, done) } -func (s *Notifier) createCase() []reflect.SelectCase { - // put ctrl channel as 0 element of select - out := []reflect.SelectCase{ - reflect.SelectCase{ - Dir: reflect.SelectRecv, - Chan: reflect.ValueOf(s.ctrl), - }, - } +// Close closes the notifier and releases its underlying resources. +func (s *Notifier) Close() { s.m.Lock() - for _, pair := range s.chMap { - out = append(out, pair.selectCase) + defer s.m.Unlock() + for _, done := range s.doneCh { + close(done) } - s.m.Unlock() - return out + close(s.c) + // TODO(jbd): Don't allow Add after Close returns. } diff --git a/chanotify/chanotify_test.go b/chanotify/chanotify_test.go index ffb55b5..c88fc3e 100644 --- a/chanotify/chanotify_test.go +++ b/chanotify/chanotify_test.go @@ -14,8 +14,8 @@ func TestNotifier(t *testing.T) { s.Add(ch1, "1") s.Add(ch2, "2") s.m.Lock() - if len(s.chMap) != 2 { - t.Fatalf("expected 2 channels, got %d", len(s.chMap)) + if len(s.doneCh) != 2 { + t.Fatalf("expected 2 channels, got %d", len(s.doneCh)) } s.m.Unlock() ch1 <- struct{}{} @@ -32,8 +32,8 @@ func TestNotifier(t *testing.T) { close(ch2) time.Sleep(100 * time.Millisecond) s.m.Lock() - if len(s.chMap) != 0 { - t.Fatalf("expected 0 channels, got %d", len(s.chMap)) + if len(s.doneCh) != 0 { + t.Fatalf("expected 0 channels, got %d", len(s.doneCh)) } s.m.Unlock() }