From 277cc920a4e1b88907dd28ca589bc66c60121649 Mon Sep 17 00:00:00 2001 From: Michael Crosby Date: Mon, 1 Feb 2016 15:07:02 -0800 Subject: [PATCH] Implement checkpoint / restore for shim Signed-off-by: Michael Crosby --- api/grpc/server/server.go | 30 ++++---- containerd-shim/main.go | 16 ++++- containerd-shim/process.go | 140 ++++++++++++++++++++++++++++++++----- ctr/checkpoint.go | 7 +- runtime/container.go | 88 ++++++++++++++++++++--- runtime/runtime.go | 12 ++-- supervisor/checkpoint.go | 26 +++---- supervisor/worker.go | 32 +++------ 8 files changed, 258 insertions(+), 93 deletions(-) diff --git a/api/grpc/server/server.go b/api/grpc/server/server.go index bbc45fc..f89380d 100644 --- a/api/grpc/server/server.go +++ b/api/grpc/server/server.go @@ -149,22 +149,20 @@ func (s *apiServer) ListCheckpoint(ctx context.Context, r *types.ListCheckpointR return nil, grpc.Errorf(codes.NotFound, "no such containers") } var out []*types.Checkpoint - /* - checkpoints, err := container.Checkpoints() - if err != nil { - return nil, err - } - for _, c := range checkpoints { - out = append(out, &types.Checkpoint{ - Name: c.Name, - Tcp: c.Tcp, - Shell: c.Shell, - UnixSockets: c.UnixSockets, - // TODO: figure out timestamp - //Timestamp: c.Timestamp, - }) - } - */ + checkpoints, err := container.Checkpoints() + if err != nil { + return nil, err + } + for _, c := range checkpoints { + out = append(out, &types.Checkpoint{ + Name: c.Name, + Tcp: c.Tcp, + Shell: c.Shell, + UnixSockets: c.UnixSockets, + // TODO: figure out timestamp + //Timestamp: c.Timestamp, + }) + } return &types.ListCheckpointResponse{Checkpoints: out}, nil } diff --git a/containerd-shim/main.go b/containerd-shim/main.go index 7450525..b3311e4 100644 --- a/containerd-shim/main.go +++ b/containerd-shim/main.go @@ -11,13 +11,25 @@ import ( "github.com/docker/containerd/util" ) -var fexec bool +var ( + fexec bool + fcheckpoint string +) func init() { flag.BoolVar(&fexec, "exec", false, "exec a process instead of starting the init") + flag.StringVar(&fcheckpoint, "checkpoint", "", "start container from an existing checkpoint") flag.Parse() } +func setupLogger() { + f, err := os.OpenFile("/tmp/shim.log", os.O_CREATE|os.O_RDWR|os.O_APPEND, 0755) + if err != nil { + panic(err) + } + logrus.SetOutput(f) +} + // containerd-shim is a small shim that sits in front of a runc implementation // that allows it to be repartented to init and handle reattach from the caller. // @@ -38,7 +50,7 @@ func main() { logrus.WithField("error", err).Fatal("shim: open exit pipe") } defer f.Close() - p, err := newProcess(flag.Arg(0), flag.Arg(1), fexec) + p, err := newProcess(flag.Arg(0), flag.Arg(1), fexec, fcheckpoint) if err != nil { logrus.WithField("error", err).Fatal("shim: create new process") } diff --git a/containerd-shim/process.go b/containerd-shim/process.go index cc7e592..53daf77 100644 --- a/containerd-shim/process.go +++ b/containerd-shim/process.go @@ -10,6 +10,7 @@ import ( "strconv" "syscall" + "github.com/docker/containerd/runtime" "github.com/opencontainers/runc/libcontainer" "github.com/opencontainers/specs" ) @@ -21,28 +22,59 @@ type process struct { s specs.Process exec bool containerPid int + checkpoint *runtime.Checkpoint } -func newProcess(id, bundle string, exec bool) (*process, error) { - f, err := os.Open("process.json") - if err != nil { - return nil, err - } - defer f.Close() +func newProcess(id, bundle string, exec bool, checkpoint string) (*process, error) { p := &process{ id: id, bundle: bundle, exec: exec, } - if err := json.NewDecoder(f).Decode(&p.s); err != nil { + s, err := loadProcess() + if err != nil { return nil, err } + p.s = *s + if checkpoint != "" { + cpt, err := loadCheckpoint(bundle, checkpoint) + if err != nil { + return nil, err + } + p.checkpoint = cpt + } if err := p.openIO(); err != nil { return nil, err } return p, nil } +func loadProcess() (*specs.Process, error) { + f, err := os.Open("process.json") + if err != nil { + return nil, err + } + defer f.Close() + var s specs.Process + if err := json.NewDecoder(f).Decode(&s); err != nil { + return nil, err + } + return &s, nil +} + +func loadCheckpoint(bundle, name string) (*runtime.Checkpoint, error) { + f, err := os.Open(filepath.Join(bundle, "checkpoints", name, "config.json")) + if err != nil { + return nil, err + } + defer f.Close() + var cpt runtime.Checkpoint + if err := json.NewDecoder(f).Decode(&cpt); err != nil { + return nil, err + } + return &cpt, nil +} + func (p *process) start() error { cwd, err := os.Getwd() if err != nil { @@ -53,17 +85,37 @@ func (p *process) start() error { } if p.exec { args = append(args, "exec", - "--process", filepath.Join(cwd, "process.json")) + "--process", filepath.Join(cwd, "process.json"), + "--console", p.stdio.console, + ) + } else if p.checkpoint != nil { + args = append(args, "restore", + "--image-path", filepath.Join(p.bundle, "checkpoints", p.checkpoint.Name), + ) + add := func(flags ...string) { + args = append(args, flags...) + } + if p.checkpoint.Shell { + add("--shell-job") + } + if p.checkpoint.Tcp { + add("--tcp-established") + } + if p.checkpoint.UnixSockets { + add("--ext-unix-sk") + } } else { args = append(args, "start", - "--bundle", p.bundle) + "--bundle", p.bundle, + "--console", p.stdio.console, + ) } args = append(args, "-d", - "--console", p.stdio.console, "--pid-file", filepath.Join(cwd, "pid"), ) cmd := exec.Command("runc", args...) + cmd.Dir = p.bundle cmd.Stdin = p.stdio.stdin cmd.Stdout = p.stdio.stdout cmd.Stderr = p.stdio.stderr @@ -114,9 +166,7 @@ func (p *process) openIO() error { if err != nil { return err } - go func() { - io.Copy(console, stdin) - }() + go io.Copy(console, stdin) stdout, err := os.OpenFile("stdout", syscall.O_RDWR, 0) if err != nil { return err @@ -127,21 +177,75 @@ func (p *process) openIO() error { }() return nil } + i, err := p.initializeIO(int(p.s.User.UID)) + if err != nil { + return err + } // non-tty - for name, dest := range map[string]**os.File{ - "stdin": &p.stdio.stdin, - "stdout": &p.stdio.stdout, - "stderr": &p.stdio.stderr, + for name, dest := range map[string]func(f *os.File){ + "stdin": func(f *os.File) { + go io.Copy(i.Stdin, f) + }, + "stdout": func(f *os.File) { + go io.Copy(f, i.Stdout) + }, + "stderr": func(f *os.File) { + go io.Copy(f, i.Stderr) + }, } { f, err := os.OpenFile(name, syscall.O_RDWR, 0) if err != nil { return err } - *dest = f + dest(f) } return nil } +type IO struct { + Stdin io.WriteCloser + Stdout io.ReadCloser + Stderr io.ReadCloser +} + +func (p *process) initializeIO(rootuid int) (i *IO, err error) { + var fds []uintptr + i = &IO{} + // cleanup in case of an error + defer func() { + if err != nil { + for _, fd := range fds { + syscall.Close(int(fd)) + } + } + }() + // STDIN + r, w, err := os.Pipe() + if err != nil { + return nil, err + } + fds = append(fds, r.Fd(), w.Fd()) + p.stdio.stdin, i.Stdin = r, w + // STDOUT + if r, w, err = os.Pipe(); err != nil { + return nil, err + } + fds = append(fds, r.Fd(), w.Fd()) + p.stdio.stdout, i.Stdout = w, r + // STDERR + if r, w, err = os.Pipe(); err != nil { + return nil, err + } + fds = append(fds, r.Fd(), w.Fd()) + p.stdio.stderr, i.Stderr = w, r + // change ownership of the pipes incase we are in a user namespace + for _, fd := range fds { + if err := syscall.Fchown(int(fd), rootuid, rootuid); err != nil { + return nil, err + } + } + return i, nil +} func (p *process) Close() error { return p.stdio.Close() } diff --git a/ctr/checkpoint.go b/ctr/checkpoint.go index 43049ee..5332a84 100644 --- a/ctr/checkpoint.go +++ b/ctr/checkpoint.go @@ -16,6 +16,7 @@ var checkpointCommand = cli.Command{ Subcommands: []cli.Command{ listCheckpointCommand, createCheckpointCommand, + deleteCheckpointCommand, }, Action: listCheckpoints, } @@ -86,7 +87,11 @@ var createCheckpointCommand = cli.Command{ if _, err := c.CreateCheckpoint(netcontext.Background(), &types.CreateCheckpointRequest{ Id: containerID, Checkpoint: &types.Checkpoint{ - Name: name, + Name: name, + Exit: context.Bool("exit"), + Tcp: context.Bool("tcp"), + Shell: context.Bool("shell"), + UnixSockets: context.Bool("unix-sockets"), }, }); err != nil { fatal(err.Error(), 1) diff --git a/runtime/container.go b/runtime/container.go index c7667f0..5ca4b83 100644 --- a/runtime/container.go +++ b/runtime/container.go @@ -7,6 +7,7 @@ import ( "os/exec" "path/filepath" "syscall" + "time" "github.com/Sirupsen/logrus" "github.com/opencontainers/specs" @@ -18,7 +19,7 @@ type Container interface { // Path returns the path to the bundle Path() string // Start starts the init process of the container - Start() (Process, error) + Start(checkpoint string) (Process, error) // Exec starts another process in an existing container Exec(string, specs.Process) (Process, error) // Delete removes the container's state and any resources @@ -31,18 +32,16 @@ type Container interface { Resume() error // Pause pauses a running container Pause() error + // RemoveProcess removes the specified process from the container RemoveProcess(string) error // Checkpoints returns all the checkpoints for a container - // Checkpoints() ([]Checkpoint, error) + Checkpoints() ([]Checkpoint, error) // Checkpoint creates a new checkpoint - // Checkpoint(Checkpoint) error + Checkpoint(Checkpoint) error // DeleteCheckpoint deletes the checkpoint for the provided name - // DeleteCheckpoint(name string) error - // Restore restores the container to that of the checkpoint provided by name - // Restore(name string) error + DeleteCheckpoint(name string) error // Stats returns realtime container stats and resource information - // Stats() (*Stat, error) - // OOM signals the channel if the container received an OOM notification + // Stats() (*Stat, error) // OOM signals the channel if the container received an OOM notification // OOM() (<-chan struct{}, error) } @@ -138,12 +137,12 @@ func (c *container) Path() string { return c.bundle } -func (c *container) Start() (Process, error) { +func (c *container) Start(checkpoint string) (Process, error) { processRoot := filepath.Join(c.root, c.id, InitProcessID) if err := os.MkdirAll(processRoot, 0755); err != nil { return nil, err } - cmd := exec.Command("containerd-shim", c.id, c.bundle) + cmd := exec.Command("containerd-shim", "-checkpoint", checkpoint, c.id, c.bundle) cmd.Dir = processRoot cmd.SysProcAttr = &syscall.SysProcAttr{ Setpgid: true, @@ -225,3 +224,72 @@ func (c *container) RemoveProcess(pid string) error { delete(c.processes, pid) return nil } + +func (c *container) Checkpoints() ([]Checkpoint, error) { + dirs, err := ioutil.ReadDir(filepath.Join(c.bundle, "checkpoints")) + if err != nil { + return nil, err + } + var out []Checkpoint + for _, d := range dirs { + if !d.IsDir() { + continue + } + path := filepath.Join(c.bundle, "checkpoints", d.Name(), "config.json") + data, err := ioutil.ReadFile(path) + if err != nil { + return nil, err + } + var cpt Checkpoint + if err := json.Unmarshal(data, &cpt); err != nil { + return nil, err + } + out = append(out, cpt) + } + return out, nil +} + +func (c *container) Checkpoint(cpt Checkpoint) error { + if err := os.MkdirAll(filepath.Join(c.bundle, "checkpoints"), 0755); err != nil { + return err + } + path := filepath.Join(c.bundle, "checkpoints", cpt.Name) + if err := os.Mkdir(path, 0755); err != nil { + return err + } + f, err := os.Create(filepath.Join(path, "config.json")) + if err != nil { + return err + } + cpt.Created = time.Now() + err = json.NewEncoder(f).Encode(cpt) + f.Close() + if err != nil { + return err + } + args := []string{ + "--id", c.id, + "checkpoint", + "--image-path", path, + } + add := func(flags ...string) { + args = append(args, flags...) + } + if !cpt.Exit { + add("--leave-running") + } + if cpt.Shell { + add("--shell-job") + } + if cpt.Tcp { + add("--tcp-established") + } + if cpt.UnixSockets { + add("--ext-unix-sk") + } + return exec.Command("runc", args...).Run() +} + +func (c *container) DeleteCheckpoint(name string) error { + return os.RemoveAll(filepath.Join(c.bundle, "checkpoints", name)) +} diff --git a/runtime/runtime.go b/runtime/runtime.go index 7c5949a..6fa1dc2 100644 --- a/runtime/runtime.go +++ b/runtime/runtime.go @@ -48,15 +48,15 @@ type Stat struct { type Checkpoint struct { // Timestamp is the time that checkpoint happened - Timestamp time.Time + Created time.Time `json:"created"` // Name is the name of the checkpoint - Name string + Name string `json:"name"` // Tcp checkpoints open tcp connections - Tcp bool + Tcp bool `json:"tcp"` // UnixSockets persists unix sockets in the checkpoint - UnixSockets bool + UnixSockets bool `json:"unixSockets"` // Shell persists tty sessions in the checkpoint - Shell bool + Shell bool `json:"shell"` // Exit exits the container after the checkpoint is finished - Exit bool + Exit bool `json:"exit"` } diff --git a/supervisor/checkpoint.go b/supervisor/checkpoint.go index d65febf..a5ae540 100644 --- a/supervisor/checkpoint.go +++ b/supervisor/checkpoint.go @@ -5,14 +5,11 @@ type CreateCheckpointEvent struct { } func (h *CreateCheckpointEvent) Handle(e *Event) error { - /* - i, ok := h.s.containers[e.ID] - if !ok { - return ErrContainerNotFound - } - */ - return nil - // return i.container.Checkpoint(*e.Checkpoint) + i, ok := h.s.containers[e.ID] + if !ok { + return ErrContainerNotFound + } + return i.container.Checkpoint(*e.Checkpoint) } type DeleteCheckpointEvent struct { @@ -20,12 +17,9 @@ type DeleteCheckpointEvent struct { } func (h *DeleteCheckpointEvent) Handle(e *Event) error { - /* - i, ok := h.s.containers[e.ID] - if !ok { - return ErrContainerNotFound - } - */ - return nil - // return i.container.DeleteCheckpoint(e.Checkpoint.Name) + i, ok := h.s.containers[e.ID] + if !ok { + return ErrContainerNotFound + } + return i.container.DeleteCheckpoint(e.Checkpoint.Name) } diff --git a/supervisor/worker.go b/supervisor/worker.go index 80b4e1d..597dbac 100644 --- a/supervisor/worker.go +++ b/supervisor/worker.go @@ -37,30 +37,14 @@ type worker struct { func (w *worker) Start() { defer w.wg.Done() for t := range w.s.tasks { - var ( - err error - process runtime.Process - started = time.Now() - ) - if t.Checkpoint != "" { - /* - if err := t.Container.Restore(t.Checkpoint); err != nil { - evt := NewEvent(DeleteEventType) - evt.ID = t.Container.ID() - w.s.SendEvent(evt) - t.Err <- err - continue - } - */ - } else { - process, err = t.Container.Start() - if err != nil { - evt := NewEvent(DeleteEventType) - evt.ID = t.Container.ID() - w.s.SendEvent(evt) - t.Err <- err - continue - } + started := time.Now() + process, err := t.Container.Start(t.Checkpoint) + if err != nil { + evt := NewEvent(DeleteEventType) + evt.ID = t.Container.ID() + w.s.SendEvent(evt) + t.Err <- err + continue } /* if w.s.notifier != nil {