diff --git a/util/locking.py b/util/locking.py
new file mode 100644
index 000000000..85de15125
--- /dev/null
+++ b/util/locking.py
@@ -0,0 +1,50 @@
+import logging
+
+from redis import ConnectionError
+from redlock import RedLock, RedLockError
+
+from app import app
+
+logger = logging.getLogger(__name__)
+
+class GlobalLock(object):
+  """ A lock object that blocks globally via Redis. Note that Redis is not considered a tier-1
+      service, so this lock should not be used for any critical code paths.
+  """
+  def __init__(self, name, lock_ttl=600):
+    self._lock_name = name
+    self._redis_info = dict(app.config['USER_EVENTS_REDIS'])
+    self._redis_info.update({'socket_connect_timeout': 5, 'socket_timeout': 5})
+
+    self._lock_ttl = lock_ttl
+    self._redlock = None
+
+  def __enter__(self):
+    return self.lock()
+
+  def __exit__(self, type, value, traceback):
+    self.unlock()
+
+  def lock(self):
+    logger.debug('Acquiring global lock %s', self._lock_name)
+    try:
+      redlock = RedLock(self._lock_name, connection_details=[self._redis_info],
+                        ttl=self._lock_ttl)
+      redlock.acquire()
+      self._redlock = redlock
+      logger.debug('Acquired lock %s', self._lock_name)
+      return True
+    except RedLockError:
+      logger.debug('Could not acquire lock %s', self._lock_name)
+      return False
+    except ConnectionError as ce:
+      logger.debug('Could not connect to Redis for lock %s: %s', self._lock_name, ce)
+      return False
+
+
+  def unlock(self):
+    if self._redlock is not None:
+      logger.debug('Releasing lock %s', self._lock_name)
+      self._redlock.release()
+      logger.debug('Released lock %s', self._lock_name)
+      self._redlock = None
diff --git a/workers/globalworkerbase.py b/workers/globalworkerbase.py
deleted file mode 100644
index 74cc2a905..000000000
--- a/workers/globalworkerbase.py
+++ /dev/null
@@ -1,33 +0,0 @@
-import logging
-
-from workers.worker import Worker
-from redlock import RedLock, RedLockError
-
-logger = logging.getLogger(__name__)
-
-class GlobalWorker(Worker):
-  def __init__(self, app, sleep_period_seconds=3600, lock_ttl=600):
-    super(GlobalWorker, self).__init__()
-
-    worker_name = self.__class__.__name__
-
-    self._redis_info = app.config['USER_EVENTS_REDIS']
-    self._lock_name = '%s.global_lock' % (worker_name)
-    self._lock_ttl = lock_ttl
-
-    self.add_operation(self._prepare_global_work, sleep_period_seconds)
-
-  def _prepare_global_work(self):
-    try:
-      logger.debug('Acquiring lock %s', self._lock_name)
-      with RedLock(self._lock_name, connection_details=[self._redis_info],
-                   ttl=self._lock_ttl):
-        logger.debug('Acquired lock %s, performing work', self._lock_name)
-        self.perform_global_work()
-
-        logger.debug('Work complete, releasing lock %s', self._lock_name)
-    except RedLockError:
-      logger.debug('Could not acquire lock %s, going to sleep', self._lock_name)
-
-  def perform_global_work(self):
-    raise NotImplementedError('Workers must implement perform_global_work.')
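Note on usage (not part of the patch): GlobalLock.__enter__ returns the boolean result of lock() rather than raising on failure, so callers must check the value bound by `as` before doing any work under the lock. A minimal caller sketch, assuming the module path above; run_once() and do_work() are hypothetical stand-ins:

    from util.locking import GlobalLock

    def run_once():
      # lock() returns False both when another holder owns the lock and when
      # Redis is unreachable, so treat False as "skip this cycle".
      with GlobalLock('MY_WORKER_LOCK', lock_ttl=600) as acquired:
        if not acquired:
          return
        do_work()  # hypothetical; the worker's actual task goes here
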
diff --git a/workers/logrotateworker.py b/workers/logrotateworker.py
index 2baa1b0ec..c29b2b597 100644
--- a/workers/logrotateworker.py
+++ b/workers/logrotateworker.py
@@ -11,49 +11,58 @@
 from data.model import db_transaction
 from data.model.log import (get_stale_logs, get_stale_logs_start_id, get_stale_logs_cutoff_id,
                             delete_stale_logs)
 from util.registry.gzipwrap import GzipWrap
-from workers.globalworkerbase import GlobalWorker
+from util.locking import GlobalLock
+from workers.worker import Worker
 
 logger = logging.getLogger(__name__)
 
-WORKER_FREQUENCY = 3600 * 6
 STALE_AFTER = timedelta(days=30)
 MIN_LOGS_PER_ROTATION = 10000
+
+WORKER_FREQUENCY = app.config.get('ACTION_LOG_ROTATION_FREQUENCY', 3600 * 6)
 SAVE_PATH = app.config.get('ACTION_LOG_ARCHIVE_PATH')
 SAVE_LOCATION = app.config.get('ACTION_LOG_ARCHIVE_LOCATION')
 
-class LogRotateWorker(GlobalWorker):
+class LogRotateWorker(Worker):
   """ Worker used to rotate old logs out the database and into storage. """
   def __init__(self):
-    super(LogRotateWorker, self).__init__(app, sleep_period_seconds=WORKER_FREQUENCY)
+    super(LogRotateWorker, self).__init__()
+    self.add_operation(self._archive_logs, WORKER_FREQUENCY)
 
-  def perform_global_work(self):
+  def _archive_logs(self):
     logger.debug('Attempting to rotate log entries')
 
     while True:
-      with UseThenDisconnect(app.config):
-        with db_transaction():
-          cutoff_date = datetime.now() - STALE_AFTER
-          start_id = get_stale_logs_start_id()
-          cutoff_id = get_stale_logs_cutoff_id(cutoff_date)
+      with GlobalLock('ACTION_LOG_ROTATION') as gl:
+        if not gl:
+          logger.debug('Could not acquire global lock; sleeping')
+          return
 
-          if start_id is None or cutoff_id is None:
-            logger.warning('No logs to be archived.')
-            return
+        with UseThenDisconnect(app.config):
+          with db_transaction():
+            cutoff_date = datetime.now() - STALE_AFTER
+            start_id = get_stale_logs_start_id()
+            cutoff_id = get_stale_logs_cutoff_id(cutoff_date)
 
-          logger.debug('Found starting ID %s and cutoff ID %s', start_id, cutoff_id)
+            if start_id is None or cutoff_id is None:
+              logger.warning('No logs to be archived.')
+              return
 
-          approx_count = cutoff_id - start_id
-          if approx_count < MIN_LOGS_PER_ROTATION:
-            logger.debug('Not enough stale logs to warrant rotation (approx %d)', approx_count)
-            return
+            logger.debug('Found starting ID %s and cutoff ID %s', start_id, cutoff_id)
 
-          end_id = start_id + MIN_LOGS_PER_ROTATION
-          logs = (pretty_print_in_json(log)
-                  for log in get_stale_logs(start_id, end_id))
+            approx_count = cutoff_id - start_id
+            if approx_count < MIN_LOGS_PER_ROTATION:
+              logger.debug('Not enough stale logs to warrant rotation (approx %d)', approx_count)
+              return
 
-          logger.debug('Archiving logs from IDs %s to %s', start_id, end_id)
-          filename = '%s%d-%d.txt.gz' % (SAVE_PATH, start_id, end_id)
-          storage.stream_write(SAVE_LOCATION, filename, GzipWrap(logs))
+            end_id = start_id + MIN_LOGS_PER_ROTATION
+            logs = (pretty_print_in_json(log)
+                    for log in get_stale_logs(start_id, end_id))
 
+            logger.debug('Archiving logs from IDs %s to %s', start_id, end_id)
+            filename = '%s%d-%d.txt.gz' % (SAVE_PATH, start_id, end_id)
+            storage.stream_write(SAVE_LOCATION, filename, GzipWrap(logs))
+
+        with UseThenDisconnect(app.config):
           delete_stale_logs(start_id, end_id)
@@ -69,7 +78,10 @@ def pretty_print_in_json(log):
 
 
 def main():
+  logging.config.fileConfig('conf/logging_debug.conf', disable_existing_loggers=False)
+
   if not features.ACTION_LOG_ROTATION or None in [SAVE_PATH, SAVE_LOCATION]:
+    logger.debug('Action log rotation worker not enabled; skipping')
     while True:
       time.sleep(100000)
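Note on configuration (illustrative, not part of the patch): the worker now reads its schedule and archive destination from config. The key names below are taken from this diff; FEATURE_ACTION_LOG_ROTATION assumes Quay's usual FEATURE_* naming for the features.ACTION_LOG_ROTATION flag, and the values are examples rather than defaults introduced by this change:

    FEATURE_ACTION_LOG_ROTATION = True                   # gates the worker in main()
    ACTION_LOG_ROTATION_FREQUENCY = 3600 * 6             # seconds between rotation passes
    ACTION_LOG_ARCHIVE_PATH = 'logarchive/'              # filename prefix for archived batches
    ACTION_LOG_ARCHIVE_LOCATION = 'local_us'             # storage location passed to stream_write
    USER_EVENTS_REDIS = {'host': 'redis', 'port': 6379}  # Redis instance used by GlobalLock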