diff --git a/docs/crio.conf.5.md b/docs/crio.conf.5.md index 32cac7a4..8e65fe28 100644 --- a/docs/crio.conf.5.md +++ b/docs/crio.conf.5.md @@ -78,12 +78,18 @@ Example: **conmon_env**=[] Environment variable list for conmon process (default: ["PATH=/usr/local/sbin:/usr/local/bin:/usr/sbin:/usr/bin:/sbin:/bin",]) +**container_status_queue_size**="" + Number of container status events that could be buffered in the job queue. + **log_size_max**="" Maximum sized allowed for the container log file (default: -1) Negative numbers indicate that no size limit is imposed. If it is positive, it must be >= 8192 (to match/exceed conmon read buffer). The file is truncated and re-opened so the limit is never exceeded. +**num_status_workers**="" + Number of goroutines spawned to update container status asynchronously. + **pids_limit**="" Maximum number of processes allowed in a container (default: 1024) diff --git a/libkpod/config.go b/libkpod/config.go index 687b4b38..3be1e7b8 100644 --- a/libkpod/config.go +++ b/libkpod/config.go @@ -55,6 +55,14 @@ const ( // DefaultLogSizeMax is the default value for the maximum log size // allowed for a container. Negative values mean that no limit is imposed. DefaultLogSizeMax = -1 + + // DefaultNumStatusWorkers is the default value for number of goroutines + // spawned for updating container status. + DefaultNumStatusWorkers = 10 + + // DefaultContainerStatusQueueSize is the default value for the queue size + // for container status update jobs. + DefaultContainerStatusQueueSize = 100 ) // This structure is necessary to fake the TOML tables when parsing, @@ -165,6 +173,14 @@ type RuntimeConfig struct { // ContainerExitsDir is the directory in which container exit files are // written to by conmon. ContainerExitsDir string `toml:"container_exits_dir"` + + // NumStatusWorkers is the number of goroutines spawned to update container status + // asynchronously. + NumStatusWorkers uint `toml:"num_status_workers"` + + // ContainerStatusQueueSize is the number of container status events that could + // be buffered in the job queue. + ContainerStatusQueueSize uint `toml:"container_status_queue_size"` } // ImageConfig represents the "crio.image" TOML config table. @@ -284,14 +300,16 @@ func DefaultConfig() *Config { ConmonEnv: []string{ "PATH=/usr/local/sbin:/usr/local/bin:/usr/sbin:/usr/bin:/sbin:/bin", }, - SELinux: selinux.GetEnabled(), - SeccompProfile: seccompProfilePath, - ApparmorProfile: apparmorProfileName, - CgroupManager: cgroupManager, - PidsLimit: DefaultPidsLimit, - ContainerExitsDir: containerExitsDir, - HooksDirPath: DefaultHooksDirPath, - LogSizeMax: DefaultLogSizeMax, + SELinux: selinux.GetEnabled(), + SeccompProfile: seccompProfilePath, + ApparmorProfile: apparmorProfileName, + CgroupManager: cgroupManager, + PidsLimit: DefaultPidsLimit, + ContainerExitsDir: containerExitsDir, + HooksDirPath: DefaultHooksDirPath, + LogSizeMax: DefaultLogSizeMax, + NumStatusWorkers: DefaultNumStatusWorkers, + ContainerStatusQueueSize: DefaultContainerStatusQueueSize, }, ImageConfig: ImageConfig{ DefaultTransport: defaultTransport, diff --git a/server/server.go b/server/server.go index 709209b3..cdba69d5 100644 --- a/server/server.go +++ b/server/server.go @@ -195,6 +195,14 @@ func New(config *Config) (*Server, error) { return nil, err } + if config.ContainerStatusQueueSize == 0 { + return nil, fmt.Errorf("container_status_queue_size should be atleast 1") + } + + if config.NumStatusWorkers == 0 { + return nil, fmt.Errorf("num_status_workers should be atleast 1") + } + netPlugin, err := ocicni.InitCNI(config.NetworkDir, config.PluginDir) if err != nil { return nil, err @@ -374,6 +382,11 @@ func (s *Server) StartExitMonitor() { } defer watcher.Close() + containerStoppedJobs := make(chan string, s.config.ContainerStatusQueueSize) + for w := 1; w <= int(s.config.NumStatusWorkers); w++ { + go s.startStatusUpdateWorker(w, containerStoppedJobs) + } + done := make(chan struct{}) go func() { for { @@ -383,28 +396,7 @@ func (s *Server) StartExitMonitor() { if event.Op&fsnotify.Create == fsnotify.Create { containerID := filepath.Base(event.Name) logrus.Debugf("container or sandbox exited: %v", containerID) - c := s.GetContainer(containerID) - if c != nil { - logrus.Debugf("container exited and found: %v", containerID) - err := s.Runtime().UpdateStatus(c) - if err != nil { - logrus.Warnf("Failed to update container status %s: %v", c, err) - } else { - s.ContainerStateToDisk(c) - } - } else { - sb := s.GetSandbox(containerID) - if sb != nil { - c := sb.InfraContainer() - logrus.Debugf("sandbox exited and found: %v", containerID) - err := s.Runtime().UpdateStatus(c) - if err != nil { - logrus.Warnf("Failed to update sandbox infra container status %s: %v", c, err) - } else { - s.ContainerStateToDisk(c) - } - } - } + containerStoppedJobs <- containerID } case err := <-watcher.Errors: logrus.Debugf("watch error: %v", err) @@ -423,3 +415,32 @@ func (s *Server) StartExitMonitor() { } <-done } + +// startStatusUpdateWorker accepts jobs in the form of container IDs whose +// status is updated by the function. +func (s *Server) startStatusUpdateWorker(workerID int, containerJobs <-chan string) { + for containerID := range containerJobs { + logrus.Debugf("StatusUpdateWorker %v processing %v", workerID, containerID) + + // Find the container from ID. It could either be a regular container + // or a sandbox infra container. + c := s.GetContainer(containerID) + if c != nil { + logrus.Debugf("container exited and found: %v", containerID) + } else { + sb := s.GetSandbox(containerID) + if sb == nil { + return + } + c = sb.InfraContainer() + logrus.Debugf("sandbox exited and found: %v", containerID) + } + + // Update the status of the container and save state to disk. + if err := s.Runtime().UpdateStatus(c); err != nil { + logrus.Warnf("Failed to update container status %s: %v", c, err) + } else { + s.ContainerStateToDisk(c) + } + } +}