From e0da266b7120e593c12524199a978bcaeb88de23 Mon Sep 17 00:00:00 2001 From: Michael Crosby Date: Thu, 10 Dec 2015 14:11:00 -0800 Subject: [PATCH] Add comments to various functions Don't export the notify subscribers method Signed-off-by: Michael Crosby --- containerd/main.go | 10 ++-------- delete.go | 2 +- exit.go | 2 +- supervisor.go | 33 ++++++++++++++++++++++++--------- 4 files changed, 28 insertions(+), 19 deletions(-) diff --git a/containerd/main.go b/containerd/main.go index 5bc4017..e910d24 100644 --- a/containerd/main.go +++ b/containerd/main.go @@ -43,11 +43,6 @@ var daemonFlags = []cli.Flag{ Value: "/run/containerd", Usage: "runtime state directory", }, - cli.IntFlag{ - Name: "buffer-size", - Value: 2048, - Usage: "set the channel buffer size for events and signals", - }, cli.IntFlag{ Name: "c,concurrency", Value: 10, @@ -79,7 +74,6 @@ func main() { context.String("id"), context.String("state-dir"), context.Int("concurrency"), - context.Int("buffer-size"), ); err != nil { logrus.Fatal(err) } @@ -121,7 +115,7 @@ func processMetrics() { }() } -func daemon(id, stateDir string, concurrency, bufferSize int) error { +func daemon(id, stateDir string, concurrency int) error { tasks := make(chan *containerd.StartTask, concurrency*100) supervisor, err := containerd.NewSupervisor(id, stateDir, tasks) if err != nil { @@ -143,7 +137,7 @@ func daemon(id, stateDir string, concurrency, bufferSize int) error { } } // start the signal handler in the background. - go startSignalHandler(supervisor, bufferSize) + go startSignalHandler(supervisor, containerd.DefaultBufferSize) if err := supervisor.Start(); err != nil { return err } diff --git a/delete.go b/delete.go index dd30d18..12d2595 100644 --- a/delete.go +++ b/delete.go @@ -14,7 +14,7 @@ func (h *DeleteEvent) Handle(e *Event) error { if err := h.deleteContainer(container); err != nil { logrus.WithField("error", err).Error("containerd: deleting container") } - h.s.NotifySubscribers(&Event{ + h.s.notifySubscribers(&Event{ Type: ExitEventType, ID: e.ID, Status: e.Status, diff --git a/exit.go b/exit.go index 1cc309c..6bb758f 100644 --- a/exit.go +++ b/exit.go @@ -47,6 +47,6 @@ func (h *ExecExitEvent) Handle(e *Event) error { logrus.WithField("error", err).Error("containerd: find container for pid") } delete(h.s.processes, e.Pid) - h.s.NotifySubscribers(e) + h.s.notifySubscribers(e) return nil } diff --git a/supervisor.go b/supervisor.go index 428356c..bbbbfa5 100644 --- a/supervisor.go +++ b/supervisor.go @@ -56,19 +56,24 @@ func NewSupervisor(id, stateDir string, tasks chan *StartTask) (*Supervisor, err type Supervisor struct { // stateDir is the directory on the system to store container runtime state information. - stateDir string - containers map[string]runtime.Container - processes map[int]runtime.Container - handlers map[EventType]Handler - runtime runtime.Runtime - events chan *Event - tasks chan *StartTask + stateDir string + containers map[string]runtime.Container + processes map[int]runtime.Container + handlers map[EventType]Handler + runtime runtime.Runtime + events chan *Event + tasks chan *StartTask + // we need a lock around the subscribers map only because additions and deletions from + // the map are via the API so we cannot really control the concurrency subscriberLock sync.RWMutex subscribers map[chan *Event]struct{} machine Machine containerGroup sync.WaitGroup } +// Stop closes all tasks and sends a SIGTERM to each container's pid1 then waits for they to +// terminate. After it has handled all the SIGCHILD events it will close the signals chan +// and exit. Stop is a non-blocking call and will return after the containers have been signaled func (s *Supervisor) Stop(sig chan os.Signal) { // Close the tasks channel so that no new containers get started close(s.tasks) @@ -109,6 +114,8 @@ func (s *Supervisor) Close() error { return nil } +// Events returns an event channel that external consumers can use to receive updates +// on container events func (s *Supervisor) Events() chan *Event { s.subscriberLock.Lock() defer s.subscriberLock.Unlock() @@ -118,6 +125,7 @@ func (s *Supervisor) Events() chan *Event { return c } +// Unsubscribe removes the provided channel from receiving any more events func (s *Supervisor) Unsubscribe(sub chan *Event) { s.subscriberLock.Lock() defer s.subscriberLock.Unlock() @@ -126,7 +134,9 @@ func (s *Supervisor) Unsubscribe(sub chan *Event) { EventSubscriberCounter.Dec(1) } -func (s *Supervisor) NotifySubscribers(e *Event) { +// notifySubscribers will send the provided event to the external subscribers +// of the events channel +func (s *Supervisor) notifySubscribers(e *Event) { s.subscriberLock.RLock() defer s.subscriberLock.RUnlock() for sub := range s.subscribers { @@ -142,7 +152,9 @@ func (s *Supervisor) NotifySubscribers(e *Event) { // Start is a non-blocking call that runs the supervisor for monitoring contianer processes and // executing new containers. // -// This event loop is the only thing that is allowed to modify state of containers and processes. +// This event loop is the only thing that is allowed to modify state of containers and processes +// therefore it is save to do operations in the handlers that modify state of the system or +// state of the Supervisor func (s *Supervisor) Start() error { go func() { // allocate an entire thread to this goroutine for the main event loop @@ -178,6 +190,8 @@ func (s *Supervisor) Machine() Machine { return s.machine } +// getContainerForPid returns the container where the provided pid is the pid1 or main +// process in the container func (s *Supervisor) getContainerForPid(pid int) (runtime.Container, error) { for _, container := range s.containers { cpid, err := container.Pid() @@ -196,6 +210,7 @@ func (s *Supervisor) getContainerForPid(pid int) (runtime.Container, error) { return nil, errNoContainerForPid } +// SendEvent sends the provided event the the supervisors main event loop func (s *Supervisor) SendEvent(evt *Event) { s.events <- evt }