Move server state behind an interface

This will allow the development of alternative methods of storing CRI-O's state

Signed-off-by: Matthew Heon <mheon@redhat.com>
This commit is contained in:
Matthew Heon 2017-03-23 14:02:58 -04:00
parent f3f8b67b76
commit d530d36b4b
17 changed files with 535 additions and 327 deletions

View file

@ -11,14 +11,10 @@ func (s *Server) getContainerFromRequest(cid string) (*oci.Container, error) {
return nil, fmt.Errorf("container ID should not be empty") return nil, fmt.Errorf("container ID should not be empty")
} }
containerID, err := s.ctrIDIndex.Get(cid) c, err := s.state.LookupContainerByID(cid)
if err != nil { if err != nil {
return nil, fmt.Errorf("container with ID starting with %s not found: %v", cid, err) return nil, fmt.Errorf("container with ID starting with %s could not be retrieved: %v", cid, err)
} }
c := s.state.containers.Get(containerID)
if c == nil {
return nil, fmt.Errorf("specified container not found: %s", containerID)
}
return c, nil return c, nil
} }

View file

@ -38,10 +38,9 @@ func (s *Server) Attach(ctx context.Context, req *pb.AttachRequest) (*pb.AttachR
// Attach endpoint for streaming.Runtime // Attach endpoint for streaming.Runtime
func (ss streamService) Attach(containerID string, inputStream io.Reader, outputStream, errorStream io.WriteCloser, tty bool, resize <-chan term.Size) error { func (ss streamService) Attach(containerID string, inputStream io.Reader, outputStream, errorStream io.WriteCloser, tty bool, resize <-chan term.Size) error {
c := ss.runtimeServer.GetContainer(containerID) c, err := ss.runtimeServer.GetContainer(containerID)
if err != nil {
if c == nil { return err
return fmt.Errorf("could not find container %q", containerID)
} }
if err := ss.runtimeServer.runtime.UpdateStatus(c); err != nil { if err := ss.runtimeServer.runtime.UpdateStatus(c); err != nil {

View file

@ -236,14 +236,9 @@ func (s *Server) CreateContainer(ctx context.Context, req *pb.CreateContainerReq
return nil, fmt.Errorf("PodSandboxId should not be empty") return nil, fmt.Errorf("PodSandboxId should not be empty")
} }
sandboxID, err := s.podIDIndex.Get(sbID) sb, err := s.state.LookupSandboxByID(sbID)
if err != nil { if err != nil {
return nil, fmt.Errorf("PodSandbox with ID starting with %s not found: %v", sbID, err) return nil, fmt.Errorf("error retrieving PodSandbox with ID starting with %s: %v", sbID, err)
}
sb := s.getSandbox(sandboxID)
if sb == nil {
return nil, fmt.Errorf("specified sandbox not found: %s", sandboxID)
} }
// The config of the container // The config of the container
@ -262,12 +257,6 @@ func (s *Server) CreateContainer(ctx context.Context, req *pb.CreateContainerReq
return nil, err return nil, err
} }
defer func() {
if err != nil {
s.releaseContainerName(containerName)
}
}()
container, err := s.createSandboxContainer(ctx, containerID, containerName, sb, req.GetSandboxConfig(), containerConfig) container, err := s.createSandboxContainer(ctx, containerID, containerName, sb, req.GetSandboxConfig(), containerConfig)
if err != nil { if err != nil {
return nil, err return nil, err
@ -289,10 +278,7 @@ func (s *Server) CreateContainer(ctx context.Context, req *pb.CreateContainerReq
return nil, err return nil, err
} }
s.addContainer(container) if err := s.addContainer(container); err != nil {
if err = s.ctrIDIndex.Add(containerID); err != nil {
s.removeContainer(container)
return nil, err return nil, err
} }

View file

@ -30,10 +30,14 @@ func (s *Server) Exec(ctx context.Context, req *pb.ExecRequest) (*pb.ExecRespons
// Exec endpoint for streaming.Runtime // Exec endpoint for streaming.Runtime
func (ss streamService) Exec(containerID string, cmd []string, stdin io.Reader, stdout, stderr io.WriteCloser, tty bool, resize <-chan term.Size) error { func (ss streamService) Exec(containerID string, cmd []string, stdin io.Reader, stdout, stderr io.WriteCloser, tty bool, resize <-chan term.Size) error {
c := ss.runtimeServer.GetContainer(containerID) sbID, err := ss.runtimeServer.state.GetContainerSandbox(containerID)
if err != nil {
return err
}
if c == nil { c, err := ss.runtimeServer.state.GetContainer(containerID, sbID)
return fmt.Errorf("could not find container %q", containerID) if err != nil {
return err
} }
if err := ss.runtimeServer.runtime.UpdateStatus(c); err != nil { if err := ss.runtimeServer.runtime.UpdateStatus(c); err != nil {

View file

@ -31,17 +31,15 @@ func (s *Server) ListContainers(ctx context.Context, req *pb.ListContainersReque
logrus.Debugf("ListContainersRequest %+v", req) logrus.Debugf("ListContainersRequest %+v", req)
var ctrs []*pb.Container var ctrs []*pb.Container
filter := req.Filter filter := req.Filter
ctrList := s.state.containers.List() ctrList, _ := s.state.GetAllContainers()
// Filter using container id and pod id first. // Filter using container id and pod id first.
if filter != nil { if filter != nil {
if filter.Id != "" { if filter.Id != "" {
id, err := s.ctrIDIndex.Get(filter.Id) c, err := s.state.LookupContainerByID(filter.Id)
if err != nil { if err != nil {
return nil, err return nil, err
} }
c := s.state.containers.Get(id)
if c != nil {
if filter.PodSandboxId != "" { if filter.PodSandboxId != "" {
if c.Sandbox() == filter.PodSandboxId { if c.Sandbox() == filter.PodSandboxId {
ctrList = []*oci.Container{c} ctrList = []*oci.Container{c}
@ -52,11 +50,11 @@ func (s *Server) ListContainers(ctx context.Context, req *pb.ListContainersReque
} else { } else {
ctrList = []*oci.Container{c} ctrList = []*oci.Container{c}
} }
}
} else { } else {
if filter.PodSandboxId != "" { if filter.PodSandboxId != "" {
pod := s.state.sandboxes[filter.PodSandboxId] pod, err := s.state.GetSandbox(filter.PodSandboxId)
if pod == nil { // TODO check if this is a pod not found error, if not we should error out here
if err != nil {
ctrList = []*oci.Container{} ctrList = []*oci.Container{}
} else { } else {
ctrList = pod.containers.List() ctrList = pod.containers.List()

View file

@ -28,10 +28,9 @@ func (s *Server) PortForward(ctx context.Context, req *pb.PortForwardRequest) (*
} }
func (ss streamService) PortForward(podSandboxID string, port int32, stream io.ReadWriteCloser) error { func (ss streamService) PortForward(podSandboxID string, port int32, stream io.ReadWriteCloser) error {
c := ss.runtimeServer.GetSandboxContainer(podSandboxID) c, err := ss.runtimeServer.GetSandboxContainer(podSandboxID)
if err != nil {
if c == nil { return err
return fmt.Errorf("could not find container for sandbox %q", podSandboxID)
} }
if err := ss.runtimeServer.runtime.UpdateStatus(c); err != nil { if err := ss.runtimeServer.runtime.UpdateStatus(c); err != nil {

View file

@ -36,18 +36,14 @@ func (s *Server) RemoveContainer(ctx context.Context, req *pb.RemoveContainerReq
return nil, fmt.Errorf("failed to delete container %s: %v", c.ID(), err) return nil, fmt.Errorf("failed to delete container %s: %v", c.ID(), err)
} }
s.removeContainer(c) if err := s.removeContainer(c); err != nil {
return nil, fmt.Errorf("failed to remove container %s: %v", c.ID(), err)
}
if err := s.storageRuntimeServer.DeleteContainer(c.ID()); err != nil { if err := s.storageRuntimeServer.DeleteContainer(c.ID()); err != nil {
return nil, fmt.Errorf("failed to delete storage for container %s: %v", c.ID(), err) return nil, fmt.Errorf("failed to delete storage for container %s: %v", c.ID(), err)
} }
s.releaseContainerName(c.Name())
if err := s.ctrIDIndex.Delete(c.ID()); err != nil {
return nil, err
}
resp := &pb.RemoveContainerResponse{} resp := &pb.RemoveContainerResponse{}
logrus.Debugf("RemoveContainerResponse: %+v", resp) logrus.Debugf("RemoveContainerResponse: %+v", resp)
return resp, nil return resp, nil

367
server/in_memory_state.go Normal file
View file

@ -0,0 +1,367 @@
package server
import (
"fmt"
"sync"
"github.com/docker/docker/pkg/registrar"
"github.com/docker/docker/pkg/truncindex"
"github.com/kubernetes-incubator/cri-o/oci"
)
// TODO: make operations atomic to greatest extent possible
// InMemoryState is an in-memory state store, suitable for use when no other
// programs are expected to interact with the server
type InMemoryState struct {
lock sync.Mutex
sandboxes map[string]*sandbox
containers oci.ContainerStorer
podNameIndex *registrar.Registrar
podIDIndex *truncindex.TruncIndex
ctrNameIndex *registrar.Registrar
ctrIDIndex *truncindex.TruncIndex
}
// NewInMemoryState creates a new, empty server state
func NewInMemoryState() StateStore {
state := new(InMemoryState)
state.sandboxes = make(map[string]*sandbox)
state.containers = oci.NewMemoryStore()
state.podNameIndex = registrar.NewRegistrar()
state.podIDIndex = truncindex.NewTruncIndex([]string{})
state.ctrNameIndex = registrar.NewRegistrar()
state.ctrIDIndex = truncindex.NewTruncIndex([]string{})
return state
}
// AddSandbox adds a sandbox and any containers in it to the state
func (s *InMemoryState) AddSandbox(sandbox *sandbox) error {
s.lock.Lock()
defer s.lock.Unlock()
if _, exist := s.sandboxes[sandbox.id]; exist {
return fmt.Errorf("sandbox with ID %v already exists", sandbox.id)
}
// We shouldn't share ID with any containers, either
if ctrCheck := s.containers.Get(sandbox.id); ctrCheck != nil {
return fmt.Errorf("requested sandbox ID %v conflicts with existing container ID", sandbox.id)
}
s.sandboxes[sandbox.id] = sandbox
if err := s.podNameIndex.Reserve(sandbox.name, sandbox.id); err != nil {
return fmt.Errorf("error registering sandbox name: %v", err)
}
if err := s.podIDIndex.Add(sandbox.id); err != nil {
return fmt.Errorf("error registering sandbox ID: %v", err)
}
// If there are containers in the sandbox add them to the mapping
containers := sandbox.containers.List()
for _, ctr := range containers {
if err := s.addContainerMappings(ctr, true); err != nil {
return fmt.Errorf("error adding container %v mappings in sandbox %v", ctr.ID(), sandbox.id)
}
}
// Add the pod infrastructure container to mappings
// TODO: Right now, we don't add it to the all containers listing. We may want to change this.
if err := s.addContainerMappings(sandbox.infraContainer, false); err != nil {
return fmt.Errorf("error adding infrastructure container %v to mappings: %v", sandbox.infraContainer.ID(), err)
}
return nil
}
// HasSandbox determines if a given sandbox exists in the state
func (s *InMemoryState) HasSandbox(id string) bool {
s.lock.Lock()
defer s.lock.Unlock()
_, exist := s.sandboxes[id]
return exist
}
// DeleteSandbox removes a sandbox from the state
func (s *InMemoryState) DeleteSandbox(id string) error {
s.lock.Lock()
defer s.lock.Unlock()
if _, exist := s.sandboxes[id]; !exist {
return fmt.Errorf("no sandbox with ID %v exists, cannot delete", id)
}
name := s.sandboxes[id].name
containers := s.sandboxes[id].containers.List()
infraContainer := s.sandboxes[id].infraContainer
delete(s.sandboxes, id)
s.podNameIndex.Release(name)
if err := s.podIDIndex.Delete(id); err != nil {
return fmt.Errorf("error unregistering sandbox ID: %v", err)
}
// If there are containers left in the sandbox delete them from the mappings
for _, ctr := range containers {
if err := s.deleteContainerMappings(ctr, true); err != nil {
return fmt.Errorf("error removing container %v mappings: %v", ctr.ID(), err)
}
}
// Delete infra container from mappings
if err := s.deleteContainerMappings(infraContainer, false); err != nil {
return fmt.Errorf("error removing infra container %v from mappings: %v", infraContainer.ID(), err)
}
return nil
}
// GetSandbox returns a sandbox given its full ID
func (s *InMemoryState) GetSandbox(id string) (*sandbox, error) {
s.lock.Lock()
defer s.lock.Unlock()
sandbox, ok := s.sandboxes[id]
if !ok {
return nil, fmt.Errorf("no sandbox with id %v exists", id)
}
return sandbox, nil
}
// LookupSandboxByName returns a sandbox given its full or partial name
func (s *InMemoryState) LookupSandboxByName(name string) (*sandbox, error) {
s.lock.Lock()
defer s.lock.Unlock()
id, err := s.podNameIndex.Get(name)
if err != nil {
return nil, fmt.Errorf("could not resolve sandbox name %v: %v", name, err)
}
sandbox, ok := s.sandboxes[id]
if !ok {
// This should never happen
return nil, fmt.Errorf("cannot find sandbox %v in sandboxes map", id)
}
return sandbox, nil
}
// LookupSandboxByID returns a sandbox given its full or partial ID
// An error will be returned if the partial ID given is not unique
func (s *InMemoryState) LookupSandboxByID(id string) (*sandbox, error) {
s.lock.Lock()
defer s.lock.Unlock()
fullID, err := s.podIDIndex.Get(id)
if err != nil {
return nil, fmt.Errorf("could not resolve sandbox id %v: %v", id, err)
}
sandbox, ok := s.sandboxes[fullID]
if !ok {
// This should never happen
return nil, fmt.Errorf("cannot find sandbox %v in sandboxes map", fullID)
}
return sandbox, nil
}
// GetAllSandboxes returns all sandboxes in the state
func (s *InMemoryState) GetAllSandboxes() ([]*sandbox, error) {
s.lock.Lock()
defer s.lock.Unlock()
sandboxes := make([]*sandbox, 0, len(s.sandboxes))
for _, sb := range s.sandboxes {
sandboxes = append(sandboxes, sb)
}
return sandboxes, nil
}
// AddContainer adds a single container to a given sandbox in the state
func (s *InMemoryState) AddContainer(c *oci.Container, sandboxID string) error {
s.lock.Lock()
defer s.lock.Unlock()
if c.Sandbox() != sandboxID {
return fmt.Errorf("cannot add container to sandbox %v as it is part of sandbox %v", sandboxID, c.Sandbox())
}
sandbox, ok := s.sandboxes[sandboxID]
if !ok {
return fmt.Errorf("sandbox with ID %v does not exist, cannot add container", sandboxID)
}
if ctr := sandbox.containers.Get(c.ID()); ctr != nil {
return fmt.Errorf("container with ID %v already exists in sandbox %v", c.ID(), sandboxID)
}
sandbox.containers.Add(c.ID(), c)
return s.addContainerMappings(c, true)
}
// Add container ID, Name and Sandbox mappings
func (s *InMemoryState) addContainerMappings(c *oci.Container, addToContainers bool) error {
if addToContainers && s.containers.Get(c.ID()) != nil {
return fmt.Errorf("container with ID %v already exists in containers store", c.ID())
}
// TODO: if not a pod infra container, check if it conflicts with existing sandbox ID?
// Does this matter?
if addToContainers {
s.containers.Add(c.ID(), c)
}
if err := s.ctrNameIndex.Reserve(c.Name(), c.ID()); err != nil {
s.containers.Delete(c.ID())
return fmt.Errorf("error registering container name: %v", err)
}
if err := s.ctrIDIndex.Add(c.ID()); err != nil {
s.containers.Delete(c.ID())
s.ctrNameIndex.Release(c.ID())
return fmt.Errorf("error registering container ID: %v", err)
}
return nil
}
// HasContainer checks if a container with the given ID exists in a given sandbox
func (s *InMemoryState) HasContainer(id, sandboxID string) bool {
s.lock.Lock()
defer s.lock.Unlock()
sandbox, ok := s.sandboxes[sandboxID]
if !ok {
return false
}
ctr := sandbox.containers.Get(id)
return ctr != nil
}
// DeleteContainer removes the container with given ID from the given sandbox
func (s *InMemoryState) DeleteContainer(id, sandboxID string) error {
s.lock.Lock()
defer s.lock.Unlock()
sandbox, ok := s.sandboxes[sandboxID]
if !ok {
return fmt.Errorf("sandbox with ID %v does not exist", sandboxID)
}
ctr := sandbox.containers.Get(id)
if ctr == nil {
return fmt.Errorf("sandbox %v has no container with ID %v", sandboxID, id)
}
sandbox.containers.Delete(id)
return s.deleteContainerMappings(ctr, true)
}
// Deletes container from the ID and Name mappings and optionally from the global containers list
func (s *InMemoryState) deleteContainerMappings(ctr *oci.Container, deleteFromContainers bool) error {
if deleteFromContainers && s.containers.Get(ctr.ID()) == nil {
return fmt.Errorf("container ID %v does not exist in containers store", ctr.ID())
}
if deleteFromContainers {
s.containers.Delete(ctr.ID())
}
s.ctrNameIndex.Release(ctr.Name())
if err := s.ctrIDIndex.Delete(ctr.ID()); err != nil {
return fmt.Errorf("error unregistering container ID: %v", err)
}
return nil
}
// GetContainer returns the container with given ID in the given sandbox
func (s *InMemoryState) GetContainer(id, sandboxID string) (*oci.Container, error) {
s.lock.Lock()
defer s.lock.Unlock()
return s.getContainerFromSandbox(id, sandboxID)
}
// GetContainerSandbox returns the ID of a container's sandbox from the full container ID
// May not find the ID of pod infrastructure containers
func (s *InMemoryState) GetContainerSandbox(id string) (string, error) {
s.lock.Lock()
defer s.lock.Unlock()
ctr := s.containers.Get(id)
if ctr == nil {
return "", fmt.Errorf("no container with ID %v found", id)
}
return ctr.Sandbox(), nil
}
// LookupContainerByName returns the full ID of a container given its full or partial name
func (s *InMemoryState) LookupContainerByName(name string) (*oci.Container, error) {
s.lock.Lock()
defer s.lock.Unlock()
fullID, err := s.ctrNameIndex.Get(name)
if err != nil {
return nil, fmt.Errorf("cannot resolve container name %v: %v", name, err)
}
return s.getContainer(fullID)
}
// LookupContainerByID returns the full ID of a container given a full or partial ID
// If the given ID is not unique, an error is returned
func (s *InMemoryState) LookupContainerByID(id string) (*oci.Container, error) {
s.lock.Lock()
defer s.lock.Unlock()
fullID, err := s.ctrIDIndex.Get(id)
if err != nil {
return nil, fmt.Errorf("cannot resolve container ID %v: %v", id, err)
}
return s.getContainer(fullID)
}
// GetAllContainers returns all containers in the state, regardless of which sandbox they belong to
// Pod Infra containers are not included
func (s *InMemoryState) GetAllContainers() ([]*oci.Container, error) {
return s.containers.List(), nil
}
// Returns a single container from any sandbox based on full ID
// TODO: is it worth making this public as an alternative to GetContainer
func (s *InMemoryState) getContainer(id string) (*oci.Container, error) {
ctr := s.containers.Get(id)
if ctr == nil {
return nil, fmt.Errorf("cannot find container with ID %v", id)
}
return s.getContainerFromSandbox(id, ctr.Sandbox())
}
// Returns a single container from a sandbox based on its full ID
// Internal implementation of GetContainer() but does not lock so it can be used in other functions
func (s *InMemoryState) getContainerFromSandbox(id, sandboxID string) (*oci.Container, error) {
sandbox, ok := s.sandboxes[sandboxID]
if !ok {
return nil, fmt.Errorf("sandbox with ID %v does not exist", sandboxID)
}
ctr := sandbox.containers.Get(id)
if ctr == nil {
return nil, fmt.Errorf("cannot find container %v in sandbox %v", id, sandboxID)
}
return ctr, nil
}

View file

@ -50,14 +50,11 @@ func (s *Server) generatePodIDandName(sandboxConfig *pb.PodSandboxConfig) (strin
var ( var (
err error err error
id = stringid.GenerateNonCryptoID() id = stringid.GenerateNonCryptoID()
name = makeSandboxName(sandboxConfig)
) )
if sandboxConfig.Metadata.Namespace == "" { if sandboxConfig.Metadata.Namespace == "" {
return "", "", fmt.Errorf("cannot generate pod ID without namespace") return "", "", fmt.Errorf("cannot generate pod ID without namespace")
} }
name, err := s.reservePodName(id, makeSandboxName(sandboxConfig))
if err != nil {
return "", "", err
}
return id, name, err return id, name, err
} }
@ -65,11 +62,8 @@ func (s *Server) generateContainerIDandNameForSandbox(sandboxConfig *pb.PodSandb
var ( var (
err error err error
id = stringid.GenerateNonCryptoID() id = stringid.GenerateNonCryptoID()
name = makeSandboxContainerName(sandboxConfig)
) )
name, err := s.reserveContainerName(id, makeSandboxContainerName(sandboxConfig))
if err != nil {
return "", "", err
}
return id, name, err return id, name, err
} }
@ -77,10 +71,7 @@ func (s *Server) generateContainerIDandName(sandboxMetadata *pb.PodSandboxMetada
var ( var (
err error err error
id = stringid.GenerateNonCryptoID() id = stringid.GenerateNonCryptoID()
name = makeContainerName(sandboxMetadata, containerConfig)
) )
name, err := s.reserveContainerName(id, makeContainerName(sandboxMetadata, containerConfig))
if err != nil {
return "", "", err
}
return id, name, err return id, name, err
} }

View file

@ -270,14 +270,10 @@ func (s *Server) getPodSandboxFromRequest(podSandboxID string) (*sandbox, error)
return nil, errSandboxIDEmpty return nil, errSandboxIDEmpty
} }
sandboxID, err := s.podIDIndex.Get(podSandboxID) sb, err := s.state.LookupSandboxByID(podSandboxID)
if err != nil { if err != nil {
return nil, fmt.Errorf("PodSandbox with ID starting with %s not found: %v", podSandboxID, err) return nil, fmt.Errorf("could not retrieve pod sandbox with ID starting with %v: %v", podSandboxID, err)
} }
sb := s.getSandbox(sandboxID)
if sb == nil {
return nil, fmt.Errorf("specified pod sandbox not found: %s", sandboxID)
}
return sb, nil return sb, nil
} }

View file

@ -1,6 +1,8 @@
package server package server
import ( import (
"fmt"
"github.com/Sirupsen/logrus" "github.com/Sirupsen/logrus"
"github.com/kubernetes-incubator/cri-o/oci" "github.com/kubernetes-incubator/cri-o/oci"
"golang.org/x/net/context" "golang.org/x/net/context"
@ -31,7 +33,13 @@ func (s *Server) ListPodSandbox(ctx context.Context, req *pb.ListPodSandboxReque
logrus.Debugf("ListPodSandboxRequest %+v", req) logrus.Debugf("ListPodSandboxRequest %+v", req)
var pods []*pb.PodSandbox var pods []*pb.PodSandbox
var podList []*sandbox var podList []*sandbox
for _, sb := range s.state.sandboxes {
sandboxes, err := s.state.GetAllSandboxes()
if err != nil {
return nil, fmt.Errorf("error retrieving sandboxes: %v", err)
}
for _, sb := range sandboxes {
podList = append(podList, sb) podList = append(podList, sb)
} }
@ -39,12 +47,9 @@ func (s *Server) ListPodSandbox(ctx context.Context, req *pb.ListPodSandboxReque
// Filter by pod id first. // Filter by pod id first.
if filter != nil { if filter != nil {
if filter.Id != "" { if filter.Id != "" {
id, err := s.podIDIndex.Get(filter.Id) sb, err := s.state.LookupSandboxByID(filter.Id)
// TODO if we return something other than a No Such Sandbox should we throw an error instead?
if err != nil { if err != nil {
return nil, err
}
sb := s.getSandbox(id)
if sb == nil {
podList = []*sandbox{} podList = []*sandbox{}
} else { } else {
podList = []*sandbox{sb} podList = []*sandbox{sb}

View file

@ -64,14 +64,14 @@ func (s *Server) RemovePodSandbox(ctx context.Context, req *pb.RemovePodSandboxR
return nil, fmt.Errorf("failed to delete container %s in pod sandbox %s: %v", c.Name(), sb.id, err) return nil, fmt.Errorf("failed to delete container %s in pod sandbox %s: %v", c.Name(), sb.id, err)
} }
s.releaseContainerName(c.Name()) if err := s.removeContainer(c); err != nil {
s.removeContainer(c) return nil, fmt.Errorf("failed to delete container %s in pod sandbox %s: %v", c.Name(), sb.id, err)
if err := s.ctrIDIndex.Delete(c.ID()); err != nil {
return nil, fmt.Errorf("failed to delete container %s in pod sandbox %s from index: %v", c.Name(), sb.id, err)
} }
} }
s.removeContainer(podInfraContainer) if err := s.removeSandbox(sb.id); err != nil {
return nil, fmt.Errorf("error removing sandbox %s: %v", sb.id, err)
}
// Remove the files related to the sandbox // Remove the files related to the sandbox
if err := s.storageRuntimeServer.StopContainer(sb.id); err != nil && err != storage.ErrContainerUnknown { if err := s.storageRuntimeServer.StopContainer(sb.id); err != nil && err != storage.ErrContainerUnknown {
@ -81,17 +81,6 @@ func (s *Server) RemovePodSandbox(ctx context.Context, req *pb.RemovePodSandboxR
return nil, fmt.Errorf("failed to remove pod sandbox %s: %v", sb.id, err) return nil, fmt.Errorf("failed to remove pod sandbox %s: %v", sb.id, err)
} }
s.releaseContainerName(podInfraContainer.Name())
if err := s.ctrIDIndex.Delete(podInfraContainer.ID()); err != nil {
return nil, fmt.Errorf("failed to delete infra container %s in pod sandbox %s from index: %v", podInfraContainer.ID(), sb.id, err)
}
s.releasePodName(sb.name)
s.removeSandbox(sb.id)
if err := s.podIDIndex.Delete(sb.id); err != nil {
return nil, fmt.Errorf("failed to delete pod sandbox %s from index: %v", sb.id, err)
}
resp := &pb.RemovePodSandboxResponse{} resp := &pb.RemovePodSandboxResponse{}
logrus.Debugf("RemovePodSandboxResponse %+v", resp) logrus.Debugf("RemovePodSandboxResponse %+v", resp)
return resp, nil return resp, nil

View file

@ -135,23 +135,11 @@ func (s *Server) RunPodSandbox(ctx context.Context, req *pb.RunPodSandboxRequest
} }
} }
defer func() {
if err != nil {
s.releasePodName(name)
}
}()
_, containerName, err := s.generateContainerIDandNameForSandbox(req.GetConfig()) _, containerName, err := s.generateContainerIDandNameForSandbox(req.GetConfig())
if err != nil { if err != nil {
return nil, err return nil, err
} }
defer func() {
if err != nil {
s.releaseContainerName(containerName)
}
}()
podContainer, err := s.storageRuntimeServer.CreatePodSandbox(s.imageContext, podContainer, err := s.storageRuntimeServer.CreatePodSandbox(s.imageContext,
name, id, name, id,
s.config.PauseImage, "", s.config.PauseImage, "",
@ -283,18 +271,6 @@ func (s *Server) RunPodSandbox(ctx context.Context, req *pb.RunPodSandboxRequest
return nil, err return nil, err
} }
if err = s.ctrIDIndex.Add(id); err != nil {
return nil, err
}
defer func() {
if err != nil {
if err2 := s.ctrIDIndex.Delete(id); err2 != nil {
logrus.Warnf("couldn't delete ctr id %s from idIndex", id)
}
}
}()
// set log path inside log directory // set log path inside log directory
logPath := filepath.Join(logDir, id+".log") logPath := filepath.Join(logDir, id+".log")
@ -350,25 +326,6 @@ func (s *Server) RunPodSandbox(ctx context.Context, req *pb.RunPodSandboxRequest
portMappings: portMappings, portMappings: portMappings,
} }
s.addSandbox(sb)
defer func() {
if err != nil {
s.removeSandbox(id)
}
}()
if err = s.podIDIndex.Add(id); err != nil {
return nil, err
}
defer func() {
if err != nil {
if err := s.podIDIndex.Delete(id); err != nil {
logrus.Warnf("couldn't delete pod id %s from idIndex", id)
}
}
}()
for k, v := range kubeAnnotations { for k, v := range kubeAnnotations {
g.AddAnnotation(k, v) g.AddAnnotation(k, v)
} }
@ -482,6 +439,11 @@ func (s *Server) RunPodSandbox(ctx context.Context, req *pb.RunPodSandboxRequest
sb.infraContainer = container sb.infraContainer = container
// Only register the sandbox after infra container has been added
if err = s.addSandbox(sb); err != nil {
return nil, err
}
// setup the network // setup the network
if !hostNetwork { if !hostNetwork {
if err = s.netPlugin.SetUpPod(netNsPath, namespace, kubeName, id); err != nil { if err = s.netPlugin.SetUpPod(netNsPath, namespace, kubeName, id); err != nil {

View file

@ -119,7 +119,12 @@ func (s *Server) StopPodSandbox(ctx context.Context, req *pb.StopPodSandboxReque
// StopAllPodSandboxes removes all pod sandboxes // StopAllPodSandboxes removes all pod sandboxes
func (s *Server) StopAllPodSandboxes() { func (s *Server) StopAllPodSandboxes() {
logrus.Debugf("StopAllPodSandboxes") logrus.Debugf("StopAllPodSandboxes")
for _, sb := range s.state.sandboxes { sandboxes, err := s.state.GetAllSandboxes()
if err != nil {
logrus.Errorf("error retrieving sandboxes: %v", err)
return
}
for _, sb := range sandboxes {
pod := &pb.StopPodSandboxRequest{ pod := &pb.StopPodSandboxRequest{
PodSandboxId: sb.id, PodSandboxId: sb.id,
} }

View file

@ -14,8 +14,6 @@ import (
"github.com/containers/image/types" "github.com/containers/image/types"
sstorage "github.com/containers/storage" sstorage "github.com/containers/storage"
"github.com/docker/docker/pkg/ioutils" "github.com/docker/docker/pkg/ioutils"
"github.com/docker/docker/pkg/registrar"
"github.com/docker/docker/pkg/truncindex"
"github.com/kubernetes-incubator/cri-o/oci" "github.com/kubernetes-incubator/cri-o/oci"
"github.com/kubernetes-incubator/cri-o/pkg/annotations" "github.com/kubernetes-incubator/cri-o/pkg/annotations"
"github.com/kubernetes-incubator/cri-o/pkg/ocicni" "github.com/kubernetes-incubator/cri-o/pkg/ocicni"
@ -57,15 +55,10 @@ type Server struct {
store sstorage.Store store sstorage.Store
storageImageServer storage.ImageServer storageImageServer storage.ImageServer
storageRuntimeServer storage.RuntimeServer storageRuntimeServer storage.RuntimeServer
stateLock sync.Mutex
updateLock sync.RWMutex updateLock sync.RWMutex
state *serverState state StateStore
netPlugin ocicni.CNIPlugin netPlugin ocicni.CNIPlugin
hostportManager hostport.HostPortManager hostportManager hostport.HostPortManager
podNameIndex *registrar.Registrar
podIDIndex *truncindex.TruncIndex
ctrNameIndex *registrar.Registrar
ctrIDIndex *truncindex.TruncIndex
imageContext *types.SystemContext imageContext *types.SystemContext
seccompEnabled bool seccompEnabled bool
@ -106,23 +99,13 @@ func (s *Server) loadContainer(id string) error {
return err return err
} }
name := m.Annotations[annotations.Name] name := m.Annotations[annotations.Name]
name, err = s.reserveContainerName(id, name)
if err != nil {
return err
}
defer func() {
if err != nil {
s.releaseContainerName(name)
}
}()
var metadata pb.ContainerMetadata var metadata pb.ContainerMetadata
if err = json.Unmarshal([]byte(m.Annotations[annotations.Metadata]), &metadata); err != nil { if err = json.Unmarshal([]byte(m.Annotations[annotations.Metadata]), &metadata); err != nil {
return err return err
} }
sb := s.getSandbox(m.Annotations[annotations.SandboxID]) sb, err := s.getSandbox(m.Annotations[annotations.SandboxID])
if sb == nil { if err != nil {
return fmt.Errorf("could not get sandbox with id %s, skipping", m.Annotations[annotations.SandboxID]) return fmt.Errorf("could not get sandbox with id %s, skipping", m.Annotations[annotations.SandboxID])
} }
@ -165,8 +148,7 @@ func (s *Server) loadContainer(id string) error {
s.containerStateFromDisk(ctr) s.containerStateFromDisk(ctr)
s.addContainer(ctr) return s.addContainer(ctr)
return s.ctrIDIndex.Add(id)
} }
func (s *Server) containerStateFromDisk(c *oci.Container) error { func (s *Server) containerStateFromDisk(c *oci.Container) error {
@ -224,15 +206,6 @@ func (s *Server) loadSandbox(id string) error {
return err return err
} }
name := m.Annotations[annotations.Name] name := m.Annotations[annotations.Name]
name, err = s.reservePodName(id, name)
if err != nil {
return err
}
defer func() {
if err != nil {
s.releasePodName(name)
}
}()
var metadata pb.PodSandboxMetadata var metadata pb.PodSandboxMetadata
if err = json.Unmarshal([]byte(m.Annotations[annotations.Metadata]), &metadata); err != nil { if err = json.Unmarshal([]byte(m.Annotations[annotations.Metadata]), &metadata); err != nil {
return err return err
@ -283,14 +256,6 @@ func (s *Server) loadSandbox(id string) error {
sb.netns = netNS sb.netns = netNS
} }
s.addSandbox(sb)
defer func() {
if err != nil {
s.removeSandbox(sb.id)
}
}()
sandboxPath, err := s.store.ContainerRunDirectory(id) sandboxPath, err := s.store.ContainerRunDirectory(id)
if err != nil { if err != nil {
return err return err
@ -301,22 +266,12 @@ func (s *Server) loadSandbox(id string) error {
return err return err
} }
cname, err := s.reserveContainerName(m.Annotations[annotations.ContainerID], m.Annotations[annotations.ContainerName])
if err != nil {
return err
}
defer func() {
if err != nil {
s.releaseContainerName(cname)
}
}()
created, err := time.Parse(time.RFC3339Nano, m.Annotations[annotations.Created]) created, err := time.Parse(time.RFC3339Nano, m.Annotations[annotations.Created])
if err != nil { if err != nil {
return err return err
} }
scontainer, err := oci.NewContainer(m.Annotations[annotations.ContainerID], cname, sandboxPath, m.Annotations[annotations.LogPath], sb.netNs(), labels, kubeAnnotations, nil, nil, id, false, false, false, privileged, trusted, sandboxDir, created, m.Annotations["org.opencontainers.image.stopSignal"]) scontainer, err := oci.NewContainer(m.Annotations[annotations.ContainerID], m.Annotations[annotations.ContainerName], sandboxPath, m.Annotations[annotations.LogPath], sb.netNs(), labels, kubeAnnotations, nil, nil, id, false, false, false, privileged, trusted, sandboxDir, created, m.Annotations["org.opencontainers.image.stopSignal"])
if err != nil { if err != nil {
return err return err
} }
@ -327,13 +282,8 @@ func (s *Server) loadSandbox(id string) error {
return err return err
} }
sb.infraContainer = scontainer sb.infraContainer = scontainer
if err = s.ctrIDIndex.Add(scontainer.ID()); err != nil {
return err return s.addSandbox(sb)
}
if err = s.podIDIndex.Add(id); err != nil {
return err
}
return nil
} }
func (s *Server) restore() { func (s *Server) restore() {
@ -399,7 +349,7 @@ func (s *Server) update() error {
oldPodContainers[container.ID] = container.ID oldPodContainers[container.ID] = container.ID
continue continue
} }
if s.getContainer(container.ID) != nil { if _, err := s.getContainer(container.ID); err == nil {
// FIXME: do we need to reload/update any info about the container? // FIXME: do we need to reload/update any info about the container?
oldPodContainers[container.ID] = container.ID oldPodContainers[container.ID] = container.ID
continue continue
@ -416,51 +366,54 @@ func (s *Server) update() error {
newPodContainers[container.ID] = &metadata newPodContainers[container.ID] = &metadata
} }
} }
s.ctrIDIndex.Iterate(func(id string) {
if _, ok := oldPodContainers[id]; !ok { // TODO this will not check pod infra containers - should we care about this?
// this container's ID wasn't in the updated list -> removed stateContainers, err := s.state.GetAllContainers()
removedPodContainers[id] = id if err != nil {
return fmt.Errorf("error retrieving containers list: %v", err)
} }
}) for _, ctr := range stateContainers {
if _, ok := oldPodContainers[ctr.ID()]; !ok {
// this container's ID wasn't in the updated list -> removed
removedPodContainers[ctr.ID()] = ctr.ID()
}
}
for removedPodContainer := range removedPodContainers { for removedPodContainer := range removedPodContainers {
// forget this container // forget this container
c := s.getContainer(removedPodContainer) c, err := s.getContainer(removedPodContainer)
if c == nil { if err != nil {
logrus.Warnf("bad state when getting container removed %+v", removedPodContainer) logrus.Warnf("bad state when getting container removed %+v", removedPodContainer)
continue continue
} }
s.releaseContainerName(c.Name()) if err := s.removeContainer(c); err != nil {
s.removeContainer(c) return fmt.Errorf("error forgetting removed pod container %s: %v", c.ID(), err)
if err = s.ctrIDIndex.Delete(c.ID()); err != nil {
return err
} }
logrus.Debugf("forgetting removed pod container %s", c.ID()) logrus.Debugf("forgetting removed pod container %s", c.ID())
} }
s.podIDIndex.Iterate(func(id string) {
if _, ok := oldPods[id]; !ok { pods, err := s.state.GetAllSandboxes()
// this pod's ID wasn't in the updated list -> removed if err != nil {
removedPods[id] = id return fmt.Errorf("error retrieving pods list: %v", err)
} }
}) for _, pod := range pods {
if _, ok := oldPods[pod.id]; !ok {
// this pod's ID wasn't in the updated list -> removed
removedPods[pod.id] = pod.id
}
}
for removedPod := range removedPods { for removedPod := range removedPods {
// forget this pod // forget this pod
sb := s.getSandbox(removedPod) sb, err := s.getSandbox(removedPod)
if sb == nil { if err != nil {
logrus.Warnf("bad state when getting pod to remove %+v", removedPod) logrus.Warnf("bad state when getting pod to remove %+v", removedPod)
continue continue
} }
podInfraContainer := sb.infraContainer if err := s.removeSandbox(sb.id); err != nil {
s.releaseContainerName(podInfraContainer.Name()) return fmt.Errorf("error removing sandbox %s: %v", sb.id, err)
s.removeContainer(podInfraContainer)
if err = s.ctrIDIndex.Delete(podInfraContainer.ID()); err != nil {
return err
} }
sb.infraContainer = nil sb.infraContainer = nil
s.releasePodName(sb.name)
s.removeSandbox(sb.id)
if err = s.podIDIndex.Delete(sb.id); err != nil {
return err
}
logrus.Debugf("forgetting removed pod %s", sb.id) logrus.Debugf("forgetting removed pod %s", sb.id)
} }
for sandboxID := range newPods { for sandboxID := range newPods {
@ -482,44 +435,6 @@ func (s *Server) update() error {
return nil return nil
} }
func (s *Server) reservePodName(id, name string) (string, error) {
if err := s.podNameIndex.Reserve(name, id); err != nil {
if err == registrar.ErrNameReserved {
id, err := s.podNameIndex.Get(name)
if err != nil {
logrus.Warnf("conflict, pod name %q already reserved", name)
return "", err
}
return "", fmt.Errorf("conflict, name %q already reserved for pod %q", name, id)
}
return "", fmt.Errorf("error reserving pod name %q", name)
}
return name, nil
}
func (s *Server) releasePodName(name string) {
s.podNameIndex.Release(name)
}
func (s *Server) reserveContainerName(id, name string) (string, error) {
if err := s.ctrNameIndex.Reserve(name, id); err != nil {
if err == registrar.ErrNameReserved {
id, err := s.ctrNameIndex.Get(name)
if err != nil {
logrus.Warnf("conflict, ctr name %q already reserved", name)
return "", err
}
return "", fmt.Errorf("conflict, name %q already reserved for ctr %q", name, id)
}
return "", fmt.Errorf("error reserving ctr name %s", name)
}
return name, nil
}
func (s *Server) releaseContainerName(name string) {
s.ctrNameIndex.Release(name)
}
// cleanupSandboxesOnShutdown Remove all running Sandboxes on system shutdown // cleanupSandboxesOnShutdown Remove all running Sandboxes on system shutdown
func (s *Server) cleanupSandboxesOnShutdown() { func (s *Server) cleanupSandboxesOnShutdown() {
_, err := os.Stat(shutdownFile) _, err := os.Stat(shutdownFile)
@ -575,8 +490,6 @@ func New(config *Config) (*Server, error) {
if err != nil { if err != nil {
return nil, err return nil, err
} }
sandboxes := make(map[string]*sandbox)
containers := oci.NewMemoryStore()
netPlugin, err := ocicni.InitCNI(config.NetworkDir, config.PluginDir) netPlugin, err := ocicni.InitCNI(config.NetworkDir, config.PluginDir)
if err != nil { if err != nil {
return nil, err return nil, err
@ -592,10 +505,7 @@ func New(config *Config) (*Server, error) {
netPlugin: netPlugin, netPlugin: netPlugin,
hostportManager: hostportManager, hostportManager: hostportManager,
config: *config, config: *config,
state: &serverState{ state: NewInMemoryState(),
sandboxes: sandboxes,
containers: containers,
},
seccompEnabled: seccomp.IsEnabled(), seccompEnabled: seccomp.IsEnabled(),
appArmorEnabled: apparmor.IsEnabled(), appArmorEnabled: apparmor.IsEnabled(),
appArmorProfile: config.ApparmorProfile, appArmorProfile: config.ApparmorProfile,
@ -618,10 +528,6 @@ func New(config *Config) (*Server, error) {
} }
} }
s.podIDIndex = truncindex.NewTruncIndex([]string{})
s.podNameIndex = registrar.NewRegistrar()
s.ctrIDIndex = truncindex.NewTruncIndex([]string{})
s.ctrNameIndex = registrar.NewRegistrar()
s.imageContext = &types.SystemContext{ s.imageContext = &types.SystemContext{
SignaturePolicyPath: config.ImageConfig.SignaturePolicyPath, SignaturePolicyPath: config.ImageConfig.SignaturePolicyPath,
} }
@ -656,73 +562,53 @@ func New(config *Config) (*Server, error) {
s.stream.streamServer.Start(true) s.stream.streamServer.Start(true)
}() }()
logrus.Debugf("sandboxes: %v", s.state.sandboxes)
logrus.Debugf("containers: %v", s.state.containers)
return s, nil return s, nil
} }
type serverState struct { func (s *Server) addSandbox(sb *sandbox) error {
sandboxes map[string]*sandbox return s.state.AddSandbox(sb)
containers oci.ContainerStorer
} }
func (s *Server) addSandbox(sb *sandbox) { func (s *Server) getSandbox(id string) (*sandbox, error) {
s.stateLock.Lock() return s.state.GetSandbox(id)
s.state.sandboxes[sb.id] = sb
s.stateLock.Unlock()
}
func (s *Server) getSandbox(id string) *sandbox {
s.stateLock.Lock()
sb := s.state.sandboxes[id]
s.stateLock.Unlock()
return sb
} }
func (s *Server) hasSandbox(id string) bool { func (s *Server) hasSandbox(id string) bool {
s.stateLock.Lock() return s.state.HasSandbox(id)
_, ok := s.state.sandboxes[id]
s.stateLock.Unlock()
return ok
} }
func (s *Server) removeSandbox(id string) { func (s *Server) removeSandbox(id string) error {
s.stateLock.Lock() return s.state.DeleteSandbox(id)
delete(s.state.sandboxes, id)
s.stateLock.Unlock()
} }
func (s *Server) addContainer(c *oci.Container) { func (s *Server) addContainer(c *oci.Container) error {
s.stateLock.Lock() return s.state.AddContainer(c, c.Sandbox())
sandbox := s.state.sandboxes[c.Sandbox()]
// TODO(runcom): handle !ok above!!! otherwise it panics!
sandbox.addContainer(c)
s.state.containers.Add(c.ID(), c)
s.stateLock.Unlock()
} }
func (s *Server) getContainer(id string) *oci.Container { func (s *Server) getContainer(id string) (*oci.Container, error) {
s.stateLock.Lock() sbID, err := s.state.GetContainerSandbox(id)
c := s.state.containers.Get(id) if err != nil {
s.stateLock.Unlock() return nil, err
return c }
return s.state.GetContainer(id, sbID)
} }
// GetSandboxContainer returns the infra container for a given sandbox // GetSandboxContainer returns the infra container for a given sandbox
func (s *Server) GetSandboxContainer(id string) *oci.Container { func (s *Server) GetSandboxContainer(id string) (*oci.Container, error) {
sb := s.getSandbox(id) sb, err := s.getSandbox(id)
return sb.infraContainer if err != nil {
return nil, err
}
return sb.infraContainer, nil
} }
// GetContainer returns a container by its ID // GetContainer returns a container by its ID
func (s *Server) GetContainer(id string) *oci.Container { func (s *Server) GetContainer(id string) (*oci.Container, error) {
return s.getContainer(id) return s.getContainer(id)
} }
func (s *Server) removeContainer(c *oci.Container) { func (s *Server) removeContainer(c *oci.Container) error {
s.stateLock.Lock() return s.state.DeleteContainer(c.ID(), c.Sandbox())
sandbox := s.state.sandboxes[c.Sandbox()]
sandbox.removeContainer(c)
s.state.containers.Delete(c.ID())
s.stateLock.Unlock()
} }

29
server/state_store.go Normal file
View file

@ -0,0 +1,29 @@
package server
import (
"github.com/kubernetes-incubator/cri-o/oci"
)
// StateStore stores the state of the CRI-O server, including active pods and
// containers
type StateStore interface {
AddSandbox(s *sandbox) error
HasSandbox(id string) bool
DeleteSandbox(id string) error
// These should modify the associated sandbox without prompting
AddContainer(c *oci.Container, sandboxID string) error
HasContainer(id, sandboxID string) bool
DeleteContainer(id, sandboxID string) error
// These two require full, explicit ID
GetSandbox(id string) (*sandbox, error)
GetContainer(id, sandboxID string) (*oci.Container, error)
// Get ID of sandbox container belongs to
GetContainerSandbox(id string) (string, error)
// Following 4 should accept partial names as long as they are globally unique
LookupSandboxByName(name string) (*sandbox, error)
LookupSandboxByID(id string) (*sandbox, error)
LookupContainerByName(name string) (*oci.Container, error)
LookupContainerByID(id string) (*oci.Container, error)
GetAllSandboxes() ([]*sandbox, error)
GetAllContainers() ([]*oci.Container, error)
}

View file

@ -11,7 +11,7 @@ function teardown() {
run crioctl ctr status --id randomid run crioctl ctr status --id randomid
echo "$output" echo "$output"
[ "$status" -eq 1 ] [ "$status" -eq 1 ]
[[ "$output" =~ "container with ID starting with randomid not found" ]] [[ "$output" =~ "container with ID starting with randomid could not be retrieved: cannot resolve container ID randomid: ID does not exist" ]]
stop_crio stop_crio
} }