035541c6f2
- Implement logs model using Elasticsearch with tests - Implement transition model using both elasticsearch and database model - Add LOGS_MODEL configuration to choose which to use. Co-authored-by: Sida Chen <sidchen@redhat.com> Co-authored-by: Kenny Lee Sin Cheong <kenny.lee@redhat.com>
250 lines
9.6 KiB
Python
250 lines
9.6 KiB
Python
# pylint: disable=protected-access
|
|
|
|
import json
|
|
import logging
|
|
|
|
from calendar import timegm
|
|
from datetime import timedelta, datetime
|
|
from random import getrandbits
|
|
from time import time
|
|
|
|
from elasticsearch.exceptions import ConnectionTimeout, ElasticsearchException
|
|
|
|
from data import model
|
|
from data.model import config
|
|
from data.model.log import _json_serialize, ACTIONS_ALLOWED_WITHOUT_AUDIT_LOGGING, DataModelException
|
|
from data.logs_model.elastic_logs import LogEntry, AWSElasticsearchLogs
|
|
from data.logs_model.datatypes import Log, AggregatedLogCount, LogEntriesPage
|
|
from data.logs_model.interface import ActionLogsDataInterface, LogsIterationTimeout
|
|
from data.logs_model.shared import SharedModel
|
|
|
|
logger = logging.getLogger(__name__)

# Number of log entries returned per page by lookup_logs.
PAGE_SIZE = 100

# Batch size used when scanning logs for export.
MAX_RESULT_WINDOW = 100000

# DATE_RANGE_LIMIT caps, in days, the date range an aggregated-count query may
# span, i.e. at most about one month.
DATE_RANGE_LIMIT = 32
|
|
|
|
def _for_elasticsearch_logs(logs):
    """ Converts elasticsearch log entries into Log datatypes.

    The account and performer ids of every entry are collected first so that the
    matching users are fetched with a single get_user_map_by_ids call.
    """
    user_ids = set()
    for entry in logs:
        user_ids.update((entry.account_id, entry.performer_id))

    users_by_id = model.user.get_user_map_by_ids(user_ids)
    return [Log.for_elasticsearch_log(entry, users_by_id) for entry in logs]
|
|
|
|
|
|
def _epoch_ms(dt):
|
|
return (timegm(dt.utctimetuple()) * 1000) + (dt.microsecond / 1000)
|
|
|
|
|
|
def _random_id():
|
|
""" generates 31 random bits as integer for random_id field in LogEntry.
|
|
It's used as tie-breaker for sorting logs based on datetime.
|
|
We assume the chance of collision on both fields is low, but when they do
|
|
collide, only the result of lookup_logs function will be affected because
|
|
some log may be missing or duplicated.
|
|
https://www.elastic.co/guide/en/elasticsearch/reference/current/search-request-search-after.html
|
|
"""
|
|
return getrandbits(31)
|
|
|
|
|
|
class DocumentLogsModel(SharedModel, ActionLogsDataInterface):
    """
    DocumentLogsModel implements the data model for the logs API backed by an
    elasticsearch service.
    """

    def __init__(self, **kwargs):
        """ Creates the model; all keyword arguments are forwarded to AWSElasticsearchLogs. """
        self._client = AWSElasticsearchLogs(**kwargs)

    @staticmethod
    def _get_ids_by_names(repository_name, namespace_name, performer_name):
        """ retrieve repository/namespace/performer ids based on their names.

        throws DataModelException when the namespace_name does not match any
        user or organization in the database (only checked when the repository
        lookup did not already resolve the account).
        returns a (repository_id, account_id, performer_id) tuple; each element is
        the database ID or None if not exists.
        """
        repository_id = None
        account_id = None
        performer_id = None

        if repository_name and namespace_name:
            repository = model.repository.get_repository(namespace_name, repository_name)
            if repository:
                repository_id = repository.id
                account_id = repository.namespace_user.id

        # Fall back to a direct namespace lookup when no repository resolved the account.
        if namespace_name and account_id is None:
            account = model.user.get_user_or_org(namespace_name)
            if account is None:
                raise DataModelException('Invalid namespace requested')
            account_id = account.id

        if performer_name:
            performer = model.user.get_user(performer_name)
            if performer:
                performer_id = performer.id

        return repository_id, account_id, performer_id

    def _base_query(self, start_datetime, end_datetime, performer_id, repository_id, account_id,
                    filter_kinds):
        """ Builds the LogEntry search shared by all lookups.

        Each of performer_id, repository_id and account_id adds a term filter when it
        is not None. The log kinds named in filter_kinds (if not None) are excluded.
        Results are restricted to start_datetime <= datetime < end_datetime.
        """
        search = LogEntry.search()
        if performer_id is not None:
            search = search.filter('term', performer_id=performer_id)

        if repository_id is not None:
            search = search.filter('term', repository_id=repository_id)

        if account_id is not None:
            search = search.filter('term', account_id=account_id)

        if filter_kinds is not None:
            kind_map = model.log.get_log_entry_kinds()
            ignore_ids = [kind_map[kind_name] for kind_name in filter_kinds]
            search = search.exclude('terms', kind_id=ignore_ids)

        search = search.query('range', datetime={'gte': start_datetime, 'lt': end_datetime})
        return search

    def lookup_logs(self, start_datetime, end_datetime, performer_name=None, repository_name=None,
                    namespace_name=None, filter_kinds=None, page_token=None,
                    max_page_count=None):
        """ Looks up one page of logs, sorted by datetime then random_id, both descending.

        page_token, when given, is a dict with the 'datetime', 'random_id' and
        'page_number' keys, as produced by a previous call. When max_page_count is
        given and the requested page would exceed it, an empty page is returned.
        Returns a LogEntriesPage holding at most PAGE_SIZE logs and the token for the
        next page, or None as the token when there are no further logs.
        """
        assert start_datetime is not None and end_datetime is not None

        if (page_token is not None) and (max_page_count is not None) and \
           (page_token['page_number'] + 1) > max_page_count:
            return LogEntriesPage([], None)

        repository_id, account_id, performer_id = DocumentLogsModel._get_ids_by_names(
            repository_name, namespace_name, performer_name)
        search = self._base_query(start_datetime, end_datetime, performer_id, repository_id,
                                  account_id, filter_kinds)
        search = search.sort({'datetime': 'desc'}, {'random_id': 'desc'})
        # Fetch one extra entry to learn whether another page exists.
        search = search.extra(size=PAGE_SIZE + 1)

        if page_token is not None:
            search = search.extra(search_after=[page_token['datetime'], page_token['random_id']])

        response = search.execute()
        next_page_token = None

        if len(response) == PAGE_SIZE + 1:
            # The last element in the response is used to check if there's more elements.
            # The second to last element in the response is used as the pagination token
            # because search_after does not include the exact match, and so the next page
            # will start with the last element.
            # This keeps the behavior exactly the same as table_logs_model, so that
            # the caller can expect when a pagination token is non-empty, there must be
            # at least 1 log to be retrieved.
            next_page_token = {
                'datetime': _epoch_ms(response[-2].datetime),
                'random_id': response[-2].random_id,
                'page_number': page_token['page_number'] + 1 if page_token else 1,
            }

        return LogEntriesPage(_for_elasticsearch_logs(response[:PAGE_SIZE]), next_page_token)

    def get_aggregated_log_counts(self, start_datetime, end_datetime, performer_name=None,
                                  repository_name=None, namespace_name=None, filter_kinds=None):
        """ Returns per-kind, per-day log counts for the matching logs.

        Raises Exception when the requested range spans DATE_RANGE_LIMIT days or more.
        Returns a list of AggregatedLogCount built from (kind bucket key, document
        count, date bucket key); date buckets with no documents are skipped.
        """
        if end_datetime - start_datetime >= timedelta(days=DATE_RANGE_LIMIT):
            raise Exception('Cannot lookup aggregated logs over a period longer than a month')

        repository_id, account_id, performer_id = DocumentLogsModel._get_ids_by_names(
            repository_name, namespace_name, performer_name)
        search = self._base_query(start_datetime, end_datetime, performer_id, repository_id,
                                  account_id, filter_kinds)
        # NOTE(review): the result is not reassigned, so this relies on aggs.bucket
        # attaching the aggregation to `search` in place - confirm against elasticsearch-dsl.
        search.aggs.bucket('by_id', 'terms', field='kind_id').bucket('by_date', 'date_histogram',
                                                                     field='datetime',
                                                                     interval='day')
        # es returns all buckets when size=0
        search = search.extra(size=0)
        resp = search.execute()

        if not resp.aggregations:
            return []

        counts = []
        by_id = resp.aggregations['by_id']

        for id_bucket in by_id.buckets:
            for date_bucket in id_bucket.by_date.buckets:
                if date_bucket.doc_count > 0:
                    counts.append(AggregatedLogCount(id_bucket.key, date_bucket.doc_count,
                                                     date_bucket.key))

        return counts

    def count_repository_actions(self, repository, day):
        """ Returns the number of log entries for the repository in [day, day + 1 day). """
        return self._base_query(day, day + timedelta(days=1), None, repository.id, None,
                                None).count()

    def log_action(self, kind_name, namespace_name=None, performer=None, ip=None, metadata=None,
                   repository=None, repository_name=None, timestamp=None):
        """ Saves a new LogEntry describing the given action.

        Either `repository` or `repository_name` may be given, not both; a
        repository_name also requires namespace_name and is resolved to a repository.
        timestamp defaults to datetime.today(). metadata is serialized to JSON.

        When saving fails with an ElasticsearchException the failure is logged, and
        the exception is re-raised unless ALLOW_PULLS_WITHOUT_STRICT_LOGGING is set
        and kind_name is in ACTIONS_ALLOWED_WITHOUT_AUDIT_LOGGING.
        """
        if repository_name is not None:
            assert repository is None
            assert namespace_name is not None
            repository = model.repository.get_repository(namespace_name, repository_name)

        if timestamp is None:
            timestamp = datetime.today()

        account_id = None
        performer_id = None
        repository_id = None

        if namespace_name is not None:
            account_id = model.user.get_namespace_user(namespace_name).id

        if performer is not None:
            performer_id = performer.id

        if repository is not None:
            repository_id = repository.id

        metadata_json = json.dumps(metadata or {}, default=_json_serialize)
        kind_id = model.log._get_log_entry_kind(kind_name)
        log = LogEntry(random_id=_random_id(), kind_id=kind_id, account_id=account_id,
                       performer_id=performer_id, ip=ip, metadata_json=metadata_json,
                       repository_id=repository_id, datetime=timestamp)

        try:
            log.save()
        except ElasticsearchException as ex:
            strict_logging_disabled = config.app_config.get('ALLOW_PULLS_WITHOUT_STRICT_LOGGING')
            # dict.update() returns None, so the extra dict must be built before it is
            # passed to the logger; otherwise the context is silently dropped.
            log_extra = {'exception': ex}
            log_extra.update(log.to_dict())
            logger.error('log_action failed', extra=log_extra)
            if not (strict_logging_disabled and kind_name in ACTIONS_ALLOWED_WITHOUT_AUDIT_LOGGING):
                raise

    def yield_logs_for_export(self, start_datetime, end_datetime, repository_id=None,
                              namespace_id=None, max_query_time=None):
        """ Generator yielding batches (lists) of logs for export.

        Each batch holds at most MAX_RESULT_WINDOW logs. max_query_time, when given,
        must provide total_seconds() (e.g. a timedelta); it bounds both the request
        timeout and the time taken to produce each batch. When it is None no time
        limit is applied.
        Raises LogsIterationTimeout when a batch takes longer than max_query_time or
        the connection to elasticsearch times out.
        """
        # max_query_time defaults to None, so only convert it when it was supplied.
        max_query_seconds = None
        if max_query_time is not None:
            max_query_seconds = max_query_time.total_seconds()

        search = self._base_query(start_datetime, end_datetime, None, repository_id, namespace_id,
                                  None)

        def raise_on_timeout(batch_generator):
            start = time()
            for batch in batch_generator:
                elapsed = time() - start
                if max_query_seconds is not None and elapsed > max_query_seconds:
                    logger.error('Retrieval of logs `%s/%s` timed out with time of `%s`',
                                 namespace_id, repository_id, elapsed)
                    raise LogsIterationTimeout()

                yield batch
                # Restart the clock so time spent by the consumer is not counted.
                start = time()

        def read_batch(scroll):
            batch = []
            for log in scroll:
                batch.append(log)
                if len(batch) == MAX_RESULT_WINDOW:
                    yield _for_elasticsearch_logs(batch)
                    batch = []

            # Flush the final, partially filled batch.
            if batch:
                yield _for_elasticsearch_logs(batch)

        search_params = {'size': MAX_RESULT_WINDOW}
        if max_query_seconds is not None:
            search_params['request_timeout'] = max_query_seconds
        search = search.params(**search_params)

        try:
            for batch in raise_on_timeout(read_batch(search.scan())):
                yield batch
        except ConnectionTimeout:
            raise LogsIterationTimeout()
|