auth.test: merge registry jwt into one pytest file
This commit is contained in:
parent
880871a2ea
commit
1040c939bf
2 changed files with 59 additions and 125 deletions
|
@ -5,6 +5,8 @@ import pytest
|
||||||
|
|
||||||
from app import app, instance_keys
|
from app import app, instance_keys
|
||||||
from auth.registry_jwt_auth import identity_from_bearer_token, InvalidJWTException
|
from auth.registry_jwt_auth import identity_from_bearer_token, InvalidJWTException
|
||||||
|
from data import model # TODO(jzelinskie): remove this after service keys are decoupled
|
||||||
|
from data.database import ServiceKeyApprovalType
|
||||||
from initdb import setup_database_for_testing, finished_database_for_testing
|
from initdb import setup_database_for_testing, finished_database_for_testing
|
||||||
from util.morecollections import AttrDict
|
from util.morecollections import AttrDict
|
||||||
from util.security.registry_jwt import ANONYMOUS_SUB, build_context_and_subject
|
from util.security.registry_jwt import ANONYMOUS_SUB, build_context_and_subject
|
||||||
|
@ -16,13 +18,8 @@ TOKEN_VALIDITY_LIFETIME_S = 60 * 60 # 1 hour
|
||||||
ANONYMOUS_SUB = '(anonymous)'
|
ANONYMOUS_SUB = '(anonymous)'
|
||||||
SERVICE_NAME = 'quay'
|
SERVICE_NAME = 'quay'
|
||||||
|
|
||||||
|
# This import has to come below any references to "app".
|
||||||
def setup_module(m):
|
from test.fixtures import *
|
||||||
setup_database_for_testing(m)
|
|
||||||
|
|
||||||
|
|
||||||
def teardown_module(m):
|
|
||||||
finished_database_for_testing(m)
|
|
||||||
|
|
||||||
|
|
||||||
def _access(typ='repository', name='somens/somerepo', actions=None):
|
def _access(typ='repository', name='somens/somerepo', actions=None):
|
||||||
|
@ -73,7 +70,7 @@ def _parse_token(token):
|
||||||
return identity_from_bearer_token(token)[0]
|
return identity_from_bearer_token(token)[0]
|
||||||
|
|
||||||
|
|
||||||
def test_accepted_token():
|
def test_accepted_token(initialized_db):
|
||||||
token = _token(_token_data())
|
token = _token(_token_data())
|
||||||
identity = _parse_token(token)
|
identity = _parse_token(token)
|
||||||
assert identity.id == TEST_USER.username, 'should be %s, but was %s' % (TEST_USER.username,
|
assert identity.id == TEST_USER.username, 'should be %s, but was %s' % (TEST_USER.username,
|
||||||
|
@ -93,7 +90,7 @@ def test_accepted_token():
|
||||||
(_access(actions=['*', 'push'])),
|
(_access(actions=['*', 'push'])),
|
||||||
(_access(actions=['*'])),
|
(_access(actions=['*'])),
|
||||||
(_access(actions=['pull', '*', 'push'])),])
|
(_access(actions=['pull', '*', 'push'])),])
|
||||||
def test_token_with_access(access):
|
def test_token_with_access(access, initialized_db):
|
||||||
token = _token(_token_data(access=access))
|
token = _token(_token_data(access=access))
|
||||||
identity = _parse_token(token)
|
identity = _parse_token(token)
|
||||||
assert identity.id == TEST_USER.username, 'should be %s, but was %s' % (TEST_USER.username,
|
assert identity.id == TEST_USER.username, 'should be %s, but was %s' % (TEST_USER.username,
|
||||||
|
@ -134,6 +131,58 @@ def test_token_with_access(access):
|
||||||
('some random token'),
|
('some random token'),
|
||||||
('Bearer: sometokenhere'),
|
('Bearer: sometokenhere'),
|
||||||
('\nBearer: dGVzdA'),])
|
('\nBearer: dGVzdA'),])
|
||||||
def test_invalid_jwt(token):
|
def test_invalid_jwt(token, initialized_db):
|
||||||
with pytest.raises(InvalidJWTException):
|
with pytest.raises(InvalidJWTException):
|
||||||
_parse_token(token)
|
_parse_token(token)
|
||||||
|
|
||||||
|
|
||||||
|
def test_mixing_keys_e2e(initialized_db):
|
||||||
|
token_data = _token_data()
|
||||||
|
|
||||||
|
# Create a new key for testing.
|
||||||
|
p, key = model.service_keys.generate_service_key(instance_keys.service_name, None, kid='newkey',
|
||||||
|
name='newkey', metadata={})
|
||||||
|
private_key = p.exportKey('PEM')
|
||||||
|
|
||||||
|
# Test first with the new valid, but unapproved key.
|
||||||
|
unapproved_key_token = _token(token_data, key_id='newkey', private_key=private_key)
|
||||||
|
with pytest.raises(InvalidJWTException):
|
||||||
|
_parse_token(unapproved_key_token)
|
||||||
|
|
||||||
|
# Approve the key and try again.
|
||||||
|
admin_user = model.user.get_user('devtable')
|
||||||
|
model.service_keys.approve_service_key(key.kid, admin_user, ServiceKeyApprovalType.SUPERUSER)
|
||||||
|
|
||||||
|
valid_token = _token(token_data, key_id='newkey', private_key=private_key)
|
||||||
|
|
||||||
|
identity = _parse_token(valid_token)
|
||||||
|
assert identity.id == TEST_USER.username
|
||||||
|
assert len(identity.provides) == 0
|
||||||
|
|
||||||
|
# Try using a different private key with the existing key ID.
|
||||||
|
bad_private_token = _token(token_data, key_id='newkey',
|
||||||
|
private_key=instance_keys.local_private_key)
|
||||||
|
with pytest.raises(InvalidJWTException):
|
||||||
|
_parse_token(bad_private_token)
|
||||||
|
|
||||||
|
# Try using a different key ID with the existing private key.
|
||||||
|
kid_mismatch_token = _token(token_data, key_id=instance_keys.local_key_id,
|
||||||
|
private_key=private_key)
|
||||||
|
with pytest.raises(InvalidJWTException):
|
||||||
|
_parse_token(kid_mismatch_token)
|
||||||
|
|
||||||
|
# Delete the new key.
|
||||||
|
key.delete_instance(recursive=True)
|
||||||
|
|
||||||
|
# Ensure it still works (via the cache.)
|
||||||
|
deleted_key_token = _token(token_data, key_id='newkey', private_key=private_key)
|
||||||
|
identity = _parse_token(deleted_key_token)
|
||||||
|
assert identity.id == TEST_USER.username
|
||||||
|
assert len(identity.provides) == 0
|
||||||
|
|
||||||
|
# Break the cache.
|
||||||
|
instance_keys.clear_cache()
|
||||||
|
|
||||||
|
# Ensure the key no longer works.
|
||||||
|
with pytest.raises(InvalidJWTException):
|
||||||
|
_parse_token(deleted_key_token)
|
||||||
|
|
|
@ -1,115 +0,0 @@
|
||||||
import unittest
|
|
||||||
import time
|
|
||||||
import jwt
|
|
||||||
|
|
||||||
from app import app, instance_keys
|
|
||||||
from data import model
|
|
||||||
from data.database import ServiceKeyApprovalType
|
|
||||||
from initdb import setup_database_for_testing, finished_database_for_testing
|
|
||||||
from endpoints.v2.v2auth import TOKEN_VALIDITY_LIFETIME_S
|
|
||||||
from auth.registry_jwt_auth import identity_from_bearer_token, InvalidJWTException
|
|
||||||
from util.morecollections import AttrDict
|
|
||||||
from util.security.registry_jwt import (ANONYMOUS_SUB, build_context_and_subject)
|
|
||||||
|
|
||||||
TEST_AUDIENCE = app.config['SERVER_HOSTNAME']
|
|
||||||
TEST_USER = AttrDict({'username': 'joeuser'})
|
|
||||||
MAX_SIGNED_S = 3660
|
|
||||||
|
|
||||||
|
|
||||||
class TestRegistryV2Auth(unittest.TestCase):
|
|
||||||
def setUp(self):
|
|
||||||
setup_database_for_testing(self)
|
|
||||||
|
|
||||||
def tearDown(self):
|
|
||||||
finished_database_for_testing(self)
|
|
||||||
|
|
||||||
def _generate_token_data(self, access=[], context=None, audience=TEST_AUDIENCE, user=TEST_USER,
|
|
||||||
iat=None, exp=None, nbf=None, iss=None):
|
|
||||||
|
|
||||||
_, subject = build_context_and_subject(user=user)
|
|
||||||
return {
|
|
||||||
'iss': iss or instance_keys.service_name,
|
|
||||||
'aud': audience,
|
|
||||||
'nbf': nbf if nbf is not None else int(time.time()),
|
|
||||||
'iat': iat if iat is not None else int(time.time()),
|
|
||||||
'exp': exp if exp is not None else int(time.time() + TOKEN_VALIDITY_LIFETIME_S),
|
|
||||||
'sub': subject,
|
|
||||||
'access': access,
|
|
||||||
'context': context,}
|
|
||||||
|
|
||||||
def _generate_token(self, token_data, key_id=None, private_key=None, skip_header=False,
|
|
||||||
alg=None):
|
|
||||||
key_id = key_id or instance_keys.local_key_id
|
|
||||||
private_key = private_key or instance_keys.local_private_key
|
|
||||||
|
|
||||||
if alg == "none":
|
|
||||||
private_key = None
|
|
||||||
|
|
||||||
token_headers = {
|
|
||||||
'kid': key_id,}
|
|
||||||
|
|
||||||
if skip_header:
|
|
||||||
token_headers = {}
|
|
||||||
|
|
||||||
token_data = jwt.encode(token_data, private_key, alg or 'RS256', headers=token_headers)
|
|
||||||
return 'Bearer {0}'.format(token_data)
|
|
||||||
|
|
||||||
def _parse_token(self, token):
|
|
||||||
return identity_from_bearer_token(token)[0]
|
|
||||||
|
|
||||||
def test_mixing_keys(self):
|
|
||||||
token_data = self._generate_token_data()
|
|
||||||
|
|
||||||
# Create a new key for testing.
|
|
||||||
p, key = model.service_keys.generate_service_key(instance_keys.service_name, None,
|
|
||||||
kid='newkey', name='newkey', metadata={})
|
|
||||||
|
|
||||||
private_key = p.exportKey('PEM')
|
|
||||||
|
|
||||||
# Test first with the new valid, but unapproved key.
|
|
||||||
unapproved_key_token = self._generate_token(token_data, key_id='newkey',
|
|
||||||
private_key=private_key)
|
|
||||||
with self.assertRaises(InvalidJWTException):
|
|
||||||
self._parse_token(unapproved_key_token)
|
|
||||||
|
|
||||||
# Approve the key and try again.
|
|
||||||
admin_user = model.user.get_user('devtable')
|
|
||||||
model.service_keys.approve_service_key(key.kid, admin_user, ServiceKeyApprovalType.SUPERUSER)
|
|
||||||
|
|
||||||
valid_token = self._generate_token(token_data, key_id='newkey', private_key=private_key)
|
|
||||||
|
|
||||||
identity = self._parse_token(valid_token)
|
|
||||||
self.assertEqual(identity.id, TEST_USER.username)
|
|
||||||
self.assertEqual(0, len(identity.provides))
|
|
||||||
|
|
||||||
# Try using a different private key with the existing key ID.
|
|
||||||
bad_private_token = self._generate_token(token_data, key_id='newkey',
|
|
||||||
private_key=instance_keys.local_private_key)
|
|
||||||
with self.assertRaises(InvalidJWTException):
|
|
||||||
self._parse_token(bad_private_token)
|
|
||||||
|
|
||||||
# Try using a different key ID with the existing private key.
|
|
||||||
kid_mismatch_token = self._generate_token(token_data, key_id=instance_keys.local_key_id,
|
|
||||||
private_key=private_key)
|
|
||||||
with self.assertRaises(InvalidJWTException):
|
|
||||||
self._parse_token(kid_mismatch_token)
|
|
||||||
|
|
||||||
# Delete the new key.
|
|
||||||
key.delete_instance(recursive=True)
|
|
||||||
|
|
||||||
# Ensure it still works (via the cache.)
|
|
||||||
deleted_key_token = self._generate_token(token_data, key_id='newkey', private_key=private_key)
|
|
||||||
identity = self._parse_token(deleted_key_token)
|
|
||||||
self.assertEqual(identity.id, TEST_USER.username)
|
|
||||||
self.assertEqual(0, len(identity.provides))
|
|
||||||
|
|
||||||
# Break the cache.
|
|
||||||
instance_keys.clear_cache()
|
|
||||||
|
|
||||||
# Ensure the key no longer works.
|
|
||||||
with self.assertRaises(InvalidJWTException):
|
|
||||||
self._parse_token(deleted_key_token)
|
|
||||||
|
|
||||||
|
|
||||||
if __name__ == '__main__':
|
|
||||||
unittest.main()
|
|
Reference in a new issue