From bd3b44be3eef93505347ccbe21fb5dc55c205715 Mon Sep 17 00:00:00 2001 From: Nalin Dahyabhai Date: Thu, 8 Oct 2015 11:00:24 -0400 Subject: [PATCH] Add a file-backed read-write mutex implementation The fsync package provides Mutex and RWMutex types which aim to provide the same semantics as their namesakes in the sync package, extending that locking across processes by using file locks and randomly-generated identifiers to allow processes to determine whether or not they were the last to modify a resource that's protected by the lock. Signed-off-by: Nalin Dahyabhai (github: nalind) --- fsync/README.md | 3 + fsync/fsync_shared.go | 95 +++++++++++++++++++++ fsync/fsync_test.go | 29 +++++++ fsync/fsync_unix.go | 164 +++++++++++++++++++++++++++++++++++++ fsync/fsync_unsupported.go | 70 ++++++++++++++++ 5 files changed, 361 insertions(+) create mode 100644 fsync/README.md create mode 100644 fsync/fsync_shared.go create mode 100644 fsync/fsync_test.go create mode 100644 fsync/fsync_unix.go create mode 100644 fsync/fsync_unsupported.go diff --git a/fsync/README.md b/fsync/README.md new file mode 100644 index 0000000..dd9d529 --- /dev/null +++ b/fsync/README.md @@ -0,0 +1,3 @@ +The fsync package provides a workalike for the `sync` package's `RWMutex`, +which uses a file lock on a specified file to coordinate its effects across +multiple cooperating processes. diff --git a/fsync/fsync_shared.go b/fsync/fsync_shared.go new file mode 100644 index 0000000..7f6a274 --- /dev/null +++ b/fsync/fsync_shared.go @@ -0,0 +1,95 @@ +package fsync + +import ( + "sync" +) + +// rmutex is a wrapper for a Mutex which provides Lock and Unlock methods which +// merely call the underlying Mutex's RLock and RUnlock methods. +type rmutex struct { + m *Mutex +} + +type RLocker interface { + sync.Locker + Updated() bool +} + +// RLocker returns a Locker which obtains and releases read locks on the underlying Mutex. +func (m *Mutex) RLocker() RLocker { + return &rmutex{m: m} +} + +// Lock obtains a read lock on the underlying Mutex. +func (r *rmutex) Lock() { + r.m.RLock() +} + +// Unlock releases a read lock on the underlying Mutex. +func (r *rmutex) Unlock() { + r.m.RUnlock() +} + +// Updated checks if the data protected by the lock was changed since we last +// modified it. +func (r *rmutex) Updated() bool { + return r.m.Updated() +} + +var lockMgr struct { + m sync.Mutex + locks map[string]*Mutex +} + +func init() { + lockMgr.locks = make(map[string]*Mutex) +} + +// Get returns a mutex which is tied to a lock on the specified lockfile, or nil on error. The file descriptor is kept open. +func Get(lockfile string) (*Mutex, error) { + m, err := get(lockfile) + if err != nil { + return nil, err + } + return m, nil +} + +// RLock obtains a read lock on the specified lock file, or returns an error. +func RLock(lockfile string) error { + fl, err := Get(lockfile) + if err != nil { + return err + } + fl.RLock() + return nil +} + +// RUnlock releases a read lock on the specified lock file, or returns an error. +func RUnlock(lockfile string) error { + fl, err := Get(lockfile) + if err != nil { + return err + } + fl.RUnlock() + return nil +} + +// Lock obtains a write lock on the specified lock file, or returns an error. +func Lock(lockfile string) error { + fl, err := Get(lockfile) + if err != nil { + return err + } + fl.Lock() + return nil +} + +// Unlock releases a write lock on the specified lock file, or returns an error. +func Unlock(lockfile string) error { + fl, err := Get(lockfile) + if err != nil { + return err + } + fl.Unlock() + return nil +} diff --git a/fsync/fsync_test.go b/fsync/fsync_test.go new file mode 100644 index 0000000..a8e3796 --- /dev/null +++ b/fsync/fsync_test.go @@ -0,0 +1,29 @@ +package fsync + +import ( + "testing" +) + +func TestLock(t *testing.T) { + m, err := Get("lockfile") + if err != nil { + t.Error(err) + return + } + m.RLock() + m.RUnlock() + m.Lock() + if !m.Updated() { + t.Errorf("lock appears to have been updated by us?") + return + } + m.Touch() + if m.Updated() { + t.Errorf("lock appears to have been updated by someone else") + return + } + m.Unlock() + r := m.RLocker() + r.Lock() + r.Unlock() +} diff --git a/fsync/fsync_unix.go b/fsync/fsync_unix.go new file mode 100644 index 0000000..95b810c --- /dev/null +++ b/fsync/fsync_unix.go @@ -0,0 +1,164 @@ +// +build linux + +package fsync + +import ( + "bytes" + "crypto/rand" + "fmt" + "path/filepath" + "sync" + "syscall" + "time" + + "github.com/Sirupsen/logrus" +) + +type holderID [16]byte + +// Mutex represents an RWMutex which synchronizes its state with a file lock, +// allowing two process which use the same lock file to share reading and +// writing locks. +type Mutex struct { + rw sync.RWMutex + m sync.Mutex + lockfile string + locked bool + lockfd int + readers, writers int + us, holder holderID +} + +// lockop obtains or clears a file lock on the specified descriptor, blocking +// and retrying indefinitely if it fails to do so for any reason. +func lockop(name string, fd, lockop int) { + err := syscall.Flock(fd, lockop) + for err != nil { + logrus.Debugf("waiting for file lock on %s", name) + time.Sleep(100 * time.Millisecond) + err = syscall.Flock(fd, lockop) + if err == nil { + logrus.Debugf("obtained file lock on %s", name) + } + } +} + +// RLock obtains a read lock on the Mutex. +func (m *Mutex) RLock() { + m.rw.RLock() + m.m.Lock() + defer m.m.Unlock() + m.readers++ + if m.readers == 1 { + lockop(m.lockfile, m.lockfd, syscall.LOCK_SH) + m.locked = true + } +} + +// RUnlock releases a read lock on the Mutex. +func (m *Mutex) RUnlock() { + m.rw.RUnlock() + m.m.Lock() + defer m.m.Unlock() + m.readers-- + if m.readers == 0 { + if !m.locked { + panic(fmt.Sprintf("attempted to unlock %s while not locked", m.lockfile)) + } + lockop(m.lockfile, m.lockfd, syscall.LOCK_UN) + m.locked = false + } +} + +// Lock obtains a write lock on the Mutex. +func (m *Mutex) Lock() { + m.rw.Lock() + m.m.Lock() + defer m.m.Unlock() + m.writers++ + if m.writers == 1 { + lockop(m.lockfile, m.lockfd, syscall.LOCK_EX) + m.locked = true + } +} + +// Unlock releases a write lock on the Mutex. +func (m *Mutex) Unlock() { + m.rw.Unlock() + m.m.Lock() + defer m.m.Unlock() + m.writers-- + if m.writers == 0 { + if !m.locked { + panic(fmt.Sprintf("attempted to unlock %s while not locked", m.lockfile)) + } + lockop(m.lockfile, m.lockfd, syscall.LOCK_UN) + m.locked = false + } +} + +// Touch updates the contents of the lock file to hold our ID, as an indicator +// that other processes can consult to see that it was this process that last +// modified the data that's protected by the lock. Should only be called with +// a write lock held. +func (m *Mutex) Touch() error { + n, err := rand.Read(m.us[:]) + if n != len(m.us) || err != nil { + return err + } + if _, err := syscall.Pwrite(m.lockfd, m.us[:], 0); err != nil { + return err + } + m.holder = m.us + return nil +} + +// Updated tells us if the last recorded ID in the lock file is different from +// ours and has changed since we last looked at it. Should only be called with +// either a read or write lock held. +func (m *Mutex) Updated() bool { + var holder holderID + if n, err := syscall.Pread(m.lockfd, holder[:], 0); err != nil && n != 0 { + return true + } + updated := bytes.Compare(holder[:], m.us[:]) != 0 && + bytes.Compare(holder[:], m.holder[:]) != 0 + m.holder = holder + return updated +} + +// get initializes a mutex, opening the lockfile. +func get(lockfile string) (*Mutex, error) { + var noHolder, us holderID + + lockMgr.m.Lock() + defer lockMgr.m.Unlock() + + if bytes.Compare(us[:], noHolder[:]) == 0 { + n, err := rand.Read(us[:]) + if n != len(us) || err != nil { + return nil, err + } + } + + name, err := filepath.Abs(lockfile) + if err != nil { + return nil, err + } + name = filepath.Clean(name) + fl, ok := lockMgr.locks[name] + if !ok { + lockfd, err := syscall.Open(name, syscall.O_CREAT|syscall.O_RDWR, syscall.S_IRUSR|syscall.S_IWUSR) + if err != nil { + return nil, err + } + syscall.CloseOnExec(lockfd) + fl = &Mutex{ + lockfd: lockfd, + lockfile: name, + us: us, + } + lockMgr.locks[name] = fl + } + return fl, nil +} diff --git a/fsync/fsync_unsupported.go b/fsync/fsync_unsupported.go new file mode 100644 index 0000000..5d7e292 --- /dev/null +++ b/fsync/fsync_unsupported.go @@ -0,0 +1,70 @@ +// +build !linux + +package fsync + +import ( + "path/filepath" + "sync" +) + +// Mutex represents an RWMutex which synchronizes its state with a file lock, +// allowing two process which use the same lock file to share reading and +// writing locks. +type Mutex struct { + rw sync.RWMutex + lockfile string +} + +// RLock obtains a read lock on the Mutex. +func (m *Mutex) RLock() { + m.rw.RLock() +} + +// RUnlock releases a read lock on the Mutex. +func (m *Mutex) RUnlock() { + m.rw.RUnlock() +} + +// Lock obtains a write lock on the Mutex. +func (m *Mutex) Lock() { + m.rw.Lock() +} + +// Unlock releases a write lock on the Mutex. +func (m *Mutex) Unlock() { + m.rw.Unlock() +} + +// Touch updates the timestamp on the lock file to signal other processes or +// threads that the data protected by the lock has been modified by someone +// else. +func (m *Mutex) Touch() error { + return nil +} + +// Updated tells us if the timestamp on the lock file is more recent than the +// last time we recorded for the lockfile. Should only be called with the lock +// held. +func (m *Mutex) Updated() bool { + return false +} + +// get initializes a mutex, opening the lockfile. +func get(lockfile string) (*Mutex, error) { + lockMgr.m.Lock() + defer lockMgr.m.Unlock() + + name, err := filepath.Abs(lockfile) + if err != nil { + return nil, err + } + name = filepath.Clean(name) + fl, ok := lockMgr.locks[name] + if !ok { + fl = &Mutex{ + lockfile: name, + } + lockMgr.locks[name] = fl + } + return fl, nil +}