Add support for * (admin) permission to registry auth v2 endpoint

This commit is contained in:
Evan Cordell 2016-11-18 14:46:11 -05:00
parent c4fbc8e2e4
commit 9e96e6870f
2 changed files with 22 additions and 2 deletions

View file

@ -7,7 +7,7 @@ from flask import request, jsonify, abort
from app import app, userevents, instance_keys from app import app, userevents, instance_keys
from auth.auth_context import get_authenticated_user, get_validated_token, get_validated_oauth_token from auth.auth_context import get_authenticated_user, get_validated_token, get_validated_oauth_token
from auth.permissions import (ModifyRepositoryPermission, ReadRepositoryPermission, from auth.permissions import (ModifyRepositoryPermission, ReadRepositoryPermission,
CreateRepositoryPermission) CreateRepositoryPermission, AdministerRepositoryPermission)
from auth.process import process_auth from auth.process import process_auth
from endpoints.v2 import v2_bp from endpoints.v2 import v2_bp
from endpoints.decorators import anon_protect from endpoints.decorators import anon_protect
@ -118,6 +118,13 @@ def generate_registry_jwt():
else: else:
logger.debug('No permission to pull repository %s/%s', namespace, reponame) logger.debug('No permission to pull repository %s/%s', namespace, reponame)
if '*' in actions:
# Grant * user is admin
if (AdministerRepositoryPermission(namespace, reponame).can()):
final_actions.append('*')
else:
logger.debug("No permission to administer repository %s/%s", namespace, reponame)
# Add the access for the JWT. # Add the access for the JWT.
access.append({ access.append({
'type': 'repository', 'type': 'repository',

View file

@ -82,7 +82,20 @@ class TestRegistryV2Auth(unittest.TestCase):
identity = self._parse_token(token) identity = self._parse_token(token)
self.assertEqual(identity.id, TEST_USER.username) self.assertEqual(identity.id, TEST_USER.username)
self.assertEqual(1, len(identity.provides)) self.assertEqual(1, len(identity.provides))
def test_token_with_admin_access(self):
access = [
{
'type': 'repository',
'name': 'somens/somerepo',
'actions': ['*'],
}
]
token = self._generate_token(self._generate_token_data(access=access))
identity = self._parse_token(token)
self.assertEqual(identity.id, TEST_USER.username)
self.assertEqual(1, len(identity.provides))
def test_malformed_access(self): def test_malformed_access(self):
access = [ access = [
{ {