ocicni: Monitor CNI configuration directory

There is nothing preventing Kubernetes from creating a pod sandbox
before the SDN is configured and ready.
For example kubeadm starts the kube-dns pod before the networking
add-on is ready. Therefore ocid may end up calling ocicni.SetUpPod()
without even having any CNI configuration file available.
In those cases, non host networking pods end up being disconnected
from the rest of the pod.

Here we address this SDN asynchronous issue by starting to monitor
the CNI configuration directory for new files if ocicni.InitCNI()
fails because it can not find a default network. We use the fsnotify
package for that.
Once a new file is added to the CNI configuration directory, we
attempt to configure the pod networking namespace again.

Signed-off-by: Samuel Ortiz <sameo@linux.intel.com>
This commit is contained in:
Samuel Ortiz 2017-03-21 12:31:38 +01:00
parent b9d8ee0940
commit 1a4639ae02
4 changed files with 181 additions and 14 deletions

View file

@ -7,7 +7,7 @@ func (noop *cniNoOp) Name() string {
return "CNINoOp"
}
func (noop *cniNoOp) SetUpPod(netnsPath string, namespace string, name string, containerID string) error {
func (noop *cniNoOp) SetUpPod(netnsPath string, namespace string, name string, containerID string, cb SetUpCallback, data interface{}) error {
return nil
}

View file

@ -11,6 +11,7 @@ import (
"github.com/Sirupsen/logrus"
"github.com/containernetworking/cni/libcni"
cnitypes "github.com/containernetworking/cni/pkg/types"
"github.com/fsnotify/fsnotify"
)
type cniNetworkPlugin struct {
@ -23,6 +24,11 @@ type cniNetworkPlugin struct {
pluginDir string
cniDirs []string
vendorCNIDirPrefix string
cniReadyListeners []chan error
monitorNetDirChan chan error
monitorNetDirDone bool
}
type cniNetwork struct {
@ -31,6 +37,52 @@ type cniNetwork struct {
CNIConfig libcni.CNI
}
var (
errMissingDefaultNetwork = errors.New("Missing CNI default network")
errDefaultNetworkAlreadyExists = errors.New("CNI default network already exists")
errMonitoringTimeout = errors.New("CNI monitoring timeout")
)
func (plugin *cniNetworkPlugin) monitorNetDir() {
watcher, err := fsnotify.NewWatcher()
if err != nil {
logrus.Errorf("could not create new watcher %v", err)
plugin.notifyCniReadyListeners(err)
return
}
defer watcher.Close()
go func() {
for {
select {
case event := <-watcher.Events:
logrus.Debugf("CNI monitoring event %v", event)
if event.Op&fsnotify.Create != fsnotify.Create {
continue
}
logrus.Debugf("CNI asynchronous setting succeeded")
plugin.terminateMonitorNetDir(nil)
return
case err1 := <-watcher.Errors:
logrus.Errorf("CNI monitoring error %v", err1)
plugin.terminateMonitorNetDir(err1)
return
}
}
}()
if err = watcher.Add(plugin.pluginDir); err != nil {
logrus.Error(err)
plugin.notifyCniReadyListeners(err)
return
}
err = <-plugin.monitorNetDirChan
plugin.notifyCniReadyListeners(err)
}
// InitCNI takes the plugin directory and cni directories where the cni files should be searched for
// Returns a valid plugin object and any error
func InitCNI(pluginDir string, cniDirs ...string) (CNIPlugin, error) {
@ -44,19 +96,16 @@ func InitCNI(pluginDir string, cniDirs ...string) (CNIPlugin, error) {
// check if a default network exists, otherwise dump the CNI search and return a noop plugin
_, err = getDefaultCNINetwork(plugin.pluginDir, plugin.cniDirs, plugin.vendorCNIDirPrefix)
if err != nil {
if err == errMissingDefaultNetwork {
go plugin.monitorNetDir()
return plugin, nil
}
logrus.Warningf("Error in finding usable CNI plugin - %v", err)
// create a noop plugin instead
return &cniNoOp{}, nil
}
// sync network config from pluginDir periodically to detect network config updates
go func() {
t := time.NewTimer(10 * time.Second)
for {
plugin.syncNetworkConfig()
<-t.C
}
}()
return plugin, nil
}
@ -67,6 +116,8 @@ func probeNetworkPluginsWithVendorCNIDirPrefix(pluginDir string, cniDirs []strin
pluginDir: pluginDir,
cniDirs: cniDirs,
vendorCNIDirPrefix: vendorCNIDirPrefix,
monitorNetDirChan: make(chan error),
monitorNetDirDone: false,
}
// sync NetworkConfig in best effort during probing.
@ -87,7 +138,7 @@ func getDefaultCNINetwork(pluginDir string, cniDirs []string, vendorCNIDirPrefix
case err != nil:
return nil, err
case len(files) == 0:
return nil, fmt.Errorf("No networks found in %s", pluginDir)
return nil, errMissingDefaultNetwork
}
sort.Strings(files)
@ -163,9 +214,59 @@ func (plugin *cniNetworkPlugin) setDefaultNetwork(n *cniNetwork) {
plugin.defaultNetwork = n
}
func (plugin *cniNetworkPlugin) terminateMonitorNetDir(err error) {
plugin.Lock()
defer plugin.Unlock()
if !plugin.monitorNetDirDone {
plugin.monitorNetDirChan <- err
plugin.monitorNetDirDone = true
}
}
func (plugin *cniNetworkPlugin) addCniReadyListener() (chan error, error) {
plugin.Lock()
defer plugin.Unlock()
if plugin.defaultNetwork != nil {
return nil, errDefaultNetworkAlreadyExists
}
c := make(chan error)
plugin.cniReadyListeners = append(plugin.cniReadyListeners, c)
return c, nil
}
func (plugin *cniNetworkPlugin) removeCniReadyListener(c chan error) {
plugin.Lock()
defer plugin.Unlock()
for i, ch := range plugin.cniReadyListeners {
if c != ch {
continue
}
plugin.cniReadyListeners = append(plugin.cniReadyListeners[:i], plugin.cniReadyListeners[i+1:]...)
break
}
return
}
func (plugin *cniNetworkPlugin) notifyCniReadyListeners(err error) {
plugin.Lock()
defer plugin.Unlock()
for _, c := range plugin.cniReadyListeners {
c <- err
}
plugin.cniReadyListeners = nil
}
func (plugin *cniNetworkPlugin) checkInitialized() error {
if plugin.getDefaultNetwork() == nil {
return errors.New("cni config uninitialized")
return errMissingDefaultNetwork
}
return nil
}
@ -197,8 +298,41 @@ func (plugin *cniNetworkPlugin) setUpPod(netnsPath string, namespace string, nam
}
func (plugin *cniNetworkPlugin) SetUpPod(netnsPath string, namespace string, name string, id string) error {
func (plugin *cniNetworkPlugin) SetUpPod(netnsPath string, namespace string, name string, id string, cb SetUpCallback, data interface{}) error {
// First let's sync with the latest configuration files
plugin.syncNetworkConfig()
// Now we can check if we really have a default network
if err := plugin.checkInitialized(); err != nil {
if err == errMissingDefaultNetwork {
// We are missing a default network.
// Let's add ourselves to the listeners list and
// wait 30s for a new configuration file to show up.
c, err1 := plugin.addCniReadyListener()
if err1 != nil {
// The CNI default network showed up
if err1 == errDefaultNetworkAlreadyExists {
return plugin.setUpPod(netnsPath, namespace, name, id)
}
return err1
}
go func() {
select {
case err2 := <-c:
cb(data, err2)
case <-time.After(time.Second * 30):
cb(data, errMonitoringTimeout)
}
plugin.removeCniReadyListener(c)
}()
return nil
}
return err
}
@ -207,6 +341,14 @@ func (plugin *cniNetworkPlugin) SetUpPod(netnsPath string, namespace string, nam
func (plugin *cniNetworkPlugin) TearDownPod(netnsPath string, namespace string, name string, id string) error {
if err := plugin.checkInitialized(); err != nil {
if err == errMissingDefaultNetwork {
// We are missing a default network but someone is still
// trying to tear us down. We will try to kill the monitoring
// thread if it's still running.
plugin.terminateMonitorNetDir(nil)
return nil
}
return err
}

View file

@ -13,6 +13,11 @@ const (
VendorCNIDirTemplate = "%s/opt/%s/bin"
)
// SetUpCallback is a callback function that the SetUpPod
// implementation can call to notify ocicni users about
// the result of asynchronous networking pod setup.
type SetUpCallback func(data interface{}, err error)
// CNIPlugin is the interface that needs to be implemented by a plugin
type CNIPlugin interface {
// Name returns the plugin's name. This will be used when searching
@ -22,7 +27,7 @@ type CNIPlugin interface {
// SetUpPod is the method called after the infra container of
// the pod has been created but before the other containers of the
// pod are launched.
SetUpPod(netnsPath string, namespace string, name string, containerID string) error
SetUpPod(netnsPath string, namespace string, name string, containerID string, cb SetUpCallback, data interface{}) error
// TearDownPod is the method called before a pod's infra container will be deleted
TearDownPod(netnsPath string, namespace string, name string, containerID string) error

View file

@ -63,6 +63,26 @@ func (s *Server) runContainer(container *oci.Container, cgroupParent string) err
return nil
}
func setNetworkReadiness(data interface{}, err error) {
s, ok := data.(*Server)
if !ok {
logrus.Errorf("invalid networking callback cookie")
return
}
if s == nil {
return
}
if err != nil {
logrus.Errorf("pod networking setup callback error %v", err)
s.runtime.SetNetworkReady(false)
return
}
s.runtime.SetNetworkReady(true)
}
// RunPodSandbox creates and runs a pod-level sandbox.
func (s *Server) RunPodSandbox(ctx context.Context, req *pb.RunPodSandboxRequest) (resp *pb.RunPodSandboxResponse, err error) {
logrus.Debugf("RunPodSandboxRequest %+v", req)
@ -385,7 +405,7 @@ func (s *Server) RunPodSandbox(ctx context.Context, req *pb.RunPodSandboxRequest
// setup the network
if !hostNetwork {
podNamespace := ""
if err = s.netPlugin.SetUpPod(netNsPath, podNamespace, id, containerName); err != nil {
if err = s.netPlugin.SetUpPod(netNsPath, podNamespace, id, containerName, setNetworkReadiness, s.runtime); err != nil {
return nil, fmt.Errorf("failed to create network for container %s in sandbox %s: %v", containerName, id, err)
}
}