bbfcf62a0f
CRI requires all timestamps in nanoseconds instead of seconds. Signed-off-by: Pengfei Ni <feiskyer@gmail.com>
602 lines
16 KiB
Go
602 lines
16 KiB
Go
package server
|
|
|
|
import (
|
|
"encoding/json"
|
|
"errors"
|
|
"fmt"
|
|
"os"
|
|
"path/filepath"
|
|
"syscall"
|
|
|
|
"github.com/Sirupsen/logrus"
|
|
"github.com/docker/docker/pkg/stringid"
|
|
"github.com/kubernetes-incubator/cri-o/oci"
|
|
"github.com/kubernetes-incubator/cri-o/utils"
|
|
"github.com/opencontainers/runc/libcontainer/label"
|
|
"github.com/opencontainers/runtime-tools/generate"
|
|
"golang.org/x/net/context"
|
|
"k8s.io/kubernetes/pkg/fields"
|
|
pb "k8s.io/kubernetes/pkg/kubelet/api/v1alpha1/runtime"
|
|
)
|
|
|
|
func (s *Server) generateContainerIDandName(podName string, name string, attempt uint32) (string, string, error) {
|
|
var (
|
|
err error
|
|
id = stringid.GenerateNonCryptoID()
|
|
)
|
|
nameStr := fmt.Sprintf("%s-%s-%v", podName, name, attempt)
|
|
if name == "infra" {
|
|
nameStr = fmt.Sprintf("%s-%s", podName, name)
|
|
}
|
|
if name, err = s.reserveContainerName(id, nameStr); err != nil {
|
|
return "", "", err
|
|
}
|
|
return id, name, err
|
|
}
|
|
|
|
// containerRequest is satisfied by any request message that carries a
// container ID. It lets getContainerFromRequest serve the start, stop,
// remove and status requests through one code path.
type containerRequest interface {
	// GetContainerId returns the container ID supplied by the caller.
	GetContainerId() string
}
|
|
|
|
func (s *Server) getContainerFromRequest(req containerRequest) (*oci.Container, error) {
|
|
ctrID := req.GetContainerId()
|
|
if ctrID == "" {
|
|
return nil, fmt.Errorf("container ID should not be empty")
|
|
}
|
|
|
|
containerID, err := s.ctrIDIndex.Get(ctrID)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("container with ID starting with %s not found: %v", ctrID, err)
|
|
}
|
|
|
|
c := s.state.containers.Get(containerID)
|
|
if c == nil {
|
|
return nil, fmt.Errorf("specified container not found: %s", containerID)
|
|
}
|
|
return c, nil
|
|
}
|
|
|
|
// CreateContainer creates a new container in specified PodSandbox
|
|
func (s *Server) CreateContainer(ctx context.Context, req *pb.CreateContainerRequest) (res *pb.CreateContainerResponse, err error) {
|
|
logrus.Debugf("CreateContainerRequest %+v", req)
|
|
sbID := req.GetPodSandboxId()
|
|
if sbID == "" {
|
|
return nil, fmt.Errorf("PodSandboxId should not be empty")
|
|
}
|
|
|
|
sandboxID, err := s.podIDIndex.Get(sbID)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("PodSandbox with ID starting with %s not found: %v", sbID, err)
|
|
}
|
|
|
|
sb := s.getSandbox(sandboxID)
|
|
if sb == nil {
|
|
return nil, fmt.Errorf("specified sandbox not found: %s", sandboxID)
|
|
}
|
|
|
|
// The config of the container
|
|
containerConfig := req.GetConfig()
|
|
if containerConfig == nil {
|
|
return nil, fmt.Errorf("CreateContainerRequest.ContainerConfig is nil")
|
|
}
|
|
|
|
name := containerConfig.GetMetadata().GetName()
|
|
if name == "" {
|
|
return nil, fmt.Errorf("CreateContainerRequest.ContainerConfig.Name is empty")
|
|
}
|
|
|
|
attempt := containerConfig.GetMetadata().GetAttempt()
|
|
containerID, containerName, err := s.generateContainerIDandName(sb.name, name, attempt)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
// containerDir is the dir for the container bundle.
|
|
containerDir := filepath.Join(s.runtime.ContainerDir(), containerID)
|
|
defer func() {
|
|
if err != nil {
|
|
s.releaseContainerName(containerName)
|
|
err1 := os.RemoveAll(containerDir)
|
|
if err1 != nil {
|
|
logrus.Warnf("Failed to cleanup container directory: %v")
|
|
}
|
|
}
|
|
}()
|
|
|
|
if _, err = os.Stat(containerDir); err == nil {
|
|
return nil, fmt.Errorf("container (%s) already exists", containerDir)
|
|
}
|
|
|
|
if err = os.MkdirAll(containerDir, 0755); err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
container, err := s.createSandboxContainer(containerID, containerName, sb, req.GetSandboxConfig(), containerDir, containerConfig)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
if err = s.runtime.CreateContainer(container); err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
if err = s.runtime.UpdateStatus(container); err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
s.addContainer(container)
|
|
|
|
if err = s.ctrIDIndex.Add(containerID); err != nil {
|
|
s.removeContainer(container)
|
|
return nil, err
|
|
}
|
|
|
|
resp := &pb.CreateContainerResponse{
|
|
ContainerId: &containerID,
|
|
}
|
|
|
|
logrus.Debugf("CreateContainerResponse: %+v", resp)
|
|
return resp, nil
|
|
}
|
|
|
|
func (s *Server) createSandboxContainer(containerID string, containerName string, sb *sandbox, SandboxConfig *pb.PodSandboxConfig, containerDir string, containerConfig *pb.ContainerConfig) (*oci.Container, error) {
|
|
if sb == nil {
|
|
return nil, errors.New("createSandboxContainer needs a sandbox")
|
|
}
|
|
// creates a spec Generator with the default spec.
|
|
specgen := generate.New()
|
|
|
|
// by default, the root path is an empty string.
|
|
// here set it to be "rootfs".
|
|
specgen.SetRootPath("rootfs")
|
|
|
|
args := containerConfig.GetArgs()
|
|
if args == nil {
|
|
args = []string{"/bin/sh"}
|
|
}
|
|
specgen.SetProcessArgs(args)
|
|
|
|
cwd := containerConfig.GetWorkingDir()
|
|
if cwd == "" {
|
|
cwd = "/"
|
|
}
|
|
specgen.SetProcessCwd(cwd)
|
|
|
|
envs := containerConfig.GetEnvs()
|
|
if envs != nil {
|
|
for _, item := range envs {
|
|
key := item.GetKey()
|
|
value := item.GetValue()
|
|
if key == "" {
|
|
continue
|
|
}
|
|
env := fmt.Sprintf("%s=%s", key, value)
|
|
specgen.AddProcessEnv(env)
|
|
}
|
|
}
|
|
|
|
mounts := containerConfig.GetMounts()
|
|
for _, mount := range mounts {
|
|
dest := mount.GetContainerPath()
|
|
if dest == "" {
|
|
return nil, fmt.Errorf("Mount.ContainerPath is empty")
|
|
}
|
|
|
|
src := mount.GetHostPath()
|
|
if src == "" {
|
|
return nil, fmt.Errorf("Mount.HostPath is empty")
|
|
}
|
|
|
|
options := "rw"
|
|
if mount.GetReadonly() {
|
|
options = "ro"
|
|
}
|
|
|
|
if mount.GetSelinuxRelabel() {
|
|
// Need a way in kubernetes to determine if the volume is shared or private
|
|
if err := label.Relabel(src, sb.mountLabel, true); err != nil && err != syscall.ENOTSUP {
|
|
return nil, fmt.Errorf("relabel failed %s: %v", src, err)
|
|
}
|
|
}
|
|
|
|
specgen.AddBindMount(src, dest, options)
|
|
|
|
}
|
|
|
|
labels := containerConfig.GetLabels()
|
|
|
|
metadata := containerConfig.GetMetadata()
|
|
|
|
annotations := containerConfig.GetAnnotations()
|
|
if annotations != nil {
|
|
for k, v := range annotations {
|
|
specgen.AddAnnotation(k, v)
|
|
}
|
|
}
|
|
if containerConfig.GetPrivileged() {
|
|
specgen.SetupPrivileged(true)
|
|
}
|
|
|
|
if containerConfig.GetReadonlyRootfs() {
|
|
specgen.SetRootReadonly(true)
|
|
}
|
|
|
|
logPath := containerConfig.GetLogPath()
|
|
|
|
if containerConfig.GetTty() {
|
|
specgen.SetProcessTerminal(true)
|
|
}
|
|
|
|
linux := containerConfig.GetLinux()
|
|
if linux != nil {
|
|
resources := linux.GetResources()
|
|
if resources != nil {
|
|
cpuPeriod := resources.GetCpuPeriod()
|
|
if cpuPeriod != 0 {
|
|
specgen.SetLinuxResourcesCPUPeriod(uint64(cpuPeriod))
|
|
}
|
|
|
|
cpuQuota := resources.GetCpuQuota()
|
|
if cpuQuota != 0 {
|
|
specgen.SetLinuxResourcesCPUQuota(uint64(cpuQuota))
|
|
}
|
|
|
|
cpuShares := resources.GetCpuShares()
|
|
if cpuShares != 0 {
|
|
specgen.SetLinuxResourcesCPUShares(uint64(cpuShares))
|
|
}
|
|
|
|
memoryLimit := resources.GetMemoryLimitInBytes()
|
|
if memoryLimit != 0 {
|
|
specgen.SetLinuxResourcesMemoryLimit(uint64(memoryLimit))
|
|
}
|
|
|
|
oomScoreAdj := resources.GetOomScoreAdj()
|
|
specgen.SetLinuxResourcesOOMScoreAdj(int(oomScoreAdj))
|
|
}
|
|
|
|
capabilities := linux.GetCapabilities()
|
|
if capabilities != nil {
|
|
addCaps := capabilities.GetAddCapabilities()
|
|
if addCaps != nil {
|
|
for _, cap := range addCaps {
|
|
if err := specgen.AddProcessCapability(cap); err != nil {
|
|
return nil, err
|
|
}
|
|
}
|
|
}
|
|
|
|
dropCaps := capabilities.GetDropCapabilities()
|
|
if dropCaps != nil {
|
|
for _, cap := range dropCaps {
|
|
if err := specgen.DropProcessCapability(cap); err != nil {
|
|
return nil, err
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
specgen.SetProcessSelinuxLabel(sb.processLabel)
|
|
specgen.SetLinuxMountLabel(sb.mountLabel)
|
|
|
|
user := linux.GetUser()
|
|
if user != nil {
|
|
uid := user.GetUid()
|
|
specgen.SetProcessUID(uint32(uid))
|
|
|
|
gid := user.GetGid()
|
|
specgen.SetProcessGID(uint32(gid))
|
|
|
|
groups := user.GetAdditionalGids()
|
|
if groups != nil {
|
|
for _, group := range groups {
|
|
specgen.AddProcessAdditionalGid(uint32(group))
|
|
}
|
|
}
|
|
}
|
|
}
|
|
// Join the namespace paths for the pod sandbox container.
|
|
podInfraState := s.runtime.ContainerStatus(sb.infraContainer)
|
|
|
|
logrus.Debugf("pod container state %+v", podInfraState)
|
|
|
|
for nsType, nsFile := range map[string]string{
|
|
"ipc": "ipc",
|
|
"network": "net",
|
|
} {
|
|
nsPath := fmt.Sprintf("/proc/%d/ns/%s", podInfraState.Pid, nsFile)
|
|
if err := specgen.AddOrReplaceLinuxNamespace(nsType, nsPath); err != nil {
|
|
return nil, err
|
|
}
|
|
}
|
|
|
|
specgen.AddAnnotation("ocid/name", containerName)
|
|
specgen.AddAnnotation("ocid/sandbox_id", sb.id)
|
|
specgen.AddAnnotation("ocid/log_path", logPath)
|
|
specgen.AddAnnotation("ocid/tty", fmt.Sprintf("%v", containerConfig.GetTty()))
|
|
|
|
metadataJSON, err := json.Marshal(metadata)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
specgen.AddAnnotation("ocid/metadata", string(metadataJSON))
|
|
|
|
labelsJSON, err := json.Marshal(labels)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
specgen.AddAnnotation("ocid/labels", string(labelsJSON))
|
|
|
|
if err = specgen.SaveToFile(filepath.Join(containerDir, "config.json")); err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
imageSpec := containerConfig.GetImage()
|
|
if imageSpec == nil {
|
|
return nil, fmt.Errorf("CreateContainerRequest.ContainerConfig.Image is nil")
|
|
}
|
|
|
|
image := imageSpec.GetImage()
|
|
if image == "" {
|
|
return nil, fmt.Errorf("CreateContainerRequest.ContainerConfig.Image.Image is empty")
|
|
}
|
|
|
|
// TODO: copy the rootfs into the bundle.
|
|
// Currently, utils.CreateFakeRootfs is used to populate the rootfs.
|
|
if err = utils.CreateFakeRootfs(containerDir, image); err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
container, err := oci.NewContainer(containerID, containerName, containerDir, logPath, labels, metadata, sb.id, containerConfig.GetTty())
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
return container, nil
|
|
}
|
|
|
|
// StartContainer starts the container.
|
|
func (s *Server) StartContainer(ctx context.Context, req *pb.StartContainerRequest) (*pb.StartContainerResponse, error) {
|
|
logrus.Debugf("StartContainerRequest %+v", req)
|
|
c, err := s.getContainerFromRequest(req)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
if err := s.runtime.StartContainer(c); err != nil {
|
|
return nil, fmt.Errorf("failed to start container %s: %v", c.ID(), err)
|
|
}
|
|
|
|
resp := &pb.StartContainerResponse{}
|
|
logrus.Debugf("StartContainerResponse %+v", resp)
|
|
return resp, nil
|
|
}
|
|
|
|
// StopContainer stops a running container with a grace period (i.e., timeout).
|
|
func (s *Server) StopContainer(ctx context.Context, req *pb.StopContainerRequest) (*pb.StopContainerResponse, error) {
|
|
logrus.Debugf("StopContainerRequest %+v", req)
|
|
c, err := s.getContainerFromRequest(req)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
if err := s.runtime.StopContainer(c); err != nil {
|
|
return nil, fmt.Errorf("failed to stop container %s: %v", c.ID(), err)
|
|
}
|
|
|
|
resp := &pb.StopContainerResponse{}
|
|
logrus.Debugf("StopContainerResponse: %+v", resp)
|
|
return resp, nil
|
|
}
|
|
|
|
// RemoveContainer removes the container. If the container is running, the container
|
|
// should be force removed.
|
|
func (s *Server) RemoveContainer(ctx context.Context, req *pb.RemoveContainerRequest) (*pb.RemoveContainerResponse, error) {
|
|
logrus.Debugf("RemoveContainerRequest %+v", req)
|
|
c, err := s.getContainerFromRequest(req)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
if err := s.runtime.UpdateStatus(c); err != nil {
|
|
return nil, fmt.Errorf("failed to update container state: %v", err)
|
|
}
|
|
|
|
cState := s.runtime.ContainerStatus(c)
|
|
if cState.Status == oci.ContainerStateCreated || cState.Status == oci.ContainerStateRunning {
|
|
if err := s.runtime.StopContainer(c); err != nil {
|
|
return nil, fmt.Errorf("failed to stop container %s: %v", c.ID(), err)
|
|
}
|
|
}
|
|
|
|
if err := s.runtime.DeleteContainer(c); err != nil {
|
|
return nil, fmt.Errorf("failed to delete container %s: %v", c.ID(), err)
|
|
}
|
|
|
|
containerDir := filepath.Join(s.runtime.ContainerDir(), c.ID())
|
|
if err := os.RemoveAll(containerDir); err != nil {
|
|
return nil, fmt.Errorf("failed to remove container %s directory: %v", c.ID(), err)
|
|
}
|
|
|
|
s.releaseContainerName(c.Name())
|
|
s.removeContainer(c)
|
|
|
|
if err := s.ctrIDIndex.Delete(c.ID()); err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
resp := &pb.RemoveContainerResponse{}
|
|
logrus.Debugf("RemoveContainerResponse: %+v", resp)
|
|
return resp, nil
|
|
}
|
|
|
|
// filterContainer returns whether passed container matches filtering criteria
|
|
func filterContainer(c *pb.Container, filter *pb.ContainerFilter) bool {
|
|
if filter != nil {
|
|
if filter.State != nil {
|
|
if *c.State != *filter.State {
|
|
return false
|
|
}
|
|
}
|
|
if filter.LabelSelector != nil {
|
|
sel := fields.SelectorFromSet(filter.LabelSelector)
|
|
if !sel.Matches(fields.Set(c.Labels)) {
|
|
return false
|
|
}
|
|
}
|
|
}
|
|
return true
|
|
}
|
|
|
|
// ListContainers lists all containers by filters.
|
|
func (s *Server) ListContainers(ctx context.Context, req *pb.ListContainersRequest) (*pb.ListContainersResponse, error) {
|
|
logrus.Debugf("ListContainersRequest %+v", req)
|
|
var ctrs []*pb.Container
|
|
filter := req.Filter
|
|
ctrList := s.state.containers.List()
|
|
|
|
// Filter using container id and pod id first.
|
|
if filter != nil {
|
|
if filter.Id != nil {
|
|
c := s.state.containers.Get(*filter.Id)
|
|
if c != nil {
|
|
if filter.PodSandboxId != nil {
|
|
if c.Sandbox() == *filter.PodSandboxId {
|
|
ctrList = []*oci.Container{c}
|
|
} else {
|
|
ctrList = []*oci.Container{}
|
|
}
|
|
|
|
} else {
|
|
ctrList = []*oci.Container{c}
|
|
}
|
|
}
|
|
} else {
|
|
if filter.PodSandboxId != nil {
|
|
pod := s.state.sandboxes[*filter.PodSandboxId]
|
|
if pod == nil {
|
|
ctrList = []*oci.Container{}
|
|
} else {
|
|
ctrList = pod.containers.List()
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
for _, ctr := range ctrList {
|
|
if err := s.runtime.UpdateStatus(ctr); err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
podSandboxID := ctr.Sandbox()
|
|
cState := s.runtime.ContainerStatus(ctr)
|
|
created := cState.Created.UnixNano()
|
|
rState := pb.ContainerState_UNKNOWN
|
|
cID := ctr.ID()
|
|
|
|
c := &pb.Container{
|
|
Id: &cID,
|
|
PodSandboxId: &podSandboxID,
|
|
CreatedAt: int64Ptr(created),
|
|
Labels: ctr.Labels(),
|
|
Metadata: ctr.Metadata(),
|
|
}
|
|
|
|
switch cState.Status {
|
|
case oci.ContainerStateCreated:
|
|
rState = pb.ContainerState_CREATED
|
|
case oci.ContainerStateRunning:
|
|
rState = pb.ContainerState_RUNNING
|
|
case oci.ContainerStateStopped:
|
|
rState = pb.ContainerState_EXITED
|
|
}
|
|
c.State = &rState
|
|
|
|
// Filter by other criteria such as state and labels.
|
|
if filterContainer(c, req.Filter) {
|
|
ctrs = append(ctrs, c)
|
|
}
|
|
}
|
|
|
|
resp := &pb.ListContainersResponse{
|
|
Containers: ctrs,
|
|
}
|
|
logrus.Debugf("ListContainersResponse: %+v", resp)
|
|
return resp, nil
|
|
}
|
|
|
|
// ContainerStatus returns status of the container.
|
|
func (s *Server) ContainerStatus(ctx context.Context, req *pb.ContainerStatusRequest) (*pb.ContainerStatusResponse, error) {
|
|
logrus.Debugf("ContainerStatusRequest %+v", req)
|
|
c, err := s.getContainerFromRequest(req)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
if err := s.runtime.UpdateStatus(c); err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
containerID := c.ID()
|
|
resp := &pb.ContainerStatusResponse{
|
|
Status: &pb.ContainerStatus{
|
|
Id: &containerID,
|
|
Metadata: c.Metadata(),
|
|
},
|
|
}
|
|
|
|
cState := s.runtime.ContainerStatus(c)
|
|
rStatus := pb.ContainerState_UNKNOWN
|
|
|
|
switch cState.Status {
|
|
case oci.ContainerStateCreated:
|
|
rStatus = pb.ContainerState_CREATED
|
|
created := cState.Created.UnixNano()
|
|
resp.Status.CreatedAt = int64Ptr(created)
|
|
case oci.ContainerStateRunning:
|
|
rStatus = pb.ContainerState_RUNNING
|
|
created := cState.Created.UnixNano()
|
|
resp.Status.CreatedAt = int64Ptr(created)
|
|
started := cState.Started.UnixNano()
|
|
resp.Status.StartedAt = int64Ptr(started)
|
|
case oci.ContainerStateStopped:
|
|
rStatus = pb.ContainerState_EXITED
|
|
created := cState.Created.UnixNano()
|
|
resp.Status.CreatedAt = int64Ptr(created)
|
|
started := cState.Started.UnixNano()
|
|
resp.Status.StartedAt = int64Ptr(started)
|
|
finished := cState.Finished.UnixNano()
|
|
resp.Status.FinishedAt = int64Ptr(finished)
|
|
resp.Status.ExitCode = int32Ptr(cState.ExitCode)
|
|
}
|
|
|
|
resp.Status.State = &rStatus
|
|
|
|
logrus.Debugf("ContainerStatusResponse: %+v", resp)
|
|
return resp, nil
|
|
}
|
|
|
|
// UpdateRuntimeConfig updates the configuration of a running container.
|
|
func (s *Server) UpdateRuntimeConfig(ctx context.Context, req *pb.UpdateRuntimeConfigRequest) (*pb.UpdateRuntimeConfigResponse, error) {
|
|
return nil, nil
|
|
}
|
|
|
|
// ExecSync runs a command in a container synchronously.
|
|
func (s *Server) ExecSync(ctx context.Context, req *pb.ExecSyncRequest) (*pb.ExecSyncResponse, error) {
|
|
return nil, nil
|
|
}
|
|
|
|
// Exec prepares a streaming endpoint to execute a command in the container.
|
|
func (s *Server) Exec(ctx context.Context, req *pb.ExecRequest) (*pb.ExecResponse, error) {
|
|
return nil, nil
|
|
}
|
|
|
|
// Attach prepares a streaming endpoint to attach to a running container.
|
|
func (s *Server) Attach(ctx context.Context, req *pb.AttachRequest) (*pb.AttachResponse, error) {
|
|
return nil, nil
|
|
}
|
|
|
|
// PortForward prepares a streaming endpoint to forward ports from a PodSandbox.
|
|
func (s *Server) PortForward(ctx context.Context, req *pb.PortForwardRequest) (*pb.PortForwardResponse, error) {
|
|
return nil, nil
|
|
}
|