From ae9b2bafd5ecf947204202993009cdc922d1c1be Mon Sep 17 00:00:00 2001 From: Michael Crosby Date: Thu, 3 Dec 2015 16:07:53 -0800 Subject: [PATCH] Add basic checkpoint and restore support Signed-off-by: Michael Crosby --- api/v1/server.go | 90 ++++++++++++++++++++++++++++++++++++++++++-- api/v1/types.go | 10 +++++ checkpoint.go | 13 +++++++ event.go | 18 +++++---- linux/linux.go | 53 ++++++++++++++++++++++++++ runtime/container.go | 18 +++++++++ start.go | 9 ++++- supervisor.go | 17 +++++---- worker.go | 32 ++++++++++++---- 9 files changed, 231 insertions(+), 29 deletions(-) create mode 100644 checkpoint.go diff --git a/api/v1/server.go b/api/v1/server.go index 16847e7..a105f85 100644 --- a/api/v1/server.go +++ b/api/v1/server.go @@ -19,17 +19,25 @@ func NewServer(supervisor *containerd.Supervisor) http.Handler { supervisor: supervisor, r: r, } - // TODO: add container stats - // TODO: add container checkpoint - // TODO: add container restore - // TODO: set prctl child subreaper + // process handlers r.HandleFunc("/containers/{id:.*}/process/{pid:.*}", s.signalPid).Methods("POST") r.HandleFunc("/containers/{id:.*}/process", s.addProcess).Methods("PUT") + + // checkpoint and restore handlers + // TODO: PUT handler for adding a checkpoint to containerd?? + r.HandleFunc("/containers/{id:.*}/checkpoint/{name:.*}", s.createCheckpoint).Methods("POST") + // r.HandleFunc("/containers/{id:.*}/checkpoint/{cid:.*}", s.deleteCheckpoint).Methods("DELETE") + r.HandleFunc("/containers/{id:.*}/checkpoint", s.listCheckpoints).Methods("GET") + + // container handlers r.HandleFunc("/containers/{id:.*}", s.createContainer).Methods("POST") r.HandleFunc("/containers/{id:.*}", s.updateContainer).Methods("PATCH") + // internal method for replaying the journal r.HandleFunc("/event", s.event).Methods("POST") r.HandleFunc("/events", s.events).Methods("GET") + + // containerd handlers r.HandleFunc("/state", s.state).Methods("GET") return s } @@ -252,6 +260,12 @@ func (s *server) createContainer(w http.ResponseWriter, r *http.Request) { e := containerd.NewEvent(containerd.StartContainerEventType) e.ID = id e.BundlePath = c.BundlePath + if c.Checkpoint != nil { + e.Checkpoint = &runtime.Checkpoint{ + Name: c.Checkpoint.Name, + Path: c.Checkpoint.Path, + } + } e.Stdio = &runtime.Stdio{ Stderr: c.Stderr, Stdout: c.Stdout, @@ -267,3 +281,71 @@ func (s *server) createContainer(w http.ResponseWriter, r *http.Request) { } w.WriteHeader(http.StatusCreated) } + +func (s *server) listCheckpoints(w http.ResponseWriter, r *http.Request) { + id := mux.Vars(r)["id"] + e := containerd.NewEvent(containerd.GetContainerEventType) + s.supervisor.SendEvent(e) + if err := <-e.Err; err != nil { + http.Error(w, err.Error(), http.StatusInternalServerError) + return + } + var container runtime.Container + for _, c := range e.Containers { + if c.ID() == id { + container = c + break + } + } + if container == nil { + http.Error(w, "container not found", http.StatusNotFound) + return + } + checkpoints, err := container.Checkpoints() + if err != nil { + http.Error(w, err.Error(), http.StatusInternalServerError) + return + } + out := []Checkpoint{} + for _, c := range checkpoints { + out = append(out, Checkpoint{ + Path: c.Path, + Name: c.Name, + Tcp: c.Tcp, + Shell: c.Shell, + UnixSockets: c.UnixSockets, + }) + } +} + +func (s *server) createCheckpoint(w http.ResponseWriter, r *http.Request) { + var ( + vars = mux.Vars(r) + id = vars["id"] + name = vars["name"] + ) + var cp Checkpoint + if err := json.NewDecoder(r.Body).Decode(&cp); err != nil { + http.Error(w, err.Error(), http.StatusBadRequest) + return + } + e := containerd.NewEvent(containerd.CreateCheckpointEventType) + e.ID = id + e.Checkpoint = &runtime.Checkpoint{ + Name: name, + Path: cp.Path, + Running: cp.Running, + Tcp: cp.Tcp, + UnixSockets: cp.UnixSockets, + Shell: cp.Shell, + } + s.supervisor.SendEvent(e) + if err := <-e.Err; err != nil { + http.Error(w, err.Error(), http.StatusInternalServerError) + return + } + w.WriteHeader(http.StatusCreated) +} + +func (s *server) deleteCheckpoint(w http.ResponseWriter, r *http.Request) { +} diff --git a/api/v1/types.go b/api/v1/types.go index cbf543d..a9f93ac 100644 --- a/api/v1/types.go +++ b/api/v1/types.go @@ -28,6 +28,7 @@ type Container struct { Stdout string `json:"stdout,omitempty"` Stderr string `json:"stderr,omitempty"` State *ContainerState `json:"state,omitempty"` + Checkpoint *Checkpoint `json:"checkpoint,omitempty"` } type User struct { @@ -54,3 +55,12 @@ type Event struct { ID string `json:"id,omitempty"` Status int `json:"status,omitempty"` } + +type Checkpoint struct { + Name string `json:"name,omitempty"` + Path string `json:"path"` + Running bool `json:"running,omitempty"` + Tcp bool `json:"tcp"` + UnixSockets bool `json:"unixSockets"` + Shell bool `json:"shell"` +} diff --git a/checkpoint.go b/checkpoint.go new file mode 100644 index 0000000..55436c2 --- /dev/null +++ b/checkpoint.go @@ -0,0 +1,13 @@ +package containerd + +type CreateCheckpointEvent struct { + s *Supervisor +} + +func (h *CreateCheckpointEvent) Handle(e *Event) error { + container, ok := h.s.containers[e.ID] + if !ok { + return ErrContainerNotFound + } + return container.Checkpoint(*e.Checkpoint) +} diff --git a/event.go b/event.go index a47a6fa..7425f00 100644 --- a/event.go +++ b/event.go @@ -11,14 +11,15 @@ import ( type EventType string const ( - ExecExitEventType EventType = "execExit" - ExitEventType EventType = "exit" - StartContainerEventType EventType = "startContainer" - DeleteEventType EventType = "deleteContainerEvent" - GetContainerEventType EventType = "getContainer" - SignalEventType EventType = "signal" - AddProcessEventType EventType = "addProcess" - UpdateContainerEventType EventType = "updateContainer" + ExecExitEventType EventType = "execExit" + ExitEventType EventType = "exit" + StartContainerEventType EventType = "startContainer" + DeleteEventType EventType = "deleteContainerEvent" + GetContainerEventType EventType = "getContainer" + SignalEventType EventType = "signal" + AddProcessEventType EventType = "addProcess" + UpdateContainerEventType EventType = "updateContainer" + CreateCheckpointEventType EventType = "createCheckpoint" ) func NewEvent(t EventType) *Event { @@ -41,6 +42,7 @@ type Event struct { Process *specs.Process `json:"process,omitempty"` State *runtime.State `json:"state,omitempty"` Containers []runtime.Container `json:"-"` + Checkpoint *runtime.Checkpoint `json:"checkpoint,omitempty"` Err chan error `json:"-"` } diff --git a/linux/linux.go b/linux/linux.go index e902fa6..de84aec 100644 --- a/linux/linux.go +++ b/linux/linux.go @@ -12,6 +12,7 @@ import ( "strconv" "strings" "syscall" + "time" "github.com/docker/containerd/runtime" "github.com/opencontainers/runc/libcontainer" @@ -190,6 +191,57 @@ type libcontainerContainer struct { exitStatus int exited bool path string + checkpoints map[string]runtime.Checkpoint +} + +func (c *libcontainerContainer) Checkpoints() ([]runtime.Checkpoint, error) { + out := []runtime.Checkpoint{} + for _, cp := range c.checkpoints { + out = append(out, cp) + } + return out, nil +} + +func (c *libcontainerContainer) Checkpoint(cp runtime.Checkpoint) error { + opts := c.createCheckpointOpts(&cp) + if err := os.MkdirAll(opts.ImagesDirectory, 0755); err != nil { + return err + } + if err := c.c.Checkpoint(opts); err != nil { + return err + } + cp.Timestamp = time.Now() + c.checkpoints[cp.Name] = cp + return nil +} + +func (c *libcontainerContainer) createCheckpointOpts(cp *runtime.Checkpoint) *libcontainer.CriuOpts { + opts := libcontainer.CriuOpts{} + opts.LeaveRunning = cp.Running + opts.ShellJob = cp.Shell + opts.TcpEstablished = cp.Tcp + opts.ExternalUnixConnections = cp.UnixSockets + if cp.Path == "" { + cp.Path = filepath.Join(c.path, "checkpoints", cp.Name) + } + opts.ImagesDirectory = cp.Path + return &opts +} + +func (c *libcontainerContainer) Restore(path, name string) error { + if path == "" { + path = filepath.Join(c.path, "checkpoints", name) + } + var opts libcontainer.CriuOpts + if cp, ok := c.checkpoints[name]; ok { + opts = *c.createCheckpointOpts(&cp) + } else { + opts.ImagesDirectory = path + } + if err := c.c.Restore(c.initProcess.process, &opts); err != nil { + return err + } + return nil } func (c *libcontainerContainer) Resume() error { @@ -300,6 +352,7 @@ func (r *libcontainerRuntime) Create(id, bundlePath string, stdio *runtime.Stdio c := &libcontainerContainer{ c: container, additionalProcesses: make(map[int]*libcontainerProcess), + checkpoints: make(map[string]runtime.Checkpoint), initProcess: &libcontainerProcess{ process: process, spec: spec.Process, diff --git a/runtime/container.go b/runtime/container.go index 53d2bc1..379e3b5 100644 --- a/runtime/container.go +++ b/runtime/container.go @@ -2,6 +2,7 @@ package runtime import ( "os" + "time" "github.com/opencontainers/specs" ) @@ -11,6 +12,7 @@ type Process interface { Spec() specs.Process Signal(os.Signal) error } + type Status string const ( @@ -27,6 +29,16 @@ type Stdio struct { Stdout string `json:"stdout,omitempty"` } +type Checkpoint struct { + Timestamp time.Time `json:"timestamp,omitempty"` + Path string `json:"path,omitempty"` + Name string `json:"name,omitempty"` + Tcp bool `json:"tcp"` + UnixSockets bool `json:"unixSockets"` + Shell bool `json:"shell"` + Running bool `json:"running,omitempty"` +} + type Container interface { // ID returns the container ID ID() string @@ -50,4 +62,10 @@ type Container interface { Resume() error // Pause pauses a running container Pause() error + + Checkpoints() ([]Checkpoint, error) + + Checkpoint(Checkpoint) error + + Restore(path, name string) error } diff --git a/start.go b/start.go index ed85cd6..8477b50 100644 --- a/start.go +++ b/start.go @@ -11,9 +11,16 @@ func (h *StartEvent) Handle(e *Event) error { } h.s.containers[e.ID] = container ContainersCounter.Inc(1) - h.s.tasks <- &StartTask{ + task := &StartTask{ Err: e.Err, Container: container, } + if e.Checkpoint != nil { + task.Checkpoint = &Checkpoint{ + Name: e.Checkpoint.Name, + Path: e.Checkpoint.Path, + } + } + h.s.tasks <- task return errDeferedResponse } diff --git a/supervisor.go b/supervisor.go index 4e4a153..a6d470e 100644 --- a/supervisor.go +++ b/supervisor.go @@ -40,14 +40,15 @@ func NewSupervisor(stateDir string, tasks chan *StartTask) (*Supervisor, error) } // register default event handlers s.handlers = map[EventType]Handler{ - ExecExitEventType: &ExecExitEvent{s}, - ExitEventType: &ExitEvent{s}, - StartContainerEventType: &StartEvent{s}, - DeleteEventType: &DeleteEvent{s}, - GetContainerEventType: &GetContainersEvent{s}, - SignalEventType: &SignalEvent{s}, - AddProcessEventType: &AddProcessEvent{s}, - UpdateContainerEventType: &UpdateEvent{s}, + ExecExitEventType: &ExecExitEvent{s}, + ExitEventType: &ExitEvent{s}, + StartContainerEventType: &StartEvent{s}, + DeleteEventType: &DeleteEvent{s}, + GetContainerEventType: &GetContainersEvent{s}, + SignalEventType: &SignalEvent{s}, + AddProcessEventType: &AddProcessEvent{s}, + UpdateContainerEventType: &UpdateEvent{s}, + CreateCheckpointEventType: &CreateCheckpointEvent{s}, } // start the container workers for concurrent container starts return s, nil diff --git a/worker.go b/worker.go index eae402b..9256b78 100644 --- a/worker.go +++ b/worker.go @@ -11,9 +11,15 @@ type Worker interface { Start() } +type Checkpoint struct { + Path string + Name string +} + type StartTask struct { - Container runtime.Container - Err chan error + Container runtime.Container + Checkpoint *Checkpoint + Err chan error } func NewWorker(s *Supervisor, wg *sync.WaitGroup) Worker { @@ -32,12 +38,22 @@ func (w *worker) Start() { defer w.wg.Done() for t := range w.s.tasks { started := time.Now() - if err := t.Container.Start(); err != nil { - evt := NewEvent(DeleteEventType) - evt.ID = t.Container.ID() - w.s.SendEvent(evt) - t.Err <- err - continue + if t.Checkpoint != nil { + if err := t.Container.Restore(t.Checkpoint.Path, t.Checkpoint.Name); err != nil { + evt := NewEvent(DeleteEventType) + evt.ID = t.Container.ID() + w.s.SendEvent(evt) + t.Err <- err + continue + } + } else { + if err := t.Container.Start(); err != nil { + evt := NewEvent(DeleteEventType) + evt.ID = t.Container.ID() + w.s.SendEvent(evt) + t.Err <- err + continue + } } ContainerStartTimer.UpdateSince(started) t.Err <- nil