Switch from fsync to a plain old lock
We don't need the complexity of fsync when we just want to take an exclusive lock, so replace its use with an advisory file lock. Signed-off-by: Nalin Dahyabhai <nalin@redhat.com>
This commit is contained in:
parent
bd3b44be3e
commit
b350de236d
5 changed files with 0 additions and 361 deletions
|
@ -1,3 +0,0 @@
|
||||||
The fsync package provides a workalike for the `sync` package's `RWMutex`,
|
|
||||||
which uses a file lock on a specified file to coordinate its effects across
|
|
||||||
multiple cooperating processes.
|
|
|
@ -1,95 +0,0 @@
|
||||||
package fsync
|
|
||||||
|
|
||||||
import (
|
|
||||||
"sync"
|
|
||||||
)
|
|
||||||
|
|
||||||
// rmutex is a wrapper for a Mutex which provides Lock and Unlock methods which
|
|
||||||
// merely call the underlying Mutex's RLock and RUnlock methods.
|
|
||||||
type rmutex struct {
|
|
||||||
m *Mutex
|
|
||||||
}
|
|
||||||
|
|
||||||
type RLocker interface {
|
|
||||||
sync.Locker
|
|
||||||
Updated() bool
|
|
||||||
}
|
|
||||||
|
|
||||||
// RLocker returns a Locker which obtains and releases read locks on the underlying Mutex.
|
|
||||||
func (m *Mutex) RLocker() RLocker {
|
|
||||||
return &rmutex{m: m}
|
|
||||||
}
|
|
||||||
|
|
||||||
// Lock obtains a read lock on the underlying Mutex.
|
|
||||||
func (r *rmutex) Lock() {
|
|
||||||
r.m.RLock()
|
|
||||||
}
|
|
||||||
|
|
||||||
// Unlock releases a read lock on the underlying Mutex.
|
|
||||||
func (r *rmutex) Unlock() {
|
|
||||||
r.m.RUnlock()
|
|
||||||
}
|
|
||||||
|
|
||||||
// Updated checks if the data protected by the lock was changed since we last
|
|
||||||
// modified it.
|
|
||||||
func (r *rmutex) Updated() bool {
|
|
||||||
return r.m.Updated()
|
|
||||||
}
|
|
||||||
|
|
||||||
var lockMgr struct {
|
|
||||||
m sync.Mutex
|
|
||||||
locks map[string]*Mutex
|
|
||||||
}
|
|
||||||
|
|
||||||
func init() {
|
|
||||||
lockMgr.locks = make(map[string]*Mutex)
|
|
||||||
}
|
|
||||||
|
|
||||||
// Get returns a mutex which is tied to a lock on the specified lockfile, or nil on error. The file descriptor is kept open.
|
|
||||||
func Get(lockfile string) (*Mutex, error) {
|
|
||||||
m, err := get(lockfile)
|
|
||||||
if err != nil {
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
return m, nil
|
|
||||||
}
|
|
||||||
|
|
||||||
// RLock obtains a read lock on the specified lock file, or returns an error.
|
|
||||||
func RLock(lockfile string) error {
|
|
||||||
fl, err := Get(lockfile)
|
|
||||||
if err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
fl.RLock()
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
|
|
||||||
// RUnlock releases a read lock on the specified lock file, or returns an error.
|
|
||||||
func RUnlock(lockfile string) error {
|
|
||||||
fl, err := Get(lockfile)
|
|
||||||
if err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
fl.RUnlock()
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
|
|
||||||
// Lock obtains a write lock on the specified lock file, or returns an error.
|
|
||||||
func Lock(lockfile string) error {
|
|
||||||
fl, err := Get(lockfile)
|
|
||||||
if err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
fl.Lock()
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
|
|
||||||
// Unlock releases a write lock on the specified lock file, or returns an error.
|
|
||||||
func Unlock(lockfile string) error {
|
|
||||||
fl, err := Get(lockfile)
|
|
||||||
if err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
fl.Unlock()
|
|
||||||
return nil
|
|
||||||
}
|
|
|
@ -1,29 +0,0 @@
|
||||||
package fsync
|
|
||||||
|
|
||||||
import (
|
|
||||||
"testing"
|
|
||||||
)
|
|
||||||
|
|
||||||
func TestLock(t *testing.T) {
|
|
||||||
m, err := Get("lockfile")
|
|
||||||
if err != nil {
|
|
||||||
t.Error(err)
|
|
||||||
return
|
|
||||||
}
|
|
||||||
m.RLock()
|
|
||||||
m.RUnlock()
|
|
||||||
m.Lock()
|
|
||||||
if !m.Updated() {
|
|
||||||
t.Errorf("lock appears to have been updated by us?")
|
|
||||||
return
|
|
||||||
}
|
|
||||||
m.Touch()
|
|
||||||
if m.Updated() {
|
|
||||||
t.Errorf("lock appears to have been updated by someone else")
|
|
||||||
return
|
|
||||||
}
|
|
||||||
m.Unlock()
|
|
||||||
r := m.RLocker()
|
|
||||||
r.Lock()
|
|
||||||
r.Unlock()
|
|
||||||
}
|
|
|
@ -1,164 +0,0 @@
|
||||||
// +build linux
|
|
||||||
|
|
||||||
package fsync
|
|
||||||
|
|
||||||
import (
|
|
||||||
"bytes"
|
|
||||||
"crypto/rand"
|
|
||||||
"fmt"
|
|
||||||
"path/filepath"
|
|
||||||
"sync"
|
|
||||||
"syscall"
|
|
||||||
"time"
|
|
||||||
|
|
||||||
"github.com/Sirupsen/logrus"
|
|
||||||
)
|
|
||||||
|
|
||||||
type holderID [16]byte
|
|
||||||
|
|
||||||
// Mutex represents an RWMutex which synchronizes its state with a file lock,
|
|
||||||
// allowing two process which use the same lock file to share reading and
|
|
||||||
// writing locks.
|
|
||||||
type Mutex struct {
|
|
||||||
rw sync.RWMutex
|
|
||||||
m sync.Mutex
|
|
||||||
lockfile string
|
|
||||||
locked bool
|
|
||||||
lockfd int
|
|
||||||
readers, writers int
|
|
||||||
us, holder holderID
|
|
||||||
}
|
|
||||||
|
|
||||||
// lockop obtains or clears a file lock on the specified descriptor, blocking
|
|
||||||
// and retrying indefinitely if it fails to do so for any reason.
|
|
||||||
func lockop(name string, fd, lockop int) {
|
|
||||||
err := syscall.Flock(fd, lockop)
|
|
||||||
for err != nil {
|
|
||||||
logrus.Debugf("waiting for file lock on %s", name)
|
|
||||||
time.Sleep(100 * time.Millisecond)
|
|
||||||
err = syscall.Flock(fd, lockop)
|
|
||||||
if err == nil {
|
|
||||||
logrus.Debugf("obtained file lock on %s", name)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// RLock obtains a read lock on the Mutex.
|
|
||||||
func (m *Mutex) RLock() {
|
|
||||||
m.rw.RLock()
|
|
||||||
m.m.Lock()
|
|
||||||
defer m.m.Unlock()
|
|
||||||
m.readers++
|
|
||||||
if m.readers == 1 {
|
|
||||||
lockop(m.lockfile, m.lockfd, syscall.LOCK_SH)
|
|
||||||
m.locked = true
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// RUnlock releases a read lock on the Mutex.
|
|
||||||
func (m *Mutex) RUnlock() {
|
|
||||||
m.rw.RUnlock()
|
|
||||||
m.m.Lock()
|
|
||||||
defer m.m.Unlock()
|
|
||||||
m.readers--
|
|
||||||
if m.readers == 0 {
|
|
||||||
if !m.locked {
|
|
||||||
panic(fmt.Sprintf("attempted to unlock %s while not locked", m.lockfile))
|
|
||||||
}
|
|
||||||
lockop(m.lockfile, m.lockfd, syscall.LOCK_UN)
|
|
||||||
m.locked = false
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// Lock obtains a write lock on the Mutex.
|
|
||||||
func (m *Mutex) Lock() {
|
|
||||||
m.rw.Lock()
|
|
||||||
m.m.Lock()
|
|
||||||
defer m.m.Unlock()
|
|
||||||
m.writers++
|
|
||||||
if m.writers == 1 {
|
|
||||||
lockop(m.lockfile, m.lockfd, syscall.LOCK_EX)
|
|
||||||
m.locked = true
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// Unlock releases a write lock on the Mutex.
|
|
||||||
func (m *Mutex) Unlock() {
|
|
||||||
m.rw.Unlock()
|
|
||||||
m.m.Lock()
|
|
||||||
defer m.m.Unlock()
|
|
||||||
m.writers--
|
|
||||||
if m.writers == 0 {
|
|
||||||
if !m.locked {
|
|
||||||
panic(fmt.Sprintf("attempted to unlock %s while not locked", m.lockfile))
|
|
||||||
}
|
|
||||||
lockop(m.lockfile, m.lockfd, syscall.LOCK_UN)
|
|
||||||
m.locked = false
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// Touch updates the contents of the lock file to hold our ID, as an indicator
|
|
||||||
// that other processes can consult to see that it was this process that last
|
|
||||||
// modified the data that's protected by the lock. Should only be called with
|
|
||||||
// a write lock held.
|
|
||||||
func (m *Mutex) Touch() error {
|
|
||||||
n, err := rand.Read(m.us[:])
|
|
||||||
if n != len(m.us) || err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
if _, err := syscall.Pwrite(m.lockfd, m.us[:], 0); err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
m.holder = m.us
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
|
|
||||||
// Updated tells us if the last recorded ID in the lock file is different from
|
|
||||||
// ours and has changed since we last looked at it. Should only be called with
|
|
||||||
// either a read or write lock held.
|
|
||||||
func (m *Mutex) Updated() bool {
|
|
||||||
var holder holderID
|
|
||||||
if n, err := syscall.Pread(m.lockfd, holder[:], 0); err != nil && n != 0 {
|
|
||||||
return true
|
|
||||||
}
|
|
||||||
updated := bytes.Compare(holder[:], m.us[:]) != 0 &&
|
|
||||||
bytes.Compare(holder[:], m.holder[:]) != 0
|
|
||||||
m.holder = holder
|
|
||||||
return updated
|
|
||||||
}
|
|
||||||
|
|
||||||
// get initializes a mutex, opening the lockfile.
|
|
||||||
func get(lockfile string) (*Mutex, error) {
|
|
||||||
var noHolder, us holderID
|
|
||||||
|
|
||||||
lockMgr.m.Lock()
|
|
||||||
defer lockMgr.m.Unlock()
|
|
||||||
|
|
||||||
if bytes.Compare(us[:], noHolder[:]) == 0 {
|
|
||||||
n, err := rand.Read(us[:])
|
|
||||||
if n != len(us) || err != nil {
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
name, err := filepath.Abs(lockfile)
|
|
||||||
if err != nil {
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
name = filepath.Clean(name)
|
|
||||||
fl, ok := lockMgr.locks[name]
|
|
||||||
if !ok {
|
|
||||||
lockfd, err := syscall.Open(name, syscall.O_CREAT|syscall.O_RDWR, syscall.S_IRUSR|syscall.S_IWUSR)
|
|
||||||
if err != nil {
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
syscall.CloseOnExec(lockfd)
|
|
||||||
fl = &Mutex{
|
|
||||||
lockfd: lockfd,
|
|
||||||
lockfile: name,
|
|
||||||
us: us,
|
|
||||||
}
|
|
||||||
lockMgr.locks[name] = fl
|
|
||||||
}
|
|
||||||
return fl, nil
|
|
||||||
}
|
|
|
@ -1,70 +0,0 @@
|
||||||
// +build !linux
|
|
||||||
|
|
||||||
package fsync
|
|
||||||
|
|
||||||
import (
|
|
||||||
"path/filepath"
|
|
||||||
"sync"
|
|
||||||
)
|
|
||||||
|
|
||||||
// Mutex represents an RWMutex which synchronizes its state with a file lock,
|
|
||||||
// allowing two process which use the same lock file to share reading and
|
|
||||||
// writing locks.
|
|
||||||
type Mutex struct {
|
|
||||||
rw sync.RWMutex
|
|
||||||
lockfile string
|
|
||||||
}
|
|
||||||
|
|
||||||
// RLock obtains a read lock on the Mutex.
|
|
||||||
func (m *Mutex) RLock() {
|
|
||||||
m.rw.RLock()
|
|
||||||
}
|
|
||||||
|
|
||||||
// RUnlock releases a read lock on the Mutex.
|
|
||||||
func (m *Mutex) RUnlock() {
|
|
||||||
m.rw.RUnlock()
|
|
||||||
}
|
|
||||||
|
|
||||||
// Lock obtains a write lock on the Mutex.
|
|
||||||
func (m *Mutex) Lock() {
|
|
||||||
m.rw.Lock()
|
|
||||||
}
|
|
||||||
|
|
||||||
// Unlock releases a write lock on the Mutex.
|
|
||||||
func (m *Mutex) Unlock() {
|
|
||||||
m.rw.Unlock()
|
|
||||||
}
|
|
||||||
|
|
||||||
// Touch updates the timestamp on the lock file to signal other processes or
|
|
||||||
// threads that the data protected by the lock has been modified by someone
|
|
||||||
// else.
|
|
||||||
func (m *Mutex) Touch() error {
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
|
|
||||||
// Updated tells us if the timestamp on the lock file is more recent than the
|
|
||||||
// last time we recorded for the lockfile. Should only be called with the lock
|
|
||||||
// held.
|
|
||||||
func (m *Mutex) Updated() bool {
|
|
||||||
return false
|
|
||||||
}
|
|
||||||
|
|
||||||
// get initializes a mutex, opening the lockfile.
|
|
||||||
func get(lockfile string) (*Mutex, error) {
|
|
||||||
lockMgr.m.Lock()
|
|
||||||
defer lockMgr.m.Unlock()
|
|
||||||
|
|
||||||
name, err := filepath.Abs(lockfile)
|
|
||||||
if err != nil {
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
name = filepath.Clean(name)
|
|
||||||
fl, ok := lockMgr.locks[name]
|
|
||||||
if !ok {
|
|
||||||
fl = &Mutex{
|
|
||||||
lockfile: name,
|
|
||||||
}
|
|
||||||
lockMgr.locks[name] = fl
|
|
||||||
}
|
|
||||||
return fl, nil
|
|
||||||
}
|
|
Loading…
Reference in a new issue