diff --git a/util/locking.py b/util/locking.py index 102acaee0..62d7bb366 100644 --- a/util/locking.py +++ b/util/locking.py @@ -7,6 +7,11 @@ from app import app logger = logging.getLogger(__name__) + +class LockNotAcquiredException(Exception): + """ Exception raised if a GlobalLock could not be acquired. """ + + class GlobalLock(object): """ A lock object that blocks globally via Redis. Note that Redis is not considered a tier-1 service, so this lock should not be used for any critical code paths. @@ -20,17 +25,22 @@ class GlobalLock(object): self._redlock = None def __enter__(self): - return self.lock() + if not self.acquire(): + raise LockNotAcquiredException() - def __exit__(self ,type, value, traceback): - self.unlock() + def __exit__(self, type, value, traceback): + self.release() - def lock(self): + def acquire(self): logger.debug('Acquiring global lock %s', self._lock_name) try: redlock = RedLock(self._lock_name, connection_details=[self._redis_info], - ttl=self._lock_ttl) - redlock.acquire() + ttl=self._lock_ttl) + acquired = redlock.acquire() + if not acquired: + logger.debug('Was unable to not acquire lock %s', self._lock_name) + return False + self._redlock = redlock logger.debug('Acquired lock %s', self._lock_name) return True @@ -41,10 +51,15 @@ class GlobalLock(object): logger.debug('Could not connect to Redis for lock %s: %s', self._lock_name, re) return False - - def unlock(self): + def release(self): if self._redlock is not None: logger.debug('Releasing lock %s', self._lock_name) - self._redlock.release() + try: + self._redlock.release() + except RedLockError: + logger.debug('Could not release lock %s', self._lock_name) + except RedisError as re: + logger.debug('Could not connect to Redis for releasing lock %s: %s', self._lock_name, re) + logger.debug('Released lock %s', self._lock_name) self._redlock = None diff --git a/workers/logrotateworker.py b/workers/logrotateworker.py index 35f1a4b99..255477945 100644 --- a/workers/logrotateworker.py +++ b/workers/logrotateworker.py @@ -12,7 +12,7 @@ from data.database import UseThenDisconnect from data.model.log import (get_stale_logs, get_stale_logs_start_id, get_stale_logs_cutoff_id, delete_stale_logs) from data.userfiles import DelegateUserfiles -from util.locking import GlobalLock +from util.locking import GlobalLock, LockNotAcquiredException from util.streamingjsonencoder import StreamingJSONEncoder from workers.worker import Worker @@ -23,7 +23,7 @@ STALE_AFTER = timedelta(days=30) MIN_LOGS_PER_ROTATION = 10000 MEMORY_TEMPFILE_SIZE = 12 * 1024 * 1024 -WORKER_FREQUENCY = app.config.get('ACTION_LOG_ROTATION_FREQUENCY', 60 * 60 * 24) +WORKER_FREQUENCY = app.config.get('ACTION_LOG_ROTATION_FREQUENCY', 60 * 60 * 12) SAVE_PATH = app.config.get('ACTION_LOG_ARCHIVE_PATH') SAVE_LOCATION = app.config.get('ACTION_LOG_ARCHIVE_LOCATION') @@ -36,8 +36,6 @@ class LogRotateWorker(Worker): def _archive_logs(self): logger.debug('Attempting to rotate log entries') - log_archive = DelegateUserfiles(app, storage, SAVE_LOCATION, SAVE_PATH) - with UseThenDisconnect(app.config): cutoff_date = datetime.now() - STALE_AFTER cutoff_id = get_stale_logs_cutoff_id(cutoff_date) @@ -45,44 +43,51 @@ class LogRotateWorker(Worker): logger.warning('Failed to find cutoff id') return - while True: - with GlobalLock('ACTION_LOG_ROTATION') as gl: - if not gl: - logger.debug('Could not acquire global lock; sleeping') - return + logs_archived = True + while logs_archived: + try: + with GlobalLock('ACTION_LOG_ROTATION'): + logs_archived = self._perform_archiving(cutoff_id) + except LockNotAcquiredException: + return - with UseThenDisconnect(app.config): - start_id = get_stale_logs_start_id() + def _perform_archiving(self, cutoff_id): + log_archive = DelegateUserfiles(app, storage, SAVE_LOCATION, SAVE_PATH) - if start_id is None: - logger.warning('Failed to find start id') - return + with UseThenDisconnect(app.config): + start_id = get_stale_logs_start_id() - logger.debug('Found starting ID %s and cutoff ID %s', start_id, cutoff_id) + if start_id is None: + logger.warning('Failed to find start id') + return False - approx_count = cutoff_id - start_id - if approx_count < MIN_LOGS_PER_ROTATION: - logger.debug('Not enough stale logs to warrant rotation (approx %d)', approx_count) - return + logger.debug('Found starting ID %s and cutoff ID %s', start_id, cutoff_id) - end_id = start_id + MIN_LOGS_PER_ROTATION - logs = [log_dict(log) for log in get_stale_logs(start_id, end_id)] + approx_count = cutoff_id - start_id + if approx_count < MIN_LOGS_PER_ROTATION: + logger.debug('Not enough stale logs to warrant rotation (approx %d)', approx_count) + return False - logger.debug('Archiving logs from IDs %s to %s', start_id, end_id) - with SpooledTemporaryFile(MEMORY_TEMPFILE_SIZE) as tempfile: - with GzipFile('temp_action_log_rotate', fileobj=tempfile, compresslevel=1) as zipstream: - for chunk in StreamingJSONEncoder().iterencode(logs): - zipstream.write(chunk) + end_id = start_id + MIN_LOGS_PER_ROTATION + logs = [log_dict(log) for log in get_stale_logs(start_id, end_id)] - tempfile.seek(0) - filename = '%d-%d.txt.gz' % (start_id, end_id) - log_archive.store_file(tempfile, JSON_MIMETYPE, content_encoding='gzip', - file_id=filename) - logger.debug('Finished archiving logs from IDs %s to %s', start_id, end_id) + logger.debug('Archiving logs from IDs %s to %s', start_id, end_id) + with SpooledTemporaryFile(MEMORY_TEMPFILE_SIZE) as tempfile: + with GzipFile('temp_action_log_rotate', fileobj=tempfile, compresslevel=1) as zipstream: + for chunk in StreamingJSONEncoder().iterencode(logs): + zipstream.write(chunk) - with UseThenDisconnect(app.config): - logger.debug('Deleting logs from IDs %s to %s', start_id, end_id) - delete_stale_logs(start_id, end_id) + tempfile.seek(0) + filename = '%d-%d.txt.gz' % (start_id, end_id) + log_archive.store_file(tempfile, JSON_MIMETYPE, content_encoding='gzip', + file_id=filename) + logger.debug('Finished archiving logs from IDs %s to %s', start_id, end_id) + + with UseThenDisconnect(app.config): + logger.debug('Deleting logs from IDs %s to %s', start_id, end_id) + delete_stale_logs(start_id, end_id) + + return True def log_dict(log):