From 035541c6f26ffc7e370b07a9aa3dae4e1fe4d6db Mon Sep 17 00:00:00 2001
From: Kenny Lee Sin Cheong
Date: Mon, 21 Jan 2019 16:33:32 -0500
Subject: [PATCH] Implement logs model using Elasticsearch

- Implement logs model using Elasticsearch with tests
- Implement transition model using both the Elasticsearch and database models
- Add LOGS_MODEL configuration to choose which model to use.

Co-authored-by: Sida Chen
Co-authored-by: Kenny Lee Sin Cheong
---
 app.py                                              |   2 +
 config.py                                           |   3 +
 data/logs_model/__init__.py                         |  33 +-
 data/logs_model/datatypes.py                        |  39 +-
 data/logs_model/document_logs_model.py              | 250 ++++++++++
 data/logs_model/elastic_logs.py                     |  75 +++
 data/logs_model/shared.py                           |  40 ++
 .../table_document_transition_logs_model.py         |  29 ++
 data/logs_model/table_logs_model.py                 |  31 +-
 data/logs_model/test/mock_elasticsearch.py          | 439 ++++++++++++++++++
 data/logs_model/test/test_elasticsearch.py          | 349 ++++++++++++++
 data/model/log.py                                   |   2 +-
 data/model/user.py                                  |  12 +
 requirements-nover.txt                              |   3 +
 requirements.txt                                    |   3 +
 test/registry/registry_tests.py                     |   1 +
 test/registry_tests.py                              |   5 +
 test/test_api_usage.py                              |   2 +
 util/config/schema.py                               |   1 +
 workers/test/test_exportactionlogsworker.py         |   1 +
 20 files changed, 1282 insertions(+), 38 deletions(-)
 create mode 100644 data/logs_model/document_logs_model.py
 create mode 100644 data/logs_model/elastic_logs.py
 create mode 100644 data/logs_model/shared.py
 create mode 100644 data/logs_model/table_document_transition_logs_model.py
 create mode 100644 data/logs_model/test/mock_elasticsearch.py
 create mode 100644 data/logs_model/test/test_elasticsearch.py

diff --git a/app.py b/app.py
index 0265c6294..656ecfddc 100644
--- a/app.py
+++ b/app.py
@@ -21,6 +21,7 @@ from avatars.avatars import Avatar
 from buildman.manager.buildcanceller import BuildCanceller
 from data import database
 from data import model
+from data import logs_model
 from data.archivedlogs import LogArchive
 from data.billing import Billing
 from data.buildlogs import BuildLogs
@@ -267,5 +268,6 @@ def load_user(user_uuid):
   logger.debug('User loader loading deferred user with uuid: %s', user_uuid)
   return LoginWrappedDBUser(user_uuid)
 
+logs_model.configure(app.config)
 
 get_app_url = partial(get_app_url, app.config)

diff --git a/config.py b/config.py
index a23e3cb30..a0af03c77 100644
--- a/config.py
+++ b/config.py
@@ -562,3 +562,6 @@ class DefaultConfig(ImmutableConfig):
   # Maximum number of action logs pages that can be returned via the API.
ACTION_LOG_MAX_PAGE = None + + # Log model + LOGS_MODEL = ('database', {}) diff --git a/data/logs_model/__init__.py b/data/logs_model/__init__.py index d452e499c..f79f6a90d 100644 --- a/data/logs_model/__init__.py +++ b/data/logs_model/__init__.py @@ -1,20 +1,41 @@ import os import logging -from data.logs_model.table_logs_model import table_logs_model +from data.logs_model.table_logs_model import TableLogsModel +from data.logs_model.document_logs_model import DocumentLogsModel +from data.logs_model.table_document_transition_logs_model import TableDocumentTransitionLogsModel +from data.logs_model.elastic_logs import AWSElasticsearchLogs +from data.logs_model.interface import ActionLogsDataInterface logger = logging.getLogger(__name__) +_LOG_MODELS = { + 'database': TableLogsModel, + 'transition': TableDocumentTransitionLogsModel, + 'elasticsearch': DocumentLogsModel, +} class LogsModelProxy(object): def __init__(self): - self._model = table_logs_model + self._model = None + + def initialize(self, model): + self._model = model + logger.info('===============================') + logger.info('Using logs model `%s`', self._model) + logger.info('===============================') + def __getattr__(self, attr): + if not self._model: + raise AttributeError("LogsModelProxy is not initialized") return getattr(self._model, attr) - logs_model = LogsModelProxy() -logger.info('===============================') -logger.info('Using logs model `%s`', logs_model._model) -logger.info('===============================') + +def configure(config_object): + logger.debug('Configuring Log Model') + model = config_object.get('LOGS_MODEL', ('database', {})) + model_name = model[0] + params = {} if len(model) == 1 else model[1] + logs_model.initialize(_LOG_MODELS[model_name](**params)) diff --git a/data/logs_model/datatypes.py b/data/logs_model/datatypes.py index a11725563..a6326e08b 100644 --- a/data/logs_model/datatypes.py +++ b/data/logs_model/datatypes.py @@ -41,6 +41,7 @@ class Log(namedtuple('Log', [ account_username = None account_email = None account_robot = None + try: account_organization = log.account.organization account_username = log.account.username @@ -61,8 +62,42 @@ class Log(namedtuple('Log', [ pass return Log(log.metadata_json, log.ip, log.datetime, performer_email, performer_username, - performer_robot, account_organization, account_username, - account_email, account_robot, log.kind_id) + performer_robot, account_organization, account_username, account_email, + account_robot, log.kind_id) + + @classmethod + def for_elasticsearch_log(cls, log, id_user_map): + account_organization = None + account_username = None + account_email = None + account_robot = None + + try: + if log.account_id: + account = id_user_map[log.account_id] + account_organization = account.organization + account_username = account.username + account_email = account.email + account_robot = account.robot + except AttributeError: + pass + + performer_robot = None + performer_username = None + performer_email = None + + try: + if log.performer_id: + performer = id_user_map[log.performer_id] + performer_robot = performer.robot + performer_username = performer.username + performer_email = performer.email + except AttributeError: + pass + + return Log(log.metadata_json, str(log.ip), log.datetime, performer_email, performer_username, + performer_robot, account_organization, account_username, account_email, + account_robot, log.kind_id) def to_dict(self, avatar, include_namespace=False): view = { diff --git 
a/data/logs_model/document_logs_model.py b/data/logs_model/document_logs_model.py new file mode 100644 index 000000000..bdfd50a5c --- /dev/null +++ b/data/logs_model/document_logs_model.py @@ -0,0 +1,250 @@ +# pylint: disable=protected-access + +import json +import logging + +from calendar import timegm +from datetime import timedelta, datetime +from random import getrandbits +from time import time + +from elasticsearch.exceptions import ConnectionTimeout, ElasticsearchException + +from data import model +from data.model import config +from data.model.log import _json_serialize, ACTIONS_ALLOWED_WITHOUT_AUDIT_LOGGING, DataModelException +from data.logs_model.elastic_logs import LogEntry, AWSElasticsearchLogs +from data.logs_model.datatypes import Log, AggregatedLogCount, LogEntriesPage +from data.logs_model.interface import ActionLogsDataInterface, LogsIterationTimeout +from data.logs_model.shared import SharedModel + +logger = logging.getLogger(__name__) + +PAGE_SIZE = 100 +MAX_RESULT_WINDOW = 100000 +# DATE_RANGE_LIMIT is to limit the query date time range to at most 1 month. +DATE_RANGE_LIMIT = 32 + +def _for_elasticsearch_logs(logs): + namespace_ids = set() + for log in logs: + namespace_ids.add(log.account_id) + namespace_ids.add(log.performer_id) + + id_user_map = model.user.get_user_map_by_ids(namespace_ids) + return [Log.for_elasticsearch_log(log, id_user_map) for log in logs] + + +def _epoch_ms(dt): + return (timegm(dt.utctimetuple()) * 1000) + (dt.microsecond / 1000) + + +def _random_id(): + """ generates 31 random bits as integer for random_id field in LogEntry. + It's used as tie-breaker for sorting logs based on datetime. + We assume the chance of collision on both fields is low, but when they do + collide, only the result of lookup_logs function will be affected because + some log may be missing or duplicated. + https://www.elastic.co/guide/en/elasticsearch/reference/current/search-request-search-after.html + """ + return getrandbits(31) + + +class DocumentLogsModel(SharedModel, ActionLogsDataInterface): + """ + DocumentLogsModel implements the data model for the logs API backed by an + elasticsearch service. + """ + + def __init__(self, **kwargs): + self._client = AWSElasticsearchLogs(**kwargs) + + @staticmethod + def _get_ids_by_names(repository_name, namespace_name, performer_name): + """ retrieve repository/namespace/performer ids based on their names. + throws DataModelException when the namespace_name does not match any + user in the database. + returns database ID or None if not exists. 
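+        Illustrative call (hypothetical IDs, not taken from this patch's fixtures):
+        _get_ids_by_names('repo1', 'user1', 'user2') could return (7, 3, 9), i.e.
+        (repository_id, account_id, performer_id), with repository_id or performer_id
+        left as None when the corresponding name is not supplied or not found.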
+ """ + repository_id = None + account_id = None + performer_id = None + + if repository_name and namespace_name: + repository = model.repository.get_repository(namespace_name, repository_name) + if repository: + repository_id = repository.id + account_id = repository.namespace_user.id + + if namespace_name and account_id is None: + account = model.user.get_user_or_org(namespace_name) + if account is None: + raise DataModelException('Invalid namespace requested') + account_id = account.id + + if performer_name: + performer = model.user.get_user(performer_name) + if performer: + performer_id = performer.id + + return repository_id, account_id, performer_id + + def _base_query(self, start_datetime, end_datetime, performer_id, repository_id, account_id, + filter_kinds): + search = LogEntry.search() + if performer_id is not None: + search = search.filter('term', performer_id=performer_id) + + if repository_id is not None: + search = search.filter('term', repository_id=repository_id) + + if account_id is not None: + search = search.filter('term', account_id=account_id) + + if filter_kinds is not None: + kind_map = model.log.get_log_entry_kinds() + ignore_ids = [kind_map[kind_name] for kind_name in filter_kinds] + search = search.exclude('terms', kind_id=ignore_ids) + + search = search.query('range', datetime={'gte': start_datetime, 'lt': end_datetime}) + return search + + def lookup_logs(self, start_datetime, end_datetime, performer_name=None, repository_name=None, + namespace_name=None, filter_kinds=None, page_token=None, max_page_count=None): + assert start_datetime is not None and end_datetime is not None + + if (page_token is not None) and (max_page_count is not None) and (page_token['page_number'] + 1) > max_page_count: + return LogEntriesPage([], None) + + repository_id, account_id, performer_id = DocumentLogsModel._get_ids_by_names( + repository_name, namespace_name, performer_name) + search = self._base_query(start_datetime, end_datetime, performer_id, repository_id, + account_id, filter_kinds) + search = search.sort({'datetime': 'desc'}, {'random_id': 'desc'}) + search = search.extra(size=PAGE_SIZE + 1) + + if page_token is not None: + search = search.extra(search_after=[page_token['datetime'], page_token['random_id']]) + + response = search.execute() + next_page_token = None + + if len(response) == PAGE_SIZE + 1: + # The last element in the response is used to check if there's more elements. + # The second element in the response is used as the pagination token because search_after does + # not include the exact match, and so the next page will start with the last element. + # This keeps the behavior exactly the same as table_logs_model, so that + # the caller can expect when a pagination token is non-empty, there must be + # at least 1 log to be retrieved. 
+ next_page_token = { + 'datetime': _epoch_ms(response[-2].datetime), + 'random_id': response[-2].random_id, + 'page_number': page_token['page_number'] + 1 if page_token else 1, + } + + return LogEntriesPage(_for_elasticsearch_logs(response[:PAGE_SIZE]), next_page_token) + + def get_aggregated_log_counts(self, start_datetime, end_datetime, performer_name=None, + repository_name=None, namespace_name=None, filter_kinds=None): + if end_datetime - start_datetime >= timedelta(days=DATE_RANGE_LIMIT): + raise Exception('Cannot lookup aggregated logs over a period longer than a month') + + repository_id, account_id, performer_id = DocumentLogsModel._get_ids_by_names( + repository_name, namespace_name, performer_name) + search = self._base_query(start_datetime, end_datetime, performer_id, repository_id, + account_id, filter_kinds) + search.aggs.bucket('by_id', 'terms', field='kind_id').bucket('by_date', 'date_histogram', + field='datetime', interval='day') + # es returns all buckets when size=0 + search = search.extra(size=0) + resp = search.execute() + + if not resp.aggregations: + return [] + + counts = [] + by_id = resp.aggregations['by_id'] + + for id_bucket in by_id.buckets: + for date_bucket in id_bucket.by_date.buckets: + if date_bucket.doc_count > 0: + counts.append(AggregatedLogCount(id_bucket.key, date_bucket.doc_count, date_bucket.key)) + + return counts + + def count_repository_actions(self, repository, day): + return self._base_query(day, day + timedelta(days=1), None, repository.id, None, None).count() + + def log_action(self, kind_name, namespace_name=None, performer=None, ip=None, metadata=None, + repository=None, repository_name=None, timestamp=None): + if repository_name is not None: + assert repository is None + assert namespace_name is not None + repository = model.repository.get_repository(namespace_name, repository_name) + + if timestamp is None: + timestamp = datetime.today() + + account_id = None + performer_id = None + repository_id = None + + if namespace_name is not None: + account_id = model.user.get_namespace_user(namespace_name).id + + if performer is not None: + performer_id = performer.id + + if repository is not None: + repository_id = repository.id + + metadata_json = json.dumps(metadata or {}, default=_json_serialize) + kind_id = model.log._get_log_entry_kind(kind_name) + log = LogEntry(random_id=_random_id(), kind_id=kind_id, account_id=account_id, + performer_id=performer_id, ip=ip, metadata_json=metadata_json, + repository_id=repository_id, datetime=timestamp) + + try: + log.save() + except ElasticsearchException as ex: + strict_logging_disabled = config.app_config.get('ALLOW_PULLS_WITHOUT_STRICT_LOGGING') + logger.error('log_action failed', extra=({'exception': ex}).update(log.to_dict())) + if not (strict_logging_disabled and kind_name in ACTIONS_ALLOWED_WITHOUT_AUDIT_LOGGING): + raise + + def yield_logs_for_export(self, start_datetime, end_datetime, repository_id=None, + namespace_id=None, max_query_time=None): + max_query_time = max_query_time.total_seconds() + search = self._base_query(start_datetime, end_datetime, None, repository_id, namespace_id, + None) + + def raise_on_timeout(batch_generator): + start = time() + for batch in batch_generator: + elapsed = time() - start + if elapsed > max_query_time: + logger.error('Retrieval of logs `%s/%s` timed out with time of `%s`', namespace_id, + repository_id, elapsed) + raise LogsIterationTimeout() + + yield batch + start = time() + + def read_batch(scroll): + batch = [] + for log in scroll: + batch.append(log) 
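+        # Once a full MAX_RESULT_WINDOW batch has accumulated, resolve the referenced
+        # user ids and yield the batch; a final partial batch is yielded after the loop.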
+ if len(batch) == MAX_RESULT_WINDOW: + yield _for_elasticsearch_logs(batch) + batch = [] + + if batch: + yield _for_elasticsearch_logs(batch) + + search = search.params(size=MAX_RESULT_WINDOW, request_timeout=max_query_time) + + try: + for batch in raise_on_timeout(read_batch(search.scan())): + yield batch + except ConnectionTimeout: + raise LogsIterationTimeout() diff --git a/data/logs_model/elastic_logs.py b/data/logs_model/elastic_logs.py new file mode 100644 index 000000000..6fd8d9e42 --- /dev/null +++ b/data/logs_model/elastic_logs.py @@ -0,0 +1,75 @@ +import logging +from requests_aws4auth import AWS4Auth +from elasticsearch import RequestsHttpConnection +from elasticsearch_dsl import Index, Document, Integer, Date, Text, Ip +from elasticsearch_dsl.connections import connections + +logger = logging.getLogger(__name__) + +ELASTICSEARCH_CONNECTION_ALIAS = 'LogEntry' +INDEX_NAME_PREFIX = 'logentry_' + + +class LogEntry(Document): + random_id = Integer() # random id is the tie-breaker for sorting in pagination. + # https://www.elastic.co/guide/en/elasticsearch/reference/current/search-request-search-after.html + kind_id = Integer() + account_id = Integer() + performer_id = Integer() + repository_id = Integer() + ip = Ip() + metadata_json = Text() + datetime = Date() + + @classmethod + def init_with_template(cls, index_prefix=INDEX_NAME_PREFIX, index_settings=None): + """ + Create the index template, and populate LogEntry's mapping and index settings. + """ + wildcard_index = Index(name=index_prefix + '*', using=ELASTICSEARCH_CONNECTION_ALIAS) + wildcard_index.settings(**(index_settings or {})) + wildcard_index.document(cls) + + cls._index = wildcard_index + cls._index_prefix = index_prefix + index_template = wildcard_index.as_template(index_prefix) + index_template.save(using=ELASTICSEARCH_CONNECTION_ALIAS) + + def save(self, **kwargs): + # We group the logs based on year and month as different indexes, so that + # dropping those indexes based on retention range ~1 month is easy. + kwargs['index'] = self.datetime.strftime(self._index_prefix + '%Y-%m') + return super(LogEntry, self).save(**kwargs) + + +class AWSElasticsearchLogs(object): + """ + Model for logs operations stored in an AWS Elasticsearch cluster. + """ + + def __init__(self, host, port, aws_access_key, aws_secret_key, aws_region, + index_prefix=INDEX_NAME_PREFIX, index_settings=None): + # for options in index_settings, please refer to: + # https://www.elastic.co/guide/en/elasticsearch/guide/master/_index_settings.html + # some index settings are set at index creation time, and therefore, you should NOT + # change those settings once the index is set. 
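+    # An illustrative (hypothetical) value would be:
+    #   index_settings={'number_of_shards': 5, 'number_of_replicas': 1}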
+ assert len(index_prefix) >= 1 + http_auth = None + if aws_access_key and aws_secret_key: + http_auth = AWS4Auth(aws_access_key, aws_secret_key, aws_region, 'es') + else: + logger.warn("Connecting to AWS Elasticsearch without HTTP AUTH") + + connections.create_connection( + alias=ELASTICSEARCH_CONNECTION_ALIAS, + hosts=[{ + 'host': host, + 'port': port + }], + http_auth=http_auth, + use_ssl=True, + verify_certs=True, + connection_class=RequestsHttpConnection, + ) + + LogEntry.init_with_template(index_prefix, index_settings) diff --git a/data/logs_model/shared.py b/data/logs_model/shared.py new file mode 100644 index 000000000..948e4919b --- /dev/null +++ b/data/logs_model/shared.py @@ -0,0 +1,40 @@ +import uuid +import json + +from data import model +from data.logs_model.datatypes import _format_date + + +class SharedModel: + def queue_logs_export(self, start_datetime, end_datetime, export_action_logs_queue, + namespace_name=None, repository_name=None, callback_url=None, + callback_email=None, filter_kinds=None): + """ Queues logs between the start_datetime and end_time, filtered by a repository or namespace, + for export to the specified URL and/or email address. Returns the ID of the export job + queued or None if error. + """ + export_id = str(uuid.uuid4()) + namespace = model.user.get_namespace_user(namespace_name) + if namespace is None: + return None + + repository = None + if repository_name is not None: + repository = model.repository.get_repository(namespace_name, repository_name) + if repository is None: + return None + + export_action_logs_queue.put([namespace_name], + json.dumps({ + 'export_id': export_id, + 'repository_id': repository.id if repository else None, + 'namespace_id': namespace.id, + 'namespace_name': namespace.username, + 'repository_name': repository.name if repository else None, + 'start_time': _format_date(start_datetime), + 'end_time': _format_date(end_datetime), + 'callback_url': callback_url, + 'callback_email': callback_email, + }), retries_remaining=3) + + return export_id diff --git a/data/logs_model/table_document_transition_logs_model.py b/data/logs_model/table_document_transition_logs_model.py new file mode 100644 index 000000000..bef0b5fb2 --- /dev/null +++ b/data/logs_model/table_document_transition_logs_model.py @@ -0,0 +1,29 @@ +import logging + +from data import model +from data.logs_model.table_logs_model import TableLogsModel +from data.logs_model.document_logs_model import DocumentLogsModel + +logger = logging.getLogger(__name__) + + +class TableDocumentTransitionLogsModel(TableLogsModel): + """ + TableDocumentTransitionLogsModel implements the data model that logs to both a database table and + document store, but reads exclusively from the database tables. + + This implementation is part of the migration path to move the logs from the database. 
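+
+    Example configuration (hypothetical host; the kwargs are passed through to
+    DocumentLogsModel and, from there, to AWSElasticsearchLogs):
+
+        LOGS_MODEL = ('transition', {'host': 'es.example.com', 'port': 443,
+                                     'aws_access_key': None, 'aws_secret_key': None,
+                                     'aws_region': None})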
+ """ + + def __init__(self, **kwargs): + super(TableDocumentTransitionLogsModel, self).__init__() + self.document_logs_model = DocumentLogsModel(**kwargs) + + def log_action(self, kind_name, namespace_name=None, performer=None, ip=None, metadata=None, + repository=None, repository_name=None, timestamp=None): + super(TableDocumentTransitionLogsModel, self).log_action(kind_name, namespace_name, performer=performer, repository=repository, repository_name=repository_name, + ip=ip, metadata=metadata, timestamp=timestamp) + + self.document_logs_model.log_action(kind_name, namespace_name, performer=performer, repository=repository, repository_name=repository_name, + ip=ip, metadata=metadata, timestamp=timestamp) + diff --git a/data/logs_model/table_logs_model.py b/data/logs_model/table_logs_model.py index b8a6d78dc..87528e3a4 100644 --- a/data/logs_model/table_logs_model.py +++ b/data/logs_model/table_logs_model.py @@ -13,6 +13,7 @@ from data import model from data.database import LogEntry, LogEntry2, LogEntry3 from data.logs_model.interface import ActionLogsDataInterface, LogsIterationTimeout from data.logs_model.datatypes import Log, AggregatedLogCount, LogEntriesPage, _format_date +from data.logs_model.shared import SharedModel logger = logging.getLogger(__name__) @@ -24,7 +25,7 @@ EXPECTED_ITERATION_LOG_COUNT = 1000 LOG_MODELS = [LogEntry3, LogEntry2, LogEntry] -class TableLogsModel(ActionLogsDataInterface): +class TableLogsModel(SharedModel, ActionLogsDataInterface): """ TableLogsModel implements the data model for the logs API backed by a single table in the database. @@ -121,34 +122,6 @@ class TableLogsModel(ActionLogsDataInterface): model.log.log_action(kind_name, namespace_name, performer=performer, repository=repository, ip=ip, metadata=metadata or {}, timestamp=timestamp) - def queue_logs_export(self, start_datetime, end_datetime, export_action_logs_queue, - namespace_name=None, repository_name=None, callback_url=None, - callback_email=None, filter_kinds=None): - export_id = str(uuid.uuid4()) - namespace = model.user.get_namespace_user(namespace_name) - if namespace is None: - return None - - repository = None - if repository_name is not None: - repository = model.repository.get_repository(namespace_name, repository_name) - if repository is None: - return None - - export_action_logs_queue.put([namespace_name], json.dumps({ - 'export_id': export_id, - 'repository_id': repository.id if repository else None, - 'namespace_id': namespace.id, - 'namespace_name': namespace.username, - 'repository_name': repository.name if repository else None, - 'start_time': _format_date(start_datetime), - 'end_time': _format_date(end_datetime), - 'callback_url': callback_url, - 'callback_email': callback_email, - }), retries_remaining=3) - - return export_id - def yield_logs_for_export(self, start_datetime, end_datetime, repository_id=None, namespace_id=None, max_query_time=None): # Using an adjusting scale, start downloading log rows in batches, starting at diff --git a/data/logs_model/test/mock_elasticsearch.py b/data/logs_model/test/mock_elasticsearch.py new file mode 100644 index 000000000..714899fcb --- /dev/null +++ b/data/logs_model/test/mock_elasticsearch.py @@ -0,0 +1,439 @@ +# -*- coding: utf-8 -*- +import json + +from dateutil.parser import parse + +from data.logs_model.datatypes import LogEntriesPage, Log, AggregatedLogCount + + +def _status(d, code=200): + return {"status_code": code, "content": json.dumps(d)} + + +def _shards(d, total=5, failed=0, successful=5): + d.update({"_shards": 
{"total": total, "failed": failed, "successful": successful}}) + return d + + +def _hits(hits): + return {"hits": {"total": len(hits), "max_score": None, "hits": hits}} + + +DEFAULT_TEMPLATE_RESPONSE = _status({"acknowledged": True}) +INDEX_RESPONSE_2019_01 = _status( + _shards({ + "_index": "logentry_2019-01", + "_type": "_doc", + "_id": "1", + "_version": 1, + "_seq_no": 0, + "_primary_term": 1, + "result": "created" + })) + +INDEX_RESPONSE_2017_03 = _status( + _shards({ + "_index": "logentry_2017-03", + "_type": "_doc", + "_id": "1", + "_version": 1, + "_seq_no": 0, + "_primary_term": 1, + "result": "created" + })) + +FAILURE_400 = _status({}, 400) + +INDEX_REQUEST_2019_01 = [ + "logentry_2019-01", { + "account_id": + 1, + "repository_id": + 1, + "ip": + "192.168.1.1", + "random_id": + 233, + "datetime": + "2019-01-01T03:30:00", + "metadata_json": + "{\"\\ud83d\\ude02\": \"\\ud83d\\ude02\\ud83d\\udc4c\\ud83d\\udc4c\\ud83d\\udc4c\\ud83d\\udc4c\", \"key\": \"value\", \"time\": 1520479800}", + "performer_id": + 1, + "kind_id": + 1 + } +] + +INDEX_REQUEST_2017_03 = [ + "logentry_2017-03", { + "repository_id": + 1, + "account_id": + 1, + "ip": + "192.168.1.1", + "random_id": + 233, + "datetime": + "2017-03-08T03:30:00", + "metadata_json": + "{\"\\ud83d\\ude02\": \"\\ud83d\\ude02\\ud83d\\udc4c\\ud83d\\udc4c\\ud83d\\udc4c\\ud83d\\udc4c\", \"key\": \"value\", \"time\": 1520479800}", + "performer_id": + 1, + "kind_id": + 2 + } +] + +_hit1 = { + "_index": "logentry_2018-03", + "_type": "doc", + "_id": "1", + "_score": None, + "_source": { + "random_id": + 233, + "kind_id": + 1, + "account_id": + 1, + "performer_id": + 1, + "repository_id": + 1, + "ip": + "192.168.1.1", + "metadata_json": + "{\"\\ud83d\\ude02\": \"\\ud83d\\ude02\\ud83d\\udc4c\\ud83d\\udc4c\\ud83d\\udc4c\\ud83d\\udc4c\", \"key\": \"value\", \"time\": 1520479800}", + "datetime": + "2018-03-08T03:30", + }, + "sort": [1520479800000, 233] +} + +_hit2 = { + "_index": "logentry_2018-04", + "_type": "doc", + "_id": "2", + "_score": None, + "_source": { + "random_id": + 233, + "kind_id": + 2, + "account_id": + 2, + "performer_id": + 1, + "repository_id": + 2, + "ip": + "192.168.1.2", + "metadata_json": + "{\"\\ud83d\\ude02\": \"\\ud83d\\ude02\\ud83d\\udc4c\\ud83d\\udc4c\\ud83d\\udc4c\\ud83d\\udc4c\", \"key\": \"value\", \"time\": 1522639800}", + "datetime": + "2018-04-02T03:30", + }, + "sort": [1522639800000, 233] +} + +_log1 = Log( + "{\"\\ud83d\\ude02\": \"\\ud83d\\ude02\\ud83d\\udc4c\\ud83d\\udc4c\\ud83d\\udc4c\\ud83d\\udc4c\", \"key\": \"value\", \"time\": 1520479800}", + "192.168.1.1", parse("2018-03-08T03:30"), "user1.email", "user1.username", "user1.robot", + "user1.organization", "user1.username", "user1.email", "user1.robot", 1) +_log2 = Log( + "{\"\\ud83d\\ude02\": \"\\ud83d\\ude02\\ud83d\\udc4c\\ud83d\\udc4c\\ud83d\\udc4c\\ud83d\\udc4c\", \"key\": \"value\", \"time\": 1522639800}", + "192.168.1.2", parse("2018-04-02T03:30"), "user1.email", "user1.username", "user1.robot", + "user2.organization", "user2.username", "user2.email", "user2.robot", 2) + +SEARCH_RESPONSE_START = _status(_shards(_hits([_hit1, _hit2]))) +SEARCH_RESPONSE_END = _status(_shards(_hits([_hit2]))) +SEARCH_REQUEST_START = { + "sort": [{ + "datetime": "desc" + }, { + "random_id": "desc" + }], + "query": { + "bool": { + "filter": [{ + "term": { + "performer_id": 1 + } + }, { + "term": { + "repository_id": 1 + } + }, { + "term": { + "account_id": 1 + } + }], + "must": [{ + "range": { + "datetime": { + "lt": "2018-04-08T03:30:00", + "gte": "2018-03-08T03:30:00" + } 
+ } + }] + } + }, + "size": 2 +} +SEARCH_REQUEST_END = { + "sort": [{ + "datetime": "desc" + }, { + "random_id": "desc" + }], + "query": { + "bool": { + "filter": [{ + "term": { + "performer_id": 1 + } + }, { + "term": { + "repository_id": 1 + } + }, { + "term": { + "account_id": 1 + } + }], + "must": [{ + "range": { + "datetime": { + "lt": "2018-04-08T03:30:00", + "gte": "2018-03-08T03:30:00" + } + } + }] + } + }, + "search_after": [1520479800000, 233], + "size": 2 +} +SEARCH_REQUEST_FILTER = { + "sort": [{ + "datetime": "desc" + }, { + "random_id": "desc" + }], + "query": { + "bool": { + "filter": [{ + "term": { + "performer_id": 1 + } + }, { + "term": { + "repository_id": 1 + } + }, { + "term": { + "account_id": 1 + } + }, { + "bool": { + "must_not": [{ + "terms": { + "kind_id": [1] + } + }] + } + }], + "must": [{ + "range": { + "datetime": { + "lt": "2018-04-08T03:30:00", + "gte": "2018-03-08T03:30:00" + } + } + }] + } + }, + "size": 2 +} +SEARCH_PAGE_TOKEN = {"datetime": 1520479800000, "random_id": 233, "page_number": 1} +SEARCH_PAGE_START = LogEntriesPage(logs=[_log1], next_page_token=SEARCH_PAGE_TOKEN) +SEARCH_PAGE_END = LogEntriesPage(logs=[_log2], next_page_token=None) +SEARCH_PAGE_EMPTY = LogEntriesPage([], None) + +AGGS_RESPONSE = _status( + _shards({ + "hits": { + "total": 4, + "max_score": None, + "hits": [] + }, + "aggregations": { + "by_id": { + "doc_count_error_upper_bound": + 0, + "sum_other_doc_count": + 0, + "buckets": [{ + "key": 2, + "doc_count": 3, + "by_date": { + "buckets": [{ + "key_as_string": "2009-11-12T00:00:00.000Z", + "key": 1257984000000, + "doc_count": 1 + }, { + "key_as_string": "2009-11-13T00:00:00.000Z", + "key": 1258070400000, + "doc_count": 0 + }, { + "key_as_string": "2009-11-14T00:00:00.000Z", + "key": 1258156800000, + "doc_count": 2 + }] + } + }, { + "key": 1, + "doc_count": 1, + "by_date": { + "buckets": [{ + "key_as_string": "2009-11-15T00:00:00.000Z", + "key": 1258243200000, + "doc_count": 1 + }] + } + }] + } + } + })) + +AGGS_REQUEST = { + "query": { + "bool": { + "filter": [{ + "term": { + "performer_id": 1 + } + }, { + "term": { + "repository_id": 1 + } + }, { + "term": { + "account_id": 1 + } + }, { + "bool": { + "must_not": [{ + "terms": { + "kind_id": [2] + } + }] + } + }], + "must": [{ + "range": { + "datetime": { + "lt": "2018-04-08T03:30:00", + "gte": "2018-03-08T03:30:00" + } + } + }] + } + }, + "aggs": { + "by_id": { + "terms": { + "field": "kind_id" + }, + "aggs": { + "by_date": { + "date_histogram": { + "field": "datetime", + "interval": "day" + } + } + } + } + }, + "size": 0 +} + +AGGS_COUNT = [ + AggregatedLogCount(1, 1, parse("2009-11-15T00:00:00.000")), + AggregatedLogCount(2, 1, parse("2009-11-12T00:00:00.000")), + AggregatedLogCount(2, 2, parse("2009-11-14T00:00:00.000")) +] + +COUNT_REQUEST = { + "query": { + "bool": { + "filter": [{ + "term": { + "repository_id": 1 + } + }], + "must": [{ + "range": { + "datetime": { + "lt": "2018-03-09T00:00:00", + "gte": "2018-03-08T00:00:00" + } + } + }] + } + } +} +COUNT_RESPONSE = _status(_shards({ + "count": 1, +})) + +# assume there are 2 pages +_scroll_id = "DnF1ZXJ5VGhlbkZldGNoBQAAAAAAACEmFkk1aGlTRzdSUWllejZmYTlEYTN3SVEAAAAAAAAhJRZJNWhpU0c3UlFpZXo2ZmE5RGEzd0lRAAAAAAAAHtAWLWZpaFZXVzVSTy1OTXA5V3MwcHZrZwAAAAAAAB7RFi1maWhWV1c1Uk8tTk1wOVdzMHB2a2cAAAAAAAAhJxZJNWhpU0c3UlFpZXo2ZmE5RGEzd0lR" + + +def _scroll(d): + d["_scroll_id"] = _scroll_id + return d + + +SCROLL_CREATE = _status(_shards(_scroll(_hits([_hit1])))) +SCROLL_GET = _status(_shards(_scroll(_hits([_hit2])))) +SCROLL_GET_2 = 
_status(_shards(_scroll(_hits([])))) +SCROLL_DELETE = _status({"succeeded": True, "num_freed": 5}) +SCROLL_LOGS = [[_log1], [_log2]] + +SCROLL_REQUESTS = [ + [ + "5m", 1, { + "sort": "_doc", + "query": { + "bool": { + "filter": [{ + "term": { + "repository_id": 1 + } + }, { + "term": { + "account_id": 1 + } + }], + "must": [{ + "range": { + "datetime": { + "lt": "2018-04-02T00:00:00", + "gte": "2018-03-08T00:00:00" + } + } + }] + } + } + } + ], + ["5m", {"scroll_id": _scroll_id}], + ["5m", {"scroll_id": _scroll_id}], + [{"scroll_id": [_scroll_id]}], +] + +SCROLL_RESPONSES = [SCROLL_CREATE, SCROLL_GET, SCROLL_GET_2, SCROLL_DELETE] \ No newline at end of file diff --git a/data/logs_model/test/test_elasticsearch.py b/data/logs_model/test/test_elasticsearch.py new file mode 100644 index 000000000..c442ca44e --- /dev/null +++ b/data/logs_model/test/test_elasticsearch.py @@ -0,0 +1,349 @@ +# -*- coding: utf-8 -*- + +# pylint: disable=redefined-outer-name, wildcard-import + +import json +from datetime import timedelta + +import pytest +from mock import patch, Mock +from dateutil.parser import parse + +from httmock import urlmatch, HTTMock + +from data.model.log import _json_serialize +from data.logs_model import configure, LogsModelProxy +from mock_elasticsearch import * + +FAKE_ES_HOST = 'fakees' +FAKE_ES_HOST_PATTERN = r'fakees.*' +FAKE_ES_PORT = 443 +FAKE_AWS_ACCESS_KEY = None +FAKE_AWS_SECRET_KEY = None +FAKE_AWS_REGION = None + +ES_CONFIG = { + 'LOGS_MODEL': [ + 'elasticsearch', { + 'host': FAKE_ES_HOST, + 'port': FAKE_ES_PORT, + 'aws_access_key': FAKE_AWS_ACCESS_KEY, + 'aws_secret_key': FAKE_AWS_SECRET_KEY, + 'aws_region': FAKE_AWS_REGION + } + ] +} + +FAKE_LOG_ENTRY_KINDS = {'push_repo': 1, 'pull_repo': 2} +FAKE_NAMESPACES = { + 'user1': + Mock(id=1, organization="user1.organization", username="user1.username", email="user1.email", + robot="user1.robot"), + 'user2': + Mock(id=2, organization="user2.organization", username="user2.username", email="user2.email", + robot="user2.robot") +} +FAKE_REPOSITORIES = { + 'user1/repo1': Mock(id=1, namespace_user=FAKE_NAMESPACES['user1']), + 'user2/repo2': Mock(id=2, namespace_user=FAKE_NAMESPACES['user2']), +} + + +@pytest.fixture() +def logs_model(): + # prevent logs model from changing + logs_model = LogsModelProxy() + with patch('data.logs_model.logs_model', logs_model): + yield logs_model + + +@pytest.fixture(scope='function') +def app_config(): + fake_config = {} + with patch("data.logs_model.document_logs_model.config.app_config", fake_config): + yield fake_config + + +@pytest.fixture() +def mock_page_size(): + with patch('data.logs_model.document_logs_model.PAGE_SIZE', 1): + yield + + +@pytest.fixture() +def mock_max_result_window(): + with patch('data.logs_model.document_logs_model.MAX_RESULT_WINDOW', 1): + yield + + +@pytest.fixture +def mock_random_id(): + mock_random = Mock(return_value=233) + with patch('data.logs_model.document_logs_model._random_id', mock_random): + yield + + +@pytest.fixture() +def mock_db_model(): + def get_user_map_by_ids(namespace_ids): + mapping = {} + for i in namespace_ids: + for name in FAKE_NAMESPACES: + if FAKE_NAMESPACES[name].id == i: + mapping[i] = FAKE_NAMESPACES[name] + return mapping + + model = Mock( + user=Mock( + get_namespace_user=FAKE_NAMESPACES.get, + get_user_or_org=FAKE_NAMESPACES.get, + get_user=FAKE_NAMESPACES.get, + get_user_map_by_ids=get_user_map_by_ids, + ), + repository=Mock(get_repository=lambda user_name, repo_name: FAKE_REPOSITORIES.get( + user_name + '/' + repo_name), + ), + log=Mock( + 
_get_log_entry_kind=lambda name: FAKE_LOG_ENTRY_KINDS[name], + _json_serialize=_json_serialize, + get_log_entry_kinds=Mock(return_value=FAKE_LOG_ENTRY_KINDS), + ), + ) + + with patch('data.logs_model.document_logs_model.model', model), patch( + 'data.logs_model.datatypes.model', model): + yield + + +def parse_query(query): + return {s.split('=')[0]: s.split('=')[1] for s in query.split("&")} + + +@pytest.fixture() +def mock_elasticsearch(): + mock = Mock() + mock.template.side_effect = NotImplementedError + mock.index.side_effect = NotImplementedError + mock.count.side_effect = NotImplementedError + mock.scroll_get.side_effect = NotImplementedError + mock.scroll_delete.side_effect = NotImplementedError + mock.search_scroll_create.side_effect = NotImplementedError + mock.search_aggs.side_effect = NotImplementedError + mock.search_after.side_effect = NotImplementedError + + @urlmatch(netloc=r'.*', path=r'.*') + def default(url, req): + raise Exception('\nurl={}\nmethod={}\nreq.url={}\nheaders={}\nbody={}'.format( + url, req.method, req.url, req.headers, req.body)) + + @urlmatch(netloc=FAKE_ES_HOST_PATTERN, path=r'/_template/.*') + def template(url, req): + return mock.template(url.query.split('/')[-1], req.body) + + @urlmatch(netloc=FAKE_ES_HOST_PATTERN, path=r'/logentry_[0-9\-]*/doc') + def index(url, req): + index = url.path.split('/')[1] + return mock.index(index, json.loads(req.body)) + + @urlmatch(netloc=FAKE_ES_HOST_PATTERN, path=r'/logentry_\*/doc/_count') + def count(_, req): + return mock.count(json.loads(req.body)) + + @urlmatch(netloc=FAKE_ES_HOST_PATTERN, path=r'/_search/scroll') + def scroll(url, req): + if req.method == 'DELETE': + return mock.scroll_delete(json.loads(req.body)) + elif req.method == 'GET': + query = parse_query(url.query) + scroll = query['scroll'] + return mock.scroll_get(scroll, json.loads(req.body)) + raise NotImplementedError() + + @urlmatch(netloc=FAKE_ES_HOST_PATTERN, path=r'/logentry_\*/doc/_search') + def search(url, req): + if "scroll" in url.query: + query = parse_query(url.query) + window_size = query['scroll'] + maximum_result_size = int(query['size']) + return mock.search_scroll_create(window_size, maximum_result_size, json.loads(req.body)) + elif "aggs" in req.body: + return mock.search_aggs(json.loads(req.body)) + else: + return mock.search_after(json.loads(req.body)) + + with HTTMock(scroll, count, search, index, template, default): + yield mock + + +@pytest.mark.parametrize( + """ + unlogged_pulls_ok, kind_name, namespace_name, repository, repository_name, + timestamp, index_response, expected_request, throws + """, + [ + # invalid inputs + pytest.param(False, 'non-existing', None, None, None, None, None, None, True, id="Invalid Kind" + ), + pytest.param(False, 'pull_repo', 'user1', Mock(id=1), 'repo1', None, None, None, True, + id="Invalid Parameters"), + + # remote exceptions + pytest.param(False, 'pull_repo', 'user1', Mock(id=1), None, None, FAILURE_400, None, True, + id="Throw on pull log failure"), + pytest.param(True, 'pull_repo', 'user1', Mock(id=1), None, parse("2017-03-08T03:30"), + FAILURE_400, INDEX_REQUEST_2017_03, False, id="Ok on pull log failure"), + + # success executions + pytest.param(False, 'pull_repo', 'user1', Mock(id=1), None, parse("2017-03-08T03:30"), + INDEX_RESPONSE_2017_03, INDEX_REQUEST_2017_03, False, + id="Log with namespace name and repository"), + pytest.param(False, 'push_repo', 'user1', None, 'repo1', parse("2019-01-01T03:30"), + INDEX_RESPONSE_2019_01, INDEX_REQUEST_2019_01, False, + id="Log with namespace 
name and repository name"), + ]) +def test_log_action(unlogged_pulls_ok, kind_name, namespace_name, repository, repository_name, + timestamp, index_response, expected_request, throws, app_config, logs_model, + mock_elasticsearch, mock_db_model, mock_random_id): + mock_elasticsearch.template = Mock(return_value=DEFAULT_TEMPLATE_RESPONSE) + mock_elasticsearch.index = Mock(return_value=index_response) + configure(ES_CONFIG) + app_config['ALLOW_PULLS_WITHOUT_STRICT_LOGGING'] = unlogged_pulls_ok + + performer = Mock(id=1) + ip = "192.168.1.1" + metadata = {'key': 'value', 'time': parse("2018-03-08T03:30"), '😂': '😂👌👌👌👌'} + if throws: + with pytest.raises(Exception): + logs_model.log_action(kind_name, namespace_name, performer, ip, metadata, repository, + repository_name, timestamp) + else: + logs_model.log_action(kind_name, namespace_name, performer, ip, metadata, repository, + repository_name, timestamp) + mock_elasticsearch.index.assert_called_with(*expected_request) + + +@pytest.mark.parametrize( + 'start_datetime, end_datetime, performer_name, repository_name, namespace_name, filter_kinds, page_token, max_page_count, search_response, expected_request, expected_page, throws', + [ + # 1st page + pytest.param(parse('2018-03-08T03:30'), parse('2018-04-08T03:30'), 'user1', "repo1", "user1", None, None, + None, SEARCH_RESPONSE_START, SEARCH_REQUEST_START, SEARCH_PAGE_START, False, id="1st page"), + # last page + pytest.param(parse('2018-03-08T03:30'), parse('2018-04-08T03:30'), 'user1', 'repo1', 'user1', None, + SEARCH_PAGE_TOKEN, None, SEARCH_RESPONSE_END, SEARCH_REQUEST_END, SEARCH_PAGE_END, False, id="Search using pagination token"), + # filter + pytest.param(parse('2018-03-08T03:30'), parse('2018-04-08T03:30'), 'user1', 'repo1', 'user1', [ + 'push_repo' + ], None, None, SEARCH_RESPONSE_END, SEARCH_REQUEST_FILTER, SEARCH_PAGE_END, False, id="Filtered search"), + # max page count + pytest.param(parse('2018-03-08T03:30'), parse('2018-04-08T03:30'), 'user1', 'repo1', 'user1', None, + SEARCH_PAGE_TOKEN, 1, AssertionError, None, SEARCH_PAGE_EMPTY, False, id="Page token reaches maximum page count", + ), # assert that it should not reach the ES server + ]) +def test_lookup_logs(start_datetime, end_datetime, performer_name, repository_name, namespace_name, + filter_kinds, page_token, max_page_count, search_response, expected_request, + expected_page, throws, logs_model, mock_elasticsearch, mock_db_model, + mock_page_size): + mock_elasticsearch.template = Mock(return_value=DEFAULT_TEMPLATE_RESPONSE) + mock_elasticsearch.search_after = Mock(return_value=search_response) + configure(ES_CONFIG) + if throws: + with pytest.raises(Exception): + logs_model.lookup_logs(start_datetime, end_datetime, performer_name, repository_name, + namespace_name, filter_kinds, page_token, max_page_count) + else: + page = logs_model.lookup_logs(start_datetime, end_datetime, performer_name, repository_name, + namespace_name, filter_kinds, page_token, max_page_count) + assert page == expected_page + if expected_request: + mock_elasticsearch.search_after.assert_called_with(expected_request) + + +@pytest.mark.parametrize( + 'start_datetime, end_datetime, performer_name, repository_name,namespace_name, filter_kinds, search_response, expected_request, expected_counts, throws', + [ + pytest.param( + parse('2018-03-08T03:30'), parse('2018-04-08T03:30'), 'user1', 'repo1', 'user1', + ['pull_repo'], AGGS_RESPONSE, AGGS_REQUEST, AGGS_COUNT, False, id="Valid Counts"), + # invalid case: date range too big + pytest.param( + 
parse('2018-03-08T03:30'), parse('2018-04-09T03:30'), 'user1', 'repo1', 'user1', [], None, + None, None, True, id="Throw on date range too big") + ]) +def test_get_aggregated_log_counts(start_datetime, end_datetime, performer_name, repository_name, + namespace_name, filter_kinds, search_response, expected_request, + expected_counts, throws, logs_model, mock_elasticsearch, + mock_db_model): + mock_elasticsearch.template = Mock(return_value=DEFAULT_TEMPLATE_RESPONSE) + mock_elasticsearch.search_aggs = Mock(return_value=search_response) + configure(ES_CONFIG) + if throws: + with pytest.raises(Exception): + logs_model.get_aggregated_log_counts(start_datetime, end_datetime, performer_name, + repository_name, namespace_name, filter_kinds) + else: + counts = logs_model.get_aggregated_log_counts(start_datetime, end_datetime, performer_name, + repository_name, namespace_name, filter_kinds) + assert set(counts) == set(expected_counts) + if expected_request: + mock_elasticsearch.search_aggs.assert_called_with(expected_request) + + +@pytest.mark.parametrize( + 'repository, day, count_response, expected_request, expected_count, throws', [ + pytest.param(FAKE_REPOSITORIES['user1/repo1'], parse("2018-03-08"), COUNT_RESPONSE, + COUNT_REQUEST, 1, False, id="Valid Count with 1 as result"), + ]) +def test_count_repository_actions(repository, day, count_response, expected_request, + expected_count, throws, logs_model, mock_elasticsearch, + mock_db_model): + mock_elasticsearch.template = Mock(return_value=DEFAULT_TEMPLATE_RESPONSE) + mock_elasticsearch.count = Mock(return_value=count_response) + configure(ES_CONFIG) + if throws: + with pytest.raises(Exception): + logs_model.count_repository_actions(repository, day) + else: + count = logs_model.count_repository_actions(repository, day) + assert count == expected_count + if expected_request: + mock_elasticsearch.count.assert_called_with(expected_request) + + +@pytest.mark.parametrize( + 'start_datetime, end_datetime, repository_id, namespace_id, max_query_time, scroll_responses, expected_requests, expected_logs, throws', + [ + pytest.param( + parse("2018-03-08"), parse("2018-04-02"), 1, 1, timedelta(seconds=10), SCROLL_RESPONSES, + SCROLL_REQUESTS, SCROLL_LOGS, False, id="Scroll 3 pages with page size = 1"), + ]) +def test_yield_logs_for_export(start_datetime, end_datetime, repository_id, namespace_id, + max_query_time, scroll_responses, expected_requests, expected_logs, + throws, logs_model, mock_elasticsearch, mock_db_model, + mock_max_result_window): + mock_elasticsearch.template = Mock(return_value=DEFAULT_TEMPLATE_RESPONSE) + mock_elasticsearch.search_scroll_create = Mock(return_value=scroll_responses[0]) + mock_elasticsearch.scroll_get = Mock(side_effect=scroll_responses[1:-1]) + mock_elasticsearch.scroll_delete = Mock(return_value=scroll_responses[-1]) + + configure(ES_CONFIG) + if throws: + with pytest.raises(Exception): + logs_model.yield_logs_for_export(start_datetime, end_datetime, repository_id, namespace_id, + max_query_time) + else: + log_generator = logs_model.yield_logs_for_export(start_datetime, end_datetime, repository_id, + namespace_id, max_query_time) + counter = 0 + for logs in log_generator: + if counter == 0: + mock_elasticsearch.search_scroll_create.assert_called_with(*expected_requests[counter]) + else: + mock_elasticsearch.scroll_get.assert_called_with(*expected_requests[counter]) + assert expected_logs[counter] == logs + counter += 1 + # the last two requests must be + # 1. 
get with response scroll with 0 hits, which indicates the termination condition + # 2. delete scroll request + mock_elasticsearch.scroll_get.assert_called_with(*expected_requests[-2]) + mock_elasticsearch.scroll_delete.assert_called_with(*expected_requests[-1]) diff --git a/data/model/log.py b/data/model/log.py index 9e445c5ee..3c771c409 100644 --- a/data/model/log.py +++ b/data/model/log.py @@ -117,7 +117,7 @@ def log_action(kind_name, user_or_organization_name, performer=None, repository= else: account = config.app_config.get('SERVICE_LOG_ACCOUNT_ID') if account is None: - account = User.select(fn.Min(User.id)).tuples().get()[0] + account = user.get_minimum_user_id() if performer is not None: performer = performer.id diff --git a/data/model/user.py b/data/model/user.py index 8e79d3982..99d5a875a 100644 --- a/data/model/user.py +++ b/data/model/user.py @@ -643,6 +643,14 @@ def get_user_by_id(user_db_id): return None +def get_user_map_by_ids(namespace_ids): + id_user = {namespace_id: None for namespace_id in namespace_ids} + users = User.select().where(User.id << namespace_ids, User.organization == False) + for user in users: + id_user[user.id] = user + + return id_user + def get_namespace_user_by_user_id(namespace_user_db_id): try: return User.get(User.id == namespace_user_db_id, User.robot == False) @@ -1068,6 +1076,10 @@ def list_namespace_geo_restrictions(namespace_name): .where(User.username == namespace_name)) +def get_minimum_user_id(): + return User.select(fn.Min(User.id)).tuples().get()[0] + + class LoginWrappedDBUser(UserMixin): def __init__(self, user_uuid, db_user=None): self._uuid = user_uuid diff --git a/requirements-nover.txt b/requirements-nover.txt index e60c52a9d..90f2c168f 100644 --- a/requirements-nover.txt +++ b/requirements-nover.txt @@ -27,6 +27,8 @@ bitmath boto3 cachetools==1.1.6 cryptography +elasticsearch +elasticsearch-dsl flask flask-restful geoip2 @@ -67,6 +69,7 @@ raven redis redlock reportlab==2.7 +requests-aws4auth semantic-version sqlalchemy==1.1.5 stringscore diff --git a/requirements.txt b/requirements.txt index d0bf95ef3..e23e13734 100644 --- a/requirements.txt +++ b/requirements.txt @@ -38,6 +38,8 @@ contextlib2==0.5.5 cryptography==2.2.2 debtcollector==1.19.0 decorator==4.3.0 +elasticsearch==6.3.1 +elasticsearch-dsl==6.3.1 enum34==1.1.6 Flask-Cors==3.0.4 Flask-Login==0.4.1 @@ -126,6 +128,7 @@ recaptcha2==0.1 redis==2.10.6 redlock==1.2.0 reportlab==2.7 +requests-aws4auth==0.9 requests-oauthlib==0.8.0 requests[security]==2.18.4 rfc3986==1.1.0 diff --git a/test/registry/registry_tests.py b/test/registry/registry_tests.py index 70980f6f1..f22ebfcca 100644 --- a/test/registry/registry_tests.py +++ b/test/registry/registry_tests.py @@ -243,6 +243,7 @@ def add_token(_, executor): return ('$token', executor.add_token().text) +# TODO(sidchen): Test logging to elastic search log model. @pytest.mark.parametrize('credentials, namespace, expected_performer', [ (('devtable', 'password'), 'devtable', 'devtable'), (add_robot, 'buynlarge', 'buynlarge+ownerbot'), diff --git a/test/registry_tests.py b/test/registry_tests.py index 05de88e90..af9800a3c 100644 --- a/test/registry_tests.py +++ b/test/registry_tests.py @@ -1007,6 +1007,7 @@ class RegistryTestsMixin(object): self.do_pull('public', 'newrepo', 'public', 'password', images=images, munge_shas=munged_shas) + # TODO(sidchen): Test logging to elastic search log model. def test_push_pull_logging(self): # Push a new repository. 
self.do_push('public', 'newrepo', 'public', 'password') @@ -1034,6 +1035,7 @@ class RegistryTestsMixin(object): self.assertEquals('public', logs[0]['performer']['name']) + # TODO(sidchen): Test logging to elastic search log model. def test_push_pull_logging_byrobot(self): # Lookup the robot's password. self.conduct_api_login('devtable', 'password') @@ -1067,6 +1069,7 @@ class RegistryTestsMixin(object): self.assertEquals('buynlarge+ownerbot', logs[0]['performer']['name']) + # TODO(sidchen): Test logging to elastic search log model. def test_push_pull_logging_bytoken(self): # Push the repository. self.do_push('devtable', 'newrepo', 'devtable', 'password') @@ -1088,6 +1091,7 @@ class RegistryTestsMixin(object): self.assertEquals('my-new-token', logs[0]['metadata']['token']) + # TODO(sidchen): Test logging to elastic search log model. def test_push_pull_logging_byoauth(self): # Push the repository. self.do_push('devtable', 'newrepo', 'devtable', 'password') @@ -1109,6 +1113,7 @@ class RegistryTestsMixin(object): self.assertEquals(1, logs[0]['metadata']['oauth_token_id']) + # TODO(sidchen): Test logging to elastic search log model. def test_push_pull_logging_byclitoken(self): # Push the repository. self.do_push('devtable', 'newrepo', 'devtable', 'password') diff --git a/test/test_api_usage.py b/test/test_api_usage.py index 9406df7e1..db5b912c5 100644 --- a/test/test_api_usage.py +++ b/test/test_api_usage.py @@ -3391,6 +3391,7 @@ class TestOrgRobots(ApiTestCase): self.assertEquals(json['token'], json2['token']) +# TODO(sidchen): Test logging to elastic search log model. class TestLogs(ApiTestCase): def test_repo_logs(self): self.login(ADMIN_ACCESS_USER) @@ -3971,6 +3972,7 @@ class TestUserAuthorizations(ApiTestCase): expected_code=404) +# TODO(sidchen): Test logging to elastic search log model. class TestSuperUserLogs(ApiTestCase): def test_get_logs(self): self.login(ADMIN_ACCESS_USER) diff --git a/util/config/schema.py b/util/config/schema.py index 1804243f1..2e5c77df4 100644 --- a/util/config/schema.py +++ b/util/config/schema.py @@ -92,6 +92,7 @@ INTERNAL_ONLY_PROPERTIES = { 'TUF_SERVER', 'V1_ONLY_DOMAIN', + 'LOGS_MODEL', } CONFIG_SCHEMA = { diff --git a/workers/test/test_exportactionlogsworker.py b/workers/test/test_exportactionlogsworker.py index 44f919113..8fc7f6b37 100644 --- a/workers/test/test_exportactionlogsworker.py +++ b/workers/test/test_exportactionlogsworker.py @@ -12,6 +12,7 @@ from workers.exportactionlogsworker import ExportActionLogsWorker from test.fixtures import * +# TODO(sidchen): Test logging to elastic search log model. @pytest.mark.parametrize('namespace,repo_name,expects_logs', [ ('buynlarge', 'orgrepo', True), ('devtable', 'history', False),