diff --git a/pkg/ocicni/noop.go b/pkg/ocicni/noop.go
index 95447438..24db3132 100644
--- a/pkg/ocicni/noop.go
+++ b/pkg/ocicni/noop.go
@@ -7,7 +7,7 @@ func (noop *cniNoOp) Name() string {
 	return "CNINoOp"
 }
 
-func (noop *cniNoOp) SetUpPod(netnsPath string, namespace string, name string, containerID string) error {
+func (noop *cniNoOp) SetUpPod(netnsPath string, namespace string, name string, containerID string, cb SetUpCallback, data interface{}) error {
 	return nil
 }
 
diff --git a/pkg/ocicni/ocicni.go b/pkg/ocicni/ocicni.go
index 6ed1d0c6..fe614fc8 100644
--- a/pkg/ocicni/ocicni.go
+++ b/pkg/ocicni/ocicni.go
@@ -11,6 +11,7 @@ import (
 	"github.com/Sirupsen/logrus"
 	"github.com/containernetworking/cni/libcni"
 	cnitypes "github.com/containernetworking/cni/pkg/types"
+	"github.com/fsnotify/fsnotify"
 )
 
 type cniNetworkPlugin struct {
@@ -23,6 +24,11 @@ type cniNetworkPlugin struct {
 	pluginDir          string
 	cniDirs            []string
 	vendorCNIDirPrefix string
+
+	cniReadyListeners []chan error
+
+	monitorNetDirChan chan error
+	monitorNetDirDone bool
 }
 
 type cniNetwork struct {
@@ -31,6 +37,52 @@ type cniNetwork struct {
 	CNIConfig     libcni.CNI
 }
 
+var (
+	errMissingDefaultNetwork       = errors.New("Missing CNI default network")
+	errDefaultNetworkAlreadyExists = errors.New("CNI default network already exists")
+	errMonitoringTimeout           = errors.New("CNI monitoring timeout")
+)
+
+func (plugin *cniNetworkPlugin) monitorNetDir() {
+	watcher, err := fsnotify.NewWatcher()
+	if err != nil {
+		logrus.Errorf("could not create new watcher %v", err)
+		plugin.notifyCniReadyListeners(err)
+		return
+	}
+	defer watcher.Close()
+
+	go func() {
+		for {
+			select {
+			case event := <-watcher.Events:
+				logrus.Debugf("CNI monitoring event %v", event)
+				if event.Op&fsnotify.Create != fsnotify.Create {
+					continue
+				}
+
+				logrus.Debugf("CNI asynchronous setting succeeded")
+				plugin.terminateMonitorNetDir(nil)
+				return
+
+			case err1 := <-watcher.Errors:
+				logrus.Errorf("CNI monitoring error %v", err1)
+				plugin.terminateMonitorNetDir(err1)
+				return
+			}
+		}
+	}()
+
+	if err = watcher.Add(plugin.pluginDir); err != nil {
+		logrus.Error(err)
+		plugin.notifyCniReadyListeners(err)
+		return
+	}
+
+	err = <-plugin.monitorNetDirChan
+	plugin.notifyCniReadyListeners(err)
+}
+
 // InitCNI takes the plugin directory and cni directories where the cni files should be searched for
 // Returns a valid plugin object and any error
 func InitCNI(pluginDir string, cniDirs ...string) (CNIPlugin, error) {
@@ -44,19 +96,16 @@ func InitCNI(pluginDir string, cniDirs ...string) (CNIPlugin, error) {
 	// check if a default network exists, otherwise dump the CNI search and return a noop plugin
 	_, err = getDefaultCNINetwork(plugin.pluginDir, plugin.cniDirs, plugin.vendorCNIDirPrefix)
 	if err != nil {
+		if err == errMissingDefaultNetwork {
+			go plugin.monitorNetDir()
+			return plugin, nil
+		}
+
 		logrus.Warningf("Error in finding usable CNI plugin - %v", err)
 		// create a noop plugin instead
 		return &cniNoOp{}, nil
 	}
 
-	// sync network config from pluginDir periodically to detect network config updates
-	go func() {
-		t := time.NewTimer(10 * time.Second)
-		for {
-			plugin.syncNetworkConfig()
-			<-t.C
-		}
-	}()
 	return plugin, nil
 }
 
@@ -67,6 +116,8 @@ func probeNetworkPluginsWithVendorCNIDirPrefix(pluginDir string, cniDirs []strin
 		pluginDir:          pluginDir,
 		cniDirs:            cniDirs,
 		vendorCNIDirPrefix: vendorCNIDirPrefix,
+		monitorNetDirChan:  make(chan error, 1), // buffered: terminateMonitorNetDir sends under the plugin lock and must not block if the monitor already exited
+		monitorNetDirDone:  false,
 	}
 
 	// sync NetworkConfig in best effort during probing.
@@ -87,7 +138,7 @@ func getDefaultCNINetwork(pluginDir string, cniDirs []string, vendorCNIDirPrefix
 	case err != nil:
 		return nil, err
 	case len(files) == 0:
-		return nil, fmt.Errorf("No networks found in %s", pluginDir)
+		return nil, errMissingDefaultNetwork
 	}
 
 	sort.Strings(files)
@@ -163,9 +214,59 @@ func (plugin *cniNetworkPlugin) setDefaultNetwork(n *cniNetwork) {
 	plugin.defaultNetwork = n
 }
 
+func (plugin *cniNetworkPlugin) terminateMonitorNetDir(err error) {
+	plugin.Lock()
+	defer plugin.Unlock()
+	if !plugin.monitorNetDirDone {
+		plugin.monitorNetDirChan <- err
+		plugin.monitorNetDirDone = true
+	}
+}
+
+func (plugin *cniNetworkPlugin) addCniReadyListener() (chan error, error) {
+	plugin.Lock()
+	defer plugin.Unlock()
+
+	if plugin.defaultNetwork != nil {
+		return nil, errDefaultNetworkAlreadyExists
+	}
+
+	c := make(chan error, 1) // buffered: notifyCniReadyListeners sends under the plugin lock and must not block on a listener that timed out
+	plugin.cniReadyListeners = append(plugin.cniReadyListeners, c)
+
+	return c, nil
+}
+
+func (plugin *cniNetworkPlugin) removeCniReadyListener(c chan error) {
+	plugin.Lock()
+	defer plugin.Unlock()
+
+	for i, ch := range plugin.cniReadyListeners {
+		if c != ch {
+			continue
+		}
+
+		plugin.cniReadyListeners = append(plugin.cniReadyListeners[:i], plugin.cniReadyListeners[i+1:]...)
+		break
+	}
+
+	return
+}
+
+func (plugin *cniNetworkPlugin) notifyCniReadyListeners(err error) {
+	plugin.Lock()
+	defer plugin.Unlock()
+
+	for _, c := range plugin.cniReadyListeners {
+		c <- err
+	}
+
+	plugin.cniReadyListeners = nil
+}
+
 func (plugin *cniNetworkPlugin) checkInitialized() error {
 	if plugin.getDefaultNetwork() == nil {
-		return errors.New("cni config uninitialized")
+		return errMissingDefaultNetwork
 	}
 	return nil
 }
@@ -197,8 +298,41 @@ func (plugin *cniNetworkPlugin) setUpPod(netnsPath string, namespace string, nam
 
 }
 
-func (plugin *cniNetworkPlugin) SetUpPod(netnsPath string, namespace string, name string, id string) error {
+func (plugin *cniNetworkPlugin) SetUpPod(netnsPath string, namespace string, name string, id string, cb SetUpCallback, data interface{}) error {
+	// First let's sync with the latest configuration files
+	plugin.syncNetworkConfig()
+
+	// Now we can check if we really have a default network
 	if err := plugin.checkInitialized(); err != nil {
+		if err == errMissingDefaultNetwork {
+			// We are missing a default network.
+			// Let's add ourselves to the listeners list and
+			// wait 30s for a new configuration file to show up.
+			c, err1 := plugin.addCniReadyListener()
+			if err1 != nil {
+				// The CNI default network showed up
+				if err1 == errDefaultNetworkAlreadyExists {
+					return plugin.setUpPod(netnsPath, namespace, name, id)
+				}
+
+				return err1
+			}
+
+			go func() {
+				select {
+				case err2 := <-c:
+					cb(data, err2)
+
+				case <-time.After(time.Second * 30):
+					cb(data, errMonitoringTimeout)
+				}
+
+				plugin.removeCniReadyListener(c)
+			}()
+
+			return nil
+		}
+
 		return err
 	}
 
@@ -207,6 +341,14 @@ func (plugin *cniNetworkPlugin) SetUpPod(netnsPath string, namespace string, nam
 
 func (plugin *cniNetworkPlugin) TearDownPod(netnsPath string, namespace string, name string, id string) error {
 	if err := plugin.checkInitialized(); err != nil {
+		if err == errMissingDefaultNetwork {
+			// We are missing a default network but someone is still
+			// trying to tear us down. We will try to kill the monitoring
+			// thread if it's still running.
+			plugin.terminateMonitorNetDir(nil)
+			return nil
+		}
+
 		return err
 	}
 
diff --git a/pkg/ocicni/types.go b/pkg/ocicni/types.go
index 4483a203..d7c2990a 100644
--- a/pkg/ocicni/types.go
+++ b/pkg/ocicni/types.go
@@ -13,6 +13,11 @@ const (
 	VendorCNIDirTemplate = "%s/opt/%s/bin"
 )
 
+// SetUpCallback is a callback function that the SetUpPod
+// implementation can call to notify ocicni users about
+// the result of asynchronous networking pod setup.
+type SetUpCallback func(data interface{}, err error)
+
 // CNIPlugin is the interface that needs to be implemented by a plugin
 type CNIPlugin interface {
 	// Name returns the plugin's name. This will be used when searching
@@ -22,7 +27,7 @@ type CNIPlugin interface {
 	// SetUpPod is the method called after the infra container of
 	// the pod has been created but before the other containers of the
 	// pod are launched.
-	SetUpPod(netnsPath string, namespace string, name string, containerID string) error
+	SetUpPod(netnsPath string, namespace string, name string, containerID string, cb SetUpCallback, data interface{}) error
 
 	// TearDownPod is the method called before a pod's infra container will be deleted
 	TearDownPod(netnsPath string, namespace string, name string, containerID string) error
diff --git a/server/sandbox_run.go b/server/sandbox_run.go
index c978582a..e40f6187 100644
--- a/server/sandbox_run.go
+++ b/server/sandbox_run.go
@@ -63,6 +63,26 @@ func (s *Server) runContainer(container *oci.Container, cgroupParent string) err
 	return nil
 }
 
+func setNetworkReadiness(data interface{}, err error) {
+	s, ok := data.(*Server)
+	if !ok {
+		logrus.Errorf("invalid networking callback cookie")
+		return
+	}
+
+	if s == nil {
+		return
+	}
+
+	if err != nil {
+		logrus.Errorf("pod networking setup callback error %v", err)
+		s.runtime.SetNetworkReady(false)
+		return
+	}
+
+	s.runtime.SetNetworkReady(true)
+}
+
 // RunPodSandbox creates and runs a pod-level sandbox.
 func (s *Server) RunPodSandbox(ctx context.Context, req *pb.RunPodSandboxRequest) (resp *pb.RunPodSandboxResponse, err error) {
 	logrus.Debugf("RunPodSandboxRequest %+v", req)
@@ -385,7 +405,7 @@ func (s *Server) RunPodSandbox(ctx context.Context, req *pb.RunPodSandboxRequest
 	// setup the network
 	if !hostNetwork {
 		podNamespace := ""
-		if err = s.netPlugin.SetUpPod(netNsPath, podNamespace, id, containerName); err != nil {
+		if err = s.netPlugin.SetUpPod(netNsPath, podNamespace, id, containerName, setNetworkReadiness, s); err != nil {
 			return nil, fmt.Errorf("failed to create network for container %s in sandbox %s: %v", containerName, id, err)
 		}
 	}