diff --git a/chanotify/chanotify.go b/chanotify/chanotify.go new file mode 100644 index 0000000..75d376b --- /dev/null +++ b/chanotify/chanotify.go @@ -0,0 +1,97 @@ +package chanotify + +import ( + "reflect" + "sync" +) + +type dataPair struct { + id string + selectCase reflect.SelectCase +} + +// Notifier can effectively notify you about receiving from particular channels. +// It operates with pairs <-chan struct{} <-> string which is notification +// channel and its identificator respectively. +// Notification channel is <-chan struc{}, each send to which is spawn +// notification from Notifier, close doesn't spawn anything and removes channel +// from Notifier. +type Notifier struct { + c chan string + chMap map[<-chan struct{}]*dataPair + exit chan struct{} + m sync.Mutex +} + +// New returns already running *Notifier. +func New() *Notifier { + s := &Notifier{ + c: make(chan string), + chMap: make(map[<-chan struct{}]*dataPair), + exit: make(chan struct{}), + } + go s.start() + return s +} + +// Chan returns channel on which client listen for notifications. +// Ids of notifications is sent to that channel. +func (s *Notifier) Chan() <-chan string { + return s.c +} + +// Add adds new notification channel to Notifier. +func (s *Notifier) Add(ch <-chan struct{}, id string) { + s.m.Lock() + s.chMap[ch] = &dataPair{ + id: id, + selectCase: reflect.SelectCase{ + Dir: reflect.SelectRecv, + Chan: reflect.ValueOf(ch), + }, + } + s.m.Unlock() +} + +// Close stops Notifier to listen for any notifications and closes its +// "client-side" channel. +func (s *Notifier) Close() { + close(s.exit) +} + +func (s *Notifier) start() { + for { + c := s.createCase() + i, _, ok := reflect.Select(c) + if i == 0 { + // exit was closed, we can safely close output + close(s.c) + return + } + ch := c[i].Chan.Interface().(<-chan struct{}) + if ok { + s.c <- s.chMap[ch].id + continue + } + // the channel was closed and we should remove it + s.m.Lock() + delete(s.chMap, ch) + s.m.Unlock() + } +} + +func (s *Notifier) createCase() []reflect.SelectCase { + // put exit channel as 0 element of select + out := []reflect.SelectCase{ + reflect.SelectCase{ + Dir: reflect.SelectRecv, + Chan: reflect.ValueOf(s.exit), + }, + } + s.m.Lock() + for _, pair := range s.chMap { + out = append(out, pair.selectCase) + } + s.m.Unlock() + return out +} diff --git a/chanotify/chanotify_test.go b/chanotify/chanotify_test.go new file mode 100644 index 0000000..34afd75 --- /dev/null +++ b/chanotify/chanotify_test.go @@ -0,0 +1,80 @@ +package chanotify + +import ( + "strconv" + "sync" + "testing" + "time" +) + +func TestNotifier(t *testing.T) { + s := New() + ch1 := make(chan struct{}, 1) + ch2 := make(chan struct{}, 1) + s.Add(ch1, "1") + s.Add(ch2, "2") + s.m.Lock() + if len(s.chMap) != 2 { + t.Fatalf("expected 2 channels, got %d", len(s.chMap)) + } + s.m.Unlock() + ch1 <- struct{}{} + id1 := <-s.Chan() + if id1 != "1" { + t.Fatalf("1 should be spawned, got %s", id1) + } + ch2 <- struct{}{} + id2 := <-s.Chan() + if id2 != "2" { + t.Fatalf("2 should be spawned, got %s", id2) + } + close(ch1) + close(ch2) + time.Sleep(100 * time.Millisecond) + s.m.Lock() + if len(s.chMap) != 0 { + t.Fatalf("expected 0 channels, got %d", len(s.chMap)) + } + s.m.Unlock() +} + +func TestConcurrentNotifier(t *testing.T) { + s := New() + var chs []chan struct{} + for i := 0; i < 8; i++ { + ch := make(chan struct{}, 2) + s.Add(ch, strconv.Itoa(i)) + chs = append(chs, ch) + } + testCounter := make(map[string]int) + done := make(chan struct{}) + go func() { + for id := range s.Chan() { + testCounter[id]++ + } + close(done) + }() + var wg sync.WaitGroup + for _, ch := range chs { + wg.Add(1) + go func(ch chan struct{}) { + ch <- struct{}{} + ch <- struct{}{} + close(ch) + wg.Done() + }(ch) + } + wg.Wait() + // wait for notifications + time.Sleep(1 * time.Second) + s.Close() + <-done + if len(testCounter) != 8 { + t.Fatalf("expect to find exactly 8 distinct ids, got %d", len(testCounter)) + } + for id, c := range testCounter { + if c != 2 { + t.Fatalf("Expected to find exactly 2 id %s, but got %d", id, c) + } + } +}