From dacc5c3ece43ca645b2177171704e19a23a7a1e0 Mon Sep 17 00:00:00 2001 From: Antonio Murdaca Date: Wed, 6 Sep 2017 16:02:05 +0200 Subject: [PATCH] *: correctly wait and close servers Signed-off-by: Antonio Murdaca --- cmd/crio/main.go | 46 ++++++++++++++++++++++++++++++++++------------ server/server.go | 47 ++++++++++++++++++++++++++++++++++++++--------- 2 files changed, 72 insertions(+), 21 deletions(-) diff --git a/cmd/crio/main.go b/cmd/crio/main.go index f15b37c2..4273e126 100644 --- a/cmd/crio/main.go +++ b/cmd/crio/main.go @@ -142,6 +142,12 @@ func catchShutdown(gserver *grpc.Server, sserver *server.Server, hserver *http.S *signalled = true gserver.GracefulStop() hserver.Shutdown(context.Background()) + // TODO(runcom): enable this after https://github.com/kubernetes/kubernetes/pull/51377 + //sserver.StopStreamServer() + sserver.StopExitMonitor() + if err := sserver.Shutdown(); err != nil { + logrus.Warnf("error shutting down main service %v", err) + } return } }() @@ -426,22 +432,38 @@ func main() { go s.Serve(grpcL) go srv.Serve(httpL) - err = m.Serve() - if err != nil { - if graceful && strings.Contains(strings.ToLower(err.Error()), "use of closed network connection") { - err = nil - } else { - logrus.Fatal(err) + serverCloseCh := make(chan struct{}) + go func() { + defer close(serverCloseCh) + if err := m.Serve(); err != nil { + if graceful && strings.Contains(strings.ToLower(err.Error()), "use of closed network connection") { + err = nil + } else { + logrus.Errorf("Failed to serve grpc grpc request: %v", err) + } } + }() + + // TODO(runcom): enable this after https://github.com/kubernetes/kubernetes/pull/51377 + //streamServerCloseCh := service.StreamingServerCloseChan() + serverExitMonitorCh := service.ExitMonitorCloseChan() + select { + // TODO(runcom): enable this after https://github.com/kubernetes/kubernetes/pull/51377 + //case <-streamServerCloseCh: + case <-serverExitMonitorCh: + case <-serverCloseCh: } - if err2 := service.Shutdown(); err2 != nil { - logrus.Infof("error shutting down layer storage: %v", err2) - } + service.Shutdown() + + // TODO(runcom): enable this after https://github.com/kubernetes/kubernetes/pull/51377 + //<-streamServerCloseCh + //logrus.Debug("closed stream server") + <-serverExitMonitorCh + logrus.Debug("closed exit monitor") + <-serverCloseCh + logrus.Debug("closed main server") - if err != nil { - logrus.Fatal(err) - } return nil } diff --git a/server/server.go b/server/server.go index 63016ab6..b32220ab 100644 --- a/server/server.go +++ b/server/server.go @@ -45,8 +45,9 @@ func isTrue(annotaton string) bool { // streamService implements streaming.Runtime. type streamService struct { - runtimeServer *Server // needed by Exec() endpoint - streamServer streaming.Server + runtimeServer *Server // needed by Exec() endpoint + streamServer streaming.Server + streamServerCloseCh chan struct{} streaming.Runtime } @@ -65,9 +66,19 @@ type Server struct { appArmorEnabled bool appArmorProfile string - stream streamService + bindAddress string + stream streamService + exitMonitorChan chan struct{} +} - bindAddress string +// StopStreamServer stops the stream server +func (s *Server) StopStreamServer() error { + return s.stream.streamServer.Stop() +} + +// StreamingServerCloseChan returns the close channel for the streaming server +func (s *Server) StreamingServerCloseChan() chan struct{} { + return s.stream.streamServerCloseCh } // GetExec returns exec stream request @@ -201,6 +212,7 @@ func New(config *Config) (*Server, error) { seccompEnabled: seccomp.IsEnabled(), appArmorEnabled: apparmor.IsEnabled(), appArmorProfile: config.ApparmorProfile, + exitMonitorChan: make(chan struct{}), } if s.seccompEnabled { @@ -251,9 +263,12 @@ func New(config *Config) (*Server, error) { return nil, fmt.Errorf("unable to create streaming server") } - // TODO: Is it should be started somewhere else? + s.stream.streamServerCloseCh = make(chan struct{}) go func() { - s.stream.streamServer.Start(true) + defer close(s.stream.streamServerCloseCh) + if err := s.stream.streamServer.Start(true); err != nil { + logrus.Errorf("Failed to start streaming server: %v", err) + } }() logrus.Debugf("sandboxes: %v", s.ContainerServer.ListSandboxes()) @@ -340,6 +355,15 @@ func (s *Server) CreateMetricsEndpoint() (*http.ServeMux, error) { return mux, nil } +// StopExitMonitor stops the exit monitor +func (s *Server) StopExitMonitor() { + close(s.exitMonitorChan) +} + +func (s *Server) ExitMonitorCloseChan() chan struct{} { + return s.exitMonitorChan +} + // StartExitMonitor start a routine that monitors container exits // and updates the container status func (s *Server) StartExitMonitor() { @@ -349,7 +373,7 @@ func (s *Server) StartExitMonitor() { } defer watcher.Close() - done := make(chan bool) + done := make(chan struct{}) go func() { for { select { @@ -383,12 +407,17 @@ func (s *Server) StartExitMonitor() { } case err := <-watcher.Errors: logrus.Debugf("watch error: %v", err) - done <- true + close(done) + return + case <-s.exitMonitorChan: + logrus.Debug("closing exit monitor...") + close(done) + return } } }() if err := watcher.Add(s.config.ContainerExitsDir); err != nil { - logrus.Fatalf("watcher.Add(%q) failed: %s", s.config.ContainerExitsDir, err) + logrus.Errorf("watcher.Add(%q) failed: %s", s.config.ContainerExitsDir, err) } <-done }