Reject duplicate keys on the notifier.

Signed-off-by: Burcu Dogan <jbd@google.com>

mend
This commit is contained in:
Burcu Dogan 2016-02-17 12:07:54 -08:00
parent 8a8e29bb7b
commit e3a6c2b9ea
2 changed files with 49 additions and 16 deletions

View file

@ -1,6 +1,7 @@
package chanotify package chanotify
import ( import (
"errors"
"sync" "sync"
) )
@ -33,23 +34,33 @@ func (n *Notifier) Chan() <-chan interface{} {
return n.c return n.c
} }
// Add adds new notification channel to Notifier.
func (n *Notifier) Add(id interface{}, ch <-chan struct{}) error {
n.m.Lock()
defer n.m.Unlock()
if n.closed {
return errors.New("notifier closed; cannot add the channel on the notifier")
}
if _, ok := n.doneCh[id]; ok {
return errors.New("cannot register duplicate key")
}
done := make(chan struct{})
n.doneCh[id] = done
n.startWorker(ch, id, done)
return nil
}
func (n *Notifier) killWorker(id interface{}, done chan struct{}) { func (n *Notifier) killWorker(id interface{}, done chan struct{}) {
n.m.Lock() n.m.Lock()
delete(n.doneCh, id) delete(n.doneCh, id)
n.m.Unlock() n.m.Unlock()
} }
// Add adds new notification channel to Notifier. func (n *Notifier) startWorker(ch <-chan struct{}, id interface{}, done chan struct{}) {
func (n *Notifier) Add(id interface{}, ch <-chan struct{}) { go func() {
done := make(chan struct{})
n.m.Lock()
if n.closed {
panic("notifier closed; cannot add the channel")
}
n.doneCh[id] = done
n.m.Unlock()
go func(ch <-chan struct{}, id interface{}, done chan struct{}) {
for { for {
select { select {
case _, ok := <-ch: case _, ok := <-ch:
@ -66,7 +77,7 @@ func (n *Notifier) Add(id interface{}, ch <-chan struct{}) {
return return
} }
} }
}(ch, id, done) }()
} }
// Close closes the notifier and releases its underlying resources. // Close closes the notifier and releases its underlying resources.

View file

@ -13,8 +13,12 @@ func TestNotifier(t *testing.T) {
id1 := "1" id1 := "1"
id2 := "2" id2 := "2"
s.Add(id1, ch1) if err := s.Add(id1, ch1); err != nil {
s.Add(id2, ch2) t.Fatal(err)
}
if err := s.Add(id2, ch2); err != nil {
t.Fatal(err)
}
s.m.Lock() s.m.Lock()
if len(s.doneCh) != 2 { if len(s.doneCh) != 2 {
t.Fatalf("want 2 channels, got %d", len(s.doneCh)) t.Fatalf("want 2 channels, got %d", len(s.doneCh))
@ -43,7 +47,9 @@ func TestConcurrentNotifier(t *testing.T) {
var chs []chan struct{} var chs []chan struct{}
for i := 0; i < 8; i++ { for i := 0; i < 8; i++ {
ch := make(chan struct{}, 2) ch := make(chan struct{}, 2)
s.Add(i, ch) if err := s.Add(i, ch); err != nil {
t.Fatal(err)
}
chs = append(chs, ch) chs = append(chs, ch)
} }
testCounter := make(map[interface{}]int) testCounter := make(map[interface{}]int)
@ -86,10 +92,26 @@ func TestAddToBlocked(t *testing.T) {
go func() { go func() {
// give some time to start first select // give some time to start first select
time.Sleep(1 * time.Second) time.Sleep(1 * time.Second)
s.Add(id, ch) if err := s.Add(id, ch); err != nil {
t.Fatal(err)
}
ch <- struct{}{} ch <- struct{}{}
}() }()
if got, want := <-s.Chan(), id; got != want { if got, want := <-s.Chan(), id; got != want {
t.Fatalf("got %v; want %v", got, want) t.Fatalf("got %v; want %v", got, want)
} }
} }
func TestAddDuplicate(t *testing.T) {
s := New()
ch1 := make(chan struct{}, 1)
ch2 := make(chan struct{}, 1)
if err := s.Add(1, ch1); err != nil {
t.Fatalf("cannot add; err = %v", err)
}
if err := s.Add(1, ch2); err == nil {
t.Fatalf("duplicate keys are not allowed; but Add succeeded")
}
}