From 8323c51e6ede740d5818f878cfede979929f748a Mon Sep 17 00:00:00 2001 From: Jake Moshenko Date: Mon, 23 May 2016 16:36:48 -0400 Subject: [PATCH] Extend registry auth to support notary JWTs. --- endpoints/v2/v2auth.py | 20 +++++++++++++++----- test/registry_tests.py | 24 ++++++++++++++++++++---- 2 files changed, 35 insertions(+), 9 deletions(-) diff --git a/endpoints/v2/v2auth.py b/endpoints/v2/v2auth.py index f5de3943e..7f857b70c 100644 --- a/endpoints/v2/v2auth.py +++ b/endpoints/v2/v2auth.py @@ -9,6 +9,7 @@ from auth.auth import process_auth from auth.auth_context import get_authenticated_user, get_validated_token, get_validated_oauth_token from auth.permissions import (ModifyRepositoryPermission, ReadRepositoryPermission, CreateRepositoryPermission) +from cachetools import lru_cache from endpoints.v2 import v2_bp from endpoints.decorators import anon_protect from util.cache import no_cache @@ -20,10 +21,18 @@ logger = logging.getLogger(__name__) TOKEN_VALIDITY_LIFETIME_S = 60 * 60 # 1 hour -SCOPE_REGEX = re.compile( - r'^repository:(([\.a-zA-Z0-9_\-]+/)?[\.a-zA-Z0-9_\-]+):(((push|pull|\*),)*(push|pull|\*))$' +SCOPE_REGEX_TEMPLATE = ( + r'^repository:((?:{}\/)?((?:[\.a-zA-Z0-9_\-]+\/)?[\.a-zA-Z0-9_\-]+)):((?:push|pull|\*)(?:,(?:push|pull|\*))*)$' ) + +@lru_cache(maxsize=1) +def get_scope_regex(): + hostname = re.escape(app.config['SERVER_HOSTNAME']) + scope_regex_string = SCOPE_REGEX_TEMPLATE.format(hostname) + return re.compile(scope_regex_string) + + @v2_bp.route('/auth') @process_auth @no_cache @@ -59,7 +68,7 @@ def generate_registry_jwt(): } if len(scope_param) > 0: - match = SCOPE_REGEX.match(scope_param) + match = get_scope_regex().match(scope_param) if match is None: logger.debug('Match: %s', match) logger.debug('len: %s', len(scope_param)) @@ -68,7 +77,8 @@ def generate_registry_jwt(): logger.debug('Match: %s', match.groups()) - namespace_and_repo = match.group(1) + registry_and_repo = match.group(1) + namespace_and_repo = match.group(2) actions = match.group(3).split(',') lib_namespace = app.config['LIBRARY_NAMESPACE'] @@ -112,7 +122,7 @@ def generate_registry_jwt(): # Add the access for the JWT. access.append({ 'type': 'repository', - 'name': namespace_and_repo, + 'name': registry_and_repo, 'actions': final_actions, }) diff --git a/test/registry_tests.py b/test/registry_tests.py index a5af86700..9f34d2c41 100644 --- a/test/registry_tests.py +++ b/test/registry_tests.py @@ -703,7 +703,7 @@ class V1RegistryLoginMixin(object): class V2RegistryLoginMixin(object): - def do_login(self, username, password, scope, expect_success=True): + def do_login(self, username, password, scope, expect_success=True, expected_failure_code=401): params = { 'account': username, 'scope': scope, @@ -713,7 +713,7 @@ class V2RegistryLoginMixin(object): if expect_success: expected_code = 200 else: - expected_code = 401 + expected_code = expected_failure_code auth = None if username and password: @@ -1757,8 +1757,9 @@ class V2LoginTests(V2RegistryLoginMixin, LoginTests, RegistryTestCaseMixin, Base return cert_obj.public_key() - def do_logincheck(self, username, password, scope, expected_actions=[], expect_success=True): - response = self.do_login(username, password, scope, expect_success=expect_success) + def do_logincheck(self, username, password, scope, expected_actions=[], expect_success=True, + **kwargs): + response = self.do_login(username, password, scope, expect_success=expect_success, **kwargs) if not expect_success: return @@ -1803,6 +1804,21 @@ class V2LoginTests(V2RegistryLoginMixin, LoginTests, RegistryTestCaseMixin, Base scope='repository:devtable/simple:pull', expected_actions=[]) + def test_validuser_withendpoint(self): + self.do_logincheck('devtable', 'password', expect_success=True, + scope='repository:localhost:5000/devtable/simple:pull,push', + expected_actions=['push', 'pull']) + + def test_validuser_invalid_endpoint(self): + self.do_logincheck('public', 'password', expect_success=False, expected_failure_code=400, + scope='repository:someotherrepo.com/devtable/simple:pull,push', + expected_actions=[]) + + def test_validuser_malformed_endpoint(self): + self.do_logincheck('public', 'password', expect_success=False, expected_failure_code=400, + scope='repository:localhost:5000/registryroot/devtable/simple:pull,push', + expected_actions=[]) + def test_validuser_noscope(self): self.do_logincheck('public', 'password', expect_success=True, scope=None)