logrotateworker: save to storage via userfiles

This commit is contained in:
Jimmy Zelinskie 2016-04-08 13:04:55 -04:00
parent 3b5b5757e3
commit c7c52e6c74

View file

@ -3,6 +3,8 @@ import json
import time import time
from datetime import timedelta, datetime from datetime import timedelta, datetime
from gzip import GzipFile
from tempfile import SpooledTemporaryFile
import features import features
from app import app, storage from app import app, storage
@ -10,14 +12,17 @@ from data.database import UseThenDisconnect
from data.model import db_transaction from data.model import db_transaction
from data.model.log import (get_stale_logs, get_stale_logs_start_id, from data.model.log import (get_stale_logs, get_stale_logs_start_id,
get_stale_logs_cutoff_id, delete_stale_logs) get_stale_logs_cutoff_id, delete_stale_logs)
from util.registry.gzipwrap import GzipWrap from data.userfiles import DelegateUserfiles
from util.locking import GlobalLock from util.locking import GlobalLock
from util.streamingjsonencoder import StreamingJSONEncoder
from workers.worker import Worker from workers.worker import Worker
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
JSON_MIMETYPE = 'application/json'
STALE_AFTER = timedelta(days=30) STALE_AFTER = timedelta(days=30)
MIN_LOGS_PER_ROTATION = 10000 MIN_LOGS_PER_ROTATION = 10000
MEMORY_TEMPFILE_SIZE = 64 * 1024
WORKER_FREQUENCY = app.config.get('ACTION_LOG_ROTATION_FREQUENCY', 3600 * 6) WORKER_FREQUENCY = app.config.get('ACTION_LOG_ROTATION_FREQUENCY', 3600 * 6)
SAVE_PATH = app.config.get('ACTION_LOG_ARCHIVE_PATH') SAVE_PATH = app.config.get('ACTION_LOG_ARCHIVE_PATH')
@ -31,6 +36,8 @@ class LogRotateWorker(Worker):
def _archive_logs(self): def _archive_logs(self):
logger.debug('Attempting to rotate log entries') logger.debug('Attempting to rotate log entries')
log_archive = DelegateUserfiles(app, storage, SAVE_LOCATION, SAVE_PATH,
'action_log_archive_handlers')
while True: while True:
with GlobalLock('ACTION_LOG_ROTATION') as gl: with GlobalLock('ACTION_LOG_ROTATION') as gl:
if not gl: if not gl:
@ -55,26 +62,32 @@ class LogRotateWorker(Worker):
return return
end_id = start_id + MIN_LOGS_PER_ROTATION end_id = start_id + MIN_LOGS_PER_ROTATION
logs = (pretty_print_in_json(log) logs_generator = (log_dict(log) for log in get_stale_logs(start_id, end_id))
for log in get_stale_logs(start_id, end_id))
logger.debug('Archiving logs from IDs %s to %s', start_id, end_id) logger.debug('Archiving logs from IDs %s to %s', start_id, end_id)
filename = '%s%d-%d.txt.gz' % (SAVE_PATH, start_id, end_id) with SpooledTemporaryFile(MEMORY_TEMPFILE_SIZE) as tempfile:
storage.stream_write(SAVE_LOCATION, filename, GzipWrap(logs)) with GzipFile('temp_action_log_rotate', fileobj=tempfile) as zipstream:
for chunk in StreamingJSONEncoder().iterencode(logs_generator):
zipstream.write(chunk)
tempfile.seek(0)
filename = '%d-%d.txt.gz' % (start_id, end_id)
log_archive.store_file(tempfile, JSON_MIMETYPE, content_encoding='gzip',
file_id=filename)
with UseThenDisconnect(app.config): with UseThenDisconnect(app.config):
delete_stale_logs(start_id, end_id) delete_stale_logs(start_id, end_id)
def log_dict(log):
  """ Return a LogEntry row as a plain dict suitable for JSON serialization.

  Note: this no longer serializes to a JSON string itself; the caller streams
  the dicts through StreamingJSONEncoder. The stored `metadata_json` column is
  parsed back into a structure so it is embedded as an object rather than as a
  double-encoded string.
  """
  return {'kind_id': log.kind_id,
          'account_id': log.account_id,
          'performer_id': log.performer_id,
          'repository_id': log.repository_id,
          'datetime': str(log.datetime),
          'ip': str(log.ip),
          'metadata_json': json.loads(str(log.metadata_json))}
def main(): def main():