Merge pull request #852 from runcom/fixies-42

*: bunch of fixes...
Merged by Mrunal Patel on 2017-09-07 07:37:37 -07:00 (committed by GitHub)
commit 7f4f630b98
4 changed files with 183 additions and 39 deletions

cmd/crio/main.go

@@ -142,6 +142,12 @@ func catchShutdown(gserver *grpc.Server, sserver *server.Server, hserver *http.Server
 			*signalled = true
 			gserver.GracefulStop()
 			hserver.Shutdown(context.Background())
+			// TODO(runcom): enable this after https://github.com/kubernetes/kubernetes/pull/51377
+			//sserver.StopStreamServer()
+			sserver.StopExitMonitor()
+			if err := sserver.Shutdown(); err != nil {
+				logrus.Warnf("error shutting down main service %v", err)
+			}
 			return
 		}
 	}()
@@ -426,22 +432,38 @@ func main() {
 	go s.Serve(grpcL)
 	go srv.Serve(httpL)

-	err = m.Serve()
-	if err != nil {
+	serverCloseCh := make(chan struct{})
+	go func() {
+		defer close(serverCloseCh)
+		if err := m.Serve(); err != nil {
 			if graceful && strings.Contains(strings.ToLower(err.Error()), "use of closed network connection") {
 				err = nil
 			} else {
-				logrus.Fatal(err)
+				logrus.Errorf("Failed to serve grpc request: %v", err)
 			}
 		}
+	}()
+
+	// TODO(runcom): enable this after https://github.com/kubernetes/kubernetes/pull/51377
+	//streamServerCloseCh := service.StreamingServerCloseChan()
+	serverExitMonitorCh := service.ExitMonitorCloseChan()
+	select {
+	// TODO(runcom): enable this after https://github.com/kubernetes/kubernetes/pull/51377
+	//case <-streamServerCloseCh:
+	case <-serverExitMonitorCh:
+	case <-serverCloseCh:
+	}

-	if err2 := service.Shutdown(); err2 != nil {
-		logrus.Infof("error shutting down layer storage: %v", err2)
-	}
+	service.Shutdown()
+
+	// TODO(runcom): enable this after https://github.com/kubernetes/kubernetes/pull/51377
+	//<-streamServerCloseCh
+	//logrus.Debug("closed stream server")
+
+	<-serverExitMonitorCh
+	logrus.Debug("closed exit monitor")
+	<-serverCloseCh
+	logrus.Debug("closed main server")
+
+	if err != nil {
+		logrus.Fatal(err)
+	}

 	return nil
 }
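The rewired shutdown above follows the usual Go idiom: each long-running goroutine owns a chan struct{} that it closes on exit, the caller requests shutdown by closing a second channel, and select waits on whichever fires first. A minimal, self-contained sketch of the idiom (worker and its channel names are illustrative, not from this repo):

package main

import (
	"fmt"
	"time"
)

// worker simulates a server loop. It closes done when it returns, so a
// caller can both ask it to stop (via stop) and wait until it has stopped.
func worker(stop <-chan struct{}, done chan<- struct{}) {
	defer close(done)
	for {
		select {
		case <-stop:
			return
		case <-time.After(100 * time.Millisecond):
			// serve one "request"
		}
	}
}

func main() {
	stop := make(chan struct{})
	done := make(chan struct{})
	go worker(stop, done)

	close(stop) // signal shutdown; closing never blocks, unlike a send
	<-done      // wait for the worker to actually finish
	fmt.Println("worker shut down cleanly")
}

Closing a channel (rather than sending on it) lets any number of listeners observe the same event, which is also why the commit replaces done <- true with close(done) in the exit monitor below.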

server/container_create.go

@@ -124,13 +124,57 @@ func addImageVolumes(rootfs string, s *Server, containerInfo *storage.ContainerInfo
 	return nil
 }

+// resolveSymbolicLink resolves a possible symlink path. If the path is a symlink, it returns the
+// resolved path; if not, it returns the original path.
+func resolveSymbolicLink(path string) (string, error) {
+	info, err := os.Lstat(path)
+	if err != nil {
+		return "", err
+	}
+	if info.Mode()&os.ModeSymlink != os.ModeSymlink {
+		return path, nil
+	}
+	return filepath.EvalSymlinks(path)
+}
+
 func addDevices(sb *sandbox.Sandbox, containerConfig *pb.ContainerConfig, specgen *generate.Generator) error {
 	sp := specgen.Spec()
-	for _, device := range containerConfig.GetDevices() {
-		dev, err := devices.DeviceFromPath(device.HostPath, device.Permissions)
-		if err != nil {
-			return fmt.Errorf("failed to add device: %v", err)
-		}
+	if containerConfig.GetLinux().GetSecurityContext().Privileged {
+		hostDevices, err := devices.HostDevices()
+		if err != nil {
+			return err
+		}
+		for _, hostDevice := range hostDevices {
+			rd := rspec.LinuxDevice{
+				Path:  hostDevice.Path,
+				Type:  string(hostDevice.Type),
+				Major: hostDevice.Major,
+				Minor: hostDevice.Minor,
+				UID:   &hostDevice.Uid,
+				GID:   &hostDevice.Gid,
+			}
+			if hostDevice.Major == 0 && hostDevice.Minor == 0 {
+				// Invalid device, most likely a symbolic link, skip it.
+				continue
+			}
+			specgen.AddDevice(rd)
+		}
+		sp.Linux.Resources.Devices = []rspec.LinuxDeviceCgroup{
+			{
+				Allow:  true,
+				Access: "rwm",
+			},
+		}
+		return nil
+	}
+	for _, device := range containerConfig.GetDevices() {
+		path, err := resolveSymbolicLink(device.HostPath)
+		if err != nil {
+			return err
+		}
+		dev, err := devices.DeviceFromPath(path, device.Permissions)
+		// if there was no error, add the device
+		if err == nil {
 			rd := rspec.LinuxDevice{
 				Path: device.ContainerPath,
 				Type: string(dev.Type),
@@ -147,6 +191,44 @@ func addDevices(sb *sandbox.Sandbox, containerConfig *pb.ContainerConfig, specgen
 				Minor:  &dev.Minor,
 				Access: dev.Permissions,
 			})
+			continue
+		}
+		// if the device is not a device node,
+		// try to see if it's a directory holding many devices
+		if err == devices.ErrNotADevice {
+			// check if it is a directory
+			if src, e := os.Stat(path); e == nil && src.IsDir() {
+				// mount the internal devices recursively
+				filepath.Walk(path, func(dpath string, f os.FileInfo, e error) error {
+					childDevice, e := devices.DeviceFromPath(dpath, device.Permissions)
+					if e != nil {
+						// ignore the device
+						return nil
+					}
+					cPath := strings.Replace(dpath, path, device.ContainerPath, 1)
+					rd := rspec.LinuxDevice{
+						Path:  cPath,
+						Type:  string(childDevice.Type),
+						Major: childDevice.Major,
+						Minor: childDevice.Minor,
+						UID:   &childDevice.Uid,
+						GID:   &childDevice.Gid,
+					}
+					specgen.AddDevice(rd)
+					sp.Linux.Resources.Devices = append(sp.Linux.Resources.Devices, rspec.LinuxDeviceCgroup{
+						Allow:  true,
+						Type:   string(childDevice.Type),
+						Major:  &childDevice.Major,
+						Minor:  &childDevice.Minor,
+						Access: childDevice.Permissions,
+					})
+					return nil
+				})
+			}
+		}
 	}
 	return nil
 }
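resolveSymbolicLink above exists because requested host devices are often symlinks (for example /dev/disk/by-uuid/... entries pointing at real block devices): os.Lstat inspects the link itself without following it, so the cost of filepath.EvalSymlinks is only paid when the path really is a symlink. A standalone sketch of the helper with a tiny driver (the /dev/stdin example is illustrative; on Linux it is a symlink into /proc):

package main

import (
	"fmt"
	"os"
	"path/filepath"
)

// resolveSymbolicLink mirrors the helper in the diff: Lstat does not
// follow links, so it can tell whether path is itself a symlink; only
// then is the whole chain resolved with EvalSymlinks.
func resolveSymbolicLink(path string) (string, error) {
	info, err := os.Lstat(path)
	if err != nil {
		return "", err
	}
	if info.Mode()&os.ModeSymlink != os.ModeSymlink {
		return path, nil // regular file or device node: use as-is
	}
	return filepath.EvalSymlinks(path)
}

func main() {
	resolved, err := resolveSymbolicLink("/dev/stdin")
	if err != nil {
		fmt.Fprintln(os.Stderr, err)
		return
	}
	fmt.Println("resolved to:", resolved)
}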
@@ -419,6 +501,9 @@ func (s *Server) createSandboxContainer(ctx context.Context, containerID string,
 	var readOnlyRootfs bool
 	if containerConfig.GetLinux().GetSecurityContext() != nil {
 		if containerConfig.GetLinux().GetSecurityContext().Privileged {
+			if !sb.Privileged() {
+				return nil, fmt.Errorf("no privileged container allowed in sandbox")
+			}
 			specgen.SetupPrivileged(true)
 		}
@@ -451,6 +536,9 @@ func (s *Server) createSandboxContainer(ctx context.Context, containerID string,
 	}).Debugf("setting container's log_path")

 	specgen.SetProcessTerminal(containerConfig.Tty)
+	if containerConfig.Tty {
+		specgen.AddProcessEnv("TERM", "xterm")
+	}

 	linux := containerConfig.GetLinux()
 	if linux != nil {

server/sandbox_run.go

@@ -35,6 +35,8 @@ const (
 	// TODO: Remove this const once this value is provided over CRI
 	// See https://github.com/kubernetes/kubernetes/issues/47938
 	PodInfraOOMAdj int = -998
+	// PodInfraCPUshares is the default CPU shares for the sandbox container.
+	PodInfraCPUshares = 2
 )

 // privilegedSandbox returns true if the sandbox configuration
@@ -389,6 +391,8 @@ func (s *Server) RunPodSandbox(ctx context.Context, req *pb.RunPodSandboxRequest
 	// so it doesn't get killed.
 	g.SetProcessOOMScoreAdj(PodInfraOOMAdj)
+	g.SetLinuxResourcesCPUShares(PodInfraCPUshares)
+
 	hostNetwork := req.GetConfig().GetLinux().GetSecurityContext().GetNamespaceOptions().HostNetwork

 	// set up namespaces

server/server.go

@@ -47,6 +47,7 @@ func isTrue(annotaton string) bool
 type streamService struct {
 	runtimeServer *Server // needed by Exec() endpoint
 	streamServer  streaming.Server
+	streamServerCloseCh chan struct{}
 	streaming.Runtime
 }
@@ -65,9 +66,19 @@ type Server struct {
 	appArmorEnabled bool
 	appArmorProfile string
-	stream          streamService
 	bindAddress     string
+	stream          streamService
+	exitMonitorChan chan struct{}
+}
+
+// StopStreamServer stops the stream server
+func (s *Server) StopStreamServer() error {
+	return s.stream.streamServer.Stop()
+}
+
+// StreamingServerCloseChan returns the close channel for the streaming server
+func (s *Server) StreamingServerCloseChan() chan struct{} {
+	return s.stream.streamServerCloseCh
 }

 // GetExec returns exec stream request
@@ -201,6 +212,7 @@ func New(config *Config) (*Server, error) {
 		seccompEnabled:  seccomp.IsEnabled(),
 		appArmorEnabled: apparmor.IsEnabled(),
 		appArmorProfile: config.ApparmorProfile,
+		exitMonitorChan: make(chan struct{}),
 	}
 	if s.seccompEnabled {
@@ -251,9 +263,12 @@ func New(config *Config) (*Server, error) {
 		return nil, fmt.Errorf("unable to create streaming server")
 	}

-	// TODO: Is it should be started somewhere else?
+	s.stream.streamServerCloseCh = make(chan struct{})
 	go func() {
-		s.stream.streamServer.Start(true)
+		defer close(s.stream.streamServerCloseCh)
+		if err := s.stream.streamServer.Start(true); err != nil {
+			logrus.Errorf("Failed to start streaming server: %v", err)
+		}
 	}()

 	logrus.Debugf("sandboxes: %v", s.ContainerServer.ListSandboxes())
@@ -340,6 +355,16 @@ func (s *Server) CreateMetricsEndpoint() (*http.ServeMux, error) {
 	return mux, nil
 }

+// StopExitMonitor stops the exit monitor
+func (s *Server) StopExitMonitor() {
+	close(s.exitMonitorChan)
+}
+
+// ExitMonitorCloseChan returns the close chan for the exit monitor
+func (s *Server) ExitMonitorCloseChan() chan struct{} {
+	return s.exitMonitorChan
+}
+
 // StartExitMonitor starts a routine that monitors container exits
 // and updates the container status
 func (s *Server) StartExitMonitor() {
@@ -349,7 +374,7 @@ func (s *Server) StartExitMonitor() {
 	}
 	defer watcher.Close()

-	done := make(chan bool)
+	done := make(chan struct{})
 	go func() {
 		for {
 			select {
@@ -383,12 +408,17 @@ func (s *Server) StartExitMonitor() {
 				}
 			case err := <-watcher.Errors:
 				logrus.Debugf("watch error: %v", err)
-				done <- true
+				close(done)
+				return
+			case <-s.exitMonitorChan:
+				logrus.Debug("closing exit monitor...")
+				close(done)
+				return
 			}
 		}
 	}()
 	if err := watcher.Add(s.config.ContainerExitsDir); err != nil {
-		logrus.Fatalf("watcher.Add(%q) failed: %s", s.config.ContainerExitsDir, err)
+		logrus.Errorf("watcher.Add(%q) failed: %s", s.config.ContainerExitsDir, err)
 	}
 	<-done
 }
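The exit monitor is an fsnotify watch loop with an extra select case for shutdown. A condensed sketch of the same shape, using github.com/fsnotify/fsnotify directly (the directory path, logging, and the immediate stop are illustrative, not from this repo):

package main

import (
	"log"
	"os"

	"github.com/fsnotify/fsnotify"
)

// watchExits reproduces the monitor's shape: one goroutine selects over
// filesystem events, watcher errors, and a stop channel; done is closed
// on exit so the caller can wait for the goroutine to finish.
func watchExits(dir string, stop <-chan struct{}) error {
	watcher, err := fsnotify.NewWatcher()
	if err != nil {
		return err
	}
	defer watcher.Close()

	done := make(chan struct{})
	go func() {
		defer close(done)
		for {
			select {
			case event := <-watcher.Events:
				if event.Op&fsnotify.Create == fsnotify.Create {
					log.Printf("exit file created: %s", event.Name)
				}
			case err := <-watcher.Errors:
				log.Printf("watch error: %v", err)
				return
			case <-stop:
				log.Print("closing exit monitor...")
				return
			}
		}
	}()

	if err := watcher.Add(dir); err != nil {
		log.Printf("watcher.Add(%q) failed: %v", dir, err)
	}
	<-done
	return nil
}

func main() {
	stop := make(chan struct{})
	close(stop) // stop immediately so the sketch exits cleanly
	if err := watchExits(os.TempDir(), stop); err != nil {
		log.Fatal(err)
	}
}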