chanotify should use interface{} keys
Fixes #79. Signed-off-by: Burcu Dogan <jbd@golang.org>
This commit is contained in:
parent
85bc51df33
commit
bc4f1aae01
4 changed files with 17 additions and 17 deletions
|
@ -11,41 +11,41 @@ import (
|
||||||
// notification from Notifier, close doesn't spawn anything and removes channel
|
// notification from Notifier, close doesn't spawn anything and removes channel
|
||||||
// from Notifier.
|
// from Notifier.
|
||||||
type Notifier struct {
|
type Notifier struct {
|
||||||
c chan string
|
c chan interface{}
|
||||||
|
|
||||||
m sync.Mutex // guards doneCh
|
m sync.Mutex // guards doneCh
|
||||||
doneCh map[string]chan struct{}
|
doneCh map[interface{}]chan struct{}
|
||||||
}
|
}
|
||||||
|
|
||||||
// New returns a new *Notifier.
|
// New returns a new *Notifier.
|
||||||
func New() *Notifier {
|
func New() *Notifier {
|
||||||
s := &Notifier{
|
s := &Notifier{
|
||||||
c: make(chan string),
|
c: make(chan interface{}),
|
||||||
doneCh: make(map[string]chan struct{}),
|
doneCh: make(map[interface{}]chan struct{}),
|
||||||
}
|
}
|
||||||
return s
|
return s
|
||||||
}
|
}
|
||||||
|
|
||||||
// Chan returns channel on which client listen for notifications.
|
// Chan returns channel on which client listen for notifications.
|
||||||
// IDs of notifications is sent to the returned channel.
|
// IDs of notifications is sent to the returned channel.
|
||||||
func (n *Notifier) Chan() <-chan string {
|
func (n *Notifier) Chan() <-chan interface{} {
|
||||||
return n.c
|
return n.c
|
||||||
}
|
}
|
||||||
|
|
||||||
func (n *Notifier) killWorker(id string, done chan struct{}) {
|
func (n *Notifier) killWorker(id interface{}, done chan struct{}) {
|
||||||
n.m.Lock()
|
n.m.Lock()
|
||||||
delete(n.doneCh, id)
|
delete(n.doneCh, id)
|
||||||
n.m.Unlock()
|
n.m.Unlock()
|
||||||
}
|
}
|
||||||
|
|
||||||
// Add adds new notification channel to Notifier.
|
// Add adds new notification channel to Notifier.
|
||||||
func (n *Notifier) Add(ch <-chan struct{}, id string) {
|
func (n *Notifier) Add(id interface{}, ch <-chan struct{}) {
|
||||||
done := make(chan struct{})
|
done := make(chan struct{})
|
||||||
n.m.Lock()
|
n.m.Lock()
|
||||||
n.doneCh[id] = done
|
n.doneCh[id] = done
|
||||||
n.m.Unlock()
|
n.m.Unlock()
|
||||||
|
|
||||||
go func(ch <-chan struct{}, id string, done chan struct{}) {
|
go func(ch <-chan struct{}, id interface{}, done chan struct{}) {
|
||||||
for {
|
for {
|
||||||
select {
|
select {
|
||||||
case _, ok := <-ch:
|
case _, ok := <-ch:
|
||||||
|
|
|
@ -11,8 +11,8 @@ func TestNotifier(t *testing.T) {
|
||||||
s := New()
|
s := New()
|
||||||
ch1 := make(chan struct{}, 1)
|
ch1 := make(chan struct{}, 1)
|
||||||
ch2 := make(chan struct{}, 1)
|
ch2 := make(chan struct{}, 1)
|
||||||
s.Add(ch1, "1")
|
s.Add("1", ch1)
|
||||||
s.Add(ch2, "2")
|
s.Add("2", ch2)
|
||||||
s.m.Lock()
|
s.m.Lock()
|
||||||
if len(s.doneCh) != 2 {
|
if len(s.doneCh) != 2 {
|
||||||
t.Fatalf("expected 2 channels, got %d", len(s.doneCh))
|
t.Fatalf("expected 2 channels, got %d", len(s.doneCh))
|
||||||
|
@ -20,12 +20,12 @@ func TestNotifier(t *testing.T) {
|
||||||
s.m.Unlock()
|
s.m.Unlock()
|
||||||
ch1 <- struct{}{}
|
ch1 <- struct{}{}
|
||||||
id1 := <-s.Chan()
|
id1 := <-s.Chan()
|
||||||
if id1 != "1" {
|
if id1.(string) != "1" {
|
||||||
t.Fatalf("1 should be spawned, got %s", id1)
|
t.Fatalf("1 should be spawned, got %s", id1)
|
||||||
}
|
}
|
||||||
ch2 <- struct{}{}
|
ch2 <- struct{}{}
|
||||||
id2 := <-s.Chan()
|
id2 := <-s.Chan()
|
||||||
if id2 != "2" {
|
if id2.(string) != "2" {
|
||||||
t.Fatalf("2 should be spawned, got %s", id2)
|
t.Fatalf("2 should be spawned, got %s", id2)
|
||||||
}
|
}
|
||||||
close(ch1)
|
close(ch1)
|
||||||
|
@ -43,10 +43,10 @@ func TestConcurrentNotifier(t *testing.T) {
|
||||||
var chs []chan struct{}
|
var chs []chan struct{}
|
||||||
for i := 0; i < 8; i++ {
|
for i := 0; i < 8; i++ {
|
||||||
ch := make(chan struct{}, 2)
|
ch := make(chan struct{}, 2)
|
||||||
s.Add(ch, strconv.Itoa(i))
|
s.Add(strconv.Itoa(i), ch)
|
||||||
chs = append(chs, ch)
|
chs = append(chs, ch)
|
||||||
}
|
}
|
||||||
testCounter := make(map[string]int)
|
testCounter := make(map[interface{}]int)
|
||||||
done := make(chan struct{})
|
done := make(chan struct{})
|
||||||
go func() {
|
go func() {
|
||||||
for id := range s.Chan() {
|
for id := range s.Chan() {
|
||||||
|
@ -85,7 +85,7 @@ func TestAddToBlocked(t *testing.T) {
|
||||||
go func() {
|
go func() {
|
||||||
// give some time to start first select
|
// give some time to start first select
|
||||||
time.Sleep(1 * time.Second)
|
time.Sleep(1 * time.Second)
|
||||||
s.Add(ch, "1")
|
s.Add("1", ch)
|
||||||
ch <- struct{}{}
|
ch <- struct{}{}
|
||||||
}()
|
}()
|
||||||
val := <-s.Chan()
|
val := <-s.Chan()
|
||||||
|
|
|
@ -50,7 +50,7 @@ func New(id, stateDir string, tasks chan *StartTask, oom bool) (*Supervisor, err
|
||||||
go func() {
|
go func() {
|
||||||
for id := range s.notifier.Chan() {
|
for id := range s.notifier.Chan() {
|
||||||
e := NewEvent(OOMEventType)
|
e := NewEvent(OOMEventType)
|
||||||
e.ID = id
|
e.ID = id.(string)
|
||||||
s.SendEvent(e)
|
s.SendEvent(e)
|
||||||
}
|
}
|
||||||
}()
|
}()
|
||||||
|
|
|
@ -74,7 +74,7 @@ func (w *worker) Start() {
|
||||||
if err != nil {
|
if err != nil {
|
||||||
logrus.WithField("error", err).Error("containerd: notify OOM events")
|
logrus.WithField("error", err).Error("containerd: notify OOM events")
|
||||||
} else {
|
} else {
|
||||||
w.s.notifier.Add(n, t.Container.ID())
|
w.s.notifier.Add(t.Container.ID(), n)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
ContainerStartTimer.UpdateSince(started)
|
ContainerStartTimer.UpdateSince(started)
|
||||||
|
|
Loading…
Reference in a new issue