Add status update workers
This allows us to process more stop inotify exit events concurrently.

Signed-off-by: Mrunal Patel <mpatel@redhat.com>
parent 8d78e3cfac
commit a283ade632

2 changed files with 69 additions and 30 deletions
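Before the diff itself, a minimal standalone sketch of the worker-pool pattern this change introduces: a buffered channel serves as the job queue and a fixed number of goroutines drain it. Everything here (the job channel, the worker counts, the fake container IDs) is illustrative rather than CRI-O API, and the close/WaitGroup shutdown at the end exists only for the sketch; the real workers run for the life of the daemon.

package main

import (
	"fmt"
	"sync"
)

func main() {
	const numWorkers = 3 // the commit defaults to 10 (DefaultNumStatusWorkers)
	const queueSize = 8  // the commit defaults to 100 (DefaultContainerStatusQueueSize)

	// A buffered channel acts as the job queue: the producer (the inotify
	// exit monitor in the real code) can enqueue container IDs without
	// waiting for a worker to become free, up to queueSize pending jobs.
	jobs := make(chan string, queueSize)

	var wg sync.WaitGroup
	for w := 1; w <= numWorkers; w++ {
		wg.Add(1)
		go func(id int) {
			defer wg.Done()
			// range exits once the channel is closed and drained.
			for containerID := range jobs {
				fmt.Printf("worker %d: updating status of %s\n", id, containerID)
			}
		}(w)
	}

	// Producer side: enqueue work, then close so the workers drain and exit.
	for _, c := range []string{"abc123", "def456", "ghi789"} {
		jobs <- c
	}
	close(jobs)
	wg.Wait()
}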
@@ -55,6 +55,14 @@ const (
 	// DefaultLogSizeMax is the default value for the maximum log size
 	// allowed for a container. Negative values mean that no limit is imposed.
 	DefaultLogSizeMax = -1
+
+	// DefaultNumStatusWorkers is the default value for number of goroutines
+	// spawned for updating container status.
+	DefaultNumStatusWorkers = 10
+
+	// DefaultContainerStatusQueueSize is the default value for the queue size
+	// for container status update jobs.
+	DefaultContainerStatusQueueSize = 100
 )
 
 // This structure is necessary to fake the TOML tables when parsing,

@@ -165,6 +173,14 @@ type RuntimeConfig struct {
 	// ContainerExitsDir is the directory in which container exit files are
 	// written to by conmon.
 	ContainerExitsDir string `toml:"container_exits_dir"`
+
+	// NumStatusWorkers is the number of goroutines spawned to update container status
+	// asynchronously.
+	NumStatusWorkers uint `toml:"num_status_workers"`
+
+	// ContainerStatusQueueSize is the number of container status events that could
+	// be buffered in the job queue.
+	ContainerStatusQueueSize uint `toml:"container_status_queue_size"`
 }
 
 // ImageConfig represents the "crio.image" TOML config table.

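For illustration, here is how the two new keys would be decoded from a config file. The struct below is a stand-in for the real RuntimeConfig, the decoder (github.com/BurntSushi/toml) is an assumption, and in an actual crio.toml these keys would sit under the runtime table rather than at the top level.

package main

import (
	"fmt"

	"github.com/BurntSushi/toml"
)

// runtimeConfig mirrors only the two new fields; it is not CRI-O's real type.
type runtimeConfig struct {
	NumStatusWorkers         uint `toml:"num_status_workers"`
	ContainerStatusQueueSize uint `toml:"container_status_queue_size"`
}

func main() {
	data := `
num_status_workers = 4
container_status_queue_size = 50
`
	var cfg runtimeConfig
	if _, err := toml.Decode(data, &cfg); err != nil {
		panic(err)
	}
	fmt.Printf("workers=%d queue=%d\n", cfg.NumStatusWorkers, cfg.ContainerStatusQueueSize)
}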
@@ -284,14 +300,16 @@ func DefaultConfig() *Config {
 			ConmonEnv: []string{
 				"PATH=/usr/local/sbin:/usr/local/bin:/usr/sbin:/usr/bin:/sbin:/bin",
 			},
-			SELinux:           selinux.GetEnabled(),
-			SeccompProfile:    seccompProfilePath,
-			ApparmorProfile:   apparmorProfileName,
-			CgroupManager:     cgroupManager,
-			PidsLimit:         DefaultPidsLimit,
-			ContainerExitsDir: containerExitsDir,
-			HooksDirPath:      DefaultHooksDirPath,
-			LogSizeMax:        DefaultLogSizeMax,
+			SELinux:                  selinux.GetEnabled(),
+			SeccompProfile:           seccompProfilePath,
+			ApparmorProfile:          apparmorProfileName,
+			CgroupManager:            cgroupManager,
+			PidsLimit:                DefaultPidsLimit,
+			ContainerExitsDir:        containerExitsDir,
+			HooksDirPath:             DefaultHooksDirPath,
+			LogSizeMax:               DefaultLogSizeMax,
+			NumStatusWorkers:         DefaultNumStatusWorkers,
+			ContainerStatusQueueSize: DefaultContainerStatusQueueSize,
 		},
 		ImageConfig: ImageConfig{
 			DefaultTransport: defaultTransport,

@@ -194,6 +194,14 @@ func New(config *Config) (*Server, error) {
 		return nil, err
 	}
 
+	if config.ContainerStatusQueueSize == 0 {
+		return nil, fmt.Errorf("container_status_queue_size should be atleast 1")
+	}
+
+	if config.NumStatusWorkers == 0 {
+		return nil, fmt.Errorf("num_status_workers should be atleast 1")
+	}
+
 	netPlugin, err := ocicni.InitCNI(config.NetworkDir, config.PluginDir)
 	if err != nil {
 		return nil, err

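Both checks reject zero values: with no workers nothing would ever drain the queue, and a zero queue size would make the job channel unbuffered. A tiny standalone sketch (illustrative names only) of what a positive queue capacity buys the exit monitor:

package main

import "fmt"

func main() {
	// container_status_queue_size sets the capacity of the job channel.
	// With capacity 2, two exit events can be queued before any worker
	// picks them up; with capacity 0 the channel would be unbuffered and
	// every send would have to rendezvous with a ready worker.
	jobs := make(chan string, 2)
	jobs <- "abc123"
	jobs <- "def456"
	fmt.Printf("%d jobs queued, capacity %d\n", len(jobs), cap(jobs))
}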
@@ -373,6 +381,11 @@ func (s *Server) StartExitMonitor() {
 	}
 	defer watcher.Close()
 
+	containerStoppedJobs := make(chan string, s.config.ContainerStatusQueueSize)
+	for w := 1; w <= int(s.config.NumStatusWorkers); w++ {
+		go s.startStatusUpdateWorker(w, containerStoppedJobs)
+	}
+
 	done := make(chan struct{})
 	go func() {
 		for {

@@ -382,28 +395,7 @@ func (s *Server) StartExitMonitor() {
 				if event.Op&fsnotify.Create == fsnotify.Create {
 					containerID := filepath.Base(event.Name)
 					logrus.Debugf("container or sandbox exited: %v", containerID)
-					c := s.GetContainer(containerID)
-					if c != nil {
-						logrus.Debugf("container exited and found: %v", containerID)
-						err := s.Runtime().UpdateStatus(c)
-						if err != nil {
-							logrus.Warnf("Failed to update container status %s: %v", c, err)
-						} else {
-							s.ContainerStateToDisk(c)
-						}
-					} else {
-						sb := s.GetSandbox(containerID)
-						if sb != nil {
-							c := sb.InfraContainer()
-							logrus.Debugf("sandbox exited and found: %v", containerID)
-							err := s.Runtime().UpdateStatus(c)
-							if err != nil {
-								logrus.Warnf("Failed to update sandbox infra container status %s: %v", c, err)
-							} else {
-								s.ContainerStateToDisk(c)
-							}
-						}
-					}
+					containerStoppedJobs <- containerID
 				}
 			case err := <-watcher.Errors:
 				logrus.Debugf("watch error: %v", err)

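With the inline status handling removed, the event handler only enqueues the container ID. The plain send blocks if every worker is busy and the buffer is full, and the commit keeps that blocking behaviour. Purely as a contrast, a self-contained sketch of a non-blocking variant that would drop and log the job instead of stalling the event loop (hypothetical, not what the commit does):

package main

import "log"

func main() {
	jobs := make(chan string, 1)
	jobs <- "abc123" // fill the one-slot queue

	// Hypothetical alternative to the commit's plain blocking send:
	// give up immediately when the queue is full instead of stalling
	// the loop that produced the exit event.
	containerID := "def456"
	select {
	case jobs <- containerID:
		log.Printf("queued %s", containerID)
	default:
		log.Printf("queue full, skipping status update for %s", containerID)
	}
}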
@@ -421,3 +413,32 @@ func (s *Server) StartExitMonitor() {
 	}
 	<-done
 }
+
+// startStatusUpdateWorker accepts jobs in the form of container IDs whose
+// status is updated by the function.
+func (s *Server) startStatusUpdateWorker(workerID int, containerJobs <-chan string) {
+	for containerID := range containerJobs {
+		logrus.Debugf("StatusUpdateWorker %v processing %v", workerID, containerID)
+
+		// Find the container from ID. It could either be a regular container
+		// or a sandbox infra container.
+		c := s.GetContainer(containerID)
+		if c != nil {
+			logrus.Debugf("container exited and found: %v", containerID)
+		} else {
+			sb := s.GetSandbox(containerID)
+			if sb == nil {
+				return
+			}
+			c = sb.InfraContainer()
+			logrus.Debugf("sandbox exited and found: %v", containerID)
+		}
+
+		// Update the status of the container and save state to disk.
+		if err := s.Runtime().UpdateStatus(c); err != nil {
+			logrus.Warnf("Failed to update container status %s: %v", c, err)
+		} else {
+			s.ContainerStateToDisk(c)
+		}
+	}
+}