registry auth tests: test more access types
This commit is contained in:
parent
9e96e6870f
commit
b4ace1dd29
3 changed files with 58 additions and 26 deletions
|
@ -66,6 +66,10 @@ def repository_write_grant(namespace, repository):
|
||||||
return _RepositoryNeed(namespace, repository, 'write')
|
return _RepositoryNeed(namespace, repository, 'write')
|
||||||
|
|
||||||
|
|
||||||
|
def repository_admin_grant(namespace, repository):
|
||||||
|
return _RepositoryNeed(namespace, repository, 'admin')
|
||||||
|
|
||||||
|
|
||||||
class QuayDeferredPermissionUser(Identity):
|
class QuayDeferredPermissionUser(Identity):
|
||||||
def __init__(self, uuid, auth_type, auth_scopes, user=None):
|
def __init__(self, uuid, auth_type, auth_scopes, user=None):
|
||||||
super(QuayDeferredPermissionUser, self).__init__(uuid, auth_type)
|
super(QuayDeferredPermissionUser, self).__init__(uuid, auth_type)
|
||||||
|
|
|
@ -8,7 +8,7 @@ from flask_principal import identity_changed, Identity
|
||||||
|
|
||||||
from app import app, get_app_url, instance_keys
|
from app import app, get_app_url, instance_keys
|
||||||
from .auth_context import set_grant_context, get_grant_context
|
from .auth_context import set_grant_context, get_grant_context
|
||||||
from .permissions import repository_read_grant, repository_write_grant
|
from .permissions import repository_read_grant, repository_write_grant, repository_admin_grant
|
||||||
from util.names import parse_namespace_repository
|
from util.names import parse_namespace_repository
|
||||||
from util.http import abort
|
from util.http import abort
|
||||||
from util.security.registry_jwt import (ANONYMOUS_SUB, decode_bearer_header,
|
from util.security.registry_jwt import (ANONYMOUS_SUB, decode_bearer_header,
|
||||||
|
@ -50,6 +50,7 @@ ACCESS_SCHEMA = {
|
||||||
'enum': [
|
'enum': [
|
||||||
'push',
|
'push',
|
||||||
'pull',
|
'pull',
|
||||||
|
'*',
|
||||||
],
|
],
|
||||||
},
|
},
|
||||||
},
|
},
|
||||||
|
@ -165,7 +166,9 @@ def identity_from_bearer_token(bearer_header):
|
||||||
for grant in payload['access']:
|
for grant in payload['access']:
|
||||||
namespace, repo_name = parse_namespace_repository(grant['name'], lib_namespace)
|
namespace, repo_name = parse_namespace_repository(grant['name'], lib_namespace)
|
||||||
|
|
||||||
if 'push' in grant['actions']:
|
if '*' in grant['actions']:
|
||||||
|
loaded_identity.provides.add(repository_admin_grant(namespace, repo_name))
|
||||||
|
elif 'push' in grant['actions']:
|
||||||
loaded_identity.provides.add(repository_write_grant(namespace, repo_name))
|
loaded_identity.provides.add(repository_write_grant(namespace, repo_name))
|
||||||
elif 'pull' in grant['actions']:
|
elif 'pull' in grant['actions']:
|
||||||
loaded_identity.provides.add(repository_read_grant(namespace, repo_name))
|
loaded_identity.provides.add(repository_read_grant(namespace, repo_name))
|
||||||
|
|
|
@ -71,37 +71,62 @@ class TestRegistryV2Auth(unittest.TestCase):
|
||||||
self.assertEqual(0, len(identity.provides))
|
self.assertEqual(0, len(identity.provides))
|
||||||
|
|
||||||
def test_token_with_access(self):
|
def test_token_with_access(self):
|
||||||
access = [
|
valid_access = [
|
||||||
{
|
[
|
||||||
'type': 'repository',
|
{
|
||||||
'name': 'somens/somerepo',
|
'type': 'repository',
|
||||||
'actions': ['pull', 'push'],
|
'name': 'somens/somerepo',
|
||||||
}
|
'actions': ['pull', 'push'],
|
||||||
|
}
|
||||||
|
],
|
||||||
|
[
|
||||||
|
{
|
||||||
|
'type': 'repository',
|
||||||
|
'name': 'somens/somerepo',
|
||||||
|
'actions': ['pull', '*'],
|
||||||
|
}
|
||||||
|
],
|
||||||
|
[
|
||||||
|
{
|
||||||
|
'type': 'repository',
|
||||||
|
'name': 'somens/somerepo',
|
||||||
|
'actions': ['*', 'push'],
|
||||||
|
}
|
||||||
|
],
|
||||||
|
[
|
||||||
|
{
|
||||||
|
'type': 'repository',
|
||||||
|
'name': 'somens/somerepo',
|
||||||
|
'actions': ['*'],
|
||||||
|
}
|
||||||
|
],
|
||||||
|
[
|
||||||
|
{
|
||||||
|
'type': 'repository',
|
||||||
|
'name': 'somens/somerepo',
|
||||||
|
'actions': ['pull', '*', 'push'],
|
||||||
|
}
|
||||||
|
],
|
||||||
]
|
]
|
||||||
token = self._generate_token(self._generate_token_data(access=access))
|
for access in valid_access:
|
||||||
identity = self._parse_token(token)
|
token = self._generate_token(self._generate_token_data(access=access))
|
||||||
self.assertEqual(identity.id, TEST_USER.username)
|
identity = self._parse_token(token)
|
||||||
self.assertEqual(1, len(identity.provides))
|
self.assertEqual(identity.id, TEST_USER.username)
|
||||||
|
self.assertEqual(1, len(identity.provides))
|
||||||
def test_token_with_admin_access(self):
|
role = list(identity.provides)[0][3]
|
||||||
access = [
|
if "*" in access[0]['actions']:
|
||||||
{
|
self.assertEqual(role, 'admin')
|
||||||
'type': 'repository',
|
elif "push" in access[0]['actions']:
|
||||||
'name': 'somens/somerepo',
|
self.assertEqual(role, 'write')
|
||||||
'actions': ['*'],
|
elif "pull" in access[0]['actions']:
|
||||||
}
|
self.assertEqual(role, 'read')
|
||||||
]
|
|
||||||
token = self._generate_token(self._generate_token_data(access=access))
|
|
||||||
identity = self._parse_token(token)
|
|
||||||
self.assertEqual(identity.id, TEST_USER.username)
|
|
||||||
self.assertEqual(1, len(identity.provides))
|
|
||||||
|
|
||||||
def test_malformed_access(self):
|
def test_malformed_access(self):
|
||||||
access = [
|
access = [
|
||||||
{
|
{
|
||||||
'toipe': 'repository',
|
'toipe': 'repository',
|
||||||
'namesies': 'somens/somerepo',
|
'namesies': 'somens/somerepo',
|
||||||
'akshuns': ['pull', 'push'],
|
'akshuns': ['pull', 'push', '*'],
|
||||||
}
|
}
|
||||||
]
|
]
|
||||||
token = self._generate_token(self._generate_token_data(access=access))
|
token = self._generate_token(self._generate_token_data(access=access))
|
||||||
|
|
Reference in a new issue