Implement a worker for batch exporting of usage logs
This will allow customers to request their usage logs for a repository or an entire namespace, and we can export the logs in a manner that doesn't absolutely destroy the database, with every step along the way timed.
This commit is contained in:
parent
b8d2e1be9c
commit
8a212728a3
18 changed files with 768 additions and 15 deletions
283
workers/exportactionlogsworker.py
Normal file
283
workers/exportactionlogsworker.py
Normal file
|
@ -0,0 +1,283 @@
|
|||
import logging
|
||||
import os.path
|
||||
import json
|
||||
import uuid
|
||||
|
||||
from datetime import datetime, timedelta
|
||||
from io import BytesIO
|
||||
|
||||
from enum import Enum, unique
|
||||
|
||||
from app import app, export_action_logs_queue, storage, get_app_url
|
||||
from data import model
|
||||
from endpoints.api import format_date
|
||||
from endpoints.api.logs_models_pre_oci import create_log
|
||||
from workers.queueworker import QueueWorker, JobException
|
||||
from util.log import logfile_path
|
||||
from util.useremails import send_logs_exported_email
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
POLL_PERIOD_SECONDS = 1
|
||||
|
||||
EXPORT_LOGS_STORAGE_PATH = app.config.get('EXPORT_ACTION_LOGS_STORAGE_PATH', 'exportedactionlogs')
|
||||
MAXIMUM_WORK_PERIOD_SECONDS = app.config.get('EXPORT_ACTION_LOGS_MAXIMUM_SECONDS', 60 * 60) # 1 hour
|
||||
MAXIMUM_QUERY_TIME_SECONDS = app.config.get('EXPORT_ACTION_LOGS_MAXIMUM_QUERY_TIME_SECONDS', 30)
|
||||
EXPORTED_LOGS_EXPIRATION_SECONDS = app.config.get('EXPORT_ACTION_LOGS_SECONDS', 60 * 60) # 1 hour
|
||||
|
||||
MINIMUM_RANGE_SIZE = 1000
|
||||
MAXIMUM_RANGE_SIZE = 100000
|
||||
EXPECTED_ITERATION_LOG_COUNT = 1000
|
||||
|
||||
|
||||
@unique
|
||||
class ExportResult(Enum):
|
||||
# NOTE: Make sure to handle these in `logsexported.html` in `emails`
|
||||
INVALID_REQUEST = 'invalidrequest'
|
||||
OPERATION_TIMEDOUT = 'timedout'
|
||||
SUCCESSFUL_EXPORT = 'success'
|
||||
|
||||
|
||||
class ExportActionLogsWorker(QueueWorker):
|
||||
""" Worker which exports action logs for a namespace or a repository based on
|
||||
a queued request from the API.
|
||||
"""
|
||||
def process_queue_item(self, job_details):
|
||||
logger.info('Got export actions logs queue item: %s', job_details)
|
||||
|
||||
# job_details block (as defined in the logs.py API endpoint):
|
||||
# {
|
||||
# 'export_id': export_id,
|
||||
# 'repository_id': repository.id or None,
|
||||
# 'namespace_id': namespace.id,
|
||||
# 'namespace_name': namespace.username,
|
||||
# 'repository_name': repository.name or None,
|
||||
# 'start_time': start_time,
|
||||
# 'end_time': end_time,
|
||||
# 'callback_url': callback_url or None,
|
||||
# 'callback_email': callback_email or None,
|
||||
# }
|
||||
export_id = job_details['export_id']
|
||||
|
||||
start_time = _parse_time(job_details['start_time'])
|
||||
end_time = _parse_time(job_details['end_time'])
|
||||
|
||||
# Make sure the end time has the whole day.
|
||||
if start_time is None or end_time is None:
|
||||
self._report_results(job_details, ExportResult.INVALID_REQUEST)
|
||||
return
|
||||
|
||||
end_time = end_time + timedelta(days=1) - timedelta(milliseconds=1)
|
||||
|
||||
# Select the minimum and maximum IDs for the logs for the repository/namespace
|
||||
# over the time range.
|
||||
namespace_id = job_details['namespace_id']
|
||||
repository_id = job_details['repository_id']
|
||||
max_query_time = timedelta(seconds=MAXIMUM_QUERY_TIME_SECONDS)
|
||||
|
||||
min_id, elapsed = _run_and_time(lambda: model.log.get_minimum_id_for_logs(start_time,
|
||||
repository_id,
|
||||
namespace_id))
|
||||
if elapsed > max_query_time:
|
||||
logger.error('Retrieval of min ID for export logs `%s` timed out with time of `%s`',
|
||||
export_id, elapsed)
|
||||
self._report_results(job_details, ExportResult.OPERATION_TIMEDOUT)
|
||||
return
|
||||
|
||||
max_id, elapsed = _run_and_time(lambda: model.log.get_maximum_id_for_logs(end_time,
|
||||
repository_id,
|
||||
namespace_id))
|
||||
if elapsed > max_query_time:
|
||||
logger.error('Retrieval of max ID for export logs `%s` timed out with time of `%s`',
|
||||
export_id, elapsed)
|
||||
self._report_results(job_details, ExportResult.OPERATION_TIMEDOUT)
|
||||
return
|
||||
|
||||
min_id = min_id or 1
|
||||
max_id = max_id or 1
|
||||
|
||||
logger.info('Found log range of %s to %s for export logs `%s`', min_id, max_id, export_id)
|
||||
|
||||
# Generate a file key so that if we return an API URL, it cannot simply be constructed from
|
||||
# just the export ID.
|
||||
file_key = str(uuid.uuid4())
|
||||
exported_filename = '%s-%s' % (export_id, file_key)
|
||||
|
||||
# Start a chunked upload for the logs and stream them.
|
||||
upload_id, upload_metadata = storage.initiate_chunked_upload(storage.preferred_locations)
|
||||
export_storage_path = os.path.join(EXPORT_LOGS_STORAGE_PATH, exported_filename)
|
||||
logger.debug('Starting chunked upload to path `%s`', export_storage_path)
|
||||
|
||||
# Start with a 'json' header that contains the opening bracket, as well as basic
|
||||
# information and the start of the `logs` array.
|
||||
details = {
|
||||
'start_time': format_date(start_time),
|
||||
'end_time': format_date(end_time),
|
||||
'namespace': job_details['namespace_name'],
|
||||
'repository': job_details['repository_name'],
|
||||
}
|
||||
|
||||
prefix_data = """{
|
||||
"export_id": "%s",
|
||||
"details": %s,
|
||||
"logs": [
|
||||
""" % (export_id, json.dumps(details))
|
||||
|
||||
upload_metadata = storage.stream_upload_chunk(storage.preferred_locations, upload_id, 0, -1,
|
||||
BytesIO(str(prefix_data)), upload_metadata)
|
||||
uploaded_byte_count = len(prefix_data)
|
||||
|
||||
try:
|
||||
# Stream the logs to storage as chunks.
|
||||
updated_metadata, uploaded_byte_count = self._stream_logs(upload_id, upload_metadata,
|
||||
uploaded_byte_count, min_id, max_id,
|
||||
job_details)
|
||||
if updated_metadata is None:
|
||||
storage.cancel_chunked_upload(upload_id, upload_metadata)
|
||||
return
|
||||
|
||||
# Close the JSON block.
|
||||
suffix_data = """
|
||||
{"terminator": true}]
|
||||
}"""
|
||||
|
||||
upload_metadata = storage.stream_upload_chunk(storage.preferred_locations, upload_id,
|
||||
uploaded_byte_count, -1,
|
||||
BytesIO(str(suffix_data)),
|
||||
upload_metadata)
|
||||
if updated_metadata is None:
|
||||
storage.cancel_chunked_upload(upload_id, upload_metadata)
|
||||
return
|
||||
|
||||
# Complete the upload.
|
||||
storage.complete_chunked_upload(storage.preferred_locations, upload_id, export_storage_path,
|
||||
updated_metadata)
|
||||
except:
|
||||
logger.exception('Exception when exporting logs for `%s`', export_id)
|
||||
storage.cancel_chunked_upload(storage.preferred_locations, upload_id, upload_metadata)
|
||||
raise JobException
|
||||
|
||||
# Invoke the callbacks.
|
||||
export_url = storage.get_direct_download_url(storage.preferred_locations, export_storage_path,
|
||||
expires_in=EXPORTED_LOGS_EXPIRATION_SECONDS)
|
||||
if export_url is None:
|
||||
export_url = '%s/exportedlogs/%s' % (get_app_url(), exported_filename)
|
||||
|
||||
self._report_results(job_details, ExportResult.SUCCESSFUL_EXPORT, export_url)
|
||||
|
||||
def _stream_logs(self, upload_id, upload_metadata, uploaded_byte_count, min_id, max_id,
|
||||
job_details):
|
||||
export_id = job_details['export_id']
|
||||
max_work_period = timedelta(seconds=MAXIMUM_WORK_PERIOD_SECONDS)
|
||||
max_query_time = timedelta(seconds=MAXIMUM_QUERY_TIME_SECONDS)
|
||||
kinds = model.log.get_log_entry_kinds()
|
||||
|
||||
# Using an adjusting scale, start downloading log rows in batches, starting at
|
||||
# MINIMUM_RANGE_SIZE and doubling until we've reached EXPECTED_ITERATION_LOG_COUNT or
|
||||
# the lookup range has reached MAXIMUM_RANGE_SIZE. If at any point this operation takes
|
||||
# longer than the MAXIMUM_WORK_PERIOD_SECONDS, terminate the batch operation as timed out.
|
||||
batch_start_time = datetime.utcnow()
|
||||
|
||||
current_start_id = min_id
|
||||
current_batch_size = MINIMUM_RANGE_SIZE
|
||||
|
||||
while current_start_id <= max_id:
|
||||
# Verify we haven't been working for too long.
|
||||
work_elapsed = datetime.utcnow() - batch_start_time
|
||||
if work_elapsed > max_work_period:
|
||||
logger.error('Retrieval of logs `%s` timed out with time of `%s`',
|
||||
export_id, work_elapsed)
|
||||
self._report_results(job_details, ExportResult.OPERATION_TIMEDOUT)
|
||||
return None, None
|
||||
|
||||
id_range = [current_start_id, min(max_id, current_start_id + current_batch_size)]
|
||||
|
||||
# Load the next set of logs.
|
||||
def retrieve_and_write_logs():
|
||||
namespace_id = job_details['namespace_id'] if not job_details.get('repository_id') else None
|
||||
repository_id = job_details.get('repository_id')
|
||||
logger.debug('Retrieving logs over range %s with namespace %s and repository %s',
|
||||
id_range, namespace_id, repository_id)
|
||||
|
||||
logs_query = model.log.get_logs_query(namespace=namespace_id,
|
||||
repository=repository_id,
|
||||
id_range=id_range)
|
||||
return [create_log(log) for log in logs_query]
|
||||
|
||||
logs, elapsed = _run_and_time(retrieve_and_write_logs)
|
||||
if elapsed > max_query_time:
|
||||
logger.error('Retrieval of logs for export logs `%s` with range `%s` timed out at `%s`',
|
||||
export_id, id_range, elapsed)
|
||||
self._report_results(job_details, ExportResult.OPERATION_TIMEDOUT)
|
||||
return None, None
|
||||
|
||||
# Write the logs to storage.
|
||||
logger.debug('Writing %s retrieved logs for range %s', len(logs), id_range)
|
||||
if logs:
|
||||
logs_data = ','.join([json.dumps(log.to_dict(kinds, False)) for log in logs]) + ','
|
||||
logs_data = logs_data.encode('utf-8')
|
||||
upload_metadata = storage.stream_upload_chunk(storage.preferred_locations, upload_id,
|
||||
uploaded_byte_count, -1,
|
||||
BytesIO(logs_data),
|
||||
upload_metadata)
|
||||
uploaded_byte_count += len(logs_data)
|
||||
|
||||
# Move forward.
|
||||
current_start_id = id_range[1] + 1
|
||||
|
||||
# Increase the batch size if necessary.
|
||||
if len(logs) < EXPECTED_ITERATION_LOG_COUNT:
|
||||
current_batch_size = min(MAXIMUM_RANGE_SIZE, current_batch_size * 2)
|
||||
|
||||
return upload_metadata, uploaded_byte_count
|
||||
|
||||
def _report_results(self, job_details, result_status, exported_data_url=None):
|
||||
logger.info('Reporting result of `%s` for %s; %s', result_status, job_details,
|
||||
exported_data_url)
|
||||
|
||||
if job_details.get('callback_url'):
|
||||
# Post the results to the callback URL.
|
||||
client = app.config['HTTPCLIENT']
|
||||
result = client.post(job_details['callback_url'], json={
|
||||
'export_id': job_details['export_id'],
|
||||
'start_time': job_details['start_time'],
|
||||
'end_time': job_details['end_time'],
|
||||
'namespace': job_details['namespace_name'],
|
||||
'repository': job_details['repository_name'],
|
||||
'exported_data_url': exported_data_url,
|
||||
'status': result_status.value,
|
||||
})
|
||||
|
||||
if result.status_code != 200:
|
||||
logger.error('Got `%s` status code for callback URL `%s` for export `%s`',
|
||||
result.status_code, job_details['callback_url'],
|
||||
job_details['export_id'])
|
||||
raise Exception('Got non-200 for batch logs reporting; retrying later')
|
||||
|
||||
if job_details.get('callback_email'):
|
||||
with app.app_context():
|
||||
send_logs_exported_email(job_details['callback_email'], job_details['export_id'],
|
||||
result_status, exported_data_url,
|
||||
EXPORTED_LOGS_EXPIRATION_SECONDS)
|
||||
|
||||
|
||||
def _parse_time(specified_time):
|
||||
try:
|
||||
return datetime.strptime(specified_time + ' UTC', '%m/%d/%Y %Z')
|
||||
except ValueError:
|
||||
return None
|
||||
|
||||
def _run_and_time(fn):
|
||||
start_time = datetime.utcnow()
|
||||
result = fn()
|
||||
return result, datetime.utcnow() - start_time
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
logging.config.fileConfig(logfile_path(debug=False), disable_existing_loggers=False)
|
||||
|
||||
logger.debug('Starting export action logs worker')
|
||||
worker = ExportActionLogsWorker(export_action_logs_queue,
|
||||
poll_period_seconds=POLL_PERIOD_SECONDS)
|
||||
worker.start()
|
66
workers/test/test_exportactionlogsworker.py
Normal file
66
workers/test/test_exportactionlogsworker.py
Normal file
|
@ -0,0 +1,66 @@
|
|||
import json
|
||||
|
||||
from app import storage
|
||||
from datetime import datetime, timedelta
|
||||
|
||||
from httmock import urlmatch, HTTMock
|
||||
|
||||
from data import model, database
|
||||
from workers.exportactionlogsworker import ExportActionLogsWorker
|
||||
|
||||
from test.fixtures import *
|
||||
|
||||
|
||||
@pytest.mark.parametrize('namespace,repo_name,expects_logs', [
|
||||
('buynlarge', 'orgrepo', True),
|
||||
('devtable', 'history', False),
|
||||
])
|
||||
def test_process_queue_item(namespace, repo_name, expects_logs, app):
|
||||
end_time = datetime.utcnow() + timedelta(days=365)
|
||||
start_time = datetime.utcnow() - timedelta(days=365)
|
||||
|
||||
repo = model.repository.get_repository(namespace, repo_name)
|
||||
|
||||
assert (model.log.get_maximum_id_for_logs(end_time, repository_id=repo.id) is not None) == expects_logs
|
||||
assert (model.log.get_minimum_id_for_logs(start_time, repository_id=repo.id) is not None) == expects_logs
|
||||
|
||||
worker = ExportActionLogsWorker(None)
|
||||
called = [{}]
|
||||
|
||||
@urlmatch(netloc=r'testcallback')
|
||||
def handle_request(url, request):
|
||||
called[0] = json.loads(request.body)
|
||||
return {'status_code': 200, 'content': '{}'}
|
||||
|
||||
def format_date(datetime):
|
||||
return datetime.strftime("%m/%d/%Y")
|
||||
|
||||
with HTTMock(handle_request):
|
||||
worker.process_queue_item({
|
||||
'export_id': 'someid',
|
||||
'repository_id': repo.id,
|
||||
'namespace_id': repo.namespace_user.id,
|
||||
'namespace_name': namespace,
|
||||
'repository_name': repo_name,
|
||||
'start_time': format_date(start_time),
|
||||
'end_time': format_date(end_time),
|
||||
'callback_url': 'http://testcallback/',
|
||||
'callback_email': None,
|
||||
})
|
||||
|
||||
assert called[0]
|
||||
assert called[0][u'export_id'] == 'someid'
|
||||
assert called[0][u'status'] == 'success'
|
||||
|
||||
url = called[0][u'exported_data_url']
|
||||
assert url.find('http://localhost:5000/exportedlogs/') == 0
|
||||
|
||||
storage_id = url[len('http://localhost:5000/exportedlogs/'):]
|
||||
created = storage.get_content(storage.preferred_locations, 'exportedactionlogs/' + storage_id)
|
||||
created_json = json.loads(created)
|
||||
|
||||
expected_count = database.LogEntry.select().where(database.LogEntry.repository == repo).count()
|
||||
assert (expected_count > 1) == expects_logs
|
||||
|
||||
assert created_json['export_id'] == 'someid'
|
||||
assert len(created_json['logs']) == (expected_count + 1)
|
Reference in a new issue