Add locks around subscribers channel
Signed-off-by: Michael Crosby <crosbymichael@gmail.com>
This commit is contained in:
parent
71ef776082
commit
d9881ab912
2 changed files with 20 additions and 3 deletions
4
buffer.go
Normal file
4
buffer.go
Normal file
|
@ -0,0 +1,4 @@
|
||||||
|
package containerd
|
||||||
|
|
||||||
|
// DefaultBufferSize is the default size for a channel's buffer
|
||||||
|
const DefaultBufferSize = 2048
|
|
@ -33,7 +33,7 @@ func NewSupervisor(id, stateDir string, tasks chan *StartTask) (*Supervisor, err
|
||||||
processes: make(map[int]runtime.Container),
|
processes: make(map[int]runtime.Container),
|
||||||
runtime: r,
|
runtime: r,
|
||||||
tasks: tasks,
|
tasks: tasks,
|
||||||
events: make(chan *Event, 2048),
|
events: make(chan *Event, DefaultBufferSize),
|
||||||
machine: machine,
|
machine: machine,
|
||||||
subscribers: make(map[chan *Event]struct{}),
|
subscribers: make(map[chan *Event]struct{}),
|
||||||
}
|
}
|
||||||
|
@ -63,6 +63,7 @@ type Supervisor struct {
|
||||||
runtime runtime.Runtime
|
runtime runtime.Runtime
|
||||||
events chan *Event
|
events chan *Event
|
||||||
tasks chan *StartTask
|
tasks chan *StartTask
|
||||||
|
subscriberLock sync.RWMutex
|
||||||
subscribers map[chan *Event]struct{}
|
subscribers map[chan *Event]struct{}
|
||||||
machine Machine
|
machine Machine
|
||||||
containerGroup sync.WaitGroup
|
containerGroup sync.WaitGroup
|
||||||
|
@ -109,20 +110,32 @@ func (s *Supervisor) Close() error {
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *Supervisor) Events() chan *Event {
|
func (s *Supervisor) Events() chan *Event {
|
||||||
c := make(chan *Event, 2048)
|
s.subscriberLock.Lock()
|
||||||
|
defer s.subscriberLock.Unlock()
|
||||||
|
c := make(chan *Event, DefaultBufferSize)
|
||||||
EventSubscriberCounter.Inc(1)
|
EventSubscriberCounter.Inc(1)
|
||||||
s.subscribers[c] = struct{}{}
|
s.subscribers[c] = struct{}{}
|
||||||
return c
|
return c
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *Supervisor) Unsubscribe(sub chan *Event) {
|
func (s *Supervisor) Unsubscribe(sub chan *Event) {
|
||||||
|
s.subscriberLock.Lock()
|
||||||
|
defer s.subscriberLock.Unlock()
|
||||||
delete(s.subscribers, sub)
|
delete(s.subscribers, sub)
|
||||||
|
close(sub)
|
||||||
EventSubscriberCounter.Dec(1)
|
EventSubscriberCounter.Dec(1)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *Supervisor) NotifySubscribers(e *Event) {
|
func (s *Supervisor) NotifySubscribers(e *Event) {
|
||||||
|
s.subscriberLock.RLock()
|
||||||
|
defer s.subscriberLock.RUnlock()
|
||||||
for sub := range s.subscribers {
|
for sub := range s.subscribers {
|
||||||
sub <- e
|
// do a non-blocking send for the channel
|
||||||
|
select {
|
||||||
|
case sub <- e:
|
||||||
|
default:
|
||||||
|
logrus.WithField("event", e.Type).Warn("event not sent to subscriber")
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
Loading…
Add table
Reference in a new issue