Switch to returning an empty set when there are invalid auth scopes
This commit is contained in:
parent
804be4d4be
commit
354f4109d0
3 changed files with 62 additions and 13 deletions
|
@ -74,12 +74,13 @@ class QuayDeferredPermissionUser(Identity):
|
||||||
|
|
||||||
@staticmethod
|
@staticmethod
|
||||||
def for_id(uuid, auth_scopes=None):
|
def for_id(uuid, auth_scopes=None):
|
||||||
return QuayDeferredPermissionUser(uuid, 'user_uuid', auth_scopes or {scopes.DIRECT_LOGIN})
|
auth_scopes = auth_scopes if auth_scopes is not None else {scopes.DIRECT_LOGIN}
|
||||||
|
return QuayDeferredPermissionUser(uuid, 'user_uuid', auth_scopes)
|
||||||
|
|
||||||
@staticmethod
|
@staticmethod
|
||||||
def for_user(user, auth_scopes=None):
|
def for_user(user, auth_scopes=None):
|
||||||
return QuayDeferredPermissionUser(user.uuid, 'user_uuid', auth_scopes or {scopes.DIRECT_LOGIN},
|
auth_scopes = auth_scopes if auth_scopes is not None else {scopes.DIRECT_LOGIN}
|
||||||
user=user)
|
return QuayDeferredPermissionUser(user.uuid, 'user_uuid', auth_scopes, user=user)
|
||||||
|
|
||||||
def _translate_role_for_scopes(self, cardinality, max_roles, role):
|
def _translate_role_for_scopes(self, cardinality, max_roles, role):
|
||||||
if self._scope_set is None:
|
if self._scope_set is None:
|
||||||
|
|
|
@ -100,12 +100,13 @@ def scopes_from_scope_string(scopes):
|
||||||
# Note: The scopes string should be space seperated according to the spec:
|
# Note: The scopes string should be space seperated according to the spec:
|
||||||
# https://tools.ietf.org/html/rfc6749#section-3.3
|
# https://tools.ietf.org/html/rfc6749#section-3.3
|
||||||
# However, we also support commas for backwards compatibility with existing callers to our code.
|
# However, we also support commas for backwards compatibility with existing callers to our code.
|
||||||
return {ALL_SCOPES.get(scope, None) for scope in re.split(' |,', scopes)}
|
scope_set = {ALL_SCOPES.get(scope, None) for scope in re.split(' |,', scopes)}
|
||||||
|
return scope_set if not None in scope_set else {}
|
||||||
|
|
||||||
|
|
||||||
def validate_scope_string(scopes):
|
def validate_scope_string(scopes):
|
||||||
decoded = scopes_from_scope_string(scopes)
|
decoded = scopes_from_scope_string(scopes)
|
||||||
return None not in decoded and len(decoded) > 0
|
return len(decoded) > 0
|
||||||
|
|
||||||
|
|
||||||
def is_subset_string(full_string, expected_string):
|
def is_subset_string(full_string, expected_string):
|
||||||
|
@ -113,6 +114,9 @@ def is_subset_string(full_string, expected_string):
|
||||||
in full_string.
|
in full_string.
|
||||||
"""
|
"""
|
||||||
full_scopes = scopes_from_scope_string(full_string)
|
full_scopes = scopes_from_scope_string(full_string)
|
||||||
|
if not full_scopes:
|
||||||
|
return False
|
||||||
|
|
||||||
full_implied_scopes = set.union(*[IMPLIED_SCOPES[scope] for scope in full_scopes])
|
full_implied_scopes = set.union(*[IMPLIED_SCOPES[scope] for scope in full_scopes])
|
||||||
expected_scopes = scopes_from_scope_string(expected_string)
|
expected_scopes = scopes_from_scope_string(expected_string)
|
||||||
return expected_scopes.issubset(full_implied_scopes)
|
return expected_scopes.issubset(full_implied_scopes)
|
||||||
|
@ -122,13 +126,12 @@ def get_scope_information(scopes_string):
|
||||||
scopes = scopes_from_scope_string(scopes_string)
|
scopes = scopes_from_scope_string(scopes_string)
|
||||||
scope_info = []
|
scope_info = []
|
||||||
for scope in scopes:
|
for scope in scopes:
|
||||||
if scope:
|
scope_info.append({
|
||||||
scope_info.append({
|
'title': scope.title,
|
||||||
'title': scope.title,
|
'scope': scope.scope,
|
||||||
'scope': scope.scope,
|
'description': scope.description,
|
||||||
'description': scope.description,
|
'icon': scope.icon,
|
||||||
'icon': scope.icon,
|
'dangerous': scope.dangerous,
|
||||||
'dangerous': scope.dangerous,
|
})
|
||||||
})
|
|
||||||
|
|
||||||
return scope_info
|
return scope_info
|
||||||
|
|
|
@ -9,6 +9,8 @@ from flask import g
|
||||||
from flask.ext.principal import identity_loaded
|
from flask.ext.principal import identity_loaded
|
||||||
|
|
||||||
from auth.auth import _process_basic_auth
|
from auth.auth import _process_basic_auth
|
||||||
|
from auth.scopes import scopes_from_scope_string, is_subset_string, DIRECT_LOGIN, ADMIN_REPO
|
||||||
|
from auth.permissions import QuayDeferredPermissionUser
|
||||||
from endpoints.api import api_bp, api
|
from endpoints.api import api_bp, api
|
||||||
from endpoints.api.user import User, Signin
|
from endpoints.api.user import User, Signin
|
||||||
|
|
||||||
|
@ -124,6 +126,49 @@ class TestAuth(ApiTestCase):
|
||||||
self.conduct_basic_auth(robot.username, 'someinvalidcode')
|
self.conduct_basic_auth(robot.username, 'someinvalidcode')
|
||||||
self.verify_no_identity()
|
self.verify_no_identity()
|
||||||
|
|
||||||
|
def test_deferred_permissions_scopes(self):
|
||||||
|
self.assertEquals(QuayDeferredPermissionUser.for_id('123454')._scope_set, {DIRECT_LOGIN})
|
||||||
|
self.assertEquals(QuayDeferredPermissionUser.for_id('123454', {})._scope_set, {})
|
||||||
|
self.assertEquals(QuayDeferredPermissionUser.for_id('123454', {ADMIN_REPO})._scope_set, {ADMIN_REPO})
|
||||||
|
|
||||||
|
def assertParsedScopes(self, scopes_str, *args):
|
||||||
|
expected = list(args)
|
||||||
|
parsed = scopes_from_scope_string(scopes_str)
|
||||||
|
self.assertEquals([p.scope for p in parsed], expected)
|
||||||
|
|
||||||
|
def test_scopes_parsing(self):
|
||||||
|
# Valid single scopes.
|
||||||
|
self.assertParsedScopes('repo:read', 'repo:read')
|
||||||
|
self.assertParsedScopes('repo:admin', 'repo:admin')
|
||||||
|
|
||||||
|
# Invalid scopes.
|
||||||
|
self.assertParsedScopes('not:valid')
|
||||||
|
self.assertParsedScopes('repo:admins')
|
||||||
|
|
||||||
|
# Valid scope strings.
|
||||||
|
self.assertParsedScopes('repo:read repo:admin', 'repo:read', 'repo:admin')
|
||||||
|
self.assertParsedScopes('repo:read,repo:admin', 'repo:read', 'repo:admin')
|
||||||
|
self.assertParsedScopes('repo:read,repo:admin repo:write', 'repo:read', 'repo:admin',
|
||||||
|
'repo:write')
|
||||||
|
|
||||||
|
# Partially invalid scopes.
|
||||||
|
self.assertParsedScopes('repo:read,not:valid')
|
||||||
|
self.assertParsedScopes('repo:read repo:admins')
|
||||||
|
|
||||||
|
# Invalid scope strings.
|
||||||
|
self.assertParsedScopes('repo:read|repo:admin')
|
||||||
|
|
||||||
|
def test_subset_string(self):
|
||||||
|
self.assertTrue(is_subset_string('repo:read', 'repo:read'))
|
||||||
|
self.assertTrue(is_subset_string('repo:read repo:admin', 'repo:read'))
|
||||||
|
self.assertTrue(is_subset_string('repo:read,repo:admin', 'repo:read'))
|
||||||
|
self.assertTrue(is_subset_string('repo:read,repo:admin', 'repo:admin'))
|
||||||
|
self.assertTrue(is_subset_string('repo:read,repo:admin', 'repo:admin repo:read'))
|
||||||
|
|
||||||
|
self.assertFalse(is_subset_string('', 'repo:read'))
|
||||||
|
self.assertFalse(is_subset_string('unknown:tag', 'repo:read'))
|
||||||
|
self.assertFalse(is_subset_string('repo:read unknown:tag', 'repo:read'))
|
||||||
|
self.assertFalse(is_subset_string('repo:read,unknown:tag', 'repo:read'))
|
||||||
|
|
||||||
if __name__ == '__main__':
|
if __name__ == '__main__':
|
||||||
unittest.main()
|
unittest.main()
|
||||||
|
|
Reference in a new issue