This commit is contained in:
Mrunal Patel 2017-11-14 01:57:41 +00:00 committed by GitHub
commit 958164438f
3 changed files with 75 additions and 30 deletions

View file

@ -78,12 +78,18 @@ Example:
**conmon_env**=[]
Environment variable list for conmon process (default: ["PATH=/usr/local/sbin:/usr/local/bin:/usr/sbin:/usr/bin:/sbin:/bin",])
**container_status_queue_size**=""
Number of container status events that could be buffered in the job queue.
**log_size_max**=""
Maximum sized allowed for the container log file (default: -1)
Negative numbers indicate that no size limit is imposed.
If it is positive, it must be >= 8192 (to match/exceed conmon read buffer).
The file is truncated and re-opened so the limit is never exceeded.
**num_status_workers**=""
Number of goroutines spawned to update container status asynchronously.
**pids_limit**=""
Maximum number of processes allowed in a container (default: 1024)

View file

@ -55,6 +55,14 @@ const (
// DefaultLogSizeMax is the default value for the maximum log size
// allowed for a container. Negative values mean that no limit is imposed.
DefaultLogSizeMax = -1
// DefaultNumStatusWorkers is the default value for number of goroutines
// spawned for updating container status.
DefaultNumStatusWorkers = 10
// DefaultContainerStatusQueueSize is the default value for the queue size
// for container status update jobs.
DefaultContainerStatusQueueSize = 100
)
// This structure is necessary to fake the TOML tables when parsing,
@ -165,6 +173,14 @@ type RuntimeConfig struct {
// ContainerExitsDir is the directory in which container exit files are
// written to by conmon.
ContainerExitsDir string `toml:"container_exits_dir"`
// NumStatusWorkers is the number of goroutines spawned to update container status
// asynchronously.
NumStatusWorkers uint `toml:"num_status_workers"`
// ContainerStatusQueueSize is the number of container status events that could
// be buffered in the job queue.
ContainerStatusQueueSize uint `toml:"container_status_queue_size"`
}
// ImageConfig represents the "crio.image" TOML config table.
@ -284,14 +300,16 @@ func DefaultConfig() *Config {
ConmonEnv: []string{
"PATH=/usr/local/sbin:/usr/local/bin:/usr/sbin:/usr/bin:/sbin:/bin",
},
SELinux: selinux.GetEnabled(),
SeccompProfile: seccompProfilePath,
ApparmorProfile: apparmorProfileName,
CgroupManager: cgroupManager,
PidsLimit: DefaultPidsLimit,
ContainerExitsDir: containerExitsDir,
HooksDirPath: DefaultHooksDirPath,
LogSizeMax: DefaultLogSizeMax,
SELinux: selinux.GetEnabled(),
SeccompProfile: seccompProfilePath,
ApparmorProfile: apparmorProfileName,
CgroupManager: cgroupManager,
PidsLimit: DefaultPidsLimit,
ContainerExitsDir: containerExitsDir,
HooksDirPath: DefaultHooksDirPath,
LogSizeMax: DefaultLogSizeMax,
NumStatusWorkers: DefaultNumStatusWorkers,
ContainerStatusQueueSize: DefaultContainerStatusQueueSize,
},
ImageConfig: ImageConfig{
DefaultTransport: defaultTransport,

View file

@ -195,6 +195,14 @@ func New(config *Config) (*Server, error) {
return nil, err
}
if config.ContainerStatusQueueSize == 0 {
return nil, fmt.Errorf("container_status_queue_size should be atleast 1")
}
if config.NumStatusWorkers == 0 {
return nil, fmt.Errorf("num_status_workers should be atleast 1")
}
netPlugin, err := ocicni.InitCNI(config.NetworkDir, config.PluginDir)
if err != nil {
return nil, err
@ -374,6 +382,11 @@ func (s *Server) StartExitMonitor() {
}
defer watcher.Close()
containerStoppedJobs := make(chan string, s.config.ContainerStatusQueueSize)
for w := 1; w <= int(s.config.NumStatusWorkers); w++ {
go s.startStatusUpdateWorker(w, containerStoppedJobs)
}
done := make(chan struct{})
go func() {
for {
@ -383,28 +396,7 @@ func (s *Server) StartExitMonitor() {
if event.Op&fsnotify.Create == fsnotify.Create {
containerID := filepath.Base(event.Name)
logrus.Debugf("container or sandbox exited: %v", containerID)
c := s.GetContainer(containerID)
if c != nil {
logrus.Debugf("container exited and found: %v", containerID)
err := s.Runtime().UpdateStatus(c)
if err != nil {
logrus.Warnf("Failed to update container status %s: %v", c, err)
} else {
s.ContainerStateToDisk(c)
}
} else {
sb := s.GetSandbox(containerID)
if sb != nil {
c := sb.InfraContainer()
logrus.Debugf("sandbox exited and found: %v", containerID)
err := s.Runtime().UpdateStatus(c)
if err != nil {
logrus.Warnf("Failed to update sandbox infra container status %s: %v", c, err)
} else {
s.ContainerStateToDisk(c)
}
}
}
containerStoppedJobs <- containerID
}
case err := <-watcher.Errors:
logrus.Debugf("watch error: %v", err)
@ -423,3 +415,32 @@ func (s *Server) StartExitMonitor() {
}
<-done
}
// startStatusUpdateWorker accepts jobs in the form of container IDs whose
// status is updated by the function.
func (s *Server) startStatusUpdateWorker(workerID int, containerJobs <-chan string) {
for containerID := range containerJobs {
logrus.Debugf("StatusUpdateWorker %v processing %v", workerID, containerID)
// Find the container from ID. It could either be a regular container
// or a sandbox infra container.
c := s.GetContainer(containerID)
if c != nil {
logrus.Debugf("container exited and found: %v", containerID)
} else {
sb := s.GetSandbox(containerID)
if sb == nil {
return
}
c = sb.InfraContainer()
logrus.Debugf("sandbox exited and found: %v", containerID)
}
// Update the status of the container and save state to disk.
if err := s.Runtime().UpdateStatus(c); err != nil {
logrus.Warnf("Failed to update container status %s: %v", c, err)
} else {
s.ContainerStateToDisk(c)
}
}
}