diff --git a/workers/logrotateworker.py b/workers/logrotateworker.py
index c29b2b597..4cec6fcde 100644
--- a/workers/logrotateworker.py
+++ b/workers/logrotateworker.py
@@ -3,6 +3,8 @@
 import json
 import time
 from datetime import timedelta, datetime
+from gzip import GzipFile
+from tempfile import SpooledTemporaryFile
 
 import features
 from app import app, storage
@@ -10,14 +12,17 @@
 from data.database import UseThenDisconnect
 from data.model import db_transaction
 from data.model.log import (get_stale_logs, get_stale_logs_start_id,
                             get_stale_logs_cutoff_id, delete_stale_logs)
-from util.registry.gzipwrap import GzipWrap
+from data.userfiles import DelegateUserfiles
 from util.locking import GlobalLock
+from util.streamingjsonencoder import StreamingJSONEncoder
 from workers.worker import Worker
 
 logger = logging.getLogger(__name__)
 
+JSON_MIMETYPE = 'application/json'
 STALE_AFTER = timedelta(days=30)
 MIN_LOGS_PER_ROTATION = 10000
+MEMORY_TEMPFILE_SIZE = 64 * 1024
 WORKER_FREQUENCY = app.config.get('ACTION_LOG_ROTATION_FREQUENCY', 3600 * 6)
 SAVE_PATH = app.config.get('ACTION_LOG_ARCHIVE_PATH')
@@ -31,6 +36,8 @@ class LogRotateWorker(Worker):
 
   def _archive_logs(self):
     logger.debug('Attempting to rotate log entries')
+    log_archive = DelegateUserfiles(app, storage, SAVE_LOCATION, SAVE_PATH,
+                                    'action_log_archive_handlers')
     while True:
       with GlobalLock('ACTION_LOG_ROTATION') as gl:
         if not gl:
@@ -55,26 +62,32 @@
           return
 
         end_id = start_id + MIN_LOGS_PER_ROTATION
-        logs = (pretty_print_in_json(log)
-                for log in get_stale_logs(start_id, end_id))
+        logs_generator = (log_dict(log) for log in get_stale_logs(start_id, end_id))
 
         logger.debug('Archiving logs from IDs %s to %s', start_id, end_id)
-        filename = '%s%d-%d.txt.gz' % (SAVE_PATH, start_id, end_id)
-        storage.stream_write(SAVE_LOCATION, filename, GzipWrap(logs))
+        with SpooledTemporaryFile(MEMORY_TEMPFILE_SIZE) as tempfile:
+          with GzipFile('temp_action_log_rotate', fileobj=tempfile) as zipstream:
+            for chunk in StreamingJSONEncoder().iterencode(logs_generator):
+              zipstream.write(chunk)
+
+          tempfile.seek(0)
+          filename = '%d-%d.txt.gz' % (start_id, end_id)
+          log_archive.store_file(tempfile, JSON_MIMETYPE, content_encoding='gzip',
+                                 file_id=filename)
 
         with UseThenDisconnect(app.config):
           delete_stale_logs(start_id, end_id)
 
 
-def pretty_print_in_json(log):
+def log_dict(log):
   """ Pretty prints a LogEntry in JSON. """
-  return json.dumps({'kind_id': log.kind_id,
-                     'account_id': log.account_id,
-                     'performer_id': log.performer_id,
-                     'repository_id': log.repository_id,
-                     'datetime': str(log.datetime),
-                     'ip': str(log.ip),
-                     'metadata_json': json.loads(str(log.metadata_json))})
+  return {'kind_id': log.kind_id,
+          'account_id': log.account_id,
+          'performer_id': log.performer_id,
+          'repository_id': log.repository_id,
+          'datetime': str(log.datetime),
+          'ip': str(log.ip),
+          'metadata_json': json.loads(str(log.metadata_json))}
 
 
 def main():