import logging
import logging.config
import os.path
import json
import uuid

from datetime import datetime, timedelta
from io import BytesIO
from enum import Enum, unique

from app import app, export_action_logs_queue, storage, get_app_url
from data import model
from endpoints.api import format_date
from endpoints.api.logs_models_pre_oci import create_log
from workers.queueworker import QueueWorker, JobException
from util.log import logfile_path
from util.useremails import send_logs_exported_email

logger = logging.getLogger(__name__)

POLL_PERIOD_SECONDS = 1

EXPORT_LOGS_STORAGE_PATH = app.config.get('EXPORT_ACTION_LOGS_STORAGE_PATH', 'exportedactionlogs')
MAXIMUM_WORK_PERIOD_SECONDS = app.config.get('EXPORT_ACTION_LOGS_MAXIMUM_SECONDS', 60 * 60)  # 1 hour
MAXIMUM_QUERY_TIME_SECONDS = app.config.get('EXPORT_ACTION_LOGS_MAXIMUM_QUERY_TIME_SECONDS', 30)
EXPORTED_LOGS_EXPIRATION_SECONDS = app.config.get('EXPORT_ACTION_LOGS_SECONDS', 60 * 60)  # 1 hour

MINIMUM_RANGE_SIZE = 1000
MAXIMUM_RANGE_SIZE = 100000
EXPECTED_ITERATION_LOG_COUNT = 1000


@unique
class ExportResult(Enum):
  # NOTE: Make sure to handle these in `logsexported.html` in `emails`
  INVALID_REQUEST = 'invalidrequest'
  OPERATION_TIMEDOUT = 'timedout'
  SUCCESSFUL_EXPORT = 'success'


class ExportActionLogsWorker(QueueWorker):
  """ Worker which exports action logs for a namespace or a repository based on a queued request
      from the API.
  """
  def process_queue_item(self, job_details):
    logger.info('Got export action logs queue item: %s', job_details)

    # job_details block (as defined in the logs.py API endpoint):
    # {
    #   'export_id': export_id,
    #   'repository_id': repository.id or None,
    #   'namespace_id': namespace.id,
    #   'namespace_name': namespace.username,
    #   'repository_name': repository.name or None,
    #   'start_time': start_time,
    #   'end_time': end_time,
    #   'callback_url': callback_url or None,
    #   'callback_email': callback_email or None,
    # }
    export_id = job_details['export_id']
    start_time = _parse_time(job_details['start_time'])
    end_time = _parse_time(job_details['end_time'])

    if start_time is None or end_time is None:
      self._report_results(job_details, ExportResult.INVALID_REQUEST)
      return

    # Make sure the end time covers the whole final day.
    end_time = end_time + timedelta(days=1) - timedelta(milliseconds=1)

    # Select the minimum and maximum IDs for the logs for the repository/namespace
    # over the time range.
    namespace_id = job_details['namespace_id']
    repository_id = job_details['repository_id']
    max_query_time = timedelta(seconds=MAXIMUM_QUERY_TIME_SECONDS)

    min_id, elapsed = _run_and_time(lambda: model.log.get_minimum_id_for_logs(start_time,
                                                                              repository_id,
                                                                              namespace_id))
    if elapsed > max_query_time:
      logger.error('Retrieval of min ID for export logs `%s` timed out with time of `%s`',
                   export_id, elapsed)
      self._report_results(job_details, ExportResult.OPERATION_TIMEDOUT)
      return

    max_id, elapsed = _run_and_time(lambda: model.log.get_maximum_id_for_logs(end_time,
                                                                              repository_id,
                                                                              namespace_id))
    if elapsed > max_query_time:
      logger.error('Retrieval of max ID for export logs `%s` timed out with time of `%s`',
                   export_id, elapsed)
      self._report_results(job_details, ExportResult.OPERATION_TIMEDOUT)
      return

    min_id = min_id or 1
    max_id = max_id or 1

    logger.info('Found log range of %s to %s for export logs `%s`', min_id, max_id, export_id)

    # Generate a file key so that if we return an API URL, it cannot simply be constructed from
    # just the export ID.
    file_key = str(uuid.uuid4())
    exported_filename = '%s-%s' % (export_id, file_key)

    # Start a chunked upload for the logs and stream them.
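    # For reference, the finished export is a single JSON document shaped roughly like the sketch
    # below. This is illustrative only: the exact per-log fields depend on what
    # create_log(...).to_dict(kinds, False) emits, so treat the entry bodies as placeholders.
    #
    #   {
    #     "export_id": "<export id>",
    #     "details": {"start_time": "...", "end_time": "...", "namespace": "...", "repository": "..."},
    #     "logs": [
    #       {... log entry ...},
    #       {... log entry ...},
    #       {"terminator": true}
    #     ]
    #   }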
    upload_id, upload_metadata = storage.initiate_chunked_upload(storage.preferred_locations)
    export_storage_path = os.path.join(EXPORT_LOGS_STORAGE_PATH, exported_filename)
    logger.debug('Starting chunked upload to path `%s`', export_storage_path)

    # Start with a 'json' header that contains the opening bracket, as well as basic
    # information and the start of the `logs` array.
    details = {
      'start_time': format_date(start_time),
      'end_time': format_date(end_time),
      'namespace': job_details['namespace_name'],
      'repository': job_details['repository_name'],
    }

    prefix_data = """{
      "export_id": "%s",
      "details": %s,
      "logs": [
    """ % (export_id, json.dumps(details))

    upload_metadata = storage.stream_upload_chunk(storage.preferred_locations, upload_id, 0, -1,
                                                  BytesIO(str(prefix_data)), upload_metadata)
    uploaded_byte_count = len(prefix_data)

    try:
      # Stream the logs to storage as chunks.
      updated_metadata, uploaded_byte_count = self._stream_logs(upload_id, upload_metadata,
                                                                uploaded_byte_count, min_id,
                                                                max_id, job_details)
      if updated_metadata is None:
        storage.cancel_chunked_upload(storage.preferred_locations, upload_id, upload_metadata)
        return

      # Close the JSON block.
      suffix_data = """
        {"terminator": true}]
      }"""

      upload_metadata = storage.stream_upload_chunk(storage.preferred_locations, upload_id,
                                                    uploaded_byte_count, -1,
                                                    BytesIO(str(suffix_data)), upload_metadata)
      if updated_metadata is None:
        storage.cancel_chunked_upload(storage.preferred_locations, upload_id, upload_metadata)
        return

      # Complete the upload.
      storage.complete_chunked_upload(storage.preferred_locations, upload_id, export_storage_path,
                                      updated_metadata)
    except:
      logger.exception('Exception when exporting logs for `%s`', export_id)
      storage.cancel_chunked_upload(storage.preferred_locations, upload_id, upload_metadata)
      raise JobException

    # Invoke the callbacks.
    export_url = storage.get_direct_download_url(storage.preferred_locations, export_storage_path,
                                                 expires_in=EXPORTED_LOGS_EXPIRATION_SECONDS)
    if export_url is None:
      export_url = '%s/exportedlogs/%s' % (get_app_url(), exported_filename)

    self._report_results(job_details, ExportResult.SUCCESSFUL_EXPORT, export_url)

  def _stream_logs(self, upload_id, upload_metadata, uploaded_byte_count, min_id, max_id,
                   job_details):
    export_id = job_details['export_id']
    max_work_period = timedelta(seconds=MAXIMUM_WORK_PERIOD_SECONDS)
    max_query_time = timedelta(seconds=MAXIMUM_QUERY_TIME_SECONDS)
    kinds = model.log.get_log_entry_kinds()

    # Using an adjusting scale, start downloading log rows in batches, starting at
    # MINIMUM_RANGE_SIZE and doubling until we've reached EXPECTED_ITERATION_LOG_COUNT or
    # the lookup range has reached MAXIMUM_RANGE_SIZE. If at any point this operation takes
    # longer than the MAXIMUM_WORK_PERIOD_SECONDS, terminate the batch operation as timed out.
    batch_start_time = datetime.utcnow()
    current_start_id = min_id
    current_batch_size = MINIMUM_RANGE_SIZE

    while current_start_id <= max_id:
      # Verify we haven't been working for too long.
      work_elapsed = datetime.utcnow() - batch_start_time
      if work_elapsed > max_work_period:
        logger.error('Retrieval of logs `%s` timed out with time of `%s`',
                     export_id, work_elapsed)
        self._report_results(job_details, ExportResult.OPERATION_TIMEDOUT)
        return None, None

      id_range = [current_start_id, min(max_id, current_start_id + current_batch_size)]

      # Load the next set of logs.
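      # (A worked example of the range sizing above: with the default constants, the first
      # id_range spans roughly MINIMUM_RANGE_SIZE (1,000) ids; whenever a batch yields fewer than
      # EXPECTED_ITERATION_LOG_COUNT rows, current_batch_size doubles on the next pass to 2,000,
      # 4,000 and so on, capped at MAXIMUM_RANGE_SIZE (100,000), so sparse id ranges are skipped
      # over quickly.)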
      def retrieve_and_write_logs():
        namespace_id = job_details['namespace_id'] if not job_details.get('repository_id') else None
        repository_id = job_details.get('repository_id')
        logger.debug('Retrieving logs over range %s with namespace %s and repository %s',
                     id_range, namespace_id, repository_id)

        logs_query = model.log.get_logs_query(namespace=namespace_id, repository=repository_id,
                                              id_range=id_range)
        return [create_log(log) for log in logs_query]

      logs, elapsed = _run_and_time(retrieve_and_write_logs)
      if elapsed > max_query_time:
        logger.error('Retrieval of logs for export logs `%s` with range `%s` timed out at `%s`',
                     export_id, id_range, elapsed)
        self._report_results(job_details, ExportResult.OPERATION_TIMEDOUT)
        return None, None

      # Write the logs to storage.
      logger.debug('Writing %s retrieved logs for range %s', len(logs), id_range)
      if logs:
        logs_data = ','.join([json.dumps(log.to_dict(kinds, False)) for log in logs]) + ','
        logs_data = logs_data.encode('utf-8')

        upload_metadata = storage.stream_upload_chunk(storage.preferred_locations, upload_id,
                                                      uploaded_byte_count, -1, BytesIO(logs_data),
                                                      upload_metadata)
        uploaded_byte_count += len(logs_data)

      # Move forward.
      current_start_id = id_range[1] + 1

      # Increase the batch size if necessary.
      if len(logs) < EXPECTED_ITERATION_LOG_COUNT:
        current_batch_size = min(MAXIMUM_RANGE_SIZE, current_batch_size * 2)

    return upload_metadata, uploaded_byte_count

  def _report_results(self, job_details, result_status, exported_data_url=None):
    logger.info('Reporting result of `%s` for %s; %s', result_status, job_details,
                exported_data_url)

    if job_details.get('callback_url'):
      # Post the results to the callback URL.
      client = app.config['HTTPCLIENT']
      result = client.post(job_details['callback_url'], json={
        'export_id': job_details['export_id'],
        'start_time': job_details['start_time'],
        'end_time': job_details['end_time'],
        'namespace': job_details['namespace_name'],
        'repository': job_details['repository_name'],
        'exported_data_url': exported_data_url,
        'status': result_status.value,
      })

      if result.status_code != 200:
        logger.error('Got `%s` status code for callback URL `%s` for export `%s`',
                     result.status_code, job_details['callback_url'], job_details['export_id'])
        raise Exception('Got non-200 for batch logs reporting; retrying later')

    if job_details.get('callback_email'):
      with app.app_context():
        send_logs_exported_email(job_details['callback_email'], job_details['export_id'],
                                 result_status, exported_data_url,
                                 EXPORTED_LOGS_EXPIRATION_SECONDS)


def _parse_time(specified_time):
  try:
    return datetime.strptime(specified_time + ' UTC', '%m/%d/%Y %Z')
  except ValueError:
    return None


def _run_and_time(fn):
  start_time = datetime.utcnow()
  result = fn()
  return result, datetime.utcnow() - start_time


if __name__ == "__main__":
  logging.config.fileConfig(logfile_path(debug=False), disable_existing_loggers=False)

  logger.debug('Starting export action logs worker')
  worker = ExportActionLogsWorker(export_action_logs_queue,
                                  poll_period_seconds=POLL_PERIOD_SECONDS)
  worker.start()
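
# For reference, a callback_url consumer receives a POST body shaped roughly like the sketch
# below, assembled from _report_results above; the concrete values shown are hypothetical:
#
#   {
#     "export_id": "<export id>",
#     "start_time": "12/01/2018",
#     "end_time": "12/31/2018",
#     "namespace": "somenamespace",
#     "repository": "somerepo",
#     "exported_data_url": "<signed storage URL, or <app url>/exportedlogs/<exported filename>>",
#     "status": "success"
#   }
#
# where "status" is one of the ExportResult values: 'success', 'timedout' or 'invalidrequest'.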