Merge pull request #1356 from jzelinskie/actionlogarchive

logrotateworker: save to storage via userfiles
Jimmy Zelinskie 2016-04-15 13:57:11 -04:00
commit 5585e16c90
3 changed files with 46 additions and 18 deletions

View file

@@ -38,7 +38,8 @@ class LogArchive(object):
     handler_name = 'logarchive_handlers'
 
-    log_archive = DelegateUserfiles(app, distributed_storage, location, path, handler_name)
+    log_archive = DelegateUserfiles(app, distributed_storage, location, path,
+                                    handler_name=handler_name)
 
     app.add_url_rule('/logarchive/<file_id>',
                      view_func=LogArchiveHandlers.as_view(handler_name,

View file

@@ -1,12 +1,14 @@
 import os
 import logging
-import magic
 import urlparse
 
 from uuid import uuid4
-from _pyio import BufferedReader
+
+import magic
 
 from flask import url_for, request, send_file, make_response, abort
 from flask.views import View
+from _pyio import BufferedReader
 
 from util import get_app_url
@@ -53,8 +55,12 @@ class UserfilesHandlers(View):
     return self.put(file_id)
 
 
+class MissingHandlerException(Exception):
+  pass
+
+
 class DelegateUserfiles(object):
-  def __init__(self, app, distributed_storage, location, path, handler_name):
+  def __init__(self, app, distributed_storage, location, path, handler_name=None):
     self._app = app
     self._storage = distributed_storage
     self._locations = {location}
@@ -77,6 +83,9 @@ class DelegateUserfiles(object):
     url = self._storage.get_direct_upload_url(self._locations, path, mime_type, requires_cors)
 
     if url is None:
+      if self._handler_name is None:
+        raise MissingHandlerException()
+
       with self._app.app_context() as ctx:
         ctx.url_adapter = self._build_url_adapter()
         file_relative_url = url_for(self._handler_name, file_id=file_id)
@@ -99,6 +108,9 @@ class DelegateUserfiles(object):
     url = self._storage.get_direct_download_url(self._locations, path, expires_in, requires_cors)
 
     if url is None:
+      if self._handler_name is None:
+        raise MissingHandlerException()
+
       with self._app.app_context() as ctx:
         ctx.url_adapter = self._build_url_adapter()
         file_relative_url = url_for(self._handler_name, file_id=file_id)
@@ -125,7 +137,8 @@ class Userfiles(object):
     handler_name = 'userfiles_handlers'
 
-    userfiles = DelegateUserfiles(app, distributed_storage, location, path, handler_name)
+    userfiles = DelegateUserfiles(app, distributed_storage, location, path,
+                                  handler_name=handler_name)
 
     app.add_url_rule('/userfiles/<file_id>',
                      view_func=UserfilesHandlers.as_view(handler_name,
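Note: handler_name is now an optional keyword argument to DelegateUserfiles. When a caller omits it and the storage engine cannot mint a direct URL, the new MissingHandlerException is raised instead of url_for() being handed None. A minimal standalone sketch of that fallback, with the storage and Flask plumbing stubbed out (DummyStorage and UserfilesSketch are illustrative names, not part of this diff):

class MissingHandlerException(Exception):
  pass


class DummyStorage(object):
  """ Stand-in for quay's distributed storage: returns None like an
      engine that cannot produce direct URLs (e.g. local storage). """
  def get_direct_download_url(self, locations, path, expires_in, requires_cors):
    return None


class UserfilesSketch(object):
  def __init__(self, storage, handler_name=None):
    self._storage = storage
    self._handler_name = handler_name

  def get_file_url(self, file_id, expires_in=300, requires_cors=False):
    url = self._storage.get_direct_download_url({'local'}, file_id, expires_in, requires_cors)
    if url is None:
      # Same guard as the diff: without a registered Flask handler there
      # is no route that can proxy the file, so fail loudly.
      if self._handler_name is None:
        raise MissingHandlerException()
      return '/userfiles/%s' % file_id
    return url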

View file

@@ -3,6 +3,8 @@ import json
 import time
 
 from datetime import timedelta, datetime
+from gzip import GzipFile
+from tempfile import SpooledTemporaryFile
 
 import features
 from app import app, storage
@ -10,14 +12,17 @@ from data.database import UseThenDisconnect
from data.model import db_transaction from data.model import db_transaction
from data.model.log import (get_stale_logs, get_stale_logs_start_id, from data.model.log import (get_stale_logs, get_stale_logs_start_id,
get_stale_logs_cutoff_id, delete_stale_logs) get_stale_logs_cutoff_id, delete_stale_logs)
from util.registry.gzipwrap import GzipWrap from data.userfiles import DelegateUserfiles
from util.locking import GlobalLock from util.locking import GlobalLock
from util.streamingjsonencoder import StreamingJSONEncoder
from workers.worker import Worker from workers.worker import Worker
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
JSON_MIMETYPE = 'application/json'
STALE_AFTER = timedelta(days=30) STALE_AFTER = timedelta(days=30)
MIN_LOGS_PER_ROTATION = 10000 MIN_LOGS_PER_ROTATION = 10000
MEMORY_TEMPFILE_SIZE = 64 * 1024
WORKER_FREQUENCY = app.config.get('ACTION_LOG_ROTATION_FREQUENCY', 3600 * 6) WORKER_FREQUENCY = app.config.get('ACTION_LOG_ROTATION_FREQUENCY', 3600 * 6)
SAVE_PATH = app.config.get('ACTION_LOG_ARCHIVE_PATH') SAVE_PATH = app.config.get('ACTION_LOG_ARCHIVE_PATH')
@@ -31,6 +36,9 @@ class LogRotateWorker(Worker):
   def _archive_logs(self):
     logger.debug('Attempting to rotate log entries')
 
+    log_archive = DelegateUserfiles(app, storage, SAVE_LOCATION, SAVE_PATH)
+
     while True:
       with GlobalLock('ACTION_LOG_ROTATION') as gl:
         if not gl:
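Note: the worker constructs its DelegateUserfiles without a handler_name, since no Flask route serves archived logs. Combined with the changes above, fetching an archive URL only works against storage engines that support direct download URLs; otherwise the failure now surfaces as MissingHandlerException rather than a broken route. A hypothetical caller (not part of this diff, and assuming a get_file_url-style accessor on DelegateUserfiles):

# Hypothetical illustration only:
try:
  url = log_archive.get_file_url(filename)
except MissingHandlerException:
  logger.warning('Storage engine cannot serve archived log %s directly', filename)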
@@ -55,26 +63,32 @@ class LogRotateWorker(Worker):
           return
 
         end_id = start_id + MIN_LOGS_PER_ROTATION
-        logs = (pretty_print_in_json(log)
-                for log in get_stale_logs(start_id, end_id))
+        logs_generator = (log_dict(log) for log in get_stale_logs(start_id, end_id))
 
         logger.debug('Archiving logs from IDs %s to %s', start_id, end_id)
-        filename = '%s%d-%d.txt.gz' % (SAVE_PATH, start_id, end_id)
-        storage.stream_write(SAVE_LOCATION, filename, GzipWrap(logs))
+        with SpooledTemporaryFile(MEMORY_TEMPFILE_SIZE) as tempfile:
+          with GzipFile('temp_action_log_rotate', fileobj=tempfile) as zipstream:
+            for chunk in StreamingJSONEncoder().iterencode(logs_generator):
+              zipstream.write(chunk)
+
+          tempfile.seek(0)
+          filename = '%d-%d.txt.gz' % (start_id, end_id)
+          log_archive.store_file(tempfile, JSON_MIMETYPE, content_encoding='gzip',
+                                 file_id=filename)
 
         with UseThenDisconnect(app.config):
           delete_stale_logs(start_id, end_id)
 
 
-def pretty_print_in_json(log):
+def log_dict(log):
   """ Pretty prints a LogEntry in JSON. """
-  return json.dumps({'kind_id': log.kind_id,
-                     'account_id': log.account_id,
-                     'performer_id': log.performer_id,
-                     'repository_id': log.repository_id,
-                     'datetime': str(log.datetime),
-                     'ip': str(log.ip),
-                     'metadata_json': json.loads(str(log.metadata_json))})
+  return {'kind_id': log.kind_id,
+          'account_id': log.account_id,
+          'performer_id': log.performer_id,
+          'repository_id': log.repository_id,
+          'datetime': str(log.datetime),
+          'ip': str(log.ip),
+          'metadata_json': json.loads(str(log.metadata_json))}
 
 
 def main():
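Note: the new archive path streams gzipped JSON through a SpooledTemporaryFile, so batches whose compressed size stays under MEMORY_TEMPFILE_SIZE (64 KiB) never touch disk. A self-contained sketch of the same pattern using only the standard library; archive_batch and store_file are illustrative names, and quay's StreamingJSONEncoder can serialize the generator lazily, which stdlib json cannot, so this sketch materializes the batch into a list first (exactly the copy the custom encoder avoids):

import json
from gzip import GzipFile
from tempfile import SpooledTemporaryFile

MEMORY_TEMPFILE_SIZE = 64 * 1024  # spill to disk past 64 KiB


def archive_batch(logs_generator, store_file):
  """ store_file is a stand-in for DelegateUserfiles.store_file. """
  with SpooledTemporaryFile(MEMORY_TEMPFILE_SIZE) as tempfile:
    # mode='wb' is explicit here; the first argument only sets the name
    # recorded inside the gzip header, not an on-disk filename.
    with GzipFile('temp_action_log_rotate', mode='wb', fileobj=tempfile) as zipstream:
      # iterencode yields the JSON document in small chunks, so the full
      # serialized text never exists in memory at once.
      for chunk in json.JSONEncoder().iterencode(list(logs_generator)):
        zipstream.write(chunk)  # on Python 3 this would need chunk.encode('utf-8')
    tempfile.seek(0)  # rewind so the storage layer reads from the start
    store_file(tempfile)


# Example usage with fake log rows, writing the archive to a local file:
archive_batch(({'kind_id': i} for i in range(3)),
              lambda f: open('0-3.txt.gz', 'wb').write(f.read()))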