quay/workers/logrotateworker.py


import logging
import logging.config
import json
import time

from datetime import timedelta, datetime
from gzip import GzipFile
from tempfile import SpooledTemporaryFile

import features

from app import app, storage
from data.database import UseThenDisconnect, LogEntry, LogEntry2, LogEntry3
from data.model.log import (get_stale_logs, get_stale_logs_start_id,
                            get_stale_logs_cutoff_id, delete_stale_logs)
from data.userfiles import DelegateUserfiles
from util.locking import GlobalLock, LockNotAcquiredException
from util.log import logfile_path
from util.streamingjsonencoder import StreamingJSONEncoder
from util.timedeltastring import convert_to_timedelta
from workers.worker import Worker

logger = logging.getLogger(__name__)

JSON_MIMETYPE = 'application/json'
MIN_LOGS_PER_ROTATION = 10000
MEMORY_TEMPFILE_SIZE = 12 * 1024 * 1024

WORKER_FREQUENCY = app.config.get('ACTION_LOG_ROTATION_FREQUENCY', 60 * 60 * 12)
STALE_AFTER = convert_to_timedelta(app.config.get('ACTION_LOG_ROTATION_THRESHOLD', '30d'))
SAVE_PATH = app.config.get('ACTION_LOG_ARCHIVE_PATH')
SAVE_LOCATION = app.config.get('ACTION_LOG_ARCHIVE_LOCATION')


class LogRotateWorker(Worker):
  """ Worker used to rotate old logs out of the database and into storage. """
  def __init__(self):
    super(LogRotateWorker, self).__init__()
    self.add_operation(self._archive_logs, WORKER_FREQUENCY)

  def _archive_logs(self):
    # TODO(LogMigrate): Remove the branch once we're back on a single table.
    models = [LogEntry, LogEntry2, LogEntry3]
    for model in models:
      self._archive_logs_for_model(model)

  def _archive_logs_for_model(self, model):
    logger.debug('Attempting to rotate log entries')

    with UseThenDisconnect(app.config):
      cutoff_date = datetime.now() - STALE_AFTER
      cutoff_id = get_stale_logs_cutoff_id(cutoff_date, model)
      if cutoff_id is None:
        logger.warning('Failed to find cutoff id')
        return
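
    # Archive in batches, taking the global rotation lock for each batch so that
    # concurrent worker instances never archive the same rows. Stop once a batch
    # reports there was nothing left to archive.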
    logs_archived = True
    while logs_archived:
      try:
        with GlobalLock('ACTION_LOG_ROTATION'):
          logs_archived = self._perform_archiving(cutoff_id, model)
      except LockNotAcquiredException:
        return

  def _perform_archiving(self, cutoff_id, model):
    save_location = SAVE_LOCATION
    if not save_location:
      # Pick the *same* save location for all instances. This is a fallback if
      # a location was not configured.
      save_location = storage.locations[0]

    log_archive = DelegateUserfiles(app, storage, save_location, SAVE_PATH)

    with UseThenDisconnect(app.config):
      start_id = get_stale_logs_start_id(model)

      if start_id is None:
        logger.warning('Failed to find start id')
        return False

      logger.debug('Found starting ID %s and cutoff ID %s', start_id, cutoff_id)

      approx_count = cutoff_id - start_id
      if approx_count < MIN_LOGS_PER_ROTATION:
        logger.debug('Not enough stale logs to warrant rotation (approx %d)', approx_count)
        return False
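
      # Note that this is a window of *IDs*, not of rows: if the ID sequence has
      # gaps, a batch may hold fewer than MIN_LOGS_PER_ROTATION entries.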
      end_id = start_id + MIN_LOGS_PER_ROTATION
      logs = [log_dict(log) for log in get_stale_logs(start_id, end_id, model)]

    logger.debug('Archiving logs from IDs %s to %s', start_id, end_id)
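
    # Stream the JSON-encoded batch through gzip into a spooled temporary file,
    # which stays in memory up to MEMORY_TEMPFILE_SIZE bytes and only spills to
    # disk beyond that.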
    with SpooledTemporaryFile(MEMORY_TEMPFILE_SIZE) as tempfile:
      with GzipFile('temp_action_log_rotate', fileobj=tempfile, compresslevel=1) as zipstream:
        for chunk in StreamingJSONEncoder().iterencode(logs):
          zipstream.write(chunk)

      tempfile.seek(0)
      filename = '%d-%d-%s.txt.gz' % (start_id, end_id, model.__name__.lower())
      log_archive.store_file(tempfile, JSON_MIMETYPE, content_encoding='gzip',
                             file_id=filename)

    logger.debug('Finished archiving logs from IDs %s to %s', start_id, end_id)

    with UseThenDisconnect(app.config):
      logger.debug('Deleting logs from IDs %s to %s', start_id, end_id)
      delete_stale_logs(start_id, end_id, model)

    return True


def log_dict(log):
  """ Returns a dict form of a LogEntry suitable for JSON encoding. """
  try:
    metadata_json = json.loads(str(log.metadata_json))
  except (ValueError, TypeError):
    logger.exception('Could not parse metadata JSON for log entry %s', log.id)
    metadata_json = {'__raw': log.metadata_json}

  return {
    'kind_id': log.kind_id,
    'account_id': log.account_id,
    'performer_id': log.performer_id,
    'repository_id': log.repository_id,
    'datetime': str(log.datetime),
    'ip': str(log.ip),
    'metadata_json': metadata_json,
  }


def main():
  logging.config.fileConfig(logfile_path(debug=False), disable_existing_loggers=False)

  if not features.LOG_EXPORT:
    logger.debug('Log export not enabled; skipping')
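
    # Sleep forever rather than exiting, so the process supervisor does not
    # keep restarting a worker that has nothing to do.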
    while True:
      time.sleep(100000)

  worker = LogRotateWorker()
  worker.start()


if __name__ == "__main__":
  main()