e2f3518af8
Signed-off-by: Alexander Morozov <lk4d4@docker.com>
101 lines
2.1 KiB
Go
101 lines
2.1 KiB
Go
package chanotify
|
|
|
|
import (
|
|
"reflect"
|
|
"sync"
|
|
)
|
|
|
|
type dataPair struct {
|
|
id string
|
|
selectCase reflect.SelectCase
|
|
}
|
|
|
|
// Notifier can effectively notify you about receiving from particular channels.
|
|
// It operates with pairs <-chan struct{} <-> string which is notification
|
|
// channel and its identificator respectively.
|
|
// Notification channel is <-chan struc{}, each send to which is spawn
|
|
// notification from Notifier, close doesn't spawn anything and removes channel
|
|
// from Notifier.
|
|
type Notifier struct {
|
|
c chan string
|
|
chMap map[<-chan struct{}]*dataPair
|
|
ctrl chan struct{}
|
|
m sync.Mutex
|
|
}
|
|
|
|
// New returns already running *Notifier.
|
|
func New() *Notifier {
|
|
s := &Notifier{
|
|
c: make(chan string),
|
|
chMap: make(map[<-chan struct{}]*dataPair),
|
|
ctrl: make(chan struct{}),
|
|
}
|
|
go s.start()
|
|
return s
|
|
}
|
|
|
|
// Chan returns channel on which client listen for notifications.
|
|
// Ids of notifications is sent to that channel.
|
|
func (s *Notifier) Chan() <-chan string {
|
|
return s.c
|
|
}
|
|
|
|
// Add adds new notification channel to Notifier.
|
|
func (s *Notifier) Add(ch <-chan struct{}, id string) {
|
|
s.m.Lock()
|
|
s.chMap[ch] = &dataPair{
|
|
id: id,
|
|
selectCase: reflect.SelectCase{
|
|
Dir: reflect.SelectRecv,
|
|
Chan: reflect.ValueOf(ch),
|
|
},
|
|
}
|
|
s.m.Unlock()
|
|
s.ctrl <- struct{}{}
|
|
}
|
|
|
|
// Close stops Notifier to listen for any notifications and closes its
|
|
// "client-side" channel.
|
|
func (s *Notifier) Close() {
|
|
close(s.ctrl)
|
|
}
|
|
|
|
func (s *Notifier) start() {
|
|
for {
|
|
c := s.createCase()
|
|
i, _, ok := reflect.Select(c)
|
|
if i == 0 {
|
|
if ok {
|
|
continue
|
|
}
|
|
// ctrl was closed, we can safely close output
|
|
close(s.c)
|
|
return
|
|
}
|
|
ch := c[i].Chan.Interface().(<-chan struct{})
|
|
if ok {
|
|
s.c <- s.chMap[ch].id
|
|
continue
|
|
}
|
|
// the channel was closed and we should remove it
|
|
s.m.Lock()
|
|
delete(s.chMap, ch)
|
|
s.m.Unlock()
|
|
}
|
|
}
|
|
|
|
func (s *Notifier) createCase() []reflect.SelectCase {
|
|
// put ctrl channel as 0 element of select
|
|
out := []reflect.SelectCase{
|
|
reflect.SelectCase{
|
|
Dir: reflect.SelectRecv,
|
|
Chan: reflect.ValueOf(s.ctrl),
|
|
},
|
|
}
|
|
s.m.Lock()
|
|
for _, pair := range s.chMap {
|
|
out = append(out, pair.selectCase)
|
|
}
|
|
s.m.Unlock()
|
|
return out
|
|
}
|