// Package v1 exposes a containerd Supervisor over HTTP. Handlers translate
// requests into supervisor events and report the outcome of each event back
// to the client.
package v1

import (
	"encoding/json"
	"net/http"
	"strconv"
	"syscall"

	"github.com/Sirupsen/logrus"
	"github.com/docker/containerd"
	"github.com/docker/containerd/runtime"
	"github.com/gorilla/mux"
	"github.com/opencontainers/specs"
)

// NewServer returns an http.Handler that routes API requests to handlers
// backed by the given supervisor.
func NewServer(supervisor *containerd.Supervisor) http.Handler {
	r := mux.NewRouter()
	s := &server{
		supervisor: supervisor,
		r:          r,
	}
	// process handlers
	r.HandleFunc("/containers/{id:.*}/process/{pid:.*}", s.signalPid).Methods("POST")
	r.HandleFunc("/containers/{id:.*}/process", s.addProcess).Methods("PUT")

	// checkpoint and restore handlers
	// TODO: PUT handler for adding a checkpoint to containerd??
	r.HandleFunc("/containers/{id:.*}/checkpoint/{name:.*}", s.createCheckpoint).Methods("POST")
	r.HandleFunc("/containers/{id:.*}/checkpoint/{name:.*}", s.deleteCheckpoint).Methods("DELETE")
	r.HandleFunc("/containers/{id:.*}/checkpoint", s.listCheckpoints).Methods("GET")

	// container handlers
	r.HandleFunc("/containers/{id:.*}", s.createContainer).Methods("POST")
	r.HandleFunc("/containers/{id:.*}", s.updateContainer).Methods("PATCH")

	// internal method for replaying the journal
	// r.HandleFunc("/event", s.event).Methods("POST")
	r.HandleFunc("/events", s.events).Methods("GET")

	// containerd handlers
	r.HandleFunc("/state", s.state).Methods("GET")
	return s
}

// server couples the request router with the supervisor that executes the
// events produced by the handlers.
type server struct {
	r          *mux.Router
	supervisor *containerd.Supervisor
}

// ServeHTTP dispatches the request through the underlying router.
func (s *server) ServeHTTP(w http.ResponseWriter, r *http.Request) {
	s.r.ServeHTTP(w, r)
}

// errorStatus picks the HTTP status code for an error reported by the
// supervisor. The not-found sentinels map to 404; every other error is
// reported as a 500. Handlers that act on a specific container or bundle
// share it so that a missing target is reported the same way everywhere.
func errorStatus(err error) int {
	if err == containerd.ErrContainerNotFound || err == containerd.ErrBundleNotFound {
		return http.StatusNotFound
	}
	return http.StatusInternalServerError
}

// updateContainer decodes a ContainerState from the request body and sends
// an update event for the container named in the URL. A malformed body is
// answered with 400; a failure reported by the supervisor is answered with
// the status chosen by errorStatus.
func (s *server) updateContainer(w http.ResponseWriter, r *http.Request) {
	id := mux.Vars(r)["id"]
	var state ContainerState
	if err := json.NewDecoder(r.Body).Decode(&state); err != nil {
		http.Error(w, err.Error(), http.StatusBadRequest)
		return
	}
	e := containerd.NewEvent(containerd.UpdateContainerEventType)
	e.ID = id
	// A zero signal means the client did not ask for one to be delivered.
	if state.Signal != 0 {
		e.Signal = syscall.Signal(state.Signal)
	}
	e.State = &runtime.State{
		Status: runtime.Status(string(state.Status)),
	}
	s.supervisor.SendEvent(e)
	if err := <-e.Err; err != nil {
		http.Error(w, err.Error(), errorStatus(err))
		return
	}
}

// events streams supervisor events to the client as a sequence of JSON
// values. Only event types that have an API representation are written;
// everything else is skipped rather than being encoded as a JSON null.
func (s *server) events(w http.ResponseWriter, r *http.Request) {
	events := s.supervisor.Events()
	// The handler stays open for the lifetime of the stream, so each event
	// has to be flushed explicitly or it would sit in the response buffer.
	flusher, canFlush := w.(http.Flusher)
	enc := json.NewEncoder(w)
	for evt := range events {
		var v interface{}
		switch evt.Type {
		case containerd.ExitEventType:
			v = createExitEvent(evt)
		default:
			// No API representation for this event type.
			continue
		}
		if err := enc.Encode(v); err != nil {
			// TODO: handled closed conn
			// NOTE(review): the loop keeps draining the channel on purpose.
			// How the supervisor delivers to subscribers is not visible
			// here, so abandoning the channel might stall it - confirm
			// before returning early on a closed connection.
			logrus.WithField("error", err).Error("encode event")
			continue
		}
		if canFlush {
			flusher.Flush()
		}
	}
}

// createExitEvent converts a supervisor event into the API "exit" event.
func createExitEvent(e *containerd.Event) *Event {
	return &Event{
		Type:   "exit",
		ID:     e.ID,
		Status: e.Status,
	}
}

// event decodes a raw supervisor event from the request body, sends it to
// the supervisor and, when the processed event carries containers, writes
// the resulting state back to the client. Its route is currently disabled in
// NewServer.
func (s *server) event(w http.ResponseWriter, r *http.Request) {
	var e containerd.Event
	if err := json.NewDecoder(r.Body).Decode(&e); err != nil {
		http.Error(w, err.Error(), http.StatusBadRequest)
		return
	}
	// A decoded event has no error channel; give it a buffered one so the
	// result can be received below.
	e.Err = make(chan error, 1)
	s.supervisor.SendEvent(&e)
	if err := <-e.Err; err != nil {
		http.Error(w, err.Error(), http.StatusInternalServerError)
		return
	}
	if len(e.Containers) > 0 {
		if err := s.writeState(w, &e); err != nil {
			http.Error(w, err.Error(), http.StatusInternalServerError)
			return
		}
	}
}

// addProcess decodes a process spec from the request body, asks the
// supervisor to add it to the container named in the URL and responds with
// the API description of the new process, including the pid set on the
// event.
func (s *server) addProcess(w http.ResponseWriter, r *http.Request) {
	id := mux.Vars(r)["id"]
	var process specs.Process
	if err := json.NewDecoder(r.Body).Decode(&process); err != nil {
		http.Error(w, err.Error(), http.StatusBadRequest)
		return
	}
	e := containerd.NewEvent(containerd.AddProcessEventType)
	e.ID = id
	e.Process = &process
	s.supervisor.SendEvent(e)
	if err := <-e.Err; err != nil {
		http.Error(w, err.Error(), errorStatus(err))
		return
	}
	p := Process{
		Pid:      e.Pid,
		Terminal: process.Terminal,
		Args:     process.Args,
		Env:      process.Env,
		Cwd:      process.Cwd,
	}
	p.User.UID = process.User.UID
	p.User.GID = process.User.GID
	p.User.AdditionalGids = process.User.AdditionalGids
	if err := json.NewEncoder(w).Encode(p); err != nil {
		http.Error(w, err.Error(), http.StatusInternalServerError)
		return
	}
}

// signalPid sends the signal given in the request body to the process
// identified by the pid in the URL, inside the container named in the URL.
// A non-numeric pid or a malformed body is answered with 400.
func (s *server) signalPid(w http.ResponseWriter, r *http.Request) {
	var (
		vars = mux.Vars(r)
		id   = vars["id"]
		spid = vars["pid"]
	)
	pid, err := strconv.Atoi(spid)
	if err != nil {
		http.Error(w, err.Error(), http.StatusBadRequest)
		return
	}
	var signal Signal
	if err := json.NewDecoder(r.Body).Decode(&signal); err != nil {
		http.Error(w, err.Error(), http.StatusBadRequest)
		return
	}
	e := containerd.NewEvent(containerd.SignalEventType)
	e.ID = id
	e.Pid = pid
	e.Signal = syscall.Signal(signal.Signal)
	s.supervisor.SendEvent(e)
	if err := <-e.Err; err != nil {
		http.Error(w, err.Error(), errorStatus(err))
		return
	}
}

// state requests the current containers from the supervisor and writes the
// resulting state to the client.
func (s *server) state(w http.ResponseWriter, r *http.Request) {
	e := containerd.NewEvent(containerd.GetContainerEventType)
	s.supervisor.SendEvent(e)
	if err := <-e.Err; err != nil {
		http.Error(w, err.Error(), http.StatusInternalServerError)
		return
	}
	if err := s.writeState(w, e); err != nil {
		http.Error(w, err.Error(), http.StatusInternalServerError)
		return
	}
}

// writeState encodes the machine information and the containers carried by e
// as JSON onto w and returns the encoding error, if any.
func (s *server) writeState(w http.ResponseWriter, e *containerd.Event) error {
	m := s.supervisor.Machine()
	state := State{
		Containers: []Container{},
		Machine: Machine{
			ID:     m.ID,
			Cpus:   m.Cpus,
			Memory: m.Memory,
		},
	}
	for _, c := range e.Containers {
		processes, err := c.Processes()
		if err != nil {
			// Best effort: the failure is only logged and the container is
			// still reported, with whatever processes were returned.
			logrus.WithFields(logrus.Fields{
				"error":     err,
				"container": c.ID(),
			}).Error("get processes for container")
		}
		var pids []Process
		for _, p := range processes {
			if proc := createProcess(p); proc != nil {
				pids = append(pids, *proc)
			}
		}
		state.Containers = append(state.Containers, Container{
			ID:         c.ID(),
			BundlePath: c.Path(),
			Processes:  pids,
			State: &ContainerState{
				Status: Status(c.State().Status),
			},
		})
	}
	return json.NewEncoder(w).Encode(&state)
}

// createProcess converts a runtime process into its API representation. It
// returns nil, after logging the error, when the pid cannot be obtained.
func createProcess(in runtime.Process) *Process {
	pid, err := in.Pid()
	if err != nil {
		logrus.WithField("error", err).Error("get process pid")
		return nil
	}
	process := in.Spec()
	p := &Process{
		Pid:      pid,
		Terminal: process.Terminal,
		Args:     process.Args,
		Env:      process.Env,
		Cwd:      process.Cwd,
	}
	p.User.UID = process.User.UID
	p.User.GID = process.User.GID
	p.User.AdditionalGids = process.User.AdditionalGids
	return p
}

// createContainer decodes a Container from the request body and sends a
// start event for the id named in the URL. The bundle path is required; when
// a checkpoint name is supplied it is attached to the event. Success is
// answered with 201.
func (s *server) createContainer(w http.ResponseWriter, r *http.Request) {
	id := mux.Vars(r)["id"]
	var c Container
	if err := json.NewDecoder(r.Body).Decode(&c); err != nil {
		http.Error(w, err.Error(), http.StatusBadRequest)
		return
	}
	if c.BundlePath == "" {
		http.Error(w, "empty bundle path", http.StatusBadRequest)
		return
	}
	e := containerd.NewEvent(containerd.StartContainerEventType)
	e.ID = id
	e.BundlePath = c.BundlePath
	if c.Checkpoint != "" {
		e.Checkpoint = &runtime.Checkpoint{
			Name: c.Checkpoint,
		}
	}
	e.Stdio = &runtime.Stdio{
		Stderr: c.Stderr,
		Stdout: c.Stdout,
	}
	s.supervisor.SendEvent(e)
	if err := <-e.Err; err != nil {
		http.Error(w, err.Error(), errorStatus(err))
		return
	}
	w.WriteHeader(http.StatusCreated)
}

// listCheckpoints writes the checkpoints of the container named in the URL
// as a JSON array. It answers 404 when no container with that id is among
// the containers returned by the supervisor.
func (s *server) listCheckpoints(w http.ResponseWriter, r *http.Request) {
	id := mux.Vars(r)["id"]
	e := containerd.NewEvent(containerd.GetContainerEventType)
	s.supervisor.SendEvent(e)
	if err := <-e.Err; err != nil {
		http.Error(w, err.Error(), http.StatusInternalServerError)
		return
	}
	var container runtime.Container
	for _, c := range e.Containers {
		if c.ID() == id {
			container = c
			break
		}
	}
	if container == nil {
		http.Error(w, "container not found", http.StatusNotFound)
		return
	}
	checkpoints, err := container.Checkpoints()
	if err != nil {
		http.Error(w, err.Error(), http.StatusInternalServerError)
		return
	}
	// Start from an empty, non-nil slice so that a container without
	// checkpoints is encoded as [] rather than null.
	out := []Checkpoint{}
	for _, c := range checkpoints {
		out = append(out, Checkpoint{
			Name:        c.Name,
			Tcp:         c.Tcp,
			Shell:       c.Shell,
			UnixSockets: c.UnixSockets,
			Timestamp:   c.Timestamp,
		})
	}
	if err := json.NewEncoder(w).Encode(out); err != nil {
		logrus.WithField("error", err).Error("encode checkpoints")
		http.Error(w, err.Error(), http.StatusInternalServerError)
		return
	}
}

// createCheckpoint sends a create-checkpoint event for the container and
// checkpoint name given in the URL, using any options supplied in the
// request body. Success is answered with 201.
func (s *server) createCheckpoint(w http.ResponseWriter, r *http.Request) {
	var (
		vars = mux.Vars(r)
		id   = vars["id"]
		name = vars["name"]
	)
	var cp Checkpoint
	// most options to the checkpoint action can be left out so don't
	// decode unless the client passed anything in the body.
	// NOTE(review): net/http reports ContentLength as -1 when the length is
	// unknown, so such bodies are not decoded either - confirm that is
	// acceptable for the clients of this API.
	if r.ContentLength > 0 {
		if err := json.NewDecoder(r.Body).Decode(&cp); err != nil {
			http.Error(w, err.Error(), http.StatusBadRequest)
			return
		}
	}
	e := containerd.NewEvent(containerd.CreateCheckpointEventType)
	e.ID = id
	e.Checkpoint = &runtime.Checkpoint{
		Name:        name,
		Exit:        cp.Exit,
		Tcp:         cp.Tcp,
		UnixSockets: cp.UnixSockets,
		Shell:       cp.Shell,
	}
	s.supervisor.SendEvent(e)
	if err := <-e.Err; err != nil {
		http.Error(w, err.Error(), errorStatus(err))
		return
	}
	w.WriteHeader(http.StatusCreated)
}

// deleteCheckpoint sends a delete-checkpoint event for the container and
// checkpoint name given in the URL. An empty checkpoint name is answered
// with 400.
func (s *server) deleteCheckpoint(w http.ResponseWriter, r *http.Request) {
	var (
		vars = mux.Vars(r)
		id   = vars["id"]
		name = vars["name"]
	)
	if name == "" {
		http.Error(w, "checkpoint name cannot be empty", http.StatusBadRequest)
		return
	}
	// A body, when present, must still be valid JSON, but none of its fields
	// are used for the delete.
	var cp Checkpoint
	if r.ContentLength > 0 {
		if err := json.NewDecoder(r.Body).Decode(&cp); err != nil {
			http.Error(w, err.Error(), http.StatusBadRequest)
			return
		}
	}
	e := containerd.NewEvent(containerd.DeleteCheckpointEventType)
	e.ID = id
	e.Checkpoint = &runtime.Checkpoint{
		Name: name,
	}
	s.supervisor.SendEvent(e)
	if err := <-e.Err; err != nil {
		http.Error(w, err.Error(), errorStatus(err))
		return
	}
}