""" Manage repository permissions. """ import logging from flask import request from app import avatar from endpoints.api import (resource, nickname, require_repo_admin, RepositoryParamResource, log_action, request_error, validate_json_request, path_param) from endpoints.exception import NotFound from data import model logger = logging.getLogger(__name__) def role_view(repo_perm_obj): return { 'role': repo_perm_obj.role.name, } def wrap_role_view_user(role_json, user): role_json['name'] = user.username role_json['is_robot'] = user.robot if not user.robot: role_json['avatar'] = avatar.get_data_for_user(user) return role_json def wrap_role_view_org(role_json, user, org_members): role_json['is_org_member'] = user.robot or user.username in org_members return role_json def wrap_role_view_team(role_json, team): role_json['name'] = team.name role_json['avatar'] = avatar.get_data_for_team(team) return role_json @resource('/v1/repository//permissions/team/') @path_param('repository', 'The full path of the repository. e.g. namespace/name') class RepositoryTeamPermissionList(RepositoryParamResource): """ Resource for repository team permissions. """ @require_repo_admin @nickname('listRepoTeamPermissions') def get(self, namespace, repository): """ List all team permission. """ repo_perms = model.permission.get_all_repo_teams(namespace, repository) def wrapped_role_view(repo_perm): return wrap_role_view_team(role_view(repo_perm), repo_perm.team) return { 'permissions': {repo_perm.team.name: wrapped_role_view(repo_perm) for repo_perm in repo_perms} } @resource('/v1/repository//permissions/user/') @path_param('repository', 'The full path of the repository. e.g. namespace/name') class RepositoryUserPermissionList(RepositoryParamResource): """ Resource for repository user permissions. """ @require_repo_admin @nickname('listRepoUserPermissions') def get(self, namespace, repository): """ List all user permissions. """ # Lookup the organization (if any). org = None try: org = model.organization.get_organization(namespace) # Will raise an error if not org except model.InvalidOrganizationException: # This repository isn't under an org pass # Load the permissions. repo_perms = model.user.get_all_repo_users(namespace, repository) # Determine how to wrap the role(s). def wrapped_role_view(repo_perm): return wrap_role_view_user(role_view(repo_perm), repo_perm.user) role_view_func = wrapped_role_view if org: users_filter = {perm.user for perm in repo_perms} org_members = model.organization.get_organization_member_set(org, users_filter=users_filter) current_func = role_view_func def wrapped_role_org_view(repo_perm): return wrap_role_view_org(current_func(repo_perm), repo_perm.user, org_members) role_view_func = wrapped_role_org_view return { 'permissions': {perm.user.username: role_view_func(perm) for perm in repo_perms} } @resource('/v1/repository//permissions/user//transitive') @path_param('repository', 'The full path of the repository. e.g. namespace/name') @path_param('username', 'The username of the user to which the permissions apply') class RepositoryUserTransitivePermission(RepositoryParamResource): """ Resource for retrieving whether a user has access to a repository, either directly or via a team. """ @require_repo_admin @nickname('getUserTransitivePermission') def get(self, namespace, repository, username): """ Get the fetch the permission for the specified user. """ user = model.user.get_user(username) if not user: raise NotFound repo = model.repository.get_repository(namespace, repository) if not repo: raise NotFound permissions = list(model.permission.get_user_repo_permissions(user, repo)) return { 'permissions': [role_view(permission) for permission in permissions] } @resource('/v1/repository//permissions/user/') @path_param('repository', 'The full path of the repository. e.g. namespace/name') @path_param('username', 'The username of the user to which the permission applies') class RepositoryUserPermission(RepositoryParamResource): """ Resource for managing individual user permissions. """ schemas = { 'UserPermission': { 'type': 'object', 'description': 'Description of a user permission.', 'required': [ 'role', ], 'properties': { 'role': { 'type': 'string', 'description': 'Role to use for the user', 'enum': [ 'read', 'write', 'admin', ], }, }, }, } @require_repo_admin @nickname('getUserPermissions') def get(self, namespace, repository, username): """ Get the Fetch the permission for the specified user. """ logger.debug('Get repo: %s/%s permissions for user %s', namespace, repository, username) perm = model.permission.get_user_reponame_permission(username, namespace, repository) perm_view = wrap_role_view_user(role_view(perm), perm.user) try: org = model.organization.get_organization(namespace) org_members = model.organization.get_organization_member_set(org, users_filter={perm.user}) perm_view = wrap_role_view_org(perm_view, perm.user, org_members) except model.InvalidOrganizationException: # This repository is not part of an organization pass return perm_view @require_repo_admin @nickname('changeUserPermissions') @validate_json_request('UserPermission') def put(self, namespace, repository, username): # Also needs to respond to post """ Update the perimssions for an existing repository. """ new_permission = request.get_json() logger.debug('Setting permission to: %s for user %s', new_permission['role'], username) try: perm = model.permission.set_user_repo_permission(username, namespace, repository, new_permission['role']) except model.DataModelException as ex: raise request_error(exception=ex) perm_view = wrap_role_view_user(role_view(perm), perm.user) try: org = model.organization.get_organization(namespace) org_members = model.organization.get_organization_member_set(org, users_filter={perm.user}) perm_view = wrap_role_view_org(perm_view, perm.user, org_members) except model.InvalidOrganizationException: # This repository is not part of an organization pass except model.DataModelException as ex: raise request_error(exception=ex) log_action('change_repo_permission', namespace, {'username': username, 'repo': repository, 'namespace': namespace, 'role': new_permission['role']}, repo=model.repository.get_repository(namespace, repository)) return perm_view, 200 @require_repo_admin @nickname('deleteUserPermissions') def delete(self, namespace, repository, username): """ Delete the permission for the user. """ try: model.permission.delete_user_permission(username, namespace, repository) except model.DataModelException as ex: raise request_error(exception=ex) log_action('delete_repo_permission', namespace, {'username': username, 'repo': repository, 'namespace': namespace}, repo=model.repository.get_repository(namespace, repository)) return '', 204 @resource('/v1/repository//permissions/team/') @path_param('repository', 'The full path of the repository. e.g. namespace/name') @path_param('teamname', 'The name of the team to which the permission applies') class RepositoryTeamPermission(RepositoryParamResource): """ Resource for managing individual team permissions. """ schemas = { 'TeamPermission': { 'type': 'object', 'description': 'Description of a team permission.', 'required': [ 'role', ], 'properties': { 'role': { 'type': 'string', 'description': 'Role to use for the team', 'enum': [ 'read', 'write', 'admin', ], }, }, }, } @require_repo_admin @nickname('getTeamPermissions') def get(self, namespace, repository, teamname): """ Fetch the permission for the specified team. """ logger.debug('Get repo: %s/%s permissions for team %s', namespace, repository, teamname) perm = model.permission.get_team_reponame_permission(teamname, namespace, repository) return role_view(perm) @require_repo_admin @nickname('changeTeamPermissions') @validate_json_request('TeamPermission') def put(self, namespace, repository, teamname): """ Update the existing team permission. """ new_permission = request.get_json() logger.debug('Setting permission to: %s for team %s', new_permission['role'], teamname) perm = model.permission.set_team_repo_permission(teamname, namespace, repository, new_permission['role']) log_action('change_repo_permission', namespace, {'team': teamname, 'repo': repository, 'role': new_permission['role']}, repo=model.repository.get_repository(namespace, repository)) return wrap_role_view_team(role_view(perm), perm.team), 200 @require_repo_admin @nickname('deleteTeamPermissions') def delete(self, namespace, repository, teamname): """ Delete the permission for the specified team. """ model.permission.delete_team_permission(teamname, namespace, repository) log_action('delete_repo_permission', namespace, {'team': teamname, 'repo': repository}, repo=model.repository.get_repository(namespace, repository)) return '', 204