Add comments to various functions

Don't export the notify subscribers method

Signed-off-by: Michael Crosby <crosbymichael@gmail.com>
This commit is contained in:
Michael Crosby 2015-12-10 14:11:00 -08:00
parent d9881ab912
commit e0da266b71
4 changed files with 28 additions and 19 deletions

View file

@ -43,11 +43,6 @@ var daemonFlags = []cli.Flag{
Value: "/run/containerd", Value: "/run/containerd",
Usage: "runtime state directory", Usage: "runtime state directory",
}, },
cli.IntFlag{
Name: "buffer-size",
Value: 2048,
Usage: "set the channel buffer size for events and signals",
},
cli.IntFlag{ cli.IntFlag{
Name: "c,concurrency", Name: "c,concurrency",
Value: 10, Value: 10,
@ -79,7 +74,6 @@ func main() {
context.String("id"), context.String("id"),
context.String("state-dir"), context.String("state-dir"),
context.Int("concurrency"), context.Int("concurrency"),
context.Int("buffer-size"),
); err != nil { ); err != nil {
logrus.Fatal(err) logrus.Fatal(err)
} }
@ -121,7 +115,7 @@ func processMetrics() {
}() }()
} }
func daemon(id, stateDir string, concurrency, bufferSize int) error { func daemon(id, stateDir string, concurrency int) error {
tasks := make(chan *containerd.StartTask, concurrency*100) tasks := make(chan *containerd.StartTask, concurrency*100)
supervisor, err := containerd.NewSupervisor(id, stateDir, tasks) supervisor, err := containerd.NewSupervisor(id, stateDir, tasks)
if err != nil { if err != nil {
@ -143,7 +137,7 @@ func daemon(id, stateDir string, concurrency, bufferSize int) error {
} }
} }
// start the signal handler in the background. // start the signal handler in the background.
go startSignalHandler(supervisor, bufferSize) go startSignalHandler(supervisor, containerd.DefaultBufferSize)
if err := supervisor.Start(); err != nil { if err := supervisor.Start(); err != nil {
return err return err
} }

View file

@ -14,7 +14,7 @@ func (h *DeleteEvent) Handle(e *Event) error {
if err := h.deleteContainer(container); err != nil { if err := h.deleteContainer(container); err != nil {
logrus.WithField("error", err).Error("containerd: deleting container") logrus.WithField("error", err).Error("containerd: deleting container")
} }
h.s.NotifySubscribers(&Event{ h.s.notifySubscribers(&Event{
Type: ExitEventType, Type: ExitEventType,
ID: e.ID, ID: e.ID,
Status: e.Status, Status: e.Status,

View file

@ -47,6 +47,6 @@ func (h *ExecExitEvent) Handle(e *Event) error {
logrus.WithField("error", err).Error("containerd: find container for pid") logrus.WithField("error", err).Error("containerd: find container for pid")
} }
delete(h.s.processes, e.Pid) delete(h.s.processes, e.Pid)
h.s.NotifySubscribers(e) h.s.notifySubscribers(e)
return nil return nil
} }

View file

@ -63,12 +63,17 @@ type Supervisor struct {
runtime runtime.Runtime runtime runtime.Runtime
events chan *Event events chan *Event
tasks chan *StartTask tasks chan *StartTask
// we need a lock around the subscribers map only because additions and deletions from
// the map are via the API so we cannot really control the concurrency
subscriberLock sync.RWMutex subscriberLock sync.RWMutex
subscribers map[chan *Event]struct{} subscribers map[chan *Event]struct{}
machine Machine machine Machine
containerGroup sync.WaitGroup containerGroup sync.WaitGroup
} }
// Stop closes all tasks and sends a SIGTERM to each container's pid1 then waits for they to
// terminate. After it has handled all the SIGCHILD events it will close the signals chan
// and exit. Stop is a non-blocking call and will return after the containers have been signaled
func (s *Supervisor) Stop(sig chan os.Signal) { func (s *Supervisor) Stop(sig chan os.Signal) {
// Close the tasks channel so that no new containers get started // Close the tasks channel so that no new containers get started
close(s.tasks) close(s.tasks)
@ -109,6 +114,8 @@ func (s *Supervisor) Close() error {
return nil return nil
} }
// Events returns an event channel that external consumers can use to receive updates
// on container events
func (s *Supervisor) Events() chan *Event { func (s *Supervisor) Events() chan *Event {
s.subscriberLock.Lock() s.subscriberLock.Lock()
defer s.subscriberLock.Unlock() defer s.subscriberLock.Unlock()
@ -118,6 +125,7 @@ func (s *Supervisor) Events() chan *Event {
return c return c
} }
// Unsubscribe removes the provided channel from receiving any more events
func (s *Supervisor) Unsubscribe(sub chan *Event) { func (s *Supervisor) Unsubscribe(sub chan *Event) {
s.subscriberLock.Lock() s.subscriberLock.Lock()
defer s.subscriberLock.Unlock() defer s.subscriberLock.Unlock()
@ -126,7 +134,9 @@ func (s *Supervisor) Unsubscribe(sub chan *Event) {
EventSubscriberCounter.Dec(1) EventSubscriberCounter.Dec(1)
} }
func (s *Supervisor) NotifySubscribers(e *Event) { // notifySubscribers will send the provided event to the external subscribers
// of the events channel
func (s *Supervisor) notifySubscribers(e *Event) {
s.subscriberLock.RLock() s.subscriberLock.RLock()
defer s.subscriberLock.RUnlock() defer s.subscriberLock.RUnlock()
for sub := range s.subscribers { for sub := range s.subscribers {
@ -142,7 +152,9 @@ func (s *Supervisor) NotifySubscribers(e *Event) {
// Start is a non-blocking call that runs the supervisor for monitoring contianer processes and // Start is a non-blocking call that runs the supervisor for monitoring contianer processes and
// executing new containers. // executing new containers.
// //
// This event loop is the only thing that is allowed to modify state of containers and processes. // This event loop is the only thing that is allowed to modify state of containers and processes
// therefore it is save to do operations in the handlers that modify state of the system or
// state of the Supervisor
func (s *Supervisor) Start() error { func (s *Supervisor) Start() error {
go func() { go func() {
// allocate an entire thread to this goroutine for the main event loop // allocate an entire thread to this goroutine for the main event loop
@ -178,6 +190,8 @@ func (s *Supervisor) Machine() Machine {
return s.machine return s.machine
} }
// getContainerForPid returns the container where the provided pid is the pid1 or main
// process in the container
func (s *Supervisor) getContainerForPid(pid int) (runtime.Container, error) { func (s *Supervisor) getContainerForPid(pid int) (runtime.Container, error) {
for _, container := range s.containers { for _, container := range s.containers {
cpid, err := container.Pid() cpid, err := container.Pid()
@ -196,6 +210,7 @@ func (s *Supervisor) getContainerForPid(pid int) (runtime.Container, error) {
return nil, errNoContainerForPid return nil, errNoContainerForPid
} }
// SendEvent sends the provided event the the supervisors main event loop
func (s *Supervisor) SendEvent(evt *Event) { func (s *Supervisor) SendEvent(evt *Event) {
s.events <- evt s.events <- evt
} }