import logging import json import time from datetime import timedelta, datetime from gzip import GzipFile from tempfile import SpooledTemporaryFile import features from app import app, storage from data.database import UseThenDisconnect, LogEntry, LogEntry2, LogEntry3 from data.model.log import (get_stale_logs, get_stale_logs_start_id, delete_stale_logs) from data.userfiles import DelegateUserfiles from util.locking import GlobalLock, LockNotAcquiredException from util.log import logfile_path from util.streamingjsonencoder import StreamingJSONEncoder from util.timedeltastring import convert_to_timedelta from workers.worker import Worker logger = logging.getLogger(__name__) JSON_MIMETYPE = 'application/json' MIN_LOGS_PER_ROTATION = 5000 MEMORY_TEMPFILE_SIZE = 12 * 1024 * 1024 WORKER_FREQUENCY = app.config.get('ACTION_LOG_ROTATION_FREQUENCY', 60 * 60 * 12) STALE_AFTER = convert_to_timedelta(app.config.get('ACTION_LOG_ROTATION_THRESHOLD', '30d')) SAVE_PATH = app.config.get('ACTION_LOG_ARCHIVE_PATH') SAVE_LOCATION = app.config.get('ACTION_LOG_ARCHIVE_LOCATION') class LogRotateWorker(Worker): """ Worker used to rotate old logs out the database and into storage. """ def __init__(self): super(LogRotateWorker, self).__init__() self.add_operation(self._archive_logs, WORKER_FREQUENCY) def _archive_logs(self): # TODO(LogMigrate): Remove the branch once we're back on a single table. models = [LogEntry, LogEntry2, LogEntry3] for model in models: self._archive_logs_for_model(model) def _archive_logs_for_model(self, model): logger.debug('Attempting to rotate log entries') logs_archived = True cutoff_date = datetime.now() - STALE_AFTER while logs_archived: try: with GlobalLock('ACTION_LOG_ROTATION'): logs_archived = self._perform_archiving(model, cutoff_date) except LockNotAcquiredException: return def _perform_archiving(self, model, cutoff_date): save_location = SAVE_LOCATION if not save_location: # Pick the *same* save location for all instances. This is a fallback if # a location was not configured. save_location = storage.locations[0] log_archive = DelegateUserfiles(app, storage, save_location, SAVE_PATH) with UseThenDisconnect(app.config): start_id = get_stale_logs_start_id(model) if start_id is None: logger.warning('Failed to find start id') return False logger.debug('Found starting ID %s', start_id) lookup_end_id = start_id + MIN_LOGS_PER_ROTATION logs = [log for log in get_stale_logs(start_id, lookup_end_id, model, cutoff_date)] if not logs: logger.debug('No further logs found') return False end_id = max([log.id for log in logs]) formatted_logs = [log_dict(log) for log in logs] logger.debug('Archiving logs from IDs %s to %s', start_id, end_id) with SpooledTemporaryFile(MEMORY_TEMPFILE_SIZE) as tempfile: with GzipFile('temp_action_log_rotate', fileobj=tempfile, compresslevel=1) as zipstream: for chunk in StreamingJSONEncoder().iterencode(formatted_logs): zipstream.write(chunk) tempfile.seek(0) filename = '%d-%d-%s.txt.gz' % (start_id, end_id, model.__name__.lower()) log_archive.store_file(tempfile, JSON_MIMETYPE, content_encoding='gzip', file_id=filename) logger.debug('Finished archiving logs from IDs %s to %s', start_id, end_id) with UseThenDisconnect(app.config): logger.debug('Deleting logs from IDs %s to %s', start_id, end_id) delete_stale_logs(start_id, end_id, model) return True def log_dict(log): """ Pretty prints a LogEntry in JSON. """ try: metadata_json = json.loads(str(log.metadata_json)) except ValueError: logger.exception('Could not parse metadata JSON for log entry %s', log.id) metadata_json = {'__raw': log.metadata_json} except TypeError: logger.exception('Could not parse metadata JSON for log entry %s', log.id) metadata_json = {'__raw': log.metadata_json} return { 'kind_id': log.kind_id, 'account_id': log.account_id, 'performer_id': log.performer_id, 'repository_id': log.repository_id, 'datetime': str(log.datetime), 'ip': str(log.ip), 'metadata_json': metadata_json, } def main(): logging.config.fileConfig(logfile_path(debug=False), disable_existing_loggers=False) if not features.LOG_EXPORT: logger.debug('Log export not enabled; skipping') while True: time.sleep(100000) worker = LogRotateWorker() worker.start() if __name__ == "__main__": main()