quay/workers/logrotateworker.py


import logging
import json
import time

from datetime import timedelta, datetime
from itertools import chain

import features

from app import app, storage
from data.database import UseThenDisconnect
from data.model import db_transaction
from data.model.log import (get_stale_logs, get_stale_logs_start_id,
                            get_stale_logs_end_id, delete_stale_logs)
from util.registry.gzipwrap import GzipWrap
from workers.globalworkerbase import GlobalWorker

logger = logging.getLogger(__name__)

# How often to attempt a rotation, in seconds.
WORKER_FREQUENCY = 3600

# Logs older than this are eligible for archival.
STALE_AFTER = timedelta(days=30)

# Minimum number of stale log rows before a rotation is worth performing.
MIN_LOGS_PER_ROTATION = 10000

# Number of log rows read or deleted per query.
BATCH_SIZE = 500

SAVE_PATH = app.config.get('ACTION_LOG_ARCHIVE_PATH')
SAVE_LOCATION = app.config.get('ACTION_LOG_ARCHIVE_LOCATION')
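
# Both config values above must be set for the worker to run (see main() below).
# The keys are real; the values here are illustrative only:
#
#   ACTION_LOG_ARCHIVE_PATH: 'logarchive/'
#   ACTION_LOG_ARCHIVE_LOCATION: 'local_us'
#
# Since SAVE_PATH is used as a raw prefix when the archive filename is built, a
# rotation of IDs 100 through 10100 under the path above would be written as
# 'logarchive/100-10100.txt.gz'.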

class LogRotateWorker(GlobalWorker):
  """ Worker used to rotate old logs out of the database and into storage. """
  def __init__(self):
    super(LogRotateWorker, self).__init__(app, sleep_period_seconds=WORKER_FREQUENCY)

  def perform_global_work(self):
    logger.debug('Attempting to rotate log entries')

    with UseThenDisconnect(app.config):
      with db_transaction():
        cutoff = datetime.now() - STALE_AFTER
        start_id = get_stale_logs_start_id()
        end_id = get_stale_logs_end_id(cutoff)

        if start_id is None or end_id is None:
          logger.warning('No logs to be archived.')
          return

        logger.debug('Found starting ID %s and ending ID %s', start_id, end_id)

        approx_count = end_id - start_id
        if approx_count < MIN_LOGS_PER_ROTATION:
          logger.debug('Not enough stale logs to warrant rotation (approx %d)', approx_count)
          return

        # Build a lazy generator over all of the logs to be archived, fetched
        # in batches of BATCH_SIZE, starting from an empty generator.
        log_generator = (_ for _ in ())
        logs_to_fetch = range(start_id, end_id + 1)
        while len(logs_to_fetch) != 0:
          (batch, logs_to_fetch) = _take(BATCH_SIZE, logs_to_fetch)
          logger.debug('Reading archived logs from IDs %s to %s', batch[0], batch[-1])
          batch_generator = (pretty_print_in_json(log) + '\n'
                             for log in get_stale_logs(batch[0], batch[-1]))
          log_generator = chain(log_generator, batch_generator)

        # Stream the chained batches into storage as a single gzipped file.
        logger.debug('Archiving logs from IDs %s to %s', start_id, end_id)
        filename = '%s%d-%d.txt.gz' % (SAVE_PATH, start_id, end_id)
        storage.stream_write(SAVE_LOCATION, filename, GzipWrap(log_generator))

        # Only once the archive has been written, delete the rows in batches.
        logs_to_delete = range(start_id, end_id + 1)
        while len(logs_to_delete) != 0:
          (batch, logs_to_delete) = _take(BATCH_SIZE, logs_to_delete)
          logger.debug('Deleting archived logs from IDs %s to %s', batch[0], batch[-1])
          delete_stale_logs(batch[0], batch[-1])
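
# The pattern above is intended to keep only one batch of rows in memory at a
# time: each batch becomes its own generator, the generators are chained, and
# GzipWrap compresses the combined stream as storage.stream_write consumes it.
# A minimal sketch of the same idea, with hypothetical names used purely for
# illustration:
#
#   def lines():
#     for i in range(3):
#       yield 'line %d\n' % i
#
#   storage.stream_write(SAVE_LOCATION, 'example.txt.gz', GzipWrap(lines()))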

def _take(count, lst):
  """ Splits off the first `count` elements of `lst`, returning (head, rest). """
  return lst[:count], lst[count:]
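
# For example, _take(2, [5, 6, 7, 8, 9]) returns ([5, 6], [7, 8, 9]).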

def pretty_print_in_json(log):
  """ Pretty prints a LogEntry in JSON. """
  return json.dumps({'kind_id': log.kind_id,
                     'account_id': log.account_id,
                     'performer_id': log.performer_id,
                     'repository_id': log.repository_id,
                     'datetime': str(log.datetime),
                     'ip': str(log.ip),
                     'metadata_json': json.loads(str(log.metadata_json))})
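
# Each archived entry therefore becomes one JSON line in the gzipped file,
# along the lines of (illustrative values only):
#
#   {"kind_id": 1, "account_id": 42, "performer_id": 7, "repository_id": 9,
#    "datetime": "2015-11-16 17:22:28", "ip": "10.0.0.1", "metadata_json": {}}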

def main():
  if not features.ACTION_LOG_ROTATION or None in [SAVE_PATH, SAVE_LOCATION]:
    # Sleep forever rather than exiting, so a process supervisor does not
    # continually restart a worker that is disabled by configuration.
    while True:
      time.sleep(100000)

  worker = LogRotateWorker()
  worker.start()

if __name__ == "__main__":
  main()
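
# The worker is meant to run as its own long-lived process; assuming the repo
# root is on PYTHONPATH, something like the following would start it:
#
#   python -m workers.logrotateworker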