From b773a18ed822cc005a5be2ac47c3dbc6fbc96ef0 Mon Sep 17 00:00:00 2001
From: Joseph Schorr
Date: Tue, 8 Jan 2019 14:03:28 -0500
Subject: [PATCH] Interface out all action log data model operations

This will allow us to reimplement the logs data model against a non-database system in the near future
---
 data/logs_model/__init__.py                  |  20 ++
 data/logs_model/datatypes.py                 | 120 +++++++++
 data/logs_model/interface.py                 |  62 +++++
 data/logs_model/table_logs_model.py          | 227 +++++++++++++++++
 data/model/repositoryactioncount.py          |  26 +-
 endpoints/api/__init__.py                    |  18 +-
 endpoints/api/__init__models_pre_oci.py      |  15 +-
 endpoints/api/logs.py                        | 228 +++++++++---------
 endpoints/api/logs_models_interface.py       | 141 -----------
 endpoints/api/logs_models_pre_oci.py         | 122 ----------
 endpoints/api/superuser.py                   |  40 +--
 endpoints/api/superuser_models_interface.py  | 108 ---------
 endpoints/api/superuser_models_pre_oci.py    |  44 +---
 .../api/test/test_logs_models_pre_oci.py     | 131 ----------
 endpoints/appr/registry.py                   |  27 ++-
 endpoints/building.py                        |   5 +-
 endpoints/keyserver/__init__.py              |   7 +-
 endpoints/keyserver/models_interface.py      |   7 -
 endpoints/keyserver/models_pre_oci.py        |   4 -
 initdb.py                                    | 111 ++++-----
 test/test_api_usage.py                       |   9 +-
 util/audit.py                                |   6 +-
 util/generatepresharedkey.py                 |   6 +-
 workers/exportactionlogsworker.py            | 109 ++-------
 workers/repositoryactioncounter.py           |  13 +-
 workers/test/test_exportactionlogsworker.py  |  10 +-
 26 files changed, 714 insertions(+), 902 deletions(-)
 create mode 100644 data/logs_model/__init__.py
 create mode 100644 data/logs_model/datatypes.py
 create mode 100644 data/logs_model/interface.py
 create mode 100644 data/logs_model/table_logs_model.py
 delete mode 100644 endpoints/api/logs_models_interface.py
 delete mode 100644 endpoints/api/logs_models_pre_oci.py
 delete mode 100644 endpoints/api/test/test_logs_models_pre_oci.py

diff --git a/data/logs_model/__init__.py b/data/logs_model/__init__.py
new file mode 100644
index 000000000..d452e499c
--- /dev/null
+++ b/data/logs_model/__init__.py
@@ -0,0 +1,20 @@
+import os
+import logging
+
+from data.logs_model.table_logs_model import table_logs_model
+
+logger = logging.getLogger(__name__)
+
+
+class LogsModelProxy(object):
+  def __init__(self):
+    self._model = table_logs_model
+
+  def __getattr__(self, attr):
+    return getattr(self._model, attr)
+
+
+logs_model = LogsModelProxy()
+logger.info('===============================')
+logger.info('Using logs model `%s`', logs_model._model)
+logger.info('===============================')
diff --git a/data/logs_model/datatypes.py b/data/logs_model/datatypes.py
new file mode 100644
index 000000000..a11725563
--- /dev/null
+++ b/data/logs_model/datatypes.py
@@ -0,0 +1,120 @@
+import json
+
+from calendar import timegm
+from collections import namedtuple
+from email.utils import formatdate
+
+from cachetools import lru_cache
+
+from data import model
+from util.morecollections import AttrDict
+
+
+def _format_date(date):
+  """ Output an RFC822 date format. """
+  if date is None:
+    return None
+
+  return formatdate(timegm(date.utctimetuple()))
+
+
+@lru_cache(maxsize=1)
+def _kinds():
+  return model.log.get_log_entry_kinds()
+
+
+class LogEntriesPage(namedtuple('LogEntriesPage', ['logs', 'next_page_token'])):
+  """ Represents a page returned by the lookup_logs call. The `logs` contains the logs
+      found for the page and `next_page_token`, if not None, contains the token to be
+      encoded and returned for the followup call.
+  """
+
+
+class Log(namedtuple('Log', [
+    'metadata_json', 'ip', 'datetime', 'performer_email', 'performer_username', 'performer_robot',
+    'account_organization', 'account_username', 'account_email', 'account_robot', 'kind_id'])):
+  """ Represents a single log entry returned by the logs model. """
+
+  @classmethod
+  def for_logentry(cls, log):
+    account_organization = None
+    account_username = None
+    account_email = None
+    account_robot = None
+    try:
+      account_organization = log.account.organization
+      account_username = log.account.username
+      account_email = log.account.email
+      account_robot = log.account.robot
+    except AttributeError:
+      pass
+
+    performer_robot = None
+    performer_username = None
+    performer_email = None
+
+    try:
+      performer_robot = log.performer.robot
+      performer_username = log.performer.username
+      performer_email = log.performer.email
+    except AttributeError:
+      pass
+
+    return Log(log.metadata_json, log.ip, log.datetime, performer_email, performer_username,
+               performer_robot, account_organization, account_username,
+               account_email, account_robot, log.kind_id)
+
+  def to_dict(self, avatar, include_namespace=False):
+    view = {
+      'kind': _kinds()[self.kind_id],
+      'metadata': json.loads(self.metadata_json),
+      'ip': self.ip,
+      'datetime': _format_date(self.datetime),
+    }
+
+    if self.performer_username:
+      performer = AttrDict({'username': self.performer_username, 'email': self.performer_email})
+      performer.robot = None
+      if self.performer_robot:
+        performer.robot = self.performer_robot
+
+      view['performer'] = {
+        'kind': 'user',
+        'name': self.performer_username,
+        'is_robot': self.performer_robot,
+        'avatar': avatar.get_data_for_user(performer),
+      }
+
+    if include_namespace:
+      if self.account_username:
+        account = AttrDict({'username': self.account_username, 'email': self.account_email})
+        if self.account_organization:
+
+          view['namespace'] = {
+            'kind': 'org',
+            'name': self.account_username,
+            'avatar': avatar.get_data_for_org(account),
+          }
+        else:
+          account.robot = None
+          if self.account_robot:
+            account.robot = self.account_robot
+          view['namespace'] = {
+            'kind': 'user',
+            'name': self.account_username,
+            'avatar': avatar.get_data_for_user(account),
+          }
+
+    return view
+
+
+class AggregatedLogCount(namedtuple('AggregatedLogCount', ['kind_id', 'count', 'datetime'])):
+  """ Represents the aggregated count of the number of logs, of a particular kind, on a day. """
+  def to_dict(self):
+    view = {
+      'kind': _kinds()[self.kind_id],
+      'count': self.count,
+      'datetime': _format_date(self.datetime),
+    }
+
+    return view
diff --git a/data/logs_model/interface.py b/data/logs_model/interface.py
new file mode 100644
index 000000000..499da019e
--- /dev/null
+++ b/data/logs_model/interface.py
@@ -0,0 +1,62 @@
+from abc import ABCMeta, abstractmethod
+from six import add_metaclass
+
+class LogsIterationTimeout(Exception):
+  """ Exception raised if logs iteration times out. """
+
+
+@add_metaclass(ABCMeta)
+class ActionLogsDataInterface(object):
+  """ Interface for code to work with the logs data model. The logs data model consists
+      of all access for reading and writing action logs.
+  """
+  @abstractmethod
+  def lookup_logs(self, start_datetime, end_datetime, performer_name=None, repository_name=None,
+                  namespace_name=None, filter_kinds=None, page_token=None, max_page_count=None):
+    """ Looks up all logs between the start_datetime and end_datetime, filtered
+        by performer (a user), repository or namespace. Note that one (and only one) of the three
+        can be specified. Returns a LogEntriesPage. filter, if specified, is a set/list of the
+        kinds of logs to filter.
+    """
+
+  @abstractmethod
+  def get_aggregated_log_counts(self, start_datetime, end_datetime, performer_name=None,
+                                repository_name=None, namespace_name=None, filter_kinds=None):
+    """ Returns the aggregated count of logs, by kind, between the start_datetime and end_datetime,
+        filtered by performer (a user), repository or namespace. Note that one (and only one) of
+        the three can be specified. Returns a list of AggregatedLogCount.
+    """
+
+  @abstractmethod
+  def count_repository_actions(self, repository, day):
+    """ Returns the total number of repository actions over the given day, in the given repository
+        or None on error.
+    """
+
+  @abstractmethod
+  def queue_logs_export(self, start_datetime, end_datetime, export_action_logs_queue,
+                        namespace_name=None, repository_name=None, callback_url=None,
+                        callback_email=None, filter_kinds=None):
+    """ Queues logs between the start_datetime and end_time, filtered by a repository or namespace,
+        for export to the specified URL and/or email address. Returns the ID of the export job
+        queued or None if error.
+    """
+
+  @abstractmethod
+  def log_action(self, kind_name, namespace_name=None, performer=None, ip=None, metadata=None,
+                 repository=None, repository_name=None, timestamp=None):
+    """ Logs a single action as having taken place. """
+
+  @abstractmethod
+  def yield_logs_for_export(self, start_datetime, end_datetime, repository_id=None,
+                            namespace_id=None, max_query_time=None):
+    """ Returns an iterator that yields bundles of all logs found between the start_datetime and
+        end_datetime, optionally filtered by the repository or namespace. This function should be
+        used for any bulk lookup operations, and should be implemented by implementors to put
+        minimal strain on the backing storage for large operations. If there was an error in setting
+        up, returns None.
+
+        If max_query_time is specified, each iteration that yields a log bundle will have its
+        queries run with a maximum timeout of that specified, and, if any exceed that threshold,
+        LogsIterationTimeout will be raised instead of returning the logs bundle.
+    """
diff --git a/data/logs_model/table_logs_model.py b/data/logs_model/table_logs_model.py
new file mode 100644
index 000000000..6c9d4e1cc
--- /dev/null
+++ b/data/logs_model/table_logs_model.py
@@ -0,0 +1,227 @@
+# pylint: disable=protected-access
+
+import logging
+import json
+import uuid
+
+from datetime import datetime, timedelta
+
+from tzlocal import get_localzone
+from dateutil.relativedelta import relativedelta
+
+from data import model
+from data.database import LogEntry, LogEntry2, LogEntry3
+from data.logs_model.interface import ActionLogsDataInterface, LogsIterationTimeout
+from data.logs_model.datatypes import Log, AggregatedLogCount, LogEntriesPage, _format_date
+
+logger = logging.getLogger(__name__)
+
+MINIMUM_RANGE_SIZE = 1000
+MAXIMUM_RANGE_SIZE = 100000
+EXPECTED_ITERATION_LOG_COUNT = 1000
+
+
+LOG_MODELS = [LogEntry3, LogEntry2, LogEntry]
+
+
+class TableLogsModel(ActionLogsDataInterface):
+  """
+  TableLogsModel implements the data model for the logs API backed by a single table
+  in the database.
+  """
+  def lookup_logs(self, start_datetime, end_datetime, performer_name=None, repository_name=None,
+                  namespace_name=None, filter_kinds=None, page_token=None, max_page_count=None):
+    assert start_datetime is not None
+    assert end_datetime is not None
+
+    repository = None
+    if repository_name and namespace_name:
+      repository = model.repository.get_repository(namespace_name, repository_name)
+
+    performer = None
+    if performer_name:
+      performer = model.user.get_user(performer_name)
+
+    def get_logs(m):
+      logs_query = model.log.get_logs_query(start_datetime, end_datetime, performer=performer,
+                                            repository=repository, namespace=namespace_name,
+                                            ignore=filter_kinds, model=m)
+
+      logs, next_page_token = model.modelutil.paginate(logs_query, m,
+                                                       descending=True, page_token=page_token,
+                                                       limit=20,
+                                                       max_page=max_page_count)
+      return LogEntriesPage([Log.for_logentry(log) for log in logs], next_page_token)
+
+    # First check the LogEntry3 table for the most recent logs, unless we've been expressly told
+    # to look inside the other tables.
+    TOKEN_TABLE_ID = 'tti'
+
+    table_index = 0
+    table_specified = page_token is not None and page_token.get(TOKEN_TABLE_ID) is not None
+    if table_specified:
+      table_index = page_token.get(TOKEN_TABLE_ID)
+
+    page_result = get_logs(LOG_MODELS[table_index])
+    if page_result.next_page_token is None and table_index < len(LOG_MODELS) - 1:
+      page_result = page_result._replace(next_page_token={TOKEN_TABLE_ID: table_index + 1})
+
+    return page_result
+
+  def get_aggregated_log_counts(self, start_datetime, end_datetime, performer_name=None,
+                                repository_name=None, namespace_name=None, filter_kinds=None):
+    if end_datetime - start_datetime >= timedelta(weeks=4):
+      raise Exception('Cannot lookup aggregated logs over a period longer than a month')
+
+    repository = None
+    if repository_name and namespace_name:
+      repository = model.repository.get_repository(namespace_name, repository_name)
+
+    performer = None
+    if performer_name:
+      performer = model.user.get_user(performer_name)
+
+    entries = {}
+    for log_model in LOG_MODELS:
+      aggregated = model.log.get_aggregated_logs(start_datetime, end_datetime,
+                                                 performer=performer,
+                                                 repository=repository,
+                                                 namespace=namespace_name,
+                                                 ignore=filter_kinds,
+                                                 model=log_model)
+
+      for entry in aggregated:
+        synthetic_date = datetime(start_datetime.year, start_datetime.month, int(entry.day),
+                                  tzinfo=get_localzone())
+        if synthetic_date.day < start_datetime.day:
+          synthetic_date = synthetic_date + relativedelta(months=1)
+
+        key = '%s-%s' % (entry.kind_id, entry.day)
+
+        if key in entries:
+          entries[key] = AggregatedLogCount(entry.kind_id, entry.count + entries[key].count,
+                                            synthetic_date)
+        else:
+          entries[key] = AggregatedLogCount(entry.kind_id, entry.count, synthetic_date)
+
+    return entries.values()
+
+  def count_repository_actions(self, repository, day):
+    return model.repositoryactioncount.count_repository_actions(repository, day)
+
+  def log_action(self, kind_name, namespace_name=None, performer=None, ip=None, metadata=None,
+                 repository=None, repository_name=None, timestamp=None):
+    if repository_name is not None:
+      assert repository is None
+      assert namespace_name is not None
+      repository = model.repository.get_repository(namespace_name, repository_name)
+
+    model.log.log_action(kind_name, namespace_name, performer=performer, repository=repository,
+                         ip=ip, metadata=metadata or {}, timestamp=timestamp)
+
+  def queue_logs_export(self, start_datetime, end_datetime, export_action_logs_queue,
+                        namespace_name=None,
repository_name=None, callback_url=None, + callback_email=None, filter_kinds=None): + export_id = str(uuid.uuid4()) + namespace = model.user.get_namespace_user(namespace_name) + if namespace is None: + return None + + repository = None + if repository_name is not None: + repository = model.repository.get_repository(namespace_name, repository_name) + if repository is None: + return None + + export_action_logs_queue.put([namespace_name], json.dumps({ + 'export_id': export_id, + 'repository_id': repository.id if repository else None, + 'namespace_id': namespace.id, + 'namespace_name': namespace.username, + 'repository_name': repository.name if repository else None, + 'start_time': _format_date(start_datetime), + 'end_time': _format_date(end_datetime), + 'callback_url': callback_url, + 'callback_email': callback_email, + }), retries_remaining=3) + + return export_id + + def yield_logs_for_export(self, start_datetime, end_datetime, repository_id=None, + namespace_id=None, max_query_time=None): + # Lookup the starting and ending IDs for the log range in the table. This operation is quite + # quick, so we use it as a bounding box for the later lookups. + min_id, elapsed = _run_and_time(lambda: model.log.get_minimum_id_for_logs(start_datetime, + repository_id, + namespace_id)) + if elapsed > max_query_time: + logger.error('Retrieval of min ID for export logs `%s/%s` timed out with time of `%s`', + namespace_id, repository_id, elapsed) + raise LogsIterationTimeout() + + max_id, elapsed = _run_and_time(lambda: model.log.get_maximum_id_for_logs(end_datetime, + repository_id, + namespace_id)) + if elapsed > max_query_time: + logger.error('Retrieval of max ID for export logs `%s/%s` timed out with time of `%s`', + namespace_id, repository_id, elapsed) + raise LogsIterationTimeout() + + min_id = min_id or 1 + max_id = max_id or 1 + + logger.info('Found log range of %s to %s for export logs `%s/%s`', min_id, max_id, + namespace_id, repository_id) + + # Using an adjusting scale, start downloading log rows in batches, starting at + # MINIMUM_RANGE_SIZE and doubling until we've reached EXPECTED_ITERATION_LOG_COUNT or + # the lookup range has reached MAXIMUM_RANGE_SIZE. If at any point this operation takes + # longer than the MAXIMUM_WORK_PERIOD_SECONDS, terminate the batch operation as timed out. + batch_start_time = datetime.utcnow() + + current_start_id = min_id + current_batch_size = MINIMUM_RANGE_SIZE + + while current_start_id <= max_id: + # Verify we haven't been working for too long. + work_elapsed = datetime.utcnow() - batch_start_time + if work_elapsed > max_query_time: + logger.error('Retrieval of logs `%s/%s` timed out with time of `%s`', + namespace_id, repository_id, work_elapsed) + raise LogsIterationTimeout() + + id_range = [current_start_id, min(max_id, current_start_id + current_batch_size)] + + # Load the next set of logs. + def load_logs(): + logger.debug('Retrieving logs over range %s with namespace %s and repository %s', + id_range, namespace_id, repository_id) + + logs_query = model.log.get_logs_query(namespace=namespace_id, + repository=repository_id, + id_range=id_range) + return [Log.for_logentry(log) for log in logs_query] + + logs, elapsed = _run_and_time(load_logs) + if elapsed > max_query_time: + logger.error('Retrieval of logs for export logs `%s/%s` with range `%s` timed out at `%s`', + namespace_id, repository_id, id_range, elapsed) + raise LogsIterationTimeout() + + yield logs + + # Move forward. 
+ current_start_id = id_range[1] + 1 + + # Increase the batch size if necessary. + if len(logs) < EXPECTED_ITERATION_LOG_COUNT: + current_batch_size = min(MAXIMUM_RANGE_SIZE, current_batch_size * 2) + + +def _run_and_time(fn): + start_time = datetime.utcnow() + result = fn() + return result, datetime.utcnow() - start_time + + +table_logs_model = TableLogsModel() diff --git a/data/model/repositoryactioncount.py b/data/model/repositoryactioncount.py index b9bd76b21..5c223f477 100644 --- a/data/model/repositoryactioncount.py +++ b/data/model/repositoryactioncount.py @@ -44,30 +44,34 @@ def find_uncounted_repository(): return None -def count_repository_actions(to_count): - """ Aggregates repository actions from the LogEntry table for the last day and writes them to - the RepositoryActionCount table. Return True if the repository was updated and False - otherwise. +def count_repository_actions(to_count, day): + """ Aggregates repository actions from the LogEntry table for the specified day. Returns the + count or None on error. """ - today = date.today() - yesterday = today - timedelta(days=1) - # TODO(LogMigrate): Remove the branch once we're back on a single table. def lookup_action_count(model): return (model .select() .where(model.repository == to_count, - model.datetime >= yesterday, - model.datetime < today) + model.datetime >= day, + model.datetime < (day + timedelta(days=1))) .count()) actions = (lookup_action_count(LogEntry3) + lookup_action_count(LogEntry2) + lookup_action_count(LogEntry)) + + return actions + + +def store_repository_action_count(repository, day, action_count): + """ Stores the action count for a repository for a specific day. Returns False if the + repository already has an entry for the specified day. + """ try: - RepositoryActionCount.create(repository=to_count, date=yesterday, count=actions) + RepositoryActionCount.create(repository=repository, date=day, count=action_count) return True except IntegrityError: - logger.debug('Count already written for repository %s', to_count.id) + logger.debug('Count already written for repository %s', repository.id) return False diff --git a/endpoints/api/__init__.py b/endpoints/api/__init__.py index e6162c02e..47daa6b45 100644 --- a/endpoints/api/__init__.py +++ b/endpoints/api/__init__.py @@ -18,6 +18,8 @@ from auth import scopes from auth.auth_context import (get_authenticated_context, get_authenticated_user, get_validated_oauth_token) from auth.decorators import process_oauth +from data.logs_model import logs_model +from data import model as data_model from endpoints.csrf import csrf_protect from endpoints.exception import (Unauthorized, InvalidRequest, InvalidResponse, FreshLoginRequired, NotFound) @@ -365,7 +367,7 @@ def request_error(exception=None, **kwargs): def log_action(kind, user_or_orgname, metadata=None, repo=None, repo_name=None): if not metadata: metadata = {} - + oauth_token = get_validated_oauth_token() if oauth_token: metadata['oauth_token_id'] = oauth_token.id @@ -373,11 +375,15 @@ def log_action(kind, user_or_orgname, metadata=None, repo=None, repo_name=None): metadata['oauth_token_application'] = oauth_token.application.name performer = get_authenticated_user() - - if repo: - repo_name = repo.name - - model.log_action(kind, user_or_orgname, repo_name, performer, request.remote_addr, metadata) + + if repo_name is not None: + repo = data_model.repository.get_repository(user_or_orgname, repo_name) + + logs_model.log_action(kind, user_or_orgname, + repository=repo, + performer=performer, + 
ip=request.remote_addr, + metadata=metadata) def define_json_response(schema_name): diff --git a/endpoints/api/__init__models_pre_oci.py b/endpoints/api/__init__models_pre_oci.py index 8db474380..f14e7267c 100644 --- a/endpoints/api/__init__models_pre_oci.py +++ b/endpoints/api/__init__models_pre_oci.py @@ -1,18 +1,19 @@ from __init__models_interface import InitDataInterface from data import model +from data.logs_model import logs_model class PreOCIModel(InitDataInterface): - def is_app_repository(self, namespace_name, repository_name): - return model.repository.get_repository(namespace_name, repository_name, kind_filter='application') is not None - + return model.repository.get_repository(namespace_name, repository_name, + kind_filter='application') is not None + def repository_is_public(self, namespace_name, repository_name): return model.repository.repository_is_public(namespace_name, repository_name) - + def log_action(self, kind, namespace_name, repository_name, performer, ip, metadata): repository = model.repository.get_repository(namespace_name, repository_name) - model.log.log_action(kind, namespace_name, performer=performer, ip=ip, metadata=metadata, repository=repository) + logs_model.log_action(kind, namespace_name, performer=performer, ip=ip, metadata=metadata, + repository=repository) - -pre_oci_model = PreOCIModel() \ No newline at end of file +pre_oci_model = PreOCIModel() diff --git a/endpoints/api/logs.py b/endpoints/api/logs.py index 0cae6b095..899897f19 100644 --- a/endpoints/api/logs.py +++ b/endpoints/api/logs.py @@ -1,81 +1,70 @@ """ Access usage logs for organizations or repositories. """ -import json -import uuid - from datetime import datetime, timedelta from flask import request import features -from app import export_action_logs_queue +from app import app, export_action_logs_queue, avatar +from auth.permissions import AdministerOrganizationPermission +from auth.auth_context import get_authenticated_user +from auth import scopes +from data.logs_model import logs_model +from data.registry_model import registry_model from endpoints.api import (resource, nickname, ApiResource, query_param, parse_args, RepositoryParamResource, require_repo_admin, related_user_resource, format_date, require_user_admin, path_param, require_scope, page_support, validate_json_request, InvalidRequest, show_if) -from data import model as data_model -from endpoints.api.logs_models_pre_oci import pre_oci_model as model from endpoints.exception import Unauthorized, NotFound -from auth.permissions import AdministerOrganizationPermission -from auth.auth_context import get_authenticated_user -from auth import scopes + LOGS_PER_PAGE = 20 SERVICE_LEVEL_LOG_KINDS = set(['service_key_create', 'service_key_approve', 'service_key_delete', 'service_key_modify', 'service_key_extend', 'service_key_rotate']) +def _parse_datetime(dt_string): + if not dt_string: + return None + + try: + return datetime.strptime(dt_string + ' UTC', '%m/%d/%Y %Z') + except ValueError: + return None + + def _validate_logs_arguments(start_time, end_time): - if start_time: - try: - start_time = datetime.strptime(start_time + ' UTC', '%m/%d/%Y %Z') - except ValueError: - start_time = None - - if not start_time: - start_time = datetime.today() - timedelta(7) # One week - - if end_time: - try: - end_time = datetime.strptime(end_time + ' UTC', '%m/%d/%Y %Z') - end_time = end_time + timedelta(days=1) - except ValueError: - end_time = None - - if not end_time: - end_time = datetime.today() - + start_time = _parse_datetime(start_time) 
or (datetime.today() - timedelta(days=1)) + end_time = _parse_datetime(end_time) or datetime.today() + end_time = end_time + timedelta(days=1) return start_time, end_time -def get_logs(start_time, end_time, performer_name=None, repository_name=None, namespace_name=None, - page_token=None, ignore=None): +def _get_logs(start_time, end_time, performer_name=None, repository_name=None, namespace_name=None, + page_token=None, filter_kinds=None): (start_time, end_time) = _validate_logs_arguments(start_time, end_time) - - kinds = model.get_log_entry_kinds() - log_entry_page = model.get_logs_query(start_time, end_time, performer_name, repository_name, - namespace_name, ignore, page_token) - + log_entry_page = logs_model.lookup_logs(start_time, end_time, performer_name, repository_name, + namespace_name, filter_kinds, page_token, + app.config['ACTION_LOG_MAX_PAGE']) include_namespace = namespace_name is None and repository_name is None - return { 'start_time': format_date(start_time), 'end_time': format_date(end_time), - 'logs': [log.to_dict(kinds, include_namespace) for log in log_entry_page.logs], + 'logs': [log.to_dict(avatar, include_namespace) for log in log_entry_page.logs], }, log_entry_page.next_page_token -def get_aggregate_logs(start_time, end_time, performer_name=None, repository=None, namespace=None, - ignore=None): +def _get_aggregate_logs(start_time, end_time, performer_name=None, repository=None, namespace=None, + filter_kinds=None): (start_time, end_time) = _validate_logs_arguments(start_time, end_time) - - kinds = model.get_log_entry_kinds() - aggregated_logs = model.get_aggregated_logs(start_time, end_time, performer_name=performer_name, - repository_name=repository, namespace_name=namespace, - ignore=ignore) + aggregated_logs = logs_model.get_aggregated_log_counts(start_time, end_time, + performer_name=performer_name, + repository_name=repository, + namespace_name=namespace, + filter_kinds=filter_kinds) return { - 'aggregated': [log.to_dict(kinds, start_time) for log in aggregated_logs] + 'aggregated': [log.to_dict() for log in aggregated_logs] } @@ -87,18 +76,20 @@ class RepositoryLogs(RepositoryParamResource): @require_repo_admin @nickname('listRepoLogs') @parse_args() - @query_param('starttime', 'Earliest time from which to get logs. The time should be formatted "%m/%d/%Y" in UTC.', type=str) - @query_param('endtime', 'Latest time to which to get logs. The time should be formatted "%m/%d/%Y" in UTC.', type=str) + @query_param('starttime', 'Earliest time for logs. Format: "%m/%d/%Y" in UTC.', type=str) + @query_param('endtime', 'Latest time for logs. Format: "%m/%d/%Y" in UTC.', type=str) @page_support() def get(self, namespace, repository, page_token, parsed_args): """ List the logs for the specified repository. """ - if model.repo_exists(namespace, repository) is False: + if registry_model.lookup_repository(namespace, repository) is None: raise NotFound() start_time = parsed_args['starttime'] end_time = parsed_args['endtime'] - return get_logs(start_time, end_time, repository_name=repository, page_token=page_token, - namespace_name=namespace) + return _get_logs(start_time, end_time, + repository_name=repository, + page_token=page_token, + namespace_name=namespace) @resource('/v1/user/logs') @@ -108,8 +99,8 @@ class UserLogs(ApiResource): @require_user_admin @nickname('listUserLogs') @parse_args() - @query_param('starttime', 'Earliest time from which to get logs. (%m/%d/%Y %Z)', type=str) - @query_param('endtime', 'Latest time to which to get logs. 
(%m/%d/%Y %Z)', type=str) + @query_param('starttime', 'Earliest time for logs. Format: "%m/%d/%Y" in UTC.', type=str) + @query_param('endtime', 'Latest time for logs. Format: "%m/%d/%Y" in UTC.', type=str) @query_param('performer', 'Username for which to filter logs.', type=str) @page_support() def get(self, parsed_args, page_token): @@ -119,9 +110,11 @@ class UserLogs(ApiResource): end_time = parsed_args['endtime'] user = get_authenticated_user() - return get_logs(start_time, end_time, performer_name=performer_name, - namespace_name=user.username, page_token=page_token, - ignore=SERVICE_LEVEL_LOG_KINDS) + return _get_logs(start_time, end_time, + performer_name=performer_name, + namespace_name=user.username, + page_token=page_token, + filter_kinds=SERVICE_LEVEL_LOG_KINDS) @resource('/v1/organization//logs') @@ -132,8 +125,8 @@ class OrgLogs(ApiResource): @nickname('listOrgLogs') @parse_args() - @query_param('starttime', 'Earliest time from which to get logs. (%m/%d/%Y %Z)', type=str) - @query_param('endtime', 'Latest time to which to get logs. (%m/%d/%Y %Z)', type=str) + @query_param('starttime', 'Earliest time for logs. Format: "%m/%d/%Y" in UTC.', type=str) + @query_param('endtime', 'Latest time for logs. Format: "%m/%d/%Y" in UTC.', type=str) @query_param('performer', 'Username for which to filter logs.', type=str) @page_support() @require_scope(scopes.ORG_ADMIN) @@ -145,8 +138,10 @@ class OrgLogs(ApiResource): start_time = parsed_args['starttime'] end_time = parsed_args['endtime'] - return get_logs(start_time, end_time, namespace_name=orgname, performer_name=performer_name, - page_token=page_token, ignore=SERVICE_LEVEL_LOG_KINDS) + return _get_logs(start_time, end_time, + namespace_name=orgname, + performer_name=performer_name, + page_token=page_token) raise Unauthorized() @@ -160,16 +155,18 @@ class RepositoryAggregateLogs(RepositoryParamResource): @require_repo_admin @nickname('getAggregateRepoLogs') @parse_args() - @query_param('starttime', 'Earliest time from which to get logs (%m/%d/%Y %Z)', type=str) - @query_param('endtime', 'Latest time to which to get logs (%m/%d/%Y %Z)', type=str) + @query_param('starttime', 'Earliest time for logs. Format: "%m/%d/%Y" in UTC.', type=str) + @query_param('endtime', 'Latest time for logs. Format: "%m/%d/%Y" in UTC.', type=str) def get(self, namespace, repository, parsed_args): """ Returns the aggregated logs for the specified repository. """ - if model.repo_exists(namespace, repository) is False: + if registry_model.lookup_repository(namespace, repository) is None: raise NotFound() start_time = parsed_args['starttime'] end_time = parsed_args['endtime'] - return get_aggregate_logs(start_time, end_time, repository=repository, namespace=namespace) + return _get_aggregate_logs(start_time, end_time, + repository=repository, + namespace=namespace) @resource('/v1/user/aggregatelogs') @@ -180,8 +177,8 @@ class UserAggregateLogs(ApiResource): @require_user_admin @nickname('getAggregateUserLogs') @parse_args() - @query_param('starttime', 'Earliest time from which to get logs. (%m/%d/%Y %Z)', type=str) - @query_param('endtime', 'Latest time to which to get logs. (%m/%d/%Y %Z)', type=str) + @query_param('starttime', 'Earliest time for logs. Format: "%m/%d/%Y" in UTC.', type=str) + @query_param('endtime', 'Latest time for logs. Format: "%m/%d/%Y" in UTC.', type=str) @query_param('performer', 'Username for which to filter logs.', type=str) def get(self, parsed_args): """ Returns the aggregated logs for the current user. 
""" @@ -190,8 +187,10 @@ class UserAggregateLogs(ApiResource): end_time = parsed_args['endtime'] user = get_authenticated_user() - return get_aggregate_logs(start_time, end_time, performer_name=performer_name, - namespace=user.username, ignore=SERVICE_LEVEL_LOG_KINDS) + return _get_aggregate_logs(start_time, end_time, + performer_name=performer_name, + namespace=user.username, + filter_kinds=SERVICE_LEVEL_LOG_KINDS) @resource('/v1/organization//aggregatelogs') @@ -203,8 +202,8 @@ class OrgAggregateLogs(ApiResource): @nickname('getAggregateOrgLogs') @parse_args() - @query_param('starttime', 'Earliest time from which to get logs. (%m/%d/%Y %Z)', type=str) - @query_param('endtime', 'Latest time to which to get logs. (%m/%d/%Y %Z)', type=str) + @query_param('starttime', 'Earliest time for logs. Format: "%m/%d/%Y" in UTC.', type=str) + @query_param('endtime', 'Latest time for logs. Format: "%m/%d/%Y" in UTC.', type=str) @query_param('performer', 'Username for which to filter logs.', type=str) @require_scope(scopes.ORG_ADMIN) def get(self, orgname, parsed_args): @@ -215,46 +214,13 @@ class OrgAggregateLogs(ApiResource): start_time = parsed_args['starttime'] end_time = parsed_args['endtime'] - return get_aggregate_logs(start_time, end_time, namespace=orgname, - performer_name=performer_name, ignore=SERVICE_LEVEL_LOG_KINDS) + return _get_aggregate_logs(start_time, end_time, + namespace=orgname, + performer_name=performer_name) raise Unauthorized() -def queue_logs_export(start_time, end_time, options, namespace_name, repository_name=None): - export_id = str(uuid.uuid4()) - namespace = data_model.user.get_namespace_user(namespace_name) - if namespace is None: - raise InvalidRequest('Unknown namespace') - - repository = None - if repository_name is not None: - repository = data_model.repository.get_repository(namespace_name, repository_name) - if repository is None: - raise InvalidRequest('Unknown repository') - - callback_url = options.get('callback_url') - if callback_url: - if not callback_url.startswith('https://') and not callback_url.startswith('http://'): - raise InvalidRequest('Invalid callback URL') - - export_action_logs_queue.put([namespace_name], json.dumps({ - 'export_id': export_id, - 'repository_id': repository.id if repository else None, - 'namespace_id': namespace.id, - 'namespace_name': namespace.username, - 'repository_name': repository.name if repository else None, - 'start_time': start_time, - 'end_time': end_time, - 'callback_url': callback_url, - 'callback_email': options.get('callback_email'), - }), retries_remaining=3) - - return { - 'export_id': export_id, - } - - EXPORT_LOGS_SCHEMA = { 'type': 'object', 'description': 'Configuration for an export logs operation', @@ -271,6 +237,27 @@ EXPORT_LOGS_SCHEMA = { } +def _queue_logs_export(start_time, end_time, options, namespace_name, repository_name=None): + callback_url = options.get('callback_url') + if callback_url: + if not callback_url.startswith('https://') and not callback_url.startswith('http://'): + raise InvalidRequest('Invalid callback URL') + + callback_email = options.get('callback_email') + if callback_email: + if callback_email.find('@') < 0: + raise InvalidRequest('Invalid callback e-mail') + + (start_time, end_time) = _validate_logs_arguments(start_time, end_time) + export_id = logs_model.queue_logs_export(start_time, end_time, export_action_logs_queue, + namespace_name, repository_name, callback_url, + callback_email) + if export_id is None: + raise InvalidRequest('Invalid export request') + + return export_id + 
+ @resource('/v1/repository//exportlogs') @path_param('repository', 'The full path of the repository. e.g. namespace/name') class ExportRepositoryLogs(RepositoryParamResource): @@ -282,18 +269,21 @@ class ExportRepositoryLogs(RepositoryParamResource): @require_repo_admin @nickname('exportRepoLogs') @parse_args() - @query_param('starttime', 'Earliest time from which to get logs (%m/%d/%Y %Z)', type=str) - @query_param('endtime', 'Latest time to which to get logs (%m/%d/%Y %Z)', type=str) + @query_param('starttime', 'Earliest time for logs. Format: "%m/%d/%Y" in UTC.', type=str) + @query_param('endtime', 'Latest time for logs. Format: "%m/%d/%Y" in UTC.', type=str) @validate_json_request('ExportLogs') def post(self, namespace, repository, parsed_args): """ Queues an export of the logs for the specified repository. """ - if model.repo_exists(namespace, repository) is False: + if registry_model.lookup_repository(namespace, repository) is None: raise NotFound() start_time = parsed_args['starttime'] end_time = parsed_args['endtime'] - return queue_logs_export(start_time, end_time, request.get_json(), namespace, - repository_name=repository) + export_id = _queue_logs_export(start_time, end_time, request.get_json(), namespace, + repository_name=repository) + return { + 'export_id': export_id, + } @resource('/v1/user/exportlogs') @@ -306,8 +296,8 @@ class ExportUserLogs(ApiResource): @require_user_admin @nickname('exportUserLogs') @parse_args() - @query_param('starttime', 'Earliest time from which to get logs. (%m/%d/%Y %Z)', type=str) - @query_param('endtime', 'Latest time to which to get logs. (%m/%d/%Y %Z)', type=str) + @query_param('starttime', 'Earliest time for logs. Format: "%m/%d/%Y" in UTC.', type=str) + @query_param('endtime', 'Latest time for logs. Format: "%m/%d/%Y" in UTC.', type=str) @validate_json_request('ExportLogs') def post(self, parsed_args): """ Returns the aggregated logs for the current user. """ @@ -315,7 +305,10 @@ class ExportUserLogs(ApiResource): end_time = parsed_args['endtime'] user = get_authenticated_user() - return queue_logs_export(start_time, end_time, request.get_json(), user.username) + export_id = _queue_logs_export(start_time, end_time, request.get_json(), user.username) + return { + 'export_id': export_id, + } @resource('/v1/organization//exportlogs') @@ -330,8 +323,8 @@ class ExportOrgLogs(ApiResource): @nickname('exportOrgLogs') @parse_args() - @query_param('starttime', 'Earliest time from which to get logs. (%m/%d/%Y %Z)', type=str) - @query_param('endtime', 'Latest time to which to get logs. (%m/%d/%Y %Z)', type=str) + @query_param('starttime', 'Earliest time for logs. Format: "%m/%d/%Y" in UTC.', type=str) + @query_param('endtime', 'Latest time for logs. 
Format: "%m/%d/%Y" in UTC.', type=str) @require_scope(scopes.ORG_ADMIN) @validate_json_request('ExportLogs') def post(self, orgname, parsed_args): @@ -341,6 +334,9 @@ class ExportOrgLogs(ApiResource): start_time = parsed_args['starttime'] end_time = parsed_args['endtime'] - return queue_logs_export(start_time, end_time, request.get_json(), orgname) + export_id = _queue_logs_export(start_time, end_time, request.get_json(), orgname) + return { + 'export_id': export_id, + } raise Unauthorized() diff --git a/endpoints/api/logs_models_interface.py b/endpoints/api/logs_models_interface.py deleted file mode 100644 index e8db6960e..000000000 --- a/endpoints/api/logs_models_interface.py +++ /dev/null @@ -1,141 +0,0 @@ -import json -from abc import ABCMeta, abstractmethod -from collections import namedtuple - -from datetime import datetime - -from dateutil.relativedelta import relativedelta -from six import add_metaclass -from tzlocal import get_localzone - -from app import avatar -from endpoints.api import format_date -from util.morecollections import AttrDict - - -class LogEntry( - namedtuple('LogEntry', [ - 'metadata_json', 'ip', 'datetime', 'performer_email', 'performer_username', 'performer_robot', - 'account_organization', 'account_username', 'account_email', 'account_robot', 'kind_id' - ])): - """ - LogEntry a single log entry. - :type metadata_json: string - :type ip: string - :type datetime: string - :type performer_email: int - :type performer_username: string - :type performer_robot: boolean - :type account_organization: boolean - :type account_username: string - :type account_email: string - :type account_robot: boolean - :type kind_id: int - """ - - def to_dict(self, kinds, include_namespace): - view = { - 'kind': kinds[self.kind_id], - 'metadata': json.loads(self.metadata_json), - 'ip': self.ip, - 'datetime': format_date(self.datetime), - } - - if self.performer_username: - performer = AttrDict({'username': self.performer_username, 'email': self.performer_email}) - performer.robot = None - if self.performer_robot: - performer.robot = self.performer_robot - - view['performer'] = { - 'kind': 'user', - 'name': self.performer_username, - 'is_robot': self.performer_robot, - 'avatar': avatar.get_data_for_user(performer), - } - - if include_namespace: - if self.account_username: - account = AttrDict({'username': self.account_username, 'email': self.account_email}) - if self.account_organization: - - view['namespace'] = { - 'kind': 'org', - 'name': self.account_username, - 'avatar': avatar.get_data_for_org(account), - } - else: - account.robot = None - if self.account_robot: - account.robot = self.account_robot - view['namespace'] = { - 'kind': 'user', - 'name': self.account_username, - 'avatar': avatar.get_data_for_user(account), - } - - return view - - -class LogEntryPage( - namedtuple('LogEntryPage', ['logs', 'next_page_token'])): - """ - LogEntryPage represents a single page of logs. - :type logs: [LogEntry] - :type next_page_token: {any -> any} - """ - - -class AggregatedLogEntry( - namedtuple('AggregatedLogEntry', ['count', 'kind_id', 'day'])): - """ - AggregatedLogEntry represents an aggregated view of logs. 
- :type count: int - :type kind_id: int - :type day: string - """ - def to_dict(self, kinds, start_time): - synthetic_date = datetime(start_time.year, start_time.month, int(self.day), tzinfo=get_localzone()) - if synthetic_date.day < start_time.day: - synthetic_date = synthetic_date + relativedelta(months=1) - - view = { - 'kind': kinds[self.kind_id], - 'count': self.count, - 'datetime': format_date(synthetic_date), - } - - return view - - -@add_metaclass(ABCMeta) -class LogEntryDataInterface(object): - """ - Interface that represents all data store interactions required by a Log. - """ - - @abstractmethod - def get_logs_query(self, start_time, end_time, performer_name=None, repository_name=None, namespace_name=None, - ignore=None, page_token=None): - """ - Returns a LogEntryPage. - """ - - @abstractmethod - def get_log_entry_kinds(self): - """ - Returns a map of LogEntryKind id -> name and name -> id - """ - - @abstractmethod - def repo_exists(self, namespace_name, repository_name): - """ - Returns whether or not a repo exists. - """ - - @abstractmethod - def get_aggregated_logs(self, start_time, end_time, performer_name=None, repository_name=None, namespace_name=None, - ignore=None): - """ - Returns a list of aggregated logs - """ diff --git a/endpoints/api/logs_models_pre_oci.py b/endpoints/api/logs_models_pre_oci.py deleted file mode 100644 index b4dbbb060..000000000 --- a/endpoints/api/logs_models_pre_oci.py +++ /dev/null @@ -1,122 +0,0 @@ -import itertools - -from app import app -from data import model, database -from endpoints.api.logs_models_interface import LogEntryDataInterface, LogEntryPage, LogEntry, AggregatedLogEntry - - -def create_log(log): - account_organization = None - account_username = None - account_email = None - account_robot = None - try: - account_organization = log.account.organization - account_username = log.account.username - account_email = log.account.email - account_robot = log.account.robot - except AttributeError: - pass - - performer_robot = None - performer_username = None - performer_email = None - - try: - performer_robot = log.performer.robot - performer_username = log.performer.username - performer_email = log.performer.email - except AttributeError: - pass - - return LogEntry(log.metadata_json, log.ip, log.datetime, performer_email, performer_username, - performer_robot, account_organization, account_username, - account_email, account_robot, log.kind_id) - - -class PreOCIModel(LogEntryDataInterface): - """ - PreOCIModel implements the data model for the Tags using a database schema - before it was changed to support the OCI specification. - """ - - def get_logs_query(self, start_time, end_time, performer_name=None, repository_name=None, - namespace_name=None, ignore=None, page_token=None): - repo = None - if repository_name and namespace_name: - repo = model.repository.get_repository(namespace_name, repository_name) - - performer = None - if performer_name: - performer = model.user.get_user(performer_name) - - # TODO(LogMigrate): Remove the branch once we're back on a single table. 
- def get_logs(m): - logs_query = model.log.get_logs_query(start_time, end_time, performer=performer, - repository=repo, namespace=namespace_name, - ignore=ignore, model=m) - - logs, next_page_token = model.modelutil.paginate(logs_query, m, - descending=True, page_token=page_token, - limit=20, - max_page=app.config['ACTION_LOG_MAX_PAGE']) - return LogEntryPage([create_log(log) for log in logs], next_page_token) - - # First check the LogEntry3 table for the most recent logs, unless we've been expressly told - # to look inside the other tables. - TOKEN_TABLE_ID = 'tti' - tables = [database.LogEntry3, database.LogEntry2, database.LogEntry] - - table_index = 0 - table_specified = page_token is not None and page_token.get(TOKEN_TABLE_ID) is not None - if table_specified: - table_index = page_token.get(TOKEN_TABLE_ID) - - page_result = get_logs(tables[table_index]) - if page_result.next_page_token is None and table_index < len(tables) - 1: - page_result = page_result._replace(next_page_token={TOKEN_TABLE_ID: table_index + 1}) - - return page_result - - def get_log_entry_kinds(self): - return model.log.get_log_entry_kinds() - - def repo_exists(self, namespace_name, repository_name): - repo = model.repository.get_repository(namespace_name, repository_name) - if repo is None: - return False - return True - - def get_aggregated_logs(self, start_time, end_time, performer_name=None, repository_name=None, - namespace_name=None, ignore=None): - repo = None - if repository_name and namespace_name: - repo = model.repository.get_repository(namespace_name, repository_name) - - performer = None - if performer_name: - performer = model.user.get_user(performer_name) - - # TODO(LogMigrate): Remove the branch once we're back on a single table. - aggregated_logs = model.log.get_aggregated_logs(start_time, end_time, performer=performer, - repository=repo, namespace=namespace_name, - ignore=ignore, model=database.LogEntry) - aggregated_logs_2 = model.log.get_aggregated_logs(start_time, end_time, performer=performer, - repository=repo, namespace=namespace_name, - ignore=ignore, model=database.LogEntry2) - aggregated_logs_3 = model.log.get_aggregated_logs(start_time, end_time, performer=performer, - repository=repo, namespace=namespace_name, - ignore=ignore, model=database.LogEntry3) - - entries = {} - for log in itertools.chain(aggregated_logs, aggregated_logs_2, aggregated_logs_3): - key = '%s-%s' % (log.kind_id, log.day) - if key in entries: - entries[key] = AggregatedLogEntry(log.count + entries[key].count, log.kind_id, log.day) - else: - entries[key] = AggregatedLogEntry(log.count, log.kind_id, log.day) - - return entries.values() - - -pre_oci_model = PreOCIModel() diff --git a/endpoints/api/superuser.py b/endpoints/api/superuser.py index 40e3b4e12..72cb1c4f9 100644 --- a/endpoints/api/superuser.py +++ b/endpoints/api/superuser.py @@ -4,7 +4,7 @@ import os import string import socket -from datetime import datetime, timedelta +from datetime import datetime from random import SystemRandom from flask import request, make_response, jsonify @@ -16,6 +16,7 @@ from auth import scopes from auth.auth_context import get_authenticated_user from auth.permissions import SuperUserPermission from data.database import ServiceKeyApprovalType +from data.logs_model import logs_model from endpoints.api import (ApiResource, nickname, resource, validate_json_request, internal_only, require_scope, show_if, parse_args, query_param, require_fresh_login, path_param, verify_not_prod, @@ -25,36 +26,13 @@ from endpoints.api.build import 
get_logs_or_log_url from endpoints.api.superuser_models_pre_oci import (pre_oci_model, ServiceKeyDoesNotExist, ServiceKeyAlreadyApproved, InvalidRepositoryBuildException) -from endpoints.api.logs_models_pre_oci import pre_oci_model as log_model +from endpoints.api.logs import _validate_logs_arguments from util.useremails import send_confirmation_email, send_recovery_email from _init import ROOT_DIR logger = logging.getLogger(__name__) -def _validate_logs_arguments(start_time, end_time): - if start_time: - try: - start_time = datetime.strptime(start_time + ' UTC', '%m/%d/%Y %Z') - except ValueError: - start_time = None - - if not start_time: - start_time = datetime.today() - timedelta(7) # One week - - if end_time: - try: - end_time = datetime.strptime(end_time + ' UTC', '%m/%d/%Y %Z') - end_time = end_time + timedelta(days=1) - except ValueError: - end_time = None - - if not end_time: - end_time = datetime.today() - - return start_time, end_time - - def get_immediate_subdirectories(directory): return [name for name in os.listdir(directory) if os.path.isdir(os.path.join(directory, name))] @@ -134,10 +112,9 @@ class SuperUserAggregateLogs(ApiResource): if SuperUserPermission().can(): (start_time, end_time) = _validate_logs_arguments(parsed_args['starttime'], parsed_args['endtime']) - aggregated_logs = log_model.get_aggregated_logs(start_time, end_time) - kinds = log_model.get_log_entry_kinds() + aggregated_logs = logs_model.get_aggregated_log_counts(start_time, end_time) return { - 'aggregated': [log.to_dict(kinds, start_time) for log in aggregated_logs] + 'aggregated': [log.to_dict() for log in aggregated_logs] } raise Unauthorized() @@ -166,13 +143,12 @@ class SuperUserLogs(ApiResource): end_time = parsed_args['endtime'] (start_time, end_time) = _validate_logs_arguments(start_time, end_time) - log_page = log_model.get_logs_query(start_time, end_time, page_token=page_token) - kinds = log_model.get_log_entry_kinds() + log_entry_page = logs_model.lookup_logs(start_time, end_time, page_token=page_token) return { 'start_time': format_date(start_time), 'end_time': format_date(end_time), - 'logs': [log.to_dict(kinds, include_namespace=True) for log in log_page.logs], - }, log_page.next_page_token + 'logs': [log.to_dict(avatar, include_namespace=True) for log in log_entry_page.logs], + }, log_entry_page.next_page_token raise Unauthorized() diff --git a/endpoints/api/superuser_models_interface.py b/endpoints/api/superuser_models_interface.py index 48bd9f716..e03d98e8c 100644 --- a/endpoints/api/superuser_models_interface.py +++ b/endpoints/api/superuser_models_interface.py @@ -208,120 +208,12 @@ class Organization(namedtuple('Organization', ['username', 'email'])): } -class LogEntry( - namedtuple('LogEntry', [ - 'metadata_json', 'ip', 'datetime', 'performer_email', 'performer_username', 'performer_robot', - 'account_organization', 'account_username', 'account_email', 'account_robot', 'kind', - ])): - """ - LogEntry a single log entry. 
- :type metadata_json: string - :type ip: string - :type datetime: string - :type performer_email: int - :type performer_username: string - :type performer_robot: boolean - :type account_organization: boolean - :type account_username: string - :type account_email: string - :type account_robot: boolean - :type kind_id: int - """ - - def to_dict(self): - view = { - 'kind': self.kind, - 'metadata': json.loads(self.metadata_json), - 'ip': self.ip, - 'datetime': format_date(self.datetime), - } - - if self.performer_username: - performer = AttrDict({'username': self.performer_username, 'email': self.performer_email}) - performer.robot = None - if self.performer_robot: - performer.robot = self.performer_robot - - view['performer'] = { - 'kind': 'user', - 'name': self.performer_username, - 'is_robot': self.performer_robot, - 'avatar': avatar.get_data_for_user(performer), - } - - if self.account_username: - account = AttrDict({'username': self.account_username, 'email': self.account_email}) - if self.account_organization: - - view['namespace'] = { - 'kind': 'org', - 'name': self.account_username, - 'avatar': avatar.get_data_for_org(account), - } - else: - account.robot = None - if self.account_robot: - account.robot = self.account_robot - view['namespace'] = { - 'kind': 'user', - 'name': self.account_username, - 'avatar': avatar.get_data_for_user(account), - } - - return view - - -class LogEntryPage( - namedtuple('LogEntryPage', ['logs', 'next_page_token'])): - """ - LogEntryPage represents a single page of logs. - :type logs: [LogEntry] - :type next_page_token: {any -> any} - """ - - -class AggregatedLogEntry( - namedtuple('AggregatedLogEntry', ['count', 'kind_id', 'day', 'start_time'])): - """ - AggregatedLogEntry represents an aggregated view of logs. - :type count: int - :type kind_id: int - :type day: string - :type start_time: Date - """ - - def to_dict(self): - synthetic_date = datetime(self.start_time.year, self.start_time.month, int(self.day), tzinfo=get_localzone()) - if synthetic_date.day < self.start_time.day: - synthetic_date = synthetic_date + relativedelta(months=1) - kinds = model.log.get_log_entry_kinds() - view = { - 'kind': kinds[self.kind_id], - 'count': self.count, - 'datetime': format_date(synthetic_date), - } - - return view - - @add_metaclass(ABCMeta) class SuperuserDataInterface(object): """ Interface that represents all data store interactions required by a superuser api. """ - @abstractmethod - def get_logs_query(self, start_time, end_time, page_token=None): - """ - Returns a LogEntryPage. 
- """ - - @abstractmethod - def get_aggregated_logs(self, start_time, end_time): - """ - Returns a list of AggregatedLogEntry - """ - @abstractmethod def get_organizations(self): """ diff --git a/endpoints/api/superuser_models_pre_oci.py b/endpoints/api/superuser_models_pre_oci.py index 27a62b0c4..53e791798 100644 --- a/endpoints/api/superuser_models_pre_oci.py +++ b/endpoints/api/superuser_models_pre_oci.py @@ -7,37 +7,8 @@ from auth.permissions import ReadRepositoryPermission, ModifyRepositoryPermissio from data import model, database from endpoints.api.build import get_job_config, _get_build_status from endpoints.api.superuser_models_interface import BuildTrigger -from endpoints.api.superuser_models_interface import SuperuserDataInterface, LogEntryPage, LogEntry, Organization, User, \ - ServiceKey, Approval, RepositoryBuild, AggregatedLogEntry - - -def _create_log(log, log_kind): - account_organization = None - account_username = None - account_email = None - account_robot = None - try: - account_organization = log.account.organization - account_username = log.account.username - account_email = log.account.email - account_robot = log.account.robot - except AttributeError: - pass - - performer_robot = None - performer_username = None - performer_email = None - - try: - performer_robot = log.performer.robot - performer_username = log.performer.username - performer_email = log.performer.email - except AttributeError: - pass - - return LogEntry(log.metadata_json, log.ip, log.datetime, performer_email, performer_username, - performer_robot, account_organization, account_username, - account_email, account_robot, log_kind[log.kind_id]) +from endpoints.api.superuser_models_interface import SuperuserDataInterface, Organization, User, \ + ServiceKey, Approval, RepositoryBuild def _create_user(user): @@ -205,16 +176,5 @@ class PreOCIModel(SuperuserDataInterface): def get_organizations(self): return [Organization(org.username, org.email) for org in model.organization.get_organizations()] - def get_aggregated_logs(self, start_time, end_time): - aggregated_logs = model.log.get_aggregated_logs(start_time, end_time) - return [AggregatedLogEntry(log.count, log.kind_id, log.day, start_time) for log in aggregated_logs] - - def get_logs_query(self, start_time, end_time, page_token=None): - logs_query = model.log.get_logs_query(start_time, end_time) - logs, next_page_token = model.modelutil.paginate(logs_query, database.LogEntry, descending=True, - page_token=page_token, limit=20) - kinds = model.log.get_log_entry_kinds() - return LogEntryPage([_create_log(log, kinds) for log in logs], next_page_token) - pre_oci_model = PreOCIModel() diff --git a/endpoints/api/test/test_logs_models_pre_oci.py b/endpoints/api/test/test_logs_models_pre_oci.py deleted file mode 100644 index f9a40b235..000000000 --- a/endpoints/api/test/test_logs_models_pre_oci.py +++ /dev/null @@ -1,131 +0,0 @@ -import pytest -from mock import Mock - -from data import model, database -from endpoints.api.logs_models_interface import LogEntry, LogEntryPage, AggregatedLogEntry -from endpoints.api.logs_models_pre_oci import pre_oci_model -from util.morecollections import AttrDict - - -def test_get_logs_query(monkeypatch): - get_repository_mock = Mock() - monkeypatch.setattr(model.repository, 'get_repository', get_repository_mock) - - get_user_mock = Mock() - monkeypatch.setattr(model.user, 'get_user', get_user_mock) - - get_logs_query_mock = Mock() - monkeypatch.setattr(model.log, 'get_logs_query', get_logs_query_mock) - - paginate_mock = Mock() - 
paginate_mock.return_value = ([], {}) - monkeypatch.setattr(model.modelutil, 'paginate', paginate_mock) - - assert pre_oci_model.get_logs_query('start_time', 'end_time', 'preformer_namne', 'repository_name', 'namespace_name', - set(), None) == LogEntryPage([], {}) - - -def test_get_logs_query_returns_list_log_entries(monkeypatch): - get_repository_mock = Mock() - monkeypatch.setattr(model.repository, 'get_repository', get_repository_mock) - - get_user_mock = Mock() - monkeypatch.setattr(model.user, 'get_user', get_user_mock) - - get_logs_query_mock = Mock() - monkeypatch.setattr(model.log, 'get_logs_query', get_logs_query_mock) - - paginate_mock = Mock() - paginate_mock.return_value = ([AttrDict({'kind': 1, 'datetime': 'datetime', 'ip': 'ip', 'metadata_json': '{}', - 'account': AttrDict( - {'username': 'account_username', 'email': 'account_email', 'robot': False, - 'organization': False}), - 'performer': AttrDict( - {'email': 'performer_email', 'username': 'performer_username', - 'robot': False}), 'kind_id': 1})], {'key': 'value'}) - monkeypatch.setattr(model.modelutil, 'paginate', paginate_mock) - - assert pre_oci_model.get_logs_query('start_time', 'end_time', 'performer_username', 'repository_name', - 'namespace_name', - set(), {'start_id': 1}) == LogEntryPage([ - LogEntry('{}', 'ip', 'datetime', 'performer_email', 'performer_username', False, - False, 'account_username', 'account_email', False, 1)], {'key': 'value'}) - - -@pytest.mark.skip('Turned off until we move back to a single LogEntry table') -def test_get_logs_query_calls_get_repository(monkeypatch): - repo_mock = Mock() - performer_mock = Mock() - query_mock = Mock() - - get_repository_mock = Mock() - get_repository_mock.return_value = repo_mock - monkeypatch.setattr(model.repository, 'get_repository', get_repository_mock) - - get_user_mock = Mock() - get_user_mock.return_value = performer_mock - monkeypatch.setattr(model.user, 'get_user', get_user_mock) - - get_logs_query_mock = Mock() - get_logs_query_mock.return_value = query_mock - monkeypatch.setattr(model.log, 'get_logs_query', get_logs_query_mock) - - paginate_mock = Mock() - page_token = {} - paginate_mock.return_value = ([], page_token) - monkeypatch.setattr(model.modelutil, 'paginate', paginate_mock) - - ignore = set() - pre_oci_model.get_logs_query('start_time', 'end_time', 'performer_username', 'repository_name', 'namespace_name', - ignore, page_token) - - get_repository_mock.assert_called_once_with('namespace_name', 'repository_name') - get_user_mock.assert_called_once_with('performer_username') - get_logs_query_mock.assert_called_once_with('start_time', 'end_time', performer=performer_mock, repository=repo_mock, - namespace='namespace_name', ignore=ignore) - paginate_mock.assert_called_once_with(query_mock, database.LogEntry, descending=True, - page_token=page_token, limit=20) - - -def test_get_log_entry_kinds(monkeypatch): - get_log_entry_kinds_mock = Mock() - monkeypatch.setattr(model.log, 'get_log_entry_kinds', get_log_entry_kinds_mock) - pre_oci_model.get_log_entry_kinds() - get_log_entry_kinds_mock.assert_called_once_with() - - -def test_does_repo_exist_returns_false(monkeypatch): - get_repository_mock = Mock() - get_repository_mock.return_value = None - monkeypatch.setattr(model.repository, 'get_repository', get_repository_mock) - assert pre_oci_model.repo_exists('namespace_name', 'repository_name') is False - - -def test_does_repo_exist_returns_true(monkeypatch): - get_repository_mock = Mock() - get_repository_mock.return_value = True - 
monkeypatch.setattr(model.repository, 'get_repository', get_repository_mock) - assert pre_oci_model.repo_exists('namespace_name', 'repository_name') is True - - -def test_get_aggregated_logs(monkeypatch): - get_aggregated_logs_mock = Mock() - get_aggregated_logs_mock.side_effect = [[AttrDict({'day': '1', 'kind_id': 4, 'count': 6})], - [AttrDict({'day': '1', 'kind_id': 4, 'count': 12})], - [AttrDict({'day': '1', 'kind_id': 4, 'count': 3})]] - monkeypatch.setattr(model.log, 'get_aggregated_logs', get_aggregated_logs_mock) - - repo_mock = Mock() - get_repository_mock = Mock() - get_repository_mock.return_value = repo_mock - monkeypatch.setattr(model.repository, 'get_repository', get_repository_mock) - - performer_mock = Mock() - get_user_mock = Mock() - get_user_mock.return_value = performer_mock - monkeypatch.setattr(model.user, 'get_user', get_user_mock) - - actual = pre_oci_model.get_aggregated_logs('start_time', 'end_time', 'performer_name', 'repository_name', - 'namespace_name', set()) - - assert actual == [AggregatedLogEntry(21, 4, '1')] diff --git a/endpoints/appr/registry.py b/endpoints/appr/registry.py index b97ccb2d6..0b470f878 100644 --- a/endpoints/appr/registry.py +++ b/endpoints/appr/registry.py @@ -14,6 +14,7 @@ from auth.auth_context import get_authenticated_user from auth.credentials import validate_credentials from auth.decorators import process_auth from auth.permissions import CreateRepositoryPermission, ModifyRepositoryPermission +from data.logs_model import logs_model from endpoints.appr import appr_bp, require_app_repo_read, require_app_repo_write from endpoints.appr.cnr_backend import Blob, Channel, Package, User from endpoints.appr.decorators import disallow_for_image_repository @@ -105,8 +106,8 @@ def list_packages(): def delete_package(namespace, package_name, release, media_type): reponame = repo_name(namespace, package_name) result = cnr_registry.delete_package(reponame, release, media_type, package_class=Package) - model.log_action('delete_tag', namespace, repo_name=package_name, - metadata={'release': release, 'mediatype': media_type}) + logs_model.log_action('delete_tag', namespace, repository_name=package_name, + metadata={'release': release, 'mediatype': media_type}) return jsonify(result) @@ -160,8 +161,8 @@ def pull(namespace, package_name, release, media_type): logger.debug('Pull of release %s of app repository %s/%s', release, namespace, package_name) reponame = repo_name(namespace, package_name) data = cnr_registry.pull(reponame, release, media_type, Package, blob_class=Blob) - model.log_action('pull_repo', namespace, repo_name=package_name, - metadata={'release': release, 'mediatype': media_type}) + logs_model.log_action('pull_repo', namespace, repository_name=package_name, + metadata={'release': release, 'mediatype': media_type}) json_format = request.args.get('format', None) == 'json' return _pull(data, json_format) @@ -188,7 +189,7 @@ def push(namespace, package_name): {"package": reponame, "scopes": ['create']}) Package.create_repository(reponame, private, owner) - model.log_action('create_repo', namespace, repo_name=package_name) + logs_model.log_action('create_repo', namespace, repository_name=package_name) if not ModifyRepositoryPermission(namespace, package_name).can(): raise Forbidden("Unauthorized access for: %s" % reponame, @@ -211,8 +212,8 @@ def push(namespace, package_name): blob = Blob(reponame, values['blob']) app_release = cnr_registry.push(reponame, release_version, media_type, blob, force, package_class=Package, user=owner, 
visibility=private) - model.log_action('push_repo', namespace, repo_name=package_name, - metadata={'release': release_version}) + logs_model.log_action('push_repo', namespace, repository_name=package_name, + metadata={'release': release_version}) return jsonify(app_release) @@ -265,8 +266,8 @@ def add_channel_release(namespace, package_name, channel_name, release): reponame = repo_name(namespace, package_name) result = cnr_registry.add_channel_release(reponame, channel_name, release, channel_class=Channel, package_class=Package) - model.log_action('create_tag', namespace, repo_name=package_name, - metadata={'channel': channel_name, 'release': release}) + logs_model.log_action('create_tag', namespace, repository_name=package_name, + metadata={'channel': channel_name, 'release': release}) return jsonify(result) @@ -296,8 +297,8 @@ def delete_channel_release(namespace, package_name, channel_name, release): reponame = repo_name(namespace, package_name) result = cnr_registry.delete_channel_release(reponame, channel_name, release, channel_class=Channel, package_class=Package) - model.log_action('delete_tag', namespace, repo_name=package_name, - metadata={'channel': channel_name, 'release': release}) + logs_model.log_action('delete_tag', namespace, repository_name=package_name, + metadata={'channel': channel_name, 'release': release}) return jsonify(result) @@ -312,6 +313,6 @@ def delete_channel(namespace, package_name, channel_name): _check_channel_name(channel_name) reponame = repo_name(namespace, package_name) result = cnr_registry.delete_channel(reponame, channel_name, channel_class=Channel) - model.log_action('delete_tag', namespace, repo_name=package_name, - metadata={'channel': channel_name}) + logs_model.log_action('delete_tag', namespace, repository_name=package_name, + metadata={'channel': channel_name}) return jsonify(result) diff --git a/endpoints/building.py b/endpoints/building.py index 05dfd64f9..1e5dd97e9 100644 --- a/endpoints/building.py +++ b/endpoints/building.py @@ -7,6 +7,7 @@ from flask import request from app import app, dockerfile_build_queue, metric_queue from data import model +from data.logs_model import logs_model from data.database import db from auth.auth_context import get_authenticated_user from notifications import spawn_notification @@ -109,8 +110,8 @@ def start_build(repository, prepared_build, pull_robot_name=None): event_log_metadata['trigger_kind'] = prepared_build.trigger.service.name event_log_metadata['trigger_metadata'] = prepared_build.metadata or {} - model.log.log_action('build_dockerfile', repository.namespace_user.username, - ip=request.remote_addr, metadata=event_log_metadata, repository=repository) + logs_model.log_action('build_dockerfile', repository.namespace_user.username, + ip=request.remote_addr, metadata=event_log_metadata, repository=repository) # TODO(jzelinskie): remove when more endpoints have been converted to using interfaces repo = AttrDict({ diff --git a/endpoints/keyserver/__init__.py b/endpoints/keyserver/__init__.py index 3a90169ba..95731be14 100644 --- a/endpoints/keyserver/__init__.py +++ b/endpoints/keyserver/__init__.py @@ -6,6 +6,7 @@ from flask import Blueprint, jsonify, abort, request, make_response from jwt import get_unverified_header from app import app +from data.logs_model import logs_model from endpoints.keyserver.models_interface import ServiceKeyDoesNotExist from endpoints.keyserver.models_pre_oci import pre_oci_model as model from util.security import jwtutil @@ -127,7 +128,7 @@ def put_service_key(service, kid): 
model.create_service_key('', kid, service, jwk, metadata, expiration_date, rotation_duration=rotation_duration) - model.log_action('service_key_create', request.remote_addr, metadata_dict={ + logs_model.log_action('service_key_create', ip=request.remote_addr, metadata={ 'kid': kid, 'preshared': False, 'service': service, @@ -151,7 +152,7 @@ def put_service_key(service, kid): except ServiceKeyDoesNotExist: abort(404) - model.log_action('service_key_rotate', request.remote_addr, metadata_dict={ + logs_model.log_action('service_key_rotate', ip=request.remote_addr, metadata={ 'kid': kid, 'signer_kid': signer_key.kid, 'service': service, @@ -187,7 +188,7 @@ def delete_service_key(service, kid): except ServiceKeyDoesNotExist: abort(404) - model.log_action('service_key_delete', request.remote_addr, metadata_dict={ + logs_model.log_action('service_key_delete', ip=request.remote_addr, metadata={ 'kid': kid, 'signer_kid': signer_key.kid, 'service': service, diff --git a/endpoints/keyserver/models_interface.py b/endpoints/keyserver/models_interface.py index 3b6f566c5..977c2f6b4 100644 --- a/endpoints/keyserver/models_interface.py +++ b/endpoints/keyserver/models_interface.py @@ -63,10 +63,3 @@ class KeyServerDataInterface(object): Deletes and returns a service key with the given kid or raises ServiceKeyNotFound. """ pass - - @abstractmethod - def log_action(self, action_name, ip, metadata_dict=None): - """ - Records a particular serivce key interaction to an audit log. - """ - pass diff --git a/endpoints/keyserver/models_pre_oci.py b/endpoints/keyserver/models_pre_oci.py index 8bcb65986..8dfb119b9 100644 --- a/endpoints/keyserver/models_pre_oci.py +++ b/endpoints/keyserver/models_pre_oci.py @@ -38,10 +38,6 @@ class PreOCIModel(KeyServerDataInterface): except data.model.ServiceKeyDoesNotExist: raise ServiceKeyDoesNotExist() - def log_action(self, action_name, ip, metadata_dict=None): - metadata_dict = {} if metadata_dict is None else metadata_dict - data.model.log.log_action(action_name, None, metadata=metadata_dict, ip=ip) - pre_oci_model = PreOCIModel() diff --git a/initdb.py b/initdb.py index 08e7c377d..9b27312d4 100644 --- a/initdb.py +++ b/initdb.py @@ -23,6 +23,7 @@ from data.database import (db, all_models, Role, TeamRole, Visibility, LoginServ ApprTagKind, ApprBlobPlacementLocation, Repository, TagKind, ManifestChild, TagToRepositoryTag, get_epoch_timestamp_ms) from data import model +from data.logs_model import logs_model from data.queue import WorkQueue from data.registry_model import registry_model from data.registry_model.registry_pre_oci_model import pre_oci_model @@ -179,11 +180,11 @@ def __generate_service_key(kid, name, user, timestamp, approval_type, expiration 'auto_approved': True } - model.log.log_action('service_key_approve', None, performer=user, - timestamp=timestamp, metadata=key_metadata) + logs_model.log_action('service_key_approve', None, performer=user, + timestamp=timestamp, metadata=key_metadata) - model.log.log_action('service_key_create', None, performer=user, - timestamp=timestamp, metadata=key_metadata) + logs_model.log_action('service_key_create', None, performer=user, + timestamp=timestamp, metadata=key_metadata) def __generate_repository(with_storage, user_obj, name, description, is_public, permissions, structure): @@ -583,9 +584,9 @@ def populate_database(minimal=False, with_storage=False): 'manifest_digest': tag_manifest.digest } - model.log.log_action('manifest_label_add', new_user_1.username, performer=new_user_1, - timestamp=datetime.now(), 
metadata=label_metadata, - repository=tag_manifest.tag.repository) + logs_model.log_action('manifest_label_add', new_user_1.username, performer=new_user_1, + timestamp=datetime.now(), metadata=label_metadata, + repository=tag_manifest.tag.repository) model.blob.initiate_upload(new_user_1.username, simple_repo.name, str(uuid4()), 'local_us', {}) model.notification.create_repo_notification(simple_repo, 'repo_push', 'quay_notification', {}, {}) @@ -839,66 +840,66 @@ def populate_database(minimal=False, with_storage=False): token.token_code = 'test' token.save() - model.log.log_action('org_create_team', org.username, performer=new_user_1, - timestamp=week_ago, metadata={'team': 'readers'}) + logs_model.log_action('org_create_team', org.username, performer=new_user_1, + timestamp=week_ago, metadata={'team': 'readers'}) - model.log.log_action('org_set_team_role', org.username, performer=new_user_1, - timestamp=week_ago, - metadata={'team': 'readers', 'role': 'read'}) + logs_model.log_action('org_set_team_role', org.username, performer=new_user_1, + timestamp=week_ago, + metadata={'team': 'readers', 'role': 'read'}) - model.log.log_action('create_repo', org.username, performer=new_user_1, - repository=org_repo, timestamp=week_ago, - metadata={'namespace': org.username, 'repo': 'orgrepo'}) + logs_model.log_action('create_repo', org.username, performer=new_user_1, + repository=org_repo, timestamp=week_ago, + metadata={'namespace': org.username, 'repo': 'orgrepo'}) - model.log.log_action('change_repo_permission', org.username, - performer=new_user_2, repository=org_repo, - timestamp=six_ago, - metadata={'username': new_user_1.username, - 'repo': 'orgrepo', 'role': 'admin'}) + logs_model.log_action('change_repo_permission', org.username, + performer=new_user_2, repository=org_repo, + timestamp=six_ago, + metadata={'username': new_user_1.username, + 'repo': 'orgrepo', 'role': 'admin'}) - model.log.log_action('change_repo_permission', org.username, - performer=new_user_1, repository=org_repo, - timestamp=six_ago, - metadata={'username': new_user_2.username, - 'repo': 'orgrepo', 'role': 'read'}) + logs_model.log_action('change_repo_permission', org.username, + performer=new_user_1, repository=org_repo, + timestamp=six_ago, + metadata={'username': new_user_2.username, + 'repo': 'orgrepo', 'role': 'read'}) - model.log.log_action('add_repo_accesstoken', org.username, performer=new_user_1, - repository=org_repo, timestamp=four_ago, - metadata={'repo': 'orgrepo', 'token': 'deploytoken'}) + logs_model.log_action('add_repo_accesstoken', org.username, performer=new_user_1, + repository=org_repo, timestamp=four_ago, + metadata={'repo': 'orgrepo', 'token': 'deploytoken'}) - model.log.log_action('push_repo', org.username, performer=new_user_2, - repository=org_repo, timestamp=today, - metadata={'username': new_user_2.username, - 'repo': 'orgrepo'}) + logs_model.log_action('push_repo', org.username, performer=new_user_2, + repository=org_repo, timestamp=today, + metadata={'username': new_user_2.username, + 'repo': 'orgrepo'}) - model.log.log_action('pull_repo', org.username, performer=new_user_2, - repository=org_repo, timestamp=today, - metadata={'username': new_user_2.username, - 'repo': 'orgrepo'}) + logs_model.log_action('pull_repo', org.username, performer=new_user_2, + repository=org_repo, timestamp=today, + metadata={'username': new_user_2.username, + 'repo': 'orgrepo'}) - model.log.log_action('pull_repo', org.username, repository=org_repo, - timestamp=today, - metadata={'token': 'sometoken', 'token_code': 
'somecode', - 'repo': 'orgrepo'}) + logs_model.log_action('pull_repo', org.username, repository=org_repo, + timestamp=today, + metadata={'token': 'sometoken', 'token_code': 'somecode', + 'repo': 'orgrepo'}) - model.log.log_action('delete_tag', org.username, performer=new_user_2, - repository=org_repo, timestamp=today, - metadata={'username': new_user_2.username, - 'repo': 'orgrepo', 'tag': 'sometag'}) + logs_model.log_action('delete_tag', org.username, performer=new_user_2, + repository=org_repo, timestamp=today, + metadata={'username': new_user_2.username, + 'repo': 'orgrepo', 'tag': 'sometag'}) - model.log.log_action('pull_repo', org.username, repository=org_repo, - timestamp=today, - metadata={'token_code': 'somecode', 'repo': 'orgrepo'}) + logs_model.log_action('pull_repo', org.username, repository=org_repo, + timestamp=today, + metadata={'token_code': 'somecode', 'repo': 'orgrepo'}) - model.log.log_action('pull_repo', new_user_2.username, repository=publicrepo, - timestamp=yesterday, - metadata={'token_code': 'somecode', 'repo': 'publicrepo'}) + logs_model.log_action('pull_repo', new_user_2.username, repository=publicrepo, + timestamp=yesterday, + metadata={'token_code': 'somecode', 'repo': 'publicrepo'}) - model.log.log_action('build_dockerfile', new_user_1.username, repository=building, - timestamp=today, - metadata={'repo': 'building', 'namespace': new_user_1.username, - 'trigger_id': trigger.uuid, 'config': json.loads(trigger.config), - 'service': trigger.service.name}) + logs_model.log_action('build_dockerfile', new_user_1.username, repository=building, + timestamp=today, + metadata={'repo': 'building', 'namespace': new_user_1.username, + 'trigger_id': trigger.uuid, 'config': json.loads(trigger.config), + 'service': trigger.service.name}) model.message.create([{'content': 'We love you, Quay customers!', 'severity': 'info', 'media_type': 'text/plain'}]) diff --git a/test/test_api_usage.py b/test/test_api_usage.py index 996a86a59..9406df7e1 100644 --- a/test/test_api_usage.py +++ b/test/test_api_usage.py @@ -26,9 +26,10 @@ from app import (app, config_provider, all_queues, dockerfile_build_queue, notif from buildtrigger.basehandler import BuildTriggerHandler from initdb import setup_database_for_testing, finished_database_for_testing from data import database, model, appr_model -from data.registry_model import registry_model from data.appr_model.models import NEW_MODELS from data.database import RepositoryActionCount, Repository as RepositoryTable +from data.logs_model import logs_model +from data.registry_model import registry_model from test.helpers import assert_action_logged from util.secscan.fake import fake_security_scanner @@ -2174,8 +2175,8 @@ class TestDeleteRepository(ApiTestCase): model.notification.create_repo_notification(repository, 'build_queued', 'slack', {}, {}) # Create some logs. - model.log.log_action('push_repo', ADMIN_ACCESS_USER, repository=repository) - model.log.log_action('push_repo', ADMIN_ACCESS_USER, repository=repository) + logs_model.log_action('push_repo', ADMIN_ACCESS_USER, repository=repository) + logs_model.log_action('push_repo', ADMIN_ACCESS_USER, repository=repository) # Create some build triggers. user = model.user.get_user(ADMIN_ACCESS_USER) @@ -3326,7 +3327,7 @@ class TestOrgRobots(ApiTestCase): trigger, pull_robot_name=membername) # Add some log entries for the robot. 
- model.log.log_action('pull_repo', ORGANIZATION, performer=pull_robot, repository=repo) + logs_model.log_action('pull_repo', ORGANIZATION, performer=pull_robot, repository=repo) # Delete the robot and verify it works. self.deleteEmptyResponse(OrgRobot, params=dict(orgname=ORGANIZATION, robot_shortname='bender')) diff --git a/util/audit.py b/util/audit.py index a24f6d8d4..614165bad 100644 --- a/util/audit.py +++ b/util/audit.py @@ -7,8 +7,8 @@ from urlparse import urlparse from flask import request from app import analytics, userevents, ip_resolver -from data import model from auth.auth_context import get_authenticated_context, get_authenticated_user +from data.logs_model import logs_model logger = logging.getLogger(__name__) @@ -74,6 +74,6 @@ def track_and_log(event_name, repo_obj, analytics_name=None, analytics_sample=1, # Log the action to the database. logger.debug('Logging the %s to logs system', event_name) - model.log.log_action(event_name, namespace_name, performer=get_authenticated_user(), - ip=request.remote_addr, metadata=metadata, repository=repo_obj) + logs_model.log_action(event_name, namespace_name, performer=get_authenticated_user(), + ip=request.remote_addr, metadata=metadata, repository=repo_obj) logger.debug('Track and log of %s complete', event_name) diff --git a/util/generatepresharedkey.py b/util/generatepresharedkey.py index 27e17128a..475c560ad 100644 --- a/util/generatepresharedkey.py +++ b/util/generatepresharedkey.py @@ -5,7 +5,7 @@ from dateutil.parser import parse as parse_date from app import app from data import model from data.database import ServiceKeyApprovalType -from data.model.log import log_action +from data.logs_model import logs_model def generate_key(service, name, expiration_date=None, notes=None): @@ -30,8 +30,8 @@ def generate_key(service, name, expiration_date=None, notes=None): 'auto_approved': True, } - log_action('service_key_create', None, metadata=key_log_metadata) - log_action('service_key_approve', None, metadata=key_log_metadata) + logs_model.log_action('service_key_create', metadata=key_log_metadata) + logs_model.log_action('service_key_approve', metadata=key_log_metadata) return private_key, key.kid diff --git a/workers/exportactionlogsworker.py b/workers/exportactionlogsworker.py index 857bdd094..2c4b5b66c 100644 --- a/workers/exportactionlogsworker.py +++ b/workers/exportactionlogsworker.py @@ -11,10 +11,10 @@ from enum import Enum, unique import features -from app import app, export_action_logs_queue, storage, get_app_url -from data import model +from app import app, export_action_logs_queue, storage, get_app_url, avatar from endpoints.api import format_date -from endpoints.api.logs_models_pre_oci import create_log +from data.logs_model import logs_model +from data.logs_model.interface import LogsIterationTimeout from workers.queueworker import QueueWorker, JobException from util.log import logfile_path from util.useremails import send_logs_exported_email @@ -29,10 +29,6 @@ MAXIMUM_WORK_PERIOD_SECONDS = app.config.get('EXPORT_ACTION_LOGS_MAXIMUM_SECONDS MAXIMUM_QUERY_TIME_SECONDS = app.config.get('EXPORT_ACTION_LOGS_MAXIMUM_QUERY_TIME_SECONDS', 30) EXPORTED_LOGS_EXPIRATION_SECONDS = app.config.get('EXPORT_ACTION_LOGS_SECONDS', 60 * 60) # 1 hour -MINIMUM_RANGE_SIZE = 1000 -MAXIMUM_RANGE_SIZE = 100000 -EXPECTED_ITERATION_LOG_COUNT = 1000 - @unique class ExportResult(Enum): @@ -79,29 +75,6 @@ class ExportActionLogsWorker(QueueWorker): repository_id = job_details['repository_id'] max_query_time = 
timedelta(seconds=MAXIMUM_QUERY_TIME_SECONDS) - min_id, elapsed = _run_and_time(lambda: model.log.get_minimum_id_for_logs(start_time, - repository_id, - namespace_id)) - if elapsed > max_query_time: - logger.error('Retrieval of min ID for export logs `%s` timed out with time of `%s`', - export_id, elapsed) - self._report_results(job_details, ExportResult.OPERATION_TIMEDOUT) - return - - max_id, elapsed = _run_and_time(lambda: model.log.get_maximum_id_for_logs(end_time, - repository_id, - namespace_id)) - if elapsed > max_query_time: - logger.error('Retrieval of max ID for export logs `%s` timed out with time of `%s`', - export_id, elapsed) - self._report_results(job_details, ExportResult.OPERATION_TIMEDOUT) - return - - min_id = min_id or 1 - max_id = max_id or 1 - - logger.info('Found log range of %s to %s for export logs `%s`', min_id, max_id, export_id) - # Generate a file key so that if we return an API URL, it cannot simply be constructed from # just the export ID. file_key = str(uuid.uuid4()) @@ -131,10 +104,13 @@ class ExportActionLogsWorker(QueueWorker): BytesIO(str(prefix_data)), upload_metadata) uploaded_byte_count = len(prefix_data) + logs_iterator = logs_model.yield_logs_for_export(start_time, end_time, repository_id, + namespace_id, max_query_time) + try: # Stream the logs to storage as chunks. updated_metadata, uploaded_byte_count = self._stream_logs(upload_id, upload_metadata, - uploaded_byte_count, min_id, max_id, + uploaded_byte_count, logs_iterator, job_details) if updated_metadata is None: storage.cancel_chunked_upload(upload_id, upload_metadata) @@ -169,69 +145,35 @@ class ExportActionLogsWorker(QueueWorker): self._report_results(job_details, ExportResult.SUCCESSFUL_EXPORT, export_url) - def _stream_logs(self, upload_id, upload_metadata, uploaded_byte_count, min_id, max_id, + def _stream_logs(self, upload_id, upload_metadata, uploaded_byte_count, logs_iterator, job_details): export_id = job_details['export_id'] max_work_period = timedelta(seconds=MAXIMUM_WORK_PERIOD_SECONDS) - max_query_time = timedelta(seconds=MAXIMUM_QUERY_TIME_SECONDS) - kinds = model.log.get_log_entry_kinds() - - # Using an adjusting scale, start downloading log rows in batches, starting at - # MINIMUM_RANGE_SIZE and doubling until we've reached EXPECTED_ITERATION_LOG_COUNT or - # the lookup range has reached MAXIMUM_RANGE_SIZE. If at any point this operation takes - # longer than the MAXIMUM_WORK_PERIOD_SECONDS, terminate the batch operation as timed out. batch_start_time = datetime.utcnow() - current_start_id = min_id - current_batch_size = MINIMUM_RANGE_SIZE + try: + for logs in logs_iterator: + work_elapsed = datetime.utcnow() - batch_start_time + if work_elapsed > max_work_period: + logger.error('Retrieval of logs `%s` timed out with time of `%s`', + export_id, work_elapsed) + self._report_results(job_details, ExportResult.OPERATION_TIMEDOUT) + return None, None - while current_start_id <= max_id: - # Verify we haven't been working for too long. - work_elapsed = datetime.utcnow() - batch_start_time - if work_elapsed > max_work_period: - logger.error('Retrieval of logs `%s` timed out with time of `%s`', - export_id, work_elapsed) - self._report_results(job_details, ExportResult.OPERATION_TIMEDOUT) - return None, None + logs_data = '' + if logs: + logs_data = ','.join([json.dumps(log.to_dict(avatar, False)) for log in logs]) + ',' - id_range = [current_start_id, min(max_id, current_start_id + current_batch_size)] - - # Load the next set of logs. 
- def retrieve_and_write_logs(): - namespace_id = job_details['namespace_id'] if not job_details.get('repository_id') else None - repository_id = job_details.get('repository_id') - logger.debug('Retrieving logs over range %s with namespace %s and repository %s', - id_range, namespace_id, repository_id) - - logs_query = model.log.get_logs_query(namespace=namespace_id, - repository=repository_id, - id_range=id_range) - return [create_log(log) for log in logs_query] - - logs, elapsed = _run_and_time(retrieve_and_write_logs) - if elapsed > max_query_time: - logger.error('Retrieval of logs for export logs `%s` with range `%s` timed out at `%s`', - export_id, id_range, elapsed) - self._report_results(job_details, ExportResult.OPERATION_TIMEDOUT) - return None, None - - # Write the logs to storage. - logger.debug('Writing %s retrieved logs for range %s', len(logs), id_range) - if logs: - logs_data = ','.join([json.dumps(log.to_dict(kinds, False)) for log in logs]) + ',' logs_data = logs_data.encode('utf-8') upload_metadata = storage.stream_upload_chunk(storage.preferred_locations, upload_id, uploaded_byte_count, -1, BytesIO(logs_data), upload_metadata) uploaded_byte_count += len(logs_data) - - # Move forward. - current_start_id = id_range[1] + 1 - - # Increase the batch size if necessary. - if len(logs) < EXPECTED_ITERATION_LOG_COUNT: - current_batch_size = min(MAXIMUM_RANGE_SIZE, current_batch_size * 2) + except LogsIterationTimeout: + logger.error('Retrieval of logs for export logs timed out at `%s`', work_elapsed) + self._report_results(job_details, ExportResult.OPERATION_TIMEDOUT) + return None, None return upload_metadata, uploaded_byte_count @@ -271,11 +213,6 @@ def _parse_time(specified_time): except ValueError: return None -def _run_and_time(fn): - start_time = datetime.utcnow() - result = fn() - return result, datetime.utcnow() - start_time - if __name__ == "__main__": logging.config.fileConfig(logfile_path(debug=False), disable_existing_loggers=False) diff --git a/workers/repositoryactioncounter.py b/workers/repositoryactioncounter.py index d80389a6d..d2331b7f9 100644 --- a/workers/repositoryactioncounter.py +++ b/workers/repositoryactioncounter.py @@ -1,7 +1,10 @@ import logging +from datetime import date, timedelta + from app import app # This is required to initialize the database. 
from data import model +from data.logs_model import logs_model from workers.worker import Worker POLL_PERIOD_SECONDS = 10 @@ -21,8 +24,16 @@ class RepositoryActionCountWorker(Worker): logger.debug('No further repositories to count') return False + yesterday = date.today() - timedelta(days=1) + logger.debug('Found repository #%s to count', to_count.id) - was_counted = model.repositoryactioncount.count_repository_actions(to_count) + daily_count = logs_model.count_repository_actions(to_count, yesterday) + if daily_count is None: + logger.debug('Could not load count for repository #%s', to_count.id) + return False + + was_counted = model.repositoryactioncount.store_repository_action_count(to_count, yesterday, + daily_count) if not was_counted: logger.debug('Repository #%s was counted by another worker', to_count.id) return False diff --git a/workers/test/test_exportactionlogsworker.py b/workers/test/test_exportactionlogsworker.py index 9eba55179..44f919113 100644 --- a/workers/test/test_exportactionlogsworker.py +++ b/workers/test/test_exportactionlogsworker.py @@ -5,7 +5,8 @@ from datetime import datetime, timedelta from httmock import urlmatch, HTTMock -from data import model, database +from data import model +from data.logs_model import logs_model from workers.exportactionlogsworker import ExportActionLogsWorker from test.fixtures import * @@ -21,9 +22,6 @@ def test_process_queue_item(namespace, repo_name, expects_logs, app): repo = model.repository.get_repository(namespace, repo_name) - assert (model.log.get_maximum_id_for_logs(end_time, repository_id=repo.id) is not None) == expects_logs - assert (model.log.get_minimum_id_for_logs(start_time, repository_id=repo.id) is not None) == expects_logs - worker = ExportActionLogsWorker(None) called = [{}] @@ -59,7 +57,9 @@ def test_process_queue_item(namespace, repo_name, expects_logs, app): created = storage.get_content(storage.preferred_locations, 'exportedactionlogs/' + storage_id) created_json = json.loads(created) - expected_count = database.LogEntry3.select().where(database.LogEntry3.repository == repo).count() + expected_count = len(logs_model.lookup_logs(start_time, end_time, namespace_name=namespace, + repository_name=repo_name).logs) + assert (expected_count > 1) == expects_logs assert created_json['export_id'] == 'someid'
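
For reference only (not part of the patch): a minimal usage sketch of the logs_model proxy as exercised by the call sites converted above. The namespace and repository names below are hypothetical, and request.remote_addr assumes a Flask request context; only calls that appear in the diff (log_action, lookup_logs and its .logs attribute) are used.

    from datetime import datetime, timedelta
    from flask import request

    from data.logs_model import logs_model

    # Writing an action log, in the same shape as the converted call sites.
    logs_model.log_action('pull_repo', 'some_namespace',        # hypothetical namespace
                          repository_name='some_repo',          # hypothetical repository
                          ip=request.remote_addr,
                          metadata={'tag': 'latest'})

    # Reading logs back: lookup_logs returns a page whose `logs` list is
    # what the export worker test above counts.
    now = datetime.utcnow()
    page = logs_model.lookup_logs(now - timedelta(days=1), now,
                                  namespace_name='some_namespace',
                                  repository_name='some_repo')
    print(len(page.logs))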