diff --git a/chanotify/chanotify.go b/chanotify/chanotify.go index 182b8a3..75cea31 100644 --- a/chanotify/chanotify.go +++ b/chanotify/chanotify.go @@ -1,15 +1,9 @@ package chanotify import ( - "reflect" "sync" ) -type dataPair struct { - id string - selectCase reflect.SelectCase -} - // Notifier can effectively notify you about receiving from particular channels. // It operates with pairs <-chan struct{} <-> string which is notification // channel and its identificator respectively. @@ -17,85 +11,67 @@ type dataPair struct { // notification from Notifier, close doesn't spawn anything and removes channel // from Notifier. type Notifier struct { - c chan string - chMap map[<-chan struct{}]*dataPair - ctrl chan struct{} - m sync.Mutex + c chan string + + m sync.Mutex // guards doneCh + doneCh map[string]chan struct{} } -// New returns already running *Notifier. +// New returns a new *Notifier. func New() *Notifier { s := &Notifier{ - c: make(chan string), - chMap: make(map[<-chan struct{}]*dataPair), - ctrl: make(chan struct{}), + c: make(chan string), + doneCh: make(map[string]chan struct{}), } - go s.start() return s } // Chan returns channel on which client listen for notifications. -// Ids of notifications is sent to that channel. +// IDs of notifications is sent to the returned channel. func (s *Notifier) Chan() <-chan string { return s.c } +func (s *Notifier) killWorker(id string, done chan struct{}) { + s.m.Lock() + delete(s.doneCh, id) + s.m.Unlock() +} + // Add adds new notification channel to Notifier. func (s *Notifier) Add(ch <-chan struct{}, id string) { + done := make(chan struct{}) s.m.Lock() - s.chMap[ch] = &dataPair{ - id: id, - selectCase: reflect.SelectCase{ - Dir: reflect.SelectRecv, - Chan: reflect.ValueOf(ch), - }, - } + s.doneCh[id] = done s.m.Unlock() - s.ctrl <- struct{}{} -} -// Close stops Notifier to listen for any notifications and closes its -// "client-side" channel. -func (s *Notifier) Close() { - close(s.ctrl) -} - -func (s *Notifier) start() { - for { - c := s.createCase() - i, _, ok := reflect.Select(c) - if i == 0 { - if ok { - continue + go func(ch <-chan struct{}, id string, done chan struct{}) { + for { + select { + case _, ok := <-ch: + if !ok { + // If the channel is closed, we don't need the goroutine + // or the done channel mechanism running anymore. + s.killWorker(id, done) + return + } + s.c <- id + case <-done: + // We don't need this goroutine running anymore, return. + s.killWorker(id, done) + return } - // ctrl was closed, we can safely close output - close(s.c) - return } - ch := c[i].Chan.Interface().(<-chan struct{}) - if ok { - s.c <- s.chMap[ch].id - continue - } - // the channel was closed and we should remove it - s.m.Lock() - delete(s.chMap, ch) - s.m.Unlock() - } + }(ch, id, done) } -func (s *Notifier) createCase() []reflect.SelectCase { - // put ctrl channel as 0 element of select - out := []reflect.SelectCase{ - reflect.SelectCase{ - Dir: reflect.SelectRecv, - Chan: reflect.ValueOf(s.ctrl), - }, - } +// Close closes the notifier and releases its underlying resources. +func (s *Notifier) Close() { s.m.Lock() - for _, pair := range s.chMap { - out = append(out, pair.selectCase) + defer s.m.Unlock() + for _, done := range s.doneCh { + close(done) } - s.m.Unlock() - return out + close(s.c) + // TODO(jbd): Don't allow Add after Close returns. } diff --git a/chanotify/chanotify_test.go b/chanotify/chanotify_test.go index ffb55b5..c88fc3e 100644 --- a/chanotify/chanotify_test.go +++ b/chanotify/chanotify_test.go @@ -14,8 +14,8 @@ func TestNotifier(t *testing.T) { s.Add(ch1, "1") s.Add(ch2, "2") s.m.Lock() - if len(s.chMap) != 2 { - t.Fatalf("expected 2 channels, got %d", len(s.chMap)) + if len(s.doneCh) != 2 { + t.Fatalf("expected 2 channels, got %d", len(s.doneCh)) } s.m.Unlock() ch1 <- struct{}{} @@ -32,8 +32,8 @@ func TestNotifier(t *testing.T) { close(ch2) time.Sleep(100 * time.Millisecond) s.m.Lock() - if len(s.chMap) != 0 { - t.Fatalf("expected 0 channels, got %d", len(s.chMap)) + if len(s.doneCh) != 0 { + t.Fatalf("expected 0 channels, got %d", len(s.doneCh)) } s.m.Unlock() }