Merge pull request #713 from 14rcole/containerserver-update
Make container server update it's list of containers upon creation
This commit is contained in:
commit
7351786411
8 changed files with 385 additions and 383 deletions
|
@ -3,7 +3,10 @@ package libkpod
|
|||
import (
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"os"
|
||||
"path/filepath"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"github.com/Sirupsen/logrus"
|
||||
"github.com/containers/image/types"
|
||||
|
@ -13,18 +16,24 @@ import (
|
|||
"github.com/docker/docker/pkg/truncindex"
|
||||
"github.com/kubernetes-incubator/cri-o/libkpod/sandbox"
|
||||
"github.com/kubernetes-incubator/cri-o/oci"
|
||||
"github.com/kubernetes-incubator/cri-o/pkg/annotations"
|
||||
"github.com/kubernetes-incubator/cri-o/pkg/storage"
|
||||
rspec "github.com/opencontainers/runtime-spec/specs-go"
|
||||
"github.com/opencontainers/selinux/go-selinux/label"
|
||||
pb "k8s.io/kubernetes/pkg/kubelet/api/v1alpha1/runtime"
|
||||
)
|
||||
|
||||
// ContainerServer implements the ImageServer
|
||||
type ContainerServer struct {
|
||||
runtime *oci.Runtime
|
||||
store cstorage.Store
|
||||
storageImageServer storage.ImageServer
|
||||
ctrNameIndex *registrar.Registrar
|
||||
ctrIDIndex *truncindex.TruncIndex
|
||||
podNameIndex *registrar.Registrar
|
||||
podIDIndex *truncindex.TruncIndex
|
||||
runtime *oci.Runtime
|
||||
store cstorage.Store
|
||||
storageImageServer storage.ImageServer
|
||||
storageRuntimeServer storage.RuntimeServer
|
||||
updateLock sync.RWMutex
|
||||
ctrNameIndex *registrar.Registrar
|
||||
ctrIDIndex *truncindex.TruncIndex
|
||||
podNameIndex *registrar.Registrar
|
||||
podIDIndex *truncindex.TruncIndex
|
||||
|
||||
imageContext *types.SystemContext
|
||||
stateLock sync.Locker
|
||||
|
@ -77,6 +86,11 @@ func (c *ContainerServer) Config() *Config {
|
|||
return c.config
|
||||
}
|
||||
|
||||
// StorageRuntimeServer gets the runtime server for the ContainerServer
|
||||
func (c *ContainerServer) StorageRuntimeServer() storage.RuntimeServer {
|
||||
return c.storageRuntimeServer
|
||||
}
|
||||
|
||||
// New creates a new ContainerServer with options provided
|
||||
func New(config *Config) (*ContainerServer, error) {
|
||||
store, err := cstorage.GetStore(cstorage.StoreOptions{
|
||||
|
@ -94,6 +108,11 @@ func New(config *Config) (*ContainerServer, error) {
|
|||
return nil, err
|
||||
}
|
||||
|
||||
storageRuntimeService := storage.GetRuntimeService(imageService, config.PauseImage)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
runtime, err := oci.New(config.Runtime, config.RuntimeUntrustedWorkload, config.DefaultWorkloadTrust, config.Conmon, config.ConmonEnv, config.CgroupManager)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
|
@ -111,15 +130,16 @@ func New(config *Config) (*ContainerServer, error) {
|
|||
}
|
||||
|
||||
return &ContainerServer{
|
||||
runtime: runtime,
|
||||
store: store,
|
||||
storageImageServer: imageService,
|
||||
ctrNameIndex: registrar.NewRegistrar(),
|
||||
ctrIDIndex: truncindex.NewTruncIndex([]string{}),
|
||||
podNameIndex: registrar.NewRegistrar(),
|
||||
podIDIndex: truncindex.NewTruncIndex([]string{}),
|
||||
imageContext: &types.SystemContext{SignaturePolicyPath: config.SignaturePolicyPath},
|
||||
stateLock: lock,
|
||||
runtime: runtime,
|
||||
store: store,
|
||||
storageImageServer: imageService,
|
||||
storageRuntimeServer: storageRuntimeService,
|
||||
ctrNameIndex: registrar.NewRegistrar(),
|
||||
ctrIDIndex: truncindex.NewTruncIndex([]string{}),
|
||||
podNameIndex: registrar.NewRegistrar(),
|
||||
podIDIndex: truncindex.NewTruncIndex([]string{}),
|
||||
imageContext: &types.SystemContext{SignaturePolicyPath: config.SignaturePolicyPath},
|
||||
stateLock: lock,
|
||||
state: &containerServerState{
|
||||
containers: oci.NewMemoryStore(),
|
||||
sandboxes: make(map[string]*sandbox.Sandbox),
|
||||
|
@ -128,6 +148,322 @@ func New(config *Config) (*ContainerServer, error) {
|
|||
}, nil
|
||||
}
|
||||
|
||||
// Update makes changes to the server's state (lists of pods and containers) to
|
||||
// reflect the list of pods and containers that are stored on disk, possibly
|
||||
// having been modified by other parties
|
||||
func (c *ContainerServer) Update() error {
|
||||
c.updateLock.Lock()
|
||||
defer c.updateLock.Unlock()
|
||||
|
||||
containers, err := c.store.Containers()
|
||||
if err != nil && !os.IsNotExist(err) {
|
||||
logrus.Warnf("could not read containers and sandboxes: %v", err)
|
||||
return err
|
||||
}
|
||||
newPods := map[string]*storage.RuntimeContainerMetadata{}
|
||||
oldPods := map[string]string{}
|
||||
removedPods := map[string]string{}
|
||||
newPodContainers := map[string]*storage.RuntimeContainerMetadata{}
|
||||
oldPodContainers := map[string]string{}
|
||||
removedPodContainers := map[string]string{}
|
||||
for _, container := range containers {
|
||||
if c.HasSandbox(container.ID) {
|
||||
// FIXME: do we need to reload/update any info about the sandbox?
|
||||
oldPods[container.ID] = container.ID
|
||||
oldPodContainers[container.ID] = container.ID
|
||||
continue
|
||||
}
|
||||
if c.GetContainer(container.ID) != nil {
|
||||
// FIXME: do we need to reload/update any info about the container?
|
||||
oldPodContainers[container.ID] = container.ID
|
||||
continue
|
||||
}
|
||||
// not previously known, so figure out what it is
|
||||
metadata, err2 := c.storageRuntimeServer.GetContainerMetadata(container.ID)
|
||||
if err2 != nil {
|
||||
logrus.Errorf("error parsing metadata for %s: %v, ignoring", container.ID, err2)
|
||||
continue
|
||||
}
|
||||
if metadata.Pod {
|
||||
newPods[container.ID] = &metadata
|
||||
} else {
|
||||
newPodContainers[container.ID] = &metadata
|
||||
}
|
||||
}
|
||||
c.ctrIDIndex.Iterate(func(id string) {
|
||||
if _, ok := oldPodContainers[id]; !ok {
|
||||
// this container's ID wasn't in the updated list -> removed
|
||||
removedPodContainers[id] = id
|
||||
}
|
||||
})
|
||||
for removedPodContainer := range removedPodContainers {
|
||||
// forget this container
|
||||
ctr := c.GetContainer(removedPodContainer)
|
||||
if ctr == nil {
|
||||
logrus.Warnf("bad state when getting container removed %+v", removedPodContainer)
|
||||
continue
|
||||
}
|
||||
c.ReleaseContainerName(ctr.Name())
|
||||
c.RemoveContainer(ctr)
|
||||
if err = c.ctrIDIndex.Delete(ctr.ID()); err != nil {
|
||||
return err
|
||||
}
|
||||
logrus.Debugf("forgetting removed pod container %s", ctr.ID())
|
||||
}
|
||||
c.PodIDIndex().Iterate(func(id string) {
|
||||
if _, ok := oldPods[id]; !ok {
|
||||
// this pod's ID wasn't in the updated list -> removed
|
||||
removedPods[id] = id
|
||||
}
|
||||
})
|
||||
for removedPod := range removedPods {
|
||||
// forget this pod
|
||||
sb := c.GetSandbox(removedPod)
|
||||
if sb == nil {
|
||||
logrus.Warnf("bad state when getting pod to remove %+v", removedPod)
|
||||
continue
|
||||
}
|
||||
podInfraContainer := sb.InfraContainer()
|
||||
c.ReleaseContainerName(podInfraContainer.Name())
|
||||
c.RemoveContainer(podInfraContainer)
|
||||
if err = c.ctrIDIndex.Delete(podInfraContainer.ID()); err != nil {
|
||||
return err
|
||||
}
|
||||
sb.RemoveInfraContainer()
|
||||
c.ReleasePodName(sb.Name())
|
||||
c.RemoveSandbox(sb.ID())
|
||||
if err = c.podIDIndex.Delete(sb.ID()); err != nil {
|
||||
return err
|
||||
}
|
||||
logrus.Debugf("forgetting removed pod %s", sb.ID())
|
||||
}
|
||||
for sandboxID := range newPods {
|
||||
// load this pod
|
||||
if err = c.LoadSandbox(sandboxID); err != nil {
|
||||
logrus.Warnf("could not load new pod sandbox %s: %v, ignoring", sandboxID, err)
|
||||
} else {
|
||||
logrus.Debugf("loaded new pod sandbox %s", sandboxID, err)
|
||||
}
|
||||
}
|
||||
for containerID := range newPodContainers {
|
||||
// load this container
|
||||
if err = c.LoadContainer(containerID); err != nil {
|
||||
logrus.Warnf("could not load new sandbox container %s: %v, ignoring", containerID, err)
|
||||
} else {
|
||||
logrus.Debugf("loaded new pod container %s", containerID, err)
|
||||
}
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// LoadSandbox loads a sandbox from the disk into the sandbox store
|
||||
func (c *ContainerServer) LoadSandbox(id string) error {
|
||||
config, err := c.store.FromContainerDirectory(id, "config.json")
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
var m rspec.Spec
|
||||
if err = json.Unmarshal(config, &m); err != nil {
|
||||
return err
|
||||
}
|
||||
labels := make(map[string]string)
|
||||
if err = json.Unmarshal([]byte(m.Annotations[annotations.Labels]), &labels); err != nil {
|
||||
return err
|
||||
}
|
||||
name := m.Annotations[annotations.Name]
|
||||
name, err = c.ReservePodName(id, name)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
defer func() {
|
||||
if err != nil {
|
||||
c.ReleasePodName(name)
|
||||
}
|
||||
}()
|
||||
var metadata pb.PodSandboxMetadata
|
||||
if err = json.Unmarshal([]byte(m.Annotations[annotations.Metadata]), &metadata); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
processLabel, mountLabel, err := label.InitLabels(label.DupSecOpt(m.Process.SelinuxLabel))
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
kubeAnnotations := make(map[string]string)
|
||||
if err = json.Unmarshal([]byte(m.Annotations[annotations.Annotations]), &kubeAnnotations); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
privileged := isTrue(m.Annotations[annotations.PrivilegedRuntime])
|
||||
trusted := isTrue(m.Annotations[annotations.TrustedSandbox])
|
||||
|
||||
sb, err := sandbox.New(id, name, m.Annotations[annotations.KubeName], filepath.Dir(m.Annotations[annotations.LogPath]), "", labels, kubeAnnotations, processLabel, mountLabel, &metadata, m.Annotations[annotations.ShmPath], "", privileged, trusted, m.Annotations[annotations.ResolvPath], "", nil)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
// We add a netNS only if we can load a permanent one.
|
||||
// Otherwise, the sandbox will live in the host namespace.
|
||||
netNsPath, err := configNetNsPath(m)
|
||||
if err == nil {
|
||||
nsErr := sb.NetNsJoin(netNsPath, sb.Name())
|
||||
// If we can't load the networking namespace
|
||||
// because it's closed, we just set the sb netns
|
||||
// pointer to nil. Otherwise we return an error.
|
||||
if nsErr != nil && nsErr != sandbox.ErrClosedNetNS {
|
||||
return nsErr
|
||||
}
|
||||
}
|
||||
|
||||
c.AddSandbox(sb)
|
||||
|
||||
defer func() {
|
||||
if err != nil {
|
||||
c.RemoveSandbox(sb.ID())
|
||||
}
|
||||
}()
|
||||
|
||||
sandboxPath, err := c.store.ContainerRunDirectory(id)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
sandboxDir, err := c.store.ContainerDirectory(id)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
cname, err := c.ReserveContainerName(m.Annotations[annotations.ContainerID], m.Annotations[annotations.ContainerName])
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
defer func() {
|
||||
if err != nil {
|
||||
c.ReleaseContainerName(cname)
|
||||
}
|
||||
}()
|
||||
|
||||
created, err := time.Parse(time.RFC3339Nano, m.Annotations[annotations.Created])
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
scontainer, err := oci.NewContainer(m.Annotations[annotations.ContainerID], cname, sandboxPath, m.Annotations[annotations.LogPath], sb.NetNs(), labels, kubeAnnotations, "", nil, id, false, false, false, privileged, trusted, sandboxDir, created, m.Annotations["org.opencontainers.image.stopSignal"])
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
c.ContainerStateFromDisk(scontainer)
|
||||
|
||||
if err = label.ReserveLabel(processLabel); err != nil {
|
||||
return err
|
||||
}
|
||||
sb.SetInfraContainer(scontainer)
|
||||
if err = c.ctrIDIndex.Add(scontainer.ID()); err != nil {
|
||||
return err
|
||||
}
|
||||
if err = c.podIDIndex.Add(id); err != nil {
|
||||
return err
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func configNetNsPath(spec rspec.Spec) (string, error) {
|
||||
for _, ns := range spec.Linux.Namespaces {
|
||||
if ns.Type != rspec.NetworkNamespace {
|
||||
continue
|
||||
}
|
||||
|
||||
if ns.Path == "" {
|
||||
return "", fmt.Errorf("empty networking namespace")
|
||||
}
|
||||
|
||||
return ns.Path, nil
|
||||
}
|
||||
|
||||
return "", fmt.Errorf("missing networking namespace")
|
||||
}
|
||||
|
||||
// LoadContainer loads a container from the disk into the container store
|
||||
func (c *ContainerServer) LoadContainer(id string) error {
|
||||
config, err := c.store.FromContainerDirectory(id, "config.json")
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
var m rspec.Spec
|
||||
if err = json.Unmarshal(config, &m); err != nil {
|
||||
return err
|
||||
}
|
||||
labels := make(map[string]string)
|
||||
if err = json.Unmarshal([]byte(m.Annotations[annotations.Labels]), &labels); err != nil {
|
||||
return err
|
||||
}
|
||||
name := m.Annotations[annotations.Name]
|
||||
name, err = c.ReserveContainerName(id, name)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
defer func() {
|
||||
if err != nil {
|
||||
c.ReleaseContainerName(name)
|
||||
}
|
||||
}()
|
||||
|
||||
var metadata pb.ContainerMetadata
|
||||
if err = json.Unmarshal([]byte(m.Annotations[annotations.Metadata]), &metadata); err != nil {
|
||||
return err
|
||||
}
|
||||
sb := c.GetSandbox(m.Annotations[annotations.SandboxID])
|
||||
if sb == nil {
|
||||
return fmt.Errorf("could not get sandbox with id %s, skipping", m.Annotations[annotations.SandboxID])
|
||||
}
|
||||
|
||||
tty := isTrue(m.Annotations[annotations.TTY])
|
||||
stdin := isTrue(m.Annotations[annotations.Stdin])
|
||||
stdinOnce := isTrue(m.Annotations[annotations.StdinOnce])
|
||||
|
||||
containerPath, err := c.store.ContainerRunDirectory(id)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
containerDir, err := c.store.ContainerDirectory(id)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
img, ok := m.Annotations[annotations.Image]
|
||||
if !ok {
|
||||
img = ""
|
||||
}
|
||||
|
||||
kubeAnnotations := make(map[string]string)
|
||||
if err = json.Unmarshal([]byte(m.Annotations[annotations.Annotations]), &kubeAnnotations); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
created, err := time.Parse(time.RFC3339Nano, m.Annotations[annotations.Created])
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
ctr, err := oci.NewContainer(id, name, containerPath, m.Annotations[annotations.LogPath], sb.NetNs(), labels, kubeAnnotations, img, &metadata, sb.ID(), tty, stdin, stdinOnce, sb.Privileged(), sb.Trusted(), containerDir, created, m.Annotations["org.opencontainers.image.stopSignal"])
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
c.ContainerStateFromDisk(ctr)
|
||||
|
||||
c.AddContainer(ctr)
|
||||
return c.ctrIDIndex.Add(id)
|
||||
}
|
||||
|
||||
func isTrue(annotaton string) bool {
|
||||
return annotaton == "true"
|
||||
}
|
||||
|
||||
// ContainerStateFromDisk retrieves information on the state of a running container
|
||||
// from the disk
|
||||
func (c *ContainerServer) ContainerStateFromDisk(ctr *oci.Container) error {
|
||||
|
|
|
@ -314,7 +314,7 @@ func (s *Server) CreateContainer(ctx context.Context, req *pb.CreateContainerReq
|
|||
}
|
||||
defer func() {
|
||||
if err != nil {
|
||||
err2 := s.storageRuntimeServer.DeleteContainer(containerID)
|
||||
err2 := s.StorageRuntimeServer().DeleteContainer(containerID)
|
||||
if err2 != nil {
|
||||
logrus.Warnf("Failed to cleanup container directory: %v", err2)
|
||||
}
|
||||
|
@ -613,7 +613,7 @@ func (s *Server) createSandboxContainer(ctx context.Context, containerID string,
|
|||
|
||||
metaname := metadata.Name
|
||||
attempt := metadata.Attempt
|
||||
containerInfo, err := s.storageRuntimeServer.CreateContainer(s.ImageContext(),
|
||||
containerInfo, err := s.StorageRuntimeServer().CreateContainer(s.ImageContext(),
|
||||
sb.Name(), sb.ID(),
|
||||
image, image,
|
||||
containerName, containerID,
|
||||
|
@ -625,7 +625,7 @@ func (s *Server) createSandboxContainer(ctx context.Context, containerID string,
|
|||
return nil, err
|
||||
}
|
||||
|
||||
mountPoint, err := s.storageRuntimeServer.StartContainer(containerID)
|
||||
mountPoint, err := s.StorageRuntimeServer().StartContainer(containerID)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("failed to mount container %s(%s): %v", containerName, containerID, err)
|
||||
}
|
||||
|
|
|
@ -27,7 +27,7 @@ func (s *Server) RemoveContainer(ctx context.Context, req *pb.RemoveContainerReq
|
|||
if err := s.Runtime().StopContainer(c, -1); err != nil {
|
||||
return nil, fmt.Errorf("failed to stop container %s: %v", c.ID(), err)
|
||||
}
|
||||
if err := s.storageRuntimeServer.StopContainer(c.ID()); err != nil {
|
||||
if err := s.StorageRuntimeServer().StopContainer(c.ID()); err != nil {
|
||||
return nil, fmt.Errorf("failed to unmount container %s: %v", c.ID(), err)
|
||||
}
|
||||
}
|
||||
|
@ -38,7 +38,7 @@ func (s *Server) RemoveContainer(ctx context.Context, req *pb.RemoveContainerReq
|
|||
|
||||
s.removeContainer(c)
|
||||
|
||||
if err := s.storageRuntimeServer.DeleteContainer(c.ID()); err != nil {
|
||||
if err := s.StorageRuntimeServer().DeleteContainer(c.ID()); err != nil {
|
||||
return nil, fmt.Errorf("failed to delete storage for container %s: %v", c.ID(), err)
|
||||
}
|
||||
|
||||
|
|
|
@ -25,7 +25,7 @@ func (s *Server) StopContainer(ctx context.Context, req *pb.StopContainerRequest
|
|||
if err := s.Runtime().StopContainer(c, req.Timeout); err != nil {
|
||||
return nil, fmt.Errorf("failed to stop container %s: %v", c.ID(), err)
|
||||
}
|
||||
if err := s.storageRuntimeServer.StopContainer(c.ID()); err != nil {
|
||||
if err := s.StorageRuntimeServer().StopContainer(c.ID()); err != nil {
|
||||
return nil, fmt.Errorf("failed to unmount container %s: %v", c.ID(), err)
|
||||
}
|
||||
}
|
||||
|
|
|
@ -57,11 +57,11 @@ func (s *Server) RemovePodSandbox(ctx context.Context, req *pb.RemovePodSandboxR
|
|||
continue
|
||||
}
|
||||
|
||||
if err := s.storageRuntimeServer.StopContainer(c.ID()); err != nil && err != storage.ErrContainerUnknown {
|
||||
if err := s.StorageRuntimeServer().StopContainer(c.ID()); err != nil && err != storage.ErrContainerUnknown {
|
||||
// assume container already umounted
|
||||
logrus.Warnf("failed to stop container %s in pod sandbox %s: %v", c.Name(), sb.ID(), err)
|
||||
}
|
||||
if err := s.storageRuntimeServer.DeleteContainer(c.ID()); err != nil && err != storage.ErrContainerUnknown {
|
||||
if err := s.StorageRuntimeServer().DeleteContainer(c.ID()); err != nil && err != storage.ErrContainerUnknown {
|
||||
return nil, fmt.Errorf("failed to delete container %s in pod sandbox %s: %v", c.Name(), sb.ID(), err)
|
||||
}
|
||||
|
||||
|
@ -75,10 +75,10 @@ func (s *Server) RemovePodSandbox(ctx context.Context, req *pb.RemovePodSandboxR
|
|||
s.removeContainer(podInfraContainer)
|
||||
|
||||
// Remove the files related to the sandbox
|
||||
if err := s.storageRuntimeServer.StopContainer(sb.ID()); err != nil && err != storage.ErrContainerUnknown {
|
||||
if err := s.StorageRuntimeServer().StopContainer(sb.ID()); err != nil && err != storage.ErrContainerUnknown {
|
||||
logrus.Warnf("failed to stop sandbox container in pod sandbox %s: %v", sb.ID(), err)
|
||||
}
|
||||
if err := s.storageRuntimeServer.RemovePodSandbox(sb.ID()); err != nil && err != pkgstorage.ErrInvalidSandboxID {
|
||||
if err := s.StorageRuntimeServer().RemovePodSandbox(sb.ID()); err != nil && err != pkgstorage.ErrInvalidSandboxID {
|
||||
return nil, fmt.Errorf("failed to remove pod sandbox %s: %v", sb.ID(), err)
|
||||
}
|
||||
|
||||
|
|
|
@ -153,7 +153,7 @@ func (s *Server) RunPodSandbox(ctx context.Context, req *pb.RunPodSandboxRequest
|
|||
}
|
||||
}()
|
||||
|
||||
podContainer, err := s.storageRuntimeServer.CreatePodSandbox(s.ImageContext(),
|
||||
podContainer, err := s.StorageRuntimeServer().CreatePodSandbox(s.ImageContext(),
|
||||
name, id,
|
||||
s.config.PauseImage, "",
|
||||
containerName,
|
||||
|
@ -170,7 +170,7 @@ func (s *Server) RunPodSandbox(ctx context.Context, req *pb.RunPodSandboxRequest
|
|||
}
|
||||
defer func() {
|
||||
if err != nil {
|
||||
if err2 := s.storageRuntimeServer.RemovePodSandbox(id); err2 != nil {
|
||||
if err2 := s.StorageRuntimeServer().RemovePodSandbox(id); err2 != nil {
|
||||
logrus.Warnf("couldn't cleanup pod sandbox %q: %v", id, err2)
|
||||
}
|
||||
}
|
||||
|
@ -447,7 +447,7 @@ func (s *Server) RunPodSandbox(ctx context.Context, req *pb.RunPodSandboxRequest
|
|||
}
|
||||
|
||||
saveOptions := generate.ExportOptions{}
|
||||
mountPoint, err := s.storageRuntimeServer.StartContainer(id)
|
||||
mountPoint, err := s.StorageRuntimeServer().StartContainer(id)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("failed to mount container %s in pod sandbox %s(%s): %v", containerName, sb.Name(), id, err)
|
||||
}
|
||||
|
@ -524,12 +524,12 @@ func convertPortMappings(in []*pb.PortMapping) []*hostport.PortMapping {
|
|||
}
|
||||
|
||||
func (s *Server) setPodSandboxMountLabel(id, mountLabel string) error {
|
||||
storageMetadata, err := s.storageRuntimeServer.GetContainerMetadata(id)
|
||||
storageMetadata, err := s.StorageRuntimeServer().GetContainerMetadata(id)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
storageMetadata.SetMountLabel(mountLabel)
|
||||
return s.storageRuntimeServer.SetContainerMetadata(id, storageMetadata)
|
||||
return s.StorageRuntimeServer().SetContainerMetadata(id, storageMetadata)
|
||||
}
|
||||
|
||||
func getSELinuxLabels(selinuxOptions *pb.SELinuxOption) (processLabel string, mountLabel string, err error) {
|
||||
|
|
|
@ -80,7 +80,7 @@ func (s *Server) StopPodSandbox(ctx context.Context, req *pb.StopPodSandboxReque
|
|||
if c.ID() == podInfraContainer.ID() {
|
||||
continue
|
||||
}
|
||||
if err := s.storageRuntimeServer.StopContainer(c.ID()); err != nil && err != storage.ErrContainerUnknown {
|
||||
if err := s.StorageRuntimeServer().StopContainer(c.ID()); err != nil && err != storage.ErrContainerUnknown {
|
||||
// assume container already umounted
|
||||
logrus.Warnf("failed to stop container %s in pod sandbox %s: %v", c.Name(), sb.ID(), err)
|
||||
}
|
||||
|
@ -108,7 +108,7 @@ func (s *Server) StopPodSandbox(ctx context.Context, req *pb.StopPodSandboxReque
|
|||
}
|
||||
}
|
||||
}
|
||||
if err := s.storageRuntimeServer.StopContainer(sb.ID()); err != nil && err != storage.ErrContainerUnknown {
|
||||
if err := s.StorageRuntimeServer().StopContainer(sb.ID()); err != nil && err != storage.ErrContainerUnknown {
|
||||
logrus.Warnf("failed to stop sandbox container in pod sandbox %s: %v", sb.ID(), err)
|
||||
}
|
||||
|
||||
|
|
366
server/server.go
366
server/server.go
|
@ -6,22 +6,16 @@ import (
|
|||
"io/ioutil"
|
||||
"net"
|
||||
"os"
|
||||
"path/filepath"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"github.com/Sirupsen/logrus"
|
||||
cstorage "github.com/containers/storage"
|
||||
"github.com/kubernetes-incubator/cri-o/libkpod"
|
||||
"github.com/kubernetes-incubator/cri-o/libkpod/sandbox"
|
||||
"github.com/kubernetes-incubator/cri-o/oci"
|
||||
"github.com/kubernetes-incubator/cri-o/pkg/annotations"
|
||||
"github.com/kubernetes-incubator/cri-o/pkg/ocicni"
|
||||
"github.com/kubernetes-incubator/cri-o/pkg/storage"
|
||||
"github.com/kubernetes-incubator/cri-o/server/apparmor"
|
||||
"github.com/kubernetes-incubator/cri-o/server/seccomp"
|
||||
rspec "github.com/opencontainers/runtime-spec/specs-go"
|
||||
"github.com/opencontainers/selinux/go-selinux/label"
|
||||
knet "k8s.io/apimachinery/pkg/util/net"
|
||||
pb "k8s.io/kubernetes/pkg/kubelet/api/v1alpha1/runtime"
|
||||
"k8s.io/kubernetes/pkg/kubelet/network/hostport"
|
||||
|
@ -50,13 +44,12 @@ type streamService struct {
|
|||
|
||||
// Server implements the RuntimeService and ImageService
|
||||
type Server struct {
|
||||
libkpod.ContainerServer
|
||||
*libkpod.ContainerServer
|
||||
config Config
|
||||
|
||||
storageRuntimeServer storage.RuntimeServer
|
||||
updateLock sync.RWMutex
|
||||
netPlugin ocicni.CNIPlugin
|
||||
hostportManager hostport.HostPortManager
|
||||
updateLock sync.RWMutex
|
||||
netPlugin ocicni.CNIPlugin
|
||||
hostportManager hostport.HostPortManager
|
||||
|
||||
seccompEnabled bool
|
||||
seccompProfile seccomp.Seccomp
|
||||
|
@ -82,208 +75,6 @@ func (s *Server) GetPortForward(req *pb.PortForwardRequest) (*pb.PortForwardResp
|
|||
return s.stream.streamServer.GetPortForward(req)
|
||||
}
|
||||
|
||||
func (s *Server) loadContainer(id string) error {
|
||||
config, err := s.Store().FromContainerDirectory(id, "config.json")
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
var m rspec.Spec
|
||||
if err = json.Unmarshal(config, &m); err != nil {
|
||||
return err
|
||||
}
|
||||
labels := make(map[string]string)
|
||||
if err = json.Unmarshal([]byte(m.Annotations[annotations.Labels]), &labels); err != nil {
|
||||
return err
|
||||
}
|
||||
name := m.Annotations[annotations.Name]
|
||||
name, err = s.ReserveContainerName(id, name)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
defer func() {
|
||||
if err != nil {
|
||||
s.ReleaseContainerName(name)
|
||||
}
|
||||
}()
|
||||
|
||||
var metadata pb.ContainerMetadata
|
||||
if err = json.Unmarshal([]byte(m.Annotations[annotations.Metadata]), &metadata); err != nil {
|
||||
return err
|
||||
}
|
||||
sb := s.getSandbox(m.Annotations[annotations.SandboxID])
|
||||
if sb == nil {
|
||||
return fmt.Errorf("could not get sandbox with id %s, skipping", m.Annotations[annotations.SandboxID])
|
||||
}
|
||||
|
||||
tty := isTrue(m.Annotations[annotations.TTY])
|
||||
stdin := isTrue(m.Annotations[annotations.Stdin])
|
||||
stdinOnce := isTrue(m.Annotations[annotations.StdinOnce])
|
||||
|
||||
containerPath, err := s.Store().ContainerRunDirectory(id)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
containerDir, err := s.Store().ContainerDirectory(id)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
img, ok := m.Annotations[annotations.Image]
|
||||
if !ok {
|
||||
img = ""
|
||||
}
|
||||
|
||||
kubeAnnotations := make(map[string]string)
|
||||
if err = json.Unmarshal([]byte(m.Annotations[annotations.Annotations]), &kubeAnnotations); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
created, err := time.Parse(time.RFC3339Nano, m.Annotations[annotations.Created])
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
ctr, err := oci.NewContainer(id, name, containerPath, m.Annotations[annotations.LogPath], sb.NetNs(), labels, kubeAnnotations, img, &metadata, sb.ID(), tty, stdin, stdinOnce, sb.Privileged(), sb.Trusted(), containerDir, created, m.Annotations["org.opencontainers.image.stopSignal"])
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
s.ContainerStateFromDisk(ctr)
|
||||
|
||||
s.addContainer(ctr)
|
||||
return s.CtrIDIndex().Add(id)
|
||||
}
|
||||
|
||||
func configNetNsPath(spec rspec.Spec) (string, error) {
|
||||
for _, ns := range spec.Linux.Namespaces {
|
||||
if ns.Type != rspec.NetworkNamespace {
|
||||
continue
|
||||
}
|
||||
|
||||
if ns.Path == "" {
|
||||
return "", fmt.Errorf("empty networking namespace")
|
||||
}
|
||||
|
||||
return ns.Path, nil
|
||||
}
|
||||
|
||||
return "", fmt.Errorf("missing networking namespace")
|
||||
}
|
||||
|
||||
func (s *Server) loadSandbox(id string) error {
|
||||
config, err := s.Store().FromContainerDirectory(id, "config.json")
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
var m rspec.Spec
|
||||
if err = json.Unmarshal(config, &m); err != nil {
|
||||
return err
|
||||
}
|
||||
labels := make(map[string]string)
|
||||
if err = json.Unmarshal([]byte(m.Annotations[annotations.Labels]), &labels); err != nil {
|
||||
return err
|
||||
}
|
||||
name := m.Annotations[annotations.Name]
|
||||
name, err = s.ReservePodName(id, name)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
defer func() {
|
||||
if err != nil {
|
||||
s.ReleasePodName(name)
|
||||
}
|
||||
}()
|
||||
var metadata pb.PodSandboxMetadata
|
||||
if err = json.Unmarshal([]byte(m.Annotations[annotations.Metadata]), &metadata); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
processLabel, mountLabel, err := label.InitLabels(label.DupSecOpt(m.Process.SelinuxLabel))
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
kubeAnnotations := make(map[string]string)
|
||||
if err = json.Unmarshal([]byte(m.Annotations[annotations.Annotations]), &kubeAnnotations); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
privileged := isTrue(m.Annotations[annotations.PrivilegedRuntime])
|
||||
trusted := isTrue(m.Annotations[annotations.TrustedSandbox])
|
||||
|
||||
sb, err := sandbox.New(id, name, m.Annotations[annotations.KubeName], filepath.Dir(m.Annotations[annotations.LogPath]), "", labels, kubeAnnotations, processLabel, mountLabel, &metadata, m.Annotations[annotations.ShmPath], "", privileged, trusted, m.Annotations[annotations.ResolvPath], "", nil)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
// We add a netNS only if we can load a permanent one.
|
||||
// Otherwise, the sandbox will live in the host namespace.
|
||||
netNsPath, err := configNetNsPath(m)
|
||||
if err == nil {
|
||||
nsErr := sb.NetNsJoin(netNsPath, sb.Name())
|
||||
// If we can't load the networking namespace
|
||||
// because it's closed, we just set the sb netns
|
||||
// pointer to nil. Otherwise we return an error.
|
||||
if nsErr != nil && nsErr != sandbox.ErrClosedNetNS {
|
||||
return nsErr
|
||||
}
|
||||
}
|
||||
|
||||
s.addSandbox(sb)
|
||||
|
||||
defer func() {
|
||||
if err != nil {
|
||||
s.removeSandbox(sb.ID())
|
||||
}
|
||||
}()
|
||||
|
||||
sandboxPath, err := s.Store().ContainerRunDirectory(id)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
sandboxDir, err := s.Store().ContainerDirectory(id)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
cname, err := s.ReserveContainerName(m.Annotations[annotations.ContainerID], m.Annotations[annotations.ContainerName])
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
defer func() {
|
||||
if err != nil {
|
||||
s.ReleaseContainerName(cname)
|
||||
}
|
||||
}()
|
||||
|
||||
created, err := time.Parse(time.RFC3339Nano, m.Annotations[annotations.Created])
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
scontainer, err := oci.NewContainer(m.Annotations[annotations.ContainerID], cname, sandboxPath, m.Annotations[annotations.LogPath], sb.NetNs(), labels, kubeAnnotations, "", nil, id, false, false, false, privileged, trusted, sandboxDir, created, m.Annotations["org.opencontainers.image.stopSignal"])
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
s.ContainerStateFromDisk(scontainer)
|
||||
|
||||
if err = label.ReserveLabel(processLabel); err != nil {
|
||||
return err
|
||||
}
|
||||
sb.SetInfraContainer(scontainer)
|
||||
if err = s.CtrIDIndex().Add(scontainer.ID()); err != nil {
|
||||
return err
|
||||
}
|
||||
if err = s.PodIDIndex().Add(id); err != nil {
|
||||
return err
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (s *Server) restore() {
|
||||
containers, err := s.Store().Containers()
|
||||
if err != nil && !os.IsNotExist(err) {
|
||||
|
@ -292,7 +83,7 @@ func (s *Server) restore() {
|
|||
pods := map[string]*storage.RuntimeContainerMetadata{}
|
||||
podContainers := map[string]*storage.RuntimeContainerMetadata{}
|
||||
for _, container := range containers {
|
||||
metadata, err2 := s.storageRuntimeServer.GetContainerMetadata(container.ID)
|
||||
metadata, err2 := s.StorageRuntimeServer().GetContainerMetadata(container.ID)
|
||||
if err2 != nil {
|
||||
logrus.Warnf("error parsing metadata for %s: %v, ignoring", container.ID, err2)
|
||||
continue
|
||||
|
@ -304,12 +95,12 @@ func (s *Server) restore() {
|
|||
}
|
||||
}
|
||||
for containerID, metadata := range pods {
|
||||
if err = s.loadSandbox(containerID); err != nil {
|
||||
if err = s.LoadSandbox(containerID); err != nil {
|
||||
logrus.Warnf("could not restore sandbox %s container %s: %v", metadata.PodID, containerID, err)
|
||||
}
|
||||
}
|
||||
for containerID := range podContainers {
|
||||
if err := s.loadContainer(containerID); err != nil {
|
||||
if err := s.LoadContainer(containerID); err != nil {
|
||||
logrus.Warnf("could not restore container %s: %v", containerID, err)
|
||||
}
|
||||
}
|
||||
|
@ -320,116 +111,11 @@ func (s *Server) restore() {
|
|||
// having been modified by other parties
|
||||
func (s *Server) Update() {
|
||||
logrus.Debugf("updating sandbox and container information")
|
||||
if err := s.update(); err != nil {
|
||||
if err := s.ContainerServer.Update(); err != nil {
|
||||
logrus.Errorf("error updating sandbox and container information: %v", err)
|
||||
}
|
||||
}
|
||||
|
||||
func (s *Server) update() error {
|
||||
s.updateLock.Lock()
|
||||
defer s.updateLock.Unlock()
|
||||
|
||||
containers, err := s.Store().Containers()
|
||||
if err != nil && !os.IsNotExist(err) {
|
||||
logrus.Warnf("could not read containers and sandboxes: %v", err)
|
||||
return err
|
||||
}
|
||||
newPods := map[string]*storage.RuntimeContainerMetadata{}
|
||||
oldPods := map[string]string{}
|
||||
removedPods := map[string]string{}
|
||||
newPodContainers := map[string]*storage.RuntimeContainerMetadata{}
|
||||
oldPodContainers := map[string]string{}
|
||||
removedPodContainers := map[string]string{}
|
||||
for _, container := range containers {
|
||||
if s.hasSandbox(container.ID) {
|
||||
// FIXME: do we need to reload/update any info about the sandbox?
|
||||
oldPods[container.ID] = container.ID
|
||||
oldPodContainers[container.ID] = container.ID
|
||||
continue
|
||||
}
|
||||
if s.getContainer(container.ID) != nil {
|
||||
// FIXME: do we need to reload/update any info about the container?
|
||||
oldPodContainers[container.ID] = container.ID
|
||||
continue
|
||||
}
|
||||
// not previously known, so figure out what it is
|
||||
metadata, err2 := s.storageRuntimeServer.GetContainerMetadata(container.ID)
|
||||
if err2 != nil {
|
||||
logrus.Errorf("error parsing metadata for %s: %v, ignoring", container.ID, err2)
|
||||
continue
|
||||
}
|
||||
if metadata.Pod {
|
||||
newPods[container.ID] = &metadata
|
||||
} else {
|
||||
newPodContainers[container.ID] = &metadata
|
||||
}
|
||||
}
|
||||
s.CtrIDIndex().Iterate(func(id string) {
|
||||
if _, ok := oldPodContainers[id]; !ok {
|
||||
// this container's ID wasn't in the updated list -> removed
|
||||
removedPodContainers[id] = id
|
||||
}
|
||||
})
|
||||
for removedPodContainer := range removedPodContainers {
|
||||
// forget this container
|
||||
c := s.getContainer(removedPodContainer)
|
||||
if c == nil {
|
||||
logrus.Warnf("bad state when getting container removed %+v", removedPodContainer)
|
||||
continue
|
||||
}
|
||||
s.ReleaseContainerName(c.Name())
|
||||
s.removeContainer(c)
|
||||
if err = s.CtrIDIndex().Delete(c.ID()); err != nil {
|
||||
return err
|
||||
}
|
||||
logrus.Debugf("forgetting removed pod container %s", c.ID())
|
||||
}
|
||||
s.PodIDIndex().Iterate(func(id string) {
|
||||
if _, ok := oldPods[id]; !ok {
|
||||
// this pod's ID wasn't in the updated list -> removed
|
||||
removedPods[id] = id
|
||||
}
|
||||
})
|
||||
for removedPod := range removedPods {
|
||||
// forget this pod
|
||||
sb := s.getSandbox(removedPod)
|
||||
if sb == nil {
|
||||
logrus.Warnf("bad state when getting pod to remove %+v", removedPod)
|
||||
continue
|
||||
}
|
||||
podInfraContainer := sb.InfraContainer()
|
||||
s.ReleaseContainerName(podInfraContainer.Name())
|
||||
s.removeContainer(podInfraContainer)
|
||||
if err = s.CtrIDIndex().Delete(podInfraContainer.ID()); err != nil {
|
||||
return err
|
||||
}
|
||||
sb.RemoveInfraContainer()
|
||||
s.ReleasePodName(sb.Name())
|
||||
s.removeSandbox(sb.ID())
|
||||
if err = s.PodIDIndex().Delete(sb.ID()); err != nil {
|
||||
return err
|
||||
}
|
||||
logrus.Debugf("forgetting removed pod %s", sb.ID())
|
||||
}
|
||||
for sandboxID := range newPods {
|
||||
// load this pod
|
||||
if err = s.loadSandbox(sandboxID); err != nil {
|
||||
logrus.Warnf("could not load new pod sandbox %s: %v, ignoring", sandboxID, err)
|
||||
} else {
|
||||
logrus.Debugf("loaded new pod sandbox %s", sandboxID, err)
|
||||
}
|
||||
}
|
||||
for containerID := range newPodContainers {
|
||||
// load this container
|
||||
if err = s.loadContainer(containerID); err != nil {
|
||||
logrus.Warnf("could not load new sandbox container %s: %v, ignoring", containerID, err)
|
||||
} else {
|
||||
logrus.Debugf("loaded new pod container %s", containerID, err)
|
||||
}
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// cleanupSandboxesOnShutdown Remove all running Sandboxes on system shutdown
|
||||
func (s *Server) cleanupSandboxesOnShutdown() {
|
||||
_, err := os.Stat(shutdownFile)
|
||||
|
@ -456,26 +142,6 @@ func (s *Server) Shutdown() error {
|
|||
|
||||
// New creates a new Server with options provided
|
||||
func New(config *Config) (*Server, error) {
|
||||
store, err := cstorage.GetStore(cstorage.StoreOptions{
|
||||
RunRoot: config.RunRoot,
|
||||
GraphRoot: config.Root,
|
||||
GraphDriverName: config.Storage,
|
||||
GraphDriverOptions: config.StorageOptions,
|
||||
})
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
imageService, err := storage.GetImageService(store, config.DefaultTransport, config.InsecureRegistries)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
storageRuntimeService := storage.GetRuntimeService(imageService, config.PauseImage)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
if err := os.MkdirAll("/var/run/crio", 0755); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
@ -493,14 +159,14 @@ func New(config *Config) (*Server, error) {
|
|||
hostportManager := hostport.NewHostportManager()
|
||||
|
||||
s := &Server{
|
||||
ContainerServer: *containerServer,
|
||||
storageRuntimeServer: storageRuntimeService,
|
||||
netPlugin: netPlugin,
|
||||
hostportManager: hostportManager,
|
||||
config: *config,
|
||||
seccompEnabled: seccomp.IsEnabled(),
|
||||
appArmorEnabled: apparmor.IsEnabled(),
|
||||
appArmorProfile: config.ApparmorProfile,
|
||||
ContainerServer: containerServer,
|
||||
|
||||
netPlugin: netPlugin,
|
||||
hostportManager: hostportManager,
|
||||
config: *config,
|
||||
seccompEnabled: seccomp.IsEnabled(),
|
||||
appArmorEnabled: apparmor.IsEnabled(),
|
||||
appArmorProfile: config.ApparmorProfile,
|
||||
}
|
||||
|
||||
if s.seccompEnabled {
|
||||
|
|
Loading…
Reference in a new issue