Add a file-backed read-write mutex implementation

The fsync package provides Mutex and RWMutex types which aim to provide
the same semantics as their namesakes in the sync package, extending
that locking across processes by using file locks and randomly-generated
identifiers to allow processes to determine whether or not they were the
last to modify a resource that's protected by the lock.

Signed-off-by: Nalin Dahyabhai <nalin@redhat.com> (github: nalind)
This commit is contained in:
Nalin Dahyabhai 2015-10-08 11:00:24 -04:00
parent fa358584a0
commit bd3b44be3e
5 changed files with 361 additions and 0 deletions

3
fsync/README.md Normal file
View file

@ -0,0 +1,3 @@
The fsync package provides a workalike for the `sync` package's `RWMutex`,
which uses a file lock on a specified file to coordinate its effects across
multiple cooperating processes.

95
fsync/fsync_shared.go Normal file
View file

@ -0,0 +1,95 @@
package fsync
import (
"sync"
)
// rmutex is a wrapper for a Mutex which provides Lock and Unlock methods which
// merely call the underlying Mutex's RLock and RUnlock methods.
type rmutex struct {
m *Mutex
}
type RLocker interface {
sync.Locker
Updated() bool
}
// RLocker returns a Locker which obtains and releases read locks on the underlying Mutex.
func (m *Mutex) RLocker() RLocker {
return &rmutex{m: m}
}
// Lock obtains a read lock on the underlying Mutex.
func (r *rmutex) Lock() {
r.m.RLock()
}
// Unlock releases a read lock on the underlying Mutex.
func (r *rmutex) Unlock() {
r.m.RUnlock()
}
// Updated checks if the data protected by the lock was changed since we last
// modified it.
func (r *rmutex) Updated() bool {
return r.m.Updated()
}
var lockMgr struct {
m sync.Mutex
locks map[string]*Mutex
}
func init() {
lockMgr.locks = make(map[string]*Mutex)
}
// Get returns a mutex which is tied to a lock on the specified lockfile, or nil on error. The file descriptor is kept open.
func Get(lockfile string) (*Mutex, error) {
m, err := get(lockfile)
if err != nil {
return nil, err
}
return m, nil
}
// RLock obtains a read lock on the specified lock file, or returns an error.
func RLock(lockfile string) error {
fl, err := Get(lockfile)
if err != nil {
return err
}
fl.RLock()
return nil
}
// RUnlock releases a read lock on the specified lock file, or returns an error.
func RUnlock(lockfile string) error {
fl, err := Get(lockfile)
if err != nil {
return err
}
fl.RUnlock()
return nil
}
// Lock obtains a write lock on the specified lock file, or returns an error.
func Lock(lockfile string) error {
fl, err := Get(lockfile)
if err != nil {
return err
}
fl.Lock()
return nil
}
// Unlock releases a write lock on the specified lock file, or returns an error.
func Unlock(lockfile string) error {
fl, err := Get(lockfile)
if err != nil {
return err
}
fl.Unlock()
return nil
}

29
fsync/fsync_test.go Normal file
View file

@ -0,0 +1,29 @@
package fsync
import (
"testing"
)
func TestLock(t *testing.T) {
m, err := Get("lockfile")
if err != nil {
t.Error(err)
return
}
m.RLock()
m.RUnlock()
m.Lock()
if !m.Updated() {
t.Errorf("lock appears to have been updated by us?")
return
}
m.Touch()
if m.Updated() {
t.Errorf("lock appears to have been updated by someone else")
return
}
m.Unlock()
r := m.RLocker()
r.Lock()
r.Unlock()
}

164
fsync/fsync_unix.go Normal file
View file

@ -0,0 +1,164 @@
// +build linux
package fsync
import (
"bytes"
"crypto/rand"
"fmt"
"path/filepath"
"sync"
"syscall"
"time"
"github.com/Sirupsen/logrus"
)
type holderID [16]byte
// Mutex represents an RWMutex which synchronizes its state with a file lock,
// allowing two process which use the same lock file to share reading and
// writing locks.
type Mutex struct {
rw sync.RWMutex
m sync.Mutex
lockfile string
locked bool
lockfd int
readers, writers int
us, holder holderID
}
// lockop obtains or clears a file lock on the specified descriptor, blocking
// and retrying indefinitely if it fails to do so for any reason.
func lockop(name string, fd, lockop int) {
err := syscall.Flock(fd, lockop)
for err != nil {
logrus.Debugf("waiting for file lock on %s", name)
time.Sleep(100 * time.Millisecond)
err = syscall.Flock(fd, lockop)
if err == nil {
logrus.Debugf("obtained file lock on %s", name)
}
}
}
// RLock obtains a read lock on the Mutex.
func (m *Mutex) RLock() {
m.rw.RLock()
m.m.Lock()
defer m.m.Unlock()
m.readers++
if m.readers == 1 {
lockop(m.lockfile, m.lockfd, syscall.LOCK_SH)
m.locked = true
}
}
// RUnlock releases a read lock on the Mutex.
func (m *Mutex) RUnlock() {
m.rw.RUnlock()
m.m.Lock()
defer m.m.Unlock()
m.readers--
if m.readers == 0 {
if !m.locked {
panic(fmt.Sprintf("attempted to unlock %s while not locked", m.lockfile))
}
lockop(m.lockfile, m.lockfd, syscall.LOCK_UN)
m.locked = false
}
}
// Lock obtains a write lock on the Mutex.
func (m *Mutex) Lock() {
m.rw.Lock()
m.m.Lock()
defer m.m.Unlock()
m.writers++
if m.writers == 1 {
lockop(m.lockfile, m.lockfd, syscall.LOCK_EX)
m.locked = true
}
}
// Unlock releases a write lock on the Mutex.
func (m *Mutex) Unlock() {
m.rw.Unlock()
m.m.Lock()
defer m.m.Unlock()
m.writers--
if m.writers == 0 {
if !m.locked {
panic(fmt.Sprintf("attempted to unlock %s while not locked", m.lockfile))
}
lockop(m.lockfile, m.lockfd, syscall.LOCK_UN)
m.locked = false
}
}
// Touch updates the contents of the lock file to hold our ID, as an indicator
// that other processes can consult to see that it was this process that last
// modified the data that's protected by the lock. Should only be called with
// a write lock held.
func (m *Mutex) Touch() error {
n, err := rand.Read(m.us[:])
if n != len(m.us) || err != nil {
return err
}
if _, err := syscall.Pwrite(m.lockfd, m.us[:], 0); err != nil {
return err
}
m.holder = m.us
return nil
}
// Updated tells us if the last recorded ID in the lock file is different from
// ours and has changed since we last looked at it. Should only be called with
// either a read or write lock held.
func (m *Mutex) Updated() bool {
var holder holderID
if n, err := syscall.Pread(m.lockfd, holder[:], 0); err != nil && n != 0 {
return true
}
updated := bytes.Compare(holder[:], m.us[:]) != 0 &&
bytes.Compare(holder[:], m.holder[:]) != 0
m.holder = holder
return updated
}
// get initializes a mutex, opening the lockfile.
func get(lockfile string) (*Mutex, error) {
var noHolder, us holderID
lockMgr.m.Lock()
defer lockMgr.m.Unlock()
if bytes.Compare(us[:], noHolder[:]) == 0 {
n, err := rand.Read(us[:])
if n != len(us) || err != nil {
return nil, err
}
}
name, err := filepath.Abs(lockfile)
if err != nil {
return nil, err
}
name = filepath.Clean(name)
fl, ok := lockMgr.locks[name]
if !ok {
lockfd, err := syscall.Open(name, syscall.O_CREAT|syscall.O_RDWR, syscall.S_IRUSR|syscall.S_IWUSR)
if err != nil {
return nil, err
}
syscall.CloseOnExec(lockfd)
fl = &Mutex{
lockfd: lockfd,
lockfile: name,
us: us,
}
lockMgr.locks[name] = fl
}
return fl, nil
}

View file

@ -0,0 +1,70 @@
// +build !linux
package fsync
import (
"path/filepath"
"sync"
)
// Mutex represents an RWMutex which synchronizes its state with a file lock,
// allowing two process which use the same lock file to share reading and
// writing locks.
type Mutex struct {
rw sync.RWMutex
lockfile string
}
// RLock obtains a read lock on the Mutex.
func (m *Mutex) RLock() {
m.rw.RLock()
}
// RUnlock releases a read lock on the Mutex.
func (m *Mutex) RUnlock() {
m.rw.RUnlock()
}
// Lock obtains a write lock on the Mutex.
func (m *Mutex) Lock() {
m.rw.Lock()
}
// Unlock releases a write lock on the Mutex.
func (m *Mutex) Unlock() {
m.rw.Unlock()
}
// Touch updates the timestamp on the lock file to signal other processes or
// threads that the data protected by the lock has been modified by someone
// else.
func (m *Mutex) Touch() error {
return nil
}
// Updated tells us if the timestamp on the lock file is more recent than the
// last time we recorded for the lockfile. Should only be called with the lock
// held.
func (m *Mutex) Updated() bool {
return false
}
// get initializes a mutex, opening the lockfile.
func get(lockfile string) (*Mutex, error) {
lockMgr.m.Lock()
defer lockMgr.m.Unlock()
name, err := filepath.Abs(lockfile)
if err != nil {
return nil, err
}
name = filepath.Clean(name)
fl, ok := lockMgr.locks[name]
if !ok {
fl = &Mutex{
lockfile: name,
}
lockMgr.locks[name] = fl
}
return fl, nil
}