Replace rest api with grpc api

Signed-off-by: Michael Crosby <crosbymichael@gmail.com>
This commit is contained in:
Michael Crosby 2015-12-09 15:39:27 -08:00
parent 11c27935d0
commit 1d3349128e
4 changed files with 6 additions and 572 deletions

View File

@ -1,101 +0,0 @@
package v1
import (
"bytes"
"encoding/json"
"fmt"
"net/http"
"strconv"
)
func NewClient(addr string) *Client {
return &Client{
addr: addr,
}
}
type Client struct {
addr string
}
type StartOpts struct {
Path string
Checkpoint string
}
// Start starts a container with the specified id and path to the container's
// bundle on the system.
func (c *Client) Start(id string, opts StartOpts) error {
container := Container{
BundlePath: opts.Path,
Checkpoint: opts.Checkpoint,
}
buf := bytes.NewBuffer(nil)
if err := json.NewEncoder(buf).Encode(container); err != nil {
return err
}
r, err := http.Post(c.addr+"/containers/"+id, "application/json", buf)
if err != nil {
return err
}
r.Body.Close()
if r.StatusCode != http.StatusCreated {
return fmt.Errorf("unexpected status %d", r.StatusCode)
}
return nil
}
func (c *Client) State() ([]Container, error) {
r, err := http.Get(c.addr + "/state")
if err != nil {
return nil, err
}
var s State
if err := json.NewDecoder(r.Body).Decode(&s); err != nil {
return nil, err
}
r.Body.Close()
return s.Containers, nil
}
func (c *Client) SignalProcess(id string, pid, signal int) error {
sig := Signal{
Signal: signal,
}
buf := bytes.NewBuffer(nil)
if err := json.NewEncoder(buf).Encode(sig); err != nil {
return err
}
r, err := http.Post(c.addr+"/containers/"+id+"/process/"+strconv.Itoa(pid), "application/json", buf)
if err != nil {
return err
}
r.Body.Close()
return nil
}
func (c *Client) Checkpoints(id string) ([]Checkpoint, error) {
r, err := http.Get(c.addr + "/containers/" + id + "/checkpoint")
if err != nil {
return nil, err
}
defer r.Body.Close()
var checkpoints []Checkpoint
if err := json.NewDecoder(r.Body).Decode(&checkpoints); err != nil {
return nil, err
}
return checkpoints, nil
}
func (c *Client) CreateCheckpoint(id, name string, cp Checkpoint) error {
buf := bytes.NewBuffer(nil)
if err := json.NewEncoder(buf).Encode(cp); err != nil {
return err
}
r, err := http.Post(c.addr+"/containers/"+id+"/checkpoint", "application/json", buf)
if err != nil {
return err
}
r.Body.Close()
return nil
}

View File

@ -1,387 +0,0 @@
package v1
import (
"encoding/json"
"net/http"
"strconv"
"syscall"
"github.com/Sirupsen/logrus"
"github.com/docker/containerd"
"github.com/docker/containerd/runtime"
"github.com/gorilla/mux"
"github.com/opencontainers/specs"
)
func NewServer(supervisor *containerd.Supervisor) http.Handler {
r := mux.NewRouter()
s := &server{
supervisor: supervisor,
r: r,
}
// process handlers
r.HandleFunc("/containers/{id:.*}/process/{pid:.*}", s.signalPid).Methods("POST")
r.HandleFunc("/containers/{id:.*}/process", s.addProcess).Methods("PUT")
// checkpoint and restore handlers
// TODO: PUT handler for adding a checkpoint to containerd??
r.HandleFunc("/containers/{id:.*}/checkpoint/{name:.*}", s.createCheckpoint).Methods("POST")
r.HandleFunc("/containers/{id:.*}/checkpoint/{name:.*}", s.deleteCheckpoint).Methods("DELETE")
r.HandleFunc("/containers/{id:.*}/checkpoint", s.listCheckpoints).Methods("GET")
// container handlers
r.HandleFunc("/containers/{id:.*}", s.createContainer).Methods("POST")
r.HandleFunc("/containers/{id:.*}", s.updateContainer).Methods("PATCH")
// internal method for replaying the journal
// r.HandleFunc("/event", s.event).Methods("POST")
r.HandleFunc("/events", s.events).Methods("GET")
// containerd handlers
r.HandleFunc("/state", s.state).Methods("GET")
return s
}
type server struct {
r *mux.Router
supervisor *containerd.Supervisor
}
func (s *server) ServeHTTP(w http.ResponseWriter, r *http.Request) {
s.r.ServeHTTP(w, r)
}
func (s *server) updateContainer(w http.ResponseWriter, r *http.Request) {
id := mux.Vars(r)["id"]
var state ContainerState
if err := json.NewDecoder(r.Body).Decode(&state); err != nil {
http.Error(w, err.Error(), http.StatusBadRequest)
return
}
e := containerd.NewEvent(containerd.UpdateContainerEventType)
e.ID = id
if state.Signal != 0 {
e.Signal = syscall.Signal(state.Signal)
}
e.State = &runtime.State{
Status: runtime.Status(string(state.Status)),
}
s.supervisor.SendEvent(e)
if err := <-e.Err; err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}
}
func (s *server) events(w http.ResponseWriter, r *http.Request) {
events := s.supervisor.Events()
enc := json.NewEncoder(w)
for evt := range events {
var v interface{}
switch evt.Type {
case containerd.ExitEventType:
v = createExitEvent(evt)
}
if err := enc.Encode(v); err != nil {
// TODO: handled closed conn
logrus.WithField("error", err).Error("encode event")
}
}
}
func createExitEvent(e *containerd.Event) *Event {
return &Event{
Type: "exit",
ID: e.ID,
Status: e.Status,
}
}
func (s *server) event(w http.ResponseWriter, r *http.Request) {
var e containerd.Event
if err := json.NewDecoder(r.Body).Decode(&e); err != nil {
http.Error(w, err.Error(), http.StatusBadRequest)
return
}
e.Err = make(chan error, 1)
s.supervisor.SendEvent(&e)
if err := <-e.Err; err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}
if e.Containers != nil && len(e.Containers) > 0 {
if err := s.writeState(w, &e); err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}
}
}
func (s *server) addProcess(w http.ResponseWriter, r *http.Request) {
id := mux.Vars(r)["id"]
var process specs.Process
if err := json.NewDecoder(r.Body).Decode(&process); err != nil {
http.Error(w, err.Error(), http.StatusBadRequest)
return
}
e := containerd.NewEvent(containerd.AddProcessEventType)
e.ID = id
e.Process = &process
s.supervisor.SendEvent(e)
if err := <-e.Err; err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}
p := Process{
Pid: e.Pid,
Terminal: process.Terminal,
Args: process.Args,
Env: process.Env,
Cwd: process.Cwd,
}
p.User.UID = process.User.UID
p.User.GID = process.User.GID
p.User.AdditionalGids = process.User.AdditionalGids
if err := json.NewEncoder(w).Encode(p); err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}
}
func (s *server) signalPid(w http.ResponseWriter, r *http.Request) {
var (
vars = mux.Vars(r)
id = vars["id"]
spid = vars["pid"]
)
pid, err := strconv.Atoi(spid)
if err != nil {
http.Error(w, err.Error(), http.StatusBadRequest)
return
}
var signal Signal
if err := json.NewDecoder(r.Body).Decode(&signal); err != nil {
http.Error(w, err.Error(), http.StatusBadRequest)
return
}
e := containerd.NewEvent(containerd.SignalEventType)
e.ID = id
e.Pid = pid
e.Signal = syscall.Signal(signal.Signal)
s.supervisor.SendEvent(e)
if err := <-e.Err; err != nil {
status := http.StatusInternalServerError
if err == containerd.ErrContainerNotFound {
status = http.StatusNotFound
}
http.Error(w, err.Error(), status)
return
}
}
func (s *server) state(w http.ResponseWriter, r *http.Request) {
e := containerd.NewEvent(containerd.GetContainerEventType)
s.supervisor.SendEvent(e)
if err := <-e.Err; err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}
if err := s.writeState(w, e); err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}
}
func (s *server) writeState(w http.ResponseWriter, e *containerd.Event) error {
m := s.supervisor.Machine()
state := State{
Containers: []Container{},
Machine: Machine{
ID: m.ID,
Cpus: m.Cpus,
Memory: m.Memory,
},
}
for _, c := range e.Containers {
processes, err := c.Processes()
if err != nil {
logrus.WithFields(logrus.Fields{
"error": err,
"container": c.ID(),
}).Error("get processes for container")
}
var pids []Process
for _, p := range processes {
if proc := createProcess(p); proc != nil {
pids = append(pids, *proc)
}
}
state.Containers = append(state.Containers, Container{
ID: c.ID(),
BundlePath: c.Path(),
Processes: pids,
State: &ContainerState{
Status: Status(c.State().Status),
},
})
}
return json.NewEncoder(w).Encode(&state)
}
func createProcess(in runtime.Process) *Process {
pid, err := in.Pid()
if err != nil {
logrus.WithField("error", err).Error("get process pid")
return nil
}
process := in.Spec()
p := &Process{
Pid: pid,
Terminal: process.Terminal,
Args: process.Args,
Env: process.Env,
Cwd: process.Cwd,
}
p.User.UID = process.User.UID
p.User.GID = process.User.GID
p.User.AdditionalGids = process.User.AdditionalGids
return p
}
func (s *server) createContainer(w http.ResponseWriter, r *http.Request) {
id := mux.Vars(r)["id"]
var c Container
if err := json.NewDecoder(r.Body).Decode(&c); err != nil {
http.Error(w, err.Error(), http.StatusBadRequest)
return
}
if c.BundlePath == "" {
http.Error(w, "empty bundle path", http.StatusBadRequest)
return
}
e := containerd.NewEvent(containerd.StartContainerEventType)
e.ID = id
e.BundlePath = c.BundlePath
if c.Checkpoint != "" {
e.Checkpoint = &runtime.Checkpoint{
Name: c.Checkpoint,
}
}
e.Stdio = &runtime.Stdio{
Stderr: c.Stderr,
Stdout: c.Stdout,
}
s.supervisor.SendEvent(e)
if err := <-e.Err; err != nil {
code := http.StatusInternalServerError
if err == containerd.ErrBundleNotFound {
code = http.StatusNotFound
}
http.Error(w, err.Error(), code)
return
}
w.WriteHeader(http.StatusCreated)
}
func (s *server) listCheckpoints(w http.ResponseWriter, r *http.Request) {
id := mux.Vars(r)["id"]
e := containerd.NewEvent(containerd.GetContainerEventType)
s.supervisor.SendEvent(e)
if err := <-e.Err; err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}
var container runtime.Container
for _, c := range e.Containers {
if c.ID() == id {
container = c
break
}
}
if container == nil {
http.Error(w, "container not found", http.StatusNotFound)
return
}
checkpoints, err := container.Checkpoints()
if err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}
out := []Checkpoint{}
for _, c := range checkpoints {
out = append(out, Checkpoint{
Name: c.Name,
Tcp: c.Tcp,
Shell: c.Shell,
UnixSockets: c.UnixSockets,
Timestamp: c.Timestamp,
})
}
if err := json.NewEncoder(w).Encode(out); err != nil {
logrus.WithField("error", err).Error("encode checkpoints")
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}
}
func (s *server) createCheckpoint(w http.ResponseWriter, r *http.Request) {
var (
vars = mux.Vars(r)
id = vars["id"]
name = vars["name"]
)
var cp Checkpoint
// most options to the checkpoint action can be left out so don't
// decode unless the client passed anything in the body.
if r.ContentLength > 0 {
if err := json.NewDecoder(r.Body).Decode(&cp); err != nil {
http.Error(w, err.Error(), http.StatusBadRequest)
return
}
}
e := containerd.NewEvent(containerd.CreateCheckpointEventType)
e.ID = id
e.Checkpoint = &runtime.Checkpoint{
Name: name,
Exit: cp.Exit,
Tcp: cp.Tcp,
UnixSockets: cp.UnixSockets,
Shell: cp.Shell,
}
s.supervisor.SendEvent(e)
if err := <-e.Err; err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}
w.WriteHeader(http.StatusCreated)
}
func (s *server) deleteCheckpoint(w http.ResponseWriter, r *http.Request) {
var (
vars = mux.Vars(r)
id = vars["id"]
name = vars["name"]
)
if name == "" {
http.Error(w, "checkpoint name cannot be empty", http.StatusBadRequest)
return
}
var cp Checkpoint
if r.ContentLength > 0 {
if err := json.NewDecoder(r.Body).Decode(&cp); err != nil {
http.Error(w, err.Error(), http.StatusBadRequest)
return
}
}
e := containerd.NewEvent(containerd.DeleteCheckpointEventType)
e.ID = id
e.Checkpoint = &runtime.Checkpoint{
Name: name,
}
s.supervisor.SendEvent(e)
if err := <-e.Err; err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}
}

View File

@ -1,70 +0,0 @@
package v1
import "time"
type State struct {
Containers []Container `json:"containers"`
Machine Machine `json:"machine"`
}
type Status string
const (
Paused Status = "paused"
Running Status = "running"
)
type Machine struct {
ID string `json:"id"`
Cpus int `json:"cpus"`
Memory int64 `json:"memory"`
}
type ContainerState struct {
Status Status `json:"status,omitempty"`
Signal int `json:"signal,omitempty"`
}
type Container struct {
ID string `json:"id,omitempty"`
BundlePath string `json:"bundlePath,omitempty"`
Processes []Process `json:"processes,omitempty"`
Stdout string `json:"stdout,omitempty"`
Stderr string `json:"stderr,omitempty"`
State *ContainerState `json:"state,omitempty"`
Checkpoint string `json:"checkpoint,omitempty"`
}
type User struct {
UID uint32 `json:"uid"`
GID uint32 `json:"gid"`
AdditionalGids []uint32 `json:"additionalGids,omitempty"`
}
type Process struct {
Terminal bool `json:"terminal"`
User User `json:"user"`
Args []string `json:"args,omitempty"`
Env []string `json:"env,omitempty"`
Cwd string `json:"cwd,omitempty"`
Pid int `json:"pid,omitempty"`
}
type Signal struct {
Signal int `json:"signal"`
}
type Event struct {
Type string `json:"type"`
ID string `json:"id,omitempty"`
Status int `json:"status,omitempty"`
}
type Checkpoint struct {
Name string `json:"name,omitempty"`
Timestamp time.Time `json:"timestamp,omitempty"`
Exit bool `json:"exit,omitempty"`
Tcp bool `json:"tcp"`
UnixSockets bool `json:"unixSockets"`
Shell bool `json:"shell"`
}

View File

@ -3,21 +3,18 @@ package main
import (
"log"
"net"
"net/http"
"os"
"runtime"
"sync"
"time"
"google.golang.org/grpc"
"google.golang.org/grpc/grpclog"
"github.com/Sirupsen/logrus"
"github.com/codegangsta/cli"
"github.com/docker/containerd"
"github.com/docker/containerd/api/grpc/server"
"github.com/docker/containerd/api/grpc/types"
"github.com/docker/containerd/api/v1"
"github.com/docker/containerd/util"
"github.com/rcrowley/go-metrics"
)
@ -150,18 +147,13 @@ func daemon(id, stateDir string, concurrency, bufferSize int) error {
if err := supervisor.Start(); err != nil {
return err
}
if os.Getenv("GRPC") != "" {
lis, err := net.Listen("tcp", ":8888")
if err != nil {
grpclog.Fatalf("failed to listen: %v", err)
}
grpcServer := grpc.NewServer()
types.RegisterAPIServer(grpcServer, server.NewServer(supervisor))
return grpcServer.Serve(lis)
l, err := net.Listen("tcp", ":8888")
if err != nil {
return err
}
server := v1.NewServer(supervisor)
return http.ListenAndServe("localhost:8888", server)
s := grpc.NewServer()
types.RegisterAPIServer(s, server.NewServer(supervisor))
return s.Serve(l)
}
// getDefaultID returns the hostname for the instance host