Merge pull request #2806 from ecordell/QUAY-643/api-permission-v22
Add data interface for api-permissions for v2-2
This commit is contained in:
commit
e407bc1441
5 changed files with 400 additions and 186 deletions
|
@ -4,55 +4,27 @@ import logging
|
|||
|
||||
from flask import request
|
||||
|
||||
from app import avatar
|
||||
from endpoints.api import (resource, nickname, require_repo_admin, RepositoryParamResource,
|
||||
log_action, request_error, validate_json_request, path_param)
|
||||
from endpoints.exception import NotFound
|
||||
from data import model
|
||||
|
||||
from permission_models_pre_oci import pre_oci_model as model
|
||||
from permission_models_interface import DeleteException, SaveException
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
def role_view(repo_perm_obj):
|
||||
return {
|
||||
'role': repo_perm_obj.role.name,
|
||||
}
|
||||
|
||||
def wrap_role_view_user(role_json, user):
|
||||
role_json['name'] = user.username
|
||||
role_json['is_robot'] = user.robot
|
||||
if not user.robot:
|
||||
role_json['avatar'] = avatar.get_data_for_user(user)
|
||||
return role_json
|
||||
|
||||
|
||||
def wrap_role_view_org(role_json, user, org_members):
|
||||
role_json['is_org_member'] = user.robot or user.username in org_members
|
||||
return role_json
|
||||
|
||||
|
||||
def wrap_role_view_team(role_json, team):
|
||||
role_json['name'] = team.name
|
||||
role_json['avatar'] = avatar.get_data_for_team(team)
|
||||
return role_json
|
||||
|
||||
|
||||
@resource('/v1/repository/<apirepopath:repository>/permissions/team/')
|
||||
@path_param('repository', 'The full path of the repository. e.g. namespace/name')
|
||||
class RepositoryTeamPermissionList(RepositoryParamResource):
|
||||
""" Resource for repository team permissions. """
|
||||
@require_repo_admin
|
||||
@nickname('listRepoTeamPermissions')
|
||||
def get(self, namespace, repository):
|
||||
def get(self, namespace_name, repository_name):
|
||||
""" List all team permission. """
|
||||
repo_perms = model.permission.get_all_repo_teams(namespace, repository)
|
||||
|
||||
def wrapped_role_view(repo_perm):
|
||||
return wrap_role_view_team(role_view(repo_perm), repo_perm.team)
|
||||
repo_perms = model.get_repo_permissions_by_team(namespace_name, repository_name)
|
||||
|
||||
return {
|
||||
'permissions': {repo_perm.team.name: wrapped_role_view(repo_perm)
|
||||
'permissions': {repo_perm.team_name: repo_perm.to_dict()
|
||||
for repo_perm in repo_perms}
|
||||
}
|
||||
|
||||
|
@ -63,38 +35,10 @@ class RepositoryUserPermissionList(RepositoryParamResource):
|
|||
""" Resource for repository user permissions. """
|
||||
@require_repo_admin
|
||||
@nickname('listRepoUserPermissions')
|
||||
def get(self, namespace, repository):
|
||||
def get(self, namespace_name, repository_name):
|
||||
""" List all user permissions. """
|
||||
# Lookup the organization (if any).
|
||||
org = None
|
||||
try:
|
||||
org = model.organization.get_organization(namespace) # Will raise an error if not org
|
||||
except model.InvalidOrganizationException:
|
||||
# This repository isn't under an org
|
||||
pass
|
||||
|
||||
# Load the permissions.
|
||||
repo_perms = model.user.get_all_repo_users(namespace, repository)
|
||||
|
||||
# Determine how to wrap the role(s).
|
||||
def wrapped_role_view(repo_perm):
|
||||
return wrap_role_view_user(role_view(repo_perm), repo_perm.user)
|
||||
|
||||
role_view_func = wrapped_role_view
|
||||
|
||||
if org:
|
||||
users_filter = {perm.user for perm in repo_perms}
|
||||
org_members = model.organization.get_organization_member_set(org, users_filter=users_filter)
|
||||
current_func = role_view_func
|
||||
|
||||
def wrapped_role_org_view(repo_perm):
|
||||
return wrap_role_view_org(current_func(repo_perm), repo_perm.user, org_members)
|
||||
|
||||
role_view_func = wrapped_role_org_view
|
||||
|
||||
return {
|
||||
'permissions': {perm.user.username: role_view_func(perm) for perm in repo_perms}
|
||||
}
|
||||
perms = model.get_repo_permissions_by_user(namespace_name, repository_name)
|
||||
return {'permissions': {p.username: p.to_dict() for p in perms}}
|
||||
|
||||
|
||||
@resource('/v1/repository/<apirepopath:repository>/permissions/user/<username>/transitive')
|
||||
|
@ -105,19 +49,16 @@ class RepositoryUserTransitivePermission(RepositoryParamResource):
|
|||
or via a team. """
|
||||
@require_repo_admin
|
||||
@nickname('getUserTransitivePermission')
|
||||
def get(self, namespace, repository, username):
|
||||
def get(self, namespace_name, repository_name, username):
|
||||
""" Get the fetch the permission for the specified user. """
|
||||
user = model.user.get_user(username)
|
||||
if not user:
|
||||
|
||||
roles = model.get_repo_roles(username, namespace_name, repository_name)
|
||||
|
||||
if not roles:
|
||||
raise NotFound
|
||||
|
||||
repo = model.repository.get_repository(namespace, repository)
|
||||
if not repo:
|
||||
raise NotFound
|
||||
|
||||
permissions = list(model.permission.get_user_repo_permissions(user, repo))
|
||||
|
||||
return {
|
||||
'permissions': [role_view(permission) for permission in permissions]
|
||||
'permissions': [r.to_dict() for r in roles]
|
||||
}
|
||||
|
||||
|
||||
|
@ -149,69 +90,48 @@ class RepositoryUserPermission(RepositoryParamResource):
|
|||
|
||||
@require_repo_admin
|
||||
@nickname('getUserPermissions')
|
||||
def get(self, namespace, repository, username):
|
||||
""" Get the Fetch the permission for the specified user. """
|
||||
logger.debug('Get repo: %s/%s permissions for user %s', namespace, repository, username)
|
||||
perm = model.permission.get_user_reponame_permission(username, namespace, repository)
|
||||
perm_view = wrap_role_view_user(role_view(perm), perm.user)
|
||||
|
||||
try:
|
||||
org = model.organization.get_organization(namespace)
|
||||
org_members = model.organization.get_organization_member_set(org, users_filter={perm.user})
|
||||
perm_view = wrap_role_view_org(perm_view, perm.user, org_members)
|
||||
except model.InvalidOrganizationException:
|
||||
# This repository is not part of an organization
|
||||
pass
|
||||
|
||||
return perm_view
|
||||
def get(self, namespace_name, repository_name, username):
|
||||
""" Get the permission for the specified user. """
|
||||
logger.debug('Get repo: %s/%s permissions for user %s', namespace_name, repository_name, username)
|
||||
perm = model.get_repo_permission_for_user(username, namespace_name, repository_name)
|
||||
return perm.to_dict()
|
||||
|
||||
@require_repo_admin
|
||||
@nickname('changeUserPermissions')
|
||||
@validate_json_request('UserPermission')
|
||||
def put(self, namespace, repository, username): # Also needs to respond to post
|
||||
def put(self, namespace_name, repository_name, username): # Also needs to respond to post
|
||||
""" Update the perimssions for an existing repository. """
|
||||
new_permission = request.get_json()
|
||||
|
||||
logger.debug('Setting permission to: %s for user %s', new_permission['role'], username)
|
||||
|
||||
try:
|
||||
perm = model.permission.set_user_repo_permission(username, namespace, repository,
|
||||
new_permission['role'])
|
||||
except model.DataModelException as ex:
|
||||
perm = model.set_repo_permission_for_user(username, namespace_name, repository_name,
|
||||
new_permission['role'])
|
||||
resp = perm.to_dict()
|
||||
except SaveException as ex:
|
||||
raise request_error(exception=ex)
|
||||
|
||||
perm_view = wrap_role_view_user(role_view(perm), perm.user)
|
||||
|
||||
try:
|
||||
org = model.organization.get_organization(namespace)
|
||||
org_members = model.organization.get_organization_member_set(org, users_filter={perm.user})
|
||||
perm_view = wrap_role_view_org(perm_view, perm.user, org_members)
|
||||
except model.InvalidOrganizationException:
|
||||
# This repository is not part of an organization
|
||||
pass
|
||||
except model.DataModelException as ex:
|
||||
raise request_error(exception=ex)
|
||||
|
||||
log_action('change_repo_permission', namespace,
|
||||
{'username': username, 'repo': repository,
|
||||
'namespace': namespace,
|
||||
log_action('change_repo_permission', namespace_name,
|
||||
{'username': username, 'repo': repository_name,
|
||||
'namespace': namespace_name,
|
||||
'role': new_permission['role']},
|
||||
repo=model.repository.get_repository(namespace, repository))
|
||||
repo_name=repository_name)
|
||||
|
||||
return perm_view, 200
|
||||
return resp, 200
|
||||
|
||||
@require_repo_admin
|
||||
@nickname('deleteUserPermissions')
|
||||
def delete(self, namespace, repository, username):
|
||||
def delete(self, namespace_name, repository_name, username):
|
||||
""" Delete the permission for the user. """
|
||||
try:
|
||||
model.permission.delete_user_permission(username, namespace, repository)
|
||||
except model.DataModelException as ex:
|
||||
model.delete_repo_permission_for_user(username, namespace_name, repository_name)
|
||||
except DeleteException as ex:
|
||||
raise request_error(exception=ex)
|
||||
|
||||
log_action('delete_repo_permission', namespace,
|
||||
{'username': username, 'repo': repository, 'namespace': namespace},
|
||||
repo=model.repository.get_repository(namespace, repository))
|
||||
log_action('delete_repo_permission', namespace_name,
|
||||
{'username': username, 'repo': repository_name, 'namespace': namespace_name},
|
||||
repo_name=repository_name)
|
||||
|
||||
return '', 204
|
||||
|
||||
|
@ -244,39 +164,46 @@ class RepositoryTeamPermission(RepositoryParamResource):
|
|||
|
||||
@require_repo_admin
|
||||
@nickname('getTeamPermissions')
|
||||
def get(self, namespace, repository, teamname):
|
||||
def get(self, namespace_name, repository_name, teamname):
|
||||
""" Fetch the permission for the specified team. """
|
||||
logger.debug('Get repo: %s/%s permissions for team %s', namespace, repository, teamname)
|
||||
perm = model.permission.get_team_reponame_permission(teamname, namespace, repository)
|
||||
return role_view(perm)
|
||||
logger.debug('Get repo: %s/%s permissions for team %s', namespace_name, repository_name, teamname)
|
||||
role = model.get_repo_role_for_team(teamname, namespace_name, repository_name)
|
||||
return role.to_dict()
|
||||
|
||||
@require_repo_admin
|
||||
@nickname('changeTeamPermissions')
|
||||
@validate_json_request('TeamPermission')
|
||||
def put(self, namespace, repository, teamname):
|
||||
def put(self, namespace_name, repository_name, teamname):
|
||||
""" Update the existing team permission. """
|
||||
new_permission = request.get_json()
|
||||
|
||||
logger.debug('Setting permission to: %s for team %s', new_permission['role'], teamname)
|
||||
|
||||
perm = model.permission.set_team_repo_permission(teamname, namespace, repository,
|
||||
new_permission['role'])
|
||||
try:
|
||||
perm = model.set_repo_permission_for_team(teamname, namespace_name, repository_name,
|
||||
new_permission['role'])
|
||||
resp = perm.to_dict()
|
||||
except SaveException as ex:
|
||||
raise request_error(exception=ex)
|
||||
|
||||
|
||||
log_action('change_repo_permission', namespace,
|
||||
{'team': teamname, 'repo': repository,
|
||||
log_action('change_repo_permission', namespace_name,
|
||||
{'team': teamname, 'repo': repository_name,
|
||||
'role': new_permission['role']},
|
||||
repo=model.repository.get_repository(namespace, repository))
|
||||
|
||||
return wrap_role_view_team(role_view(perm), perm.team), 200
|
||||
repo_name=repository_name)
|
||||
return resp, 200
|
||||
|
||||
@require_repo_admin
|
||||
@nickname('deleteTeamPermissions')
|
||||
def delete(self, namespace, repository, teamname):
|
||||
def delete(self, namespace_name, repository_name, teamname):
|
||||
""" Delete the permission for the specified team. """
|
||||
model.permission.delete_team_permission(teamname, namespace, repository)
|
||||
|
||||
log_action('delete_repo_permission', namespace,
|
||||
{'team': teamname, 'repo': repository},
|
||||
repo=model.repository.get_repository(namespace, repository))
|
||||
try:
|
||||
model.delete_repo_permission_for_team(teamname, namespace_name, repository_name)
|
||||
except DeleteException as ex:
|
||||
raise request_error(exception=ex)
|
||||
|
||||
log_action('delete_repo_permission', namespace_name,
|
||||
{'team': teamname, 'repo': repository_name},
|
||||
repo_name=repository_name)
|
||||
|
||||
return '', 204
|
||||
|
|
208
endpoints/api/permission_models_interface.py
Normal file
208
endpoints/api/permission_models_interface.py
Normal file
|
@ -0,0 +1,208 @@
|
|||
import sys
|
||||
from abc import ABCMeta, abstractmethod
|
||||
from collections import namedtuple
|
||||
|
||||
from six import add_metaclass
|
||||
|
||||
|
||||
class SaveException(Exception):
|
||||
def __init__(self, other):
|
||||
self.traceback = sys.exc_info()
|
||||
super(SaveException, self).__init__(other.message)
|
||||
|
||||
class DeleteException(Exception):
|
||||
def __init__(self, other):
|
||||
self.traceback = sys.exc_info()
|
||||
super(DeleteException, self).__init__(other.message)
|
||||
|
||||
|
||||
class Role(namedtuple('Role', ['role_name'])):
|
||||
def to_dict(self):
|
||||
return {
|
||||
'role': self.role_name,
|
||||
}
|
||||
|
||||
class UserPermission(namedtuple('UserPermission', [
|
||||
'role_name',
|
||||
'username',
|
||||
'is_robot',
|
||||
'avatar',
|
||||
'is_org_member',
|
||||
'has_org',
|
||||
])):
|
||||
|
||||
def to_dict(self):
|
||||
perm_dict = {
|
||||
'role': self.role_name,
|
||||
'username': self.username,
|
||||
'is_robot': False,
|
||||
'avatar': self.avatar,
|
||||
}
|
||||
if self.has_org:
|
||||
perm_dict['is_org_member'] = self.is_org_member
|
||||
return perm_dict
|
||||
|
||||
|
||||
class RobotPermission(namedtuple('RobotPermission', [
|
||||
'role_name',
|
||||
'username',
|
||||
'is_robot',
|
||||
'is_org_member',
|
||||
])):
|
||||
|
||||
def to_dict(self, user=None, team=None, org_members=None):
|
||||
return {
|
||||
'role': self.role_name,
|
||||
'username': self.username,
|
||||
'is_robot': True,
|
||||
'is_org_member': self.is_org_member,
|
||||
}
|
||||
|
||||
|
||||
class TeamPermission(namedtuple('TeamPermission', [
|
||||
'role_name',
|
||||
'team_name',
|
||||
'avatar',
|
||||
])):
|
||||
|
||||
def to_dict(self):
|
||||
return {
|
||||
'role': self.role_name,
|
||||
'name': self.team_name,
|
||||
'avatar': self.avatar,
|
||||
}
|
||||
|
||||
@add_metaclass(ABCMeta)
|
||||
class PermissionDataInterface(object):
|
||||
"""
|
||||
Data interface used by permissions API
|
||||
"""
|
||||
|
||||
@abstractmethod
|
||||
def get_repo_permissions_by_user(self, namespace_name, repository_name):
|
||||
"""
|
||||
|
||||
Args:
|
||||
namespace_name: string
|
||||
repository_name: string
|
||||
|
||||
Returns:
|
||||
list(UserPermission)
|
||||
"""
|
||||
|
||||
@abstractmethod
|
||||
def get_repo_roles(self, username, namespace_name, repository_name):
|
||||
"""
|
||||
|
||||
Args:
|
||||
username: string
|
||||
namespace_name: string
|
||||
repository_name: string
|
||||
|
||||
Returns:
|
||||
list(Role) or None
|
||||
"""
|
||||
|
||||
@abstractmethod
|
||||
def get_repo_permission_for_user(self, username, namespace_name, repository_name):
|
||||
"""
|
||||
|
||||
Args:
|
||||
username: string
|
||||
namespace_name: string
|
||||
repository_name: string
|
||||
|
||||
Returns:
|
||||
UserPermission
|
||||
"""
|
||||
|
||||
@abstractmethod
|
||||
def set_repo_permission_for_user(self, username, namespace_name, repository_name, role_name):
|
||||
"""
|
||||
|
||||
Args:
|
||||
username: string
|
||||
namespace_name: string
|
||||
repository_name: string
|
||||
role_name: string
|
||||
|
||||
Returns:
|
||||
UserPermission
|
||||
|
||||
Raises:
|
||||
SaveException
|
||||
"""
|
||||
|
||||
@abstractmethod
|
||||
def delete_repo_permission_for_user(self, username, namespace_name, repository_name):
|
||||
"""
|
||||
|
||||
Args:
|
||||
username: string
|
||||
namespace_name: string
|
||||
repository_name: string
|
||||
|
||||
Returns:
|
||||
void
|
||||
|
||||
Raises:
|
||||
DeleteException
|
||||
"""
|
||||
|
||||
@abstractmethod
|
||||
def get_repo_permissions_by_team(self, namespace_name, repository_name):
|
||||
"""
|
||||
|
||||
Args:
|
||||
namespace_name: string
|
||||
repository_name: string
|
||||
|
||||
Returns:
|
||||
list(TeamPermission)
|
||||
"""
|
||||
|
||||
@abstractmethod
|
||||
def get_repo_role_for_team(self, team_name, namespace_name, repository_name):
|
||||
"""
|
||||
|
||||
Args:
|
||||
team_name: string
|
||||
namespace_name: string
|
||||
repository_name: string
|
||||
|
||||
Returns:
|
||||
Role
|
||||
"""
|
||||
|
||||
@abstractmethod
|
||||
def set_repo_permission_for_team(self, team_name, namespace_name, repository_name, permission):
|
||||
"""
|
||||
|
||||
Args:
|
||||
team_name: string
|
||||
namespace_name: string
|
||||
repository_name: string
|
||||
permission: string
|
||||
|
||||
Returns:
|
||||
TeamPermission
|
||||
|
||||
Raises:
|
||||
SaveException
|
||||
"""
|
||||
|
||||
@abstractmethod
|
||||
def delete_repo_permission_for_team(self, team_name, namespace_name, repository_name):
|
||||
"""
|
||||
|
||||
Args:
|
||||
team_name: string
|
||||
namespace_name: string
|
||||
repository_name: string
|
||||
|
||||
Returns:
|
||||
TeamPermission
|
||||
|
||||
Raises:
|
||||
DeleteException
|
||||
"""
|
115
endpoints/api/permission_models_pre_oci.py
Normal file
115
endpoints/api/permission_models_pre_oci.py
Normal file
|
@ -0,0 +1,115 @@
|
|||
from app import avatar
|
||||
from data import model
|
||||
from permission_models_interface import PermissionDataInterface, UserPermission, TeamPermission, Role, SaveException, DeleteException
|
||||
|
||||
|
||||
class PreOCIModel(PermissionDataInterface):
|
||||
"""
|
||||
PreOCIModel implements the data model for Permission using a database schema
|
||||
before it was changed to support the OCI specification.
|
||||
"""
|
||||
|
||||
def get_repo_permissions_by_user(self, namespace_name, repository_name):
|
||||
org = None
|
||||
try:
|
||||
org = model.organization.get_organization(namespace_name) # Will raise an error if not org
|
||||
except model.InvalidOrganizationException:
|
||||
# This repository isn't under an org
|
||||
pass
|
||||
|
||||
# Load the permissions.
|
||||
repo_perms = model.user.get_all_repo_users(namespace_name, repository_name)
|
||||
|
||||
if org:
|
||||
users_filter = {perm.user for perm in repo_perms}
|
||||
org_members = model.organization.get_organization_member_set(org, users_filter=users_filter)
|
||||
|
||||
def is_org_member(user):
|
||||
if not org:
|
||||
return False
|
||||
|
||||
return user.robot or user.username in org_members
|
||||
|
||||
return [self._user_permission(perm, org is not None, is_org_member(perm.user)) for perm in repo_perms]
|
||||
|
||||
def get_repo_roles(self, username, namespace_name, repository_name):
|
||||
user = model.user.get_user(username)
|
||||
if not user:
|
||||
return None
|
||||
|
||||
repo = model.repository.get_repository(namespace_name, repository_name)
|
||||
if not repo:
|
||||
return None
|
||||
|
||||
return [self._role(r) for r in model.permission.get_user_repo_permissions(user, repo)]
|
||||
|
||||
def get_repo_permission_for_user(self, username, namespace_name, repository_name):
|
||||
perm = model.permission.get_user_reponame_permission(username, namespace_name, repository_name)
|
||||
org = None
|
||||
try:
|
||||
org = model.organization.get_organization(namespace_name)
|
||||
org_members = model.organization.get_organization_member_set(org, users_filter={perm.user})
|
||||
is_org_member = perm.user.robot or perm.user.username in org_members
|
||||
except model.InvalidOrganizationException:
|
||||
# This repository is not part of an organization
|
||||
is_org_member = False
|
||||
|
||||
return self._user_permission(perm, org is not None, is_org_member)
|
||||
|
||||
def set_repo_permission_for_user(self, username, namespace_name, repository_name, role_name):
|
||||
try:
|
||||
perm = model.permission.set_user_repo_permission(username, namespace_name, repository_name, role_name)
|
||||
org = None
|
||||
try:
|
||||
org = model.organization.get_organization(namespace_name)
|
||||
org_members = model.organization.get_organization_member_set(org, users_filter={perm.user})
|
||||
is_org_member = perm.user.robot or perm.user.username in org_members
|
||||
except model.InvalidOrganizationException:
|
||||
# This repository is not part of an organization
|
||||
is_org_member = False
|
||||
return self._user_permission(perm, org is not None, is_org_member)
|
||||
except model.DataModelException as ex:
|
||||
raise SaveException(ex)
|
||||
|
||||
def delete_repo_permission_for_user(self, username, namespace_name, repository_name):
|
||||
try:
|
||||
model.permission.delete_user_permission(username, namespace_name, repository_name)
|
||||
except model.DataModelException as ex:
|
||||
raise DeleteException(ex)
|
||||
|
||||
def get_repo_permissions_by_team(self, namespace_name, repository_name):
|
||||
repo_perms = model.permission.get_all_repo_teams(namespace_name, repository_name)
|
||||
return [self._team_permission(perm, perm.team.name) for perm in repo_perms]
|
||||
|
||||
def get_repo_role_for_team(self, team_name, namespace_name, repository_name):
|
||||
return self._role(model.permission.get_team_reponame_permission(team_name, namespace_name, repository_name))
|
||||
|
||||
def set_repo_permission_for_team(self, team_name, namespace_name, repository_name, role_name):
|
||||
try:
|
||||
return self._team_permission(model.permission.set_team_repo_permission(team_name, namespace_name, repository_name, role_name), team_name)
|
||||
except model.DataModelException as ex:
|
||||
raise SaveException(ex)
|
||||
|
||||
def delete_repo_permission_for_team(self, team_name, namespace_name, repository_name):
|
||||
try:
|
||||
model.permission.delete_team_permission(team_name, namespace_name, repository_name)
|
||||
except model.DataModelException as ex:
|
||||
raise DeleteException(ex)
|
||||
|
||||
def _role(self, permission_obj):
|
||||
return Role(role_name=permission_obj.role.name)
|
||||
|
||||
def _user_permission(self, permission_obj, has_org, is_org_member):
|
||||
return UserPermission(role_name=permission_obj.role.name,
|
||||
username=permission_obj.user.username,
|
||||
is_robot=permission_obj.user.robot,
|
||||
avatar=avatar.get_data_for_user(permission_obj.user),
|
||||
is_org_member=is_org_member,
|
||||
has_org=has_org)
|
||||
|
||||
def _team_permission(self, permission_obj, team_name):
|
||||
return TeamPermission(role_name=permission_obj.role.name,
|
||||
team_name=permission_obj.team.name,
|
||||
avatar=avatar.get_data_for_team(permission_obj.team))
|
||||
|
||||
pre_oci_model = PreOCIModel()
|
|
@ -5,6 +5,7 @@ from flask_principal import AnonymousIdentity
|
|||
|
||||
from endpoints.api import api
|
||||
from endpoints.api.repositorynotification import RepositoryNotification
|
||||
from endpoints.api.permission import RepositoryUserTransitivePermission
|
||||
from endpoints.api.team import OrganizationTeamSyncing
|
||||
from endpoints.api.test.shared import conduct_api_call
|
||||
from endpoints.api.repository import RepositoryTrust
|
||||
|
@ -67,6 +68,21 @@ NOTIFICATION_PARAMS = {'namespace': 'devtable', 'repository': 'devtable/simple',
|
|||
(RepositoryTrust, 'POST', REPO_PARAMS, {'trust_enabled': True}, 'freshuser', 403),
|
||||
(RepositoryTrust, 'POST', REPO_PARAMS, {'trust_enabled': True}, 'reader', 403),
|
||||
(RepositoryTrust, 'POST', REPO_PARAMS, {'trust_enabled': True}, 'devtable', 404),
|
||||
|
||||
(RepositoryUserTransitivePermission, 'GET', {'username': 'A2O9','repository': 'public/publicrepo'}, None, None, 401),
|
||||
(RepositoryUserTransitivePermission, 'GET', {'username': 'A2O9','repository': 'public/publicrepo'}, None, 'freshuser', 403),
|
||||
(RepositoryUserTransitivePermission, 'GET', {'username': 'A2O9','repository': 'public/publicrepo'}, None, 'reader', 403),
|
||||
(RepositoryUserTransitivePermission, 'GET', {'username': 'A2O9','repository': 'public/publicrepo'}, None, 'devtable', 403),
|
||||
(RepositoryUserTransitivePermission, 'GET', {'username': 'A2O9','repository': 'devtable/shared'}, None, None, 401),
|
||||
(RepositoryUserTransitivePermission, 'GET', {'username': 'A2O9','repository': 'devtable/shared'}, None, 'freshuser', 403),
|
||||
(RepositoryUserTransitivePermission, 'GET', {'username': 'A2O9','repository': 'devtable/shared'}, None, 'reader', 403),
|
||||
(RepositoryUserTransitivePermission, 'GET', {'username': 'A2O9','repository': 'devtable/shared'}, None, 'devtable', 404),
|
||||
(RepositoryUserTransitivePermission, 'GET', {'username': 'A2O9','repository': 'buynlarge/orgrepo'}, None, None, 401),
|
||||
(RepositoryUserTransitivePermission, 'GET', {'username': 'A2O9','repository': 'buynlarge/orgrepo'}, None, 'freshuser', 403),
|
||||
(RepositoryUserTransitivePermission, 'GET', {'username': 'A2O9','repository': 'buynlarge/orgrepo'}, None, 'reader', 403),
|
||||
(RepositoryUserTransitivePermission, 'GET', {'username': 'A2O9','repository': 'buynlarge/orgrepo'}, None, 'devtable', 404),
|
||||
(RepositoryUserTransitivePermission, 'GET', {'username': 'devtable','repository': 'devtable/shared'}, None, 'devtable', 200),
|
||||
(RepositoryUserTransitivePermission, 'GET', {'username': 'devtable','repository': 'devtable/nope'}, None, 'devtable', 404),
|
||||
])
|
||||
def test_api_security(resource, method, params, body, identity, expected, client):
|
||||
with client_with_identity(identity, client) as cl:
|
||||
|
|
|
@ -756,58 +756,6 @@ class TestTeamMemberListBuynlargeOwners(ApiTestCase):
|
|||
self._run_test('GET', 200, 'devtable', None)
|
||||
|
||||
|
||||
class TestRepositoryUserTransitivePermissionA2o9PublicPublicrepo(ApiTestCase):
|
||||
def setUp(self):
|
||||
ApiTestCase.setUp(self)
|
||||
self._set_url(RepositoryUserTransitivePermission, username="A2O9", repository="public/publicrepo")
|
||||
|
||||
def test_get_anonymous(self):
|
||||
self._run_test('GET', 401, None, None)
|
||||
|
||||
def test_get_freshuser(self):
|
||||
self._run_test('GET', 403, 'freshuser', None)
|
||||
|
||||
def test_get_reader(self):
|
||||
self._run_test('GET', 403, 'reader', None)
|
||||
|
||||
def test_get_devtable(self):
|
||||
self._run_test('GET', 403, 'devtable', None)
|
||||
|
||||
class TestRepositoryUserTransitivePermissionA2o9DevtableShared(ApiTestCase):
|
||||
def setUp(self):
|
||||
ApiTestCase.setUp(self)
|
||||
self._set_url(RepositoryUserTransitivePermission, username="A2O9", repository="devtable/shared")
|
||||
|
||||
def test_get_anonymous(self):
|
||||
self._run_test('GET', 401, None, None)
|
||||
|
||||
def test_get_freshuser(self):
|
||||
self._run_test('GET', 403, 'freshuser', None)
|
||||
|
||||
def test_get_reader(self):
|
||||
self._run_test('GET', 403, 'reader', None)
|
||||
|
||||
def test_get_devtable(self):
|
||||
self._run_test('GET', 404, 'devtable', None)
|
||||
|
||||
class TestRepositoryUserTransitivePermissionA2o9BuynlargeOrgrepo(ApiTestCase):
|
||||
def setUp(self):
|
||||
ApiTestCase.setUp(self)
|
||||
self._set_url(RepositoryUserTransitivePermission, username="A2O9", repository="buynlarge/orgrepo")
|
||||
|
||||
def test_get_anonymous(self):
|
||||
self._run_test('GET', 401, None, None)
|
||||
|
||||
def test_get_freshuser(self):
|
||||
self._run_test('GET', 403, 'freshuser', None)
|
||||
|
||||
def test_get_reader(self):
|
||||
self._run_test('GET', 403, 'reader', None)
|
||||
|
||||
def test_get_devtable(self):
|
||||
self._run_test('GET', 404, 'devtable', None)
|
||||
|
||||
|
||||
class TestRepositoryUserPermissionA2o9PublicPublicrepo(ApiTestCase):
|
||||
def setUp(self):
|
||||
ApiTestCase.setUp(self)
|
||||
|
|
Reference in a new issue