From b56da85fc13291daa85d113cc98f033ef1e7ddef Mon Sep 17 00:00:00 2001 From: Ryan Cole Date: Mon, 31 Jul 2017 14:38:45 -0400 Subject: [PATCH] Make container server update its list of containers upon creation Signed-off-by: Ryan Cole --- libkpod/container_server.go | 368 ++++++++++++++++++++++++++++++++++-- server/container_create.go | 6 +- server/container_remove.go | 4 +- server/container_stop.go | 2 +- server/sandbox_remove.go | 8 +- server/sandbox_run.go | 10 +- server/sandbox_stop.go | 4 +- server/server.go | 366 ++--------------------------------- 8 files changed, 385 insertions(+), 383 deletions(-) diff --git a/libkpod/container_server.go b/libkpod/container_server.go index 2f36ec16..749e8d99 100644 --- a/libkpod/container_server.go +++ b/libkpod/container_server.go @@ -3,7 +3,10 @@ package libkpod import ( "encoding/json" "fmt" + "os" + "path/filepath" "sync" + "time" "github.com/Sirupsen/logrus" "github.com/containers/image/types" @@ -13,18 +16,24 @@ import ( "github.com/docker/docker/pkg/truncindex" "github.com/kubernetes-incubator/cri-o/libkpod/sandbox" "github.com/kubernetes-incubator/cri-o/oci" + "github.com/kubernetes-incubator/cri-o/pkg/annotations" "github.com/kubernetes-incubator/cri-o/pkg/storage" + rspec "github.com/opencontainers/runtime-spec/specs-go" + "github.com/opencontainers/selinux/go-selinux/label" + pb "k8s.io/kubernetes/pkg/kubelet/api/v1alpha1/runtime" ) // ContainerServer implements the ImageServer type ContainerServer struct { - runtime *oci.Runtime - store cstorage.Store - storageImageServer storage.ImageServer - ctrNameIndex *registrar.Registrar - ctrIDIndex *truncindex.TruncIndex - podNameIndex *registrar.Registrar - podIDIndex *truncindex.TruncIndex + runtime *oci.Runtime + store cstorage.Store + storageImageServer storage.ImageServer + storageRuntimeServer storage.RuntimeServer + updateLock sync.RWMutex + ctrNameIndex *registrar.Registrar + ctrIDIndex *truncindex.TruncIndex + podNameIndex *registrar.Registrar + podIDIndex 
*truncindex.TruncIndex imageContext *types.SystemContext stateLock sync.Locker @@ -77,6 +86,11 @@ func (c *ContainerServer) Config() *Config { return c.config } +// StorageRuntimeServer gets the runtime server for the ContainerServer +func (c *ContainerServer) StorageRuntimeServer() storage.RuntimeServer { + return c.storageRuntimeServer +} + // New creates a new ContainerServer with options provided func New(config *Config) (*ContainerServer, error) { store, err := cstorage.GetStore(cstorage.StoreOptions{ @@ -94,6 +108,11 @@ func New(config *Config) (*ContainerServer, error) { return nil, err } + storageRuntimeService := storage.GetRuntimeService(imageService, config.PauseImage) + if err != nil { + return nil, err + } + runtime, err := oci.New(config.Runtime, config.RuntimeUntrustedWorkload, config.DefaultWorkloadTrust, config.Conmon, config.ConmonEnv, config.CgroupManager) if err != nil { return nil, err @@ -111,15 +130,16 @@ func New(config *Config) (*ContainerServer, error) { } return &ContainerServer{ - runtime: runtime, - store: store, - storageImageServer: imageService, - ctrNameIndex: registrar.NewRegistrar(), - ctrIDIndex: truncindex.NewTruncIndex([]string{}), - podNameIndex: registrar.NewRegistrar(), - podIDIndex: truncindex.NewTruncIndex([]string{}), - imageContext: &types.SystemContext{SignaturePolicyPath: config.SignaturePolicyPath}, - stateLock: lock, + runtime: runtime, + store: store, + storageImageServer: imageService, + storageRuntimeServer: storageRuntimeService, + ctrNameIndex: registrar.NewRegistrar(), + ctrIDIndex: truncindex.NewTruncIndex([]string{}), + podNameIndex: registrar.NewRegistrar(), + podIDIndex: truncindex.NewTruncIndex([]string{}), + imageContext: &types.SystemContext{SignaturePolicyPath: config.SignaturePolicyPath}, + stateLock: lock, state: &containerServerState{ containers: oci.NewMemoryStore(), sandboxes: make(map[string]*sandbox.Sandbox), @@ -128,6 +148,322 @@ func New(config *Config) (*ContainerServer, error) { }, nil } +// 
Update makes changes to the server's state (lists of pods and containers) to +// reflect the list of pods and containers that are stored on disk, possibly +// having been modified by other parties +func (c *ContainerServer) Update() error { + c.updateLock.Lock() + defer c.updateLock.Unlock() + + containers, err := c.store.Containers() + if err != nil && !os.IsNotExist(err) { + logrus.Warnf("could not read containers and sandboxes: %v", err) + return err + } + newPods := map[string]*storage.RuntimeContainerMetadata{} + oldPods := map[string]string{} + removedPods := map[string]string{} + newPodContainers := map[string]*storage.RuntimeContainerMetadata{} + oldPodContainers := map[string]string{} + removedPodContainers := map[string]string{} + for _, container := range containers { + if c.HasSandbox(container.ID) { + // FIXME: do we need to reload/update any info about the sandbox? + oldPods[container.ID] = container.ID + oldPodContainers[container.ID] = container.ID + continue + } + if c.GetContainer(container.ID) != nil { + // FIXME: do we need to reload/update any info about the container? 
+ oldPodContainers[container.ID] = container.ID + continue + } + // not previously known, so figure out what it is + metadata, err2 := c.storageRuntimeServer.GetContainerMetadata(container.ID) + if err2 != nil { + logrus.Errorf("error parsing metadata for %s: %v, ignoring", container.ID, err2) + continue + } + if metadata.Pod { + newPods[container.ID] = &metadata + } else { + newPodContainers[container.ID] = &metadata + } + } + c.ctrIDIndex.Iterate(func(id string) { + if _, ok := oldPodContainers[id]; !ok { + // this container's ID wasn't in the updated list -> removed + removedPodContainers[id] = id + } + }) + for removedPodContainer := range removedPodContainers { + // forget this container + ctr := c.GetContainer(removedPodContainer) + if ctr == nil { + logrus.Warnf("bad state when getting container removed %+v", removedPodContainer) + continue + } + c.ReleaseContainerName(ctr.Name()) + c.RemoveContainer(ctr) + if err = c.ctrIDIndex.Delete(ctr.ID()); err != nil { + return err + } + logrus.Debugf("forgetting removed pod container %s", ctr.ID()) + } + c.PodIDIndex().Iterate(func(id string) { + if _, ok := oldPods[id]; !ok { + // this pod's ID wasn't in the updated list -> removed + removedPods[id] = id + } + }) + for removedPod := range removedPods { + // forget this pod + sb := c.GetSandbox(removedPod) + if sb == nil { + logrus.Warnf("bad state when getting pod to remove %+v", removedPod) + continue + } + podInfraContainer := sb.InfraContainer() + c.ReleaseContainerName(podInfraContainer.Name()) + c.RemoveContainer(podInfraContainer) + if err = c.ctrIDIndex.Delete(podInfraContainer.ID()); err != nil { + return err + } + sb.RemoveInfraContainer() + c.ReleasePodName(sb.Name()) + c.RemoveSandbox(sb.ID()) + if err = c.podIDIndex.Delete(sb.ID()); err != nil { + return err + } + logrus.Debugf("forgetting removed pod %s", sb.ID()) + } + for sandboxID := range newPods { + // load this pod + if err = c.LoadSandbox(sandboxID); err != nil { + logrus.Warnf("could not load 
new pod sandbox %s: %v, ignoring", sandboxID, err) + } else { + logrus.Debugf("loaded new pod sandbox %s", sandboxID, err) + } + } + for containerID := range newPodContainers { + // load this container + if err = c.LoadContainer(containerID); err != nil { + logrus.Warnf("could not load new sandbox container %s: %v, ignoring", containerID, err) + } else { + logrus.Debugf("loaded new pod container %s", containerID, err) + } + } + return nil +} + +// LoadSandbox loads a sandbox from the disk into the sandbox store +func (c *ContainerServer) LoadSandbox(id string) error { + config, err := c.store.FromContainerDirectory(id, "config.json") + if err != nil { + return err + } + var m rspec.Spec + if err = json.Unmarshal(config, &m); err != nil { + return err + } + labels := make(map[string]string) + if err = json.Unmarshal([]byte(m.Annotations[annotations.Labels]), &labels); err != nil { + return err + } + name := m.Annotations[annotations.Name] + name, err = c.ReservePodName(id, name) + if err != nil { + return err + } + defer func() { + if err != nil { + c.ReleasePodName(name) + } + }() + var metadata pb.PodSandboxMetadata + if err = json.Unmarshal([]byte(m.Annotations[annotations.Metadata]), &metadata); err != nil { + return err + } + + processLabel, mountLabel, err := label.InitLabels(label.DupSecOpt(m.Process.SelinuxLabel)) + if err != nil { + return err + } + + kubeAnnotations := make(map[string]string) + if err = json.Unmarshal([]byte(m.Annotations[annotations.Annotations]), &kubeAnnotations); err != nil { + return err + } + + privileged := isTrue(m.Annotations[annotations.PrivilegedRuntime]) + trusted := isTrue(m.Annotations[annotations.TrustedSandbox]) + + sb, err := sandbox.New(id, name, m.Annotations[annotations.KubeName], filepath.Dir(m.Annotations[annotations.LogPath]), "", labels, kubeAnnotations, processLabel, mountLabel, &metadata, m.Annotations[annotations.ShmPath], "", privileged, trusted, m.Annotations[annotations.ResolvPath], "", nil) + if err != nil { 
+ return err + } + + // We add a netNS only if we can load a permanent one. + // Otherwise, the sandbox will live in the host namespace. + netNsPath, err := configNetNsPath(m) + if err == nil { + nsErr := sb.NetNsJoin(netNsPath, sb.Name()) + // If we can't load the networking namespace + // because it's closed, we just set the sb netns + // pointer to nil. Otherwise we return an error. + if nsErr != nil && nsErr != sandbox.ErrClosedNetNS { + return nsErr + } + } + + c.AddSandbox(sb) + + defer func() { + if err != nil { + c.RemoveSandbox(sb.ID()) + } + }() + + sandboxPath, err := c.store.ContainerRunDirectory(id) + if err != nil { + return err + } + + sandboxDir, err := c.store.ContainerDirectory(id) + if err != nil { + return err + } + + cname, err := c.ReserveContainerName(m.Annotations[annotations.ContainerID], m.Annotations[annotations.ContainerName]) + if err != nil { + return err + } + defer func() { + if err != nil { + c.ReleaseContainerName(cname) + } + }() + + created, err := time.Parse(time.RFC3339Nano, m.Annotations[annotations.Created]) + if err != nil { + return err + } + + scontainer, err := oci.NewContainer(m.Annotations[annotations.ContainerID], cname, sandboxPath, m.Annotations[annotations.LogPath], sb.NetNs(), labels, kubeAnnotations, "", nil, id, false, false, false, privileged, trusted, sandboxDir, created, m.Annotations["org.opencontainers.image.stopSignal"]) + if err != nil { + return err + } + + c.ContainerStateFromDisk(scontainer) + + if err = label.ReserveLabel(processLabel); err != nil { + return err + } + sb.SetInfraContainer(scontainer) + if err = c.ctrIDIndex.Add(scontainer.ID()); err != nil { + return err + } + if err = c.podIDIndex.Add(id); err != nil { + return err + } + return nil +} + +func configNetNsPath(spec rspec.Spec) (string, error) { + for _, ns := range spec.Linux.Namespaces { + if ns.Type != rspec.NetworkNamespace { + continue + } + + if ns.Path == "" { + return "", fmt.Errorf("empty networking namespace") + } + + return 
ns.Path, nil + } + + return "", fmt.Errorf("missing networking namespace") +} + +// LoadContainer loads a container from the disk into the container store +func (c *ContainerServer) LoadContainer(id string) error { + config, err := c.store.FromContainerDirectory(id, "config.json") + if err != nil { + return err + } + var m rspec.Spec + if err = json.Unmarshal(config, &m); err != nil { + return err + } + labels := make(map[string]string) + if err = json.Unmarshal([]byte(m.Annotations[annotations.Labels]), &labels); err != nil { + return err + } + name := m.Annotations[annotations.Name] + name, err = c.ReserveContainerName(id, name) + if err != nil { + return err + } + + defer func() { + if err != nil { + c.ReleaseContainerName(name) + } + }() + + var metadata pb.ContainerMetadata + if err = json.Unmarshal([]byte(m.Annotations[annotations.Metadata]), &metadata); err != nil { + return err + } + sb := c.GetSandbox(m.Annotations[annotations.SandboxID]) + if sb == nil { + return fmt.Errorf("could not get sandbox with id %s, skipping", m.Annotations[annotations.SandboxID]) + } + + tty := isTrue(m.Annotations[annotations.TTY]) + stdin := isTrue(m.Annotations[annotations.Stdin]) + stdinOnce := isTrue(m.Annotations[annotations.StdinOnce]) + + containerPath, err := c.store.ContainerRunDirectory(id) + if err != nil { + return err + } + + containerDir, err := c.store.ContainerDirectory(id) + if err != nil { + return err + } + + img, ok := m.Annotations[annotations.Image] + if !ok { + img = "" + } + + kubeAnnotations := make(map[string]string) + if err = json.Unmarshal([]byte(m.Annotations[annotations.Annotations]), &kubeAnnotations); err != nil { + return err + } + + created, err := time.Parse(time.RFC3339Nano, m.Annotations[annotations.Created]) + if err != nil { + return err + } + + ctr, err := oci.NewContainer(id, name, containerPath, m.Annotations[annotations.LogPath], sb.NetNs(), labels, kubeAnnotations, img, &metadata, sb.ID(), tty, stdin, stdinOnce, sb.Privileged(), 
sb.Trusted(), containerDir, created, m.Annotations["org.opencontainers.image.stopSignal"]) + if err != nil { + return err + } + + c.ContainerStateFromDisk(ctr) + + c.AddContainer(ctr) + return c.ctrIDIndex.Add(id) +} + +func isTrue(annotaton string) bool { + return annotaton == "true" +} + // ContainerStateFromDisk retrieves information on the state of a running container // from the disk func (c *ContainerServer) ContainerStateFromDisk(ctr *oci.Container) error { diff --git a/server/container_create.go b/server/container_create.go index f5029b06..2e6f500a 100644 --- a/server/container_create.go +++ b/server/container_create.go @@ -314,7 +314,7 @@ func (s *Server) CreateContainer(ctx context.Context, req *pb.CreateContainerReq } defer func() { if err != nil { - err2 := s.storageRuntimeServer.DeleteContainer(containerID) + err2 := s.StorageRuntimeServer().DeleteContainer(containerID) if err2 != nil { logrus.Warnf("Failed to cleanup container directory: %v", err2) } @@ -613,7 +613,7 @@ func (s *Server) createSandboxContainer(ctx context.Context, containerID string, metaname := metadata.Name attempt := metadata.Attempt - containerInfo, err := s.storageRuntimeServer.CreateContainer(s.ImageContext(), + containerInfo, err := s.StorageRuntimeServer().CreateContainer(s.ImageContext(), sb.Name(), sb.ID(), image, image, containerName, containerID, @@ -625,7 +625,7 @@ func (s *Server) createSandboxContainer(ctx context.Context, containerID string, return nil, err } - mountPoint, err := s.storageRuntimeServer.StartContainer(containerID) + mountPoint, err := s.StorageRuntimeServer().StartContainer(containerID) if err != nil { return nil, fmt.Errorf("failed to mount container %s(%s): %v", containerName, containerID, err) } diff --git a/server/container_remove.go b/server/container_remove.go index 8e330e60..1c102e05 100644 --- a/server/container_remove.go +++ b/server/container_remove.go @@ -27,7 +27,7 @@ func (s *Server) RemoveContainer(ctx context.Context, req 
*pb.RemoveContainerReq if err := s.Runtime().StopContainer(c, -1); err != nil { return nil, fmt.Errorf("failed to stop container %s: %v", c.ID(), err) } - if err := s.storageRuntimeServer.StopContainer(c.ID()); err != nil { + if err := s.StorageRuntimeServer().StopContainer(c.ID()); err != nil { return nil, fmt.Errorf("failed to unmount container %s: %v", c.ID(), err) } } @@ -38,7 +38,7 @@ func (s *Server) RemoveContainer(ctx context.Context, req *pb.RemoveContainerReq s.removeContainer(c) - if err := s.storageRuntimeServer.DeleteContainer(c.ID()); err != nil { + if err := s.StorageRuntimeServer().DeleteContainer(c.ID()); err != nil { return nil, fmt.Errorf("failed to delete storage for container %s: %v", c.ID(), err) } diff --git a/server/container_stop.go b/server/container_stop.go index ccbde3cc..47339db1 100644 --- a/server/container_stop.go +++ b/server/container_stop.go @@ -25,7 +25,7 @@ func (s *Server) StopContainer(ctx context.Context, req *pb.StopContainerRequest if err := s.Runtime().StopContainer(c, req.Timeout); err != nil { return nil, fmt.Errorf("failed to stop container %s: %v", c.ID(), err) } - if err := s.storageRuntimeServer.StopContainer(c.ID()); err != nil { + if err := s.StorageRuntimeServer().StopContainer(c.ID()); err != nil { return nil, fmt.Errorf("failed to unmount container %s: %v", c.ID(), err) } } diff --git a/server/sandbox_remove.go b/server/sandbox_remove.go index 676ea1d8..cc380354 100644 --- a/server/sandbox_remove.go +++ b/server/sandbox_remove.go @@ -57,11 +57,11 @@ func (s *Server) RemovePodSandbox(ctx context.Context, req *pb.RemovePodSandboxR continue } - if err := s.storageRuntimeServer.StopContainer(c.ID()); err != nil && err != storage.ErrContainerUnknown { + if err := s.StorageRuntimeServer().StopContainer(c.ID()); err != nil && err != storage.ErrContainerUnknown { // assume container already umounted logrus.Warnf("failed to stop container %s in pod sandbox %s: %v", c.Name(), sb.ID(), err) } - if err := 
s.storageRuntimeServer.DeleteContainer(c.ID()); err != nil && err != storage.ErrContainerUnknown { + if err := s.StorageRuntimeServer().DeleteContainer(c.ID()); err != nil && err != storage.ErrContainerUnknown { return nil, fmt.Errorf("failed to delete container %s in pod sandbox %s: %v", c.Name(), sb.ID(), err) } @@ -75,10 +75,10 @@ func (s *Server) RemovePodSandbox(ctx context.Context, req *pb.RemovePodSandboxR s.removeContainer(podInfraContainer) // Remove the files related to the sandbox - if err := s.storageRuntimeServer.StopContainer(sb.ID()); err != nil && err != storage.ErrContainerUnknown { + if err := s.StorageRuntimeServer().StopContainer(sb.ID()); err != nil && err != storage.ErrContainerUnknown { logrus.Warnf("failed to stop sandbox container in pod sandbox %s: %v", sb.ID(), err) } - if err := s.storageRuntimeServer.RemovePodSandbox(sb.ID()); err != nil && err != pkgstorage.ErrInvalidSandboxID { + if err := s.StorageRuntimeServer().RemovePodSandbox(sb.ID()); err != nil && err != pkgstorage.ErrInvalidSandboxID { return nil, fmt.Errorf("failed to remove pod sandbox %s: %v", sb.ID(), err) } diff --git a/server/sandbox_run.go b/server/sandbox_run.go index 9bcd9005..89bcfe14 100644 --- a/server/sandbox_run.go +++ b/server/sandbox_run.go @@ -153,7 +153,7 @@ func (s *Server) RunPodSandbox(ctx context.Context, req *pb.RunPodSandboxRequest } }() - podContainer, err := s.storageRuntimeServer.CreatePodSandbox(s.ImageContext(), + podContainer, err := s.StorageRuntimeServer().CreatePodSandbox(s.ImageContext(), name, id, s.config.PauseImage, "", containerName, @@ -170,7 +170,7 @@ func (s *Server) RunPodSandbox(ctx context.Context, req *pb.RunPodSandboxRequest } defer func() { if err != nil { - if err2 := s.storageRuntimeServer.RemovePodSandbox(id); err2 != nil { + if err2 := s.StorageRuntimeServer().RemovePodSandbox(id); err2 != nil { logrus.Warnf("couldn't cleanup pod sandbox %q: %v", id, err2) } } @@ -447,7 +447,7 @@ func (s *Server) RunPodSandbox(ctx 
context.Context, req *pb.RunPodSandboxRequest } saveOptions := generate.ExportOptions{} - mountPoint, err := s.storageRuntimeServer.StartContainer(id) + mountPoint, err := s.StorageRuntimeServer().StartContainer(id) if err != nil { return nil, fmt.Errorf("failed to mount container %s in pod sandbox %s(%s): %v", containerName, sb.Name(), id, err) } @@ -524,12 +524,12 @@ func convertPortMappings(in []*pb.PortMapping) []*hostport.PortMapping { } func (s *Server) setPodSandboxMountLabel(id, mountLabel string) error { - storageMetadata, err := s.storageRuntimeServer.GetContainerMetadata(id) + storageMetadata, err := s.StorageRuntimeServer().GetContainerMetadata(id) if err != nil { return err } storageMetadata.SetMountLabel(mountLabel) - return s.storageRuntimeServer.SetContainerMetadata(id, storageMetadata) + return s.StorageRuntimeServer().SetContainerMetadata(id, storageMetadata) } func getSELinuxLabels(selinuxOptions *pb.SELinuxOption) (processLabel string, mountLabel string, err error) { diff --git a/server/sandbox_stop.go b/server/sandbox_stop.go index 3f21b9e7..61e6e3e6 100644 --- a/server/sandbox_stop.go +++ b/server/sandbox_stop.go @@ -80,7 +80,7 @@ func (s *Server) StopPodSandbox(ctx context.Context, req *pb.StopPodSandboxReque if c.ID() == podInfraContainer.ID() { continue } - if err := s.storageRuntimeServer.StopContainer(c.ID()); err != nil && err != storage.ErrContainerUnknown { + if err := s.StorageRuntimeServer().StopContainer(c.ID()); err != nil && err != storage.ErrContainerUnknown { // assume container already umounted logrus.Warnf("failed to stop container %s in pod sandbox %s: %v", c.Name(), sb.ID(), err) } @@ -108,7 +108,7 @@ func (s *Server) StopPodSandbox(ctx context.Context, req *pb.StopPodSandboxReque } } } - if err := s.storageRuntimeServer.StopContainer(sb.ID()); err != nil && err != storage.ErrContainerUnknown { + if err := s.StorageRuntimeServer().StopContainer(sb.ID()); err != nil && err != storage.ErrContainerUnknown { logrus.Warnf("failed 
to stop sandbox container in pod sandbox %s: %v", sb.ID(), err) } diff --git a/server/server.go b/server/server.go index fa4ef56f..e5d19c9c 100644 --- a/server/server.go +++ b/server/server.go @@ -6,22 +6,16 @@ import ( "io/ioutil" "net" "os" - "path/filepath" "sync" - "time" "github.com/Sirupsen/logrus" - cstorage "github.com/containers/storage" "github.com/kubernetes-incubator/cri-o/libkpod" "github.com/kubernetes-incubator/cri-o/libkpod/sandbox" "github.com/kubernetes-incubator/cri-o/oci" - "github.com/kubernetes-incubator/cri-o/pkg/annotations" "github.com/kubernetes-incubator/cri-o/pkg/ocicni" "github.com/kubernetes-incubator/cri-o/pkg/storage" "github.com/kubernetes-incubator/cri-o/server/apparmor" "github.com/kubernetes-incubator/cri-o/server/seccomp" - rspec "github.com/opencontainers/runtime-spec/specs-go" - "github.com/opencontainers/selinux/go-selinux/label" knet "k8s.io/apimachinery/pkg/util/net" pb "k8s.io/kubernetes/pkg/kubelet/api/v1alpha1/runtime" "k8s.io/kubernetes/pkg/kubelet/network/hostport" @@ -50,13 +44,12 @@ type streamService struct { // Server implements the RuntimeService and ImageService type Server struct { - libkpod.ContainerServer + *libkpod.ContainerServer config Config - storageRuntimeServer storage.RuntimeServer - updateLock sync.RWMutex - netPlugin ocicni.CNIPlugin - hostportManager hostport.HostPortManager + updateLock sync.RWMutex + netPlugin ocicni.CNIPlugin + hostportManager hostport.HostPortManager seccompEnabled bool seccompProfile seccomp.Seccomp @@ -82,208 +75,6 @@ func (s *Server) GetPortForward(req *pb.PortForwardRequest) (*pb.PortForwardResp return s.stream.streamServer.GetPortForward(req) } -func (s *Server) loadContainer(id string) error { - config, err := s.Store().FromContainerDirectory(id, "config.json") - if err != nil { - return err - } - var m rspec.Spec - if err = json.Unmarshal(config, &m); err != nil { - return err - } - labels := make(map[string]string) - if err = 
json.Unmarshal([]byte(m.Annotations[annotations.Labels]), &labels); err != nil { - return err - } - name := m.Annotations[annotations.Name] - name, err = s.ReserveContainerName(id, name) - if err != nil { - return err - } - - defer func() { - if err != nil { - s.ReleaseContainerName(name) - } - }() - - var metadata pb.ContainerMetadata - if err = json.Unmarshal([]byte(m.Annotations[annotations.Metadata]), &metadata); err != nil { - return err - } - sb := s.getSandbox(m.Annotations[annotations.SandboxID]) - if sb == nil { - return fmt.Errorf("could not get sandbox with id %s, skipping", m.Annotations[annotations.SandboxID]) - } - - tty := isTrue(m.Annotations[annotations.TTY]) - stdin := isTrue(m.Annotations[annotations.Stdin]) - stdinOnce := isTrue(m.Annotations[annotations.StdinOnce]) - - containerPath, err := s.Store().ContainerRunDirectory(id) - if err != nil { - return err - } - - containerDir, err := s.Store().ContainerDirectory(id) - if err != nil { - return err - } - - img, ok := m.Annotations[annotations.Image] - if !ok { - img = "" - } - - kubeAnnotations := make(map[string]string) - if err = json.Unmarshal([]byte(m.Annotations[annotations.Annotations]), &kubeAnnotations); err != nil { - return err - } - - created, err := time.Parse(time.RFC3339Nano, m.Annotations[annotations.Created]) - if err != nil { - return err - } - - ctr, err := oci.NewContainer(id, name, containerPath, m.Annotations[annotations.LogPath], sb.NetNs(), labels, kubeAnnotations, img, &metadata, sb.ID(), tty, stdin, stdinOnce, sb.Privileged(), sb.Trusted(), containerDir, created, m.Annotations["org.opencontainers.image.stopSignal"]) - if err != nil { - return err - } - - s.ContainerStateFromDisk(ctr) - - s.addContainer(ctr) - return s.CtrIDIndex().Add(id) -} - -func configNetNsPath(spec rspec.Spec) (string, error) { - for _, ns := range spec.Linux.Namespaces { - if ns.Type != rspec.NetworkNamespace { - continue - } - - if ns.Path == "" { - return "", fmt.Errorf("empty networking 
namespace") - } - - return ns.Path, nil - } - - return "", fmt.Errorf("missing networking namespace") -} - -func (s *Server) loadSandbox(id string) error { - config, err := s.Store().FromContainerDirectory(id, "config.json") - if err != nil { - return err - } - var m rspec.Spec - if err = json.Unmarshal(config, &m); err != nil { - return err - } - labels := make(map[string]string) - if err = json.Unmarshal([]byte(m.Annotations[annotations.Labels]), &labels); err != nil { - return err - } - name := m.Annotations[annotations.Name] - name, err = s.ReservePodName(id, name) - if err != nil { - return err - } - defer func() { - if err != nil { - s.ReleasePodName(name) - } - }() - var metadata pb.PodSandboxMetadata - if err = json.Unmarshal([]byte(m.Annotations[annotations.Metadata]), &metadata); err != nil { - return err - } - - processLabel, mountLabel, err := label.InitLabels(label.DupSecOpt(m.Process.SelinuxLabel)) - if err != nil { - return err - } - - kubeAnnotations := make(map[string]string) - if err = json.Unmarshal([]byte(m.Annotations[annotations.Annotations]), &kubeAnnotations); err != nil { - return err - } - - privileged := isTrue(m.Annotations[annotations.PrivilegedRuntime]) - trusted := isTrue(m.Annotations[annotations.TrustedSandbox]) - - sb, err := sandbox.New(id, name, m.Annotations[annotations.KubeName], filepath.Dir(m.Annotations[annotations.LogPath]), "", labels, kubeAnnotations, processLabel, mountLabel, &metadata, m.Annotations[annotations.ShmPath], "", privileged, trusted, m.Annotations[annotations.ResolvPath], "", nil) - if err != nil { - return err - } - - // We add a netNS only if we can load a permanent one. - // Otherwise, the sandbox will live in the host namespace. - netNsPath, err := configNetNsPath(m) - if err == nil { - nsErr := sb.NetNsJoin(netNsPath, sb.Name()) - // If we can't load the networking namespace - // because it's closed, we just set the sb netns - // pointer to nil. Otherwise we return an error. 
- if nsErr != nil && nsErr != sandbox.ErrClosedNetNS { - return nsErr - } - } - - s.addSandbox(sb) - - defer func() { - if err != nil { - s.removeSandbox(sb.ID()) - } - }() - - sandboxPath, err := s.Store().ContainerRunDirectory(id) - if err != nil { - return err - } - - sandboxDir, err := s.Store().ContainerDirectory(id) - if err != nil { - return err - } - - cname, err := s.ReserveContainerName(m.Annotations[annotations.ContainerID], m.Annotations[annotations.ContainerName]) - if err != nil { - return err - } - defer func() { - if err != nil { - s.ReleaseContainerName(cname) - } - }() - - created, err := time.Parse(time.RFC3339Nano, m.Annotations[annotations.Created]) - if err != nil { - return err - } - - scontainer, err := oci.NewContainer(m.Annotations[annotations.ContainerID], cname, sandboxPath, m.Annotations[annotations.LogPath], sb.NetNs(), labels, kubeAnnotations, "", nil, id, false, false, false, privileged, trusted, sandboxDir, created, m.Annotations["org.opencontainers.image.stopSignal"]) - if err != nil { - return err - } - - s.ContainerStateFromDisk(scontainer) - - if err = label.ReserveLabel(processLabel); err != nil { - return err - } - sb.SetInfraContainer(scontainer) - if err = s.CtrIDIndex().Add(scontainer.ID()); err != nil { - return err - } - if err = s.PodIDIndex().Add(id); err != nil { - return err - } - return nil -} - func (s *Server) restore() { containers, err := s.Store().Containers() if err != nil && !os.IsNotExist(err) { @@ -292,7 +83,7 @@ func (s *Server) restore() { pods := map[string]*storage.RuntimeContainerMetadata{} podContainers := map[string]*storage.RuntimeContainerMetadata{} for _, container := range containers { - metadata, err2 := s.storageRuntimeServer.GetContainerMetadata(container.ID) + metadata, err2 := s.StorageRuntimeServer().GetContainerMetadata(container.ID) if err2 != nil { logrus.Warnf("error parsing metadata for %s: %v, ignoring", container.ID, err2) continue @@ -304,12 +95,12 @@ func (s *Server) restore() { } } 
for containerID, metadata := range pods { - if err = s.loadSandbox(containerID); err != nil { + if err = s.LoadSandbox(containerID); err != nil { logrus.Warnf("could not restore sandbox %s container %s: %v", metadata.PodID, containerID, err) } } for containerID := range podContainers { - if err := s.loadContainer(containerID); err != nil { + if err := s.LoadContainer(containerID); err != nil { logrus.Warnf("could not restore container %s: %v", containerID, err) } } @@ -320,116 +111,11 @@ func (s *Server) restore() { // having been modified by other parties func (s *Server) Update() { logrus.Debugf("updating sandbox and container information") - if err := s.update(); err != nil { + if err := s.ContainerServer.Update(); err != nil { logrus.Errorf("error updating sandbox and container information: %v", err) } } -func (s *Server) update() error { - s.updateLock.Lock() - defer s.updateLock.Unlock() - - containers, err := s.Store().Containers() - if err != nil && !os.IsNotExist(err) { - logrus.Warnf("could not read containers and sandboxes: %v", err) - return err - } - newPods := map[string]*storage.RuntimeContainerMetadata{} - oldPods := map[string]string{} - removedPods := map[string]string{} - newPodContainers := map[string]*storage.RuntimeContainerMetadata{} - oldPodContainers := map[string]string{} - removedPodContainers := map[string]string{} - for _, container := range containers { - if s.hasSandbox(container.ID) { - // FIXME: do we need to reload/update any info about the sandbox? - oldPods[container.ID] = container.ID - oldPodContainers[container.ID] = container.ID - continue - } - if s.getContainer(container.ID) != nil { - // FIXME: do we need to reload/update any info about the container? 
- oldPodContainers[container.ID] = container.ID - continue - } - // not previously known, so figure out what it is - metadata, err2 := s.storageRuntimeServer.GetContainerMetadata(container.ID) - if err2 != nil { - logrus.Errorf("error parsing metadata for %s: %v, ignoring", container.ID, err2) - continue - } - if metadata.Pod { - newPods[container.ID] = &metadata - } else { - newPodContainers[container.ID] = &metadata - } - } - s.CtrIDIndex().Iterate(func(id string) { - if _, ok := oldPodContainers[id]; !ok { - // this container's ID wasn't in the updated list -> removed - removedPodContainers[id] = id - } - }) - for removedPodContainer := range removedPodContainers { - // forget this container - c := s.getContainer(removedPodContainer) - if c == nil { - logrus.Warnf("bad state when getting container removed %+v", removedPodContainer) - continue - } - s.ReleaseContainerName(c.Name()) - s.removeContainer(c) - if err = s.CtrIDIndex().Delete(c.ID()); err != nil { - return err - } - logrus.Debugf("forgetting removed pod container %s", c.ID()) - } - s.PodIDIndex().Iterate(func(id string) { - if _, ok := oldPods[id]; !ok { - // this pod's ID wasn't in the updated list -> removed - removedPods[id] = id - } - }) - for removedPod := range removedPods { - // forget this pod - sb := s.getSandbox(removedPod) - if sb == nil { - logrus.Warnf("bad state when getting pod to remove %+v", removedPod) - continue - } - podInfraContainer := sb.InfraContainer() - s.ReleaseContainerName(podInfraContainer.Name()) - s.removeContainer(podInfraContainer) - if err = s.CtrIDIndex().Delete(podInfraContainer.ID()); err != nil { - return err - } - sb.RemoveInfraContainer() - s.ReleasePodName(sb.Name()) - s.removeSandbox(sb.ID()) - if err = s.PodIDIndex().Delete(sb.ID()); err != nil { - return err - } - logrus.Debugf("forgetting removed pod %s", sb.ID()) - } - for sandboxID := range newPods { - // load this pod - if err = s.loadSandbox(sandboxID); err != nil { - logrus.Warnf("could not load new 
pod sandbox %s: %v, ignoring", sandboxID, err) - } else { - logrus.Debugf("loaded new pod sandbox %s", sandboxID, err) - } - } - for containerID := range newPodContainers { - // load this container - if err = s.loadContainer(containerID); err != nil { - logrus.Warnf("could not load new sandbox container %s: %v, ignoring", containerID, err) - } else { - logrus.Debugf("loaded new pod container %s", containerID, err) - } - } - return nil -} - // cleanupSandboxesOnShutdown Remove all running Sandboxes on system shutdown func (s *Server) cleanupSandboxesOnShutdown() { _, err := os.Stat(shutdownFile) @@ -456,26 +142,6 @@ func (s *Server) Shutdown() error { // New creates a new Server with options provided func New(config *Config) (*Server, error) { - store, err := cstorage.GetStore(cstorage.StoreOptions{ - RunRoot: config.RunRoot, - GraphRoot: config.Root, - GraphDriverName: config.Storage, - GraphDriverOptions: config.StorageOptions, - }) - if err != nil { - return nil, err - } - - imageService, err := storage.GetImageService(store, config.DefaultTransport, config.InsecureRegistries) - if err != nil { - return nil, err - } - - storageRuntimeService := storage.GetRuntimeService(imageService, config.PauseImage) - if err != nil { - return nil, err - } - if err := os.MkdirAll("/var/run/crio", 0755); err != nil { return nil, err } @@ -493,14 +159,14 @@ func New(config *Config) (*Server, error) { hostportManager := hostport.NewHostportManager() s := &Server{ - ContainerServer: *containerServer, - storageRuntimeServer: storageRuntimeService, - netPlugin: netPlugin, - hostportManager: hostportManager, - config: *config, - seccompEnabled: seccomp.IsEnabled(), - appArmorEnabled: apparmor.IsEnabled(), - appArmorProfile: config.ApparmorProfile, + ContainerServer: containerServer, + + netPlugin: netPlugin, + hostportManager: hostportManager, + config: *config, + seccompEnabled: seccomp.IsEnabled(), + appArmorEnabled: apparmor.IsEnabled(), + appArmorProfile: config.ApparmorProfile, } 
if s.seccompEnabled {