Implement proper shutdown logic
Signed-off-by: Michael Crosby <crosbymichael@gmail.com>
This commit is contained in:
parent
a10aa91051
commit
c10b3cde9f
6 changed files with 67 additions and 20 deletions
|
@ -325,9 +325,13 @@ func (s *server) createCheckpoint(w http.ResponseWriter, r *http.Request) {
|
||||||
name = vars["name"]
|
name = vars["name"]
|
||||||
)
|
)
|
||||||
var cp Checkpoint
|
var cp Checkpoint
|
||||||
if err := json.NewDecoder(r.Body).Decode(&cp); err != nil {
|
// most options to the checkpoint action can be left out so don't
|
||||||
http.Error(w, err.Error(), http.StatusBadRequest)
|
// decode unless the client passed anything in the body.
|
||||||
return
|
if r.ContentLength > 0 {
|
||||||
|
if err := json.NewDecoder(r.Body).Decode(&cp); err != nil {
|
||||||
|
http.Error(w, err.Error(), http.StatusBadRequest)
|
||||||
|
return
|
||||||
|
}
|
||||||
}
|
}
|
||||||
e := containerd.NewEvent(containerd.CreateCheckpointEventType)
|
e := containerd.NewEvent(containerd.CreateCheckpointEventType)
|
||||||
e.ID = id
|
e.ID = id
|
||||||
|
|
|
@ -86,8 +86,11 @@ func daemon(stateDir string, concurrency, bufferSize int) error {
|
||||||
w := containerd.NewWorker(supervisor, wg)
|
w := containerd.NewWorker(supervisor, wg)
|
||||||
go w.Start()
|
go w.Start()
|
||||||
}
|
}
|
||||||
if err := setSubReaper(); err != nil {
|
// only set containerd as the subreaper if it is not an init process
|
||||||
return err
|
if pid := os.Getpid(); pid != 1 {
|
||||||
|
if err := setSubReaper(); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
}
|
}
|
||||||
// start the signal handler in the background.
|
// start the signal handler in the background.
|
||||||
go startSignalHandler(supervisor, bufferSize)
|
go startSignalHandler(supervisor, bufferSize)
|
||||||
|
|
|
@ -34,8 +34,7 @@ func startSignalHandler(supervisor *containerd.Supervisor, bufferSize int) {
|
||||||
for s := range signals {
|
for s := range signals {
|
||||||
switch s {
|
switch s {
|
||||||
case syscall.SIGTERM, syscall.SIGINT:
|
case syscall.SIGTERM, syscall.SIGINT:
|
||||||
supervisor.Close()
|
supervisor.Stop(signals)
|
||||||
os.Exit(0)
|
|
||||||
case syscall.SIGCHLD:
|
case syscall.SIGCHLD:
|
||||||
exits, err := reap()
|
exits, err := reap()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
@ -46,6 +45,8 @@ func startSignalHandler(supervisor *containerd.Supervisor, bufferSize int) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
supervisor.Close()
|
||||||
|
os.Exit(0)
|
||||||
}
|
}
|
||||||
|
|
||||||
func reap() (exits []*containerd.Event, err error) {
|
func reap() (exits []*containerd.Event, err error) {
|
||||||
|
|
|
@ -15,6 +15,7 @@ func (h *DeleteEvent) Handle(e *Event) error {
|
||||||
logrus.WithField("error", err).Error("containerd: deleting container")
|
logrus.WithField("error", err).Error("containerd: deleting container")
|
||||||
}
|
}
|
||||||
ContainersCounter.Dec(1)
|
ContainersCounter.Dec(1)
|
||||||
|
h.s.containerGroup.Done()
|
||||||
}
|
}
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
|
@ -2,8 +2,11 @@ package containerd
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"os"
|
"os"
|
||||||
|
"os/signal"
|
||||||
"path/filepath"
|
"path/filepath"
|
||||||
goruntime "runtime"
|
goruntime "runtime"
|
||||||
|
"sync"
|
||||||
|
"syscall"
|
||||||
|
|
||||||
"github.com/Sirupsen/logrus"
|
"github.com/Sirupsen/logrus"
|
||||||
"github.com/docker/containerd/runtime"
|
"github.com/docker/containerd/runtime"
|
||||||
|
@ -56,24 +59,58 @@ func NewSupervisor(stateDir string, tasks chan *StartTask) (*Supervisor, error)
|
||||||
|
|
||||||
type Supervisor struct {
|
type Supervisor struct {
|
||||||
// stateDir is the directory on the system to store container runtime state information.
|
// stateDir is the directory on the system to store container runtime state information.
|
||||||
stateDir string
|
stateDir string
|
||||||
containers map[string]runtime.Container
|
containers map[string]runtime.Container
|
||||||
processes map[int]runtime.Container
|
processes map[int]runtime.Container
|
||||||
handlers map[EventType]Handler
|
handlers map[EventType]Handler
|
||||||
runtime runtime.Runtime
|
runtime runtime.Runtime
|
||||||
journal *journal
|
journal *journal
|
||||||
events chan *Event
|
events chan *Event
|
||||||
tasks chan *StartTask
|
tasks chan *StartTask
|
||||||
subscribers map[subscriber]bool
|
subscribers map[subscriber]bool
|
||||||
machine Machine
|
machine Machine
|
||||||
|
containerGroup sync.WaitGroup
|
||||||
}
|
}
|
||||||
|
|
||||||
type subscriber chan *Event
|
type subscriber chan *Event
|
||||||
|
|
||||||
// need proper close logic for jobs and stuff so that sending to the channels dont panic
|
func (s *Supervisor) Stop(sig chan os.Signal) {
|
||||||
// but can complete jobs
|
// Close the tasks channel so that no new containers get started
|
||||||
|
close(s.tasks)
|
||||||
|
// send a SIGTERM to all containers
|
||||||
|
for id, c := range s.containers {
|
||||||
|
logrus.WithField("id", id).Debug("sending TERM to container processes")
|
||||||
|
procs, err := c.Processes()
|
||||||
|
if err != nil {
|
||||||
|
logrus.WithField("id", id).Warn("get container processes")
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
if len(procs) == 0 {
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
mainProc := procs[0]
|
||||||
|
if err := mainProc.Signal(syscall.SIGTERM); err != nil {
|
||||||
|
pid, _ := mainProc.Pid()
|
||||||
|
logrus.WithFields(logrus.Fields{
|
||||||
|
"id": id,
|
||||||
|
"pid": pid,
|
||||||
|
"error": err,
|
||||||
|
}).Error("send SIGTERM to process")
|
||||||
|
}
|
||||||
|
}
|
||||||
|
go func() {
|
||||||
|
logrus.Debug("waiting for containers to exit")
|
||||||
|
s.containerGroup.Wait()
|
||||||
|
logrus.Debug("all containers exited")
|
||||||
|
// stop receiving signals and close the channel
|
||||||
|
signal.Stop(sig)
|
||||||
|
close(sig)
|
||||||
|
}()
|
||||||
|
}
|
||||||
|
|
||||||
|
// Close closes any open files in the supervisor but expects that Stop has been
|
||||||
|
// callsed so that no more containers are started.
|
||||||
func (s *Supervisor) Close() error {
|
func (s *Supervisor) Close() error {
|
||||||
//TODO: unsubscribe all channels
|
|
||||||
return s.journal.Close()
|
return s.journal.Close()
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -55,6 +55,7 @@ func (w *worker) Start() {
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
w.s.containerGroup.Add(1)
|
||||||
ContainerStartTimer.UpdateSince(started)
|
ContainerStartTimer.UpdateSince(started)
|
||||||
t.Err <- nil
|
t.Err <- nil
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in a new issue