Make state and sandbox operations atomic. Minor refactors.

Small API change to state.Store to remove unnecessary parameter from
AddContainer.

Signed-off-by: Matthew Heon <mheon@redhat.com>
This commit is contained in:
Matthew Heon 2017-04-07 11:58:59 -04:00
parent 05a2bc8f73
commit 16fb71fea0
3 changed files with 145 additions and 54 deletions

View file

@ -576,7 +576,7 @@ func (s *Server) removeSandbox(id string) error {
} }
func (s *Server) addContainer(c *oci.Container) error { func (s *Server) addContainer(c *oci.Container) error {
return s.state.AddContainer(c, c.Sandbox()) return s.state.AddContainer(c)
} }
func (s *Server) getContainer(id string) (*oci.Container, error) { func (s *Server) getContainer(id string) (*oci.Container, error) {

View file

@ -4,6 +4,7 @@ import (
"fmt" "fmt"
"sync" "sync"
"github.com/Sirupsen/logrus"
"github.com/docker/docker/pkg/registrar" "github.com/docker/docker/pkg/registrar"
"github.com/docker/docker/pkg/truncindex" "github.com/docker/docker/pkg/truncindex"
"github.com/kubernetes-incubator/cri-o/oci" "github.com/kubernetes-incubator/cri-o/oci"
@ -38,35 +39,52 @@ func NewInMemoryState() Store {
} }
// AddSandbox adds a sandbox and any containers in it to the state // AddSandbox adds a sandbox and any containers in it to the state
func (s *InMemoryState) AddSandbox(sandbox *sandbox.Sandbox) error { func (s *InMemoryState) AddSandbox(sandbox *sandbox.Sandbox) (err error) {
s.lock.Lock() s.lock.Lock()
defer s.lock.Unlock() defer s.lock.Unlock()
if sandbox == nil {
return fmt.Errorf("nil passed as sandbox to AddSandbox")
}
if _, exist := s.sandboxes[sandbox.ID()]; exist { if _, exist := s.sandboxes[sandbox.ID()]; exist {
return fmt.Errorf("sandbox with ID %v already exists", sandbox.ID()) return fmt.Errorf("sandbox with ID %v already exists", sandbox.ID())
} }
// We shouldn't share ID with any containers, either // We shouldn't share ID with any containers
// Our pod infra container will share our ID and we don't want it to conflict with anything // Our pod infra container will share our ID and we don't want it to conflict with anything
if ctrCheck := s.containers.Get(sandbox.ID()); ctrCheck != nil { if ctrCheck := s.containers.Get(sandbox.ID()); ctrCheck != nil {
return fmt.Errorf("requested sandbox ID %v conflicts with existing container ID", sandbox.ID()) return fmt.Errorf("requested sandbox ID %v conflicts with existing container ID", sandbox.ID())
} }
s.sandboxes[sandbox.ID()] = sandbox if err := s.addSandboxMappings(sandbox); err != nil {
if err := s.podNameIndex.Reserve(sandbox.Name(), sandbox.ID()); err != nil { return err
return fmt.Errorf("error registering sandbox name: %v", err)
}
if err := s.podIDIndex.Add(sandbox.ID()); err != nil {
return fmt.Errorf("error registering sandbox ID: %v", err)
} }
defer func() {
if err != nil {
if err2 := s.deleteSandboxMappings(sandbox); err2 != nil {
logrus.Errorf("Error removing mappings for incompletely-added sandbox %s: %v", sandbox.ID(), err)
}
}
}()
// If there are containers in the sandbox add them to the mapping // If there are containers in the sandbox add them to the mapping
containers := sandbox.Containers() addedCtrs := make([]*oci.Container, 0, len(sandbox.Containers()))
for _, ctr := range containers { for _, ctr := range sandbox.Containers() {
if err := s.addContainerMappings(ctr, true); err != nil { if err := s.addContainerMappings(ctr, true); err != nil {
return fmt.Errorf("error adding container %v mappings in sandbox %v", ctr.ID(), sandbox.ID()) return fmt.Errorf("error adding container %v mappings in sandbox %v", ctr.ID(), sandbox.ID())
} }
addedCtrs = append(addedCtrs, ctr)
} }
defer func() {
if err != nil {
for _, ctr := range addedCtrs {
if err2 := s.deleteContainerMappings(ctr, true); err2 != nil {
logrus.Errorf("Error removing container %s mappings: %v", ctr.ID(), err2)
}
}
}
}()
// Add the pod infrastructure container to mappings // Add the pod infrastructure container to mappings
// TODO: Right now, we don't add it to the all containers listing. We may want to change this. // TODO: Right now, we don't add it to the all containers listing. We may want to change this.
@ -77,6 +95,24 @@ func (s *InMemoryState) AddSandbox(sandbox *sandbox.Sandbox) error {
return nil return nil
} }
// Add sandbox name, ID to appropriate mappings
func (s *InMemoryState) addSandboxMappings(sb *sandbox.Sandbox) error {
if _, exist := s.sandboxes[sb.ID()]; exist {
return fmt.Errorf("sandbox with ID %s already exists in sandboxes map", sb.ID())
}
if err := s.podNameIndex.Reserve(sb.Name(), sb.ID()); err != nil {
return fmt.Errorf("error registering sandbox name: %v", err)
}
if err := s.podIDIndex.Add(sb.ID()); err != nil {
s.podNameIndex.Release(sb.Name())
return fmt.Errorf("error registering sandbox ID: %v", err)
}
s.sandboxes[sb.ID()] = sb
return nil
}
// HasSandbox determines if a given sandbox exists in the state // HasSandbox determines if a given sandbox exists in the state
func (s *InMemoryState) HasSandbox(id string) bool { func (s *InMemoryState) HasSandbox(id string) bool {
s.lock.Lock() s.lock.Lock()
@ -88,39 +124,68 @@ func (s *InMemoryState) HasSandbox(id string) bool {
} }
// DeleteSandbox removes a sandbox from the state // DeleteSandbox removes a sandbox from the state
func (s *InMemoryState) DeleteSandbox(id string) error { func (s *InMemoryState) DeleteSandbox(id string) (err error) {
s.lock.Lock() s.lock.Lock()
defer s.lock.Unlock() defer s.lock.Unlock()
if _, exist := s.sandboxes[id]; !exist { if _, exist := s.sandboxes[id]; !exist {
return fmt.Errorf("no sandbox with ID %v exists, cannot delete", id) return errNoSuchSandbox(id)
} }
name := s.sandboxes[id].Name() sb := s.sandboxes[id]
containers := s.sandboxes[id].Containers()
infraContainer := s.sandboxes[id].InfraContainer()
delete(s.sandboxes, id) if err := s.deleteSandboxMappings(sb); err != nil {
s.podNameIndex.Release(name) return err
if err := s.podIDIndex.Delete(id); err != nil {
return fmt.Errorf("error unregistering sandbox ID: %v", err)
} }
defer func() {
if err != nil {
if err2 := s.addSandboxMappings(sb); err2 != nil {
logrus.Errorf("Error re-adding sandbox mappings: %v", err2)
}
}
}()
removedCtrs := make([]*oci.Container, 0, len(sb.Containers()))
// If there are containers left in the sandbox delete them from the mappings // If there are containers left in the sandbox delete them from the mappings
for _, ctr := range containers { for _, ctr := range sb.Containers() {
if err := s.deleteContainerMappings(ctr, true); err != nil { if err := s.deleteContainerMappings(ctr, true); err != nil {
return fmt.Errorf("error removing container %v mappings: %v", ctr.ID(), err) return fmt.Errorf("error removing container %v mappings: %v", ctr.ID(), err)
} }
removedCtrs = append(removedCtrs, ctr)
} }
defer func() {
if err != nil {
for _, ctr := range removedCtrs {
if err2 := s.addContainerMappings(ctr, true); err2 != nil {
logrus.Errorf("Error re-adding mappings for container %s: %v", ctr.ID(), err2)
}
}
}
}()
// Delete infra container from mappings // Delete infra container from mappings
if err := s.deleteContainerMappings(infraContainer, false); err != nil { if err := s.deleteContainerMappings(sb.InfraContainer(), false); err != nil {
return fmt.Errorf("error removing infra container %v from mappings: %v", infraContainer.ID(), err) return fmt.Errorf("error removing infra container %v from mappings: %v", sb.InfraContainer().ID(), err)
} }
return nil return nil
} }
// Remove sandbox name, ID to appropriate mappings
func (s *InMemoryState) deleteSandboxMappings(sb *sandbox.Sandbox) error {
if _, exist := s.sandboxes[sb.ID()]; !exist {
return fmt.Errorf("sandbox with ID %s does not exist in sandboxes map", sb.ID())
}
if err := s.podIDIndex.Delete(sb.ID()); err != nil {
return fmt.Errorf("error unregistering sandbox %s: %v", sb.ID(), err)
}
delete(s.sandboxes, sb.ID())
s.podNameIndex.Release(sb.Name())
return nil
}
// GetSandbox returns a sandbox given its full ID // GetSandbox returns a sandbox given its full ID
func (s *InMemoryState) GetSandbox(id string) (*sandbox.Sandbox, error) { func (s *InMemoryState) GetSandbox(id string) (*sandbox.Sandbox, error) {
s.lock.Lock() s.lock.Lock()
@ -128,7 +193,7 @@ func (s *InMemoryState) GetSandbox(id string) (*sandbox.Sandbox, error) {
sandbox, ok := s.sandboxes[id] sandbox, ok := s.sandboxes[id]
if !ok { if !ok {
return nil, fmt.Errorf("no sandbox with id %v exists", id) return nil, errNoSuchSandbox(id)
} }
return sandbox, nil return sandbox, nil
@ -146,7 +211,7 @@ func (s *InMemoryState) LookupSandboxByName(name string) (*sandbox.Sandbox, erro
sandbox, ok := s.sandboxes[id] sandbox, ok := s.sandboxes[id]
if !ok { if !ok {
// This should never happen // This should never happen - our internal state must be desynced
return nil, fmt.Errorf("cannot find sandbox %v in sandboxes map", id) return nil, fmt.Errorf("cannot find sandbox %v in sandboxes map", id)
} }
@ -166,7 +231,7 @@ func (s *InMemoryState) LookupSandboxByID(id string) (*sandbox.Sandbox, error) {
sandbox, ok := s.sandboxes[fullID] sandbox, ok := s.sandboxes[fullID]
if !ok { if !ok {
// This should never happen // This should never happen, internal state must be desynced
return nil, fmt.Errorf("cannot find sandbox %v in sandboxes map", fullID) return nil, fmt.Errorf("cannot find sandbox %v in sandboxes map", fullID)
} }
@ -187,26 +252,34 @@ func (s *InMemoryState) GetAllSandboxes() ([]*sandbox.Sandbox, error) {
} }
// AddContainer adds a single container to a given sandbox in the state // AddContainer adds a single container to a given sandbox in the state
func (s *InMemoryState) AddContainer(c *oci.Container, sandboxID string) error { func (s *InMemoryState) AddContainer(c *oci.Container) error {
s.lock.Lock() s.lock.Lock()
defer s.lock.Unlock() defer s.lock.Unlock()
if c.Sandbox() != sandboxID { if c == nil {
return fmt.Errorf("cannot add container to sandbox %v as it is part of sandbox %v", sandboxID, c.Sandbox()) return fmt.Errorf("nil passed as container to AddContainer")
} }
sandbox, ok := s.sandboxes[sandboxID] sandbox, ok := s.sandboxes[c.Sandbox()]
if !ok { if !ok {
return fmt.Errorf("sandbox with ID %v does not exist, cannot add container", sandboxID) return errNoSuchSandbox(c.Sandbox())
} }
if ctr := sandbox.GetContainer(c.ID()); ctr != nil { if ctr := sandbox.GetContainer(c.ID()); ctr != nil {
return fmt.Errorf("container with ID %v already exists in sandbox %v", c.ID(), sandboxID) return fmt.Errorf("container with ID %v already exists in sandbox %v", c.ID(), c.Sandbox())
}
if sandbox.InfraContainer().ID() == c.ID() {
return fmt.Errorf("container is infra container of sandbox %s, refusing to add to containers list", c.Sandbox())
}
if err := s.addContainerMappings(c, true); err != nil {
return err
} }
sandbox.AddContainer(c) sandbox.AddContainer(c)
return s.addContainerMappings(c, true) return nil
} }
// Add container ID, Name and Sandbox mappings // Add container ID, Name and Sandbox mappings
@ -218,18 +291,18 @@ func (s *InMemoryState) addContainerMappings(c *oci.Container, addToContainers b
// TODO: if not a pod infra container, check if it conflicts with existing sandbox ID? // TODO: if not a pod infra container, check if it conflicts with existing sandbox ID?
// Does this matter? // Does this matter?
if err := s.ctrNameIndex.Reserve(c.Name(), c.ID()); err != nil {
return fmt.Errorf("error registering name for container %s: %v", c.ID(), err)
}
if err := s.ctrIDIndex.Add(c.ID()); err != nil {
s.ctrNameIndex.Release(c.ID())
return fmt.Errorf("error registering ID for container %s: %v", c.ID(), err)
}
if addToContainers { if addToContainers {
s.containers.Add(c.ID(), c) s.containers.Add(c.ID(), c)
} }
if err := s.ctrNameIndex.Reserve(c.Name(), c.ID()); err != nil {
s.containers.Delete(c.ID())
return fmt.Errorf("error registering container name: %v", err)
}
if err := s.ctrIDIndex.Add(c.ID()); err != nil {
s.containers.Delete(c.ID())
s.ctrNameIndex.Release(c.ID())
return fmt.Errorf("error registering container ID: %v", err)
}
return nil return nil
} }
@ -256,17 +329,21 @@ func (s *InMemoryState) DeleteContainer(id, sandboxID string) error {
sandbox, ok := s.sandboxes[sandboxID] sandbox, ok := s.sandboxes[sandboxID]
if !ok { if !ok {
return fmt.Errorf("sandbox with ID %v does not exist", sandboxID) return errNoSuchSandbox(sandboxID)
} }
ctr := sandbox.GetContainer(id) ctr := sandbox.GetContainer(id)
if ctr == nil { if ctr == nil {
return fmt.Errorf("sandbox %v has no container with ID %v", sandboxID, id) return errNoSuchContainerInSandbox(id, sandboxID)
}
if err := s.deleteContainerMappings(ctr, true); err != nil {
return nil
} }
sandbox.RemoveContainer(id) sandbox.RemoveContainer(id)
return s.deleteContainerMappings(ctr, true) return nil
} }
// Deletes container from the ID and Name mappings and optionally from the global containers list // Deletes container from the ID and Name mappings and optionally from the global containers list
@ -275,13 +352,15 @@ func (s *InMemoryState) deleteContainerMappings(ctr *oci.Container, deleteFromCo
return fmt.Errorf("container ID %v does not exist in containers store", ctr.ID()) return fmt.Errorf("container ID %v does not exist in containers store", ctr.ID())
} }
if err := s.ctrIDIndex.Delete(ctr.ID()); err != nil {
return fmt.Errorf("error unregistering container ID for %s: %v", ctr.ID(), err)
}
s.ctrNameIndex.Release(ctr.Name())
if deleteFromContainers { if deleteFromContainers {
s.containers.Delete(ctr.ID()) s.containers.Delete(ctr.ID())
} }
s.ctrNameIndex.Release(ctr.Name())
if err := s.ctrIDIndex.Delete(ctr.ID()); err != nil {
return fmt.Errorf("error unregistering container ID: %v", err)
}
return nil return nil
} }
@ -302,7 +381,7 @@ func (s *InMemoryState) GetContainerSandbox(id string) (string, error) {
ctr := s.containers.Get(id) ctr := s.containers.Get(id)
if ctr == nil { if ctr == nil {
return "", fmt.Errorf("no container with ID %v found", id) return "", errNoSuchContainer(id)
} }
return ctr.Sandbox(), nil return ctr.Sandbox(), nil
@ -346,7 +425,7 @@ func (s *InMemoryState) GetAllContainers() ([]*oci.Container, error) {
func (s *InMemoryState) getContainer(id string) (*oci.Container, error) { func (s *InMemoryState) getContainer(id string) (*oci.Container, error) {
ctr := s.containers.Get(id) ctr := s.containers.Get(id)
if ctr == nil { if ctr == nil {
return nil, fmt.Errorf("cannot find container with ID %v", id) return nil, errNoSuchContainer(id)
} }
return s.getContainerFromSandbox(id, ctr.Sandbox()) return s.getContainerFromSandbox(id, ctr.Sandbox())
@ -357,13 +436,25 @@ func (s *InMemoryState) getContainer(id string) (*oci.Container, error) {
func (s *InMemoryState) getContainerFromSandbox(id, sandboxID string) (*oci.Container, error) { func (s *InMemoryState) getContainerFromSandbox(id, sandboxID string) (*oci.Container, error) {
sandbox, ok := s.sandboxes[sandboxID] sandbox, ok := s.sandboxes[sandboxID]
if !ok { if !ok {
return nil, fmt.Errorf("sandbox with ID %v does not exist", sandboxID) return nil, errNoSuchSandbox(sandboxID)
} }
ctr := sandbox.GetContainer(id) ctr := sandbox.GetContainer(id)
if ctr == nil { if ctr == nil {
return nil, fmt.Errorf("cannot find container %v in sandbox %v", id, sandboxID) return nil, errNoSuchContainerInSandbox(id, sandboxID)
} }
return ctr, nil return ctr, nil
} }
func errNoSuchSandbox(id string) error {
return fmt.Errorf("no sandbox with ID %s found", id)
}
func errNoSuchContainer(id string) error {
return fmt.Errorf("no container with ID %s found", id)
}
func errNoSuchContainerInSandbox(id string, sandboxID string) error {
return fmt.Errorf("no container with ID %s in sandbox %s", id, sandboxID)
}

View file

@ -11,7 +11,7 @@ type Store interface {
HasSandbox(id string) bool HasSandbox(id string) bool
DeleteSandbox(id string) error DeleteSandbox(id string) error
// These should modify the associated sandbox without prompting // These should modify the associated sandbox without prompting
AddContainer(c *oci.Container, sandboxID string) error AddContainer(c *oci.Container) error
HasContainer(id, sandboxID string) bool HasContainer(id, sandboxID string) bool
DeleteContainer(id, sandboxID string) error DeleteContainer(id, sandboxID string) error
// These two require full, explicit ID // These two require full, explicit ID