import logging
import logging.config
import os.path
import json
import time
import uuid

from datetime import datetime, timedelta
from io import BytesIO
from enum import Enum, unique

import features

from app import app, export_action_logs_queue, storage, get_app_url
from data import model
from endpoints.api import format_date
from endpoints.api.logs_models_pre_oci import create_log
from workers.queueworker import QueueWorker, JobException
from util.log import logfile_path
from util.useremails import send_logs_exported_email

logger = logging.getLogger(__name__)


POLL_PERIOD_SECONDS = 1

EXPORT_LOGS_STORAGE_PATH = app.config.get('EXPORT_ACTION_LOGS_STORAGE_PATH', 'exportedactionlogs')
MAXIMUM_WORK_PERIOD_SECONDS = app.config.get('EXPORT_ACTION_LOGS_MAXIMUM_SECONDS', 60 * 60)  # 1 hour
MAXIMUM_QUERY_TIME_SECONDS = app.config.get('EXPORT_ACTION_LOGS_MAXIMUM_QUERY_TIME_SECONDS', 30)
EXPORTED_LOGS_EXPIRATION_SECONDS = app.config.get('EXPORT_ACTION_LOGS_SECONDS', 60 * 60)  # 1 hour

MINIMUM_RANGE_SIZE = 1000
MAXIMUM_RANGE_SIZE = 100000
EXPECTED_ITERATION_LOG_COUNT = 1000


@unique
class ExportResult(Enum):
    """ Outcome of an export job, as reported to the callback URL and/or e-mail. """
    # NOTE: Make sure to handle these in `logsexported.html` in `emails`
    INVALID_REQUEST = 'invalidrequest'
    OPERATION_TIMEDOUT = 'timedout'
    SUCCESSFUL_EXPORT = 'success'


class ExportActionLogsWorker(QueueWorker):
    """ Worker which exports action logs for a namespace or a repository based on
        a queued request from the API.
    """

    def process_queue_item(self, job_details):
        """ Exports the logs described by `job_details` to storage and reports the outcome.

            The logs are streamed into a chunked upload as a single JSON document of the form
            `{"export_id": ..., "details": {...}, "logs": [..., {"terminator": true}]}`.

            Raises JobException if anything fails once the chunked upload has been started;
            the partial upload is cancelled first.
        """
        logger.info('Got export actions logs queue item: %s', job_details)

        # job_details block (as defined in the logs.py API endpoint):
        # {
        #  'export_id': export_id,
        #  'repository_id': repository.id or None,
        #  'namespace_id': namespace.id,
        #  'namespace_name': namespace.username,
        #  'repository_name': repository.name or None,
        #  'start_time': start_time,
        #  'end_time': end_time,
        #  'callback_url': callback_url or None,
        #  'callback_email': callback_email or None,
        # }
        export_id = job_details['export_id']

        start_time = _parse_time(job_details['start_time'])
        end_time = _parse_time(job_details['end_time'])
        if start_time is None or end_time is None:
            self._report_results(job_details, ExportResult.INVALID_REQUEST)
            return

        # Make sure the end time has the whole day.
        end_time = end_time + timedelta(days=1) - timedelta(milliseconds=1)

        # Select the minimum and maximum IDs for the logs for the repository/namespace
        # over the time range.
        namespace_id = job_details['namespace_id']
        repository_id = job_details['repository_id']
        max_query_time = timedelta(seconds=MAXIMUM_QUERY_TIME_SECONDS)

        min_id, elapsed = _run_and_time(lambda: model.log.get_minimum_id_for_logs(start_time,
                                                                                  repository_id,
                                                                                  namespace_id))
        if elapsed > max_query_time:
            logger.error('Retrieval of min ID for export logs `%s` timed out with time of `%s`',
                         export_id, elapsed)
            self._report_results(job_details, ExportResult.OPERATION_TIMEDOUT)
            return

        max_id, elapsed = _run_and_time(lambda: model.log.get_maximum_id_for_logs(end_time,
                                                                                  repository_id,
                                                                                  namespace_id))
        if elapsed > max_query_time:
            logger.error('Retrieval of max ID for export logs `%s` timed out with time of `%s`',
                         export_id, elapsed)
            self._report_results(job_details, ExportResult.OPERATION_TIMEDOUT)
            return

        # Fall back to 1 when a lookup returned a falsy value.
        min_id = min_id or 1
        max_id = max_id or 1

        logger.info('Found log range of %s to %s for export logs `%s`', min_id, max_id, export_id)

        # Generate a file key so that if we return an API URL, it cannot simply be constructed
        # from just the export ID.
        file_key = str(uuid.uuid4())
        exported_filename = '%s-%s' % (export_id, file_key)

        # Start a chunked upload for the logs and stream them.
        upload_id, upload_metadata = storage.initiate_chunked_upload(storage.preferred_locations)
        export_storage_path = os.path.join(EXPORT_LOGS_STORAGE_PATH, exported_filename)
        logger.debug('Starting chunked upload to path `%s`', export_storage_path)

        # Start with a 'json' header that contains the opening bracket, as well as basic
        # information and the start of the `logs` array.
        details = {
            'start_time': format_date(start_time),
            'end_time': format_date(end_time),
            'namespace': job_details['namespace_name'],
            'repository': job_details['repository_name'],
        }

        prefix_data = """{ "export_id": "%s", "details": %s, "logs": [ """ % (export_id,
                                                                              json.dumps(details))

        # Encode once so that the uploaded payload and the running byte offset agree, mirroring
        # how `_stream_logs` handles its chunks.
        prefix_bytes = prefix_data.encode('utf-8')

        # NOTE(review): the return value of `stream_upload_chunk` is used directly as the new
        # upload metadata throughout this worker; confirm the storage API returns bare metadata
        # rather than a tuple that would need unpacking.
        upload_metadata = storage.stream_upload_chunk(storage.preferred_locations, upload_id, 0, -1,
                                                      BytesIO(prefix_bytes), upload_metadata)
        uploaded_byte_count = len(prefix_bytes)

        try:
            # Stream the logs to storage as chunks.
            streamed_metadata, uploaded_byte_count = self._stream_logs(upload_id, upload_metadata,
                                                                       uploaded_byte_count, min_id,
                                                                       max_id, job_details)
            if streamed_metadata is None:
                # `_stream_logs` timed out and has already reported the failure; just drop the
                # partial upload.
                storage.cancel_chunked_upload(storage.preferred_locations, upload_id,
                                              upload_metadata)
                return

            # From here on the post-stream metadata is the current one, so that a failure below
            # cancels the upload with up-to-date metadata.
            upload_metadata = streamed_metadata

            # Close the JSON block.
            suffix_data = """ {"terminator": true}] }"""
            upload_metadata = storage.stream_upload_chunk(storage.preferred_locations, upload_id,
                                                          uploaded_byte_count, -1,
                                                          BytesIO(suffix_data.encode('utf-8')),
                                                          upload_metadata)

            # Complete the upload with the metadata that includes the suffix chunk.
            storage.complete_chunked_upload(storage.preferred_locations, upload_id,
                                            export_storage_path, upload_metadata)
        except Exception:
            logger.exception('Exception when exporting logs for `%s`', export_id)
            storage.cancel_chunked_upload(storage.preferred_locations, upload_id, upload_metadata)
            raise JobException

        # Invoke the callbacks.
        export_url = storage.get_direct_download_url(storage.preferred_locations,
                                                     export_storage_path,
                                                     expires_in=EXPORTED_LOGS_EXPIRATION_SECONDS)
        if export_url is None:
            export_url = '%s/exportedlogs/%s' % (get_app_url(), exported_filename)

        self._report_results(job_details, ExportResult.SUCCESSFUL_EXPORT, export_url)

    def _stream_logs(self, upload_id, upload_metadata, uploaded_byte_count, min_id, max_id,
                     job_details):
        """ Streams the logs with IDs from `min_id` to `max_id` into the chunked upload.

            Returns a tuple of (upload metadata, uploaded byte count) on success. If the whole
            operation or a single query exceeds its time limit, the timeout is reported via
            `_report_results` and (None, None) is returned.
        """
        export_id = job_details['export_id']
        max_work_period = timedelta(seconds=MAXIMUM_WORK_PERIOD_SECONDS)
        max_query_time = timedelta(seconds=MAXIMUM_QUERY_TIME_SECONDS)
        kinds = model.log.get_log_entry_kinds()

        # The lookup scope does not change between batches: when a repository is given, the
        # query is made by repository only; otherwise by namespace.
        repository_id = job_details.get('repository_id')
        namespace_id = job_details['namespace_id'] if not repository_id else None

        # Using an adjusting scale, start downloading log rows in batches, starting at
        # MINIMUM_RANGE_SIZE and doubling until we've reached EXPECTED_ITERATION_LOG_COUNT or
        # the lookup range has reached MAXIMUM_RANGE_SIZE. If at any point this operation takes
        # longer than the MAXIMUM_WORK_PERIOD_SECONDS, terminate the batch operation as timed out.
        batch_start_time = datetime.utcnow()

        current_start_id = min_id
        current_batch_size = MINIMUM_RANGE_SIZE

        while current_start_id <= max_id:
            # Verify we haven't been working for too long.
            work_elapsed = datetime.utcnow() - batch_start_time
            if work_elapsed > max_work_period:
                logger.error('Retrieval of logs `%s` timed out with time of `%s`',
                             export_id, work_elapsed)
                self._report_results(job_details, ExportResult.OPERATION_TIMEDOUT)
                return None, None

            id_range = [current_start_id, min(max_id, current_start_id + current_batch_size)]

            # Load the next set of logs.
            def retrieve_logs():
                logger.debug('Retrieving logs over range %s with namespace %s and repository %s',
                             id_range, namespace_id, repository_id)

                logs_query = model.log.get_logs_query(namespace=namespace_id,
                                                      repository=repository_id,
                                                      id_range=id_range)
                return [create_log(log) for log in logs_query]

            logs, elapsed = _run_and_time(retrieve_logs)
            if elapsed > max_query_time:
                logger.error('Retrieval of logs for export logs `%s` with range `%s` timed out at `%s`',
                             export_id, id_range, elapsed)
                self._report_results(job_details, ExportResult.OPERATION_TIMEDOUT)
                return None, None

            # Write the logs to storage.
            logger.debug('Writing %s retrieved logs for range %s', len(logs), id_range)
            if logs:
                # Every entry is followed by a comma; the caller closes the array with a
                # terminator object, so the trailing comma is intentional.
                logs_data = ','.join([json.dumps(log.to_dict(kinds, False)) for log in logs]) + ','
                logs_data = logs_data.encode('utf-8')

                upload_metadata = storage.stream_upload_chunk(storage.preferred_locations,
                                                              upload_id, uploaded_byte_count, -1,
                                                              BytesIO(logs_data), upload_metadata)
                uploaded_byte_count += len(logs_data)

            # Move forward.
            current_start_id = id_range[1] + 1

            # Increase the batch size if necessary.
            if len(logs) < EXPECTED_ITERATION_LOG_COUNT:
                current_batch_size = min(MAXIMUM_RANGE_SIZE, current_batch_size * 2)

        return upload_metadata, uploaded_byte_count

    def _report_results(self, job_details, result_status, exported_data_url=None):
        """ Reports the export outcome to the job's callback URL and/or callback e-mail.

            Raises Exception if the callback URL answers with a non-200 status code.
        """
        logger.info('Reporting result of `%s` for %s; %s', result_status, job_details,
                    exported_data_url)

        if job_details.get('callback_url'):
            # Post the results to the callback URL.
            client = app.config['HTTPCLIENT']
            result = client.post(job_details['callback_url'], json={
                'export_id': job_details['export_id'],
                'start_time': job_details['start_time'],
                'end_time': job_details['end_time'],
                'namespace': job_details['namespace_name'],
                'repository': job_details['repository_name'],
                'exported_data_url': exported_data_url,
                'status': result_status.value,
            })

            if result.status_code != 200:
                logger.error('Got `%s` status code for callback URL `%s` for export `%s`',
                             result.status_code, job_details['callback_url'],
                             job_details['export_id'])
                raise Exception('Got non-200 for batch logs reporting; retrying later')

        if job_details.get('callback_email'):
            with app.app_context():
                send_logs_exported_email(job_details['callback_email'], job_details['export_id'],
                                         result_status, exported_data_url,
                                         EXPORTED_LOGS_EXPIRATION_SECONDS)


def _parse_time(specified_time):
    """ Parses a `MM/DD/YYYY` date string into a datetime.

        Returns None if the value is not a string or does not match the expected format.
    """
    try:
        return datetime.strptime(specified_time + ' UTC', '%m/%d/%Y %Z')
    except (ValueError, TypeError):
        # TypeError covers a missing (None) or otherwise non-string value, which is just as
        # invalid a request as a badly formatted date.
        return None


def _run_and_time(fn):
    """ Calls `fn` and returns a tuple of (its result, the elapsed time as a timedelta). """
    start_time = datetime.utcnow()
    result = fn()
    return result, datetime.utcnow() - start_time


if __name__ == "__main__":
    logging.config.fileConfig(logfile_path(debug=False), disable_existing_loggers=False)

    if not features.LOG_EXPORT:
        logger.debug('Log export not enabled; skipping')
        # Idle forever rather than exiting when the feature is disabled.
        while True:
            time.sleep(100000)

    logger.debug('Starting export action logs worker')
    worker = ExportActionLogsWorker(export_action_logs_queue,
                                    poll_period_seconds=POLL_PERIOD_SECONDS)
    worker.start()