From c7c52e6c743e6fef0943bece71aa7e8d2f415c0b Mon Sep 17 00:00:00 2001 From: Jimmy Zelinskie Date: Fri, 8 Apr 2016 13:04:55 -0400 Subject: [PATCH 1/2] logrotateworker: save to storage via userfiles --- workers/logrotateworker.py | 39 +++++++++++++++++++++++++------------- 1 file changed, 26 insertions(+), 13 deletions(-) diff --git a/workers/logrotateworker.py b/workers/logrotateworker.py index c29b2b597..4cec6fcde 100644 --- a/workers/logrotateworker.py +++ b/workers/logrotateworker.py @@ -3,6 +3,8 @@ import json import time from datetime import timedelta, datetime +from gzip import GzipFile +from tempfile import SpooledTemporaryFile import features from app import app, storage @@ -10,14 +12,17 @@ from data.database import UseThenDisconnect from data.model import db_transaction from data.model.log import (get_stale_logs, get_stale_logs_start_id, get_stale_logs_cutoff_id, delete_stale_logs) -from util.registry.gzipwrap import GzipWrap +from data.userfiles import DelegateUserfiles from util.locking import GlobalLock +from util.streamingjsonencoder import StreamingJSONEncoder from workers.worker import Worker logger = logging.getLogger(__name__) +JSON_MIMETYPE = 'application/json' STALE_AFTER = timedelta(days=30) MIN_LOGS_PER_ROTATION = 10000 +MEMORY_TEMPFILE_SIZE = 64 * 1024 WORKER_FREQUENCY = app.config.get('ACTION_LOG_ROTATION_FREQUENCY', 3600 * 6) SAVE_PATH = app.config.get('ACTION_LOG_ARCHIVE_PATH') @@ -31,6 +36,8 @@ class LogRotateWorker(Worker): def _archive_logs(self): logger.debug('Attempting to rotate log entries') + log_archive = DelegateUserfiles(app, storage, SAVE_LOCATION, SAVE_PATH, + 'action_log_archive_handlers') while True: with GlobalLock('ACTION_LOG_ROTATION') as gl: if not gl: @@ -55,26 +62,32 @@ class LogRotateWorker(Worker): return end_id = start_id + MIN_LOGS_PER_ROTATION - logs = (pretty_print_in_json(log) - for log in get_stale_logs(start_id, end_id)) + logs_generator = (log_dict(log) for log in get_stale_logs(start_id, end_id)) logger.debug('Archiving logs from IDs %s to %s', start_id, end_id) - filename = '%s%d-%d.txt.gz' % (SAVE_PATH, start_id, end_id) - storage.stream_write(SAVE_LOCATION, filename, GzipWrap(logs)) + with SpooledTemporaryFile(MEMORY_TEMPFILE_SIZE) as tempfile: + with GzipFile('temp_action_log_rotate', fileobj=tempfile) as zipstream: + for chunk in StreamingJSONEncoder().iterencode(logs_generator): + zipstream.write(chunk) + + tempfile.seek(0) + filename = '%d-%d.txt.gz' % (start_id, end_id) + log_archive.store_file(tempfile, JSON_MIMETYPE, content_encoding='gzip', + file_id=filename) with UseThenDisconnect(app.config): delete_stale_logs(start_id, end_id) -def pretty_print_in_json(log): +def log_dict(log): """ Pretty prints a LogEntry in JSON. """ - return json.dumps({'kind_id': log.kind_id, - 'account_id': log.account_id, - 'performer_id': log.performer_id, - 'repository_id': log.repository_id, - 'datetime': str(log.datetime), - 'ip': str(log.ip), - 'metadata_json': json.loads(str(log.metadata_json))}) + return {'kind_id': log.kind_id, + 'account_id': log.account_id, + 'performer_id': log.performer_id, + 'repository_id': log.repository_id, + 'datetime': str(log.datetime), + 'ip': str(log.ip), + 'metadata_json': json.loads(str(log.metadata_json))} def main(): From 3d190b786f625c73692db22d760e8b6e517dcf56 Mon Sep 17 00:00:00 2001 From: Jimmy Zelinskie Date: Fri, 15 Apr 2016 13:51:54 -0400 Subject: [PATCH 2/2] userfiles: make handler optional --- data/archivedlogs.py | 3 ++- data/userfiles.py | 21 +++++++++++++++++---- workers/logrotateworker.py | 5 +++-- 3 files changed, 22 insertions(+), 7 deletions(-) diff --git a/data/archivedlogs.py b/data/archivedlogs.py index dcd3d3a95..aec20cafc 100644 --- a/data/archivedlogs.py +++ b/data/archivedlogs.py @@ -38,7 +38,8 @@ class LogArchive(object): handler_name = 'logarchive_handlers' - log_archive = DelegateUserfiles(app, distributed_storage, location, path, handler_name) + log_archive = DelegateUserfiles(app, distributed_storage, location, path, + handler_name=handler_name) app.add_url_rule('/logarchive/', view_func=LogArchiveHandlers.as_view(handler_name, diff --git a/data/userfiles.py b/data/userfiles.py index 6ad461df6..fb5e62fa7 100644 --- a/data/userfiles.py +++ b/data/userfiles.py @@ -1,12 +1,14 @@ import os import logging -import magic import urlparse from uuid import uuid4 +from _pyio import BufferedReader + +import magic + from flask import url_for, request, send_file, make_response, abort from flask.views import View -from _pyio import BufferedReader from util import get_app_url @@ -53,8 +55,12 @@ class UserfilesHandlers(View): return self.put(file_id) +class MissingHandlerException(Exception): + pass + + class DelegateUserfiles(object): - def __init__(self, app, distributed_storage, location, path, handler_name): + def __init__(self, app, distributed_storage, location, path, handler_name=None): self._app = app self._storage = distributed_storage self._locations = {location} @@ -77,6 +83,9 @@ class DelegateUserfiles(object): url = self._storage.get_direct_upload_url(self._locations, path, mime_type, requires_cors) if url is None: + if self._handler_name is None: + raise MissingHandlerException() + with self._app.app_context() as ctx: ctx.url_adapter = self._build_url_adapter() file_relative_url = url_for(self._handler_name, file_id=file_id) @@ -99,6 +108,9 @@ class DelegateUserfiles(object): url = self._storage.get_direct_download_url(self._locations, path, expires_in, requires_cors) if url is None: + if self._handler_name is None: + raise MissingHandlerException() + with self._app.app_context() as ctx: ctx.url_adapter = self._build_url_adapter() file_relative_url = url_for(self._handler_name, file_id=file_id) @@ -125,7 +137,8 @@ class Userfiles(object): handler_name = 'userfiles_handlers' - userfiles = DelegateUserfiles(app, distributed_storage, location, path, handler_name) + userfiles = DelegateUserfiles(app, distributed_storage, location, path, + handler_name=handler_name) app.add_url_rule('/userfiles/', view_func=UserfilesHandlers.as_view(handler_name, diff --git a/workers/logrotateworker.py b/workers/logrotateworker.py index 4cec6fcde..b5028176f 100644 --- a/workers/logrotateworker.py +++ b/workers/logrotateworker.py @@ -36,8 +36,9 @@ class LogRotateWorker(Worker): def _archive_logs(self): logger.debug('Attempting to rotate log entries') - log_archive = DelegateUserfiles(app, storage, SAVE_LOCATION, SAVE_PATH, - 'action_log_archive_handlers') + + log_archive = DelegateUserfiles(app, storage, SAVE_LOCATION, SAVE_PATH) + while True: with GlobalLock('ACTION_LOG_ROTATION') as gl: if not gl: