Fix locking via RedLock

Fixes #1777
This commit is contained in:
Joseph Schorr 2016-08-29 11:28:53 -04:00
parent 1667fd6612
commit aa7c87d765
2 changed files with 63 additions and 43 deletions

View file

@ -7,6 +7,11 @@ from app import app
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
class LockNotAcquiredException(Exception):
""" Exception raised if a GlobalLock could not be acquired. """
class GlobalLock(object): class GlobalLock(object):
""" A lock object that blocks globally via Redis. Note that Redis is not considered a tier-1 """ A lock object that blocks globally via Redis. Note that Redis is not considered a tier-1
service, so this lock should not be used for any critical code paths. service, so this lock should not be used for any critical code paths.
@ -20,17 +25,22 @@ class GlobalLock(object):
self._redlock = None self._redlock = None
def __enter__(self): def __enter__(self):
return self.lock() if not self.acquire():
raise LockNotAcquiredException()
def __exit__(self, type, value, traceback): def __exit__(self, type, value, traceback):
self.unlock() self.release()
def lock(self): def acquire(self):
logger.debug('Acquiring global lock %s', self._lock_name) logger.debug('Acquiring global lock %s', self._lock_name)
try: try:
redlock = RedLock(self._lock_name, connection_details=[self._redis_info], redlock = RedLock(self._lock_name, connection_details=[self._redis_info],
ttl=self._lock_ttl) ttl=self._lock_ttl)
redlock.acquire() acquired = redlock.acquire()
if not acquired:
logger.debug('Was unable to not acquire lock %s', self._lock_name)
return False
self._redlock = redlock self._redlock = redlock
logger.debug('Acquired lock %s', self._lock_name) logger.debug('Acquired lock %s', self._lock_name)
return True return True
@ -41,10 +51,15 @@ class GlobalLock(object):
logger.debug('Could not connect to Redis for lock %s: %s', self._lock_name, re) logger.debug('Could not connect to Redis for lock %s: %s', self._lock_name, re)
return False return False
def release(self):
def unlock(self):
if self._redlock is not None: if self._redlock is not None:
logger.debug('Releasing lock %s', self._lock_name) logger.debug('Releasing lock %s', self._lock_name)
try:
self._redlock.release() self._redlock.release()
except RedLockError:
logger.debug('Could not release lock %s', self._lock_name)
except RedisError as re:
logger.debug('Could not connect to Redis for releasing lock %s: %s', self._lock_name, re)
logger.debug('Released lock %s', self._lock_name) logger.debug('Released lock %s', self._lock_name)
self._redlock = None self._redlock = None

View file

@ -12,7 +12,7 @@ from data.database import UseThenDisconnect
from data.model.log import (get_stale_logs, get_stale_logs_start_id, from data.model.log import (get_stale_logs, get_stale_logs_start_id,
get_stale_logs_cutoff_id, delete_stale_logs) get_stale_logs_cutoff_id, delete_stale_logs)
from data.userfiles import DelegateUserfiles from data.userfiles import DelegateUserfiles
from util.locking import GlobalLock from util.locking import GlobalLock, LockNotAcquiredException
from util.streamingjsonencoder import StreamingJSONEncoder from util.streamingjsonencoder import StreamingJSONEncoder
from workers.worker import Worker from workers.worker import Worker
@ -23,7 +23,7 @@ STALE_AFTER = timedelta(days=30)
MIN_LOGS_PER_ROTATION = 10000 MIN_LOGS_PER_ROTATION = 10000
MEMORY_TEMPFILE_SIZE = 12 * 1024 * 1024 MEMORY_TEMPFILE_SIZE = 12 * 1024 * 1024
WORKER_FREQUENCY = app.config.get('ACTION_LOG_ROTATION_FREQUENCY', 60 * 60 * 24) WORKER_FREQUENCY = app.config.get('ACTION_LOG_ROTATION_FREQUENCY', 60 * 60 * 12)
SAVE_PATH = app.config.get('ACTION_LOG_ARCHIVE_PATH') SAVE_PATH = app.config.get('ACTION_LOG_ARCHIVE_PATH')
SAVE_LOCATION = app.config.get('ACTION_LOG_ARCHIVE_LOCATION') SAVE_LOCATION = app.config.get('ACTION_LOG_ARCHIVE_LOCATION')
@ -36,8 +36,6 @@ class LogRotateWorker(Worker):
def _archive_logs(self): def _archive_logs(self):
logger.debug('Attempting to rotate log entries') logger.debug('Attempting to rotate log entries')
log_archive = DelegateUserfiles(app, storage, SAVE_LOCATION, SAVE_PATH)
with UseThenDisconnect(app.config): with UseThenDisconnect(app.config):
cutoff_date = datetime.now() - STALE_AFTER cutoff_date = datetime.now() - STALE_AFTER
cutoff_id = get_stale_logs_cutoff_id(cutoff_date) cutoff_id = get_stale_logs_cutoff_id(cutoff_date)
@ -45,25 +43,30 @@ class LogRotateWorker(Worker):
logger.warning('Failed to find cutoff id') logger.warning('Failed to find cutoff id')
return return
while True: logs_archived = True
with GlobalLock('ACTION_LOG_ROTATION') as gl: while logs_archived:
if not gl: try:
logger.debug('Could not acquire global lock; sleeping') with GlobalLock('ACTION_LOG_ROTATION'):
logs_archived = self._perform_archiving(cutoff_id)
except LockNotAcquiredException:
return return
def _perform_archiving(self, cutoff_id):
log_archive = DelegateUserfiles(app, storage, SAVE_LOCATION, SAVE_PATH)
with UseThenDisconnect(app.config): with UseThenDisconnect(app.config):
start_id = get_stale_logs_start_id() start_id = get_stale_logs_start_id()
if start_id is None: if start_id is None:
logger.warning('Failed to find start id') logger.warning('Failed to find start id')
return return False
logger.debug('Found starting ID %s and cutoff ID %s', start_id, cutoff_id) logger.debug('Found starting ID %s and cutoff ID %s', start_id, cutoff_id)
approx_count = cutoff_id - start_id approx_count = cutoff_id - start_id
if approx_count < MIN_LOGS_PER_ROTATION: if approx_count < MIN_LOGS_PER_ROTATION:
logger.debug('Not enough stale logs to warrant rotation (approx %d)', approx_count) logger.debug('Not enough stale logs to warrant rotation (approx %d)', approx_count)
return return False
end_id = start_id + MIN_LOGS_PER_ROTATION end_id = start_id + MIN_LOGS_PER_ROTATION
logs = [log_dict(log) for log in get_stale_logs(start_id, end_id)] logs = [log_dict(log) for log in get_stale_logs(start_id, end_id)]
@ -84,6 +87,8 @@ class LogRotateWorker(Worker):
logger.debug('Deleting logs from IDs %s to %s', start_id, end_id) logger.debug('Deleting logs from IDs %s to %s', start_id, end_id)
delete_stale_logs(start_id, end_id) delete_stale_logs(start_id, end_id)
return True
def log_dict(log): def log_dict(log):
""" Pretty prints a LogEntry in JSON. """ """ Pretty prints a LogEntry in JSON. """