Make container server update it's list of containers upon creation
Signed-off-by: Ryan Cole <rcyoalne@gmail.com>
This commit is contained in:
parent
d574177c2a
commit
b56da85fc1
8 changed files with 385 additions and 383 deletions
|
@ -3,7 +3,10 @@ package libkpod
|
|||
import (
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"os"
|
||||
"path/filepath"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"github.com/Sirupsen/logrus"
|
||||
"github.com/containers/image/types"
|
||||
|
@ -13,18 +16,24 @@ import (
|
|||
"github.com/docker/docker/pkg/truncindex"
|
||||
"github.com/kubernetes-incubator/cri-o/libkpod/sandbox"
|
||||
"github.com/kubernetes-incubator/cri-o/oci"
|
||||
"github.com/kubernetes-incubator/cri-o/pkg/annotations"
|
||||
"github.com/kubernetes-incubator/cri-o/pkg/storage"
|
||||
rspec "github.com/opencontainers/runtime-spec/specs-go"
|
||||
"github.com/opencontainers/selinux/go-selinux/label"
|
||||
pb "k8s.io/kubernetes/pkg/kubelet/api/v1alpha1/runtime"
|
||||
)
|
||||
|
||||
// ContainerServer implements the ImageServer
|
||||
type ContainerServer struct {
|
||||
runtime *oci.Runtime
|
||||
store cstorage.Store
|
||||
storageImageServer storage.ImageServer
|
||||
ctrNameIndex *registrar.Registrar
|
||||
ctrIDIndex *truncindex.TruncIndex
|
||||
podNameIndex *registrar.Registrar
|
||||
podIDIndex *truncindex.TruncIndex
|
||||
runtime *oci.Runtime
|
||||
store cstorage.Store
|
||||
storageImageServer storage.ImageServer
|
||||
storageRuntimeServer storage.RuntimeServer
|
||||
updateLock sync.RWMutex
|
||||
ctrNameIndex *registrar.Registrar
|
||||
ctrIDIndex *truncindex.TruncIndex
|
||||
podNameIndex *registrar.Registrar
|
||||
podIDIndex *truncindex.TruncIndex
|
||||
|
||||
imageContext *types.SystemContext
|
||||
stateLock sync.Locker
|
||||
|
@ -77,6 +86,11 @@ func (c *ContainerServer) Config() *Config {
|
|||
return c.config
|
||||
}
|
||||
|
||||
// StorageRuntimeServer gets the runtime server for the ContainerServer
|
||||
func (c *ContainerServer) StorageRuntimeServer() storage.RuntimeServer {
|
||||
return c.storageRuntimeServer
|
||||
}
|
||||
|
||||
// New creates a new ContainerServer with options provided
|
||||
func New(config *Config) (*ContainerServer, error) {
|
||||
store, err := cstorage.GetStore(cstorage.StoreOptions{
|
||||
|
@ -94,6 +108,11 @@ func New(config *Config) (*ContainerServer, error) {
|
|||
return nil, err
|
||||
}
|
||||
|
||||
storageRuntimeService := storage.GetRuntimeService(imageService, config.PauseImage)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
runtime, err := oci.New(config.Runtime, config.RuntimeUntrustedWorkload, config.DefaultWorkloadTrust, config.Conmon, config.ConmonEnv, config.CgroupManager)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
|
@ -111,15 +130,16 @@ func New(config *Config) (*ContainerServer, error) {
|
|||
}
|
||||
|
||||
return &ContainerServer{
|
||||
runtime: runtime,
|
||||
store: store,
|
||||
storageImageServer: imageService,
|
||||
ctrNameIndex: registrar.NewRegistrar(),
|
||||
ctrIDIndex: truncindex.NewTruncIndex([]string{}),
|
||||
podNameIndex: registrar.NewRegistrar(),
|
||||
podIDIndex: truncindex.NewTruncIndex([]string{}),
|
||||
imageContext: &types.SystemContext{SignaturePolicyPath: config.SignaturePolicyPath},
|
||||
stateLock: lock,
|
||||
runtime: runtime,
|
||||
store: store,
|
||||
storageImageServer: imageService,
|
||||
storageRuntimeServer: storageRuntimeService,
|
||||
ctrNameIndex: registrar.NewRegistrar(),
|
||||
ctrIDIndex: truncindex.NewTruncIndex([]string{}),
|
||||
podNameIndex: registrar.NewRegistrar(),
|
||||
podIDIndex: truncindex.NewTruncIndex([]string{}),
|
||||
imageContext: &types.SystemContext{SignaturePolicyPath: config.SignaturePolicyPath},
|
||||
stateLock: lock,
|
||||
state: &containerServerState{
|
||||
containers: oci.NewMemoryStore(),
|
||||
sandboxes: make(map[string]*sandbox.Sandbox),
|
||||
|
@ -128,6 +148,322 @@ func New(config *Config) (*ContainerServer, error) {
|
|||
}, nil
|
||||
}
|
||||
|
||||
// Update makes changes to the server's state (lists of pods and containers) to
|
||||
// reflect the list of pods and containers that are stored on disk, possibly
|
||||
// having been modified by other parties
|
||||
func (c *ContainerServer) Update() error {
|
||||
c.updateLock.Lock()
|
||||
defer c.updateLock.Unlock()
|
||||
|
||||
containers, err := c.store.Containers()
|
||||
if err != nil && !os.IsNotExist(err) {
|
||||
logrus.Warnf("could not read containers and sandboxes: %v", err)
|
||||
return err
|
||||
}
|
||||
newPods := map[string]*storage.RuntimeContainerMetadata{}
|
||||
oldPods := map[string]string{}
|
||||
removedPods := map[string]string{}
|
||||
newPodContainers := map[string]*storage.RuntimeContainerMetadata{}
|
||||
oldPodContainers := map[string]string{}
|
||||
removedPodContainers := map[string]string{}
|
||||
for _, container := range containers {
|
||||
if c.HasSandbox(container.ID) {
|
||||
// FIXME: do we need to reload/update any info about the sandbox?
|
||||
oldPods[container.ID] = container.ID
|
||||
oldPodContainers[container.ID] = container.ID
|
||||
continue
|
||||
}
|
||||
if c.GetContainer(container.ID) != nil {
|
||||
// FIXME: do we need to reload/update any info about the container?
|
||||
oldPodContainers[container.ID] = container.ID
|
||||
continue
|
||||
}
|
||||
// not previously known, so figure out what it is
|
||||
metadata, err2 := c.storageRuntimeServer.GetContainerMetadata(container.ID)
|
||||
if err2 != nil {
|
||||
logrus.Errorf("error parsing metadata for %s: %v, ignoring", container.ID, err2)
|
||||
continue
|
||||
}
|
||||
if metadata.Pod {
|
||||
newPods[container.ID] = &metadata
|
||||
} else {
|
||||
newPodContainers[container.ID] = &metadata
|
||||
}
|
||||
}
|
||||
c.ctrIDIndex.Iterate(func(id string) {
|
||||
if _, ok := oldPodContainers[id]; !ok {
|
||||
// this container's ID wasn't in the updated list -> removed
|
||||
removedPodContainers[id] = id
|
||||
}
|
||||
})
|
||||
for removedPodContainer := range removedPodContainers {
|
||||
// forget this container
|
||||
ctr := c.GetContainer(removedPodContainer)
|
||||
if ctr == nil {
|
||||
logrus.Warnf("bad state when getting container removed %+v", removedPodContainer)
|
||||
continue
|
||||
}
|
||||
c.ReleaseContainerName(ctr.Name())
|
||||
c.RemoveContainer(ctr)
|
||||
if err = c.ctrIDIndex.Delete(ctr.ID()); err != nil {
|
||||
return err
|
||||
}
|
||||
logrus.Debugf("forgetting removed pod container %s", ctr.ID())
|
||||
}
|
||||
c.PodIDIndex().Iterate(func(id string) {
|
||||
if _, ok := oldPods[id]; !ok {
|
||||
// this pod's ID wasn't in the updated list -> removed
|
||||
removedPods[id] = id
|
||||
}
|
||||
})
|
||||
for removedPod := range removedPods {
|
||||
// forget this pod
|
||||
sb := c.GetSandbox(removedPod)
|
||||
if sb == nil {
|
||||
logrus.Warnf("bad state when getting pod to remove %+v", removedPod)
|
||||
continue
|
||||
}
|
||||
podInfraContainer := sb.InfraContainer()
|
||||
c.ReleaseContainerName(podInfraContainer.Name())
|
||||
c.RemoveContainer(podInfraContainer)
|
||||
if err = c.ctrIDIndex.Delete(podInfraContainer.ID()); err != nil {
|
||||
return err
|
||||
}
|
||||
sb.RemoveInfraContainer()
|
||||
c.ReleasePodName(sb.Name())
|
||||
c.RemoveSandbox(sb.ID())
|
||||
if err = c.podIDIndex.Delete(sb.ID()); err != nil {
|
||||
return err
|
||||
}
|
||||
logrus.Debugf("forgetting removed pod %s", sb.ID())
|
||||
}
|
||||
for sandboxID := range newPods {
|
||||
// load this pod
|
||||
if err = c.LoadSandbox(sandboxID); err != nil {
|
||||
logrus.Warnf("could not load new pod sandbox %s: %v, ignoring", sandboxID, err)
|
||||
} else {
|
||||
logrus.Debugf("loaded new pod sandbox %s", sandboxID, err)
|
||||
}
|
||||
}
|
||||
for containerID := range newPodContainers {
|
||||
// load this container
|
||||
if err = c.LoadContainer(containerID); err != nil {
|
||||
logrus.Warnf("could not load new sandbox container %s: %v, ignoring", containerID, err)
|
||||
} else {
|
||||
logrus.Debugf("loaded new pod container %s", containerID, err)
|
||||
}
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// LoadSandbox loads a sandbox from the disk into the sandbox store
|
||||
func (c *ContainerServer) LoadSandbox(id string) error {
|
||||
config, err := c.store.FromContainerDirectory(id, "config.json")
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
var m rspec.Spec
|
||||
if err = json.Unmarshal(config, &m); err != nil {
|
||||
return err
|
||||
}
|
||||
labels := make(map[string]string)
|
||||
if err = json.Unmarshal([]byte(m.Annotations[annotations.Labels]), &labels); err != nil {
|
||||
return err
|
||||
}
|
||||
name := m.Annotations[annotations.Name]
|
||||
name, err = c.ReservePodName(id, name)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
defer func() {
|
||||
if err != nil {
|
||||
c.ReleasePodName(name)
|
||||
}
|
||||
}()
|
||||
var metadata pb.PodSandboxMetadata
|
||||
if err = json.Unmarshal([]byte(m.Annotations[annotations.Metadata]), &metadata); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
processLabel, mountLabel, err := label.InitLabels(label.DupSecOpt(m.Process.SelinuxLabel))
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
kubeAnnotations := make(map[string]string)
|
||||
if err = json.Unmarshal([]byte(m.Annotations[annotations.Annotations]), &kubeAnnotations); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
privileged := isTrue(m.Annotations[annotations.PrivilegedRuntime])
|
||||
trusted := isTrue(m.Annotations[annotations.TrustedSandbox])
|
||||
|
||||
sb, err := sandbox.New(id, name, m.Annotations[annotations.KubeName], filepath.Dir(m.Annotations[annotations.LogPath]), "", labels, kubeAnnotations, processLabel, mountLabel, &metadata, m.Annotations[annotations.ShmPath], "", privileged, trusted, m.Annotations[annotations.ResolvPath], "", nil)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
// We add a netNS only if we can load a permanent one.
|
||||
// Otherwise, the sandbox will live in the host namespace.
|
||||
netNsPath, err := configNetNsPath(m)
|
||||
if err == nil {
|
||||
nsErr := sb.NetNsJoin(netNsPath, sb.Name())
|
||||
// If we can't load the networking namespace
|
||||
// because it's closed, we just set the sb netns
|
||||
// pointer to nil. Otherwise we return an error.
|
||||
if nsErr != nil && nsErr != sandbox.ErrClosedNetNS {
|
||||
return nsErr
|
||||
}
|
||||
}
|
||||
|
||||
c.AddSandbox(sb)
|
||||
|
||||
defer func() {
|
||||
if err != nil {
|
||||
c.RemoveSandbox(sb.ID())
|
||||
}
|
||||
}()
|
||||
|
||||
sandboxPath, err := c.store.ContainerRunDirectory(id)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
sandboxDir, err := c.store.ContainerDirectory(id)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
cname, err := c.ReserveContainerName(m.Annotations[annotations.ContainerID], m.Annotations[annotations.ContainerName])
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
defer func() {
|
||||
if err != nil {
|
||||
c.ReleaseContainerName(cname)
|
||||
}
|
||||
}()
|
||||
|
||||
created, err := time.Parse(time.RFC3339Nano, m.Annotations[annotations.Created])
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
scontainer, err := oci.NewContainer(m.Annotations[annotations.ContainerID], cname, sandboxPath, m.Annotations[annotations.LogPath], sb.NetNs(), labels, kubeAnnotations, "", nil, id, false, false, false, privileged, trusted, sandboxDir, created, m.Annotations["org.opencontainers.image.stopSignal"])
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
c.ContainerStateFromDisk(scontainer)
|
||||
|
||||
if err = label.ReserveLabel(processLabel); err != nil {
|
||||
return err
|
||||
}
|
||||
sb.SetInfraContainer(scontainer)
|
||||
if err = c.ctrIDIndex.Add(scontainer.ID()); err != nil {
|
||||
return err
|
||||
}
|
||||
if err = c.podIDIndex.Add(id); err != nil {
|
||||
return err
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func configNetNsPath(spec rspec.Spec) (string, error) {
|
||||
for _, ns := range spec.Linux.Namespaces {
|
||||
if ns.Type != rspec.NetworkNamespace {
|
||||
continue
|
||||
}
|
||||
|
||||
if ns.Path == "" {
|
||||
return "", fmt.Errorf("empty networking namespace")
|
||||
}
|
||||
|
||||
return ns.Path, nil
|
||||
}
|
||||
|
||||
return "", fmt.Errorf("missing networking namespace")
|
||||
}
|
||||
|
||||
// LoadContainer loads a container from the disk into the container store
|
||||
func (c *ContainerServer) LoadContainer(id string) error {
|
||||
config, err := c.store.FromContainerDirectory(id, "config.json")
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
var m rspec.Spec
|
||||
if err = json.Unmarshal(config, &m); err != nil {
|
||||
return err
|
||||
}
|
||||
labels := make(map[string]string)
|
||||
if err = json.Unmarshal([]byte(m.Annotations[annotations.Labels]), &labels); err != nil {
|
||||
return err
|
||||
}
|
||||
name := m.Annotations[annotations.Name]
|
||||
name, err = c.ReserveContainerName(id, name)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
defer func() {
|
||||
if err != nil {
|
||||
c.ReleaseContainerName(name)
|
||||
}
|
||||
}()
|
||||
|
||||
var metadata pb.ContainerMetadata
|
||||
if err = json.Unmarshal([]byte(m.Annotations[annotations.Metadata]), &metadata); err != nil {
|
||||
return err
|
||||
}
|
||||
sb := c.GetSandbox(m.Annotations[annotations.SandboxID])
|
||||
if sb == nil {
|
||||
return fmt.Errorf("could not get sandbox with id %s, skipping", m.Annotations[annotations.SandboxID])
|
||||
}
|
||||
|
||||
tty := isTrue(m.Annotations[annotations.TTY])
|
||||
stdin := isTrue(m.Annotations[annotations.Stdin])
|
||||
stdinOnce := isTrue(m.Annotations[annotations.StdinOnce])
|
||||
|
||||
containerPath, err := c.store.ContainerRunDirectory(id)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
containerDir, err := c.store.ContainerDirectory(id)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
img, ok := m.Annotations[annotations.Image]
|
||||
if !ok {
|
||||
img = ""
|
||||
}
|
||||
|
||||
kubeAnnotations := make(map[string]string)
|
||||
if err = json.Unmarshal([]byte(m.Annotations[annotations.Annotations]), &kubeAnnotations); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
created, err := time.Parse(time.RFC3339Nano, m.Annotations[annotations.Created])
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
ctr, err := oci.NewContainer(id, name, containerPath, m.Annotations[annotations.LogPath], sb.NetNs(), labels, kubeAnnotations, img, &metadata, sb.ID(), tty, stdin, stdinOnce, sb.Privileged(), sb.Trusted(), containerDir, created, m.Annotations["org.opencontainers.image.stopSignal"])
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
c.ContainerStateFromDisk(ctr)
|
||||
|
||||
c.AddContainer(ctr)
|
||||
return c.ctrIDIndex.Add(id)
|
||||
}
|
||||
|
||||
func isTrue(annotaton string) bool {
|
||||
return annotaton == "true"
|
||||
}
|
||||
|
||||
// ContainerStateFromDisk retrieves information on the state of a running container
|
||||
// from the disk
|
||||
func (c *ContainerServer) ContainerStateFromDisk(ctr *oci.Container) error {
|
||||
|
|
Loading…
Add table
Add a link
Reference in a new issue