0b7348b35c
The storage library uses github.com/pkg/errors to wrap errors that it returns from many of its functions, so when passing them to os.IsNotExist() or comparing them to specific errors defined in the storage library, unwrap them using errors.Cause(). Signed-off-by: Nalin Dahyabhai <nalin@redhat.com>
278 lines
8.1 KiB
Go
278 lines
8.1 KiB
Go
package server
|
|
|
|
import (
|
|
"encoding/json"
|
|
"fmt"
|
|
"io/ioutil"
|
|
"net"
|
|
"os"
|
|
"sync"
|
|
|
|
"github.com/Sirupsen/logrus"
|
|
"github.com/kubernetes-incubator/cri-o/libkpod"
|
|
"github.com/kubernetes-incubator/cri-o/libkpod/sandbox"
|
|
"github.com/kubernetes-incubator/cri-o/oci"
|
|
"github.com/kubernetes-incubator/cri-o/pkg/ocicni"
|
|
"github.com/kubernetes-incubator/cri-o/pkg/storage"
|
|
"github.com/kubernetes-incubator/cri-o/server/apparmor"
|
|
"github.com/kubernetes-incubator/cri-o/server/seccomp"
|
|
"github.com/pkg/errors"
|
|
knet "k8s.io/apimachinery/pkg/util/net"
|
|
pb "k8s.io/kubernetes/pkg/kubelet/api/v1alpha1/runtime"
|
|
"k8s.io/kubernetes/pkg/kubelet/network/hostport"
|
|
"k8s.io/kubernetes/pkg/kubelet/server/streaming"
|
|
iptablesproxy "k8s.io/kubernetes/pkg/proxy/iptables"
|
|
utildbus "k8s.io/kubernetes/pkg/util/dbus"
|
|
utilexec "k8s.io/kubernetes/pkg/util/exec"
|
|
utiliptables "k8s.io/kubernetes/pkg/util/iptables"
|
|
)
|
|
|
|
const (
|
|
runtimeAPIVersion = "v1alpha1"
|
|
shutdownFile = "/var/lib/crio/crio.shutdown"
|
|
)
|
|
|
|
func isTrue(annotaton string) bool {
|
|
return annotaton == "true"
|
|
}
|
|
|
|
// streamService implements streaming.Runtime.
|
|
type streamService struct {
|
|
runtimeServer *Server // needed by Exec() endpoint
|
|
streamServer streaming.Server
|
|
streaming.Runtime
|
|
}
|
|
|
|
// Server implements the RuntimeService and ImageService
|
|
type Server struct {
|
|
*libkpod.ContainerServer
|
|
config Config
|
|
|
|
updateLock sync.RWMutex
|
|
netPlugin ocicni.CNIPlugin
|
|
hostportManager hostport.HostPortManager
|
|
|
|
seccompEnabled bool
|
|
seccompProfile seccomp.Seccomp
|
|
|
|
appArmorEnabled bool
|
|
appArmorProfile string
|
|
|
|
stream streamService
|
|
}
|
|
|
|
// GetExec returns exec stream request
|
|
func (s *Server) GetExec(req *pb.ExecRequest) (*pb.ExecResponse, error) {
|
|
return s.stream.streamServer.GetExec(req)
|
|
}
|
|
|
|
// GetAttach returns attach stream request
|
|
func (s *Server) GetAttach(req *pb.AttachRequest) (*pb.AttachResponse, error) {
|
|
return s.stream.streamServer.GetAttach(req)
|
|
}
|
|
|
|
// GetPortForward returns port forward stream request
|
|
func (s *Server) GetPortForward(req *pb.PortForwardRequest) (*pb.PortForwardResponse, error) {
|
|
return s.stream.streamServer.GetPortForward(req)
|
|
}
|
|
|
|
func (s *Server) restore() {
|
|
containers, err := s.Store().Containers()
|
|
if err != nil && !os.IsNotExist(errors.Cause(err)) {
|
|
logrus.Warnf("could not read containers and sandboxes: %v", err)
|
|
}
|
|
pods := map[string]*storage.RuntimeContainerMetadata{}
|
|
podContainers := map[string]*storage.RuntimeContainerMetadata{}
|
|
for _, container := range containers {
|
|
metadata, err2 := s.StorageRuntimeServer().GetContainerMetadata(container.ID)
|
|
if err2 != nil {
|
|
logrus.Warnf("error parsing metadata for %s: %v, ignoring", container.ID, err2)
|
|
continue
|
|
}
|
|
if metadata.Pod {
|
|
pods[container.ID] = &metadata
|
|
} else {
|
|
podContainers[container.ID] = &metadata
|
|
}
|
|
}
|
|
for containerID, metadata := range pods {
|
|
if err = s.LoadSandbox(containerID); err != nil {
|
|
logrus.Warnf("could not restore sandbox %s container %s: %v", metadata.PodID, containerID, err)
|
|
}
|
|
}
|
|
for containerID := range podContainers {
|
|
if err := s.LoadContainer(containerID); err != nil {
|
|
logrus.Warnf("could not restore container %s: %v", containerID, err)
|
|
}
|
|
}
|
|
}
|
|
|
|
// Update makes changes to the server's state (lists of pods and containers) to
|
|
// reflect the list of pods and containers that are stored on disk, possibly
|
|
// having been modified by other parties
|
|
func (s *Server) Update() {
|
|
logrus.Debugf("updating sandbox and container information")
|
|
if err := s.ContainerServer.Update(); err != nil {
|
|
logrus.Errorf("error updating sandbox and container information: %v", err)
|
|
}
|
|
}
|
|
|
|
// cleanupSandboxesOnShutdown Remove all running Sandboxes on system shutdown
|
|
func (s *Server) cleanupSandboxesOnShutdown() {
|
|
_, err := os.Stat(shutdownFile)
|
|
if err == nil || !os.IsNotExist(err) {
|
|
logrus.Debugf("shutting down all sandboxes, on shutdown")
|
|
s.StopAllPodSandboxes()
|
|
err = os.Remove(shutdownFile)
|
|
if err != nil {
|
|
logrus.Warnf("Failed to remove %q", shutdownFile)
|
|
}
|
|
|
|
}
|
|
}
|
|
|
|
// Shutdown attempts to shut down the server's storage cleanly
|
|
func (s *Server) Shutdown() error {
|
|
// why do this on clean shutdown! we want containers left running when crio
|
|
// is down for whatever reason no?!
|
|
// notice this won't trigger just on system halt but also on normal
|
|
// crio.service restart!!!
|
|
s.cleanupSandboxesOnShutdown()
|
|
return s.ContainerServer.Shutdown()
|
|
}
|
|
|
|
// New creates a new Server with options provided
|
|
func New(config *Config) (*Server, error) {
|
|
if err := os.MkdirAll("/var/run/crio", 0755); err != nil {
|
|
return nil, err
|
|
}
|
|
containerServer, err := libkpod.New(&config.Config)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
netPlugin, err := ocicni.InitCNI(config.NetworkDir, config.PluginDir)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
iptInterface := utiliptables.New(utilexec.New(), utildbus.New(), utiliptables.ProtocolIpv4)
|
|
iptInterface.EnsureChain(utiliptables.TableNAT, iptablesproxy.KubeMarkMasqChain)
|
|
hostportManager := hostport.NewHostportManager()
|
|
|
|
s := &Server{
|
|
ContainerServer: containerServer,
|
|
|
|
netPlugin: netPlugin,
|
|
hostportManager: hostportManager,
|
|
config: *config,
|
|
seccompEnabled: seccomp.IsEnabled(),
|
|
appArmorEnabled: apparmor.IsEnabled(),
|
|
appArmorProfile: config.ApparmorProfile,
|
|
}
|
|
|
|
if s.seccompEnabled {
|
|
seccompProfile, fileErr := ioutil.ReadFile(config.SeccompProfile)
|
|
if fileErr != nil {
|
|
return nil, fmt.Errorf("opening seccomp profile (%s) failed: %v", config.SeccompProfile, fileErr)
|
|
}
|
|
var seccompConfig seccomp.Seccomp
|
|
if jsonErr := json.Unmarshal(seccompProfile, &seccompConfig); jsonErr != nil {
|
|
return nil, fmt.Errorf("decoding seccomp profile failed: %v", jsonErr)
|
|
}
|
|
s.seccompProfile = seccompConfig
|
|
}
|
|
|
|
if s.appArmorEnabled && s.appArmorProfile == apparmor.DefaultApparmorProfile {
|
|
if apparmorErr := apparmor.EnsureDefaultApparmorProfile(); apparmorErr != nil {
|
|
return nil, fmt.Errorf("ensuring the default apparmor profile is installed failed: %v", apparmorErr)
|
|
}
|
|
}
|
|
|
|
s.restore()
|
|
s.cleanupSandboxesOnShutdown()
|
|
|
|
bindAddress := net.ParseIP(config.StreamAddress)
|
|
if bindAddress == nil {
|
|
bindAddress, err = knet.ChooseBindAddress(net.IP{0, 0, 0, 0})
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
}
|
|
|
|
_, err = net.LookupPort("tcp", config.StreamPort)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
// Prepare streaming server
|
|
streamServerConfig := streaming.DefaultConfig
|
|
streamServerConfig.Addr = net.JoinHostPort(bindAddress.String(), config.StreamPort)
|
|
s.stream.runtimeServer = s
|
|
s.stream.streamServer, err = streaming.NewServer(streamServerConfig, s.stream)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("unable to create streaming server")
|
|
}
|
|
|
|
// TODO: Is it should be started somewhere else?
|
|
go func() {
|
|
s.stream.streamServer.Start(true)
|
|
}()
|
|
|
|
logrus.Debugf("sandboxes: %v", s.ContainerServer.ListSandboxes())
|
|
return s, nil
|
|
}
|
|
|
|
func (s *Server) addSandbox(sb *sandbox.Sandbox) {
|
|
s.ContainerServer.AddSandbox(sb)
|
|
}
|
|
|
|
func (s *Server) getSandbox(id string) *sandbox.Sandbox {
|
|
return s.ContainerServer.GetSandbox(id)
|
|
}
|
|
|
|
func (s *Server) hasSandbox(id string) bool {
|
|
return s.ContainerServer.HasSandbox(id)
|
|
}
|
|
|
|
func (s *Server) removeSandbox(id string) {
|
|
s.ContainerServer.RemoveSandbox(id)
|
|
}
|
|
|
|
func (s *Server) addContainer(c *oci.Container) {
|
|
s.ContainerServer.AddContainer(c)
|
|
}
|
|
|
|
func (s *Server) getContainer(id string) *oci.Container {
|
|
return s.ContainerServer.GetContainer(id)
|
|
}
|
|
|
|
// GetSandboxContainer returns the infra container for a given sandbox
|
|
func (s *Server) GetSandboxContainer(id string) *oci.Container {
|
|
return s.ContainerServer.GetSandboxContainer(id)
|
|
}
|
|
|
|
// GetContainer returns a container by its ID
|
|
func (s *Server) GetContainer(id string) *oci.Container {
|
|
return s.getContainer(id)
|
|
}
|
|
|
|
func (s *Server) removeContainer(c *oci.Container) {
|
|
s.ContainerServer.RemoveContainer(c)
|
|
}
|
|
|
|
func (s *Server) getPodSandboxFromRequest(podSandboxID string) (*sandbox.Sandbox, error) {
|
|
if podSandboxID == "" {
|
|
return nil, sandbox.ErrIDEmpty
|
|
}
|
|
|
|
sandboxID, err := s.PodIDIndex().Get(podSandboxID)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("PodSandbox with ID starting with %s not found: %v", podSandboxID, err)
|
|
}
|
|
|
|
sb := s.getSandbox(sandboxID)
|
|
if sb == nil {
|
|
return nil, fmt.Errorf("specified pod sandbox not found: %s", sandboxID)
|
|
}
|
|
return sb, nil
|
|
}
|