Merge pull request #56 from LK4D4/fix_chan
Fix adding to chanotify on blocking select
This commit is contained in:
commit
f51744f8c0
2 changed files with 25 additions and 6 deletions
|
@ -19,7 +19,7 @@ type dataPair struct {
|
||||||
type Notifier struct {
|
type Notifier struct {
|
||||||
c chan string
|
c chan string
|
||||||
chMap map[<-chan struct{}]*dataPair
|
chMap map[<-chan struct{}]*dataPair
|
||||||
exit chan struct{}
|
ctrl chan struct{}
|
||||||
m sync.Mutex
|
m sync.Mutex
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -28,7 +28,7 @@ func New() *Notifier {
|
||||||
s := &Notifier{
|
s := &Notifier{
|
||||||
c: make(chan string),
|
c: make(chan string),
|
||||||
chMap: make(map[<-chan struct{}]*dataPair),
|
chMap: make(map[<-chan struct{}]*dataPair),
|
||||||
exit: make(chan struct{}),
|
ctrl: make(chan struct{}),
|
||||||
}
|
}
|
||||||
go s.start()
|
go s.start()
|
||||||
return s
|
return s
|
||||||
|
@ -51,12 +51,13 @@ func (s *Notifier) Add(ch <-chan struct{}, id string) {
|
||||||
},
|
},
|
||||||
}
|
}
|
||||||
s.m.Unlock()
|
s.m.Unlock()
|
||||||
|
s.ctrl <- struct{}{}
|
||||||
}
|
}
|
||||||
|
|
||||||
// Close stops Notifier to listen for any notifications and closes its
|
// Close stops Notifier to listen for any notifications and closes its
|
||||||
// "client-side" channel.
|
// "client-side" channel.
|
||||||
func (s *Notifier) Close() {
|
func (s *Notifier) Close() {
|
||||||
close(s.exit)
|
close(s.ctrl)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *Notifier) start() {
|
func (s *Notifier) start() {
|
||||||
|
@ -64,7 +65,10 @@ func (s *Notifier) start() {
|
||||||
c := s.createCase()
|
c := s.createCase()
|
||||||
i, _, ok := reflect.Select(c)
|
i, _, ok := reflect.Select(c)
|
||||||
if i == 0 {
|
if i == 0 {
|
||||||
// exit was closed, we can safely close output
|
if ok {
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
// ctrl was closed, we can safely close output
|
||||||
close(s.c)
|
close(s.c)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
@ -81,11 +85,11 @@ func (s *Notifier) start() {
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *Notifier) createCase() []reflect.SelectCase {
|
func (s *Notifier) createCase() []reflect.SelectCase {
|
||||||
// put exit channel as 0 element of select
|
// put ctrl channel as 0 element of select
|
||||||
out := []reflect.SelectCase{
|
out := []reflect.SelectCase{
|
||||||
reflect.SelectCase{
|
reflect.SelectCase{
|
||||||
Dir: reflect.SelectRecv,
|
Dir: reflect.SelectRecv,
|
||||||
Chan: reflect.ValueOf(s.exit),
|
Chan: reflect.ValueOf(s.ctrl),
|
||||||
},
|
},
|
||||||
}
|
}
|
||||||
s.m.Lock()
|
s.m.Lock()
|
||||||
|
|
|
@ -78,3 +78,18 @@ func TestConcurrentNotifier(t *testing.T) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func TestAddToBlocked(t *testing.T) {
|
||||||
|
s := New()
|
||||||
|
ch := make(chan struct{}, 1)
|
||||||
|
go func() {
|
||||||
|
// give some time to start first select
|
||||||
|
time.Sleep(1 * time.Second)
|
||||||
|
s.Add(ch, "1")
|
||||||
|
ch <- struct{}{}
|
||||||
|
}()
|
||||||
|
val := <-s.Chan()
|
||||||
|
if val != "1" {
|
||||||
|
t.Fatalf("Expected 1, got %s", val)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
Loading…
Reference in a new issue