Integrate containers/storage

Use containers/storage to store images, pod sandboxes, and containers.
A pod sandbox's infrastructure container has the same ID as the pod to
which it belongs, and all containers also keep track of their pod's ID.

The container configuration that we build using the data in a
CreateContainerRequest is stored in the container's ContainerDirectory
and ContainerRunDirectory.

We catch SIGTERM and SIGINT, and when we receive either, we gracefully
exit the grpc loop.  If we also think that there aren't any container
filesystems in use, we attempt to do a clean shutdown of the storage
driver.

The test harness now waits for ocid to exit before attempting to delete
the storage root directory.

Signed-off-by: Nalin Dahyabhai <nalin@redhat.com>
This commit is contained in:
Nalin Dahyabhai 2016-10-18 10:48:33 -04:00
parent caee4a99c9
commit c0333b102b
29 changed files with 637 additions and 372 deletions

View file

@ -5,14 +5,16 @@ import (
"fmt"
"io/ioutil"
"os"
"path/filepath"
"sync"
"syscall"
"github.com/Sirupsen/logrus"
"github.com/containers/image/types"
sstorage "github.com/containers/storage/storage"
"github.com/docker/docker/pkg/registrar"
"github.com/docker/docker/pkg/truncindex"
"github.com/kubernetes-incubator/cri-o/oci"
"github.com/kubernetes-incubator/cri-o/pkg/storage"
"github.com/kubernetes-incubator/cri-o/server/apparmor"
"github.com/kubernetes-incubator/cri-o/server/seccomp"
"github.com/opencontainers/runc/libcontainer/label"
@ -29,6 +31,9 @@ const (
type Server struct {
config Config
runtime *oci.Runtime
store sstorage.Store
images storage.ImageServer
storage storage.RuntimeServer
stateLock sync.Mutex
state *serverState
netPlugin ocicni.CNIPlugin
@ -36,6 +41,7 @@ type Server struct {
podIDIndex *truncindex.TruncIndex
ctrNameIndex *registrar.Registrar
ctrIDIndex *truncindex.TruncIndex
imageContext *types.SystemContext
seccompEnabled bool
seccompProfile seccomp.Seccomp
@ -45,7 +51,7 @@ type Server struct {
}
func (s *Server) loadContainer(id string) error {
config, err := ioutil.ReadFile(filepath.Join(s.runtime.ContainerDir(), id, "config.json"))
config, err := s.store.GetFromContainerDirectory(id, "config.json")
if err != nil {
return err
}
@ -76,7 +82,10 @@ func (s *Server) loadContainer(id string) error {
if v := m.Annotations["ocid/tty"]; v == "true" {
tty = true
}
containerPath := filepath.Join(s.runtime.ContainerDir(), id)
containerPath, err := s.store.GetContainerRunDirectory(id)
if err != nil {
return err
}
var img *pb.ImageSpec
image, ok := m.Annotations["ocid/image"]
@ -122,7 +131,7 @@ func configNetNsPath(spec rspec.Spec) (string, error) {
}
func (s *Server) loadSandbox(id string) error {
config, err := ioutil.ReadFile(filepath.Join(s.config.SandboxDir, id, "config.json"))
config, err := s.store.GetFromContainerDirectory(id, "config.json")
if err != nil {
return err
}
@ -184,7 +193,10 @@ func (s *Server) loadSandbox(id string) error {
s.addSandbox(sb)
sandboxPath := filepath.Join(s.config.SandboxDir, id)
sandboxPath, err := s.store.GetContainerRunDirectory(id)
if err != nil {
return err
}
if err = label.ReserveLabel(processLabel); err != nil {
return err
@ -200,7 +212,7 @@ func (s *Server) loadSandbox(id string) error {
}
sb.infraContainer = scontainer
if err = s.runtime.UpdateStatus(scontainer); err != nil {
logrus.Warnf("error updating status for container %s: %v", scontainer.ID(), err)
logrus.Warnf("error updating status for pod sandbox infra container %s: %v", scontainer.ID(), err)
}
if err = s.ctrIDIndex.Add(scontainer.ID()); err != nil {
return err
@ -212,31 +224,138 @@ func (s *Server) loadSandbox(id string) error {
}
func (s *Server) restore() {
sandboxDir, err := ioutil.ReadDir(s.config.SandboxDir)
containers, err := s.store.Containers()
if err != nil && !os.IsNotExist(err) {
logrus.Warnf("could not read sandbox directory %s: %v", sandboxDir, err)
logrus.Warnf("could not read containers and sandboxes: %v", err)
}
for _, v := range sandboxDir {
if !v.IsDir() {
pods := map[string]*storage.RuntimeContainerMetadata{}
podContainers := map[string]*storage.RuntimeContainerMetadata{}
for _, container := range containers {
metadata, err2 := s.storage.GetContainerMetadata(container.ID)
if err2 != nil {
logrus.Warnf("error parsing metadata for %s: %v, ignoring", container.ID, err2)
continue
}
if err = s.loadSandbox(v.Name()); err != nil {
logrus.Warnf("could not restore sandbox %s: %v", v.Name(), err)
if metadata.Pod {
pods[container.ID] = &metadata
} else {
podContainers[container.ID] = &metadata
}
}
containerDir, err := ioutil.ReadDir(s.runtime.ContainerDir())
if err != nil && !os.IsNotExist(err) {
logrus.Warnf("could not read container directory %s: %v", s.runtime.ContainerDir(), err)
}
for _, v := range containerDir {
if !v.IsDir() {
continue
for containerID, metadata := range pods {
if err = s.loadSandbox(containerID); err != nil {
logrus.Warnf("could not restore sandbox %s container %s: %v", metadata.PodID, containerID, err)
}
if err := s.loadContainer(v.Name()); err != nil {
logrus.Warnf("could not restore container %s: %v", v.Name(), err)
}
for containerID := range podContainers {
if err := s.loadContainer(containerID); err != nil {
logrus.Warnf("could not restore container %s: %v", containerID, err)
}
}
}
// Update makes changes to the server's state (lists of pods and containers) to
// reflect the list of pods and containers that are stored on disk, possibly
// having been modified by other parties
func (s *Server) Update() {
logrus.Debugf("updating sandbox and container information")
if err := s.update(); err != nil {
logrus.Errorf("error updating sandbox and container information: %v", err)
}
}
func (s *Server) update() error {
containers, err := s.store.Containers()
if err != nil && !os.IsNotExist(err) {
logrus.Warnf("could not read containers and sandboxes: %v", err)
return err
}
newPods := map[string]*storage.RuntimeContainerMetadata{}
oldPods := map[string]string{}
removedPods := map[string]string{}
newPodContainers := map[string]*storage.RuntimeContainerMetadata{}
oldPodContainers := map[string]string{}
removedPodContainers := map[string]string{}
for _, container := range containers {
if s.hasSandbox(container.ID) {
// FIXME: do we need to reload/update any info about the sandbox?
oldPods[container.ID] = container.ID
oldPodContainers[container.ID] = container.ID
continue
}
if s.getContainer(container.ID) != nil {
// FIXME: do we need to reload/update any info about the container?
oldPodContainers[container.ID] = container.ID
continue
}
// not previously known, so figure out what it is
metadata, err2 := s.storage.GetContainerMetadata(container.ID)
if err2 != nil {
logrus.Errorf("error parsing metadata for %s: %v, ignoring", container.ID, err2)
continue
}
if metadata.Pod {
newPods[container.ID] = &metadata
} else {
newPodContainers[container.ID] = &metadata
}
}
s.ctrIDIndex.Iterate(func(id string) {
if _, ok := oldPodContainers[id]; !ok {
// this container's ID wasn't in the updated list -> removed
removedPodContainers[id] = id
}
})
for removedPodContainer := range removedPodContainers {
// forget this container
c := s.getContainer(removedPodContainer)
s.releaseContainerName(c.Name())
s.removeContainer(c)
if err = s.ctrIDIndex.Delete(c.ID()); err != nil {
return err
}
logrus.Debugf("forgetting removed pod container %s", c.ID())
}
s.podIDIndex.Iterate(func(id string) {
if _, ok := oldPods[id]; !ok {
// this pod's ID wasn't in the updated list -> removed
removedPods[id] = id
}
})
for removedPod := range removedPods {
// forget this pod
sb := s.getSandbox(removedPod)
podInfraContainer := sb.infraContainer
s.releaseContainerName(podInfraContainer.Name())
s.removeContainer(podInfraContainer)
if err = s.ctrIDIndex.Delete(podInfraContainer.ID()); err != nil {
return err
}
sb.infraContainer = nil
s.releasePodName(sb.name)
s.removeSandbox(sb.id)
if err = s.podIDIndex.Delete(sb.id); err != nil {
return err
}
logrus.Debugf("forgetting removed pod %s", sb.id)
}
for sandboxID := range newPods {
// load this pod
if err = s.loadSandbox(sandboxID); err != nil {
logrus.Warnf("could not load new pod sandbox %s: %v, ignoring", sandboxID, err)
} else {
logrus.Debugf("loaded new pod sandbox %s", sandboxID, err)
}
}
for containerID := range newPodContainers {
// load this container
if err = s.loadContainer(containerID); err != nil {
logrus.Warnf("could not load new sandbox container %s: %v, ignoring", containerID, err)
} else {
logrus.Debugf("loaded new pod container %s", containerID, err)
}
}
return nil
}
func (s *Server) reservePodName(id, name string) (string, error) {
@ -294,17 +413,35 @@ func seccompEnabled() bool {
return enabled
}
// Shutdown attempts to shut down the server's storage cleanly
func (s *Server) Shutdown() error {
_, err := s.store.Shutdown(false)
return err
}
// New creates a new Server with options provided
func New(config *Config) (*Server, error) {
if err := os.MkdirAll(config.ImageDir, 0755); err != nil {
store, err := sstorage.GetStore(sstorage.StoreOptions{
RunRoot: config.RunRoot,
GraphRoot: config.Root,
GraphDriverName: config.Storage,
GraphDriverOptions: config.StorageOptions,
})
if err != nil {
return nil, err
}
if err := os.MkdirAll(config.SandboxDir, 0755); err != nil {
imageService, err := storage.GetImageService(store, config.DefaultTransport)
if err != nil {
return nil, err
}
r, err := oci.New(config.Runtime, config.ContainerDir, config.Conmon, config.ConmonEnv, config.CgroupManager)
storageRuntimeService := storage.GetRuntimeService(imageService)
if err != nil {
return nil, err
}
r, err := oci.New(config.Runtime, config.Conmon, config.ConmonEnv, config.CgroupManager)
if err != nil {
return nil, err
}
@ -316,6 +453,9 @@ func New(config *Config) (*Server, error) {
}
s := &Server{
runtime: r,
store: store,
images: imageService,
storage: storageRuntimeService,
netPlugin: netPlugin,
config: *config,
state: &serverState{
@ -346,6 +486,9 @@ func New(config *Config) (*Server, error) {
s.podNameIndex = registrar.NewRegistrar()
s.ctrIDIndex = truncindex.NewTruncIndex([]string{})
s.ctrNameIndex = registrar.NewRegistrar()
s.imageContext = &types.SystemContext{
SignaturePolicyPath: config.ImageConfig.SignaturePolicyPath,
}
s.restore()