Merge pull request #1345 from runcom/fsnotify-hooks

Fsnotify hooks
This commit is contained in:
Daniel J Walsh 2018-03-01 12:18:00 -08:00 committed by GitHub
commit 0a1ae89ba6
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
4 changed files with 106 additions and 20 deletions

View file

@ -166,7 +166,7 @@ func catchShutdown(gserver *grpc.Server, sserver *server.Server, hserver *http.S
gserver.GracefulStop() gserver.GracefulStop()
hserver.Shutdown(context.Background()) hserver.Shutdown(context.Background())
sserver.StopStreamServer() sserver.StopStreamServer()
sserver.StopExitMonitor() sserver.StopMonitors()
if err := sserver.Shutdown(); err != nil { if err := sserver.Shutdown(); err != nil {
logrus.Warnf("error shutting down main service %v", err) logrus.Warnf("error shutting down main service %v", err)
} }
@ -484,6 +484,9 @@ func main() {
go func() { go func() {
service.StartExitMonitor() service.StartExitMonitor()
}() }()
go func() {
service.StartHooksMonitor()
}()
m := cmux.New(lis) m := cmux.New(lis)
grpcL := m.Match(cmux.HTTP2HeaderField("content-type", "application/grpc")) grpcL := m.Match(cmux.HTTP2HeaderField("content-type", "application/grpc"))
@ -514,10 +517,10 @@ func main() {
}() }()
streamServerCloseCh := service.StreamingServerCloseChan() streamServerCloseCh := service.StreamingServerCloseChan()
serverExitMonitorCh := service.ExitMonitorCloseChan() serverMonitorsCh := service.MonitorsCloseChan()
select { select {
case <-streamServerCloseCh: case <-streamServerCloseCh:
case <-serverExitMonitorCh: case <-serverMonitorsCh:
case <-serverCloseCh: case <-serverCloseCh:
} }
@ -525,8 +528,8 @@ func main() {
<-streamServerCloseCh <-streamServerCloseCh
logrus.Debug("closed stream server") logrus.Debug("closed stream server")
<-serverExitMonitorCh <-serverMonitorsCh
logrus.Debug("closed exit monitor") logrus.Debug("closed monitors")
<-serverCloseCh <-serverCloseCh
logrus.Debug("closed main server") logrus.Debug("closed main server")

View file

@ -39,6 +39,7 @@ type ContainerServer struct {
podNameIndex *registrar.Registrar podNameIndex *registrar.Registrar
podIDIndex *truncindex.TruncIndex podIDIndex *truncindex.TruncIndex
hooks map[string]HookParams hooks map[string]HookParams
hooksLock sync.Mutex
imageContext *types.SystemContext imageContext *types.SystemContext
stateLock sync.Locker stateLock sync.Locker
@ -53,7 +54,41 @@ func (c *ContainerServer) Runtime() *oci.Runtime {
// Hooks returns the oci hooks for the ContainerServer // Hooks returns the oci hooks for the ContainerServer
func (c *ContainerServer) Hooks() map[string]HookParams { func (c *ContainerServer) Hooks() map[string]HookParams {
return c.hooks hooks := map[string]HookParams{}
c.hooksLock.Lock()
defer c.hooksLock.Unlock()
for key, h := range c.hooks {
hooks[key] = h
}
return hooks
}
// RemoveHook removes an hook by name
func (c *ContainerServer) RemoveHook(hook string) {
c.hooksLock.Lock()
defer c.hooksLock.Unlock()
if _, ok := c.hooks[hook]; ok {
delete(c.hooks, hook)
}
}
// AddHook adds an hook by hook's path
func (c *ContainerServer) AddHook(hookPath string) {
c.hooksLock.Lock()
defer c.hooksLock.Unlock()
hook, err := readHook(hookPath)
if err != nil {
logrus.Debugf("error while reading hook %s", hookPath)
return
}
for key, h := range c.hooks {
// hook.Hook can only be defined in one hook file, unless it has the
// same name in the override path.
if hook.Hook == h.Hook && key != filepath.Base(hookPath) {
logrus.Debugf("duplicate path, hook %q from %q already defined in %q", hook.Hook, c.config.HooksDirPath, key)
}
}
c.hooks[filepath.Base(hookPath)] = hook
} }
// Store returns the Store for the ContainerServer // Store returns the Store for the ContainerServer
@ -153,6 +188,7 @@ func New(config *Config) (*ContainerServer, error) {
} }
} }
} }
logrus.Debugf("hooks %+v", hooks)
return &ContainerServer{ return &ContainerServer{
runtime: runtime, runtime: runtime,

View file

@ -30,9 +30,16 @@ type HookParams struct {
Arguments []string `json:"arguments"` Arguments []string `json:"arguments"`
} }
var (
errNotJSON = errors.New("hook file isn't a JSON")
)
// readHook reads hooks json files, verifies it and returns the json config // readHook reads hooks json files, verifies it and returns the json config
func readHook(hookPath string) (HookParams, error) { func readHook(hookPath string) (HookParams, error) {
var hook HookParams var hook HookParams
if !strings.HasSuffix(hookPath, ".json") {
return hook, errNotJSON
}
raw, err := ioutil.ReadFile(hookPath) raw, err := ioutil.ReadFile(hookPath)
if err != nil { if err != nil {
return hook, errors.Wrapf(err, "error Reading hook %q", hookPath) return hook, errors.Wrapf(err, "error Reading hook %q", hookPath)
@ -83,11 +90,11 @@ func readHooks(hooksPath string, hooks map[string]HookParams) error {
} }
for _, file := range files { for _, file := range files {
if !strings.HasSuffix(file.Name(), ".json") {
continue
}
hook, err := readHook(filepath.Join(hooksPath, file.Name())) hook, err := readHook(filepath.Join(hooksPath, file.Name()))
if err != nil { if err != nil {
if err == errNotJSON {
continue
}
return err return err
} }
for key, h := range hooks { for key, h := range hooks {

View file

@ -66,9 +66,9 @@ type Server struct {
appArmorEnabled bool appArmorEnabled bool
appArmorProfile string appArmorProfile string
bindAddress string bindAddress string
stream streamService stream streamService
exitMonitorChan chan struct{} monitorsChan chan struct{}
} }
// StopStreamServer stops the stream server // StopStreamServer stops the stream server
@ -211,7 +211,7 @@ func New(config *Config) (*Server, error) {
seccompEnabled: seccomp.IsEnabled(), seccompEnabled: seccomp.IsEnabled(),
appArmorEnabled: apparmor.IsEnabled(), appArmorEnabled: apparmor.IsEnabled(),
appArmorProfile: config.ApparmorProfile, appArmorProfile: config.ApparmorProfile,
exitMonitorChan: make(chan struct{}), monitorsChan: make(chan struct{}),
} }
if s.seccompEnabled { if s.seccompEnabled {
@ -355,14 +355,54 @@ func (s *Server) CreateMetricsEndpoint() (*http.ServeMux, error) {
return mux, nil return mux, nil
} }
// StopExitMonitor stops the exit monitor // StopMonitors stops al the monitors
func (s *Server) StopExitMonitor() { func (s *Server) StopMonitors() {
close(s.exitMonitorChan) close(s.monitorsChan)
} }
// ExitMonitorCloseChan returns the close chan for the exit monitor // MonitorsCloseChan returns the close chan for the exit monitor
func (s *Server) ExitMonitorCloseChan() chan struct{} { func (s *Server) MonitorsCloseChan() chan struct{} {
return s.exitMonitorChan return s.monitorsChan
}
// StartHooksMonitor starts a goroutine to dynamically add hooks at runtime
func (s *Server) StartHooksMonitor() {
watcher, err := fsnotify.NewWatcher()
if err != nil {
logrus.Fatalf("Failed to create new watch: %v", err)
}
defer watcher.Close()
done := make(chan struct{})
go func() {
for {
select {
case event := <-watcher.Events:
logrus.Debugf("event: %v", event)
if event.Op&fsnotify.Remove == fsnotify.Remove {
logrus.Debugf("removing hook %s", event.Name)
s.ContainerServer.RemoveHook(filepath.Base(event.Name))
}
if event.Op&fsnotify.Create == fsnotify.Create || event.Op&fsnotify.Write == fsnotify.Write {
logrus.Debugf("adding hook %s", event.Name)
s.ContainerServer.AddHook(event.Name)
}
case err := <-watcher.Errors:
logrus.Debugf("watch error: %v", err)
close(done)
return
case <-s.monitorsChan:
logrus.Debug("closing hooks monitor...")
close(done)
return
}
}
}()
if err := watcher.Add(s.config.HooksDirPath); err != nil {
logrus.Errorf("watcher.Add(%q) failed: %s", s.config.HooksDirPath, err)
close(done)
}
<-done
} }
// StartExitMonitor start a routine that monitors container exits // StartExitMonitor start a routine that monitors container exits
@ -410,7 +450,7 @@ func (s *Server) StartExitMonitor() {
logrus.Debugf("watch error: %v", err) logrus.Debugf("watch error: %v", err)
close(done) close(done)
return return
case <-s.exitMonitorChan: case <-s.monitorsChan:
logrus.Debug("closing exit monitor...") logrus.Debug("closing exit monitor...")
close(done) close(done)
return return