Merge pull request #691 from mheon/remove_server_state

Move remaining server state to libkpod
This commit is contained in:
Mrunal Patel 2017-07-26 11:25:34 -07:00 committed by GitHub
commit 93f1ec3b1f
9 changed files with 136 additions and 91 deletions

View file

@ -11,6 +11,7 @@ import (
"github.com/docker/docker/pkg/ioutils" "github.com/docker/docker/pkg/ioutils"
"github.com/docker/docker/pkg/registrar" "github.com/docker/docker/pkg/registrar"
"github.com/docker/docker/pkg/truncindex" "github.com/docker/docker/pkg/truncindex"
"github.com/kubernetes-incubator/cri-o/libkpod/sandbox"
"github.com/kubernetes-incubator/cri-o/oci" "github.com/kubernetes-incubator/cri-o/oci"
"github.com/kubernetes-incubator/cri-o/pkg/storage" "github.com/kubernetes-incubator/cri-o/pkg/storage"
) )
@ -22,6 +23,9 @@ type ContainerServer struct {
storageImageServer storage.ImageServer storageImageServer storage.ImageServer
ctrNameIndex *registrar.Registrar ctrNameIndex *registrar.Registrar
ctrIDIndex *truncindex.TruncIndex ctrIDIndex *truncindex.TruncIndex
podNameIndex *registrar.Registrar
podIDIndex *truncindex.TruncIndex
imageContext *types.SystemContext imageContext *types.SystemContext
stateLock sync.Locker stateLock sync.Locker
state *containerServerState state *containerServerState
@ -52,6 +56,16 @@ func (c *ContainerServer) CtrIDIndex() *truncindex.TruncIndex {
return c.ctrIDIndex return c.ctrIDIndex
} }
// PodNameIndex returns the index of pod names
func (c *ContainerServer) PodNameIndex() *registrar.Registrar {
return c.podNameIndex
}
// PodIDIndex returns the index of pod IDs
func (c *ContainerServer) PodIDIndex() *truncindex.TruncIndex {
return c.podIDIndex
}
// ImageContext returns the SystemContext for the ContainerServer // ImageContext returns the SystemContext for the ContainerServer
func (c *ContainerServer) ImageContext() *types.SystemContext { func (c *ContainerServer) ImageContext() *types.SystemContext {
return c.imageContext return c.imageContext
@ -65,10 +79,13 @@ func New(runtime *oci.Runtime, store cstorage.Store, imageService storage.ImageS
storageImageServer: imageService, storageImageServer: imageService,
ctrNameIndex: registrar.NewRegistrar(), ctrNameIndex: registrar.NewRegistrar(),
ctrIDIndex: truncindex.NewTruncIndex([]string{}), ctrIDIndex: truncindex.NewTruncIndex([]string{}),
podNameIndex: registrar.NewRegistrar(),
podIDIndex: truncindex.NewTruncIndex([]string{}),
imageContext: &types.SystemContext{SignaturePolicyPath: signaturePolicyPath}, imageContext: &types.SystemContext{SignaturePolicyPath: signaturePolicyPath},
stateLock: new(sync.Mutex), stateLock: new(sync.Mutex),
state: &containerServerState{ state: &containerServerState{
containers: oci.NewMemoryStore(), containers: oci.NewMemoryStore(),
sandboxes: make(map[string]*sandbox.Sandbox),
}, },
} }
} }
@ -124,6 +141,28 @@ func (c *ContainerServer) ReleaseContainerName(name string) {
c.ctrNameIndex.Release(name) c.ctrNameIndex.Release(name)
} }
// ReservePodName holds a name for a pod that is being created
func (c *ContainerServer) ReservePodName(id, name string) (string, error) {
if err := c.podNameIndex.Reserve(name, id); err != nil {
if err == registrar.ErrNameReserved {
id, err := c.podNameIndex.Get(name)
if err != nil {
logrus.Warnf("conflict, pod name %q already reserved", name)
return "", err
}
return "", fmt.Errorf("conflict, name %q already reserved for pod %q", name, id)
}
return "", fmt.Errorf("error reserving pod name %q", name)
}
return name, nil
}
// ReleasePodName releases a pod name from the index so it can be used by other
// pods
func (c *ContainerServer) ReleasePodName(name string) {
c.podNameIndex.Release(name)
}
// Shutdown attempts to shut down the server's storage cleanly // Shutdown attempts to shut down the server's storage cleanly
func (c *ContainerServer) Shutdown() error { func (c *ContainerServer) Shutdown() error {
_, err := c.store.Shutdown(false) _, err := c.store.Shutdown(false)
@ -132,12 +171,15 @@ func (c *ContainerServer) Shutdown() error {
type containerServerState struct { type containerServerState struct {
containers oci.ContainerStorer containers oci.ContainerStorer
sandboxes map[string]*sandbox.Sandbox
} }
// AddContainer adds a container to the container state store // AddContainer adds a container to the container state store
func (c *ContainerServer) AddContainer(ctr *oci.Container) { func (c *ContainerServer) AddContainer(ctr *oci.Container) {
c.stateLock.Lock() c.stateLock.Lock()
defer c.stateLock.Unlock() defer c.stateLock.Unlock()
sandbox := c.state.sandboxes[ctr.Sandbox()]
sandbox.AddContainer(ctr)
c.state.containers.Add(ctr.ID(), ctr) c.state.containers.Add(ctr.ID(), ctr)
} }
@ -148,10 +190,21 @@ func (c *ContainerServer) GetContainer(id string) *oci.Container {
return c.state.containers.Get(id) return c.state.containers.Get(id)
} }
// HasContainer checks if a container exists in the state
func (c *ContainerServer) HasContainer(id string) bool {
c.stateLock.Lock()
defer c.stateLock.Unlock()
ctr := c.state.containers.Get(id)
return ctr != nil
}
// RemoveContainer removes a container from the container state store // RemoveContainer removes a container from the container state store
func (c *ContainerServer) RemoveContainer(ctr *oci.Container) { func (c *ContainerServer) RemoveContainer(ctr *oci.Container) {
c.stateLock.Lock() c.stateLock.Lock()
defer c.stateLock.Unlock() defer c.stateLock.Unlock()
sbID := ctr.Sandbox()
sb := c.state.sandboxes[sbID]
sb.RemoveContainer(ctr)
c.state.containers.Delete(ctr.ID()) c.state.containers.Delete(ctr.ID())
} }
@ -161,3 +214,55 @@ func (c *ContainerServer) ListContainers() []*oci.Container {
defer c.stateLock.Unlock() defer c.stateLock.Unlock()
return c.state.containers.List() return c.state.containers.List()
} }
// AddSandbox adds a sandbox to the sandbox state store
func (c *ContainerServer) AddSandbox(sb *sandbox.Sandbox) {
c.stateLock.Lock()
defer c.stateLock.Unlock()
c.state.sandboxes[sb.ID()] = sb
}
// GetSandbox returns a sandbox by its ID
func (c *ContainerServer) GetSandbox(id string) *sandbox.Sandbox {
c.stateLock.Lock()
defer c.stateLock.Unlock()
return c.state.sandboxes[id]
}
// GetSandboxContainer returns a sandbox's infra container
func (c *ContainerServer) GetSandboxContainer(id string) *oci.Container {
c.stateLock.Lock()
defer c.stateLock.Unlock()
sb, ok := c.state.sandboxes[id]
if !ok {
return nil
}
return sb.InfraContainer()
}
// HasSandbox checks if a sandbox exists in the state
func (c *ContainerServer) HasSandbox(id string) bool {
c.stateLock.Lock()
defer c.stateLock.Unlock()
_, ok := c.state.sandboxes[id]
return ok
}
// RemoveSandbox removes a sandbox from the state store
func (c *ContainerServer) RemoveSandbox(id string) {
c.stateLock.Lock()
defer c.stateLock.Unlock()
delete(c.state.sandboxes, id)
}
// ListSandboxes lists all sandboxes in the state store
func (c *ContainerServer) ListSandboxes() []*sandbox.Sandbox {
c.stateLock.Lock()
defer c.stateLock.Unlock()
sbArray := make([]*sandbox.Sandbox, 0, len(c.state.sandboxes))
for _, sb := range c.state.sandboxes {
sbArray = append(sbArray, sb)
}
return sbArray
}

View file

@ -275,7 +275,7 @@ func (s *Server) CreateContainer(ctx context.Context, req *pb.CreateContainerReq
return nil, fmt.Errorf("PodSandboxId should not be empty") return nil, fmt.Errorf("PodSandboxId should not be empty")
} }
sandboxID, err := s.podIDIndex.Get(sbID) sandboxID, err := s.PodIDIndex().Get(sbID)
if err != nil { if err != nil {
return nil, fmt.Errorf("PodSandbox with ID starting with %s not found: %v", sbID, err) return nil, fmt.Errorf("PodSandbox with ID starting with %s not found: %v", sbID, err)
} }

View file

@ -55,7 +55,7 @@ func (s *Server) ListContainers(ctx context.Context, req *pb.ListContainersReque
} }
} else { } else {
if filter.PodSandboxId != "" { if filter.PodSandboxId != "" {
pod := s.state.sandboxes[filter.PodSandboxId] pod := s.ContainerServer.GetSandbox(filter.PodSandboxId)
if pod == nil { if pod == nil {
ctrList = []*oci.Container{} ctrList = []*oci.Container{}
} else { } else {

View file

@ -54,7 +54,7 @@ func (s *Server) generatePodIDandName(sandboxConfig *pb.PodSandboxConfig) (strin
if sandboxConfig.Metadata.Namespace == "" { if sandboxConfig.Metadata.Namespace == "" {
return "", "", fmt.Errorf("cannot generate pod ID without namespace") return "", "", fmt.Errorf("cannot generate pod ID without namespace")
} }
name, err := s.reservePodName(id, makeSandboxName(sandboxConfig)) name, err := s.ReservePodName(id, makeSandboxName(sandboxConfig))
if err != nil { if err != nil {
return "", "", err return "", "", err
} }

View file

@ -32,7 +32,7 @@ func (s *Server) ListPodSandbox(ctx context.Context, req *pb.ListPodSandboxReque
logrus.Debugf("ListPodSandboxRequest %+v", req) logrus.Debugf("ListPodSandboxRequest %+v", req)
var pods []*pb.PodSandbox var pods []*pb.PodSandbox
var podList []*sandbox.Sandbox var podList []*sandbox.Sandbox
for _, sb := range s.state.sandboxes { for _, sb := range s.ContainerServer.ListSandboxes() {
podList = append(podList, sb) podList = append(podList, sb)
} }
@ -40,7 +40,7 @@ func (s *Server) ListPodSandbox(ctx context.Context, req *pb.ListPodSandboxReque
// Filter by pod id first. // Filter by pod id first.
if filter != nil { if filter != nil {
if filter.Id != "" { if filter.Id != "" {
id, err := s.podIDIndex.Get(filter.Id) id, err := s.PodIDIndex().Get(filter.Id)
if err != nil { if err != nil {
return nil, err return nil, err
} }

View file

@ -87,9 +87,9 @@ func (s *Server) RemovePodSandbox(ctx context.Context, req *pb.RemovePodSandboxR
return nil, fmt.Errorf("failed to delete infra container %s in pod sandbox %s from index: %v", podInfraContainer.ID(), sb.ID(), err) return nil, fmt.Errorf("failed to delete infra container %s in pod sandbox %s from index: %v", podInfraContainer.ID(), sb.ID(), err)
} }
s.releasePodName(sb.Name()) s.ReleasePodName(sb.Name())
s.removeSandbox(sb.ID()) s.removeSandbox(sb.ID())
if err := s.podIDIndex.Delete(sb.ID()); err != nil { if err := s.PodIDIndex().Delete(sb.ID()); err != nil {
return nil, fmt.Errorf("failed to delete pod sandbox %s from index: %v", sb.ID(), err) return nil, fmt.Errorf("failed to delete pod sandbox %s from index: %v", sb.ID(), err)
} }

View file

@ -138,7 +138,7 @@ func (s *Server) RunPodSandbox(ctx context.Context, req *pb.RunPodSandboxRequest
defer func() { defer func() {
if err != nil { if err != nil {
s.releasePodName(name) s.ReleasePodName(name)
} }
}() }()
@ -358,13 +358,13 @@ func (s *Server) RunPodSandbox(ctx context.Context, req *pb.RunPodSandboxRequest
} }
}() }()
if err = s.podIDIndex.Add(id); err != nil { if err = s.PodIDIndex().Add(id); err != nil {
return nil, err return nil, err
} }
defer func() { defer func() {
if err != nil { if err != nil {
if err := s.podIDIndex.Delete(id); err != nil { if err := s.PodIDIndex().Delete(id); err != nil {
logrus.Warnf("couldn't delete pod id %s from idIndex", id) logrus.Warnf("couldn't delete pod id %s from idIndex", id)
} }
} }

View file

@ -120,7 +120,7 @@ func (s *Server) StopPodSandbox(ctx context.Context, req *pb.StopPodSandboxReque
// StopAllPodSandboxes removes all pod sandboxes // StopAllPodSandboxes removes all pod sandboxes
func (s *Server) StopAllPodSandboxes() { func (s *Server) StopAllPodSandboxes() {
logrus.Debugf("StopAllPodSandboxes") logrus.Debugf("StopAllPodSandboxes")
for _, sb := range s.state.sandboxes { for _, sb := range s.ContainerServer.ListSandboxes() {
pod := &pb.StopPodSandboxRequest{ pod := &pb.StopPodSandboxRequest{
PodSandboxId: sb.ID(), PodSandboxId: sb.ID(),
} }

View file

@ -12,8 +12,6 @@ import (
"github.com/Sirupsen/logrus" "github.com/Sirupsen/logrus"
cstorage "github.com/containers/storage" cstorage "github.com/containers/storage"
"github.com/docker/docker/pkg/registrar"
"github.com/docker/docker/pkg/truncindex"
"github.com/kubernetes-incubator/cri-o/libkpod" "github.com/kubernetes-incubator/cri-o/libkpod"
"github.com/kubernetes-incubator/cri-o/libkpod/sandbox" "github.com/kubernetes-incubator/cri-o/libkpod/sandbox"
"github.com/kubernetes-incubator/cri-o/oci" "github.com/kubernetes-incubator/cri-o/oci"
@ -56,13 +54,9 @@ type Server struct {
config Config config Config
storageRuntimeServer storage.RuntimeServer storageRuntimeServer storage.RuntimeServer
stateLock sync.Locker
updateLock sync.RWMutex updateLock sync.RWMutex
state *serverState
netPlugin ocicni.CNIPlugin netPlugin ocicni.CNIPlugin
hostportManager hostport.HostPortManager hostportManager hostport.HostPortManager
podNameIndex *registrar.Registrar
podIDIndex *truncindex.TruncIndex
seccompEnabled bool seccompEnabled bool
seccompProfile seccomp.Seccomp seccompProfile seccomp.Seccomp
@ -192,13 +186,13 @@ func (s *Server) loadSandbox(id string) error {
return err return err
} }
name := m.Annotations[annotations.Name] name := m.Annotations[annotations.Name]
name, err = s.reservePodName(id, name) name, err = s.ReservePodName(id, name)
if err != nil { if err != nil {
return err return err
} }
defer func() { defer func() {
if err != nil { if err != nil {
s.releasePodName(name) s.ReleasePodName(name)
} }
}() }()
var metadata pb.PodSandboxMetadata var metadata pb.PodSandboxMetadata
@ -284,7 +278,7 @@ func (s *Server) loadSandbox(id string) error {
if err = s.CtrIDIndex().Add(scontainer.ID()); err != nil { if err = s.CtrIDIndex().Add(scontainer.ID()); err != nil {
return err return err
} }
if err = s.podIDIndex.Add(id); err != nil { if err = s.PodIDIndex().Add(id); err != nil {
return err return err
} }
return nil return nil
@ -390,7 +384,7 @@ func (s *Server) update() error {
} }
logrus.Debugf("forgetting removed pod container %s", c.ID()) logrus.Debugf("forgetting removed pod container %s", c.ID())
} }
s.podIDIndex.Iterate(func(id string) { s.PodIDIndex().Iterate(func(id string) {
if _, ok := oldPods[id]; !ok { if _, ok := oldPods[id]; !ok {
// this pod's ID wasn't in the updated list -> removed // this pod's ID wasn't in the updated list -> removed
removedPods[id] = id removedPods[id] = id
@ -410,9 +404,9 @@ func (s *Server) update() error {
return err return err
} }
sb.RemoveInfraContainer() sb.RemoveInfraContainer()
s.releasePodName(sb.Name()) s.ReleasePodName(sb.Name())
s.removeSandbox(sb.ID()) s.removeSandbox(sb.ID())
if err = s.podIDIndex.Delete(sb.ID()); err != nil { if err = s.PodIDIndex().Delete(sb.ID()); err != nil {
return err return err
} }
logrus.Debugf("forgetting removed pod %s", sb.ID()) logrus.Debugf("forgetting removed pod %s", sb.ID())
@ -436,25 +430,6 @@ func (s *Server) update() error {
return nil return nil
} }
func (s *Server) reservePodName(id, name string) (string, error) {
if err := s.podNameIndex.Reserve(name, id); err != nil {
if err == registrar.ErrNameReserved {
id, err := s.podNameIndex.Get(name)
if err != nil {
logrus.Warnf("conflict, pod name %q already reserved", name)
return "", err
}
return "", fmt.Errorf("conflict, name %q already reserved for pod %q", name, id)
}
return "", fmt.Errorf("error reserving pod name %q", name)
}
return name, nil
}
func (s *Server) releasePodName(name string) {
s.podNameIndex.Release(name)
}
// cleanupSandboxesOnShutdown Remove all running Sandboxes on system shutdown // cleanupSandboxesOnShutdown Remove all running Sandboxes on system shutdown
func (s *Server) cleanupSandboxesOnShutdown() { func (s *Server) cleanupSandboxesOnShutdown() {
_, err := os.Stat(shutdownFile) _, err := os.Stat(shutdownFile)
@ -512,7 +487,6 @@ func New(config *Config) (*Server, error) {
containerServer := libkpod.New(r, store, imageService, config.SignaturePolicyPath) containerServer := libkpod.New(r, store, imageService, config.SignaturePolicyPath)
sandboxes := make(map[string]*sandbox.Sandbox)
netPlugin, err := ocicni.InitCNI(config.NetworkDir, config.PluginDir) netPlugin, err := ocicni.InitCNI(config.NetworkDir, config.PluginDir)
if err != nil { if err != nil {
return nil, err return nil, err
@ -524,13 +498,9 @@ func New(config *Config) (*Server, error) {
s := &Server{ s := &Server{
ContainerServer: *containerServer, ContainerServer: *containerServer,
storageRuntimeServer: storageRuntimeService, storageRuntimeServer: storageRuntimeService,
stateLock: new(sync.Mutex),
netPlugin: netPlugin, netPlugin: netPlugin,
hostportManager: hostportManager, hostportManager: hostportManager,
config: *config, config: *config,
state: &serverState{
sandboxes: sandboxes,
},
seccompEnabled: seccomp.IsEnabled(), seccompEnabled: seccomp.IsEnabled(),
appArmorEnabled: apparmor.IsEnabled(), appArmorEnabled: apparmor.IsEnabled(),
appArmorProfile: config.ApparmorProfile, appArmorProfile: config.ApparmorProfile,
@ -553,9 +523,6 @@ func New(config *Config) (*Server, error) {
} }
} }
s.podIDIndex = truncindex.NewTruncIndex([]string{})
s.podNameIndex = registrar.NewRegistrar()
s.restore() s.restore()
s.cleanupSandboxesOnShutdown() s.cleanupSandboxesOnShutdown()
@ -586,60 +553,37 @@ func New(config *Config) (*Server, error) {
s.stream.streamServer.Start(true) s.stream.streamServer.Start(true)
}() }()
logrus.Debugf("sandboxes: %v", s.state.sandboxes) logrus.Debugf("sandboxes: %v", s.ContainerServer.ListSandboxes())
return s, nil return s, nil
} }
type serverState struct {
sandboxes map[string]*sandbox.Sandbox
}
func (s *Server) addSandbox(sb *sandbox.Sandbox) { func (s *Server) addSandbox(sb *sandbox.Sandbox) {
s.stateLock.Lock() s.ContainerServer.AddSandbox(sb)
s.state.sandboxes[sb.ID()] = sb
s.stateLock.Unlock()
} }
func (s *Server) getSandbox(id string) *sandbox.Sandbox { func (s *Server) getSandbox(id string) *sandbox.Sandbox {
s.stateLock.Lock() return s.ContainerServer.GetSandbox(id)
sb := s.state.sandboxes[id]
s.stateLock.Unlock()
return sb
} }
func (s *Server) hasSandbox(id string) bool { func (s *Server) hasSandbox(id string) bool {
s.stateLock.Lock() return s.ContainerServer.HasSandbox(id)
_, ok := s.state.sandboxes[id]
s.stateLock.Unlock()
return ok
} }
func (s *Server) removeSandbox(id string) { func (s *Server) removeSandbox(id string) {
s.stateLock.Lock() s.ContainerServer.RemoveSandbox(id)
delete(s.state.sandboxes, id)
s.stateLock.Unlock()
} }
func (s *Server) addContainer(c *oci.Container) { func (s *Server) addContainer(c *oci.Container) {
s.stateLock.Lock()
sandbox := s.state.sandboxes[c.Sandbox()]
// TODO(runcom): handle !ok above!!! otherwise it panics!
sandbox.AddContainer(c)
s.ContainerServer.AddContainer(c) s.ContainerServer.AddContainer(c)
s.stateLock.Unlock()
} }
func (s *Server) getContainer(id string) *oci.Container { func (s *Server) getContainer(id string) *oci.Container {
s.stateLock.Lock() return s.ContainerServer.GetContainer(id)
c := s.ContainerServer.GetContainer(id)
s.stateLock.Unlock()
return c
} }
// GetSandboxContainer returns the infra container for a given sandbox // GetSandboxContainer returns the infra container for a given sandbox
func (s *Server) GetSandboxContainer(id string) *oci.Container { func (s *Server) GetSandboxContainer(id string) *oci.Container {
sb := s.getSandbox(id) return s.ContainerServer.GetSandboxContainer(id)
return sb.InfraContainer()
} }
// GetContainer returns a container by its ID // GetContainer returns a container by its ID
@ -648,11 +592,7 @@ func (s *Server) GetContainer(id string) *oci.Container {
} }
func (s *Server) removeContainer(c *oci.Container) { func (s *Server) removeContainer(c *oci.Container) {
s.stateLock.Lock()
sandbox := s.state.sandboxes[c.Sandbox()]
sandbox.RemoveContainer(c)
s.ContainerServer.RemoveContainer(c) s.ContainerServer.RemoveContainer(c)
s.stateLock.Unlock()
} }
func (s *Server) getPodSandboxFromRequest(podSandboxID string) (*sandbox.Sandbox, error) { func (s *Server) getPodSandboxFromRequest(podSandboxID string) (*sandbox.Sandbox, error) {
@ -660,7 +600,7 @@ func (s *Server) getPodSandboxFromRequest(podSandboxID string) (*sandbox.Sandbox
return nil, sandbox.ErrIDEmpty return nil, sandbox.ErrIDEmpty
} }
sandboxID, err := s.podIDIndex.Get(podSandboxID) sandboxID, err := s.PodIDIndex().Get(podSandboxID)
if err != nil { if err != nil {
return nil, fmt.Errorf("PodSandbox with ID starting with %s not found: %v", podSandboxID, err) return nil, fmt.Errorf("PodSandbox with ID starting with %s not found: %v", podSandboxID, err)
} }