import json

from datetime import datetime, timedelta

from endpoints.api import (resource, nickname, ApiResource, query_param, parse_args,
                           RepositoryParamResource, require_repo_admin, related_user_resource,
                           format_date, Unauthorized, NotFound, require_user_admin,
                           internal_only, path_param, require_scope)
from auth.permissions import AdministerOrganizationPermission
from auth.auth_context import get_authenticated_user
from data import model
from auth import scopes


# Format accepted for the `starttime` / `endtime` query parameters. Callers
# supply only the date part; ' UTC' is appended before parsing to satisfy %Z.
_LOG_DATE_FORMAT = '%m/%d/%Y %Z'

# Look-back window used when no (valid) start time is supplied: one week.
_DEFAULT_LOG_WINDOW = timedelta(days=7)


def _parse_log_date(value):
    """ Parse a month/day/year date string; return None if it is empty or malformed. """
    if not value:
        return None

    try:
        return datetime.strptime(value + ' UTC', _LOG_DATE_FORMAT)
    except ValueError:
        return None


def log_view(log):
    """ Build the JSON-serializable view of a single log entry.

    The view always carries `kind`, `metadata` (decoded from the entry's
    `metadata_json`), `ip` and `datetime`. A `performer` sub-dict is added only
    when the entry has a performer with a non-empty username.
    """
    view = {
        'kind': log.kind.name,
        'metadata': json.loads(log.metadata_json),
        'ip': log.ip,
        'datetime': format_date(log.datetime),
    }

    if log.performer and log.performer.username:
        view['performer'] = {
            'kind': 'user',
            'name': log.performer.username,
            'is_robot': log.performer.robot,
        }

    return view


def get_logs(start_time, end_time, performer_name=None, repository=None, namespace=None):
    """ Look up logs in a time range and return them as an API response dict.

    Args:
        start_time: Lower bound as a 'mm/dd/YYYY' string. When empty or
            unparseable, defaults to one week before now.
        end_time: Upper bound as a 'mm/dd/YYYY' string. One day is added to the
            parsed date. When empty or unparseable, defaults to now.
        performer_name: Optional username; when given it is resolved through
            `model.get_user` and passed on as the `performer` filter.
        repository: Optional repository passed through to `model.list_logs`.
        namespace: Optional namespace passed through to `model.list_logs`.

    Returns:
        A dict with the formatted `start_time` and `end_time` actually used and
        `logs`, a list of `log_view` dicts.
    """
    performer = None
    if performer_name:
        # NOTE(review): if the name does not resolve, whatever get_user returns
        # (presumably None, i.e. no performer filter) is passed on -- verify
        # that this is the intended behavior.
        performer = model.get_user(performer_name)

    # NOTE(review): the defaults below use the local clock (datetime.today())
    # whereas parsed bounds are labelled UTC -- confirm that stored log
    # timestamps use the same clock.
    start_time = _parse_log_date(start_time)
    if not start_time:
        start_time = datetime.today() - _DEFAULT_LOG_WINDOW

    end_time = _parse_log_date(end_time)
    if end_time:
        # The parsed value is midnight at the start of the requested date;
        # push the bound forward by one day.
        end_time = end_time + timedelta(days=1)
    else:
        end_time = datetime.today()

    logs = model.list_logs(start_time, end_time, performer=performer, repository=repository,
                           namespace=namespace)

    return {
        'start_time': format_date(start_time),
        'end_time': format_date(end_time),
        'logs': [log_view(log) for log in logs]
    }


# NOTE(review): the route previously had an empty path segment and no variable
# for the handler's arguments. The `repopath` converter is assumed to be
# registered by the application -- confirm against the app's URL map.
@resource('/v1/repository/<repopath:repository>/logs')
@path_param('repository', 'The full path of the repository. e.g. namespace/name')
class RepositoryLogs(RepositoryParamResource):
    """ Resource for fetching logs for the specific repository. """

    @require_repo_admin
    @nickname('listRepoLogs')
    @parse_args
    @query_param('starttime', 'Earliest time from which to get logs (%m/%d/%Y %Z)', type=str)
    @query_param('endtime', 'Latest time to which to get logs (%m/%d/%Y %Z)', type=str)
    def get(self, args, namespace, repository):
        """ List the logs for the specified repository. """
        repo = model.get_repository(namespace, repository)
        if not repo:
            raise NotFound()

        start_time = args['starttime']
        end_time = args['endtime']
        return get_logs(start_time, end_time, repository=repo, namespace=namespace)


@resource('/v1/user/logs')
@internal_only
class UserLogs(ApiResource):
    """ Resource for fetching logs for the current user. """

    @require_user_admin
    @nickname('listUserLogs')
    @parse_args
    @query_param('starttime', 'Earliest time from which to get logs. (%m/%d/%Y %Z)', type=str)
    @query_param('endtime', 'Latest time to which to get logs. (%m/%d/%Y %Z)', type=str)
    @query_param('performer', 'Username for which to filter logs.', type=str)
    def get(self, args):
        """ List the logs for the current user. """
        performer_name = args['performer']
        start_time = args['starttime']
        end_time = args['endtime']

        # The authenticated user's own username is used as the log namespace.
        user = get_authenticated_user()
        return get_logs(start_time, end_time, performer_name=performer_name,
                        namespace=user.username)


# NOTE(review): the route previously had an empty path segment; the `orgname`
# variable is required by both @path_param and the handler signature below.
@resource('/v1/organization/<orgname>/logs')
@path_param('orgname', 'The name of the organization')
@related_user_resource(UserLogs)
class OrgLogs(ApiResource):
    """ Resource for fetching logs for the entire organization. """

    @nickname('listOrgLogs')
    @parse_args
    @query_param('starttime', 'Earliest time from which to get logs. (%m/%d/%Y %Z)', type=str)
    @query_param('endtime', 'Latest time to which to get logs. (%m/%d/%Y %Z)', type=str)
    @query_param('performer', 'Username for which to filter logs.', type=str)
    @require_scope(scopes.ORG_ADMIN)
    def get(self, args, orgname):
        """ List the logs for the specified organization. """
        permission = AdministerOrganizationPermission(orgname)
        if not permission.can():
            raise Unauthorized()

        performer_name = args['performer']
        start_time = args['starttime']
        end_time = args['endtime']
        return get_logs(start_time, end_time, namespace=orgname,
                        performer_name=performer_name)