From 1040c939bf8261efe9f25e57fd8e4a9a6785c51a Mon Sep 17 00:00:00 2001 From: Jimmy Zelinskie Date: Wed, 12 Jul 2017 15:13:35 -0400 Subject: [PATCH] auth.test: merge registry jwt into one pytest file --- auth/test/test_registry_jwt.py | 69 ++++++++++++++--- auth/test/test_registry_jwt_e2e.py | 115 ----------------------------- 2 files changed, 59 insertions(+), 125 deletions(-) delete mode 100644 auth/test/test_registry_jwt_e2e.py diff --git a/auth/test/test_registry_jwt.py b/auth/test/test_registry_jwt.py index 00e1aa96c..796417b32 100644 --- a/auth/test/test_registry_jwt.py +++ b/auth/test/test_registry_jwt.py @@ -5,6 +5,8 @@ import pytest from app import app, instance_keys from auth.registry_jwt_auth import identity_from_bearer_token, InvalidJWTException +from data import model # TODO(jzelinskie): remove this after service keys are decoupled +from data.database import ServiceKeyApprovalType from initdb import setup_database_for_testing, finished_database_for_testing from util.morecollections import AttrDict from util.security.registry_jwt import ANONYMOUS_SUB, build_context_and_subject @@ -16,13 +18,8 @@ TOKEN_VALIDITY_LIFETIME_S = 60 * 60 # 1 hour ANONYMOUS_SUB = '(anonymous)' SERVICE_NAME = 'quay' - -def setup_module(m): - setup_database_for_testing(m) - - -def teardown_module(m): - finished_database_for_testing(m) +# This import has to come below any references to "app". +from test.fixtures import * def _access(typ='repository', name='somens/somerepo', actions=None): @@ -73,7 +70,7 @@ def _parse_token(token): return identity_from_bearer_token(token)[0] -def test_accepted_token(): +def test_accepted_token(initialized_db): token = _token(_token_data()) identity = _parse_token(token) assert identity.id == TEST_USER.username, 'should be %s, but was %s' % (TEST_USER.username, @@ -93,7 +90,7 @@ def test_accepted_token(): (_access(actions=['*', 'push'])), (_access(actions=['*'])), (_access(actions=['pull', '*', 'push'])),]) -def test_token_with_access(access): +def test_token_with_access(access, initialized_db): token = _token(_token_data(access=access)) identity = _parse_token(token) assert identity.id == TEST_USER.username, 'should be %s, but was %s' % (TEST_USER.username, @@ -134,6 +131,58 @@ def test_token_with_access(access): ('some random token'), ('Bearer: sometokenhere'), ('\nBearer: dGVzdA'),]) -def test_invalid_jwt(token): +def test_invalid_jwt(token, initialized_db): with pytest.raises(InvalidJWTException): _parse_token(token) + + +def test_mixing_keys_e2e(initialized_db): + token_data = _token_data() + + # Create a new key for testing. + p, key = model.service_keys.generate_service_key(instance_keys.service_name, None, kid='newkey', + name='newkey', metadata={}) + private_key = p.exportKey('PEM') + + # Test first with the new valid, but unapproved key. + unapproved_key_token = _token(token_data, key_id='newkey', private_key=private_key) + with pytest.raises(InvalidJWTException): + _parse_token(unapproved_key_token) + + # Approve the key and try again. + admin_user = model.user.get_user('devtable') + model.service_keys.approve_service_key(key.kid, admin_user, ServiceKeyApprovalType.SUPERUSER) + + valid_token = _token(token_data, key_id='newkey', private_key=private_key) + + identity = _parse_token(valid_token) + assert identity.id == TEST_USER.username + assert len(identity.provides) == 0 + + # Try using a different private key with the existing key ID. + bad_private_token = _token(token_data, key_id='newkey', + private_key=instance_keys.local_private_key) + with pytest.raises(InvalidJWTException): + _parse_token(bad_private_token) + + # Try using a different key ID with the existing private key. + kid_mismatch_token = _token(token_data, key_id=instance_keys.local_key_id, + private_key=private_key) + with pytest.raises(InvalidJWTException): + _parse_token(kid_mismatch_token) + + # Delete the new key. + key.delete_instance(recursive=True) + + # Ensure it still works (via the cache.) + deleted_key_token = _token(token_data, key_id='newkey', private_key=private_key) + identity = _parse_token(deleted_key_token) + assert identity.id == TEST_USER.username + assert len(identity.provides) == 0 + + # Break the cache. + instance_keys.clear_cache() + + # Ensure the key no longer works. + with pytest.raises(InvalidJWTException): + _parse_token(deleted_key_token) diff --git a/auth/test/test_registry_jwt_e2e.py b/auth/test/test_registry_jwt_e2e.py deleted file mode 100644 index d7715b7a7..000000000 --- a/auth/test/test_registry_jwt_e2e.py +++ /dev/null @@ -1,115 +0,0 @@ -import unittest -import time -import jwt - -from app import app, instance_keys -from data import model -from data.database import ServiceKeyApprovalType -from initdb import setup_database_for_testing, finished_database_for_testing -from endpoints.v2.v2auth import TOKEN_VALIDITY_LIFETIME_S -from auth.registry_jwt_auth import identity_from_bearer_token, InvalidJWTException -from util.morecollections import AttrDict -from util.security.registry_jwt import (ANONYMOUS_SUB, build_context_and_subject) - -TEST_AUDIENCE = app.config['SERVER_HOSTNAME'] -TEST_USER = AttrDict({'username': 'joeuser'}) -MAX_SIGNED_S = 3660 - - -class TestRegistryV2Auth(unittest.TestCase): - def setUp(self): - setup_database_for_testing(self) - - def tearDown(self): - finished_database_for_testing(self) - - def _generate_token_data(self, access=[], context=None, audience=TEST_AUDIENCE, user=TEST_USER, - iat=None, exp=None, nbf=None, iss=None): - - _, subject = build_context_and_subject(user=user) - return { - 'iss': iss or instance_keys.service_name, - 'aud': audience, - 'nbf': nbf if nbf is not None else int(time.time()), - 'iat': iat if iat is not None else int(time.time()), - 'exp': exp if exp is not None else int(time.time() + TOKEN_VALIDITY_LIFETIME_S), - 'sub': subject, - 'access': access, - 'context': context,} - - def _generate_token(self, token_data, key_id=None, private_key=None, skip_header=False, - alg=None): - key_id = key_id or instance_keys.local_key_id - private_key = private_key or instance_keys.local_private_key - - if alg == "none": - private_key = None - - token_headers = { - 'kid': key_id,} - - if skip_header: - token_headers = {} - - token_data = jwt.encode(token_data, private_key, alg or 'RS256', headers=token_headers) - return 'Bearer {0}'.format(token_data) - - def _parse_token(self, token): - return identity_from_bearer_token(token)[0] - - def test_mixing_keys(self): - token_data = self._generate_token_data() - - # Create a new key for testing. - p, key = model.service_keys.generate_service_key(instance_keys.service_name, None, - kid='newkey', name='newkey', metadata={}) - - private_key = p.exportKey('PEM') - - # Test first with the new valid, but unapproved key. - unapproved_key_token = self._generate_token(token_data, key_id='newkey', - private_key=private_key) - with self.assertRaises(InvalidJWTException): - self._parse_token(unapproved_key_token) - - # Approve the key and try again. - admin_user = model.user.get_user('devtable') - model.service_keys.approve_service_key(key.kid, admin_user, ServiceKeyApprovalType.SUPERUSER) - - valid_token = self._generate_token(token_data, key_id='newkey', private_key=private_key) - - identity = self._parse_token(valid_token) - self.assertEqual(identity.id, TEST_USER.username) - self.assertEqual(0, len(identity.provides)) - - # Try using a different private key with the existing key ID. - bad_private_token = self._generate_token(token_data, key_id='newkey', - private_key=instance_keys.local_private_key) - with self.assertRaises(InvalidJWTException): - self._parse_token(bad_private_token) - - # Try using a different key ID with the existing private key. - kid_mismatch_token = self._generate_token(token_data, key_id=instance_keys.local_key_id, - private_key=private_key) - with self.assertRaises(InvalidJWTException): - self._parse_token(kid_mismatch_token) - - # Delete the new key. - key.delete_instance(recursive=True) - - # Ensure it still works (via the cache.) - deleted_key_token = self._generate_token(token_data, key_id='newkey', private_key=private_key) - identity = self._parse_token(deleted_key_token) - self.assertEqual(identity.id, TEST_USER.username) - self.assertEqual(0, len(identity.provides)) - - # Break the cache. - instance_keys.clear_cache() - - # Ensure the key no longer works. - with self.assertRaises(InvalidJWTException): - self._parse_token(deleted_key_token) - - -if __name__ == '__main__': - unittest.main()