# pylint: disable=protected-access
import logging
import json
import uuid

from datetime import datetime, timedelta

from tzlocal import get_localzone
from dateutil.relativedelta import relativedelta

from data import model
from data.database import LogEntry, LogEntry2, LogEntry3
from data.logs_model.interface import ActionLogsDataInterface, LogsIterationTimeout
from data.logs_model.datatypes import Log, AggregatedLogCount, LogEntriesPage, _format_date

logger = logging.getLogger(__name__)

MINIMUM_RANGE_SIZE = 1  # second
MAXIMUM_RANGE_SIZE = 60 * 60 * 24 * 30  # seconds ~= 1 month
EXPECTED_ITERATION_LOG_COUNT = 1000

LOG_MODELS = [LogEntry3, LogEntry2, LogEntry]


class TableLogsModel(ActionLogsDataInterface):
    """
    TableLogsModel implements the data model for the logs API backed by a single table
    in the database.
    """

    def lookup_logs(self, start_datetime, end_datetime, performer_name=None,
                    repository_name=None, namespace_name=None, filter_kinds=None,
                    page_token=None, max_page_count=None):
        assert start_datetime is not None
        assert end_datetime is not None

        repository = None
        if repository_name and namespace_name:
            repository = model.repository.get_repository(namespace_name, repository_name)

        performer = None
        if performer_name:
            performer = model.user.get_user(performer_name)

        def get_logs(m):
            logs_query = model.log.get_logs_query(start_datetime, end_datetime,
                                                  performer=performer,
                                                  repository=repository,
                                                  namespace=namespace_name,
                                                  ignore=filter_kinds,
                                                  model=m)

            logs, next_page_token = model.modelutil.paginate(logs_query, m,
                                                             descending=True,
                                                             page_token=page_token,
                                                             limit=20,
                                                             max_page=max_page_count,
                                                             sort_field_name='datetime')

            return LogEntriesPage([Log.for_logentry(log) for log in logs], next_page_token)

        # First check the LogEntry3 table for the most recent logs, unless we've been
        # expressly told to look inside the other tables.
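        # The 'tti' (token table index) key in the page token records which of the
        # LOG_MODELS tables the client is currently reading. When the current table has
        # no further pages but an older table remains, the returned token is
        # {'tti': table_index + 1}, which tells the next call to resume from that older table.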
        TOKEN_TABLE_ID = 'tti'
        table_index = 0
        table_specified = (page_token is not None and
                           page_token.get(TOKEN_TABLE_ID) is not None)
        if table_specified:
            table_index = page_token.get(TOKEN_TABLE_ID)

        page_result = get_logs(LOG_MODELS[table_index])
        if page_result.next_page_token is None and table_index < len(LOG_MODELS) - 1:
            page_result = page_result._replace(next_page_token={TOKEN_TABLE_ID: table_index + 1})

        return page_result

    def get_aggregated_log_counts(self, start_datetime, end_datetime, performer_name=None,
                                  repository_name=None, namespace_name=None, filter_kinds=None):
        if end_datetime - start_datetime >= timedelta(weeks=4):
            raise Exception('Cannot lookup aggregated logs over a period longer than a month')

        repository = None
        if repository_name and namespace_name:
            repository = model.repository.get_repository(namespace_name, repository_name)

        performer = None
        if performer_name:
            performer = model.user.get_user(performer_name)

        entries = {}
        for log_model in LOG_MODELS:
            aggregated = model.log.get_aggregated_logs(start_datetime, end_datetime,
                                                       performer=performer,
                                                       repository=repository,
                                                       namespace=namespace_name,
                                                       ignore=filter_kinds,
                                                       model=log_model)

            for entry in aggregated:
                # The aggregation only returns a day-of-month, so rebuild a full date in the
                # starting month and roll it into the following month when the day precedes
                # the start day (i.e. the range wrapped past the end of the month).
                synthetic_date = datetime(start_datetime.year, start_datetime.month,
                                          int(entry.day), tzinfo=get_localzone())
                if synthetic_date.day < start_datetime.day:
                    synthetic_date = synthetic_date + relativedelta(months=1)

                key = '%s-%s' % (entry.kind_id, entry.day)

                if key in entries:
                    entries[key] = AggregatedLogCount(entry.kind_id,
                                                      entry.count + entries[key].count,
                                                      synthetic_date)
                else:
                    entries[key] = AggregatedLogCount(entry.kind_id, entry.count, synthetic_date)

        return entries.values()

    def count_repository_actions(self, repository, day):
        return model.repositoryactioncount.count_repository_actions(repository, day)

    def log_action(self, kind_name, namespace_name=None, performer=None, ip=None, metadata=None,
                   repository=None, repository_name=None, timestamp=None):
        if repository_name is not None:
            assert repository is None
            assert namespace_name is not None
            repository = model.repository.get_repository(namespace_name, repository_name)

        model.log.log_action(kind_name, namespace_name, performer=performer,
                             repository=repository, ip=ip, metadata=metadata or {},
                             timestamp=timestamp)

    def queue_logs_export(self, start_datetime, end_datetime, export_action_logs_queue,
                          namespace_name=None, repository_name=None, callback_url=None,
                          callback_email=None, filter_kinds=None):
        export_id = str(uuid.uuid4())
        namespace = model.user.get_namespace_user(namespace_name)
        if namespace is None:
            return None

        repository = None
        if repository_name is not None:
            repository = model.repository.get_repository(namespace_name, repository_name)
            if repository is None:
                return None

        export_action_logs_queue.put([namespace_name], json.dumps({
            'export_id': export_id,
            'repository_id': repository.id if repository else None,
            'namespace_id': namespace.id,
            'namespace_name': namespace.username,
            'repository_name': repository.name if repository else None,
            'start_time': _format_date(start_datetime),
            'end_time': _format_date(end_datetime),
            'callback_url': callback_url,
            'callback_email': callback_email,
        }), retries_remaining=3)

        return export_id

    def yield_logs_for_export(self, start_datetime, end_datetime, repository_id=None,
                              namespace_id=None, max_query_time=None):
        # Using an adjusting scale, start downloading log rows in batches, starting at
        # MINIMUM_RANGE_SIZE and doubling until we've reached EXPECTED_ITERATION_LOG_COUNT or
        # the lookup range has reached MAXIMUM_RANGE_SIZE.
        # If at any point this operation takes longer than max_query_time, terminate the
        # batch operation as timed out.
        batch_start_time = datetime.utcnow()

        current_start_datetime = start_datetime
        current_batch_size = timedelta(seconds=MINIMUM_RANGE_SIZE)

        while current_start_datetime < end_datetime:
            # Verify we haven't been working for too long.
            work_elapsed = datetime.utcnow() - batch_start_time
            if work_elapsed > max_query_time:
                logger.error('Retrieval of logs `%s/%s` timed out with time of `%s`',
                             namespace_id, repository_id, work_elapsed)
                raise LogsIterationTimeout()

            current_end_datetime = current_start_datetime + current_batch_size
            current_end_datetime = min(current_end_datetime, end_datetime)

            # Load the next set of logs.
            def load_logs():
                logger.debug('Retrieving logs over range %s-%s with namespace %s and repository %s',
                             current_start_datetime, current_end_datetime, namespace_id,
                             repository_id)

                logs_query = model.log.get_logs_query(namespace=namespace_id,
                                                      repository=repository_id,
                                                      start_time=current_start_datetime,
                                                      end_time=current_end_datetime)
                return [Log.for_logentry(log) for log in logs_query]

            logs, elapsed = _run_and_time(load_logs)
            if elapsed > max_query_time:
                logger.error('Retrieval of logs for export `%s/%s` with range `%s-%s` timed out at `%s`',
                             namespace_id, repository_id, current_start_datetime,
                             current_end_datetime, elapsed)
                raise LogsIterationTimeout()

            yield logs

            # Move forward.
            current_start_datetime = current_end_datetime

            # Increase the batch size if necessary.
            if len(logs) < EXPECTED_ITERATION_LOG_COUNT:
                seconds = min(MAXIMUM_RANGE_SIZE, current_batch_size.total_seconds() * 2)
                current_batch_size = timedelta(seconds=seconds)


def _run_and_time(fn):
    start_time = datetime.utcnow()
    result = fn()
    return result, datetime.utcnow() - start_time


table_logs_model = TableLogsModel()
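

if __name__ == '__main__':
    # Minimal usage sketch, not exercised in production: it assumes the application's
    # database connection has already been configured and that a 'devtable' namespace
    # exists; both are illustrative assumptions, as are the attribute names on the
    # returned page (taken from data.logs_model.datatypes.LogEntriesPage).
    end = datetime.utcnow()
    start = end - timedelta(days=7)

    page_token = None
    while True:
        page = table_logs_model.lookup_logs(start, end, namespace_name='devtable',
                                            page_token=page_token)
        for log in page.logs:
            print(log)

        # A token carrying {'tti': N} means "continue from the next-older LogEntry table";
        # None means every table has been exhausted.
        if page.next_page_token is None:
            break
        page_token = page.next_page_token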