From e1f955a3f62959e045ac15760fc99282e733c3af Mon Sep 17 00:00:00 2001 From: Jimmy Zelinskie Date: Fri, 9 Oct 2015 15:41:56 -0400 Subject: [PATCH] add a log rotation worker Fixes #609. --- conf/init/service/logrotateworker/log/run | 2 + conf/init/service/logrotateworker/run | 8 ++ config.py | 7 ++ data/model/log.py | 35 +++++++++ workers/logrotateworker.py | 95 +++++++++++++++++++++++ 5 files changed, 147 insertions(+) create mode 100755 conf/init/service/logrotateworker/log/run create mode 100755 conf/init/service/logrotateworker/run create mode 100644 workers/logrotateworker.py diff --git a/conf/init/service/logrotateworker/log/run b/conf/init/service/logrotateworker/log/run new file mode 100755 index 000000000..d8fe0054b --- /dev/null +++ b/conf/init/service/logrotateworker/log/run @@ -0,0 +1,2 @@ +#!/bin/sh +exec logger -i -t logrotateworker diff --git a/conf/init/service/logrotateworker/run b/conf/init/service/logrotateworker/run new file mode 100755 index 000000000..a99aa6ad3 --- /dev/null +++ b/conf/init/service/logrotateworker/run @@ -0,0 +1,8 @@ +#! /bin/bash + +echo 'Starting log rotation worker' + +cd / +venv/bin/python -m workers.logrotateworker + +echo 'Log rotation worker exited' diff --git a/config.py b/config.py index d3a53cff8..48e23973b 100644 --- a/config.py +++ b/config.py @@ -197,6 +197,9 @@ class DefaultConfig(object): # Documentation: http://pythonhosted.org/semantic_version/reference.html#semantic_version.Spec BLACKLIST_V2_SPEC = '<1.6.0' + # Feature Flag: Whether or not to rotate old action logs to storage. 
+ FEATURE_ACTION_LOG_ROTATION = False + BUILD_MANAGER = ('enterprise', {}) DISTRIBUTED_STORAGE_CONFIG = { @@ -218,6 +221,10 @@ class DefaultConfig(object): LOG_ARCHIVE_LOCATION = 'local_us' LOG_ARCHIVE_PATH = 'logarchive/' + # Action logs archive + ACTION_LOG_ARCHIVE_LOCATION = 'local_us' + ACTION_LOG_ARCHIVE_PATH = 'actionlogarchive/' + # For enterprise: MAXIMUM_REPOSITORY_USAGE = 20 diff --git a/data/model/log.py b/data/model/log.py index 21bd1bde6..9bf11216d 100644 --- a/data/model/log.py +++ b/data/model/log.py @@ -124,3 +124,38 @@ def get_repository_usage(): .where(LogEntry.datetime >= one_month_ago) .group_by(LogEntry.ip, LogEntry.repository) .count()) + + +def get_stale_logs_start_id(): + """ Gets the ID of the oldest log entry. """ + try: + return (LogEntry + .select(LogEntry.id) + .order_by(LogEntry.id) + .limit(1) + .tuples())[0][0] + except IndexError: + return None + + +def get_stale_logs_end_id(cutoff_date): + """ Gets the most recent ID created on or before the cutoff_date. """ + try: + return (LogEntry + .select(LogEntry.id) + .where(LogEntry.datetime <= cutoff_date) + .order_by(LogEntry.id.desc()) + .limit(1) + .tuples())[0][0] + except IndexError: + return None + + +def get_stale_logs(start_id, end_id): + """ Returns all the logs with IDs between start_id and end_id inclusively. """ + return LogEntry.select().where((LogEntry.id >= start_id), (LogEntry.id <= end_id)) + + +def delete_stale_logs(start_id, end_id): + """ Deletes all the logs with IDs between start_id and end_id inclusively. 
""" + LogEntry.delete().where((LogEntry.id >= start_id), (LogEntry.id <= end_id)).execute() diff --git a/workers/logrotateworker.py b/workers/logrotateworker.py new file mode 100644 index 000000000..40bb690a8 --- /dev/null +++ b/workers/logrotateworker.py @@ -0,0 +1,95 @@ +import logging +import json +import time + +from datetime import timedelta, datetime +from itertools import chain + +import features +from app import app, storage +from data.database import UseThenDisconnect +from data.model import db_transaction +from data.model.log import (get_stale_logs, get_stale_logs_start_id, + get_stale_logs_end_id, delete_stale_logs) +from util.registry.gzipwrap import GzipWrap +from workers.globalworkerbase import GlobalWorker + +logger = logging.getLogger(__name__) + +WORKER_FREQUENCY = 3600 +STALE_AFTER = timedelta(days=30) +MIN_LOGS_PER_ROTATION = 10000 +BATCH_SIZE = 500 +SAVE_PATH = app.config.get('ACTION_LOG_ARCHIVE_PATH') +SAVE_LOCATION = app.config.get('ACTION_LOG_ARCHIVE_LOCATION') + +class LogRotateWorker(GlobalWorker): + """ Worker used to rotate old logs out the database and into storage. """ + def __init__(self): + super(LogRotateWorker, self).__init__(app, sleep_period_seconds=WORKER_FREQUENCY) + + def perform_global_work(self): + logger.debug('Attempting to rotate log entries') + with UseThenDisconnect(app.config): + with db_transaction(): + cutoff = datetime.now() - STALE_AFTER + start_id = get_stale_logs_start_id() + end_id = get_stale_logs_end_id(cutoff) + + if start_id is None or end_id is None: + logger.warning('No logs to be archived.') + return + + logger.debug('Found starting ID %s and ending ID %s', start_id, end_id) + + approx_count = end_id - start_id + if approx_count < MIN_LOGS_PER_ROTATION: + logger.debug('Not enough stale logs to warrant rotation (approx %d)', approx_count) + return + + # Build a generator of all the logs we want to archive in batches of 500. 
+ log_generator = (_ for _ in ()) + logs_to_fetch = range(start_id, end_id+1) + while len(logs_to_fetch) != 0: + (batch, logs_to_fetch) = _take(BATCH_SIZE, logs_to_fetch) + logger.debug('Reading archived logs from IDs %s to %s', batch[0], batch[-1]) + batch_generator = (pretty_print_in_json(log) + '\n' + for log in get_stale_logs(batch[0], batch[-1])) + log_generator = chain(log_generator, batch_generator) + + logger.debug('Archiving logs from IDs %s to %s', start_id, end_id) + filename = '%s%d-%d.txt.gz' % (SAVE_PATH, start_id, end_id) + storage.stream_write(SAVE_LOCATION, filename, GzipWrap(log_generator)) + + logs_to_delete = range(start_id, end_id+1) + while len(logs_to_delete) != 0: + (batch, logs_to_delete) = _take(BATCH_SIZE, logs_to_delete) + logger.debug('Deleting archived logs from IDs %s to %s', batch[0], batch[-1]) + delete_stale_logs(batch[0], batch[-1]) + + +def _take(count, lst): + return lst[:count], lst[count:] + +def pretty_print_in_json(log): + """ Pretty prints a LogEntry in JSON. """ + return json.dumps({'kind_id': log.kind_id, + 'account_id': log.account_id, + 'performer_id': log.performer_id, + 'repository_id': log.repository_id, + 'datetime': str(log.datetime), + 'ip': str(log.ip), + 'metadata_json': json.loads(str(log.metadata_json))}) + + +def main(): + if not features.ACTION_LOG_ROTATION or None in [SAVE_PATH, SAVE_LOCATION]: + while True: + time.sleep(100000) + + worker = LogRotateWorker() + worker.start() + + +if __name__ == "__main__": + main()