Determine which TUF root to show based on actual access, not requested
access
This commit is contained in:
parent
7b411b2c25
commit
43dd974dca
5 changed files with 61 additions and 38 deletions
|
@ -1,18 +1,43 @@
|
|||
import pytest
|
||||
|
||||
from endpoints.v2.v2auth import attach_metadata_root_name, CLAIM_APOSTILLE_ROOT
|
||||
import flask
|
||||
from flask import g
|
||||
from flask_principal import Identity
|
||||
|
||||
from endpoints.v2.v2auth import get_tuf_root
|
||||
from auth import permissions
|
||||
|
||||
def admin_identity(namespace, reponame):
|
||||
identity = Identity('admin')
|
||||
identity.provides.add(permissions._RepositoryNeed(namespace, reponame, 'admin'))
|
||||
identity.provides.add(permissions._OrganizationRepoNeed(namespace, 'admin'))
|
||||
return identity
|
||||
|
||||
@pytest.mark.parametrize('context,access,expected', [
|
||||
({}, None, {}),
|
||||
({}, [], {}),
|
||||
({}, [{}], {}),
|
||||
({}, [{"actions": None}], {}),
|
||||
({}, [{"actions": []}], {}),
|
||||
({}, [{"actions": ["pull"]}], {CLAIM_APOSTILLE_ROOT: 'quay'}),
|
||||
({}, [{"actions": ["push"]}], {CLAIM_APOSTILLE_ROOT: 'signer'}),
|
||||
({}, [{"actions": ["pull", "push"]}], {CLAIM_APOSTILLE_ROOT: 'signer'}),
|
||||
def write_identity(namespace, reponame):
|
||||
identity = Identity('writer')
|
||||
identity.provides.add(permissions._RepositoryNeed(namespace, reponame, 'write'))
|
||||
identity.provides.add(permissions._OrganizationRepoNeed(namespace, 'write'))
|
||||
return identity
|
||||
|
||||
def read_identity(namespace, reponame):
|
||||
identity = Identity('reader')
|
||||
identity.provides.add(permissions._RepositoryNeed(namespace, reponame, 'read'))
|
||||
identity.provides.add(permissions._OrganizationRepoNeed(namespace, 'read'))
|
||||
return identity
|
||||
|
||||
@pytest.mark.parametrize('identity,expected', [
|
||||
(Identity('anon'), 'quay'),
|
||||
(read_identity("namespace", "repo"), 'quay'),
|
||||
(read_identity("different", "repo"), 'quay'),
|
||||
(admin_identity("different", "repo"), 'quay'),
|
||||
(write_identity("different", "repo"), 'quay'),
|
||||
(admin_identity("namespace", "repo"), 'signer'),
|
||||
(write_identity("namespace", "repo"), 'signer'),
|
||||
])
|
||||
def test_attach_metadata_root_name(context, access, expected):
|
||||
actual = attach_metadata_root_name(context, access)
|
||||
def test_get_tuf_root(identity, expected):
|
||||
app = flask.Flask(__name__)
|
||||
|
||||
with app.test_request_context('/'):
|
||||
g.identity = identity
|
||||
actual = get_tuf_root("namespace", "repo")
|
||||
assert actual == expected, "should be %s, but was %s" % (expected, actual)
|
||||
|
|
|
@ -17,8 +17,6 @@ from util.cache import no_cache
|
|||
from util.names import parse_namespace_repository, REPOSITORY_NAME_REGEX
|
||||
from util.security.registry_jwt import generate_bearer_token, build_context_and_subject
|
||||
|
||||
CLAIM_APOSTILLE_ROOT = 'com.apostille.root'
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
|
@ -67,6 +65,7 @@ def generate_registry_jwt(auth_result):
|
|||
user_event_data = {
|
||||
'action': 'login',
|
||||
}
|
||||
tuf_root = 'quay'
|
||||
|
||||
if len(scope_param) > 0:
|
||||
match = get_scope_regex().match(scope_param)
|
||||
|
@ -162,6 +161,8 @@ def generate_registry_jwt(auth_result):
|
|||
'repository': reponame,
|
||||
'namespace': namespace,
|
||||
}
|
||||
|
||||
tuf_root = get_tuf_root(namespace, reponame)
|
||||
|
||||
elif user is None and token is None:
|
||||
# In this case, we are doing an auth flow, and it's not an anonymous pull
|
||||
|
@ -174,28 +175,14 @@ def generate_registry_jwt(auth_result):
|
|||
event.publish_event_data('docker-cli', user_event_data)
|
||||
|
||||
# Build the signed JWT.
|
||||
context, subject = build_context_and_subject(user, token, oauthtoken)
|
||||
context = attach_metadata_root_name(context, access)
|
||||
context, subject = build_context_and_subject(user, token, oauthtoken, tuf_root)
|
||||
token = generate_bearer_token(audience_param, subject, context, access,
|
||||
TOKEN_VALIDITY_LIFETIME_S, instance_keys)
|
||||
return jsonify({'token': token})
|
||||
|
||||
|
||||
def attach_metadata_root_name(context, access):
|
||||
"""
|
||||
Adds in metadata_root_name into JWT context when appropriate
|
||||
"""
|
||||
try:
|
||||
actions = access[0]["actions"]
|
||||
except(TypeError, IndexError, KeyError):
|
||||
return context
|
||||
|
||||
if not actions:
|
||||
return context
|
||||
|
||||
if "push" in actions:
|
||||
context[CLAIM_APOSTILLE_ROOT] = 'signer'
|
||||
else:
|
||||
context[CLAIM_APOSTILLE_ROOT] = 'quay'
|
||||
|
||||
return context
|
||||
def get_tuf_root(namespace, reponame):
|
||||
# Users with write access to a repo will see signer-rooted TUF metadata
|
||||
if ModifyRepositoryPermission(namespace, reponame).can():
|
||||
return 'signer'
|
||||
return 'quay'
|
||||
|
|
Reference in a new issue