vastly simplify log rotation

Jimmy Zelinskie 2016-02-09 15:20:52 -05:00
parent aef651a7ad
commit ee705fe7a9
2 changed files with 24 additions and 38 deletions

View file

@@ -133,7 +133,7 @@ def get_stale_logs_start_id():
     return None

-def get_stale_logs_end_id(cutoff_date):
+def get_stale_logs_cutoff_id(cutoff_date):
   """ Gets the most recent ID created before the cutoff_date. """
   try:
     return (LogEntry
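The diff truncates just as the query begins. For orientation, here is a minimal, self-contained sketch of what a "most recent ID before the cutoff" lookup can look like against a peewee-style model; the in-memory database, the indexed datetime column, and the fn.Max aggregate are assumptions for illustration, not Quay's verified schema.

# Hypothetical sketch -- model and column names are illustrative only.
from peewee import DateTimeField, Model, SqliteDatabase, fn

db = SqliteDatabase(':memory:')

class LogEntry(Model):
  datetime = DateTimeField(index=True)  # assumed stale-ness column

  class Meta:
    database = db

def get_stale_logs_cutoff_id(cutoff_date):
  """ Gets the most recent ID created before the cutoff_date. """
  try:
    return (LogEntry
            .select(fn.Max(LogEntry.id))              # newest qualifying ID
            .where(LogEntry.datetime <= cutoff_date)  # only stale rows
            .scalar())
  except LogEntry.DoesNotExist:
    return None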
View file

@@ -3,23 +3,21 @@ import json
 import time
 from datetime import timedelta, datetime
-from itertools import chain

 import features

 from app import app, storage
 from data.database import UseThenDisconnect
 from data.model import db_transaction
 from data.model.log import (get_stale_logs, get_stale_logs_start_id,
-                            get_stale_logs_end_id, delete_stale_logs)
+                            get_stale_logs_cutoff_id, delete_stale_logs)
 from util.registry.gzipwrap import GzipWrap
 from workers.globalworkerbase import GlobalWorker

 logger = logging.getLogger(__name__)

-WORKER_FREQUENCY = 3600
+WORKER_FREQUENCY = 3600 * 6
 STALE_AFTER = timedelta(days=30)
 MIN_LOGS_PER_ROTATION = 10000
-BATCH_SIZE = 500

 SAVE_PATH = app.config.get('ACTION_LOG_ARCHIVE_PATH')
 SAVE_LOCATION = app.config.get('ACTION_LOG_ARCHIVE_LOCATION')
@@ -30,47 +28,35 @@ class LogRotateWorker(GlobalWorker):
   def perform_global_work(self):
     logger.debug('Attempting to rotate log entries')

-    with UseThenDisconnect(app.config):
-      with db_transaction():
-        cutoff = datetime.now() - STALE_AFTER
-        start_id = get_stale_logs_start_id()
-        end_id = get_stale_logs_end_id(cutoff)
+    while True:
+      with UseThenDisconnect(app.config):
+        with db_transaction():
+          cutoff_date = datetime.now() - STALE_AFTER
+          start_id = get_stale_logs_start_id()
+          cutoff_id = get_stale_logs_cutoff_id(cutoff_date)

-        if start_id is None or end_id is None:
-          logger.warning('No logs to be archived.')
-          return
+          if start_id is None or cutoff_id is None:
+            logger.warning('No logs to be archived.')
+            return

-        logger.debug('Found starting ID %s and ending ID %s', start_id, end_id)
+          logger.debug('Found starting ID %s and cutoff ID %s', start_id, cutoff_id)

-        approx_count = end_id - start_id
-        if approx_count < MIN_LOGS_PER_ROTATION:
-          logger.debug('Not enough stale logs to warrant rotation (approx %d)', approx_count)
-          return
+          approx_count = cutoff_id - start_id
+          if approx_count < MIN_LOGS_PER_ROTATION:
+            logger.debug('Not enough stale logs to warrant rotation (approx %d)', approx_count)
+            return

-        # Build a generator of all the logs we want to archive in batches of 500.
-        log_generator = (_ for _ in ())
-        logs_to_fetch = range(start_id, end_id+1)
-        while len(logs_to_fetch) != 0:
-          (batch, logs_to_fetch) = _take(BATCH_SIZE, logs_to_fetch)
-          logger.debug('Reading archived logs from IDs %s to %s', batch[0], batch[-1])
-          batch_generator = (pretty_print_in_json(log) + '\n'
-                             for log in get_stale_logs(batch[0], batch[-1]))
-          log_generator = chain(log_generator, batch_generator)
+          end_id = start_id + MIN_LOGS_PER_ROTATION
+          logs = (pretty_print_in_json(log)
+                  for log in get_stale_logs(start_id, end_id))

-        logger.debug('Archiving logs from IDs %s to %s', start_id, end_id)
-        filename = '%s%d-%d.txt.gz' % (SAVE_PATH, start_id, end_id)
-        storage.stream_write(SAVE_LOCATION, filename, GzipWrap(log_generator))
+          logger.debug('Archiving logs from IDs %s to %s', start_id, end_id)
+          filename = '%s%d-%d.txt.gz' % (SAVE_PATH, start_id, end_id)
+          storage.stream_write(SAVE_LOCATION, filename, GzipWrap(logs))

-        logs_to_delete = range(start_id, end_id+1)
-        while len(logs_to_delete) != 0:
-          (batch, logs_to_delete) = _take(BATCH_SIZE, logs_to_delete)
-          logger.debug('Deleting archived logs from IDs %s to %s', batch[0], batch[-1])
-          delete_stale_logs(batch[0], batch[-1])
+          delete_stale_logs(start_id, end_id)

-def _take(count, lst):
-  return lst[:count], lst[count:]

 def pretty_print_in_json(log):
   """ Pretty prints a LogEntry in JSON. """
   return json.dumps({'kind_id': log.kind_id,
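One reason the batching machinery and the _take helper could go: each pass now archives at most a fixed window of MIN_LOGS_PER_ROTATION IDs, and GzipWrap presents the generator to storage.stream_write as a stream, so rows are serialized and compressed one at a time. Below is a self-contained, standard-library sketch of that lazy gzip-a-generator pattern; gzip_json_lines is a stand-in for illustration, not Quay's actual GzipWrap API.

# Sketch: compress an iterable of dicts into gzipped newline-delimited JSON,
# pulling the iterable one row at a time (the role GzipWrap plays above).
import gzip
import io
import json

def gzip_json_lines(rows):
  buf = io.BytesIO()
  with gzip.GzipFile(fileobj=buf, mode='wb') as gz:
    for row in rows:  # generator is consumed lazily; one row in memory at a time
      gz.write((json.dumps(row) + '\n').encode('utf-8'))
  return buf.getvalue()

# e.g. one rotation window of fake log rows:
archive = gzip_json_lines({'kind_id': 1, 'id': i} for i in range(10000))

Only the compressed output is buffered here; the worker gets the same bounded-memory behavior because the window size, not the table size, caps how many rows a single rotation touches.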