From 9e96e6870f2bdbb363a5fefca484d8a3732414c2 Mon Sep 17 00:00:00 2001 From: Evan Cordell Date: Fri, 18 Nov 2016 14:46:11 -0500 Subject: [PATCH] Add support for * (admin) permission to registry auth v2 endpoint --- endpoints/v2/v2auth.py | 9 ++++++++- test/test_registry_v2_auth.py | 15 ++++++++++++++- 2 files changed, 22 insertions(+), 2 deletions(-) diff --git a/endpoints/v2/v2auth.py b/endpoints/v2/v2auth.py index 60d419662..45f248961 100644 --- a/endpoints/v2/v2auth.py +++ b/endpoints/v2/v2auth.py @@ -7,7 +7,7 @@ from flask import request, jsonify, abort from app import app, userevents, instance_keys from auth.auth_context import get_authenticated_user, get_validated_token, get_validated_oauth_token from auth.permissions import (ModifyRepositoryPermission, ReadRepositoryPermission, - CreateRepositoryPermission) + CreateRepositoryPermission, AdministerRepositoryPermission) from auth.process import process_auth from endpoints.v2 import v2_bp from endpoints.decorators import anon_protect @@ -118,6 +118,13 @@ def generate_registry_jwt(): else: logger.debug('No permission to pull repository %s/%s', namespace, reponame) + if '*' in actions: + # Grant * user is admin + if (AdministerRepositoryPermission(namespace, reponame).can()): + final_actions.append('*') + else: + logger.debug("No permission to administer repository %s/%s", namespace, reponame) + # Add the access for the JWT. access.append({ 'type': 'repository', diff --git a/test/test_registry_v2_auth.py b/test/test_registry_v2_auth.py index be3d40c03..2e8779a81 100644 --- a/test/test_registry_v2_auth.py +++ b/test/test_registry_v2_auth.py @@ -82,7 +82,20 @@ class TestRegistryV2Auth(unittest.TestCase): identity = self._parse_token(token) self.assertEqual(identity.id, TEST_USER.username) self.assertEqual(1, len(identity.provides)) - + + def test_token_with_admin_access(self): + access = [ + { + 'type': 'repository', + 'name': 'somens/somerepo', + 'actions': ['*'], + } + ] + token = self._generate_token(self._generate_token_data(access=access)) + identity = self._parse_token(token) + self.assertEqual(identity.id, TEST_USER.username) + self.assertEqual(1, len(identity.provides)) + def test_malformed_access(self): access = [ {