diff --git a/fsync/README.md b/fsync/README.md new file mode 100644 index 0000000..dd9d529 --- /dev/null +++ b/fsync/README.md @@ -0,0 +1,3 @@ +The fsync package provides a workalike for the `sync` package's `RWMutex`, +which uses a file lock on a specified file to coordinate its effects across +multiple cooperating processes. diff --git a/fsync/fsync_shared.go b/fsync/fsync_shared.go new file mode 100644 index 0000000..7f6a274 --- /dev/null +++ b/fsync/fsync_shared.go @@ -0,0 +1,95 @@ +package fsync + +import ( + "sync" +) + +// rmutex is a wrapper for a Mutex which provides Lock and Unlock methods which +// merely call the underlying Mutex's RLock and RUnlock methods. +type rmutex struct { + m *Mutex +} + +type RLocker interface { + sync.Locker + Updated() bool +} + +// RLocker returns a Locker which obtains and releases read locks on the underlying Mutex. +func (m *Mutex) RLocker() RLocker { + return &rmutex{m: m} +} + +// Lock obtains a read lock on the underlying Mutex. +func (r *rmutex) Lock() { + r.m.RLock() +} + +// Unlock releases a read lock on the underlying Mutex. +func (r *rmutex) Unlock() { + r.m.RUnlock() +} + +// Updated checks if the data protected by the lock was changed since we last +// modified it. +func (r *rmutex) Updated() bool { + return r.m.Updated() +} + +var lockMgr struct { + m sync.Mutex + locks map[string]*Mutex +} + +func init() { + lockMgr.locks = make(map[string]*Mutex) +} + +// Get returns a mutex which is tied to a lock on the specified lockfile, or nil on error. The file descriptor is kept open. +func Get(lockfile string) (*Mutex, error) { + m, err := get(lockfile) + if err != nil { + return nil, err + } + return m, nil +} + +// RLock obtains a read lock on the specified lock file, or returns an error. +func RLock(lockfile string) error { + fl, err := Get(lockfile) + if err != nil { + return err + } + fl.RLock() + return nil +} + +// RUnlock releases a read lock on the specified lock file, or returns an error. +func RUnlock(lockfile string) error { + fl, err := Get(lockfile) + if err != nil { + return err + } + fl.RUnlock() + return nil +} + +// Lock obtains a write lock on the specified lock file, or returns an error. +func Lock(lockfile string) error { + fl, err := Get(lockfile) + if err != nil { + return err + } + fl.Lock() + return nil +} + +// Unlock releases a write lock on the specified lock file, or returns an error. +func Unlock(lockfile string) error { + fl, err := Get(lockfile) + if err != nil { + return err + } + fl.Unlock() + return nil +} diff --git a/fsync/fsync_test.go b/fsync/fsync_test.go new file mode 100644 index 0000000..a8e3796 --- /dev/null +++ b/fsync/fsync_test.go @@ -0,0 +1,29 @@ +package fsync + +import ( + "testing" +) + +func TestLock(t *testing.T) { + m, err := Get("lockfile") + if err != nil { + t.Error(err) + return + } + m.RLock() + m.RUnlock() + m.Lock() + if !m.Updated() { + t.Errorf("lock appears to have been updated by us?") + return + } + m.Touch() + if m.Updated() { + t.Errorf("lock appears to have been updated by someone else") + return + } + m.Unlock() + r := m.RLocker() + r.Lock() + r.Unlock() +} diff --git a/fsync/fsync_unix.go b/fsync/fsync_unix.go new file mode 100644 index 0000000..95b810c --- /dev/null +++ b/fsync/fsync_unix.go @@ -0,0 +1,164 @@ +// +build linux + +package fsync + +import ( + "bytes" + "crypto/rand" + "fmt" + "path/filepath" + "sync" + "syscall" + "time" + + "github.com/Sirupsen/logrus" +) + +type holderID [16]byte + +// Mutex represents an RWMutex which synchronizes its state with a file lock, +// allowing two process which use the same lock file to share reading and +// writing locks. +type Mutex struct { + rw sync.RWMutex + m sync.Mutex + lockfile string + locked bool + lockfd int + readers, writers int + us, holder holderID +} + +// lockop obtains or clears a file lock on the specified descriptor, blocking +// and retrying indefinitely if it fails to do so for any reason. +func lockop(name string, fd, lockop int) { + err := syscall.Flock(fd, lockop) + for err != nil { + logrus.Debugf("waiting for file lock on %s", name) + time.Sleep(100 * time.Millisecond) + err = syscall.Flock(fd, lockop) + if err == nil { + logrus.Debugf("obtained file lock on %s", name) + } + } +} + +// RLock obtains a read lock on the Mutex. +func (m *Mutex) RLock() { + m.rw.RLock() + m.m.Lock() + defer m.m.Unlock() + m.readers++ + if m.readers == 1 { + lockop(m.lockfile, m.lockfd, syscall.LOCK_SH) + m.locked = true + } +} + +// RUnlock releases a read lock on the Mutex. +func (m *Mutex) RUnlock() { + m.rw.RUnlock() + m.m.Lock() + defer m.m.Unlock() + m.readers-- + if m.readers == 0 { + if !m.locked { + panic(fmt.Sprintf("attempted to unlock %s while not locked", m.lockfile)) + } + lockop(m.lockfile, m.lockfd, syscall.LOCK_UN) + m.locked = false + } +} + +// Lock obtains a write lock on the Mutex. +func (m *Mutex) Lock() { + m.rw.Lock() + m.m.Lock() + defer m.m.Unlock() + m.writers++ + if m.writers == 1 { + lockop(m.lockfile, m.lockfd, syscall.LOCK_EX) + m.locked = true + } +} + +// Unlock releases a write lock on the Mutex. +func (m *Mutex) Unlock() { + m.rw.Unlock() + m.m.Lock() + defer m.m.Unlock() + m.writers-- + if m.writers == 0 { + if !m.locked { + panic(fmt.Sprintf("attempted to unlock %s while not locked", m.lockfile)) + } + lockop(m.lockfile, m.lockfd, syscall.LOCK_UN) + m.locked = false + } +} + +// Touch updates the contents of the lock file to hold our ID, as an indicator +// that other processes can consult to see that it was this process that last +// modified the data that's protected by the lock. Should only be called with +// a write lock held. +func (m *Mutex) Touch() error { + n, err := rand.Read(m.us[:]) + if n != len(m.us) || err != nil { + return err + } + if _, err := syscall.Pwrite(m.lockfd, m.us[:], 0); err != nil { + return err + } + m.holder = m.us + return nil +} + +// Updated tells us if the last recorded ID in the lock file is different from +// ours and has changed since we last looked at it. Should only be called with +// either a read or write lock held. +func (m *Mutex) Updated() bool { + var holder holderID + if n, err := syscall.Pread(m.lockfd, holder[:], 0); err != nil && n != 0 { + return true + } + updated := bytes.Compare(holder[:], m.us[:]) != 0 && + bytes.Compare(holder[:], m.holder[:]) != 0 + m.holder = holder + return updated +} + +// get initializes a mutex, opening the lockfile. +func get(lockfile string) (*Mutex, error) { + var noHolder, us holderID + + lockMgr.m.Lock() + defer lockMgr.m.Unlock() + + if bytes.Compare(us[:], noHolder[:]) == 0 { + n, err := rand.Read(us[:]) + if n != len(us) || err != nil { + return nil, err + } + } + + name, err := filepath.Abs(lockfile) + if err != nil { + return nil, err + } + name = filepath.Clean(name) + fl, ok := lockMgr.locks[name] + if !ok { + lockfd, err := syscall.Open(name, syscall.O_CREAT|syscall.O_RDWR, syscall.S_IRUSR|syscall.S_IWUSR) + if err != nil { + return nil, err + } + syscall.CloseOnExec(lockfd) + fl = &Mutex{ + lockfd: lockfd, + lockfile: name, + us: us, + } + lockMgr.locks[name] = fl + } + return fl, nil +} diff --git a/fsync/fsync_unsupported.go b/fsync/fsync_unsupported.go new file mode 100644 index 0000000..5d7e292 --- /dev/null +++ b/fsync/fsync_unsupported.go @@ -0,0 +1,70 @@ +// +build !linux + +package fsync + +import ( + "path/filepath" + "sync" +) + +// Mutex represents an RWMutex which synchronizes its state with a file lock, +// allowing two process which use the same lock file to share reading and +// writing locks. +type Mutex struct { + rw sync.RWMutex + lockfile string +} + +// RLock obtains a read lock on the Mutex. +func (m *Mutex) RLock() { + m.rw.RLock() +} + +// RUnlock releases a read lock on the Mutex. +func (m *Mutex) RUnlock() { + m.rw.RUnlock() +} + +// Lock obtains a write lock on the Mutex. +func (m *Mutex) Lock() { + m.rw.Lock() +} + +// Unlock releases a write lock on the Mutex. +func (m *Mutex) Unlock() { + m.rw.Unlock() +} + +// Touch updates the timestamp on the lock file to signal other processes or +// threads that the data protected by the lock has been modified by someone +// else. +func (m *Mutex) Touch() error { + return nil +} + +// Updated tells us if the timestamp on the lock file is more recent than the +// last time we recorded for the lockfile. Should only be called with the lock +// held. +func (m *Mutex) Updated() bool { + return false +} + +// get initializes a mutex, opening the lockfile. +func get(lockfile string) (*Mutex, error) { + lockMgr.m.Lock() + defer lockMgr.m.Unlock() + + name, err := filepath.Abs(lockfile) + if err != nil { + return nil, err + } + name = filepath.Clean(name) + fl, ok := lockMgr.locks[name] + if !ok { + fl = &Mutex{ + lockfile: name, + } + lockMgr.locks[name] = fl + } + return fl, nil +}