From ee705fe7a9b98dc5414a6e12c78e3d51a49b5734 Mon Sep 17 00:00:00 2001
From: Jimmy Zelinskie
Date: Tue, 9 Feb 2016 15:20:52 -0500
Subject: [PATCH] vastly simplify log rotation

---
 data/model/log.py          |  2 +-
 workers/logrotateworker.py | 60 +++++++++++++++-----------------------
 2 files changed, 24 insertions(+), 38 deletions(-)

diff --git a/data/model/log.py b/data/model/log.py
index a2037e6f7..5b32e3fb1 100644
--- a/data/model/log.py
+++ b/data/model/log.py
@@ -133,7 +133,7 @@ def get_stale_logs_start_id():
     return None
 
 
-def get_stale_logs_end_id(cutoff_date):
+def get_stale_logs_cutoff_id(cutoff_date):
   """ Gets the most recent ID created before the cutoff_date. """
   try:
     return (LogEntry
diff --git a/workers/logrotateworker.py b/workers/logrotateworker.py
index 40bb690a8..2baa1b0ec 100644
--- a/workers/logrotateworker.py
+++ b/workers/logrotateworker.py
@@ -3,23 +3,21 @@ import json
 import time
 
 from datetime import timedelta, datetime
-from itertools import chain
 
 import features
 from app import app, storage
 from data.database import UseThenDisconnect
 from data.model import db_transaction
 from data.model.log import (get_stale_logs, get_stale_logs_start_id,
-                            get_stale_logs_end_id, delete_stale_logs)
+                            get_stale_logs_cutoff_id, delete_stale_logs)
 from util.registry.gzipwrap import GzipWrap
 from workers.globalworkerbase import GlobalWorker
 
 logger = logging.getLogger(__name__)
 
-WORKER_FREQUENCY = 3600
+WORKER_FREQUENCY = 3600 * 6
 STALE_AFTER = timedelta(days=30)
 MIN_LOGS_PER_ROTATION = 10000
-BATCH_SIZE = 500
 
 SAVE_PATH = app.config.get('ACTION_LOG_ARCHIVE_PATH')
 SAVE_LOCATION = app.config.get('ACTION_LOG_ARCHIVE_LOCATION')
@@ -30,47 +28,35 @@ class LogRotateWorker(GlobalWorker):
   def perform_global_work(self):
     logger.debug('Attempting to rotate log entries')
 
-    with UseThenDisconnect(app.config):
-      with db_transaction():
-        cutoff = datetime.now() - STALE_AFTER
-        start_id = get_stale_logs_start_id()
-        end_id = get_stale_logs_end_id(cutoff)
+    while True:
+      with UseThenDisconnect(app.config):
+        with db_transaction():
+          cutoff_date = datetime.now() - STALE_AFTER
+          start_id = get_stale_logs_start_id()
+          cutoff_id = get_stale_logs_cutoff_id(cutoff_date)
 
-        if start_id is None or end_id is None:
-          logger.warning('No logs to be archived.')
-          return
+          if start_id is None or cutoff_id is None:
+            logger.warning('No logs to be archived.')
+            return
 
-        logger.debug('Found starting ID %s and ending ID %s', start_id, end_id)
+          logger.debug('Found starting ID %s and cutoff ID %s', start_id, cutoff_id)
 
-        approx_count = end_id - start_id
-        if approx_count < MIN_LOGS_PER_ROTATION:
-          logger.debug('Not enough stale logs to warrant rotation (approx %d)', approx_count)
-          return
+          approx_count = cutoff_id - start_id
+          if approx_count < MIN_LOGS_PER_ROTATION:
+            logger.debug('Not enough stale logs to warrant rotation (approx %d)', approx_count)
+            return
 
-        # Build a generator of all the logs we want to archive in batches of 500.
-        log_generator = (_ for _ in ())
-        logs_to_fetch = range(start_id, end_id+1)
-        while len(logs_to_fetch) != 0:
-          (batch, logs_to_fetch) = _take(BATCH_SIZE, logs_to_fetch)
-          logger.debug('Reading archived logs from IDs %s to %s', batch[0], batch[-1])
-          batch_generator = (pretty_print_in_json(log) + '\n'
-                             for log in get_stale_logs(batch[0], batch[-1]))
-          log_generator = chain(log_generator, batch_generator)
+          end_id = start_id + MIN_LOGS_PER_ROTATION
+          logs = (pretty_print_in_json(log)
+                  for log in get_stale_logs(start_id, end_id))
 
-        logger.debug('Archiving logs from IDs %s to %s', start_id, end_id)
-        filename = '%s%d-%d.txt.gz' % (SAVE_PATH, start_id, end_id)
-        storage.stream_write(SAVE_LOCATION, filename, GzipWrap(log_generator))
+          logger.debug('Archiving logs from IDs %s to %s', start_id, end_id)
+          filename = '%s%d-%d.txt.gz' % (SAVE_PATH, start_id, end_id)
+          storage.stream_write(SAVE_LOCATION, filename, GzipWrap(logs))
 
-        logs_to_delete = range(start_id, end_id+1)
-        while len(logs_to_delete) != 0:
-          (batch, logs_to_delete) = _take(BATCH_SIZE, logs_to_delete)
-          logger.debug('Deleting archived logs from IDs %s to %s', batch[0], batch[-1])
-          delete_stale_logs(batch[0], batch[-1])
+          delete_stale_logs(start_id, end_id)
 
 
-def _take(count, lst):
-  return lst[:count], lst[count:]
-
 def pretty_print_in_json(log):
   """ Pretty prints a LogEntry in JSON. """
   return json.dumps({'kind_id': log.kind_id,
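Note for reviewers: the sketch below shows the simplified control flow in isolation, outside the worker. It is not the Quay code; fetch_stale_logs, save_path, and the plain-dict log rows are hypothetical stand-ins for the model query, GzipWrap, and storage.stream_write above, and unlike the real worker it does not re-query the ID range inside a transaction or call delete_stale_logs. The idea it illustrates is the same, though: instead of chaining 500-row batch generators, each pass archives one fixed-size window of MIN_LOGS_PER_ROTATION IDs and stops once too few stale rows remain.

import gzip
import json

MIN_LOGS_PER_ROTATION = 10000


def rotate_logs(fetch_stale_logs, start_id, cutoff_id, save_path):
  """Archive stale logs one fixed-size ID window at a time.

  fetch_stale_logs(lo, hi) is a hypothetical callable returning an iterable
  of plain dicts for log rows with IDs in [lo, hi); it stands in for the
  model query and object storage used by the real worker.
  """
  while True:
    approx_count = cutoff_id - start_id
    if approx_count < MIN_LOGS_PER_ROTATION:
      # Not enough stale rows left to justify another archive file.
      return

    end_id = start_id + MIN_LOGS_PER_ROTATION
    filename = '%s%d-%d.txt.gz' % (save_path, start_id, end_id)

    # Stream the window straight into a gzip file, one JSON document per line.
    with gzip.open(filename, 'wt') as out:
      for log in fetch_stale_logs(start_id, end_id):
        out.write(json.dumps(log) + '\n')

    # The real worker deletes the archived rows here; this sketch just
    # advances the window and loops for the next archive file.
    start_id = end_id

For example, rotate_logs(lambda lo, hi: ({'id': i} for i in range(lo, hi)), 1, 50000, '/tmp/logs/') writes four 10,000-entry archives and then returns because fewer than MIN_LOGS_PER_ROTATION stale rows remain.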