Interface out all action log data model operations

This will allow us to reimplement the logs data model against a non-database system in the near future
Joseph Schorr 2019-01-08 14:03:28 -05:00
parent a156c91962
commit b773a18ed8
26 changed files with 714 additions and 902 deletions

View file

@ -0,0 +1,20 @@
import os
import logging
from data.logs_model.table_logs_model import table_logs_model
logger = logging.getLogger(__name__)
class LogsModelProxy(object):
def __init__(self):
self._model = table_logs_model
def __getattr__(self, attr):
return getattr(self._model, attr)
logs_model = LogsModelProxy()
logger.info('===============================')
logger.info('Using logs model `%s`', logs_model._model)
logger.info('===============================')
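Not part of the commit, but for illustration: because every attribute lookup on the proxy is forwarded to the active model, call sites depend only on `logs_model`, and a future non-database backend could be dropped in behind the same proxy. A minimal sketch, assuming the hypothetical backend implements the same interface:
from data.logs_model import logs_model
# Existing call sites stay unchanged; the proxy forwards each call to the active model.
logs_model.log_action('push_repo', 'devtable', repository_name='simple',
                      metadata={'tag': 'latest'})
# A hypothetical non-database implementation would only need to replace the proxied
# model; callers never see the difference.
# logs_model._model = InMemoryLogsModel()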

View file

@ -0,0 +1,120 @@
import json
from calendar import timegm
from collections import namedtuple
from email.utils import formatdate
from cachetools import lru_cache
from data import model
from util.morecollections import AttrDict
def _format_date(date):
""" Output an RFC822 date format. """
if date is None:
return None
return formatdate(timegm(date.utctimetuple()))
@lru_cache(maxsize=1)
def _kinds():
return model.log.get_log_entry_kinds()
class LogEntriesPage(namedtuple('LogEntriesPage', ['logs', 'next_page_token'])):
""" Represents a page returned by the lookup_logs call. The `logs` contains the logs
found for the page and `next_page_token`, if not None, contains the token to be
encoded and returned for the followup call.
"""
class Log(namedtuple('Log', [
'metadata_json', 'ip', 'datetime', 'performer_email', 'performer_username', 'performer_robot',
'account_organization', 'account_username', 'account_email', 'account_robot', 'kind_id'])):
""" Represents a single log entry returned by the logs model. """
@classmethod
def for_logentry(cls, log):
account_organization = None
account_username = None
account_email = None
account_robot = None
try:
account_organization = log.account.organization
account_username = log.account.username
account_email = log.account.email
account_robot = log.account.robot
except AttributeError:
pass
performer_robot = None
performer_username = None
performer_email = None
try:
performer_robot = log.performer.robot
performer_username = log.performer.username
performer_email = log.performer.email
except AttributeError:
pass
return Log(log.metadata_json, log.ip, log.datetime, performer_email, performer_username,
performer_robot, account_organization, account_username,
account_email, account_robot, log.kind_id)
def to_dict(self, avatar, include_namespace=False):
view = {
'kind': _kinds()[self.kind_id],
'metadata': json.loads(self.metadata_json),
'ip': self.ip,
'datetime': _format_date(self.datetime),
}
if self.performer_username:
performer = AttrDict({'username': self.performer_username, 'email': self.performer_email})
performer.robot = None
if self.performer_robot:
performer.robot = self.performer_robot
view['performer'] = {
'kind': 'user',
'name': self.performer_username,
'is_robot': self.performer_robot,
'avatar': avatar.get_data_for_user(performer),
}
if include_namespace:
if self.account_username:
account = AttrDict({'username': self.account_username, 'email': self.account_email})
if self.account_organization:
view['namespace'] = {
'kind': 'org',
'name': self.account_username,
'avatar': avatar.get_data_for_org(account),
}
else:
account.robot = None
if self.account_robot:
account.robot = self.account_robot
view['namespace'] = {
'kind': 'user',
'name': self.account_username,
'avatar': avatar.get_data_for_user(account),
}
return view
class AggregatedLogCount(namedtuple('AggregatedLogCount', ['kind_id', 'count', 'datetime'])):
""" Represents the aggregated count of the number of logs, of a particular kind, on a day. """
def to_dict(self):
view = {
'kind': _kinds()[self.kind_id],
'count': self.count,
'datetime': _format_date(self.datetime),
}
return view
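The value objects above are plain namedtuples, so they can be constructed directly in tests or alternate backends. A minimal sketch (not part of the commit), assuming `avatar` comes from the application (e.g. `from app import avatar`) and that `kind_id` maps to a real LogEntryKind row, since `to_dict` resolves kind names through the cached `_kinds()` lookup:
from datetime import datetime
from data.logs_model.datatypes import Log
entry = Log(metadata_json='{}', ip='127.0.0.1', datetime=datetime.utcnow(),
            performer_email=None, performer_username=None, performer_robot=None,
            account_organization=None, account_username='devtable',
            account_email='devtable@example.com', account_robot=False, kind_id=1)
# The performer and namespace blocks are only emitted when the relevant fields are set.
view = entry.to_dict(avatar, include_namespace=True)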

View file

@ -0,0 +1,62 @@
from abc import ABCMeta, abstractmethod
from six import add_metaclass
class LogsIterationTimeout(Exception):
""" Exception raised if logs iteration times out. """
@add_metaclass(ABCMeta)
class ActionLogsDataInterface(object):
""" Interface for code to work with the logs data model. The logs data model consists
of all access for reading and writing action logs.
"""
@abstractmethod
def lookup_logs(self, start_datetime, end_datetime, performer_name=None, repository_name=None,
namespace_name=None, filter_kinds=None, page_token=None, max_page_count=None):
""" Looks up all logs between the start_datetime and end_datetime, filtered
by performer (a user), repository or namespace. Note that one (and only one) of the three
can be specified. Returns a LogEntriesPage. filter, if specified, is a set/list of the
kinds of logs to filter.
"""
@abstractmethod
def get_aggregated_log_counts(self, start_datetime, end_datetime, performer_name=None,
repository_name=None, namespace_name=None, filter_kinds=None):
""" Returns the aggregated count of logs, by kind, between the start_datetime and end_datetime,
filtered by performer (a user), repository or namespace. Note that one (and only one) of
the three can be specified. Returns a list of AggregatedLogCount.
"""
@abstractmethod
def count_repository_actions(self, repository, day):
""" Returns the total number of repository actions over the given day, in the given repository
or None on error.
"""
@abstractmethod
def queue_logs_export(self, start_datetime, end_datetime, export_action_logs_queue,
namespace_name=None, repository_name=None, callback_url=None,
callback_email=None, filter_kinds=None):
""" Queues logs between the start_datetime and end_time, filtered by a repository or namespace,
for export to the specified URL and/or email address. Returns the ID of the export job
queued or None if error.
"""
@abstractmethod
def log_action(self, kind_name, namespace_name=None, performer=None, ip=None, metadata=None,
repository=None, repository_name=None, timestamp=None):
""" Logs a single action as having taken place. """
@abstractmethod
def yield_logs_for_export(self, start_datetime, end_datetime, repository_id=None,
namespace_id=None, max_query_time=None):
""" Returns an iterator that yields bundles of all logs found between the start_datetime and
end_datetime, optionally filtered by the repository or namespace. This function should be
used for any bulk lookup operations, and should be implemented by implementors to put
minimal strain on the backing storage for large operations. If there was an error in setting
up, returns None.
If max_query_time is specified, each iteration that yields a log bundle will have its
queries run with a maximum timeout of that specified, and, if any exceed that threshold,
LogsIterationTimeout will be raised instead of returning the logs bundle.
"""

View file

@ -0,0 +1,227 @@
# pylint: disable=protected-access
import logging
import json
import uuid
from datetime import datetime, timedelta
from tzlocal import get_localzone
from dateutil.relativedelta import relativedelta
from data import model
from data.database import LogEntry, LogEntry2, LogEntry3
from data.logs_model.interface import ActionLogsDataInterface, LogsIterationTimeout
from data.logs_model.datatypes import Log, AggregatedLogCount, LogEntriesPage, _format_date
logger = logging.getLogger(__name__)
MINIMUM_RANGE_SIZE = 1000
MAXIMUM_RANGE_SIZE = 100000
EXPECTED_ITERATION_LOG_COUNT = 1000
LOG_MODELS = [LogEntry3, LogEntry2, LogEntry]
class TableLogsModel(ActionLogsDataInterface):
"""
TableLogsModel implements the data model for the logs API backed by a single table
in the database.
"""
def lookup_logs(self, start_datetime, end_datetime, performer_name=None, repository_name=None,
namespace_name=None, filter_kinds=None, page_token=None, max_page_count=None):
assert start_datetime is not None
assert end_datetime is not None
repository = None
if repository_name and namespace_name:
repository = model.repository.get_repository(namespace_name, repository_name)
performer = None
if performer_name:
performer = model.user.get_user(performer_name)
def get_logs(m):
logs_query = model.log.get_logs_query(start_datetime, end_datetime, performer=performer,
repository=repository, namespace=namespace_name,
ignore=filter_kinds, model=m)
logs, next_page_token = model.modelutil.paginate(logs_query, m,
descending=True, page_token=page_token,
limit=20,
max_page=max_page_count)
return LogEntriesPage([Log.for_logentry(log) for log in logs], next_page_token)
# First check the LogEntry3 table for the most recent logs, unless we've been expressly told
# to look inside the other tables.
TOKEN_TABLE_ID = 'tti'
table_index = 0
table_specified = page_token is not None and page_token.get(TOKEN_TABLE_ID) is not None
if table_specified:
table_index = page_token.get(TOKEN_TABLE_ID)
page_result = get_logs(LOG_MODELS[table_index])
if page_result.next_page_token is None and table_index < len(LOG_MODELS) - 1:
page_result = page_result._replace(next_page_token={TOKEN_TABLE_ID: table_index + 1})
return page_result
def get_aggregated_log_counts(self, start_datetime, end_datetime, performer_name=None,
repository_name=None, namespace_name=None, filter_kinds=None):
if end_datetime - start_datetime >= timedelta(weeks=4):
raise Exception('Cannot lookup aggregated logs over a period longer than a month')
repository = None
if repository_name and namespace_name:
repository = model.repository.get_repository(namespace_name, repository_name)
performer = None
if performer_name:
performer = model.user.get_user(performer_name)
entries = {}
for log_model in LOG_MODELS:
aggregated = model.log.get_aggregated_logs(start_datetime, end_datetime,
performer=performer,
repository=repository,
namespace=namespace_name,
ignore=filter_kinds,
model=log_model)
for entry in aggregated:
synthetic_date = datetime(start_datetime.year, start_datetime.month, int(entry.day),
tzinfo=get_localzone())
if synthetic_date.day < start_datetime.day:
synthetic_date = synthetic_date + relativedelta(months=1)
key = '%s-%s' % (entry.kind_id, entry.day)
if key in entries:
entries[key] = AggregatedLogCount(entry.kind_id, entry.count + entries[key].count,
synthetic_date)
else:
entries[key] = AggregatedLogCount(entry.kind_id, entry.count, synthetic_date)
return entries.values()
def count_repository_actions(self, repository, day):
return model.repositoryactioncount.count_repository_actions(repository, day)
def log_action(self, kind_name, namespace_name=None, performer=None, ip=None, metadata=None,
repository=None, repository_name=None, timestamp=None):
if repository_name is not None:
assert repository is None
assert namespace_name is not None
repository = model.repository.get_repository(namespace_name, repository_name)
model.log.log_action(kind_name, namespace_name, performer=performer, repository=repository,
ip=ip, metadata=metadata or {}, timestamp=timestamp)
def queue_logs_export(self, start_datetime, end_datetime, export_action_logs_queue,
namespace_name=None, repository_name=None, callback_url=None,
callback_email=None, filter_kinds=None):
export_id = str(uuid.uuid4())
namespace = model.user.get_namespace_user(namespace_name)
if namespace is None:
return None
repository = None
if repository_name is not None:
repository = model.repository.get_repository(namespace_name, repository_name)
if repository is None:
return None
export_action_logs_queue.put([namespace_name], json.dumps({
'export_id': export_id,
'repository_id': repository.id if repository else None,
'namespace_id': namespace.id,
'namespace_name': namespace.username,
'repository_name': repository.name if repository else None,
'start_time': _format_date(start_datetime),
'end_time': _format_date(end_datetime),
'callback_url': callback_url,
'callback_email': callback_email,
}), retries_remaining=3)
return export_id
def yield_logs_for_export(self, start_datetime, end_datetime, repository_id=None,
namespace_id=None, max_query_time=None):
# Lookup the starting and ending IDs for the log range in the table. This operation is quite
# quick, so we use it as a bounding box for the later lookups.
min_id, elapsed = _run_and_time(lambda: model.log.get_minimum_id_for_logs(start_datetime,
repository_id,
namespace_id))
if elapsed > max_query_time:
logger.error('Retrieval of min ID for export logs `%s/%s` timed out with time of `%s`',
namespace_id, repository_id, elapsed)
raise LogsIterationTimeout()
max_id, elapsed = _run_and_time(lambda: model.log.get_maximum_id_for_logs(end_datetime,
repository_id,
namespace_id))
if elapsed > max_query_time:
logger.error('Retrieval of max ID for export logs `%s/%s` timed out with time of `%s`',
namespace_id, repository_id, elapsed)
raise LogsIterationTimeout()
min_id = min_id or 1
max_id = max_id or 1
logger.info('Found log range of %s to %s for export logs `%s/%s`', min_id, max_id,
namespace_id, repository_id)
# Using an adjusting scale, start downloading log rows in batches, starting at
# MINIMUM_RANGE_SIZE and doubling until we've reached EXPECTED_ITERATION_LOG_COUNT or
# the lookup range has reached MAXIMUM_RANGE_SIZE. If at any point this operation takes
# longer than max_query_time, terminate the batch operation as timed out.
batch_start_time = datetime.utcnow()
current_start_id = min_id
current_batch_size = MINIMUM_RANGE_SIZE
while current_start_id <= max_id:
# Verify we haven't been working for too long.
work_elapsed = datetime.utcnow() - batch_start_time
if work_elapsed > max_query_time:
logger.error('Retrieval of logs `%s/%s` timed out with time of `%s`',
namespace_id, repository_id, work_elapsed)
raise LogsIterationTimeout()
id_range = [current_start_id, min(max_id, current_start_id + current_batch_size)]
# Load the next set of logs.
def load_logs():
logger.debug('Retrieving logs over range %s with namespace %s and repository %s',
id_range, namespace_id, repository_id)
logs_query = model.log.get_logs_query(namespace=namespace_id,
repository=repository_id,
id_range=id_range)
return [Log.for_logentry(log) for log in logs_query]
logs, elapsed = _run_and_time(load_logs)
if elapsed > max_query_time:
logger.error('Retrieval of logs for export logs `%s/%s` with range `%s` timed out at `%s`',
namespace_id, repository_id, id_range, elapsed)
raise LogsIterationTimeout()
yield logs
# Move forward.
current_start_id = id_range[1] + 1
# Increase the batch size if necessary.
if len(logs) < EXPECTED_ITERATION_LOG_COUNT:
current_batch_size = min(MAXIMUM_RANGE_SIZE, current_batch_size * 2)
def _run_and_time(fn):
start_time = datetime.utcnow()
result = fn()
return result, datetime.utcnow() - start_time
table_logs_model = TableLogsModel()
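To make the batching comment above concrete, here is the adaptive id-range strategy from yield_logs_for_export in isolation (a sketch, not part of the commit), reusing the MINIMUM_RANGE_SIZE, MAXIMUM_RANGE_SIZE and EXPECTED_ITERATION_LOG_COUNT constants defined at the top of this module:
def iterate_id_ranges(min_id, max_id, load_batch):
  """ Yields batches loaded over growing id ranges; load_batch is any callable that
      takes an inclusive [start, end] id range and returns a list of rows. """
  current_start_id = min_id
  current_batch_size = MINIMUM_RANGE_SIZE
  while current_start_id <= max_id:
    id_range = [current_start_id, min(max_id, current_start_id + current_batch_size)]
    batch = load_batch(id_range)
    yield batch
    current_start_id = id_range[1] + 1
    # A range that returned fewer rows than expected is sparse, so widen the window
    # for the next iteration, up to the cap.
    if len(batch) < EXPECTED_ITERATION_LOG_COUNT:
      current_batch_size = min(MAXIMUM_RANGE_SIZE, current_batch_size * 2)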

View file

@ -44,30 +44,34 @@ def find_uncounted_repository():
return None
def count_repository_actions(to_count):
""" Aggregates repository actions from the LogEntry table for the last day and writes them to
the RepositoryActionCount table. Return True if the repository was updated and False
otherwise.
def count_repository_actions(to_count, day):
""" Aggregates repository actions from the LogEntry table for the specified day. Returns the
count or None on error.
"""
today = date.today()
yesterday = today - timedelta(days=1)
# TODO(LogMigrate): Remove the branch once we're back on a single table.
def lookup_action_count(model):
return (model
.select()
.where(model.repository == to_count,
model.datetime >= yesterday,
model.datetime < today)
model.datetime >= day,
model.datetime < (day + timedelta(days=1)))
.count())
actions = (lookup_action_count(LogEntry3) + lookup_action_count(LogEntry2) +
lookup_action_count(LogEntry))
return actions
def store_repository_action_count(repository, day, action_count):
""" Stores the action count for a repository for a specific day. Returns False if the
repository already has an entry for the specified day.
"""
try:
RepositoryActionCount.create(repository=to_count, date=yesterday, count=actions)
RepositoryActionCount.create(repository=repository, date=day, count=action_count)
return True
except IntegrityError:
logger.debug('Count already written for repository %s', to_count.id)
logger.debug('Count already written for repository %s', repository.id)
return False
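For illustration only (not part of the commit), a sketch of how the worker might now combine the two helpers, counting a repository's actions for the previous day and then persisting the result:
from datetime import date, timedelta
def count_and_store_yesterday(to_count):
  yesterday = date.today() - timedelta(days=1)
  action_count = count_repository_actions(to_count, yesterday)
  if action_count is None:
    return False
  # store_repository_action_count returns False if a row for that day already exists.
  return store_repository_action_count(to_count, yesterday, action_count)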

View file

@ -18,6 +18,8 @@ from auth import scopes
from auth.auth_context import (get_authenticated_context, get_authenticated_user,
get_validated_oauth_token)
from auth.decorators import process_oauth
from data.logs_model import logs_model
from data import model as data_model
from endpoints.csrf import csrf_protect
from endpoints.exception import (Unauthorized, InvalidRequest, InvalidResponse,
FreshLoginRequired, NotFound)
@ -365,7 +367,7 @@ def request_error(exception=None, **kwargs):
def log_action(kind, user_or_orgname, metadata=None, repo=None, repo_name=None):
if not metadata:
metadata = {}
oauth_token = get_validated_oauth_token()
if oauth_token:
metadata['oauth_token_id'] = oauth_token.id
@ -373,11 +375,15 @@ def log_action(kind, user_or_orgname, metadata=None, repo=None, repo_name=None):
metadata['oauth_token_application'] = oauth_token.application.name
performer = get_authenticated_user()
if repo:
repo_name = repo.name
model.log_action(kind, user_or_orgname, repo_name, performer, request.remote_addr, metadata)
if repo_name is not None:
repo = data_model.repository.get_repository(user_or_orgname, repo_name)
logs_model.log_action(kind, user_or_orgname,
repository=repo,
performer=performer,
ip=request.remote_addr,
metadata=metadata)
def define_json_response(schema_name):
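A sketch of how an endpoint might call the log_action helper above (not part of the commit); the kind name and variables are illustrative, and a Flask request context is assumed so request.remote_addr and the authenticated user can be read:
# namespace_name and repo_name are assumed to be in scope in the endpoint handler.
log_action('change_repo_visibility', namespace_name,
           metadata={'repo': repo_name, 'visibility': 'private'},
           repo_name=repo_name)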

View file

@ -1,18 +1,19 @@
from __init__models_interface import InitDataInterface
from data import model
from data.logs_model import logs_model
class PreOCIModel(InitDataInterface):
def is_app_repository(self, namespace_name, repository_name):
return model.repository.get_repository(namespace_name, repository_name, kind_filter='application') is not None
return model.repository.get_repository(namespace_name, repository_name,
kind_filter='application') is not None
def repository_is_public(self, namespace_name, repository_name):
return model.repository.repository_is_public(namespace_name, repository_name)
def log_action(self, kind, namespace_name, repository_name, performer, ip, metadata):
repository = model.repository.get_repository(namespace_name, repository_name)
model.log.log_action(kind, namespace_name, performer=performer, ip=ip, metadata=metadata, repository=repository)
logs_model.log_action(kind, namespace_name, performer=performer, ip=ip, metadata=metadata,
repository=repository)
pre_oci_model = PreOCIModel()
pre_oci_model = PreOCIModel()

View file

@ -1,81 +1,70 @@
""" Access usage logs for organizations or repositories. """
import json
import uuid
from datetime import datetime, timedelta
from flask import request
import features
from app import export_action_logs_queue
from app import app, export_action_logs_queue, avatar
from auth.permissions import AdministerOrganizationPermission
from auth.auth_context import get_authenticated_user
from auth import scopes
from data.logs_model import logs_model
from data.registry_model import registry_model
from endpoints.api import (resource, nickname, ApiResource, query_param, parse_args,
RepositoryParamResource, require_repo_admin, related_user_resource,
format_date, require_user_admin, path_param, require_scope, page_support,
validate_json_request, InvalidRequest, show_if)
from data import model as data_model
from endpoints.api.logs_models_pre_oci import pre_oci_model as model
from endpoints.exception import Unauthorized, NotFound
from auth.permissions import AdministerOrganizationPermission
from auth.auth_context import get_authenticated_user
from auth import scopes
LOGS_PER_PAGE = 20
SERVICE_LEVEL_LOG_KINDS = set(['service_key_create', 'service_key_approve', 'service_key_delete',
'service_key_modify', 'service_key_extend', 'service_key_rotate'])
def _parse_datetime(dt_string):
if not dt_string:
return None
try:
return datetime.strptime(dt_string + ' UTC', '%m/%d/%Y %Z')
except ValueError:
return None
def _validate_logs_arguments(start_time, end_time):
if start_time:
try:
start_time = datetime.strptime(start_time + ' UTC', '%m/%d/%Y %Z')
except ValueError:
start_time = None
if not start_time:
start_time = datetime.today() - timedelta(7) # One week
if end_time:
try:
end_time = datetime.strptime(end_time + ' UTC', '%m/%d/%Y %Z')
end_time = end_time + timedelta(days=1)
except ValueError:
end_time = None
if not end_time:
end_time = datetime.today()
start_time = _parse_datetime(start_time) or (datetime.today() - timedelta(days=1))
end_time = _parse_datetime(end_time) or datetime.today()
end_time = end_time + timedelta(days=1)
return start_time, end_time
def get_logs(start_time, end_time, performer_name=None, repository_name=None, namespace_name=None,
page_token=None, ignore=None):
def _get_logs(start_time, end_time, performer_name=None, repository_name=None, namespace_name=None,
page_token=None, filter_kinds=None):
(start_time, end_time) = _validate_logs_arguments(start_time, end_time)
kinds = model.get_log_entry_kinds()
log_entry_page = model.get_logs_query(start_time, end_time, performer_name, repository_name,
namespace_name, ignore, page_token)
log_entry_page = logs_model.lookup_logs(start_time, end_time, performer_name, repository_name,
namespace_name, filter_kinds, page_token,
app.config['ACTION_LOG_MAX_PAGE'])
include_namespace = namespace_name is None and repository_name is None
return {
'start_time': format_date(start_time),
'end_time': format_date(end_time),
'logs': [log.to_dict(kinds, include_namespace) for log in log_entry_page.logs],
'logs': [log.to_dict(avatar, include_namespace) for log in log_entry_page.logs],
}, log_entry_page.next_page_token
def get_aggregate_logs(start_time, end_time, performer_name=None, repository=None, namespace=None,
ignore=None):
def _get_aggregate_logs(start_time, end_time, performer_name=None, repository=None, namespace=None,
filter_kinds=None):
(start_time, end_time) = _validate_logs_arguments(start_time, end_time)
kinds = model.get_log_entry_kinds()
aggregated_logs = model.get_aggregated_logs(start_time, end_time, performer_name=performer_name,
repository_name=repository, namespace_name=namespace,
ignore=ignore)
aggregated_logs = logs_model.get_aggregated_log_counts(start_time, end_time,
performer_name=performer_name,
repository_name=repository,
namespace_name=namespace,
filter_kinds=filter_kinds)
return {
'aggregated': [log.to_dict(kinds, start_time) for log in aggregated_logs]
'aggregated': [log.to_dict() for log in aggregated_logs]
}
@ -87,18 +76,20 @@ class RepositoryLogs(RepositoryParamResource):
@require_repo_admin
@nickname('listRepoLogs')
@parse_args()
@query_param('starttime', 'Earliest time from which to get logs. The time should be formatted "%m/%d/%Y" in UTC.', type=str)
@query_param('endtime', 'Latest time to which to get logs. The time should be formatted "%m/%d/%Y" in UTC.', type=str)
@query_param('starttime', 'Earliest time for logs. Format: "%m/%d/%Y" in UTC.', type=str)
@query_param('endtime', 'Latest time for logs. Format: "%m/%d/%Y" in UTC.', type=str)
@page_support()
def get(self, namespace, repository, page_token, parsed_args):
""" List the logs for the specified repository. """
if model.repo_exists(namespace, repository) is False:
if registry_model.lookup_repository(namespace, repository) is None:
raise NotFound()
start_time = parsed_args['starttime']
end_time = parsed_args['endtime']
return get_logs(start_time, end_time, repository_name=repository, page_token=page_token,
namespace_name=namespace)
return _get_logs(start_time, end_time,
repository_name=repository,
page_token=page_token,
namespace_name=namespace)
@resource('/v1/user/logs')
@ -108,8 +99,8 @@ class UserLogs(ApiResource):
@require_user_admin
@nickname('listUserLogs')
@parse_args()
@query_param('starttime', 'Earliest time from which to get logs. (%m/%d/%Y %Z)', type=str)
@query_param('endtime', 'Latest time to which to get logs. (%m/%d/%Y %Z)', type=str)
@query_param('starttime', 'Earliest time for logs. Format: "%m/%d/%Y" in UTC.', type=str)
@query_param('endtime', 'Latest time for logs. Format: "%m/%d/%Y" in UTC.', type=str)
@query_param('performer', 'Username for which to filter logs.', type=str)
@page_support()
def get(self, parsed_args, page_token):
@ -119,9 +110,11 @@ class UserLogs(ApiResource):
end_time = parsed_args['endtime']
user = get_authenticated_user()
return get_logs(start_time, end_time, performer_name=performer_name,
namespace_name=user.username, page_token=page_token,
ignore=SERVICE_LEVEL_LOG_KINDS)
return _get_logs(start_time, end_time,
performer_name=performer_name,
namespace_name=user.username,
page_token=page_token,
filter_kinds=SERVICE_LEVEL_LOG_KINDS)
@resource('/v1/organization/<orgname>/logs')
@ -132,8 +125,8 @@ class OrgLogs(ApiResource):
@nickname('listOrgLogs')
@parse_args()
@query_param('starttime', 'Earliest time from which to get logs. (%m/%d/%Y %Z)', type=str)
@query_param('endtime', 'Latest time to which to get logs. (%m/%d/%Y %Z)', type=str)
@query_param('starttime', 'Earliest time for logs. Format: "%m/%d/%Y" in UTC.', type=str)
@query_param('endtime', 'Latest time for logs. Format: "%m/%d/%Y" in UTC.', type=str)
@query_param('performer', 'Username for which to filter logs.', type=str)
@page_support()
@require_scope(scopes.ORG_ADMIN)
@ -145,8 +138,10 @@ class OrgLogs(ApiResource):
start_time = parsed_args['starttime']
end_time = parsed_args['endtime']
return get_logs(start_time, end_time, namespace_name=orgname, performer_name=performer_name,
page_token=page_token, ignore=SERVICE_LEVEL_LOG_KINDS)
return _get_logs(start_time, end_time,
namespace_name=orgname,
performer_name=performer_name,
page_token=page_token)
raise Unauthorized()
@ -160,16 +155,18 @@ class RepositoryAggregateLogs(RepositoryParamResource):
@require_repo_admin
@nickname('getAggregateRepoLogs')
@parse_args()
@query_param('starttime', 'Earliest time from which to get logs (%m/%d/%Y %Z)', type=str)
@query_param('endtime', 'Latest time to which to get logs (%m/%d/%Y %Z)', type=str)
@query_param('starttime', 'Earliest time for logs. Format: "%m/%d/%Y" in UTC.', type=str)
@query_param('endtime', 'Latest time for logs. Format: "%m/%d/%Y" in UTC.', type=str)
def get(self, namespace, repository, parsed_args):
""" Returns the aggregated logs for the specified repository. """
if model.repo_exists(namespace, repository) is False:
if registry_model.lookup_repository(namespace, repository) is None:
raise NotFound()
start_time = parsed_args['starttime']
end_time = parsed_args['endtime']
return get_aggregate_logs(start_time, end_time, repository=repository, namespace=namespace)
return _get_aggregate_logs(start_time, end_time,
repository=repository,
namespace=namespace)
@resource('/v1/user/aggregatelogs')
@ -180,8 +177,8 @@ class UserAggregateLogs(ApiResource):
@require_user_admin
@nickname('getAggregateUserLogs')
@parse_args()
@query_param('starttime', 'Earliest time from which to get logs. (%m/%d/%Y %Z)', type=str)
@query_param('endtime', 'Latest time to which to get logs. (%m/%d/%Y %Z)', type=str)
@query_param('starttime', 'Earliest time for logs. Format: "%m/%d/%Y" in UTC.', type=str)
@query_param('endtime', 'Latest time for logs. Format: "%m/%d/%Y" in UTC.', type=str)
@query_param('performer', 'Username for which to filter logs.', type=str)
def get(self, parsed_args):
""" Returns the aggregated logs for the current user. """
@ -190,8 +187,10 @@ class UserAggregateLogs(ApiResource):
end_time = parsed_args['endtime']
user = get_authenticated_user()
return get_aggregate_logs(start_time, end_time, performer_name=performer_name,
namespace=user.username, ignore=SERVICE_LEVEL_LOG_KINDS)
return _get_aggregate_logs(start_time, end_time,
performer_name=performer_name,
namespace=user.username,
filter_kinds=SERVICE_LEVEL_LOG_KINDS)
@resource('/v1/organization/<orgname>/aggregatelogs')
@ -203,8 +202,8 @@ class OrgAggregateLogs(ApiResource):
@nickname('getAggregateOrgLogs')
@parse_args()
@query_param('starttime', 'Earliest time from which to get logs. (%m/%d/%Y %Z)', type=str)
@query_param('endtime', 'Latest time to which to get logs. (%m/%d/%Y %Z)', type=str)
@query_param('starttime', 'Earliest time for logs. Format: "%m/%d/%Y" in UTC.', type=str)
@query_param('endtime', 'Latest time for logs. Format: "%m/%d/%Y" in UTC.', type=str)
@query_param('performer', 'Username for which to filter logs.', type=str)
@require_scope(scopes.ORG_ADMIN)
def get(self, orgname, parsed_args):
@ -215,46 +214,13 @@ class OrgAggregateLogs(ApiResource):
start_time = parsed_args['starttime']
end_time = parsed_args['endtime']
return get_aggregate_logs(start_time, end_time, namespace=orgname,
performer_name=performer_name, ignore=SERVICE_LEVEL_LOG_KINDS)
return _get_aggregate_logs(start_time, end_time,
namespace=orgname,
performer_name=performer_name)
raise Unauthorized()
def queue_logs_export(start_time, end_time, options, namespace_name, repository_name=None):
export_id = str(uuid.uuid4())
namespace = data_model.user.get_namespace_user(namespace_name)
if namespace is None:
raise InvalidRequest('Unknown namespace')
repository = None
if repository_name is not None:
repository = data_model.repository.get_repository(namespace_name, repository_name)
if repository is None:
raise InvalidRequest('Unknown repository')
callback_url = options.get('callback_url')
if callback_url:
if not callback_url.startswith('https://') and not callback_url.startswith('http://'):
raise InvalidRequest('Invalid callback URL')
export_action_logs_queue.put([namespace_name], json.dumps({
'export_id': export_id,
'repository_id': repository.id if repository else None,
'namespace_id': namespace.id,
'namespace_name': namespace.username,
'repository_name': repository.name if repository else None,
'start_time': start_time,
'end_time': end_time,
'callback_url': callback_url,
'callback_email': options.get('callback_email'),
}), retries_remaining=3)
return {
'export_id': export_id,
}
EXPORT_LOGS_SCHEMA = {
'type': 'object',
'description': 'Configuration for an export logs operation',
@ -271,6 +237,27 @@ EXPORT_LOGS_SCHEMA = {
}
def _queue_logs_export(start_time, end_time, options, namespace_name, repository_name=None):
callback_url = options.get('callback_url')
if callback_url:
if not callback_url.startswith('https://') and not callback_url.startswith('http://'):
raise InvalidRequest('Invalid callback URL')
callback_email = options.get('callback_email')
if callback_email:
if callback_email.find('@') < 0:
raise InvalidRequest('Invalid callback e-mail')
(start_time, end_time) = _validate_logs_arguments(start_time, end_time)
export_id = logs_model.queue_logs_export(start_time, end_time, export_action_logs_queue,
namespace_name, repository_name, callback_url,
callback_email)
if export_id is None:
raise InvalidRequest('Invalid export request')
return export_id
@resource('/v1/repository/<apirepopath:repository>/exportlogs')
@path_param('repository', 'The full path of the repository. e.g. namespace/name')
class ExportRepositoryLogs(RepositoryParamResource):
@ -282,18 +269,21 @@ class ExportRepositoryLogs(RepositoryParamResource):
@require_repo_admin
@nickname('exportRepoLogs')
@parse_args()
@query_param('starttime', 'Earliest time from which to get logs (%m/%d/%Y %Z)', type=str)
@query_param('endtime', 'Latest time to which to get logs (%m/%d/%Y %Z)', type=str)
@query_param('starttime', 'Earliest time for logs. Format: "%m/%d/%Y" in UTC.', type=str)
@query_param('endtime', 'Latest time for logs. Format: "%m/%d/%Y" in UTC.', type=str)
@validate_json_request('ExportLogs')
def post(self, namespace, repository, parsed_args):
""" Queues an export of the logs for the specified repository. """
if model.repo_exists(namespace, repository) is False:
if registry_model.lookup_repository(namespace, repository) is None:
raise NotFound()
start_time = parsed_args['starttime']
end_time = parsed_args['endtime']
return queue_logs_export(start_time, end_time, request.get_json(), namespace,
repository_name=repository)
export_id = _queue_logs_export(start_time, end_time, request.get_json(), namespace,
repository_name=repository)
return {
'export_id': export_id,
}
@resource('/v1/user/exportlogs')
@ -306,8 +296,8 @@ class ExportUserLogs(ApiResource):
@require_user_admin
@nickname('exportUserLogs')
@parse_args()
@query_param('starttime', 'Earliest time from which to get logs. (%m/%d/%Y %Z)', type=str)
@query_param('endtime', 'Latest time to which to get logs. (%m/%d/%Y %Z)', type=str)
@query_param('starttime', 'Earliest time for logs. Format: "%m/%d/%Y" in UTC.', type=str)
@query_param('endtime', 'Latest time for logs. Format: "%m/%d/%Y" in UTC.', type=str)
@validate_json_request('ExportLogs')
def post(self, parsed_args):
""" Returns the aggregated logs for the current user. """
@ -315,7 +305,10 @@ class ExportUserLogs(ApiResource):
end_time = parsed_args['endtime']
user = get_authenticated_user()
return queue_logs_export(start_time, end_time, request.get_json(), user.username)
export_id = _queue_logs_export(start_time, end_time, request.get_json(), user.username)
return {
'export_id': export_id,
}
@resource('/v1/organization/<orgname>/exportlogs')
@ -330,8 +323,8 @@ class ExportOrgLogs(ApiResource):
@nickname('exportOrgLogs')
@parse_args()
@query_param('starttime', 'Earliest time from which to get logs. (%m/%d/%Y %Z)', type=str)
@query_param('endtime', 'Latest time to which to get logs. (%m/%d/%Y %Z)', type=str)
@query_param('starttime', 'Earliest time for logs. Format: "%m/%d/%Y" in UTC.', type=str)
@query_param('endtime', 'Latest time for logs. Format: "%m/%d/%Y" in UTC.', type=str)
@require_scope(scopes.ORG_ADMIN)
@validate_json_request('ExportLogs')
def post(self, orgname, parsed_args):
@ -341,6 +334,9 @@ class ExportOrgLogs(ApiResource):
start_time = parsed_args['starttime']
end_time = parsed_args['endtime']
return queue_logs_export(start_time, end_time, request.get_json(), orgname)
export_id = _queue_logs_export(start_time, end_time, request.get_json(), orgname)
return {
'export_id': export_id,
}
raise Unauthorized()
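To make the shared date handling concrete (this example is not part of the commit): _validate_logs_arguments parses "%m/%d/%Y" strings as UTC, substitutes defaults when a value is missing or malformed, and always pushes the end bound one day forward so the final requested day is included:
start_time, end_time = _validate_logs_arguments('01/01/2019', '01/31/2019')
# start_time == datetime(2019, 1, 1, 0, 0)
# end_time   == datetime(2019, 2, 1, 0, 0), i.e. the requested end day plus one day
start_time, end_time = _validate_logs_arguments(None, 'not-a-date')
# start_time falls back to one day before today; end_time to today plus one day.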

View file

@ -1,141 +0,0 @@
import json
from abc import ABCMeta, abstractmethod
from collections import namedtuple
from datetime import datetime
from dateutil.relativedelta import relativedelta
from six import add_metaclass
from tzlocal import get_localzone
from app import avatar
from endpoints.api import format_date
from util.morecollections import AttrDict
class LogEntry(
namedtuple('LogEntry', [
'metadata_json', 'ip', 'datetime', 'performer_email', 'performer_username', 'performer_robot',
'account_organization', 'account_username', 'account_email', 'account_robot', 'kind_id'
])):
"""
LogEntry a single log entry.
:type metadata_json: string
:type ip: string
:type datetime: string
:type performer_email: int
:type performer_username: string
:type performer_robot: boolean
:type account_organization: boolean
:type account_username: string
:type account_email: string
:type account_robot: boolean
:type kind_id: int
"""
def to_dict(self, kinds, include_namespace):
view = {
'kind': kinds[self.kind_id],
'metadata': json.loads(self.metadata_json),
'ip': self.ip,
'datetime': format_date(self.datetime),
}
if self.performer_username:
performer = AttrDict({'username': self.performer_username, 'email': self.performer_email})
performer.robot = None
if self.performer_robot:
performer.robot = self.performer_robot
view['performer'] = {
'kind': 'user',
'name': self.performer_username,
'is_robot': self.performer_robot,
'avatar': avatar.get_data_for_user(performer),
}
if include_namespace:
if self.account_username:
account = AttrDict({'username': self.account_username, 'email': self.account_email})
if self.account_organization:
view['namespace'] = {
'kind': 'org',
'name': self.account_username,
'avatar': avatar.get_data_for_org(account),
}
else:
account.robot = None
if self.account_robot:
account.robot = self.account_robot
view['namespace'] = {
'kind': 'user',
'name': self.account_username,
'avatar': avatar.get_data_for_user(account),
}
return view
class LogEntryPage(
namedtuple('LogEntryPage', ['logs', 'next_page_token'])):
"""
LogEntryPage represents a single page of logs.
:type logs: [LogEntry]
:type next_page_token: {any -> any}
"""
class AggregatedLogEntry(
namedtuple('AggregatedLogEntry', ['count', 'kind_id', 'day'])):
"""
AggregatedLogEntry represents an aggregated view of logs.
:type count: int
:type kind_id: int
:type day: string
"""
def to_dict(self, kinds, start_time):
synthetic_date = datetime(start_time.year, start_time.month, int(self.day), tzinfo=get_localzone())
if synthetic_date.day < start_time.day:
synthetic_date = synthetic_date + relativedelta(months=1)
view = {
'kind': kinds[self.kind_id],
'count': self.count,
'datetime': format_date(synthetic_date),
}
return view
@add_metaclass(ABCMeta)
class LogEntryDataInterface(object):
"""
Interface that represents all data store interactions required by a Log.
"""
@abstractmethod
def get_logs_query(self, start_time, end_time, performer_name=None, repository_name=None, namespace_name=None,
ignore=None, page_token=None):
"""
Returns a LogEntryPage.
"""
@abstractmethod
def get_log_entry_kinds(self):
"""
Returns a map of LogEntryKind id -> name and name -> id
"""
@abstractmethod
def repo_exists(self, namespace_name, repository_name):
"""
Returns whether or not a repo exists.
"""
@abstractmethod
def get_aggregated_logs(self, start_time, end_time, performer_name=None, repository_name=None, namespace_name=None,
ignore=None):
"""
Returns a list of aggregated logs
"""

View file

@ -1,122 +0,0 @@
import itertools
from app import app
from data import model, database
from endpoints.api.logs_models_interface import LogEntryDataInterface, LogEntryPage, LogEntry, AggregatedLogEntry
def create_log(log):
account_organization = None
account_username = None
account_email = None
account_robot = None
try:
account_organization = log.account.organization
account_username = log.account.username
account_email = log.account.email
account_robot = log.account.robot
except AttributeError:
pass
performer_robot = None
performer_username = None
performer_email = None
try:
performer_robot = log.performer.robot
performer_username = log.performer.username
performer_email = log.performer.email
except AttributeError:
pass
return LogEntry(log.metadata_json, log.ip, log.datetime, performer_email, performer_username,
performer_robot, account_organization, account_username,
account_email, account_robot, log.kind_id)
class PreOCIModel(LogEntryDataInterface):
"""
PreOCIModel implements the data model for the Tags using a database schema
before it was changed to support the OCI specification.
"""
def get_logs_query(self, start_time, end_time, performer_name=None, repository_name=None,
namespace_name=None, ignore=None, page_token=None):
repo = None
if repository_name and namespace_name:
repo = model.repository.get_repository(namespace_name, repository_name)
performer = None
if performer_name:
performer = model.user.get_user(performer_name)
# TODO(LogMigrate): Remove the branch once we're back on a single table.
def get_logs(m):
logs_query = model.log.get_logs_query(start_time, end_time, performer=performer,
repository=repo, namespace=namespace_name,
ignore=ignore, model=m)
logs, next_page_token = model.modelutil.paginate(logs_query, m,
descending=True, page_token=page_token,
limit=20,
max_page=app.config['ACTION_LOG_MAX_PAGE'])
return LogEntryPage([create_log(log) for log in logs], next_page_token)
# First check the LogEntry3 table for the most recent logs, unless we've been expressly told
# to look inside the other tables.
TOKEN_TABLE_ID = 'tti'
tables = [database.LogEntry3, database.LogEntry2, database.LogEntry]
table_index = 0
table_specified = page_token is not None and page_token.get(TOKEN_TABLE_ID) is not None
if table_specified:
table_index = page_token.get(TOKEN_TABLE_ID)
page_result = get_logs(tables[table_index])
if page_result.next_page_token is None and table_index < len(tables) - 1:
page_result = page_result._replace(next_page_token={TOKEN_TABLE_ID: table_index + 1})
return page_result
def get_log_entry_kinds(self):
return model.log.get_log_entry_kinds()
def repo_exists(self, namespace_name, repository_name):
repo = model.repository.get_repository(namespace_name, repository_name)
if repo is None:
return False
return True
def get_aggregated_logs(self, start_time, end_time, performer_name=None, repository_name=None,
namespace_name=None, ignore=None):
repo = None
if repository_name and namespace_name:
repo = model.repository.get_repository(namespace_name, repository_name)
performer = None
if performer_name:
performer = model.user.get_user(performer_name)
# TODO(LogMigrate): Remove the branch once we're back on a single table.
aggregated_logs = model.log.get_aggregated_logs(start_time, end_time, performer=performer,
repository=repo, namespace=namespace_name,
ignore=ignore, model=database.LogEntry)
aggregated_logs_2 = model.log.get_aggregated_logs(start_time, end_time, performer=performer,
repository=repo, namespace=namespace_name,
ignore=ignore, model=database.LogEntry2)
aggregated_logs_3 = model.log.get_aggregated_logs(start_time, end_time, performer=performer,
repository=repo, namespace=namespace_name,
ignore=ignore, model=database.LogEntry3)
entries = {}
for log in itertools.chain(aggregated_logs, aggregated_logs_2, aggregated_logs_3):
key = '%s-%s' % (log.kind_id, log.day)
if key in entries:
entries[key] = AggregatedLogEntry(log.count + entries[key].count, log.kind_id, log.day)
else:
entries[key] = AggregatedLogEntry(log.count, log.kind_id, log.day)
return entries.values()
pre_oci_model = PreOCIModel()

View file

@ -4,7 +4,7 @@ import os
import string
import socket
from datetime import datetime, timedelta
from datetime import datetime
from random import SystemRandom
from flask import request, make_response, jsonify
@ -16,6 +16,7 @@ from auth import scopes
from auth.auth_context import get_authenticated_user
from auth.permissions import SuperUserPermission
from data.database import ServiceKeyApprovalType
from data.logs_model import logs_model
from endpoints.api import (ApiResource, nickname, resource, validate_json_request,
internal_only, require_scope, show_if, parse_args,
query_param, require_fresh_login, path_param, verify_not_prod,
@ -25,36 +26,13 @@ from endpoints.api.build import get_logs_or_log_url
from endpoints.api.superuser_models_pre_oci import (pre_oci_model, ServiceKeyDoesNotExist,
ServiceKeyAlreadyApproved,
InvalidRepositoryBuildException)
from endpoints.api.logs_models_pre_oci import pre_oci_model as log_model
from endpoints.api.logs import _validate_logs_arguments
from util.useremails import send_confirmation_email, send_recovery_email
from _init import ROOT_DIR
logger = logging.getLogger(__name__)
def _validate_logs_arguments(start_time, end_time):
if start_time:
try:
start_time = datetime.strptime(start_time + ' UTC', '%m/%d/%Y %Z')
except ValueError:
start_time = None
if not start_time:
start_time = datetime.today() - timedelta(7) # One week
if end_time:
try:
end_time = datetime.strptime(end_time + ' UTC', '%m/%d/%Y %Z')
end_time = end_time + timedelta(days=1)
except ValueError:
end_time = None
if not end_time:
end_time = datetime.today()
return start_time, end_time
def get_immediate_subdirectories(directory):
return [name for name in os.listdir(directory) if os.path.isdir(os.path.join(directory, name))]
@ -134,10 +112,9 @@ class SuperUserAggregateLogs(ApiResource):
if SuperUserPermission().can():
(start_time, end_time) = _validate_logs_arguments(parsed_args['starttime'],
parsed_args['endtime'])
aggregated_logs = log_model.get_aggregated_logs(start_time, end_time)
kinds = log_model.get_log_entry_kinds()
aggregated_logs = logs_model.get_aggregated_log_counts(start_time, end_time)
return {
'aggregated': [log.to_dict(kinds, start_time) for log in aggregated_logs]
'aggregated': [log.to_dict() for log in aggregated_logs]
}
raise Unauthorized()
@ -166,13 +143,12 @@ class SuperUserLogs(ApiResource):
end_time = parsed_args['endtime']
(start_time, end_time) = _validate_logs_arguments(start_time, end_time)
log_page = log_model.get_logs_query(start_time, end_time, page_token=page_token)
kinds = log_model.get_log_entry_kinds()
log_entry_page = logs_model.lookup_logs(start_time, end_time, page_token=page_token)
return {
'start_time': format_date(start_time),
'end_time': format_date(end_time),
'logs': [log.to_dict(kinds, include_namespace=True) for log in log_page.logs],
}, log_page.next_page_token
'logs': [log.to_dict(avatar, include_namespace=True) for log in log_entry_page.logs],
}, log_entry_page.next_page_token
raise Unauthorized()

View file

@ -208,120 +208,12 @@ class Organization(namedtuple('Organization', ['username', 'email'])):
}
class LogEntry(
namedtuple('LogEntry', [
'metadata_json', 'ip', 'datetime', 'performer_email', 'performer_username', 'performer_robot',
'account_organization', 'account_username', 'account_email', 'account_robot', 'kind',
])):
"""
LogEntry a single log entry.
:type metadata_json: string
:type ip: string
:type datetime: string
:type performer_email: int
:type performer_username: string
:type performer_robot: boolean
:type account_organization: boolean
:type account_username: string
:type account_email: string
:type account_robot: boolean
:type kind_id: int
"""
def to_dict(self):
view = {
'kind': self.kind,
'metadata': json.loads(self.metadata_json),
'ip': self.ip,
'datetime': format_date(self.datetime),
}
if self.performer_username:
performer = AttrDict({'username': self.performer_username, 'email': self.performer_email})
performer.robot = None
if self.performer_robot:
performer.robot = self.performer_robot
view['performer'] = {
'kind': 'user',
'name': self.performer_username,
'is_robot': self.performer_robot,
'avatar': avatar.get_data_for_user(performer),
}
if self.account_username:
account = AttrDict({'username': self.account_username, 'email': self.account_email})
if self.account_organization:
view['namespace'] = {
'kind': 'org',
'name': self.account_username,
'avatar': avatar.get_data_for_org(account),
}
else:
account.robot = None
if self.account_robot:
account.robot = self.account_robot
view['namespace'] = {
'kind': 'user',
'name': self.account_username,
'avatar': avatar.get_data_for_user(account),
}
return view
class LogEntryPage(
namedtuple('LogEntryPage', ['logs', 'next_page_token'])):
"""
LogEntryPage represents a single page of logs.
:type logs: [LogEntry]
:type next_page_token: {any -> any}
"""
class AggregatedLogEntry(
namedtuple('AggregatedLogEntry', ['count', 'kind_id', 'day', 'start_time'])):
"""
AggregatedLogEntry represents an aggregated view of logs.
:type count: int
:type kind_id: int
:type day: string
:type start_time: Date
"""
def to_dict(self):
synthetic_date = datetime(self.start_time.year, self.start_time.month, int(self.day), tzinfo=get_localzone())
if synthetic_date.day < self.start_time.day:
synthetic_date = synthetic_date + relativedelta(months=1)
kinds = model.log.get_log_entry_kinds()
view = {
'kind': kinds[self.kind_id],
'count': self.count,
'datetime': format_date(synthetic_date),
}
return view
@add_metaclass(ABCMeta)
class SuperuserDataInterface(object):
"""
Interface that represents all data store interactions required by a superuser api.
"""
@abstractmethod
def get_logs_query(self, start_time, end_time, page_token=None):
"""
Returns a LogEntryPage.
"""
@abstractmethod
def get_aggregated_logs(self, start_time, end_time):
"""
Returns a list of AggregatedLogEntry
"""
@abstractmethod
def get_organizations(self):
"""

View file

@ -7,37 +7,8 @@ from auth.permissions import ReadRepositoryPermission, ModifyRepositoryPermissio
from data import model, database
from endpoints.api.build import get_job_config, _get_build_status
from endpoints.api.superuser_models_interface import BuildTrigger
from endpoints.api.superuser_models_interface import SuperuserDataInterface, LogEntryPage, LogEntry, Organization, User, \
ServiceKey, Approval, RepositoryBuild, AggregatedLogEntry
def _create_log(log, log_kind):
account_organization = None
account_username = None
account_email = None
account_robot = None
try:
account_organization = log.account.organization
account_username = log.account.username
account_email = log.account.email
account_robot = log.account.robot
except AttributeError:
pass
performer_robot = None
performer_username = None
performer_email = None
try:
performer_robot = log.performer.robot
performer_username = log.performer.username
performer_email = log.performer.email
except AttributeError:
pass
return LogEntry(log.metadata_json, log.ip, log.datetime, performer_email, performer_username,
performer_robot, account_organization, account_username,
account_email, account_robot, log_kind[log.kind_id])
from endpoints.api.superuser_models_interface import SuperuserDataInterface, Organization, User, \
ServiceKey, Approval, RepositoryBuild
def _create_user(user):
@ -205,16 +176,5 @@ class PreOCIModel(SuperuserDataInterface):
def get_organizations(self):
return [Organization(org.username, org.email) for org in model.organization.get_organizations()]
def get_aggregated_logs(self, start_time, end_time):
aggregated_logs = model.log.get_aggregated_logs(start_time, end_time)
return [AggregatedLogEntry(log.count, log.kind_id, log.day, start_time) for log in aggregated_logs]
def get_logs_query(self, start_time, end_time, page_token=None):
logs_query = model.log.get_logs_query(start_time, end_time)
logs, next_page_token = model.modelutil.paginate(logs_query, database.LogEntry, descending=True,
page_token=page_token, limit=20)
kinds = model.log.get_log_entry_kinds()
return LogEntryPage([_create_log(log, kinds) for log in logs], next_page_token)
pre_oci_model = PreOCIModel()

View file

@ -1,131 +0,0 @@
import pytest
from mock import Mock
from data import model, database
from endpoints.api.logs_models_interface import LogEntry, LogEntryPage, AggregatedLogEntry
from endpoints.api.logs_models_pre_oci import pre_oci_model
from util.morecollections import AttrDict
def test_get_logs_query(monkeypatch):
get_repository_mock = Mock()
monkeypatch.setattr(model.repository, 'get_repository', get_repository_mock)
get_user_mock = Mock()
monkeypatch.setattr(model.user, 'get_user', get_user_mock)
get_logs_query_mock = Mock()
monkeypatch.setattr(model.log, 'get_logs_query', get_logs_query_mock)
paginate_mock = Mock()
paginate_mock.return_value = ([], {})
monkeypatch.setattr(model.modelutil, 'paginate', paginate_mock)
assert pre_oci_model.get_logs_query('start_time', 'end_time', 'preformer_namne', 'repository_name', 'namespace_name',
set(), None) == LogEntryPage([], {})
def test_get_logs_query_returns_list_log_entries(monkeypatch):
get_repository_mock = Mock()
monkeypatch.setattr(model.repository, 'get_repository', get_repository_mock)
get_user_mock = Mock()
monkeypatch.setattr(model.user, 'get_user', get_user_mock)
get_logs_query_mock = Mock()
monkeypatch.setattr(model.log, 'get_logs_query', get_logs_query_mock)
paginate_mock = Mock()
paginate_mock.return_value = ([AttrDict({'kind': 1, 'datetime': 'datetime', 'ip': 'ip', 'metadata_json': '{}',
'account': AttrDict(
{'username': 'account_username', 'email': 'account_email', 'robot': False,
'organization': False}),
'performer': AttrDict(
{'email': 'performer_email', 'username': 'performer_username',
'robot': False}), 'kind_id': 1})], {'key': 'value'})
monkeypatch.setattr(model.modelutil, 'paginate', paginate_mock)
assert pre_oci_model.get_logs_query('start_time', 'end_time', 'performer_username', 'repository_name',
'namespace_name',
set(), {'start_id': 1}) == LogEntryPage([
LogEntry('{}', 'ip', 'datetime', 'performer_email', 'performer_username', False,
False, 'account_username', 'account_email', False, 1)], {'key': 'value'})
@pytest.mark.skip('Turned off until we move back to a single LogEntry table')
def test_get_logs_query_calls_get_repository(monkeypatch):
repo_mock = Mock()
performer_mock = Mock()
query_mock = Mock()
get_repository_mock = Mock()
get_repository_mock.return_value = repo_mock
monkeypatch.setattr(model.repository, 'get_repository', get_repository_mock)
get_user_mock = Mock()
get_user_mock.return_value = performer_mock
monkeypatch.setattr(model.user, 'get_user', get_user_mock)
get_logs_query_mock = Mock()
get_logs_query_mock.return_value = query_mock
monkeypatch.setattr(model.log, 'get_logs_query', get_logs_query_mock)
paginate_mock = Mock()
page_token = {}
paginate_mock.return_value = ([], page_token)
monkeypatch.setattr(model.modelutil, 'paginate', paginate_mock)
ignore = set()
pre_oci_model.get_logs_query('start_time', 'end_time', 'performer_username', 'repository_name', 'namespace_name',
ignore, page_token)
get_repository_mock.assert_called_once_with('namespace_name', 'repository_name')
get_user_mock.assert_called_once_with('performer_username')
get_logs_query_mock.assert_called_once_with('start_time', 'end_time', performer=performer_mock, repository=repo_mock,
namespace='namespace_name', ignore=ignore)
paginate_mock.assert_called_once_with(query_mock, database.LogEntry, descending=True,
page_token=page_token, limit=20)
def test_get_log_entry_kinds(monkeypatch):
get_log_entry_kinds_mock = Mock()
monkeypatch.setattr(model.log, 'get_log_entry_kinds', get_log_entry_kinds_mock)
pre_oci_model.get_log_entry_kinds()
get_log_entry_kinds_mock.assert_called_once_with()
def test_does_repo_exist_returns_false(monkeypatch):
get_repository_mock = Mock()
get_repository_mock.return_value = None
monkeypatch.setattr(model.repository, 'get_repository', get_repository_mock)
assert pre_oci_model.repo_exists('namespace_name', 'repository_name') is False
def test_does_repo_exist_returns_true(monkeypatch):
get_repository_mock = Mock()
get_repository_mock.return_value = True
monkeypatch.setattr(model.repository, 'get_repository', get_repository_mock)
assert pre_oci_model.repo_exists('namespace_name', 'repository_name') is True
def test_get_aggregated_logs(monkeypatch):
get_aggregated_logs_mock = Mock()
get_aggregated_logs_mock.side_effect = [[AttrDict({'day': '1', 'kind_id': 4, 'count': 6})],
[AttrDict({'day': '1', 'kind_id': 4, 'count': 12})],
[AttrDict({'day': '1', 'kind_id': 4, 'count': 3})]]
monkeypatch.setattr(model.log, 'get_aggregated_logs', get_aggregated_logs_mock)
repo_mock = Mock()
get_repository_mock = Mock()
get_repository_mock.return_value = repo_mock
monkeypatch.setattr(model.repository, 'get_repository', get_repository_mock)
performer_mock = Mock()
get_user_mock = Mock()
get_user_mock.return_value = performer_mock
monkeypatch.setattr(model.user, 'get_user', get_user_mock)
actual = pre_oci_model.get_aggregated_logs('start_time', 'end_time', 'performer_name', 'repository_name',
'namespace_name', set())
assert actual == [AggregatedLogEntry(21, 4, '1')]

View file

@ -14,6 +14,7 @@ from auth.auth_context import get_authenticated_user
from auth.credentials import validate_credentials
from auth.decorators import process_auth
from auth.permissions import CreateRepositoryPermission, ModifyRepositoryPermission
from data.logs_model import logs_model
from endpoints.appr import appr_bp, require_app_repo_read, require_app_repo_write
from endpoints.appr.cnr_backend import Blob, Channel, Package, User
from endpoints.appr.decorators import disallow_for_image_repository
@ -105,8 +106,8 @@ def list_packages():
def delete_package(namespace, package_name, release, media_type):
reponame = repo_name(namespace, package_name)
result = cnr_registry.delete_package(reponame, release, media_type, package_class=Package)
model.log_action('delete_tag', namespace, repo_name=package_name,
metadata={'release': release, 'mediatype': media_type})
logs_model.log_action('delete_tag', namespace, repository_name=package_name,
metadata={'release': release, 'mediatype': media_type})
return jsonify(result)
@ -160,8 +161,8 @@ def pull(namespace, package_name, release, media_type):
logger.debug('Pull of release %s of app repository %s/%s', release, namespace, package_name)
reponame = repo_name(namespace, package_name)
data = cnr_registry.pull(reponame, release, media_type, Package, blob_class=Blob)
model.log_action('pull_repo', namespace, repo_name=package_name,
metadata={'release': release, 'mediatype': media_type})
logs_model.log_action('pull_repo', namespace, repository_name=package_name,
metadata={'release': release, 'mediatype': media_type})
json_format = request.args.get('format', None) == 'json'
return _pull(data, json_format)
@ -188,7 +189,7 @@ def push(namespace, package_name):
{"package": reponame,
"scopes": ['create']})
Package.create_repository(reponame, private, owner)
model.log_action('create_repo', namespace, repo_name=package_name)
logs_model.log_action('create_repo', namespace, repository_name=package_name)
if not ModifyRepositoryPermission(namespace, package_name).can():
raise Forbidden("Unauthorized access for: %s" % reponame,
@ -211,8 +212,8 @@ def push(namespace, package_name):
blob = Blob(reponame, values['blob'])
app_release = cnr_registry.push(reponame, release_version, media_type, blob, force,
package_class=Package, user=owner, visibility=private)
model.log_action('push_repo', namespace, repo_name=package_name,
metadata={'release': release_version})
logs_model.log_action('push_repo', namespace, repository_name=package_name,
metadata={'release': release_version})
return jsonify(app_release)
@@ -265,8 +266,8 @@ def add_channel_release(namespace, package_name, channel_name, release):
reponame = repo_name(namespace, package_name)
result = cnr_registry.add_channel_release(reponame, channel_name, release, channel_class=Channel,
package_class=Package)
model.log_action('create_tag', namespace, repo_name=package_name,
metadata={'channel': channel_name, 'release': release})
logs_model.log_action('create_tag', namespace, repository_name=package_name,
metadata={'channel': channel_name, 'release': release})
return jsonify(result)
@@ -296,8 +297,8 @@ def delete_channel_release(namespace, package_name, channel_name, release):
reponame = repo_name(namespace, package_name)
result = cnr_registry.delete_channel_release(reponame, channel_name, release,
channel_class=Channel, package_class=Package)
model.log_action('delete_tag', namespace, repo_name=package_name,
metadata={'channel': channel_name, 'release': release})
logs_model.log_action('delete_tag', namespace, repository_name=package_name,
metadata={'channel': channel_name, 'release': release})
return jsonify(result)
@@ -312,6 +313,6 @@ def delete_channel(namespace, package_name, channel_name):
_check_channel_name(channel_name)
reponame = repo_name(namespace, package_name)
result = cnr_registry.delete_channel(reponame, channel_name, channel_class=Channel)
model.log_action('delete_tag', namespace, repo_name=package_name,
metadata={'channel': channel_name})
logs_model.log_action('delete_tag', namespace, repository_name=package_name,
metadata={'channel': channel_name})
return jsonify(result)

View file

@@ -7,6 +7,7 @@ from flask import request
from app import app, dockerfile_build_queue, metric_queue
from data import model
from data.logs_model import logs_model
from data.database import db
from auth.auth_context import get_authenticated_user
from notifications import spawn_notification
@@ -109,8 +110,8 @@ def start_build(repository, prepared_build, pull_robot_name=None):
event_log_metadata['trigger_kind'] = prepared_build.trigger.service.name
event_log_metadata['trigger_metadata'] = prepared_build.metadata or {}
model.log.log_action('build_dockerfile', repository.namespace_user.username,
ip=request.remote_addr, metadata=event_log_metadata, repository=repository)
logs_model.log_action('build_dockerfile', repository.namespace_user.username,
ip=request.remote_addr, metadata=event_log_metadata, repository=repository)
# TODO(jzelinskie): remove when more endpoints have been converted to using interfaces
repo = AttrDict({

View file

@@ -6,6 +6,7 @@ from flask import Blueprint, jsonify, abort, request, make_response
from jwt import get_unverified_header
from app import app
from data.logs_model import logs_model
from endpoints.keyserver.models_interface import ServiceKeyDoesNotExist
from endpoints.keyserver.models_pre_oci import pre_oci_model as model
from util.security import jwtutil
@@ -127,7 +128,7 @@ def put_service_key(service, kid):
model.create_service_key('', kid, service, jwk, metadata, expiration_date,
rotation_duration=rotation_duration)
model.log_action('service_key_create', request.remote_addr, metadata_dict={
logs_model.log_action('service_key_create', ip=request.remote_addr, metadata={
'kid': kid,
'preshared': False,
'service': service,
@@ -151,7 +152,7 @@ def put_service_key(service, kid):
except ServiceKeyDoesNotExist:
abort(404)
model.log_action('service_key_rotate', request.remote_addr, metadata_dict={
logs_model.log_action('service_key_rotate', ip=request.remote_addr, metadata={
'kid': kid,
'signer_kid': signer_key.kid,
'service': service,
@@ -187,7 +188,7 @@ def delete_service_key(service, kid):
except ServiceKeyDoesNotExist:
abort(404)
model.log_action('service_key_delete', request.remote_addr, metadata_dict={
logs_model.log_action('service_key_delete', ip=request.remote_addr, metadata={
'kid': kid,
'signer_kid': signer_key.kid,
'service': service,

View file

@@ -63,10 +63,3 @@ class KeyServerDataInterface(object):
Deletes and returns a service key with the given kid or raises ServiceKeyNotFound.
"""
pass
@abstractmethod
def log_action(self, action_name, ip, metadata_dict=None):
"""
Records a particular service key interaction to an audit log.
"""
pass

View file

@@ -38,10 +38,6 @@ class PreOCIModel(KeyServerDataInterface):
except data.model.ServiceKeyDoesNotExist:
raise ServiceKeyDoesNotExist()
def log_action(self, action_name, ip, metadata_dict=None):
metadata_dict = {} if metadata_dict is None else metadata_dict
data.model.log.log_action(action_name, None, metadata=metadata_dict, ip=ip)
pre_oci_model = PreOCIModel()

initdb.py
View file

@@ -23,6 +23,7 @@ from data.database import (db, all_models, Role, TeamRole, Visibility, LoginServ
ApprTagKind, ApprBlobPlacementLocation, Repository, TagKind,
ManifestChild, TagToRepositoryTag, get_epoch_timestamp_ms)
from data import model
from data.logs_model import logs_model
from data.queue import WorkQueue
from data.registry_model import registry_model
from data.registry_model.registry_pre_oci_model import pre_oci_model
@@ -179,11 +180,11 @@ def __generate_service_key(kid, name, user, timestamp, approval_type, expiration
'auto_approved': True
}
model.log.log_action('service_key_approve', None, performer=user,
timestamp=timestamp, metadata=key_metadata)
logs_model.log_action('service_key_approve', None, performer=user,
timestamp=timestamp, metadata=key_metadata)
model.log.log_action('service_key_create', None, performer=user,
timestamp=timestamp, metadata=key_metadata)
logs_model.log_action('service_key_create', None, performer=user,
timestamp=timestamp, metadata=key_metadata)
def __generate_repository(with_storage, user_obj, name, description, is_public, permissions, structure):
@@ -583,9 +584,9 @@ def populate_database(minimal=False, with_storage=False):
'manifest_digest': tag_manifest.digest
}
model.log.log_action('manifest_label_add', new_user_1.username, performer=new_user_1,
timestamp=datetime.now(), metadata=label_metadata,
repository=tag_manifest.tag.repository)
logs_model.log_action('manifest_label_add', new_user_1.username, performer=new_user_1,
timestamp=datetime.now(), metadata=label_metadata,
repository=tag_manifest.tag.repository)
model.blob.initiate_upload(new_user_1.username, simple_repo.name, str(uuid4()), 'local_us', {})
model.notification.create_repo_notification(simple_repo, 'repo_push', 'quay_notification', {}, {})
@@ -839,66 +840,66 @@ def populate_database(minimal=False, with_storage=False):
token.token_code = 'test'
token.save()
model.log.log_action('org_create_team', org.username, performer=new_user_1,
timestamp=week_ago, metadata={'team': 'readers'})
logs_model.log_action('org_create_team', org.username, performer=new_user_1,
timestamp=week_ago, metadata={'team': 'readers'})
model.log.log_action('org_set_team_role', org.username, performer=new_user_1,
timestamp=week_ago,
metadata={'team': 'readers', 'role': 'read'})
logs_model.log_action('org_set_team_role', org.username, performer=new_user_1,
timestamp=week_ago,
metadata={'team': 'readers', 'role': 'read'})
model.log.log_action('create_repo', org.username, performer=new_user_1,
repository=org_repo, timestamp=week_ago,
metadata={'namespace': org.username, 'repo': 'orgrepo'})
logs_model.log_action('create_repo', org.username, performer=new_user_1,
repository=org_repo, timestamp=week_ago,
metadata={'namespace': org.username, 'repo': 'orgrepo'})
model.log.log_action('change_repo_permission', org.username,
performer=new_user_2, repository=org_repo,
timestamp=six_ago,
metadata={'username': new_user_1.username,
'repo': 'orgrepo', 'role': 'admin'})
logs_model.log_action('change_repo_permission', org.username,
performer=new_user_2, repository=org_repo,
timestamp=six_ago,
metadata={'username': new_user_1.username,
'repo': 'orgrepo', 'role': 'admin'})
model.log.log_action('change_repo_permission', org.username,
performer=new_user_1, repository=org_repo,
timestamp=six_ago,
metadata={'username': new_user_2.username,
'repo': 'orgrepo', 'role': 'read'})
logs_model.log_action('change_repo_permission', org.username,
performer=new_user_1, repository=org_repo,
timestamp=six_ago,
metadata={'username': new_user_2.username,
'repo': 'orgrepo', 'role': 'read'})
model.log.log_action('add_repo_accesstoken', org.username, performer=new_user_1,
repository=org_repo, timestamp=four_ago,
metadata={'repo': 'orgrepo', 'token': 'deploytoken'})
logs_model.log_action('add_repo_accesstoken', org.username, performer=new_user_1,
repository=org_repo, timestamp=four_ago,
metadata={'repo': 'orgrepo', 'token': 'deploytoken'})
model.log.log_action('push_repo', org.username, performer=new_user_2,
repository=org_repo, timestamp=today,
metadata={'username': new_user_2.username,
'repo': 'orgrepo'})
logs_model.log_action('push_repo', org.username, performer=new_user_2,
repository=org_repo, timestamp=today,
metadata={'username': new_user_2.username,
'repo': 'orgrepo'})
model.log.log_action('pull_repo', org.username, performer=new_user_2,
repository=org_repo, timestamp=today,
metadata={'username': new_user_2.username,
'repo': 'orgrepo'})
logs_model.log_action('pull_repo', org.username, performer=new_user_2,
repository=org_repo, timestamp=today,
metadata={'username': new_user_2.username,
'repo': 'orgrepo'})
model.log.log_action('pull_repo', org.username, repository=org_repo,
timestamp=today,
metadata={'token': 'sometoken', 'token_code': 'somecode',
'repo': 'orgrepo'})
logs_model.log_action('pull_repo', org.username, repository=org_repo,
timestamp=today,
metadata={'token': 'sometoken', 'token_code': 'somecode',
'repo': 'orgrepo'})
model.log.log_action('delete_tag', org.username, performer=new_user_2,
repository=org_repo, timestamp=today,
metadata={'username': new_user_2.username,
'repo': 'orgrepo', 'tag': 'sometag'})
logs_model.log_action('delete_tag', org.username, performer=new_user_2,
repository=org_repo, timestamp=today,
metadata={'username': new_user_2.username,
'repo': 'orgrepo', 'tag': 'sometag'})
model.log.log_action('pull_repo', org.username, repository=org_repo,
timestamp=today,
metadata={'token_code': 'somecode', 'repo': 'orgrepo'})
logs_model.log_action('pull_repo', org.username, repository=org_repo,
timestamp=today,
metadata={'token_code': 'somecode', 'repo': 'orgrepo'})
model.log.log_action('pull_repo', new_user_2.username, repository=publicrepo,
timestamp=yesterday,
metadata={'token_code': 'somecode', 'repo': 'publicrepo'})
logs_model.log_action('pull_repo', new_user_2.username, repository=publicrepo,
timestamp=yesterday,
metadata={'token_code': 'somecode', 'repo': 'publicrepo'})
model.log.log_action('build_dockerfile', new_user_1.username, repository=building,
timestamp=today,
metadata={'repo': 'building', 'namespace': new_user_1.username,
'trigger_id': trigger.uuid, 'config': json.loads(trigger.config),
'service': trigger.service.name})
logs_model.log_action('build_dockerfile', new_user_1.username, repository=building,
timestamp=today,
metadata={'repo': 'building', 'namespace': new_user_1.username,
'trigger_id': trigger.uuid, 'config': json.loads(trigger.config),
'service': trigger.service.name})
model.message.create([{'content': 'We love you, Quay customers!', 'severity': 'info',
'media_type': 'text/plain'}])

View file

@@ -26,9 +26,10 @@ from app import (app, config_provider, all_queues, dockerfile_build_queue, notif
from buildtrigger.basehandler import BuildTriggerHandler
from initdb import setup_database_for_testing, finished_database_for_testing
from data import database, model, appr_model
from data.registry_model import registry_model
from data.appr_model.models import NEW_MODELS
from data.database import RepositoryActionCount, Repository as RepositoryTable
from data.logs_model import logs_model
from data.registry_model import registry_model
from test.helpers import assert_action_logged
from util.secscan.fake import fake_security_scanner
@@ -2174,8 +2175,8 @@ class TestDeleteRepository(ApiTestCase):
model.notification.create_repo_notification(repository, 'build_queued', 'slack', {}, {})
# Create some logs.
model.log.log_action('push_repo', ADMIN_ACCESS_USER, repository=repository)
model.log.log_action('push_repo', ADMIN_ACCESS_USER, repository=repository)
logs_model.log_action('push_repo', ADMIN_ACCESS_USER, repository=repository)
logs_model.log_action('push_repo', ADMIN_ACCESS_USER, repository=repository)
# Create some build triggers.
user = model.user.get_user(ADMIN_ACCESS_USER)
@@ -3326,7 +3327,7 @@ class TestOrgRobots(ApiTestCase):
trigger, pull_robot_name=membername)
# Add some log entries for the robot.
model.log.log_action('pull_repo', ORGANIZATION, performer=pull_robot, repository=repo)
logs_model.log_action('pull_repo', ORGANIZATION, performer=pull_robot, repository=repo)
# Delete the robot and verify it works.
self.deleteEmptyResponse(OrgRobot, params=dict(orgname=ORGANIZATION, robot_shortname='bender'))

View file

@@ -7,8 +7,8 @@ from urlparse import urlparse
from flask import request
from app import analytics, userevents, ip_resolver
from data import model
from auth.auth_context import get_authenticated_context, get_authenticated_user
from data.logs_model import logs_model
logger = logging.getLogger(__name__)
@@ -74,6 +74,6 @@ def track_and_log(event_name, repo_obj, analytics_name=None, analytics_sample=1,
# Log the action to the database.
logger.debug('Logging the %s to logs system', event_name)
model.log.log_action(event_name, namespace_name, performer=get_authenticated_user(),
ip=request.remote_addr, metadata=metadata, repository=repo_obj)
logs_model.log_action(event_name, namespace_name, performer=get_authenticated_user(),
ip=request.remote_addr, metadata=metadata, repository=repo_obj)
logger.debug('Track and log of %s complete', event_name)

View file

@@ -5,7 +5,7 @@ from dateutil.parser import parse as parse_date
from app import app
from data import model
from data.database import ServiceKeyApprovalType
from data.model.log import log_action
from data.logs_model import logs_model
def generate_key(service, name, expiration_date=None, notes=None):
@@ -30,8 +30,8 @@ def generate_key(service, name, expiration_date=None, notes=None):
'auto_approved': True,
}
log_action('service_key_create', None, metadata=key_log_metadata)
log_action('service_key_approve', None, metadata=key_log_metadata)
logs_model.log_action('service_key_create', metadata=key_log_metadata)
logs_model.log_action('service_key_approve', metadata=key_log_metadata)
return private_key, key.kid

View file

@@ -11,10 +11,10 @@ from enum import Enum, unique
import features
from app import app, export_action_logs_queue, storage, get_app_url
from data import model
from app import app, export_action_logs_queue, storage, get_app_url, avatar
from endpoints.api import format_date
from endpoints.api.logs_models_pre_oci import create_log
from data.logs_model import logs_model
from data.logs_model.interface import LogsIterationTimeout
from workers.queueworker import QueueWorker, JobException
from util.log import logfile_path
from util.useremails import send_logs_exported_email
@@ -29,10 +29,6 @@ MAXIMUM_WORK_PERIOD_SECONDS = app.config.get('EXPORT_ACTION_LOGS_MAXIMUM_SECONDS
MAXIMUM_QUERY_TIME_SECONDS = app.config.get('EXPORT_ACTION_LOGS_MAXIMUM_QUERY_TIME_SECONDS', 30)
EXPORTED_LOGS_EXPIRATION_SECONDS = app.config.get('EXPORT_ACTION_LOGS_SECONDS', 60 * 60) # 1 hour
MINIMUM_RANGE_SIZE = 1000
MAXIMUM_RANGE_SIZE = 100000
EXPECTED_ITERATION_LOG_COUNT = 1000
@unique
class ExportResult(Enum):
@@ -79,29 +75,6 @@ class ExportActionLogsWorker(QueueWorker):
repository_id = job_details['repository_id']
max_query_time = timedelta(seconds=MAXIMUM_QUERY_TIME_SECONDS)
min_id, elapsed = _run_and_time(lambda: model.log.get_minimum_id_for_logs(start_time,
repository_id,
namespace_id))
if elapsed > max_query_time:
logger.error('Retrieval of min ID for export logs `%s` timed out with time of `%s`',
export_id, elapsed)
self._report_results(job_details, ExportResult.OPERATION_TIMEDOUT)
return
max_id, elapsed = _run_and_time(lambda: model.log.get_maximum_id_for_logs(end_time,
repository_id,
namespace_id))
if elapsed > max_query_time:
logger.error('Retrieval of max ID for export logs `%s` timed out with time of `%s`',
export_id, elapsed)
self._report_results(job_details, ExportResult.OPERATION_TIMEDOUT)
return
min_id = min_id or 1
max_id = max_id or 1
logger.info('Found log range of %s to %s for export logs `%s`', min_id, max_id, export_id)
# Generate a file key so that if we return an API URL, it cannot simply be constructed from
# just the export ID.
file_key = str(uuid.uuid4())
@@ -131,10 +104,13 @@ class ExportActionLogsWorker(QueueWorker):
BytesIO(str(prefix_data)), upload_metadata)
uploaded_byte_count = len(prefix_data)
logs_iterator = logs_model.yield_logs_for_export(start_time, end_time, repository_id,
namespace_id, max_query_time)
try:
# Stream the logs to storage as chunks.
updated_metadata, uploaded_byte_count = self._stream_logs(upload_id, upload_metadata,
uploaded_byte_count, min_id, max_id,
uploaded_byte_count, logs_iterator,
job_details)
if updated_metadata is None:
storage.cancel_chunked_upload(upload_id, upload_metadata)
@@ -169,69 +145,35 @@ class ExportActionLogsWorker(QueueWorker):
self._report_results(job_details, ExportResult.SUCCESSFUL_EXPORT, export_url)
def _stream_logs(self, upload_id, upload_metadata, uploaded_byte_count, min_id, max_id,
def _stream_logs(self, upload_id, upload_metadata, uploaded_byte_count, logs_iterator,
job_details):
export_id = job_details['export_id']
max_work_period = timedelta(seconds=MAXIMUM_WORK_PERIOD_SECONDS)
max_query_time = timedelta(seconds=MAXIMUM_QUERY_TIME_SECONDS)
kinds = model.log.get_log_entry_kinds()
# Using an adjusting scale, start downloading log rows in batches, starting at
# MINIMUM_RANGE_SIZE and doubling until we've reached EXPECTED_ITERATION_LOG_COUNT or
# the lookup range has reached MAXIMUM_RANGE_SIZE. If at any point this operation takes
# longer than the MAXIMUM_WORK_PERIOD_SECONDS, terminate the batch operation as timed out.
batch_start_time = datetime.utcnow()
current_start_id = min_id
current_batch_size = MINIMUM_RANGE_SIZE
try:
for logs in logs_iterator:
work_elapsed = datetime.utcnow() - batch_start_time
if work_elapsed > max_work_period:
logger.error('Retrieval of logs `%s` timed out with time of `%s`',
export_id, work_elapsed)
self._report_results(job_details, ExportResult.OPERATION_TIMEDOUT)
return None, None
while current_start_id <= max_id:
# Verify we haven't been working for too long.
work_elapsed = datetime.utcnow() - batch_start_time
if work_elapsed > max_work_period:
logger.error('Retrieval of logs `%s` timed out with time of `%s`',
export_id, work_elapsed)
self._report_results(job_details, ExportResult.OPERATION_TIMEDOUT)
return None, None
logs_data = ''
if logs:
logs_data = ','.join([json.dumps(log.to_dict(avatar, False)) for log in logs]) + ','
id_range = [current_start_id, min(max_id, current_start_id + current_batch_size)]
# Load the next set of logs.
def retrieve_and_write_logs():
namespace_id = job_details['namespace_id'] if not job_details.get('repository_id') else None
repository_id = job_details.get('repository_id')
logger.debug('Retrieving logs over range %s with namespace %s and repository %s',
id_range, namespace_id, repository_id)
logs_query = model.log.get_logs_query(namespace=namespace_id,
repository=repository_id,
id_range=id_range)
return [create_log(log) for log in logs_query]
logs, elapsed = _run_and_time(retrieve_and_write_logs)
if elapsed > max_query_time:
logger.error('Retrieval of logs for export logs `%s` with range `%s` timed out at `%s`',
export_id, id_range, elapsed)
self._report_results(job_details, ExportResult.OPERATION_TIMEDOUT)
return None, None
# Write the logs to storage.
logger.debug('Writing %s retrieved logs for range %s', len(logs), id_range)
if logs:
logs_data = ','.join([json.dumps(log.to_dict(kinds, False)) for log in logs]) + ','
logs_data = logs_data.encode('utf-8')
upload_metadata = storage.stream_upload_chunk(storage.preferred_locations, upload_id,
uploaded_byte_count, -1,
BytesIO(logs_data),
upload_metadata)
uploaded_byte_count += len(logs_data)
# Move forward.
current_start_id = id_range[1] + 1
# Increase the batch size if necessary.
if len(logs) < EXPECTED_ITERATION_LOG_COUNT:
current_batch_size = min(MAXIMUM_RANGE_SIZE, current_batch_size * 2)
except LogsIterationTimeout:
logger.error('Retrieval of logs for export logs timed out at `%s`', work_elapsed)
self._report_results(job_details, ExportResult.OPERATION_TIMEDOUT)
return None, None
return upload_metadata, uploaded_byte_count
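
The rewritten _stream_logs above no longer pages over id ranges itself; it just consumes logs_iterator and lets the logs model decide batching, treating LogsIterationTimeout as an export timeout. Below is only a rough, in-memory illustration of the contract those call sites imply (yield a list of log objects per batch, raise LogsIterationTimeout when producing a batch takes too long); the real yield_logs_for_export queries its own backing store and takes repository/namespace ids rather than a prebuilt list:

from datetime import datetime

class LogsIterationTimeout(Exception):
  # Stand-in for the exception imported from data.logs_model.interface above.
  pass

def yield_logs_for_export_sketch(start_datetime, end_datetime, logs, batch_size=1000,
                                 max_query_time=None):
  # Purely illustrative: yield the matching logs in fixed-size batches, failing the
  # iteration if producing any single batch exceeds max_query_time.
  in_range = [log for log in logs if start_datetime <= log.datetime < end_datetime]
  for index in range(0, len(in_range), batch_size):
    batch_started = datetime.utcnow()
    batch = in_range[index:index + batch_size]
    if max_query_time is not None and datetime.utcnow() - batch_started > max_query_time:
      raise LogsIterationTimeout()
    yield batch
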
@@ -271,11 +213,6 @@ def _parse_time(specified_time):
except ValueError:
return None
def _run_and_time(fn):
start_time = datetime.utcnow()
result = fn()
return result, datetime.utcnow() - start_time
if __name__ == "__main__":
logging.config.fileConfig(logfile_path(debug=False), disable_existing_loggers=False)

View file

@@ -1,7 +1,10 @@
import logging
from datetime import date, timedelta
from app import app # This is required to initialize the database.
from data import model
from data.logs_model import logs_model
from workers.worker import Worker
POLL_PERIOD_SECONDS = 10
@@ -21,8 +24,16 @@ class RepositoryActionCountWorker(Worker):
logger.debug('No further repositories to count')
return False
yesterday = date.today() - timedelta(days=1)
logger.debug('Found repository #%s to count', to_count.id)
was_counted = model.repositoryactioncount.count_repository_actions(to_count)
daily_count = logs_model.count_repository_actions(to_count, yesterday)
if daily_count is None:
logger.debug('Could not load count for repository #%s', to_count.id)
return False
was_counted = model.repositoryactioncount.store_repository_action_count(to_count, yesterday,
daily_count)
if not was_counted:
logger.debug('Repository #%s was counted by another worker', to_count.id)
return False
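
The worker above now only asks the logs model for yesterday's count and persists the number through model.repositoryactioncount, which is what allows the logs backend to be swapped out later. A hypothetical in-memory backend showing just the one method the worker needs (the class name, constructor argument, and internal field are made up for illustration):

class InMemoryLogsModel(object):
  """ Hypothetical stand-in logs backend; only illustrates the count_repository_actions
      call made by the worker above. """

  def __init__(self, entries):
    # entries: iterable of (repository_id, logged_at_datetime) pairs
    self._entries = list(entries)

  def count_repository_actions(self, repository, day):
    # Return the number of log entries recorded for the repository on the given date.
    return sum(1 for repository_id, logged_at in self._entries
               if repository_id == repository.id and logged_at.date() == day)

Returning None from count_repository_actions signals (as checked above) that a count could not be produced, presumably so the repository can be counted again on a later poll.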

View file

@@ -5,7 +5,8 @@ from datetime import datetime, timedelta
from httmock import urlmatch, HTTMock
from data import model, database
from data import model
from data.logs_model import logs_model
from workers.exportactionlogsworker import ExportActionLogsWorker
from test.fixtures import *
@@ -21,9 +22,6 @@ def test_process_queue_item(namespace, repo_name, expects_logs, app):
repo = model.repository.get_repository(namespace, repo_name)
assert (model.log.get_maximum_id_for_logs(end_time, repository_id=repo.id) is not None) == expects_logs
assert (model.log.get_minimum_id_for_logs(start_time, repository_id=repo.id) is not None) == expects_logs
worker = ExportActionLogsWorker(None)
called = [{}]
@@ -59,7 +57,9 @@ def test_process_queue_item(namespace, repo_name, expects_logs, app):
created = storage.get_content(storage.preferred_locations, 'exportedactionlogs/' + storage_id)
created_json = json.loads(created)
expected_count = database.LogEntry3.select().where(database.LogEntry3.repository == repo).count()
expected_count = len(logs_model.lookup_logs(start_time, end_time, namespace_name=namespace,
repository_name=repo_name).logs)
assert (expected_count > 1) == expects_logs
assert created_json['export_id'] == 'someid'