diff --git a/app.py b/app.py index dd2258299..c5b84e174 100644 --- a/app.py +++ b/app.py @@ -227,6 +227,9 @@ notification_queue = WorkQueue(app.config['NOTIFICATION_QUEUE_NAME'], tf, has_na secscan_notification_queue = WorkQueue(app.config['SECSCAN_NOTIFICATION_QUEUE_NAME'], tf, has_namespace=False, metric_queue=metric_queue) +export_action_logs_queue = WorkQueue(app.config['EXPORT_ACTION_LOGS_QUEUE_NAME'], tf, + has_namespace=True, + metric_queue=metric_queue) # Note: We set `has_namespace` to `False` here, as we explicitly want this queue to not be emptied # when a namespace is marked for deletion. diff --git a/conf/init/service/batch/exportactionlogsworker/log/run b/conf/init/service/batch/exportactionlogsworker/log/run new file mode 100755 index 000000000..a152ba029 --- /dev/null +++ b/conf/init/service/batch/exportactionlogsworker/log/run @@ -0,0 +1,4 @@ +#!/bin/sh + +# Start the logger +exec logger -i -t exportactionlogsworker diff --git a/conf/init/service/batch/exportactionlogsworker/run b/conf/init/service/batch/exportactionlogsworker/run new file mode 100755 index 000000000..a2f6194e7 --- /dev/null +++ b/conf/init/service/batch/exportactionlogsworker/run @@ -0,0 +1,9 @@ +#! /bin/bash + +echo 'Starting Export Actions Log worker' + +QUAYPATH=${QUAYPATH:-"."} +cd ${QUAYDIR:-"/"} +PYTHONPATH=$QUAYPATH venv/bin/python -m workers.exportactionlogsworker 2>&1 + +echo 'Export Actions Log worker exited' diff --git a/config.py b/config.py index 75794515e..856bb20d5 100644 --- a/config.py +++ b/config.py @@ -172,6 +172,7 @@ class DefaultConfig(ImmutableConfig): SECSCAN_NOTIFICATION_QUEUE_NAME = 'security_notification' CHUNK_CLEANUP_QUEUE_NAME = 'chunk_cleanup' NAMESPACE_GC_QUEUE_NAME = 'namespacegc' + EXPORT_ACTION_LOGS_QUEUE_NAME = 'exportactionlogs' # Super user config. Note: This MUST BE an empty list for the default config. 
SUPER_USERS = [] diff --git a/data/model/log.py b/data/model/log.py index e1f1a2b0c..40f76c7a9 100644 --- a/data/model/log.py +++ b/data/model/log.py @@ -15,11 +15,16 @@ logger = logging.getLogger(__name__) ACTIONS_ALLOWED_WITHOUT_AUDIT_LOGGING = ['pull_repo'] -def _logs_query(selections, start_time, end_time, performer=None, repository=None, namespace=None, - ignore=None, model=LogEntry): +def _logs_query(selections, start_time=None, end_time=None, performer=None, repository=None, + namespace=None, ignore=None, model=LogEntry, id_range=None): """ Returns a query for selecting logs from the table, with various options and filters. """ - joined = (model.select(*selections).switch(model) - .where(model.datetime >= start_time, model.datetime < end_time)) + assert (start_time is not None and end_time is not None) or (id_range is not None) + joined = (model.select(*selections).switch(model)) + + if id_range is not None: + joined = joined.where(model.id >= id_range[0], model.id <= id_range[1]) + else: + joined = joined.where(model.datetime >= start_time, model.datetime < end_time) if repository: joined = joined.where(model.repository == repository) @@ -67,8 +72,8 @@ def get_aggregated_logs(start_time, end_time, performer=None, repository=None, n return query.group_by(date, model.kind) -def get_logs_query(start_time, end_time, performer=None, repository=None, namespace=None, - ignore=None, model=LogEntry): +def get_logs_query(start_time=None, end_time=None, performer=None, repository=None, namespace=None, + ignore=None, model=LogEntry, id_range=None): """ Returns the logs matching the given filters. 
""" Performer = User.alias() Account = User.alias() @@ -78,13 +83,13 @@ def get_logs_query(start_time, end_time, performer=None, repository=None, namesp selections.append(Account) query = _logs_query(selections, start_time, end_time, performer, repository, namespace, ignore, - model=model) + model=model, id_range=id_range) query = (query.switch(model).join(Performer, JOIN.LEFT_OUTER, - on=(model.performer == Performer.id).alias('performer'))) + on=(model.performer == Performer.id).alias('performer'))) if namespace is None and repository is None: query = (query.switch(model).join(Account, JOIN.LEFT_OUTER, - on=(model.account == Account.id).alias('account'))) + on=(model.account == Account.id).alias('account'))) return query @@ -191,3 +196,54 @@ def get_repositories_action_sums(repository_ids): action_count_map[record[0]] = record[1] return action_count_map + + +def get_minimum_id_for_logs(start_time, repository_id=None, namespace_id=None): + """ Returns the minimum ID for logs matching the given repository or namespace in + the logs table, starting at the given start time. + """ + # First try bounded by a day. Most repositories will meet this criteria, and therefore + # can make a much faster query. + day_after = start_time + timedelta(days=1) + result = _get_bounded_id(fn.Min, LogEntry.datetime >= start_time, + repository_id, namespace_id, LogEntry.datetime < day_after) + if result is not None: + return result + + return _get_bounded_id(fn.Min, LogEntry.datetime >= start_time, repository_id, namespace_id) + + +def get_maximum_id_for_logs(end_time, repository_id=None, namespace_id=None): + """ Returns the maximum ID for logs matching the given repository or namespace in + the logs table, ending at the given end time. + """ + # First try bounded by a day. Most repositories will meet this criteria, and therefore + # can make a much faster query. 
+ day_before = end_time - timedelta(days=1) + result = _get_bounded_id(fn.Max, LogEntry.datetime <= end_time, + repository_id, namespace_id, LogEntry.datetime > day_before) + if result is not None: + return result + + return _get_bounded_id(fn.Max, LogEntry.datetime <= end_time, repository_id, namespace_id) + + +def _get_bounded_id(fn, filter_clause, repository_id, namespace_id, reduction_clause=None): + assert (namespace_id is not None) or (repository_id is not None) + query = (LogEntry + .select(fn(LogEntry.id)) + .where(filter_clause)) + + if reduction_clause is not None: + query = query.where(reduction_clause) + + if repository_id is not None: + query = query.where(LogEntry.repository == repository_id) + else: + query = query.where(LogEntry.account == namespace_id) + + row = query.tuples()[0] + if not row: + return None + + return row[0] diff --git a/emails/logsexported.html b/emails/logsexported.html new file mode 100644 index 000000000..9695352d2 --- /dev/null +++ b/emails/logsexported.html @@ -0,0 +1,38 @@ +{% extends "base.html" %} + +{% block content %} + +

Usage Logs Export has completed

+

Export ID: {{ export_id }}

+
+ +{% if status == 'success' %} + + + + +
The exported logs information can be found at {{ exported_data_url }} and will remain accessible for {{ exported_data_expiration }} seconds before being deleted.
+{% elif status == 'timedout' %} + + + + +
The attempt to export the logs in the specified range has timed out. Please contact support if this problem persists.
+{% elif status == 'invalidrequest' %} + + + + +
The attempt to export the logs failed due to an invalid request. Please contact support if this problem persists.
+{% endif %} + + + + + +
If you did not initiate this operation, please delete this e-mail.
+ +Best Wishes,
+The {{ app_title }} Team
+ +{% endblock %} diff --git a/endpoints/api/logs.py b/endpoints/api/logs.py index 8d0068714..4c3928c43 100644 --- a/endpoints/api/logs.py +++ b/endpoints/api/logs.py @@ -1,10 +1,17 @@ """ Access usage logs for organizations or repositories. """ +import json +import uuid from datetime import datetime, timedelta +from flask import request + +from app import export_action_logs_queue from endpoints.api import (resource, nickname, ApiResource, query_param, parse_args, RepositoryParamResource, require_repo_admin, related_user_resource, - format_date, require_user_admin, path_param, require_scope, page_support) + format_date, require_user_admin, path_param, require_scope, page_support, + validate_json_request, InvalidRequest) +from data import model as data_model from endpoints.api.logs_models_pre_oci import pre_oci_model as model from endpoints.exception import Unauthorized, NotFound from auth.permissions import AdministerOrganizationPermission @@ -207,3 +214,127 @@ class OrgAggregateLogs(ApiResource): performer_name=performer_name, ignore=SERVICE_LEVEL_LOG_KINDS) raise Unauthorized() + + +def queue_logs_export(start_time, end_time, options, namespace_name, repository_name=None): + export_id = str(uuid.uuid4()) + namespace = data_model.user.get_namespace_user(namespace_name) + if namespace is None: + raise InvalidRequest('Unknown namespace') + + repository = None + if repository_name is not None: + repository = data_model.repository.get_repository(namespace_name, repository_name) + if repository is None: + raise InvalidRequest('Unknown repository') + + callback_url = options.get('callback_url') + if callback_url: + if not callback_url.startswith('https://') and not callback_url.startswith('http://'): + raise InvalidRequest('Invalid callback URL') + + export_action_logs_queue.put([namespace_name], json.dumps({ + 'export_id': export_id, + 'repository_id': repository.id if repository else None, + 'namespace_id': namespace.id, + 'namespace_name': namespace.username, + 
'repository_name': repository.name if repository else None,
+    'start_time': start_time,
+    'end_time': end_time,
+    'callback_url': callback_url,
+    'callback_email': options.get('callback_email'),
+  }), retries_remaining=3)
+
+  return {
+    'export_id': export_id,
+  }
+
+
+EXPORT_LOGS_SCHEMA = {
+  'type': 'object',
+  'description': 'Configuration for an export logs operation',
+  'properties': {
+    'callback_url': {
+      'type': 'string',
+      'description': 'The callback URL to invoke with a link to the exported logs',
+    },
+    'callback_email': {
+      'type': 'string',
+      'description': 'The e-mail address at which to e-mail a link to the exported logs',
+    },
+  },
+}
+
+
+@resource('/v1/repository/<apirepopath:repository>/exportlogs')
+@path_param('repository', 'The full path of the repository. e.g. namespace/name')
+class ExportRepositoryLogs(RepositoryParamResource):
+  """ Resource for exporting the logs for the specific repository. """
+  schemas = {
+    'ExportLogs': EXPORT_LOGS_SCHEMA
+  }
+
+  @require_repo_admin
+  @nickname('exportRepoLogs')
+  @parse_args()
+  @query_param('starttime', 'Earliest time from which to get logs (%m/%d/%Y %Z)', type=str)
+  @query_param('endtime', 'Latest time to which to get logs (%m/%d/%Y %Z)', type=str)
+  @validate_json_request('ExportLogs')
+  def post(self, namespace, repository, parsed_args):
+    """ Queues an export of the logs for the specified repository. """
+    if model.repo_exists(namespace, repository) is False:
+      raise NotFound()
+
+    start_time = parsed_args['starttime']
+    end_time = parsed_args['endtime']
+    return queue_logs_export(start_time, end_time, request.get_json(), namespace,
+                             repository_name=repository)
+
+
+@resource('/v1/user/exportlogs')
+class ExportUserLogs(ApiResource):
+  """ Resource for exporting the logs for the current user's namespace. """
+  schemas = {
+    'ExportLogs': EXPORT_LOGS_SCHEMA
+  }
+
+  @require_user_admin
+  @nickname('exportUserLogs')
+  @parse_args()
+  @query_param('starttime', 'Earliest time from which to get logs. (%m/%d/%Y %Z)', type=str)
+  @query_param('endtime', 'Latest time to which to get logs. (%m/%d/%Y %Z)', type=str)
+  @validate_json_request('ExportLogs')
+  def post(self, parsed_args):
+    """ Queues an export of the logs for the current user. """
+    start_time = parsed_args['starttime']
+    end_time = parsed_args['endtime']
+
+    user = get_authenticated_user()
+    return queue_logs_export(start_time, end_time, request.get_json(), user.username)
+
+
+@resource('/v1/organization/<orgname>/exportlogs')
+@path_param('orgname', 'The name of the organization')
+@related_user_resource(ExportUserLogs)
+class ExportOrgLogs(ApiResource):
+  """ Resource for exporting the logs for an entire organization. """
+  schemas = {
+    'ExportLogs': EXPORT_LOGS_SCHEMA
+  }
+
+  @nickname('exportOrgLogs')
+  @parse_args()
+  @query_param('starttime', 'Earliest time from which to get logs. (%m/%d/%Y %Z)', type=str)
+  @query_param('endtime', 'Latest time to which to get logs. (%m/%d/%Y %Z)', type=str)
+  @require_scope(scopes.ORG_ADMIN)
+  @validate_json_request('ExportLogs')
+  def post(self, orgname, parsed_args):
+    """ Queues an export of the logs for the specified organization.
""" + permission = AdministerOrganizationPermission(orgname) + if permission.can(): + start_time = parsed_args['starttime'] + end_time = parsed_args['endtime'] + + return queue_logs_export(start_time, end_time, request.get_json(), orgname) + + raise Unauthorized() diff --git a/endpoints/api/logs_models_pre_oci.py b/endpoints/api/logs_models_pre_oci.py index 8e7ea5606..ba46a931b 100644 --- a/endpoints/api/logs_models_pre_oci.py +++ b/endpoints/api/logs_models_pre_oci.py @@ -4,7 +4,7 @@ from data import model, database from endpoints.api.logs_models_interface import LogEntryDataInterface, LogEntryPage, LogEntry, AggregatedLogEntry -def _create_log(log): +def create_log(log): account_organization = None account_username = None account_email = None @@ -57,7 +57,7 @@ class PreOCIModel(LogEntryDataInterface): logs, next_page_token = model.modelutil.paginate(logs_query, m, descending=True, page_token=page_token, limit=20) - return LogEntryPage([_create_log(log) for log in logs], next_page_token) + return LogEntryPage([create_log(log) for log in logs], next_page_token) return get_logs(database.LogEntry) diff --git a/endpoints/api/test/test_logs.py b/endpoints/api/test/test_logs.py new file mode 100644 index 000000000..3720d3c95 --- /dev/null +++ b/endpoints/api/test/test_logs.py @@ -0,0 +1,31 @@ +import time + +from mock import patch + +from app import export_action_logs_queue +from endpoints.api.test.shared import conduct_api_call +from endpoints.api.logs import ExportOrgLogs +from endpoints.test.shared import client_with_identity + +from test.fixtures import * + +def test_export_logs(client): + with client_with_identity('devtable', client) as cl: + assert export_action_logs_queue.get() is None + + timecode = time.time() + def get_time(): + return timecode - 2 + + with patch('time.time', get_time): + # Call to export logs. 
+ body = { + 'callback_url': 'http://some/url', + 'callback_email': 'a@b.com', + } + + conduct_api_call(cl, ExportOrgLogs, 'POST', {'orgname': 'buynlarge'}, + body, expected_code=200) + + # Ensure the request was queued. + assert export_action_logs_queue.get() is not None diff --git a/endpoints/api/test/test_security.py b/endpoints/api/test/test_security.py index 2c95c184d..be907715d 100644 --- a/endpoints/api/test/test_security.py +++ b/endpoints/api/test/test_security.py @@ -47,6 +47,7 @@ NOTIFICATION_PARAMS = {'namespace': 'devtable', 'repository': 'devtable/simple', TOKEN_PARAMS = {'token_uuid': 'someuuid'} TRIGGER_PARAMS = {'repository': 'devtable/simple', 'trigger_uuid': 'someuuid'} MANIFEST_PARAMS = {'repository': 'devtable/simple', 'manifestref': 'sha256:deadbeef'} +EXPORTLOGS_PARAMS = {'callback_url': 'http://foo'} SECURITY_TESTS = [ (AppTokens, 'GET', {}, {}, None, 401), @@ -1179,6 +1180,21 @@ SECURITY_TESTS = [ (RepositoryAggregateLogs, 'GET', {'repository': 'devtable/simple'}, None, 'freshuser', 403), (RepositoryAggregateLogs, 'GET', {'repository': 'devtable/simple'}, None, 'reader', 403), + (ExportUserLogs, 'POST', None, EXPORTLOGS_PARAMS, None, 401), + (ExportUserLogs, 'POST', None, EXPORTLOGS_PARAMS, 'devtable', 200), + (ExportUserLogs, 'POST', None, EXPORTLOGS_PARAMS, 'freshuser', 200), + (ExportUserLogs, 'POST', None, EXPORTLOGS_PARAMS, 'reader', 200), + + (ExportOrgLogs, 'POST', {'orgname': 'buynlarge'}, EXPORTLOGS_PARAMS, None, 401), + (ExportOrgLogs, 'POST', {'orgname': 'buynlarge'}, EXPORTLOGS_PARAMS, 'devtable', 200), + (ExportOrgLogs, 'POST', {'orgname': 'buynlarge'}, EXPORTLOGS_PARAMS, 'freshuser', 403), + (ExportOrgLogs, 'POST', {'orgname': 'buynlarge'}, EXPORTLOGS_PARAMS, 'reader', 403), + + (ExportRepositoryLogs, 'POST', {'repository': 'devtable/simple'}, EXPORTLOGS_PARAMS, None, 401), + (ExportRepositoryLogs, 'POST', {'repository': 'devtable/simple'}, EXPORTLOGS_PARAMS, 'devtable', 200), + (ExportRepositoryLogs, 'POST', {'repository': 
'devtable/simple'}, EXPORTLOGS_PARAMS, 'freshuser', 403),
+  (ExportRepositoryLogs, 'POST', {'repository': 'devtable/simple'}, EXPORTLOGS_PARAMS, 'reader', 403),
+
   (SuperUserAggregateLogs, 'GET', None, None, None, 401),
   (SuperUserAggregateLogs, 'GET', None, None, 'devtable', 200),
   (SuperUserAggregateLogs, 'GET', None, None, 'freshuser', 403),
diff --git a/endpoints/web.py b/endpoints/web.py
index 515187597..a1c667aac 100644
--- a/endpoints/web.py
+++ b/endpoints/web.py
@@ -12,7 +12,7 @@ from flask_login import current_user
 import features
 
 from app import (app, billing as stripe, build_logs, avatar, signer, log_archive, config_provider,
-                 get_app_url, instance_keys, user_analytics)
+                 get_app_url, instance_keys, user_analytics, storage)
 from auth import scopes
 from auth.auth_context import get_authenticated_user
 from auth.basic import has_basic_auth
@@ -372,6 +372,33 @@ def buildlogs(build_uuid):
   return response
 
 
+@web.route('/exportedlogs/<file_id>', methods=['GET'])
+def exportedlogs(file_id):
+  # Only enable this endpoint if local storage is available.
+ has_local_storage = False + for storage_type, _ in app.config.get('DISTRIBUTED_STORAGE_CONFIG', {}).values(): + if storage_type == 'LocalStorage': + has_local_storage = True + break + + if not has_local_storage: + abort(404) + + JSON_MIMETYPE = 'application/json' + exported_logs_storage_path = app.config.get('EXPORT_ACTION_LOGS_STORAGE_PATH', + 'exportedactionlogs') + export_storage_path = os.path.join(exported_logs_storage_path, file_id) + if not storage.exists(storage.preferred_locations, export_storage_path): + abort(404) + + try: + return send_file(storage.stream_read_file(storage.preferred_locations, export_storage_path), + mimetype=JSON_MIMETYPE) + except IOError: + logger.exception('Could not read exported logs') + abort(403) + + @web.route('/logarchive/', methods=['GET']) @route_show_if(features.BUILD_SUPPORT) @process_auth_or_cookie @@ -401,6 +428,7 @@ def logarchive(file_id): logger.exception('Could not read archived logs') abort(403) + @web.route('/receipt', methods=['GET']) @route_show_if(features.BILLING) @require_session_login diff --git a/static/css/directives/ui/logs-view.css b/static/css/directives/ui/logs-view.css index cabda9eff..982e57b2f 100644 --- a/static/css/directives/ui/logs-view.css +++ b/static/css/directives/ui/logs-view.css @@ -113,4 +113,29 @@ float: none !important; text-align: right; margin-bottom: 20px; +} + +.logs-view-element .toggle-icon { + vertical-align: middle; +} + +.logs-view-element .download-btn { + margin-left: 6px; + vertical-align: middle; +} + +.logs-view-element .download-btn i.fa { + margin-right: 4px; +} + +.logs-view-element .help-text { + font-size: 12px; + color: #888; + padding-top: 0px; + padding: 10px; +} + +.logs-view-element code.date { + font-size: 70%; + color: #2581c7; } \ No newline at end of file diff --git a/static/directives/logs-view.html b/static/directives/logs-view.html index f07368847..79ce3d7a2 100644 --- a/static/directives/logs-view.html +++ b/static/directives/logs-view.html @@ -23,6 
+23,8 @@ @@ -98,4 +100,27 @@
+ + +
+
+
+ Enter an e-mail address or callback URL (must start with http:// or https://) + at which to receive the exported logs once they have been fully processed: +
+ +
+ Note: The export process can take up to an hour to process if there are many logs. As well, + only a single export process can run at a time for each namespace. Additional export requests will be + queued. +
+
+
diff --git a/static/js/directives/ui/logs-view.js b/static/js/directives/ui/logs-view.js index 9025fece6..a73e9f5f6 100644 --- a/static/js/directives/ui/logs-view.js +++ b/static/js/directives/ui/logs-view.js @@ -28,6 +28,7 @@ angular.module('quay').directive('logsView', function () { $scope.chartLoading = true; $scope.options = {}; + $scope.context = {}; var datetime = new Date(); $scope.options.logStartDate = new Date(datetime.getUTCFullYear(), datetime.getUTCMonth(), datetime.getUTCDate() - 7); @@ -479,6 +480,33 @@ angular.module('quay').directive('logsView', function () { return StringBuilderService.buildString(logDescriptions[log.kind] || log.kind, log.metadata); }; + $scope.showExportLogs = function() { + $scope.exportLogsInfo = {}; + }; + + $scope.exportLogs = function(exportLogsInfo, callback) { + if (!exportLogsInfo.urlOrEmail) { + callback(false); + return; + } + + var exportURL = getUrl('exportlogs').toString(); + var runExport = Restangular.one(exportURL); + + var urlOrEmail = exportLogsInfo.urlOrEmail; + var data = {}; + if (urlOrEmail.indexOf('http://') == 0 || urlOrEmail.indexOf('https://') == 0) { + data['callback_url'] = urlOrEmail; + } else { + data['callback_email'] = urlOrEmail; + } + + runExport.customPOST(data).then(function(resp) { + bootbox.alert('Usage logs export queued with ID `' + resp['export_id'] + '`') + callback(true); + }, ApiService.errorDisplay('Could not start logs export', callback)); + }; + $scope.$watch('organization', update); $scope.$watch('user', update); $scope.$watch('repository', update); diff --git a/util/config/schema.py b/util/config/schema.py index 1f66f5db1..684d07d17 100644 --- a/util/config/schema.py +++ b/util/config/schema.py @@ -16,6 +16,7 @@ INTERNAL_ONLY_PROPERTIES = { 'SECURITY_SCANNER_ISSUER_NAME', 'NOTIFICATION_QUEUE_NAME', 'NAMESPACE_GC_QUEUE_NAME', + 'EXPORT_ACTION_LOGS_QUEUE_NAME', 'FEATURE_BILLING', 'BILLING_TYPE', diff --git a/util/useremails.py b/util/useremails.py index 49a130490..6d44bb850 100644 
--- a/util/useremails.py +++ b/util/useremails.py @@ -166,6 +166,16 @@ def send_invoice_email(email, contents): mail.send(msg) +def send_logs_exported_email(email, export_id, status, exported_data_url=None, + exported_data_expiration=None): + send_email(email, 'Export Action Logs Complete', 'logsexported', { + 'status': status, + 'export_id': export_id, + 'exported_data_url': exported_data_url, + 'exported_data_expiration': exported_data_expiration, + }) + + # INTERNAL EMAILS BELOW def send_subscription_change(change_description, customer_id, customer_email, quay_username): @@ -182,5 +192,3 @@ def send_subscription_change(change_description, customer_id, customer_email, qu msg.html = SUBSCRIPTION_CHANGE.format(change_description, customer_id, customer_email, quay_username) mail.send(msg) - - diff --git a/workers/exportactionlogsworker.py b/workers/exportactionlogsworker.py new file mode 100644 index 000000000..3c14ec006 --- /dev/null +++ b/workers/exportactionlogsworker.py @@ -0,0 +1,283 @@ +import logging +import os.path +import json +import uuid + +from datetime import datetime, timedelta +from io import BytesIO + +from enum import Enum, unique + +from app import app, export_action_logs_queue, storage, get_app_url +from data import model +from endpoints.api import format_date +from endpoints.api.logs_models_pre_oci import create_log +from workers.queueworker import QueueWorker, JobException +from util.log import logfile_path +from util.useremails import send_logs_exported_email + +logger = logging.getLogger(__name__) + + +POLL_PERIOD_SECONDS = 1 + +EXPORT_LOGS_STORAGE_PATH = app.config.get('EXPORT_ACTION_LOGS_STORAGE_PATH', 'exportedactionlogs') +MAXIMUM_WORK_PERIOD_SECONDS = app.config.get('EXPORT_ACTION_LOGS_MAXIMUM_SECONDS', 60 * 60) # 1 hour +MAXIMUM_QUERY_TIME_SECONDS = app.config.get('EXPORT_ACTION_LOGS_MAXIMUM_QUERY_TIME_SECONDS', 30) +EXPORTED_LOGS_EXPIRATION_SECONDS = app.config.get('EXPORT_ACTION_LOGS_SECONDS', 60 * 60) # 1 hour + +MINIMUM_RANGE_SIZE = 
1000 +MAXIMUM_RANGE_SIZE = 100000 +EXPECTED_ITERATION_LOG_COUNT = 1000 + + +@unique +class ExportResult(Enum): + # NOTE: Make sure to handle these in `logsexported.html` in `emails` + INVALID_REQUEST = 'invalidrequest' + OPERATION_TIMEDOUT = 'timedout' + SUCCESSFUL_EXPORT = 'success' + + +class ExportActionLogsWorker(QueueWorker): + """ Worker which exports action logs for a namespace or a repository based on + a queued request from the API. + """ + def process_queue_item(self, job_details): + logger.info('Got export actions logs queue item: %s', job_details) + + # job_details block (as defined in the logs.py API endpoint): + # { + # 'export_id': export_id, + # 'repository_id': repository.id or None, + # 'namespace_id': namespace.id, + # 'namespace_name': namespace.username, + # 'repository_name': repository.name or None, + # 'start_time': start_time, + # 'end_time': end_time, + # 'callback_url': callback_url or None, + # 'callback_email': callback_email or None, + # } + export_id = job_details['export_id'] + + start_time = _parse_time(job_details['start_time']) + end_time = _parse_time(job_details['end_time']) + + # Make sure the end time has the whole day. + if start_time is None or end_time is None: + self._report_results(job_details, ExportResult.INVALID_REQUEST) + return + + end_time = end_time + timedelta(days=1) - timedelta(milliseconds=1) + + # Select the minimum and maximum IDs for the logs for the repository/namespace + # over the time range. 
+ namespace_id = job_details['namespace_id'] + repository_id = job_details['repository_id'] + max_query_time = timedelta(seconds=MAXIMUM_QUERY_TIME_SECONDS) + + min_id, elapsed = _run_and_time(lambda: model.log.get_minimum_id_for_logs(start_time, + repository_id, + namespace_id)) + if elapsed > max_query_time: + logger.error('Retrieval of min ID for export logs `%s` timed out with time of `%s`', + export_id, elapsed) + self._report_results(job_details, ExportResult.OPERATION_TIMEDOUT) + return + + max_id, elapsed = _run_and_time(lambda: model.log.get_maximum_id_for_logs(end_time, + repository_id, + namespace_id)) + if elapsed > max_query_time: + logger.error('Retrieval of max ID for export logs `%s` timed out with time of `%s`', + export_id, elapsed) + self._report_results(job_details, ExportResult.OPERATION_TIMEDOUT) + return + + min_id = min_id or 1 + max_id = max_id or 1 + + logger.info('Found log range of %s to %s for export logs `%s`', min_id, max_id, export_id) + + # Generate a file key so that if we return an API URL, it cannot simply be constructed from + # just the export ID. + file_key = str(uuid.uuid4()) + exported_filename = '%s-%s' % (export_id, file_key) + + # Start a chunked upload for the logs and stream them. + upload_id, upload_metadata = storage.initiate_chunked_upload(storage.preferred_locations) + export_storage_path = os.path.join(EXPORT_LOGS_STORAGE_PATH, exported_filename) + logger.debug('Starting chunked upload to path `%s`', export_storage_path) + + # Start with a 'json' header that contains the opening bracket, as well as basic + # information and the start of the `logs` array. 
+    details = {
+      'start_time': format_date(start_time),
+      'end_time': format_date(end_time),
+      'namespace': job_details['namespace_name'],
+      'repository': job_details['repository_name'],
+    }
+
+    prefix_data = """{
+      "export_id": "%s",
+      "details": %s,
+      "logs": [
+    """ % (export_id, json.dumps(details))
+
+    upload_metadata = storage.stream_upload_chunk(storage.preferred_locations, upload_id, 0, -1,
+                                                  BytesIO(str(prefix_data)), upload_metadata)
+    uploaded_byte_count = len(prefix_data)
+
+    try:
+      # Stream the logs to storage as chunks.
+      updated_metadata, uploaded_byte_count = self._stream_logs(upload_id, upload_metadata,
+                                                                uploaded_byte_count, min_id, max_id,
+                                                                job_details)
+      if updated_metadata is None:
+        storage.cancel_chunked_upload(storage.preferred_locations, upload_id, upload_metadata)
+        return
+
+      # Close the JSON block.
+      suffix_data = """
+        {"terminator": true}]
+    }"""
+
+      upload_metadata = storage.stream_upload_chunk(storage.preferred_locations, upload_id,
+                                                    uploaded_byte_count, -1,
+                                                    BytesIO(str(suffix_data)),
+                                                    upload_metadata)
+      if upload_metadata is None:
+        storage.cancel_chunked_upload(storage.preferred_locations, upload_id, updated_metadata)
+        return
+
+      # Complete the upload.
+      storage.complete_chunked_upload(storage.preferred_locations, upload_id, export_storage_path,
+                                      updated_metadata)
+    except:
+      logger.exception('Exception when exporting logs for `%s`', export_id)
+      storage.cancel_chunked_upload(storage.preferred_locations, upload_id, upload_metadata)
+      raise JobException
+
+    # Invoke the callbacks.
+ export_url = storage.get_direct_download_url(storage.preferred_locations, export_storage_path, + expires_in=EXPORTED_LOGS_EXPIRATION_SECONDS) + if export_url is None: + export_url = '%s/exportedlogs/%s' % (get_app_url(), exported_filename) + + self._report_results(job_details, ExportResult.SUCCESSFUL_EXPORT, export_url) + + def _stream_logs(self, upload_id, upload_metadata, uploaded_byte_count, min_id, max_id, + job_details): + export_id = job_details['export_id'] + max_work_period = timedelta(seconds=MAXIMUM_WORK_PERIOD_SECONDS) + max_query_time = timedelta(seconds=MAXIMUM_QUERY_TIME_SECONDS) + kinds = model.log.get_log_entry_kinds() + + # Using an adjusting scale, start downloading log rows in batches, starting at + # MINIMUM_RANGE_SIZE and doubling until we've reached EXPECTED_ITERATION_LOG_COUNT or + # the lookup range has reached MAXIMUM_RANGE_SIZE. If at any point this operation takes + # longer than the MAXIMUM_WORK_PERIOD_SECONDS, terminate the batch operation as timed out. + batch_start_time = datetime.utcnow() + + current_start_id = min_id + current_batch_size = MINIMUM_RANGE_SIZE + + while current_start_id <= max_id: + # Verify we haven't been working for too long. + work_elapsed = datetime.utcnow() - batch_start_time + if work_elapsed > max_work_period: + logger.error('Retrieval of logs `%s` timed out with time of `%s`', + export_id, work_elapsed) + self._report_results(job_details, ExportResult.OPERATION_TIMEDOUT) + return None, None + + id_range = [current_start_id, min(max_id, current_start_id + current_batch_size)] + + # Load the next set of logs. 
+ def retrieve_and_write_logs(): + namespace_id = job_details['namespace_id'] if not job_details.get('repository_id') else None + repository_id = job_details.get('repository_id') + logger.debug('Retrieving logs over range %s with namespace %s and repository %s', + id_range, namespace_id, repository_id) + + logs_query = model.log.get_logs_query(namespace=namespace_id, + repository=repository_id, + id_range=id_range) + return [create_log(log) for log in logs_query] + + logs, elapsed = _run_and_time(retrieve_and_write_logs) + if elapsed > max_query_time: + logger.error('Retrieval of logs for export logs `%s` with range `%s` timed out at `%s`', + export_id, id_range, elapsed) + self._report_results(job_details, ExportResult.OPERATION_TIMEDOUT) + return None, None + + # Write the logs to storage. + logger.debug('Writing %s retrieved logs for range %s', len(logs), id_range) + if logs: + logs_data = ','.join([json.dumps(log.to_dict(kinds, False)) for log in logs]) + ',' + logs_data = logs_data.encode('utf-8') + upload_metadata = storage.stream_upload_chunk(storage.preferred_locations, upload_id, + uploaded_byte_count, -1, + BytesIO(logs_data), + upload_metadata) + uploaded_byte_count += len(logs_data) + + # Move forward. + current_start_id = id_range[1] + 1 + + # Increase the batch size if necessary. + if len(logs) < EXPECTED_ITERATION_LOG_COUNT: + current_batch_size = min(MAXIMUM_RANGE_SIZE, current_batch_size * 2) + + return upload_metadata, uploaded_byte_count + + def _report_results(self, job_details, result_status, exported_data_url=None): + logger.info('Reporting result of `%s` for %s; %s', result_status, job_details, + exported_data_url) + + if job_details.get('callback_url'): + # Post the results to the callback URL. 
+      client = app.config['HTTPCLIENT']
+      result = client.post(job_details['callback_url'], json={
+        'export_id': job_details['export_id'],
+        'start_time': job_details['start_time'],
+        'end_time': job_details['end_time'],
+        'namespace': job_details['namespace_name'],
+        'repository': job_details['repository_name'],
+        'exported_data_url': exported_data_url,
+        'status': result_status.value,
+      })
+
+      if result.status_code != 200:
+        logger.error('Got `%s` status code for callback URL `%s` for export `%s`',
+                     result.status_code, job_details['callback_url'],
+                     job_details['export_id'])
+        raise Exception('Got non-200 for batch logs reporting; retrying later')
+
+    if job_details.get('callback_email'):
+      with app.app_context():
+        send_logs_exported_email(job_details['callback_email'], job_details['export_id'],
+                                 result_status.value, exported_data_url,
+                                 EXPORTED_LOGS_EXPIRATION_SECONDS)
+
+
+def _parse_time(specified_time):
+  try:
+    return datetime.strptime(specified_time + ' UTC', '%m/%d/%Y %Z')
+  except ValueError:
+    return None
+
+
+def _run_and_time(fn):
+  start_time = datetime.utcnow()
+  result = fn()
+  return result, datetime.utcnow() - start_time
+
+
+if __name__ == "__main__":
+  logging.config.fileConfig(logfile_path(debug=False), disable_existing_loggers=False)
+
+  logger.debug('Starting export action logs worker')
+  worker = ExportActionLogsWorker(export_action_logs_queue,
+                                  poll_period_seconds=POLL_PERIOD_SECONDS)
+  worker.start()
diff --git a/workers/test/test_exportactionlogsworker.py b/workers/test/test_exportactionlogsworker.py
new file mode 100644
index 000000000..8e3f3df61
--- /dev/null
+++ b/workers/test/test_exportactionlogsworker.py
@@ -0,0 +1,66 @@
+import json
+
+from app import storage
+from datetime import datetime, timedelta
+
+from httmock import urlmatch, HTTMock
+
+from data import model, database
+from workers.exportactionlogsworker import ExportActionLogsWorker
+
+from test.fixtures import *
+
+
+@pytest.mark.parametrize('namespace,repo_name,expects_logs',
[ + ('buynlarge', 'orgrepo', True), + ('devtable', 'history', False), +]) +def test_process_queue_item(namespace, repo_name, expects_logs, app): + end_time = datetime.utcnow() + timedelta(days=365) + start_time = datetime.utcnow() - timedelta(days=365) + + repo = model.repository.get_repository(namespace, repo_name) + + assert (model.log.get_maximum_id_for_logs(end_time, repository_id=repo.id) is not None) == expects_logs + assert (model.log.get_minimum_id_for_logs(start_time, repository_id=repo.id) is not None) == expects_logs + + worker = ExportActionLogsWorker(None) + called = [{}] + + @urlmatch(netloc=r'testcallback') + def handle_request(url, request): + called[0] = json.loads(request.body) + return {'status_code': 200, 'content': '{}'} + + def format_date(datetime): + return datetime.strftime("%m/%d/%Y") + + with HTTMock(handle_request): + worker.process_queue_item({ + 'export_id': 'someid', + 'repository_id': repo.id, + 'namespace_id': repo.namespace_user.id, + 'namespace_name': namespace, + 'repository_name': repo_name, + 'start_time': format_date(start_time), + 'end_time': format_date(end_time), + 'callback_url': 'http://testcallback/', + 'callback_email': None, + }) + + assert called[0] + assert called[0][u'export_id'] == 'someid' + assert called[0][u'status'] == 'success' + + url = called[0][u'exported_data_url'] + assert url.find('http://localhost:5000/exportedlogs/') == 0 + + storage_id = url[len('http://localhost:5000/exportedlogs/'):] + created = storage.get_content(storage.preferred_locations, 'exportedactionlogs/' + storage_id) + created_json = json.loads(created) + + expected_count = database.LogEntry.select().where(database.LogEntry.repository == repo).count() + assert (expected_count > 1) == expects_logs + + assert created_json['export_id'] == 'someid' + assert len(created_json['logs']) == (expected_count + 1)