containerd/api/grpc/server/server.go
Michael Crosby 0dd075a47b Add update rpc for resource updates
Signed-off-by: Michael Crosby <crosbymichael@gmail.com>
2016-03-07 15:23:52 -08:00

251 lines
6.1 KiB
Go

package server
import (
"errors"
"fmt"
"syscall"
"time"
"google.golang.org/grpc"
"google.golang.org/grpc/codes"
"github.com/docker/containerd/api/grpc/types"
"github.com/docker/containerd/runtime"
"github.com/docker/containerd/specs"
"github.com/docker/containerd/supervisor"
"golang.org/x/net/context"
)
type apiServer struct {
sv *supervisor.Supervisor
}
// NewServer returns grpc server instance
func NewServer(sv *supervisor.Supervisor) types.APIServer {
return &apiServer{
sv: sv,
}
}
func (s *apiServer) CreateContainer(ctx context.Context, c *types.CreateContainerRequest) (*types.CreateContainerResponse, error) {
if c.BundlePath == "" {
return nil, errors.New("empty bundle path")
}
e := &supervisor.StartTask{}
e.ID = c.Id
e.BundlePath = c.BundlePath
e.Stdin = c.Stdin
e.Stdout = c.Stdout
e.Stderr = c.Stderr
e.Labels = c.Labels
e.StartResponse = make(chan supervisor.StartResponse, 1)
createContainerConfigCheckpoint(e, c)
s.sv.SendTask(e)
if err := <-e.ErrorCh(); err != nil {
return nil, err
}
r := <-e.StartResponse
apiC, err := createAPIContainer(r.Container, false)
if err != nil {
return nil, err
}
return &types.CreateContainerResponse{
Container: apiC,
}, nil
}
func (s *apiServer) Signal(ctx context.Context, r *types.SignalRequest) (*types.SignalResponse, error) {
e := &supervisor.SignalTask{}
e.ID = r.Id
e.PID = r.Pid
e.Signal = syscall.Signal(int(r.Signal))
s.sv.SendTask(e)
if err := <-e.ErrorCh(); err != nil {
return nil, err
}
return &types.SignalResponse{}, nil
}
func (s *apiServer) AddProcess(ctx context.Context, r *types.AddProcessRequest) (*types.AddProcessResponse, error) {
process := &specs.ProcessSpec{
Terminal: r.Terminal,
Args: r.Args,
Env: r.Env,
Cwd: r.Cwd,
}
setPlatformRuntimeProcessSpecUserFields(r, process)
if r.Id == "" {
return nil, fmt.Errorf("container id cannot be empty")
}
if r.Pid == "" {
return nil, fmt.Errorf("process id cannot be empty")
}
e := &supervisor.AddProcessTask{}
e.ID = r.Id
e.PID = r.Pid
e.ProcessSpec = process
e.Stdin = r.Stdin
e.Stdout = r.Stdout
e.Stderr = r.Stderr
e.StartResponse = make(chan supervisor.StartResponse, 1)
s.sv.SendTask(e)
if err := <-e.ErrorCh(); err != nil {
return nil, err
}
<-e.StartResponse
return &types.AddProcessResponse{}, nil
}
func (s *apiServer) State(ctx context.Context, r *types.StateRequest) (*types.StateResponse, error) {
e := &supervisor.GetContainersTask{}
e.ID = r.Id
s.sv.SendTask(e)
if err := <-e.ErrorCh(); err != nil {
return nil, err
}
m := s.sv.Machine()
state := &types.StateResponse{
Machine: &types.Machine{
Cpus: uint32(m.Cpus),
Memory: uint64(m.Memory),
},
}
for _, c := range e.Containers {
apiC, err := createAPIContainer(c, true)
if err != nil {
return nil, err
}
state.Containers = append(state.Containers, apiC)
}
return state, nil
}
func createAPIContainer(c runtime.Container, getPids bool) (*types.Container, error) {
processes, err := c.Processes()
if err != nil {
return nil, grpc.Errorf(codes.Internal, "get processes for container")
}
var procs []*types.Process
for _, p := range processes {
oldProc := p.Spec()
stdio := p.Stdio()
appendToProcs := &types.Process{
Pid: p.ID(),
SystemPid: uint32(p.SystemPid()),
Terminal: oldProc.Terminal,
Args: oldProc.Args,
Env: oldProc.Env,
Cwd: oldProc.Cwd,
Stdin: stdio.Stdin,
Stdout: stdio.Stdout,
Stderr: stdio.Stderr,
}
setUserFieldsInProcess(appendToProcs, oldProc)
procs = append(procs, appendToProcs)
}
var pids []int
state := c.State()
if getPids && (state == runtime.Running || state == runtime.Paused) {
if pids, err = c.Pids(); err != nil {
return nil, grpc.Errorf(codes.Internal, "get all pids for container")
}
}
return &types.Container{
Id: c.ID(),
BundlePath: c.Path(),
Processes: procs,
Labels: c.Labels(),
Status: string(state),
Pids: toUint32(pids),
Runtime: c.Runtime(),
}, nil
}
func toUint32(its []int) []uint32 {
o := []uint32{}
for _, i := range its {
o = append(o, uint32(i))
}
return o
}
func (s *apiServer) UpdateContainer(ctx context.Context, r *types.UpdateContainerRequest) (*types.UpdateContainerResponse, error) {
e := &supervisor.UpdateTask{}
e.ID = r.Id
e.State = runtime.State(r.Status)
if r.Resources != nil {
rs := r.Resources
e.Resources = &runtime.Resource{}
if rs.CpuShares != 0 {
e.Resources.CPUShares = int64(rs.CpuShares)
}
if rs.BlkioWeight != 0 {
e.Resources.BlkioWeight = uint16(rs.BlkioWeight)
}
if rs.CpuPeriod != 0 {
e.Resources.CPUPeriod = int64(rs.CpuPeriod)
}
if rs.CpuQuota != 0 {
e.Resources.CPUQuota = int64(rs.CpuQuota)
}
if rs.CpusetCpus != "" {
e.Resources.CpusetCpus = rs.CpusetCpus
}
if rs.CpusetMems != "" {
e.Resources.CpusetMems = rs.CpusetMems
}
if rs.KernelMemoryLimit != 0 {
e.Resources.KernelMemory = int64(rs.KernelMemoryLimit)
}
if rs.MemoryLimit != 0 {
e.Resources.Memory = int64(rs.MemoryLimit)
}
if rs.MemoryReservation != 0 {
e.Resources.MemoryReservation = int64(rs.MemoryReservation)
}
if rs.MemorySwap != 0 {
e.Resources.MemorySwap = int64(rs.MemorySwap)
}
}
s.sv.SendTask(e)
if err := <-e.ErrorCh(); err != nil {
return nil, err
}
return &types.UpdateContainerResponse{}, nil
}
func (s *apiServer) UpdateProcess(ctx context.Context, r *types.UpdateProcessRequest) (*types.UpdateProcessResponse, error) {
e := &supervisor.UpdateProcessTask{}
e.ID = r.Id
e.PID = r.Pid
e.Height = int(r.Height)
e.Width = int(r.Width)
e.CloseStdin = r.CloseStdin
s.sv.SendTask(e)
if err := <-e.ErrorCh(); err != nil {
return nil, err
}
return &types.UpdateProcessResponse{}, nil
}
func (s *apiServer) Events(r *types.EventsRequest, stream types.API_EventsServer) error {
t := time.Time{}
if r.Timestamp != 0 {
t = time.Unix(int64(r.Timestamp), 0)
}
events := s.sv.Events(t)
defer s.sv.Unsubscribe(events)
for e := range events {
if err := stream.Send(&types.Event{
Id: e.ID,
Type: e.Type,
Timestamp: uint64(e.Timestamp.Unix()),
Pid: e.PID,
Status: uint32(e.Status),
}); err != nil {
return err
}
}
return nil
}