Implement logs model using Elasticsearch

- Implement logs model using Elasticsearch, with tests
- Implement transition model using both the Elasticsearch and database models
- Add a LOGS_MODEL configuration option to choose which one to use

Co-authored-by: Sida Chen <sidchen@redhat.com>
Co-authored-by: Kenny Lee Sin Cheong <kenny.lee@redhat.com>
parent 40c0352dd1
commit 035541c6f2
20 changed files with 1282 additions and 38 deletions
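The commit message introduces a LOGS_MODEL configuration option for switching between the database-backed and Elasticsearch-backed implementations. The sketch below shows one way such a switch could be wired up; the TableLogsModel import, the 'database'/'elasticsearch' values, and the LOGS_MODEL_CONFIG key are assumptions for illustration and are not taken from this diff:

# Hypothetical sketch of selecting the logs model from app configuration.
from data.logs_model.document_logs_model import DocumentLogsModel
from data.logs_model.table_logs_model import TableLogsModel  # assumed module/class name


def configure_logs_model(app_config):
  model_name = app_config.get('LOGS_MODEL', 'database')  # key name from the commit message
  if model_name == 'elasticsearch':
    # Connection parameters are handed straight through to AWSElasticsearchLogs.
    return DocumentLogsModel(**app_config.get('LOGS_MODEL_CONFIG', {}))  # assumed key
  return TableLogsModel()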
data/logs_model/document_logs_model.py (new file, 250 lines)
@@ -0,0 +1,250 @@
# pylint: disable=protected-access

import json
import logging

from calendar import timegm
from datetime import timedelta, datetime
from random import getrandbits
from time import time

from elasticsearch.exceptions import ConnectionTimeout, ElasticsearchException

from data import model
from data.model import config
from data.model.log import _json_serialize, ACTIONS_ALLOWED_WITHOUT_AUDIT_LOGGING, DataModelException
from data.logs_model.elastic_logs import LogEntry, AWSElasticsearchLogs
from data.logs_model.datatypes import Log, AggregatedLogCount, LogEntriesPage
from data.logs_model.interface import ActionLogsDataInterface, LogsIterationTimeout
from data.logs_model.shared import SharedModel

logger = logging.getLogger(__name__)

PAGE_SIZE = 100
MAX_RESULT_WINDOW = 100000
# DATE_RANGE_LIMIT limits the query datetime range to at most 1 month.
DATE_RANGE_LIMIT = 32

def _for_elasticsearch_logs(logs):
  namespace_ids = set()
  for log in logs:
    namespace_ids.add(log.account_id)
    namespace_ids.add(log.performer_id)

  id_user_map = model.user.get_user_map_by_ids(namespace_ids)
  return [Log.for_elasticsearch_log(log, id_user_map) for log in logs]


def _epoch_ms(dt):
  return (timegm(dt.utctimetuple()) * 1000) + (dt.microsecond / 1000)


def _random_id():
  """ Generates 31 random bits as an integer for the random_id field in LogEntry.
  It is used as a tie-breaker for sorting logs based on datetime.
  We assume the chance of collision on both fields is low, but when they do
  collide, only the result of the lookup_logs function is affected, because
  some logs may be missing or duplicated.
  https://www.elastic.co/guide/en/elasticsearch/reference/current/search-request-search-after.html
  """
  return getrandbits(31)

class DocumentLogsModel(SharedModel, ActionLogsDataInterface):
  """
  DocumentLogsModel implements the data model for the logs API backed by an
  Elasticsearch service.
  """

  def __init__(self, **kwargs):
    self._client = AWSElasticsearchLogs(**kwargs)

  @staticmethod
  def _get_ids_by_names(repository_name, namespace_name, performer_name):
    """ Retrieves repository/namespace/performer IDs based on their names.
    Throws DataModelException when namespace_name does not match any user
    in the database.
    Returns a (repository_id, account_id, performer_id) tuple, with None for
    any ID that does not exist.
    """
    repository_id = None
    account_id = None
    performer_id = None

    if repository_name and namespace_name:
      repository = model.repository.get_repository(namespace_name, repository_name)
      if repository:
        repository_id = repository.id
        account_id = repository.namespace_user.id

    if namespace_name and account_id is None:
      account = model.user.get_user_or_org(namespace_name)
      if account is None:
        raise DataModelException('Invalid namespace requested')
      account_id = account.id

    if performer_name:
      performer = model.user.get_user(performer_name)
      if performer:
        performer_id = performer.id

    return repository_id, account_id, performer_id

  def _base_query(self, start_datetime, end_datetime, performer_id, repository_id, account_id,
                  filter_kinds):
    search = LogEntry.search()
    if performer_id is not None:
      search = search.filter('term', performer_id=performer_id)

    if repository_id is not None:
      search = search.filter('term', repository_id=repository_id)

    if account_id is not None:
      search = search.filter('term', account_id=account_id)

    if filter_kinds is not None:
      kind_map = model.log.get_log_entry_kinds()
      ignore_ids = [kind_map[kind_name] for kind_name in filter_kinds]
      search = search.exclude('terms', kind_id=ignore_ids)

    search = search.query('range', datetime={'gte': start_datetime, 'lt': end_datetime})
    return search

  def lookup_logs(self, start_datetime, end_datetime, performer_name=None, repository_name=None,
                  namespace_name=None, filter_kinds=None, page_token=None, max_page_count=None):
    assert start_datetime is not None and end_datetime is not None

    if (page_token is not None) and (max_page_count is not None) and (page_token['page_number'] + 1) > max_page_count:
      return LogEntriesPage([], None)

    repository_id, account_id, performer_id = DocumentLogsModel._get_ids_by_names(
      repository_name, namespace_name, performer_name)
    search = self._base_query(start_datetime, end_datetime, performer_id, repository_id,
                              account_id, filter_kinds)
    search = search.sort({'datetime': 'desc'}, {'random_id': 'desc'})
    search = search.extra(size=PAGE_SIZE + 1)

    if page_token is not None:
      search = search.extra(search_after=[page_token['datetime'], page_token['random_id']])

    response = search.execute()
    next_page_token = None

    if len(response) == PAGE_SIZE + 1:
      # The last element in the response is only used to check whether there are more elements.
      # The second-to-last element is used as the pagination token, because search_after does
      # not include the exact match, so the next page will start with the last element of this
      # page. This keeps the behavior exactly the same as table_logs_model, so that the caller
      # can expect that when a pagination token is non-empty, there is at least 1 log left to
      # retrieve.
      next_page_token = {
        'datetime': _epoch_ms(response[-2].datetime),
        'random_id': response[-2].random_id,
        'page_number': page_token['page_number'] + 1 if page_token else 1,
      }

    return LogEntriesPage(_for_elasticsearch_logs(response[:PAGE_SIZE]), next_page_token)

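lookup_logs round-trips the sort keys (the datetime as epoch milliseconds plus the random_id tie-breaker) through the page token, so a caller can walk the full result set by feeding each returned token back in. A minimal usage sketch follows; the constructor kwargs, the namespace name, and the LogEntriesPage field names are assumptions for illustration rather than details confirmed by this diff:

# Hypothetical pagination sketch for DocumentLogsModel.lookup_logs.
from datetime import datetime, timedelta

from data.logs_model.document_logs_model import DocumentLogsModel

logs_model = DocumentLogsModel(host='localhost', port=9200)  # assumed connection kwargs
end = datetime.utcnow()
start = end - timedelta(days=1)

page_token = None
total = 0
while True:
  page = logs_model.lookup_logs(start, end, namespace_name='example-namespace',
                                page_token=page_token, max_page_count=10)
  total += len(page.logs)            # 'logs' field name on LogEntriesPage is assumed
  page_token = page.next_page_token  # 'next_page_token' field name is assumed as well
  if page_token is None:
    break

print('retrieved %s logs' % total)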
  def get_aggregated_log_counts(self, start_datetime, end_datetime, performer_name=None,
                                repository_name=None, namespace_name=None, filter_kinds=None):
    if end_datetime - start_datetime >= timedelta(days=DATE_RANGE_LIMIT):
      raise Exception('Cannot lookup aggregated logs over a period longer than a month')

    repository_id, account_id, performer_id = DocumentLogsModel._get_ids_by_names(
      repository_name, namespace_name, performer_name)
    search = self._base_query(start_datetime, end_datetime, performer_id, repository_id,
                              account_id, filter_kinds)
    search.aggs.bucket('by_id', 'terms', field='kind_id').bucket('by_date', 'date_histogram',
                                                                 field='datetime', interval='day')
    # Elasticsearch returns all buckets when size=0.
    search = search.extra(size=0)
    resp = search.execute()

    if not resp.aggregations:
      return []

    counts = []
    by_id = resp.aggregations['by_id']

    for id_bucket in by_id.buckets:
      for date_bucket in id_bucket.by_date.buckets:
        if date_bucket.doc_count > 0:
          counts.append(AggregatedLogCount(id_bucket.key, date_bucket.doc_count, date_bucket.key))

    return counts

  def count_repository_actions(self, repository, day):
    return self._base_query(day, day + timedelta(days=1), None, repository.id, None, None).count()

  def log_action(self, kind_name, namespace_name=None, performer=None, ip=None, metadata=None,
                 repository=None, repository_name=None, timestamp=None):
    if repository_name is not None:
      assert repository is None
      assert namespace_name is not None
      repository = model.repository.get_repository(namespace_name, repository_name)

    if timestamp is None:
      timestamp = datetime.today()

    account_id = None
    performer_id = None
    repository_id = None

    if namespace_name is not None:
      account_id = model.user.get_namespace_user(namespace_name).id

    if performer is not None:
      performer_id = performer.id

    if repository is not None:
      repository_id = repository.id

    metadata_json = json.dumps(metadata or {}, default=_json_serialize)
    kind_id = model.log._get_log_entry_kind(kind_name)
    log = LogEntry(random_id=_random_id(), kind_id=kind_id, account_id=account_id,
                   performer_id=performer_id, ip=ip, metadata_json=metadata_json,
                   repository_id=repository_id, datetime=timestamp)

    try:
      log.save()
    except ElasticsearchException as ex:
      strict_logging_disabled = config.app_config.get('ALLOW_PULLS_WITHOUT_STRICT_LOGGING')
      log_extra = {'exception': ex}
      log_extra.update(log.to_dict())
      logger.error('log_action failed', extra=log_extra)
      if not (strict_logging_disabled and kind_name in ACTIONS_ALLOWED_WITHOUT_AUDIT_LOGGING):
        raise

  def yield_logs_for_export(self, start_datetime, end_datetime, repository_id=None,
                            namespace_id=None, max_query_time=None):
    max_query_time = max_query_time.total_seconds()
    search = self._base_query(start_datetime, end_datetime, None, repository_id, namespace_id,
                              None)

    def raise_on_timeout(batch_generator):
      start = time()
      for batch in batch_generator:
        elapsed = time() - start
        if elapsed > max_query_time:
          logger.error('Retrieval of logs `%s/%s` timed out with time of `%s`', namespace_id,
                       repository_id, elapsed)
          raise LogsIterationTimeout()

        yield batch
        start = time()

    def read_batch(scroll):
      batch = []
      for log in scroll:
        batch.append(log)
        if len(batch) == MAX_RESULT_WINDOW:
          yield _for_elasticsearch_logs(batch)
          batch = []

      if batch:
        yield _for_elasticsearch_logs(batch)

    search = search.params(size=MAX_RESULT_WINDOW, request_timeout=max_query_time)

    try:
      for batch in raise_on_timeout(read_batch(search.scan())):
        yield batch
    except ConnectionTimeout:
      raise LogsIterationTimeout()
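
yield_logs_for_export streams batches of converted logs and uses max_query_time (a timedelta) as both the per-batch deadline and the Elasticsearch request_timeout. A hedged usage sketch follows; the connection kwargs and the repository ID below are placeholders, not values taken from this commit:

# Hypothetical export sketch for DocumentLogsModel.yield_logs_for_export.
import logging
from datetime import datetime, timedelta

from data.logs_model.document_logs_model import DocumentLogsModel
from data.logs_model.interface import LogsIterationTimeout

logger = logging.getLogger(__name__)

logs_model = DocumentLogsModel(host='localhost', port=9200)  # assumed connection kwargs
end = datetime.utcnow()
start = end - timedelta(days=1)

exported = []
try:
  for batch in logs_model.yield_logs_for_export(start, end, repository_id=42,  # placeholder ID
                                                max_query_time=timedelta(minutes=5)):
    # Each batch is a list of Log datatypes, at most MAX_RESULT_WINDOW entries long.
    exported.extend(batch)
except LogsIterationTimeout:
  logger.warning('Log export timed out; the exported results are partial')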
Reference in a new issue