*: correctly wait and close servers

Signed-off-by: Antonio Murdaca <runcom@redhat.com>
This commit is contained in:
Antonio Murdaca 2017-09-06 16:02:05 +02:00
parent a81e90a9c9
commit dacc5c3ece
No known key found for this signature in database
GPG key ID: B2BEAD150DE936B9
2 changed files with 72 additions and 21 deletions

View file

@ -142,6 +142,12 @@ func catchShutdown(gserver *grpc.Server, sserver *server.Server, hserver *http.S
*signalled = true *signalled = true
gserver.GracefulStop() gserver.GracefulStop()
hserver.Shutdown(context.Background()) hserver.Shutdown(context.Background())
// TODO(runcom): enable this after https://github.com/kubernetes/kubernetes/pull/51377
//sserver.StopStreamServer()
sserver.StopExitMonitor()
if err := sserver.Shutdown(); err != nil {
logrus.Warnf("error shutting down main service %v", err)
}
return return
} }
}() }()
@ -426,22 +432,38 @@ func main() {
go s.Serve(grpcL) go s.Serve(grpcL)
go srv.Serve(httpL) go srv.Serve(httpL)
err = m.Serve() serverCloseCh := make(chan struct{})
if err != nil { go func() {
if graceful && strings.Contains(strings.ToLower(err.Error()), "use of closed network connection") { defer close(serverCloseCh)
err = nil if err := m.Serve(); err != nil {
} else { if graceful && strings.Contains(strings.ToLower(err.Error()), "use of closed network connection") {
logrus.Fatal(err) err = nil
} else {
logrus.Errorf("Failed to serve grpc grpc request: %v", err)
}
} }
}()
// TODO(runcom): enable this after https://github.com/kubernetes/kubernetes/pull/51377
//streamServerCloseCh := service.StreamingServerCloseChan()
serverExitMonitorCh := service.ExitMonitorCloseChan()
select {
// TODO(runcom): enable this after https://github.com/kubernetes/kubernetes/pull/51377
//case <-streamServerCloseCh:
case <-serverExitMonitorCh:
case <-serverCloseCh:
} }
if err2 := service.Shutdown(); err2 != nil { service.Shutdown()
logrus.Infof("error shutting down layer storage: %v", err2)
} // TODO(runcom): enable this after https://github.com/kubernetes/kubernetes/pull/51377
//<-streamServerCloseCh
//logrus.Debug("closed stream server")
<-serverExitMonitorCh
logrus.Debug("closed exit monitor")
<-serverCloseCh
logrus.Debug("closed main server")
if err != nil {
logrus.Fatal(err)
}
return nil return nil
} }

View file

@ -45,8 +45,9 @@ func isTrue(annotaton string) bool {
// streamService implements streaming.Runtime. // streamService implements streaming.Runtime.
type streamService struct { type streamService struct {
runtimeServer *Server // needed by Exec() endpoint runtimeServer *Server // needed by Exec() endpoint
streamServer streaming.Server streamServer streaming.Server
streamServerCloseCh chan struct{}
streaming.Runtime streaming.Runtime
} }
@ -65,9 +66,19 @@ type Server struct {
appArmorEnabled bool appArmorEnabled bool
appArmorProfile string appArmorProfile string
stream streamService bindAddress string
stream streamService
exitMonitorChan chan struct{}
}
bindAddress string // StopStreamServer stops the stream server
func (s *Server) StopStreamServer() error {
return s.stream.streamServer.Stop()
}
// StreamingServerCloseChan returns the close channel for the streaming server
func (s *Server) StreamingServerCloseChan() chan struct{} {
return s.stream.streamServerCloseCh
} }
// GetExec returns exec stream request // GetExec returns exec stream request
@ -201,6 +212,7 @@ func New(config *Config) (*Server, error) {
seccompEnabled: seccomp.IsEnabled(), seccompEnabled: seccomp.IsEnabled(),
appArmorEnabled: apparmor.IsEnabled(), appArmorEnabled: apparmor.IsEnabled(),
appArmorProfile: config.ApparmorProfile, appArmorProfile: config.ApparmorProfile,
exitMonitorChan: make(chan struct{}),
} }
if s.seccompEnabled { if s.seccompEnabled {
@ -251,9 +263,12 @@ func New(config *Config) (*Server, error) {
return nil, fmt.Errorf("unable to create streaming server") return nil, fmt.Errorf("unable to create streaming server")
} }
// TODO: Is it should be started somewhere else? s.stream.streamServerCloseCh = make(chan struct{})
go func() { go func() {
s.stream.streamServer.Start(true) defer close(s.stream.streamServerCloseCh)
if err := s.stream.streamServer.Start(true); err != nil {
logrus.Errorf("Failed to start streaming server: %v", err)
}
}() }()
logrus.Debugf("sandboxes: %v", s.ContainerServer.ListSandboxes()) logrus.Debugf("sandboxes: %v", s.ContainerServer.ListSandboxes())
@ -340,6 +355,15 @@ func (s *Server) CreateMetricsEndpoint() (*http.ServeMux, error) {
return mux, nil return mux, nil
} }
// StopExitMonitor stops the exit monitor
func (s *Server) StopExitMonitor() {
close(s.exitMonitorChan)
}
func (s *Server) ExitMonitorCloseChan() chan struct{} {
return s.exitMonitorChan
}
// StartExitMonitor start a routine that monitors container exits // StartExitMonitor start a routine that monitors container exits
// and updates the container status // and updates the container status
func (s *Server) StartExitMonitor() { func (s *Server) StartExitMonitor() {
@ -349,7 +373,7 @@ func (s *Server) StartExitMonitor() {
} }
defer watcher.Close() defer watcher.Close()
done := make(chan bool) done := make(chan struct{})
go func() { go func() {
for { for {
select { select {
@ -383,12 +407,17 @@ func (s *Server) StartExitMonitor() {
} }
case err := <-watcher.Errors: case err := <-watcher.Errors:
logrus.Debugf("watch error: %v", err) logrus.Debugf("watch error: %v", err)
done <- true close(done)
return
case <-s.exitMonitorChan:
logrus.Debug("closing exit monitor...")
close(done)
return
} }
} }
}() }()
if err := watcher.Add(s.config.ContainerExitsDir); err != nil { if err := watcher.Add(s.config.ContainerExitsDir); err != nil {
logrus.Fatalf("watcher.Add(%q) failed: %s", s.config.ContainerExitsDir, err) logrus.Errorf("watcher.Add(%q) failed: %s", s.config.ContainerExitsDir, err)
} }
<-done <-done
} }