diff --git a/data/logs_model/table_logs_model.py b/data/logs_model/table_logs_model.py
index ad9382415..b8a6d78dc 100644
--- a/data/logs_model/table_logs_model.py
+++ b/data/logs_model/table_logs_model.py
@@ -16,8 +16,8 @@ from data.logs_model.datatypes import Log, AggregatedLogCount, LogEntriesPage, _
 
 logger = logging.getLogger(__name__)
 
-MINIMUM_RANGE_SIZE = 1000
-MAXIMUM_RANGE_SIZE = 100000
+MINIMUM_RANGE_SIZE = 1 # second
+MAXIMUM_RANGE_SIZE = 60 * 60 * 24 * 30 # seconds ~= 1 month
 EXPECTED_ITERATION_LOG_COUNT = 1000
 
 
@@ -151,40 +151,16 @@ class TableLogsModel(ActionLogsDataInterface):
 
   def yield_logs_for_export(self, start_datetime, end_datetime, repository_id=None,
                             namespace_id=None, max_query_time=None):
-    # Lookup the starting and ending IDs for the log range in the table. This operation is quite
-    # quick, so we use it as a bounding box for the later lookups.
-    min_id, elapsed = _run_and_time(lambda: model.log.get_minimum_id_for_logs(start_datetime,
-                                                                              repository_id,
-                                                                              namespace_id))
-    if elapsed > max_query_time:
-      logger.error('Retrieval of min ID for export logs `%s/%s` timed out with time of `%s`',
-                   namespace_id, repository_id, elapsed)
-      raise LogsIterationTimeout()
-
-    max_id, elapsed = _run_and_time(lambda: model.log.get_maximum_id_for_logs(end_datetime,
-                                                                              repository_id,
-                                                                              namespace_id))
-    if elapsed > max_query_time:
-      logger.error('Retrieval of max ID for export logs `%s/%s` timed out with time of `%s`',
-                   namespace_id, repository_id, elapsed)
-      raise LogsIterationTimeout()
-
-    min_id = min_id or 1
-    max_id = max_id or 1
-
-    logger.info('Found log range of %s to %s for export logs `%s/%s`', min_id, max_id,
-                namespace_id, repository_id)
-
     # Using an adjusting scale, start downloading log rows in batches, starting at
     # MINIMUM_RANGE_SIZE and doubling until we've reached EXPECTED_ITERATION_LOG_COUNT or
     # the lookup range has reached MAXIMUM_RANGE_SIZE. If at any point this operation takes
     # longer than the MAXIMUM_WORK_PERIOD_SECONDS, terminate the batch operation as timed out.
     batch_start_time = datetime.utcnow()
 
-    current_start_id = min_id
-    current_batch_size = MINIMUM_RANGE_SIZE
+    current_start_datetime = start_datetime
+    current_batch_size = timedelta(seconds=MINIMUM_RANGE_SIZE)
 
-    while current_start_id <= max_id:
+    while current_start_datetime < end_datetime:
       # Verify we haven't been working for too long.
       work_elapsed = datetime.utcnow() - batch_start_time
       if work_elapsed > max_query_time:
@@ -192,32 +168,36 @@ class TableLogsModel(ActionLogsDataInterface):
                      namespace_id, repository_id, work_elapsed)
         raise LogsIterationTimeout()
 
-      id_range = [current_start_id, min(max_id, current_start_id + current_batch_size)]
+      current_end_datetime = current_start_datetime + current_batch_size
+      current_end_datetime = min(current_end_datetime, end_datetime)
 
       # Load the next set of logs.
       def load_logs():
-        logger.debug('Retrieving logs over range %s with namespace %s and repository %s',
-                     id_range, namespace_id, repository_id)
+        logger.debug('Retrieving logs over range %s-%s with namespace %s and repository %s',
+                     current_start_datetime, current_end_datetime, namespace_id, repository_id)
 
         logs_query = model.log.get_logs_query(namespace=namespace_id,
                                               repository=repository_id,
-                                              id_range=id_range)
+                                              start_time=current_start_datetime,
+                                              end_time=current_end_datetime)
         return [Log.for_logentry(log) for log in logs_query]
 
       logs, elapsed = _run_and_time(load_logs)
       if elapsed > max_query_time:
-        logger.error('Retrieval of logs for export logs `%s/%s` with range `%s` timed out at `%s`',
-                     namespace_id, repository_id, id_range, elapsed)
+        logger.error('Retrieval of logs for export `%s/%s` with range `%s-%s` timed out at `%s`',
+                     namespace_id, repository_id, current_start_datetime, current_end_datetime,
+                     elapsed)
         raise LogsIterationTimeout()
 
       yield logs
 
       # Move forward.
-      current_start_id = id_range[1] + 1
+      current_start_datetime = current_end_datetime
 
       # Increase the batch size if necessary.
       if len(logs) < EXPECTED_ITERATION_LOG_COUNT:
-        current_batch_size = min(MAXIMUM_RANGE_SIZE, current_batch_size * 2)
+        seconds = min(MAXIMUM_RANGE_SIZE, current_batch_size.total_seconds() * 2)
+        current_batch_size = timedelta(seconds=seconds)
 
 
 def _run_and_time(fn):
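For reference, the patch above replaces ID-range batching with time-window batching: the export loop walks the requested datetime span, doubling the window whenever a batch returns fewer than EXPECTED_ITERATION_LOG_COUNT rows and capping it at roughly one month. Below is a minimal standalone sketch of that pattern, assuming a caller-supplied fetch_logs(start, end) callable and stand-in constants; none of these names are Quay APIs.

# Illustrative sketch only -- adaptive time-window batching, not Quay code.
from datetime import datetime, timedelta

MIN_WINDOW = timedelta(seconds=1)   # cf. MINIMUM_RANGE_SIZE
MAX_WINDOW = timedelta(days=30)     # cf. MAXIMUM_RANGE_SIZE
TARGET_BATCH_COUNT = 1000           # cf. EXPECTED_ITERATION_LOG_COUNT


def yield_log_batches(fetch_logs, start, end, max_work=timedelta(minutes=5)):
  # Walk [start, end) in windows, doubling the window whenever a batch comes
  # back small, and aborting if the whole export takes longer than max_work.
  work_started = datetime.utcnow()
  window = MIN_WINDOW
  current = start

  while current < end:
    if datetime.utcnow() - work_started > max_work:
      raise RuntimeError('log export exceeded %s of work' % max_work)

    batch_end = min(current + window, end)
    logs = fetch_logs(current, batch_end)  # e.g. one bounded DB query
    yield logs

    current = batch_end
    if len(logs) < TARGET_BATCH_COUNT:
      window = min(MAX_WINDOW, window * 2)

Under these assumptions, a sparse history ramps up quickly to month-sized queries, while a dense history keeps the window small enough that each query returns roughly the target row count.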