From d4c74bc1d39bf6e4fb9672bddcdb49ba0b41a0bc Mon Sep 17 00:00:00 2001
From: Joseph Schorr
Date: Mon, 28 Jan 2019 17:16:52 -0500
Subject: [PATCH 1/2] Optimize listing of logs by bucketing by datetime, over
 which we have an index

---
 data/logs_model/table_logs_model.py        |  6 +-
 data/model/modelutil.py                    | 65 ++++++++++++++--------
 endpoints/api/repository_models_pre_oci.py |  2 +-
 3 files changed, 46 insertions(+), 27 deletions(-)

diff --git a/data/logs_model/table_logs_model.py b/data/logs_model/table_logs_model.py
index 6c9d4e1cc..ad9382415 100644
--- a/data/logs_model/table_logs_model.py
+++ b/data/logs_model/table_logs_model.py
@@ -48,9 +48,11 @@ class TableLogsModel(ActionLogsDataInterface):
                                            ignore=filter_kinds, model=m)

     logs, next_page_token = model.modelutil.paginate(logs_query, m,
-                                                     descending=True, page_token=page_token,
+                                                     descending=True,
+                                                     page_token=page_token,
                                                      limit=20,
-                                                     max_page=max_page_count)
+                                                     max_page=max_page_count,
+                                                     sort_field_name='datetime')
     return LogEntriesPage([Log.for_logentry(log) for log in logs], next_page_token)

   # First check the LogEntry3 table for the most recent logs, unless we've been expressly told
diff --git a/data/model/modelutil.py b/data/model/modelutil.py
index b1e6cd62b..4048e4eff 100644
--- a/data/model/modelutil.py
+++ b/data/model/modelutil.py
@@ -1,31 +1,39 @@
+import dateutil.parser
+
+from datetime import datetime
+
 from peewee import SQL

-def paginate(query, model, descending=False, page_token=None, limit=50, id_alias=None,
-             max_page=None):
-  """ Paginates the given query using an ID range, starting at the optional page_token.
+
+def paginate(query, model, descending=False, page_token=None, limit=50, sort_field_alias=None,
+             max_page=None, sort_field_name=None):
+  """ Paginates the given query using a field range, starting at the optional page_token.
       Returns a *list* of matching results along with an unencrypted page_token for the
-      next page, if any. If descending is set to True, orders by the ID descending rather
+      next page, if any. If descending is set to True, orders by the field descending rather
       than ascending.
   """
-  # Note: We use the id_alias for the order_by, but not the where below. The alias is necessary
-  # for certain queries that use unions in MySQL, as it gets confused on which ID to order by.
-  # The where clause, on the other hand, cannot use the alias because Postgres does not allow
-  # aliases in where clauses.
-  id_field = model.id
-  if id_alias is not None:
-    id_field = SQL(id_alias)
+  # Note: We use the sort_field_alias for the order_by, but not the where below. The alias is
+  # necessary for certain queries that use unions in MySQL, as it gets confused on which field
+  # to order by. The where clause, on the other hand, cannot use the alias because Postgres does
+  # not allow aliases in where clauses.
+  sort_field_name = sort_field_name or 'id'
+  sort_field = getattr(model, sort_field_name)
+
+  if sort_field_alias is not None:
+    sort_field_name = sort_field_alias
+    sort_field = SQL(sort_field_alias)

   if descending:
-    query = query.order_by(id_field.desc())
+    query = query.order_by(sort_field.desc())
   else:
-    query = query.order_by(id_field)
+    query = query.order_by(sort_field)

-  start_id = pagination_start(page_token)
-  if start_id is not None:
+  start_index = pagination_start(page_token)
+  if start_index is not None:
     if descending:
-      query = query.where(model.id <= start_id)
+      query = query.where(sort_field <= start_index)
     else:
-      query = query.where(model.id >= start_id)
+      query = query.where(sort_field >= start_index)

   query = query.limit(limit + 1)

@@ -33,28 +41,37 @@ def paginate(query, model, descending=False, page_token=None, limit=50, id_alias
   if page_number is not None and max_page is not None and page_number > max_page:
     return [], None

-  return paginate_query(query, limit=limit, id_alias=id_alias, page_number=page_number)
+  return paginate_query(query, limit=limit, sort_field_name=sort_field_name,
+                        page_number=page_number)


 def pagination_start(page_token=None):
-  """ Returns the start ID for pagination for the given page token. Will return None if None. """
+  """ Returns the start index for pagination for the given page token, or None if no token. """
   if page_token is not None:
-    return page_token.get('start_id')
-
+    start_index = page_token.get('start_index')
+    if page_token.get('is_datetime'):
+      start_index = dateutil.parser.parse(start_index)
+    return start_index
   return None


-def paginate_query(query, limit=50, id_alias=None, page_number=None):
+def paginate_query(query, limit=50, sort_field_name=None, page_number=None):
   """ Executes the given query and returns a page's worth of results, as well as the page token
       for the next page (if any).
   """
   results = list(query)
   page_token = None
   if len(results) > limit:
-    start_id = getattr(results[limit], id_alias or 'id')
+    start_index = getattr(results[limit], sort_field_name or 'id')
+    is_datetime = False
+    if isinstance(start_index, datetime):
+      start_index = start_index.isoformat() + "Z"
+      is_datetime = True
+
     page_token = {
-      'start_id': start_id,
+      'start_index': start_index,
       'page_number': page_number + 1 if page_number else 1,
+      'is_datetime': is_datetime,
     }

   return results[0:limit], page_token
diff --git a/endpoints/api/repository_models_pre_oci.py b/endpoints/api/repository_models_pre_oci.py
index 4e6409c20..690a66fc0 100644
--- a/endpoints/api/repository_models_pre_oci.py
+++ b/endpoints/api/repository_models_pre_oci.py
@@ -89,7 +89,7 @@ class PreOCIModel(RepositoryDataInterface):
                                           kind_filter=repo_kind)

     repos, next_page_token = model.modelutil.paginate_query(repo_query, limit=REPOS_PER_PAGE,
-                                                            id_alias='rid')
+                                                            sort_field_name='rid')

     # Collect the IDs of the repositories found for subequent lookup of popularity
     # and/or last modified.
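Note: the core change in data/model/modelutil.py above is that a page token may now carry a datetime value rather than an integer ID, so the sort value has to survive a round trip through the serialized token. Below is a minimal, self-contained sketch of that round trip; encode_start_index and decode_start_index are hypothetical helper names that mirror paginate_query and pagination_start, not code from this patch.

import dateutil.parser

from datetime import datetime


def encode_start_index(value):
  # Mirrors paginate_query: a datetime sort value is serialized to ISO 8601 with a
  # trailing "Z" and flagged so the reader knows to parse it back into a datetime.
  if isinstance(value, datetime):
    return {'start_index': value.isoformat() + 'Z', 'is_datetime': True}
  return {'start_index': value, 'is_datetime': False}


def decode_start_index(page_token):
  # Mirrors pagination_start: only datetime-flagged tokens are parsed back.
  start_index = page_token['start_index']
  if page_token.get('is_datetime'):
    start_index = dateutil.parser.parse(start_index)
  return start_index


token = encode_start_index(datetime(2019, 1, 28, 17, 16, 52))
print(token['start_index'])       # 2019-01-28T17:16:52Z
print(decode_start_index(token))  # 2019-01-28 17:16:52+00:00 (dateutil returns an aware datetime)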
From ec9913326a4fdf4d70383b76b0c6a8b4022e50a5 Mon Sep 17 00:00:00 2001
From: Joseph Schorr
Date: Mon, 28 Jan 2019 17:43:48 -0500
Subject: [PATCH 2/2] Change bulk logs downloader to use datetime ranges as
 well

---
 data/logs_model/table_logs_model.py | 54 +++++++++--------------------
 1 file changed, 17 insertions(+), 37 deletions(-)

diff --git a/data/logs_model/table_logs_model.py b/data/logs_model/table_logs_model.py
index ad9382415..b8a6d78dc 100644
--- a/data/logs_model/table_logs_model.py
+++ b/data/logs_model/table_logs_model.py
@@ -16,8 +16,8 @@ from data.logs_model.datatypes import Log, AggregatedLogCount, LogEntriesPage, _

 logger = logging.getLogger(__name__)

-MINIMUM_RANGE_SIZE = 1000
-MAXIMUM_RANGE_SIZE = 100000
+MINIMUM_RANGE_SIZE = 1  # second
+MAXIMUM_RANGE_SIZE = 60 * 60 * 24 * 30  # seconds ~= 1 month

 EXPECTED_ITERATION_LOG_COUNT = 1000

@@ -151,40 +151,16 @@ class TableLogsModel(ActionLogsDataInterface):

   def yield_logs_for_export(self, start_datetime, end_datetime, repository_id=None,
                             namespace_id=None, max_query_time=None):
-    # Lookup the starting and ending IDs for the log range in the table. This operation is quite
-    # quick, so we use it as a bounding box for the later lookups.
-    min_id, elapsed = _run_and_time(lambda: model.log.get_minimum_id_for_logs(start_datetime,
-                                                                              repository_id,
-                                                                              namespace_id))
-    if elapsed > max_query_time:
-      logger.error('Retrieval of min ID for export logs `%s/%s` timed out with time of `%s`',
-                   namespace_id, repository_id, elapsed)
-      raise LogsIterationTimeout()
-
-    max_id, elapsed = _run_and_time(lambda: model.log.get_maximum_id_for_logs(end_datetime,
-                                                                              repository_id,
-                                                                              namespace_id))
-    if elapsed > max_query_time:
-      logger.error('Retrieval of max ID for export logs `%s/%s` timed out with time of `%s`',
-                   namespace_id, repository_id, elapsed)
-      raise LogsIterationTimeout()
-
-    min_id = min_id or 1
-    max_id = max_id or 1
-
-    logger.info('Found log range of %s to %s for export logs `%s/%s`', min_id, max_id,
-                namespace_id, repository_id)
-
     # Using an adjusting scale, start downloading log rows in batches, starting at
     # MINIMUM_RANGE_SIZE and doubling until we've reached EXPECTED_ITERATION_LOG_COUNT or
     # the lookup range has reached MAXIMUM_RANGE_SIZE. If at any point this operation takes
     # longer than the MAXIMUM_WORK_PERIOD_SECONDS, terminate the batch operation as timed out.
     batch_start_time = datetime.utcnow()

-    current_start_id = min_id
-    current_batch_size = MINIMUM_RANGE_SIZE
+    current_start_datetime = start_datetime
+    current_batch_size = timedelta(seconds=MINIMUM_RANGE_SIZE)

-    while current_start_id <= max_id:
+    while current_start_datetime < end_datetime:
       # Verify we haven't been working for too long.
       work_elapsed = datetime.utcnow() - batch_start_time
       if work_elapsed > max_query_time:
@@ -192,32 +168,36 @@ class TableLogsModel(ActionLogsDataInterface):
                      namespace_id, repository_id, work_elapsed)
         raise LogsIterationTimeout()

-      id_range = [current_start_id, min(max_id, current_start_id + current_batch_size)]
+      current_end_datetime = current_start_datetime + current_batch_size
+      current_end_datetime = min(current_end_datetime, end_datetime)

       # Load the next set of logs.
       def load_logs():
-        logger.debug('Retrieving logs over range %s with namespace %s and repository %s',
-                     id_range, namespace_id, repository_id)
+        logger.debug('Retrieving logs over range %s-%s with namespace %s and repository %s',
+                     current_start_datetime, current_end_datetime, namespace_id, repository_id)

         logs_query = model.log.get_logs_query(namespace=namespace_id,
                                               repository=repository_id,
-                                              id_range=id_range)
+                                              start_time=current_start_datetime,
+                                              end_time=current_end_datetime)
         return [Log.for_logentry(log) for log in logs_query]

       logs, elapsed = _run_and_time(load_logs)
       if elapsed > max_query_time:
-        logger.error('Retrieval of logs for export logs `%s/%s` with range `%s` timed out at `%s`',
-                     namespace_id, repository_id, id_range, elapsed)
+        logger.error('Retrieval of logs for export `%s/%s` with range `%s-%s` timed out at `%s`',
+                     namespace_id, repository_id, current_start_datetime, current_end_datetime,
+                     elapsed)
         raise LogsIterationTimeout()

       yield logs

       # Move forward.
-      current_start_id = id_range[1] + 1
+      current_start_datetime = current_end_datetime

       # Increase the batch size if necessary.
       if len(logs) < EXPECTED_ITERATION_LOG_COUNT:
-        current_batch_size = min(MAXIMUM_RANGE_SIZE, current_batch_size * 2)
+        seconds = min(MAXIMUM_RANGE_SIZE, current_batch_size.total_seconds() * 2)
+        current_batch_size = timedelta(seconds=seconds)


 def _run_and_time(fn):
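Note: with the ID bounding-box lookups removed, the export path in patch 2 derives its batches purely from datetime arithmetic: the window starts at MINIMUM_RANGE_SIZE seconds and doubles after every under-filled batch, capped at MAXIMUM_RANGE_SIZE. Below is a minimal, self-contained sketch of that loop; yield_log_batches and fetch_logs are hypothetical stand-ins for yield_logs_for_export and model.log.get_logs_query, and the per-batch timeout handling is omitted.

from datetime import datetime, timedelta

MINIMUM_RANGE_SIZE = 1                  # second
MAXIMUM_RANGE_SIZE = 60 * 60 * 24 * 30  # seconds ~= 1 month
EXPECTED_ITERATION_LOG_COUNT = 1000


def yield_log_batches(start_datetime, end_datetime, fetch_logs):
  current_start = start_datetime
  batch_size = timedelta(seconds=MINIMUM_RANGE_SIZE)

  while current_start < end_datetime:
    # Clamp the window to the requested end so the final batch never overshoots.
    current_end = min(current_start + batch_size, end_datetime)
    yield fetch_logs(current_start, current_end)

    # Move forward, and widen the window while batches come back under-filled.
    logs_in_batch = fetch_logs(current_start, current_end)
    current_start = current_end
    if len(logs_in_batch) < EXPECTED_ITERATION_LOG_COUNT:
      seconds = min(MAXIMUM_RANGE_SIZE, batch_size.total_seconds() * 2)
      batch_size = timedelta(seconds=seconds)


# Example: an empty log source shows the window doubling until one day is covered.
batches = list(yield_log_batches(datetime(2019, 1, 1), datetime(2019, 1, 2), lambda s, e: []))
print(len(batches))  # 17 doubling windows cover the 86400-second day (the last one clamped)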