From 91e9fe8050518109627e2151f0f632a77445b535 Mon Sep 17 00:00:00 2001
From: Joseph Schorr
Date: Tue, 26 Feb 2019 14:37:42 -0500
Subject: [PATCH] Optimize the logs archiving worker to not issue extremely long queries

Fixes https://jira.coreos.com/browse/QUAY-1371
---
 data/model/log.py                    | 18 +++------
 workers/logrotateworker.py           | 33 +++++++--------
 workers/test/test_logrotateworker.py | 60 ++++++++++++++++++++++++++++
 3 files changed, 79 insertions(+), 32 deletions(-)
 create mode 100644 workers/test/test_logrotateworker.py

diff --git a/data/model/log.py b/data/model/log.py
index 3c771c409..a7f6195f2 100644
--- a/data/model/log.py
+++ b/data/model/log.py
@@ -151,25 +151,17 @@ def get_stale_logs_start_id(model):
   """ Gets the oldest log entry. """
   # TODO(LogMigrate): Remove the branch once we're back on a single table.
   try:
-    return (model.select(model.id).order_by(model.id).limit(1).tuples())[0][0]
+    return (model.select(fn.Min(model.id)).tuples())[0][0]
   except IndexError:
     return None
 
 
-def get_stale_logs_cutoff_id(cutoff_date, model):
-  """ Gets the most recent ID created before the cutoff_date. """
-  # TODO(LogMigrate): Remove the branch once we're back on a single table.
-  try:
-    return (model.select(fn.Max(model.id)).where(model.datetime <= cutoff_date)
-            .tuples())[0][0]
-  except IndexError:
-    return None
-
-
-def get_stale_logs(start_id, end_id, model):
+def get_stale_logs(start_id, end_id, model, cutoff_date):
   """ Returns all the logs with IDs between start_id and end_id inclusively. """
   # TODO(LogMigrate): Remove the branch once we're back on a single table.
-  return model.select().where((model.id >= start_id), (model.id <= end_id))
+  return model.select().where((model.id >= start_id),
+                              (model.id <= end_id),
+                              model.datetime <= cutoff_date)
 
 
 def delete_stale_logs(start_id, end_id, model):
diff --git a/workers/logrotateworker.py b/workers/logrotateworker.py
index d9b60e7b9..8c3230c36 100644
--- a/workers/logrotateworker.py
+++ b/workers/logrotateworker.py
@@ -10,7 +10,7 @@ import features
 from app import app, storage
 from data.database import UseThenDisconnect, LogEntry, LogEntry2, LogEntry3
 from data.model.log import (get_stale_logs, get_stale_logs_start_id,
-                            get_stale_logs_cutoff_id, delete_stale_logs)
+                            delete_stale_logs)
 from data.userfiles import DelegateUserfiles
 from util.locking import GlobalLock, LockNotAcquiredException
 from util.log import logfile_path
@@ -21,7 +21,7 @@ from workers.worker import Worker
 logger = logging.getLogger(__name__)
 
 JSON_MIMETYPE = 'application/json'
-MIN_LOGS_PER_ROTATION = 10000
+MIN_LOGS_PER_ROTATION = 5000
 MEMORY_TEMPFILE_SIZE = 12 * 1024 * 1024
 
 WORKER_FREQUENCY = app.config.get('ACTION_LOG_ROTATION_FREQUENCY', 60 * 60 * 12)
@@ -44,22 +44,16 @@ class LogRotateWorker(Worker):
   def _archive_logs_for_model(self, model):
     logger.debug('Attempting to rotate log entries')
 
-    with UseThenDisconnect(app.config):
-      cutoff_date = datetime.now() - STALE_AFTER
-      cutoff_id = get_stale_logs_cutoff_id(cutoff_date, model)
-      if cutoff_id is None:
-        logger.warning('Failed to find cutoff id')
-        return
-
     logs_archived = True
+    cutoff_date = datetime.now() - STALE_AFTER
     while logs_archived:
       try:
         with GlobalLock('ACTION_LOG_ROTATION'):
-          logs_archived = self._perform_archiving(cutoff_id, model)
+          logs_archived = self._perform_archiving(model, cutoff_date)
       except LockNotAcquiredException:
         return
 
-  def _perform_archiving(self, cutoff_id, model):
+  def _perform_archiving(self, model, cutoff_date):
     save_location = SAVE_LOCATION
     if not save_location:
       # Pick the *same* save location for all instances. This is a fallback if
@@ -75,20 +69,21 @@ class LogRotateWorker(Worker):
       logger.warning('Failed to find start id')
       return False
 
-    logger.debug('Found starting ID %s and cutoff ID %s', start_id, cutoff_id)
+    logger.debug('Found starting ID %s', start_id)
+    lookup_end_id = start_id + MIN_LOGS_PER_ROTATION
+    logs = [log for log in get_stale_logs(start_id, lookup_end_id, model, cutoff_date)]
 
-    approx_count = cutoff_id - start_id
-    if approx_count < MIN_LOGS_PER_ROTATION:
-      logger.debug('Not enough stale logs to warrant rotation (approx %d)', approx_count)
-      return False
+    if not logs:
+      logger.debug('No further logs found')
+      return False
 
-    end_id = start_id + MIN_LOGS_PER_ROTATION
-    logs = [log_dict(log) for log in get_stale_logs(start_id, end_id, model)]
+    end_id = max([log.id for log in logs])
+    formatted_logs = [log_dict(log) for log in logs]
 
     logger.debug('Archiving logs from IDs %s to %s', start_id, end_id)
     with SpooledTemporaryFile(MEMORY_TEMPFILE_SIZE) as tempfile:
       with GzipFile('temp_action_log_rotate', fileobj=tempfile, compresslevel=1) as zipstream:
-        for chunk in StreamingJSONEncoder().iterencode(logs):
+        for chunk in StreamingJSONEncoder().iterencode(formatted_logs):
           zipstream.write(chunk)
 
       tempfile.seek(0)
diff --git a/workers/test/test_logrotateworker.py b/workers/test/test_logrotateworker.py
new file mode 100644
index 000000000..fda03d8f1
--- /dev/null
+++ b/workers/test/test_logrotateworker.py
@@ -0,0 +1,60 @@
+from datetime import datetime, timedelta
+
+from data import model
+from data.database import LogEntry3
+from workers.logrotateworker import LogRotateWorker
+
+from test.fixtures import *
+
+
+def test_logrotateworker(initialized_db):
+  worker = LogRotateWorker()
+  logs = list(LogEntry3.select())
+
+  # Ensure there are logs.
+  assert logs
+
+  # Ensure we don't find any to archive that are new.
+  assert not worker._perform_archiving(LogEntry3, datetime.utcnow() - timedelta(weeks=6))
+  assert len(list(LogEntry3.select())) == len(logs)
+
+  # Archive all the logs.
+  assert worker._perform_archiving(LogEntry3, datetime.utcnow() + timedelta(days=1))
+
+  # Ensure all the logs were archived.
+  assert not list(LogEntry3.select())
+
+
+def test_logrotateworker_withcutoff(initialized_db):
+  # Delete all existing logs.
+  LogEntry3.delete().execute()
+
+  # Create a new set of logs.
+  start_timestamp = datetime.now()
+  for day in range(0, 100):
+    model.log.log_action('push_repo', 'devtable', timestamp=start_timestamp + timedelta(days=day))
+
+  logs = list(LogEntry3.select())
+
+  # Sort and find a midpoint in the logs.
+  logs.sort(key=lambda l: l.datetime)
+
+  midpoint = logs[0:len(logs)/2]
+  assert midpoint
+  assert len(midpoint) < len(logs)
+
+  # Archive the earlier logs.
+  worker = LogRotateWorker()
+  assert worker._perform_archiving(LogEntry3, midpoint[-1].datetime)
+
+  # Ensure the earlier logs were archived.
+  for log in midpoint:
+    try:
+      LogEntry3.get(id=log.id)
+      assert False, "Found unexpected log"
+    except LogEntry3.DoesNotExist:
+      pass
+
+  # Ensure the later logs were not.
+  for log in logs[len(logs)/2:]:
+    LogEntry3.get(id=log.id)
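
The sketch below is a minimal, self-contained illustration of the chunking strategy this patch adopts; LOGS, stale_logs_in_window and archive_in_chunks are hypothetical stand-ins for the peewee LogEntry* models and the real get_stale_logs/delete_stale_logs helpers. Instead of computing a cutoff ID up front with MAX(id) WHERE datetime <= cutoff_date, each iteration starts from MIN(id), reads at most MIN_LOGS_PER_ROTATION IDs, applies the datetime filter inside that same bounded query, and stops once a window returns no stale rows.

# Minimal sketch (hypothetical names, in-memory data) of the chunked archiving
# loop introduced by this patch. The real worker queries LogEntry* models via peewee.
from datetime import datetime, timedelta

MIN_LOGS_PER_ROTATION = 5000  # the patch lowers this constant from 10000

# Stand-in "table" of (id, datetime) rows.
LOGS = [(i, datetime(2019, 1, 1) + timedelta(minutes=i)) for i in range(1, 12001)]


def stale_logs_in_window(start_id, end_id, cutoff_date):
  # Mirrors get_stale_logs: a bounded id window plus the datetime filter,
  # so no single query has to span the entire table.
  return [(i, dt) for i, dt in LOGS if start_id <= i <= end_id and dt <= cutoff_date]


def archive_in_chunks(cutoff_date):
  archived = 0
  while LOGS:
    start_id = min(i for i, _ in LOGS)         # analogous to SELECT MIN(id)
    end_id = start_id + MIN_LOGS_PER_ROTATION  # analogous to lookup_end_id
    chunk = stale_logs_in_window(start_id, end_id, cutoff_date)
    if not chunk:
      break  # the current window holds nothing stale; the worker stops here too
    stale_ids = {i for i, _ in chunk}
    LOGS[:] = [row for row in LOGS if row[0] not in stale_ids]  # delete_stale_logs
    archived += len(chunk)
  return archived


if __name__ == '__main__':
  # Everything older than the 10000th entry gets archived, roughly 5000 rows per query.
  print(archive_in_chunks(datetime(2019, 1, 1) + timedelta(minutes=10000)))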