diff --git a/buffer.go b/buffer.go new file mode 100644 index 0000000..96fadaf --- /dev/null +++ b/buffer.go @@ -0,0 +1,4 @@ +package containerd + +// DefaultBufferSize is the default size for a channel's buffer +const DefaultBufferSize = 2048 diff --git a/supervisor.go b/supervisor.go index 1af293b..428356c 100644 --- a/supervisor.go +++ b/supervisor.go @@ -33,7 +33,7 @@ func NewSupervisor(id, stateDir string, tasks chan *StartTask) (*Supervisor, err processes: make(map[int]runtime.Container), runtime: r, tasks: tasks, - events: make(chan *Event, 2048), + events: make(chan *Event, DefaultBufferSize), machine: machine, subscribers: make(map[chan *Event]struct{}), } @@ -63,6 +63,7 @@ type Supervisor struct { runtime runtime.Runtime events chan *Event tasks chan *StartTask + subscriberLock sync.RWMutex subscribers map[chan *Event]struct{} machine Machine containerGroup sync.WaitGroup @@ -109,20 +110,32 @@ func (s *Supervisor) Close() error { } func (s *Supervisor) Events() chan *Event { - c := make(chan *Event, 2048) + s.subscriberLock.Lock() + defer s.subscriberLock.Unlock() + c := make(chan *Event, DefaultBufferSize) EventSubscriberCounter.Inc(1) s.subscribers[c] = struct{}{} return c } func (s *Supervisor) Unsubscribe(sub chan *Event) { + s.subscriberLock.Lock() + defer s.subscriberLock.Unlock() delete(s.subscribers, sub) + close(sub) EventSubscriberCounter.Dec(1) } func (s *Supervisor) NotifySubscribers(e *Event) { + s.subscriberLock.RLock() + defer s.subscriberLock.RUnlock() for sub := range s.subscribers { - sub <- e + // do a non-blocking send for the channel + select { + case sub <- e: + default: + logrus.WithField("event", e.Type).Warn("event not sent to subscriber") + } } }