ca7c504068
Prior to this patch, when list containers by "ctr containers" or "ctr containers xxx", it will not get the proper status of conatinser(s). That was caused by the wrong implementation of State() for structure process, it only send a signal "0" to ping the "init" process and do nothing. Since the OCI/runc has implemented an interface Status(), we can use that. And I think this is more compatible with the design for containerd: - containerd -> runtime -> fun() Signed-off-by: Hu Keping <hukeping@huawei.com>
256 lines
6.3 KiB
Go
256 lines
6.3 KiB
Go
package server
|
|
|
|
import (
|
|
"errors"
|
|
"fmt"
|
|
"syscall"
|
|
"time"
|
|
|
|
"google.golang.org/grpc"
|
|
"google.golang.org/grpc/codes"
|
|
|
|
"github.com/docker/containerd/api/grpc/types"
|
|
"github.com/docker/containerd/runtime"
|
|
"github.com/docker/containerd/specs"
|
|
"github.com/docker/containerd/supervisor"
|
|
"golang.org/x/net/context"
|
|
)
|
|
|
|
type apiServer struct {
|
|
sv *supervisor.Supervisor
|
|
}
|
|
|
|
// NewServer returns grpc server instance
|
|
func NewServer(sv *supervisor.Supervisor) types.APIServer {
|
|
return &apiServer{
|
|
sv: sv,
|
|
}
|
|
}
|
|
|
|
func (s *apiServer) CreateContainer(ctx context.Context, c *types.CreateContainerRequest) (*types.CreateContainerResponse, error) {
|
|
if c.BundlePath == "" {
|
|
return nil, errors.New("empty bundle path")
|
|
}
|
|
e := &supervisor.StartTask{}
|
|
e.ID = c.Id
|
|
e.BundlePath = c.BundlePath
|
|
e.Stdin = c.Stdin
|
|
e.Stdout = c.Stdout
|
|
e.Stderr = c.Stderr
|
|
e.Labels = c.Labels
|
|
e.NoPivotRoot = c.NoPivotRoot
|
|
e.StartResponse = make(chan supervisor.StartResponse, 1)
|
|
createContainerConfigCheckpoint(e, c)
|
|
s.sv.SendTask(e)
|
|
if err := <-e.ErrorCh(); err != nil {
|
|
return nil, err
|
|
}
|
|
r := <-e.StartResponse
|
|
apiC, err := createAPIContainer(r.Container, false)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
return &types.CreateContainerResponse{
|
|
Container: apiC,
|
|
}, nil
|
|
}
|
|
|
|
func (s *apiServer) Signal(ctx context.Context, r *types.SignalRequest) (*types.SignalResponse, error) {
|
|
e := &supervisor.SignalTask{}
|
|
e.ID = r.Id
|
|
e.PID = r.Pid
|
|
e.Signal = syscall.Signal(int(r.Signal))
|
|
s.sv.SendTask(e)
|
|
if err := <-e.ErrorCh(); err != nil {
|
|
return nil, err
|
|
}
|
|
return &types.SignalResponse{}, nil
|
|
}
|
|
|
|
func (s *apiServer) AddProcess(ctx context.Context, r *types.AddProcessRequest) (*types.AddProcessResponse, error) {
|
|
process := &specs.ProcessSpec{
|
|
Terminal: r.Terminal,
|
|
Args: r.Args,
|
|
Env: r.Env,
|
|
Cwd: r.Cwd,
|
|
}
|
|
setPlatformRuntimeProcessSpecUserFields(r, process)
|
|
|
|
if r.Id == "" {
|
|
return nil, fmt.Errorf("container id cannot be empty")
|
|
}
|
|
if r.Pid == "" {
|
|
return nil, fmt.Errorf("process id cannot be empty")
|
|
}
|
|
e := &supervisor.AddProcessTask{}
|
|
e.ID = r.Id
|
|
e.PID = r.Pid
|
|
e.ProcessSpec = process
|
|
e.Stdin = r.Stdin
|
|
e.Stdout = r.Stdout
|
|
e.Stderr = r.Stderr
|
|
e.StartResponse = make(chan supervisor.StartResponse, 1)
|
|
s.sv.SendTask(e)
|
|
if err := <-e.ErrorCh(); err != nil {
|
|
return nil, err
|
|
}
|
|
<-e.StartResponse
|
|
return &types.AddProcessResponse{}, nil
|
|
}
|
|
|
|
func (s *apiServer) State(ctx context.Context, r *types.StateRequest) (*types.StateResponse, error) {
|
|
e := &supervisor.GetContainersTask{}
|
|
e.ID = r.Id
|
|
s.sv.SendTask(e)
|
|
if err := <-e.ErrorCh(); err != nil {
|
|
return nil, err
|
|
}
|
|
m := s.sv.Machine()
|
|
state := &types.StateResponse{
|
|
Machine: &types.Machine{
|
|
Cpus: uint32(m.Cpus),
|
|
Memory: uint64(m.Memory),
|
|
},
|
|
}
|
|
for _, c := range e.Containers {
|
|
apiC, err := createAPIContainer(c, true)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
state.Containers = append(state.Containers, apiC)
|
|
}
|
|
return state, nil
|
|
}
|
|
|
|
func createAPIContainer(c runtime.Container, getPids bool) (*types.Container, error) {
|
|
processes, err := c.Processes()
|
|
if err != nil {
|
|
return nil, grpc.Errorf(codes.Internal, "get processes for container: "+err.Error())
|
|
}
|
|
var procs []*types.Process
|
|
for _, p := range processes {
|
|
oldProc := p.Spec()
|
|
stdio := p.Stdio()
|
|
appendToProcs := &types.Process{
|
|
Pid: p.ID(),
|
|
SystemPid: uint32(p.SystemPid()),
|
|
Terminal: oldProc.Terminal,
|
|
Args: oldProc.Args,
|
|
Env: oldProc.Env,
|
|
Cwd: oldProc.Cwd,
|
|
Stdin: stdio.Stdin,
|
|
Stdout: stdio.Stdout,
|
|
Stderr: stdio.Stderr,
|
|
}
|
|
setUserFieldsInProcess(appendToProcs, oldProc)
|
|
procs = append(procs, appendToProcs)
|
|
}
|
|
var pids []int
|
|
state, err := c.Status()
|
|
if err != nil {
|
|
return nil, grpc.Errorf(codes.Internal, "get status for container: "+err.Error())
|
|
}
|
|
|
|
if getPids && (state == runtime.Running || state == runtime.Paused) {
|
|
if pids, err = c.Pids(); err != nil {
|
|
return nil, grpc.Errorf(codes.Internal, "get all pids for container: "+err.Error())
|
|
}
|
|
}
|
|
return &types.Container{
|
|
Id: c.ID(),
|
|
BundlePath: c.Path(),
|
|
Processes: procs,
|
|
Labels: c.Labels(),
|
|
Status: string(state),
|
|
Pids: toUint32(pids),
|
|
Runtime: c.Runtime(),
|
|
}, nil
|
|
}
|
|
|
|
func toUint32(its []int) []uint32 {
|
|
o := []uint32{}
|
|
for _, i := range its {
|
|
o = append(o, uint32(i))
|
|
}
|
|
return o
|
|
}
|
|
|
|
func (s *apiServer) UpdateContainer(ctx context.Context, r *types.UpdateContainerRequest) (*types.UpdateContainerResponse, error) {
|
|
e := &supervisor.UpdateTask{}
|
|
e.ID = r.Id
|
|
e.State = runtime.State(r.Status)
|
|
if r.Resources != nil {
|
|
rs := r.Resources
|
|
e.Resources = &runtime.Resource{}
|
|
if rs.CpuShares != 0 {
|
|
e.Resources.CPUShares = int64(rs.CpuShares)
|
|
}
|
|
if rs.BlkioWeight != 0 {
|
|
e.Resources.BlkioWeight = uint16(rs.BlkioWeight)
|
|
}
|
|
if rs.CpuPeriod != 0 {
|
|
e.Resources.CPUPeriod = int64(rs.CpuPeriod)
|
|
}
|
|
if rs.CpuQuota != 0 {
|
|
e.Resources.CPUQuota = int64(rs.CpuQuota)
|
|
}
|
|
if rs.CpusetCpus != "" {
|
|
e.Resources.CpusetCpus = rs.CpusetCpus
|
|
}
|
|
if rs.CpusetMems != "" {
|
|
e.Resources.CpusetMems = rs.CpusetMems
|
|
}
|
|
if rs.KernelMemoryLimit != 0 {
|
|
e.Resources.KernelMemory = int64(rs.KernelMemoryLimit)
|
|
}
|
|
if rs.MemoryLimit != 0 {
|
|
e.Resources.Memory = int64(rs.MemoryLimit)
|
|
}
|
|
if rs.MemoryReservation != 0 {
|
|
e.Resources.MemoryReservation = int64(rs.MemoryReservation)
|
|
}
|
|
if rs.MemorySwap != 0 {
|
|
e.Resources.MemorySwap = int64(rs.MemorySwap)
|
|
}
|
|
}
|
|
s.sv.SendTask(e)
|
|
if err := <-e.ErrorCh(); err != nil {
|
|
return nil, err
|
|
}
|
|
return &types.UpdateContainerResponse{}, nil
|
|
}
|
|
|
|
func (s *apiServer) UpdateProcess(ctx context.Context, r *types.UpdateProcessRequest) (*types.UpdateProcessResponse, error) {
|
|
e := &supervisor.UpdateProcessTask{}
|
|
e.ID = r.Id
|
|
e.PID = r.Pid
|
|
e.Height = int(r.Height)
|
|
e.Width = int(r.Width)
|
|
e.CloseStdin = r.CloseStdin
|
|
s.sv.SendTask(e)
|
|
if err := <-e.ErrorCh(); err != nil {
|
|
return nil, err
|
|
}
|
|
return &types.UpdateProcessResponse{}, nil
|
|
}
|
|
|
|
func (s *apiServer) Events(r *types.EventsRequest, stream types.API_EventsServer) error {
|
|
t := time.Time{}
|
|
if r.Timestamp != 0 {
|
|
t = time.Unix(int64(r.Timestamp), 0)
|
|
}
|
|
events := s.sv.Events(t)
|
|
defer s.sv.Unsubscribe(events)
|
|
for e := range events {
|
|
if err := stream.Send(&types.Event{
|
|
Id: e.ID,
|
|
Type: e.Type,
|
|
Timestamp: uint64(e.Timestamp.Unix()),
|
|
Pid: e.PID,
|
|
Status: uint32(e.Status),
|
|
}); err != nil {
|
|
return err
|
|
}
|
|
}
|
|
return nil
|
|
}
|