76cf593212
Signed-off-by: Michael Crosby <crosbymichael@gmail.com>
80 lines
1.6 KiB
Go
80 lines
1.6 KiB
Go
package containerd
|
|
|
|
import (
|
|
"reflect"
|
|
"sync"
|
|
)
|
|
|
|
func newNotifier(s *Supervisor) *notifier {
|
|
n := ¬ifier{
|
|
s: s,
|
|
channels: make(map[<-chan struct{}]string),
|
|
controller: make(chan struct{}),
|
|
}
|
|
go n.start()
|
|
return n
|
|
}
|
|
|
|
type notifier struct {
|
|
m sync.Mutex
|
|
channels map[<-chan struct{}]string
|
|
controller chan struct{}
|
|
s *Supervisor
|
|
}
|
|
|
|
func (n *notifier) start() {
|
|
for {
|
|
c := n.createCase()
|
|
i, _, ok := reflect.Select(c)
|
|
if i == 0 {
|
|
continue
|
|
}
|
|
if ok {
|
|
ch := c[i].Chan.Interface().(<-chan struct{})
|
|
id := n.channels[ch]
|
|
e := NewEvent(OOMEventType)
|
|
e.ID = id
|
|
n.s.SendEvent(e)
|
|
continue
|
|
}
|
|
// the channel was closed and we should remove it
|
|
ch := c[i].Chan.Interface().(<-chan struct{})
|
|
n.removeChan(ch)
|
|
}
|
|
}
|
|
|
|
func (n *notifier) Add(ch <-chan struct{}, id string) {
|
|
n.m.Lock()
|
|
n.channels[ch] = id
|
|
n.m.Unlock()
|
|
// signal the main loop to break and add the new
|
|
// channels
|
|
n.controller <- struct{}{}
|
|
}
|
|
|
|
func (n *notifier) createCase() []reflect.SelectCase {
|
|
var out []reflect.SelectCase
|
|
// add controller chan so that we can signal when we need to make
|
|
// changes in the select. The controller chan will always be at
|
|
// index 0 in the slice
|
|
out = append(out, reflect.SelectCase{
|
|
Dir: reflect.SelectRecv,
|
|
Chan: reflect.ValueOf(n.controller),
|
|
})
|
|
n.m.Lock()
|
|
for ch := range n.channels {
|
|
v := reflect.ValueOf(ch)
|
|
out = append(out, reflect.SelectCase{
|
|
Dir: reflect.SelectRecv,
|
|
Chan: v,
|
|
})
|
|
}
|
|
n.m.Unlock()
|
|
return out
|
|
}
|
|
|
|
func (n *notifier) removeChan(ch <-chan struct{}) {
|
|
n.m.Lock()
|
|
delete(n.channels, ch)
|
|
n.m.Unlock()
|
|
}
|