From 2d5e8bce1c8335ed1e0c9fe4d702eb5ac2b978ee Mon Sep 17 00:00:00 2001 From: Brian Goff Date: Thu, 8 Oct 2015 16:55:28 -0400 Subject: [PATCH] Fallback to file polling for jsonlog reader on err Signed-off-by: Brian Goff --- filenotify/filenotify.go | 40 ++++++++ filenotify/fsnotify.go | 18 ++++ filenotify/poller.go | 205 ++++++++++++++++++++++++++++++++++++++ filenotify/poller_test.go | 133 +++++++++++++++++++++++++ 4 files changed, 396 insertions(+) create mode 100644 filenotify/filenotify.go create mode 100644 filenotify/fsnotify.go create mode 100644 filenotify/poller.go create mode 100644 filenotify/poller_test.go diff --git a/filenotify/filenotify.go b/filenotify/filenotify.go new file mode 100644 index 0000000..e042c6b --- /dev/null +++ b/filenotify/filenotify.go @@ -0,0 +1,40 @@ +// Package filenotify provides a mechanism for watching file(s) for changes. +// Generally leans on fsnotify, but provides a poll-based notifier which fsnotify does not support. +// These are wrapped up in a common interface so that either can be used interchangably in your code. +package filenotify + +import "gopkg.in/fsnotify.v1" + +// FileWatcher is an interface for implementing file notification watchers +type FileWatcher interface { + Events() <-chan fsnotify.Event + Errors() <-chan error + Add(name string) error + Remove(name string) error + Close() error +} + +// New tries to use an fs-event watcher, and falls back to the poller if there is an error +func New() (FileWatcher, error) { + if watcher, err := NewEventWatcher(); err == nil { + return watcher, nil + } + return NewPollingWatcher(), nil +} + +// NewPollingWatcher returns a poll-based file watcher +func NewPollingWatcher() FileWatcher { + return &filePoller{ + events: make(chan fsnotify.Event), + errors: make(chan error), + } +} + +// NewEventWatcher returns an fs-event based file watcher +func NewEventWatcher() (FileWatcher, error) { + watcher, err := fsnotify.NewWatcher() + if err != nil { + return nil, err + } + return &fsNotifyWatcher{watcher}, nil +} diff --git a/filenotify/fsnotify.go b/filenotify/fsnotify.go new file mode 100644 index 0000000..4203883 --- /dev/null +++ b/filenotify/fsnotify.go @@ -0,0 +1,18 @@ +package filenotify + +import "gopkg.in/fsnotify.v1" + +// fsNotify wraps the fsnotify package to satisfy the FileNotifer interface +type fsNotifyWatcher struct { + *fsnotify.Watcher +} + +// GetEvents returns the fsnotify event channel receiver +func (w *fsNotifyWatcher) Events() <-chan fsnotify.Event { + return w.Watcher.Events +} + +// GetErrors returns the fsnotify error channel receiver +func (w *fsNotifyWatcher) Errors() <-chan error { + return w.Watcher.Errors +} diff --git a/filenotify/poller.go b/filenotify/poller.go new file mode 100644 index 0000000..a55266d --- /dev/null +++ b/filenotify/poller.go @@ -0,0 +1,205 @@ +package filenotify + +import ( + "errors" + "fmt" + "os" + "sync" + "time" + + "github.com/Sirupsen/logrus" + + "gopkg.in/fsnotify.v1" +) + +var ( + // errPollerClosed is returned when the poller is closed + errPollerClosed = errors.New("poller is closed") + // errNoSuchPoller is returned when trying to remove a watch that doesn't exist + errNoSuchWatch = errors.New("poller does not exist") +) + +// watchWaitTime is the time to wait between file poll loops +const watchWaitTime = 200 * time.Millisecond + +// filePoller is used to poll files for changes, especially in cases where fsnotify +// can't be run (e.g. when inotify handles are exhausted) +// filePoller satifies the FileWatcher interface +type filePoller struct { + // watches is the list of files currently being polled, close the associated channel to stop the watch + watches map[string]chan struct{} + // events is the channel to listen to for watch events + events chan fsnotify.Event + // errors is the channel to listen to for watch errors + errors chan error + // mu locks the poller for modification + mu sync.Mutex + // closed is used to specify when the poller has already closed + closed bool +} + +// Add adds a filename to the list of watches +// once added the file is polled for changes in a separate goroutine +func (w *filePoller) Add(name string) error { + w.mu.Lock() + defer w.mu.Unlock() + + if w.closed == true { + return errPollerClosed + } + + f, err := os.Open(name) + if err != nil { + return err + } + fi, err := os.Stat(name) + if err != nil { + return err + } + + if w.watches == nil { + w.watches = make(map[string]chan struct{}) + } + if _, exists := w.watches[name]; exists { + return fmt.Errorf("watch exists") + } + chClose := make(chan struct{}) + w.watches[name] = chClose + + go w.watch(f, fi, chClose) + return nil +} + +// Remove stops and removes watch with the specified name +func (w *filePoller) Remove(name string) error { + w.mu.Lock() + defer w.mu.Unlock() + return w.remove(name) +} + +func (w *filePoller) remove(name string) error { + if w.closed == true { + return errPollerClosed + } + + chClose, exists := w.watches[name] + if !exists { + return errNoSuchWatch + } + close(chClose) + delete(w.watches, name) + return nil +} + +// Events returns the event channel +// This is used for notifications on events about watched files +func (w *filePoller) Events() <-chan fsnotify.Event { + return w.events +} + +// Errors returns the errors channel +// This is used for notifications about errors on watched files +func (w *filePoller) Errors() <-chan error { + return w.errors +} + +// Close closes the poller +// All watches are stopped, removed, and the poller cannot be added to +func (w *filePoller) Close() error { + w.mu.Lock() + defer w.mu.Unlock() + + if w.closed { + return nil + } + + w.closed = true + for name := range w.watches { + w.remove(name) + delete(w.watches, name) + } + close(w.events) + close(w.errors) + return nil +} + +// sendEvent publishes the specified event to the events channel +func (w *filePoller) sendEvent(e fsnotify.Event, chClose <-chan struct{}) error { + select { + case w.events <- e: + case <-chClose: + return fmt.Errorf("closed") + } + return nil +} + +// sendErr publishes the specified error to the errors channel +func (w *filePoller) sendErr(e error, chClose <-chan struct{}) error { + select { + case w.errors <- e: + case <-chClose: + return fmt.Errorf("closed") + } + return nil +} + +// watch is responsible for polling the specified file for changes +// upon finding changes to a file or errors, sendEvent/sendErr is called +func (w *filePoller) watch(f *os.File, lastFi os.FileInfo, chClose chan struct{}) { + for { + time.Sleep(watchWaitTime) + select { + case <-chClose: + logrus.Debugf("watch for %s closed", f.Name()) + return + default: + } + + fi, err := os.Stat(f.Name()) + if err != nil { + // if we got an error here and lastFi is not set, we can presume that nothing has changed + // This should be safe since before `watch()` is called, a stat is performed, there is any error `watch` is not called + if lastFi == nil { + continue + } + // If it doesn't exist at this point, it must have been removed + // no need to send the error here since this is a valid operation + if os.IsNotExist(err) { + if err := w.sendEvent(fsnotify.Event{Op: fsnotify.Remove, Name: f.Name()}, chClose); err != nil { + return + } + lastFi = nil + continue + } + // at this point, send the error + if err := w.sendErr(err, chClose); err != nil { + return + } + continue + } + + if lastFi == nil { + if err := w.sendEvent(fsnotify.Event{Op: fsnotify.Create, Name: fi.Name()}, chClose); err != nil { + return + } + lastFi = fi + continue + } + + if fi.Mode() != lastFi.Mode() { + if err := w.sendEvent(fsnotify.Event{Op: fsnotify.Chmod, Name: fi.Name()}, chClose); err != nil { + return + } + lastFi = fi + continue + } + + if fi.ModTime() != lastFi.ModTime() || fi.Size() != lastFi.Size() { + if err := w.sendEvent(fsnotify.Event{Op: fsnotify.Write, Name: fi.Name()}, chClose); err != nil { + return + } + lastFi = fi + continue + } + } +} diff --git a/filenotify/poller_test.go b/filenotify/poller_test.go new file mode 100644 index 0000000..da946d2 --- /dev/null +++ b/filenotify/poller_test.go @@ -0,0 +1,133 @@ +package filenotify + +import ( + "fmt" + "io/ioutil" + "os" + "testing" + "time" + + "gopkg.in/fsnotify.v1" +) + +func TestPollerAddRemove(t *testing.T) { + w := NewPollingWatcher() + + if err := w.Add("no-such-file"); err == nil { + t.Fatal("should have gotten error when adding a non-existant file") + } + if err := w.Remove("no-such-file"); err == nil { + t.Fatal("should have gotten error when removing non-existant watch") + } + + f, err := ioutil.TempFile("", "asdf") + if err != nil { + t.Fatal(err) + } + defer os.RemoveAll(f.Name()) + + if err := w.Add(f.Name()); err != nil { + t.Fatal(err) + } + + if err := w.Remove(f.Name()); err != nil { + t.Fatal(err) + } +} + +func TestPollerEvent(t *testing.T) { + w := NewPollingWatcher() + + f, err := ioutil.TempFile("", "test-poller") + if err != nil { + t.Fatal("error creating temp file") + } + defer os.RemoveAll(f.Name()) + f.Close() + + if err := w.Add(f.Name()); err != nil { + t.Fatal(err) + } + + select { + case <-w.Events(): + t.Fatal("got event before anything happened") + case <-w.Errors(): + t.Fatal("got error before anything happened") + default: + } + + if err := ioutil.WriteFile(f.Name(), []byte("hello"), 644); err != nil { + t.Fatal(err) + } + if err := assertEvent(w, fsnotify.Write); err != nil { + t.Fatal(err) + } + + if err := os.Chmod(f.Name(), 600); err != nil { + t.Fatal(err) + } + if err := assertEvent(w, fsnotify.Chmod); err != nil { + t.Fatal(err) + } + + if err := os.Remove(f.Name()); err != nil { + t.Fatal(err) + } + if err := assertEvent(w, fsnotify.Remove); err != nil { + t.Fatal(err) + } +} + +func TestPollerClose(t *testing.T) { + w := NewPollingWatcher() + if err := w.Close(); err != nil { + t.Fatal(err) + } + // test double-close + if err := w.Close(); err != nil { + t.Fatal(err) + } + + select { + case _, open := <-w.Events(): + if open { + t.Fatal("event chan should be closed") + } + default: + t.Fatal("event chan should be closed") + } + + select { + case _, open := <-w.Errors(): + if open { + t.Fatal("errors chan should be closed") + } + default: + t.Fatal("errors chan should be closed") + } + + f, err := ioutil.TempFile("", "asdf") + if err != nil { + t.Fatal(err) + } + defer os.RemoveAll(f.Name()) + if err := w.Add(f.Name()); err == nil { + t.Fatal("should have gotten error adding watch for closed watcher") + } +} + +func assertEvent(w FileWatcher, eType fsnotify.Op) error { + var err error + select { + case e := <-w.Events(): + if e.Op != eType { + err = fmt.Errorf("got wrong event type, expected %q: %v", eType, e) + } + case e := <-w.Errors(): + err = fmt.Errorf("got unexpected error waiting for events %v: %v", eType, e) + case <-time.After(watchWaitTime * 3): + err = fmt.Errorf("timeout waiting for event %v", eType) + } + return err +}