""" Access usage logs for organizations or repositories. """

from datetime import datetime, timedelta

from endpoints.api import (resource, nickname, ApiResource, query_param, parse_args,
                           RepositoryParamResource, require_repo_admin, related_user_resource,
                           format_date, require_user_admin, path_param, require_scope,
                           page_support)
from endpoints.api.logs_models_pre_oci import pre_oci_model as model
from endpoints.exception import Unauthorized, NotFound
from auth.permissions import AdministerOrganizationPermission
from auth.auth_context import get_authenticated_user
from auth import scopes

LOGS_PER_PAGE = 20

# Log kinds passed as `ignore` by the user- and organization-level endpoints below.
SERVICE_LEVEL_LOG_KINDS = {
    'service_key_create',
    'service_key_approve',
    'service_key_delete',
    'service_key_modify',
    'service_key_extend',
    'service_key_rotate',
}

# Format accepted for the `starttime`/`endtime` query parameters once ' UTC' is appended.
_LOG_DATE_FORMAT = '%m/%d/%Y %Z'


def _parse_log_date(value):
    """ Parses a `MM/DD/YYYY` string into a datetime.

        Returns None when `value` is empty/None or does not match the expected format.
    """
    if not value:
        return None

    try:
        return datetime.strptime(value + ' UTC', _LOG_DATE_FORMAT)
    except ValueError:
        return None


def _validate_logs_arguments(start_time, end_time):
    """ Normalizes the raw `starttime`/`endtime` query values into a pair of datetimes.

        A missing or malformed start time defaults to one week before now; a missing or
        malformed end time defaults to now. A valid end time is moved forward by one day.

        Returns a `(start_time, end_time)` tuple of datetimes.
    """
    # NOTE(review): the defaults come from datetime.today() (naive local time) while supplied
    # values are parsed with a ' UTC' suffix -- confirm the mix is intended.
    start = _parse_log_date(start_time)
    if start is None:
        start = datetime.today() - timedelta(7)  # One week

    end = _parse_log_date(end_time)
    if end is None:
        end = datetime.today()
    else:
        # A supplied end date is extended by one day before being used as the upper bound.
        end = end + timedelta(days=1)

    return start, end


def get_logs(start_time, end_time, performer_name=None, repository_name=None,
             namespace_name=None, page_token=None, ignore=None):
    """ Looks up one page of log entries for the given filters.

        `start_time` and `end_time` are the raw query strings; they are normalized via
        `_validate_logs_arguments` before querying the model.

        Returns a tuple of (response dict, next page token). The dict holds the formatted
        `start_time` and `end_time` plus the serialized `logs`.
    """
    (start_time, end_time) = _validate_logs_arguments(start_time, end_time)

    kinds = model.get_log_entry_kinds()
    log_entry_page = model.get_logs_query(start_time, end_time, performer_name, repository_name,
                                          namespace_name, ignore, page_token)

    # Passed to each entry's to_dict(); true only when neither a namespace nor a repository
    # was used to filter.
    include_namespace = namespace_name is None and repository_name is None

    return {
        'start_time': format_date(start_time),
        'end_time': format_date(end_time),
        'logs': [log.to_dict(kinds, include_namespace) for log in log_entry_page.logs],
    }, log_entry_page.next_page_token


def get_aggregate_logs(start_time, end_time, performer_name=None, repository=None,
                       namespace=None, ignore=None):
    """ Looks up aggregated log entries for the given filters.

        `start_time` and `end_time` are the raw query strings; they are normalized via
        `_validate_logs_arguments` before querying the model.

        Returns a dict with a single `aggregated` key holding the serialized entries.
    """
    (start_time, end_time) = _validate_logs_arguments(start_time, end_time)

    kinds = model.get_log_entry_kinds()
    aggregated_logs = model.get_aggregated_logs(start_time, end_time,
                                                performer_name=performer_name,
                                                repository_name=repository,
                                                namespace_name=namespace,
                                                ignore=ignore)

    return {
        'aggregated': [log.to_dict(kinds, start_time) for log in aggregated_logs]
    }


# NOTE(review): the repository and organization routes below contain an empty path segment
# ('//') where the parameter declared via @path_param would be expected -- confirm that the
# URL placeholder was not lost from these route strings.


@resource('/v1/repository//logs')
@path_param('repository', 'The full path of the repository. e.g. namespace/name')
class RepositoryLogs(RepositoryParamResource):
    """ Resource for fetching logs for the specific repository. """

    @require_repo_admin
    @nickname('listRepoLogs')
    @parse_args()
    @query_param('starttime', 'Earliest time from which to get logs (%m/%d/%Y %Z)', type=str)
    @query_param('endtime', 'Latest time to which to get logs (%m/%d/%Y %Z)', type=str)
    @page_support()
    def get(self, namespace, repository, page_token, parsed_args):
        """ List the logs for the specified repository. """
        # Only an explicit False triggers the 404; other falsy results (e.g. None) fall through.
        if model.repo_exists(namespace, repository) is False:
            raise NotFound()

        start_time = parsed_args['starttime']
        end_time = parsed_args['endtime']
        return get_logs(start_time, end_time,
                        repository_name=repository,
                        page_token=page_token,
                        namespace_name=namespace)


@resource('/v1/user/logs')
class UserLogs(ApiResource):
    """ Resource for fetching logs for the current user. """

    @require_user_admin
    @nickname('listUserLogs')
    @parse_args()
    @query_param('starttime', 'Earliest time from which to get logs. (%m/%d/%Y %Z)', type=str)
    @query_param('endtime', 'Latest time to which to get logs. (%m/%d/%Y %Z)', type=str)
    @query_param('performer', 'Username for which to filter logs.', type=str)
    @page_support()
    def get(self, parsed_args, page_token):
        """ List the logs for the current user. """
        performer_name = parsed_args['performer']
        start_time = parsed_args['starttime']
        end_time = parsed_args['endtime']

        user = get_authenticated_user()
        return get_logs(start_time, end_time,
                        performer_name=performer_name,
                        namespace_name=user.username,
                        page_token=page_token,
                        ignore=SERVICE_LEVEL_LOG_KINDS)


@resource('/v1/organization//logs')
@path_param('orgname', 'The name of the organization')
@related_user_resource(UserLogs)
class OrgLogs(ApiResource):
    """ Resource for fetching logs for the entire organization. """

    @nickname('listOrgLogs')
    @parse_args()
    @query_param('starttime', 'Earliest time from which to get logs. (%m/%d/%Y %Z)', type=str)
    @query_param('endtime', 'Latest time to which to get logs. (%m/%d/%Y %Z)', type=str)
    @query_param('performer', 'Username for which to filter logs.', type=str)
    @page_support()
    @require_scope(scopes.ORG_ADMIN)
    def get(self, orgname, page_token, parsed_args):
        """ List the logs for the specified organization. """
        permission = AdministerOrganizationPermission(orgname)
        if not permission.can():
            raise Unauthorized()

        performer_name = parsed_args['performer']
        start_time = parsed_args['starttime']
        end_time = parsed_args['endtime']

        return get_logs(start_time, end_time,
                        namespace_name=orgname,
                        performer_name=performer_name,
                        page_token=page_token,
                        ignore=SERVICE_LEVEL_LOG_KINDS)


@resource('/v1/repository//aggregatelogs')
@path_param('repository', 'The full path of the repository. e.g. namespace/name')
class RepositoryAggregateLogs(RepositoryParamResource):
    """ Resource for fetching aggregated logs for the specific repository. """

    @require_repo_admin
    @nickname('getAggregateRepoLogs')
    @parse_args()
    @query_param('starttime', 'Earliest time from which to get logs (%m/%d/%Y %Z)', type=str)
    @query_param('endtime', 'Latest time to which to get logs (%m/%d/%Y %Z)', type=str)
    def get(self, namespace, repository, parsed_args):
        """ Returns the aggregated logs for the specified repository. """
        # Only an explicit False triggers the 404; other falsy results (e.g. None) fall through.
        if model.repo_exists(namespace, repository) is False:
            raise NotFound()

        start_time = parsed_args['starttime']
        end_time = parsed_args['endtime']
        return get_aggregate_logs(start_time, end_time,
                                  repository=repository,
                                  namespace=namespace)


@resource('/v1/user/aggregatelogs')
class UserAggregateLogs(ApiResource):
    """ Resource for fetching aggregated logs for the current user. """

    @require_user_admin
    @nickname('getAggregateUserLogs')
    @parse_args()
    @query_param('starttime', 'Earliest time from which to get logs. (%m/%d/%Y %Z)', type=str)
    @query_param('endtime', 'Latest time to which to get logs. (%m/%d/%Y %Z)', type=str)
    @query_param('performer', 'Username for which to filter logs.', type=str)
    def get(self, parsed_args):
        """ Returns the aggregated logs for the current user. """
        performer_name = parsed_args['performer']
        start_time = parsed_args['starttime']
        end_time = parsed_args['endtime']

        user = get_authenticated_user()
        return get_aggregate_logs(start_time, end_time,
                                  performer_name=performer_name,
                                  namespace=user.username,
                                  ignore=SERVICE_LEVEL_LOG_KINDS)


# The user-level counterpart of the aggregate organization endpoint is UserAggregateLogs
# (mirroring the OrgLogs/UserLogs pairing above), not the paginated UserLogs resource.
@resource('/v1/organization//aggregatelogs')
@path_param('orgname', 'The name of the organization')
@related_user_resource(UserAggregateLogs)
class OrgAggregateLogs(ApiResource):
    """ Resource for fetching aggregate logs for the entire organization. """

    @nickname('getAggregateOrgLogs')
    @parse_args()
    @query_param('starttime', 'Earliest time from which to get logs. (%m/%d/%Y %Z)', type=str)
    @query_param('endtime', 'Latest time to which to get logs. (%m/%d/%Y %Z)', type=str)
    @query_param('performer', 'Username for which to filter logs.', type=str)
    @require_scope(scopes.ORG_ADMIN)
    def get(self, orgname, parsed_args):
        """ Gets the aggregated logs for the specified organization. """
        permission = AdministerOrganizationPermission(orgname)
        if not permission.can():
            raise Unauthorized()

        performer_name = parsed_args['performer']
        start_time = parsed_args['starttime']
        end_time = parsed_args['endtime']

        return get_aggregate_logs(start_time, end_time,
                                  namespace=orgname,
                                  performer_name=performer_name,
                                  ignore=SERVICE_LEVEL_LOG_KINDS)