From 7d1bbbfe199731b030e61c30d6bd100fa473cb4f Mon Sep 17 00:00:00 2001 From: Jimmy Zelinskie Date: Wed, 5 Jul 2017 15:20:47 -0400 Subject: [PATCH] test: convert registry auth test to pytest This also moves them into the auth package. --- auth/test/test_registry_jwt.py | 127 ++++++++ auth/test/test_registry_jwt_e2e.py | 114 +++++++ .../test/{test_v2auth.py => test_v2_tuf.py} | 26 +- test/test_registry_v2_auth.py | 301 ------------------ 4 files changed, 256 insertions(+), 312 deletions(-) create mode 100644 auth/test/test_registry_jwt.py create mode 100644 auth/test/test_registry_jwt_e2e.py rename endpoints/v2/test/{test_v2auth.py => test_v2_tuf.py} (96%) delete mode 100644 test/test_registry_v2_auth.py diff --git a/auth/test/test_registry_jwt.py b/auth/test/test_registry_jwt.py new file mode 100644 index 000000000..de9dbe6a0 --- /dev/null +++ b/auth/test/test_registry_jwt.py @@ -0,0 +1,127 @@ +import time + +import jwt +import pytest + +from app import app, instance_keys +from auth.registry_jwt_auth import identity_from_bearer_token, InvalidJWTException +from endpoints.v2.v2auth import TOKEN_VALIDITY_LIFETIME_S +from initdb import setup_database_for_testing, finished_database_for_testing +from util.morecollections import AttrDict +from util.security.registry_jwt import ANONYMOUS_SUB, build_context_and_subject + + +TEST_AUDIENCE = app.config['SERVER_HOSTNAME'] +TEST_USER = AttrDict({'username': 'joeuser'}) +MAX_SIGNED_S = 3660 + + +def _access(typ='repository', name='somens/somerepo', actions=None): + actions = [] if actions is None else actions + return [{ + 'type': typ, + 'name': name, + 'actions': actions, + }] + + +def _delete_field(token_data, field_name): + token_data.pop(field_name) + return token_data + + +def _token_data(access=[], context=None, audience=TEST_AUDIENCE, user=TEST_USER, iat=None, exp=None, + nbf=None, iss=None, subject=None): + if subject is None: + _, subject = build_context_and_subject(user=user) + return { + 'iss': iss or instance_keys.service_name, + 'aud': audience, + 'nbf': nbf if nbf is not None else int(time.time()), + 'iat': iat if iat is not None else int(time.time()), + 'exp': exp if exp is not None else int(time.time() + TOKEN_VALIDITY_LIFETIME_S), + 'sub': subject, + 'access': access, + 'context': context, + } + + +def _token(token_data, key_id=None, private_key=None, skip_header=False, alg=None): + key_id = key_id or instance_keys.local_key_id + private_key = private_key or instance_keys.local_private_key + + if alg == "none": + private_key = None + + token_headers = {'kid': key_id} + + if skip_header: + token_headers = {} + + token_data = jwt.encode(token_data, private_key, alg or 'RS256', headers=token_headers) + return 'Bearer {0}'.format(token_data) + + +def _parse_token(token): + return identity_from_bearer_token(token)[0] + + +def test_accepted_token(): + token = _token(_token_data()) + identity = _parse_token(token) + assert identity.id == TEST_USER.username, 'should be %s, but was %s' % (TEST_USER.username, identity.id) + assert len(identity.provides) == 0 + + anon_token = _token(_token_data(user=None)) + anon_identity = _parse_token(anon_token) + assert anon_identity.id == ANONYMOUS_SUB, 'should be %s, but was %s' % (ANONYMOUS_SUB, anon_identity.id) + assert len(identity.provides) == 0 + + +@pytest.mark.parametrize('access', [ + (_access(actions=['pull', 'push'])), + (_access(actions=['pull', '*'])), + (_access(actions=['*', 'push'])), + (_access(actions=['*'])), + (_access(actions=['pull', '*', 'push'])), +]) +def test_token_with_access(access): + token = _token(_token_data(access=access)) + identity = _parse_token(token) + assert identity.id == TEST_USER.username, 'should be %s, but was %s' % (TEST_USER.username, identity.id) + assert len(identity.provides) == 1 + + role = list(identity.provides)[0][3] + if "*" in access[0]['actions']: + assert role == 'admin' + elif "push" in access[0]['actions']: + assert role == 'write' + elif "pull" in access[0]['actions']: + assert role == 'read' + + +@pytest.mark.parametrize('token', [ + (_token(_token_data(access=[{'toipe': 'repository', 'namesies': 'somens/somerepo', 'akshuns': ['pull', 'push', '*']}]))), + (_token(_token_data(audience='someotherapp'))), + (_token(_delete_field(_token_data(), 'aud'))), + (_token(_token_data(nbf=int(time.time()) + 60))), + (_token(_delete_field(_token_data(), 'nbf'))), + (_token(_token_data(iat=int(time.time()) + 60))), + (_token(_delete_field(_token_data(), 'iat'))), + (_token(_token_data(exp=int(time.time()) + MAX_SIGNED_S * 2))), + (_token(_token_data(exp=int(time.time()) - 60))), + (_token(_delete_field(_token_data(), 'exp'))), + (_token(_delete_field(_token_data(), 'sub'))), + (_token(_token_data(iss='badissuer'))), + (_token(_delete_field(_token_data(), 'iss'))), + (_token(_token_data(), skip_header=True)), + (_token(_token_data(), key_id='someunknownkey')), + (_token(_token_data(), key_id='kid7')), + (_token(_token_data(), alg='none', private_key=None)), + ('some random token'), + ('Bearer: sometokenhere'), + ('\nBearer: dGVzdA'), +]) +def test_invalid_jwt(token): + with pytest.raises(InvalidJWTException): + _parse_token(token) diff --git a/auth/test/test_registry_jwt_e2e.py b/auth/test/test_registry_jwt_e2e.py new file mode 100644 index 000000000..94c2affbe --- /dev/null +++ b/auth/test/test_registry_jwt_e2e.py @@ -0,0 +1,114 @@ +import unittest +import time +import jwt + +from app import app, instance_keys +from data import model +from data.database import ServiceKeyApprovalType +from initdb import setup_database_for_testing, finished_database_for_testing +from endpoints.v2.v2auth import TOKEN_VALIDITY_LIFETIME_S +from auth.registry_jwt_auth import identity_from_bearer_token, InvalidJWTException +from util.morecollections import AttrDict +from util.security.registry_jwt import (ANONYMOUS_SUB, build_context_and_subject) + + +TEST_AUDIENCE = app.config['SERVER_HOSTNAME'] +TEST_USER = AttrDict({'username': 'joeuser'}) +MAX_SIGNED_S = 3660 + + +class TestRegistryV2Auth(unittest.TestCase): + def setUp(self): + setup_database_for_testing(self) + + def tearDown(self): + finished_database_for_testing(self) + + def _generate_token_data(self, access=[], context=None, audience=TEST_AUDIENCE, user=TEST_USER, iat=None, + exp=None, nbf=None, iss=None): + + _, subject = build_context_and_subject(user=user) + return { + 'iss': iss or instance_keys.service_name, + 'aud': audience, + 'nbf': nbf if nbf is not None else int(time.time()), + 'iat': iat if iat is not None else int(time.time()), + 'exp': exp if exp is not None else int(time.time() + TOKEN_VALIDITY_LIFETIME_S), + 'sub': subject, + 'access': access, + 'context': context, + } + + def _generate_token(self, token_data, key_id=None, private_key=None, skip_header=False, alg=None): + key_id = key_id or instance_keys.local_key_id + private_key = private_key or instance_keys.local_private_key + + if alg == "none": + private_key = None + + token_headers = { + 'kid': key_id, + } + + if skip_header: + token_headers = {} + + token_data = jwt.encode(token_data, private_key, alg or 'RS256', headers=token_headers) + return 'Bearer {0}'.format(token_data) + + def _parse_token(self, token): + return identity_from_bearer_token(token)[0] + + def test_mixing_keys(self): + token_data = self._generate_token_data() + + # Create a new key for testing. + p, key = model.service_keys.generate_service_key(instance_keys.service_name, None, kid='newkey', + name='newkey', metadata={}) + + private_key = p.exportKey('PEM') + + # Test first with the new valid, but unapproved key. + unapproved_key_token = self._generate_token(token_data, key_id='newkey', private_key=private_key) + with self.assertRaises(InvalidJWTException): + self._parse_token(unapproved_key_token) + + # Approve the key and try again. + admin_user = model.user.get_user('devtable') + model.service_keys.approve_service_key(key.kid, admin_user, ServiceKeyApprovalType.SUPERUSER) + + valid_token = self._generate_token(token_data, key_id='newkey', private_key=private_key) + + identity = self._parse_token(valid_token) + self.assertEqual(identity.id, TEST_USER.username) + self.assertEqual(0, len(identity.provides)) + + # Try using a different private key with the existing key ID. + bad_private_token = self._generate_token(token_data, key_id='newkey', private_key=instance_keys.local_private_key) + with self.assertRaises(InvalidJWTException): + self._parse_token(bad_private_token) + + # Try using a different key ID with the existing private key. + kid_mismatch_token = self._generate_token(token_data, key_id=instance_keys.local_key_id, private_key=private_key) + with self.assertRaises(InvalidJWTException): + self._parse_token(kid_mismatch_token) + + # Delete the new key. + key.delete_instance(recursive=True) + + # Ensure it still works (via the cache.) + deleted_key_token = self._generate_token(token_data, key_id='newkey', private_key=private_key) + identity = self._parse_token(deleted_key_token) + self.assertEqual(identity.id, TEST_USER.username) + self.assertEqual(0, len(identity.provides)) + + # Break the cache. + instance_keys.clear_cache() + + # Ensure the key no longer works. + with self.assertRaises(InvalidJWTException): + self._parse_token(deleted_key_token) + + +if __name__ == '__main__': + unittest.main() diff --git a/endpoints/v2/test/test_v2auth.py b/endpoints/v2/test/test_v2_tuf.py similarity index 96% rename from endpoints/v2/test/test_v2auth.py rename to endpoints/v2/test/test_v2_tuf.py index ec0502a3a..8e6ee95f9 100644 --- a/endpoints/v2/test/test_v2auth.py +++ b/endpoints/v2/test/test_v2_tuf.py @@ -1,15 +1,15 @@ import pytest - import flask -from flask_principal import Identity, Principal -from endpoints.v2.v2auth import get_tuf_root -from auth import permissions -from util.security.registry_jwt import QUAY_TUF_ROOT, SIGNER_TUF_ROOT, DISABLED_TUF_ROOT - -from test import testconfig +from flask_principal import Identity, Principal from mock import Mock +from auth import permissions +from endpoints.v2.v2auth import get_tuf_root +from test import testconfig +from util.security.registry_jwt import QUAY_TUF_ROOT, SIGNER_TUF_ROOT, DISABLED_TUF_ROOT + + def admin_identity(namespace, reponame): identity = Identity('admin') @@ -17,24 +17,28 @@ def admin_identity(namespace, reponame): identity.provides.add(permissions._OrganizationRepoNeed(namespace, 'admin')) return identity + def write_identity(namespace, reponame): identity = Identity('writer') identity.provides.add(permissions._RepositoryNeed(namespace, reponame, 'write')) identity.provides.add(permissions._OrganizationRepoNeed(namespace, 'write')) return identity - + + def read_identity(namespace, reponame): identity = Identity('reader') identity.provides.add(permissions._RepositoryNeed(namespace, reponame, 'read')) identity.provides.add(permissions._OrganizationRepoNeed(namespace, 'read')) return identity + def app_with_principal(): app = flask.Flask(__name__) app.config.from_object(testconfig.TestConfig()) - principal = Principal(app) + principal = Principal(app) return app, principal + @pytest.mark.parametrize('identity,expected', [ (Identity('anon'), QUAY_TUF_ROOT), (read_identity("namespace", "repo"), QUAY_TUF_ROOT), @@ -50,8 +54,8 @@ def test_get_tuf_root(identity, expected): principal.set_identity(identity) actual = get_tuf_root(Mock(), "namespace", "repo") assert actual == expected, "should be %s, but was %s" % (expected, actual) - - + + @pytest.mark.parametrize('trust_enabled,tuf_root', [ (True, QUAY_TUF_ROOT), (False, DISABLED_TUF_ROOT), diff --git a/test/test_registry_v2_auth.py b/test/test_registry_v2_auth.py deleted file mode 100644 index 19982a985..000000000 --- a/test/test_registry_v2_auth.py +++ /dev/null @@ -1,301 +0,0 @@ -import unittest -import time -import jwt - -from app import app, instance_keys -from data import model -from data.database import ServiceKeyApprovalType -from initdb import setup_database_for_testing, finished_database_for_testing -from endpoints.v2.v2auth import TOKEN_VALIDITY_LIFETIME_S -from auth.registry_jwt_auth import identity_from_bearer_token, InvalidJWTException -from util.morecollections import AttrDict -from util.security.registry_jwt import (ANONYMOUS_SUB, build_context_and_subject, - generate_bearer_token) - - -TEST_AUDIENCE = app.config['SERVER_HOSTNAME'] -TEST_USER = AttrDict({'username': 'joeuser'}) -MAX_SIGNED_S = 3660 - - -class TestRegistryV2Auth(unittest.TestCase): - def setUp(self): - setup_database_for_testing(self) - - def tearDown(self): - finished_database_for_testing(self) - - def _generate_token_data(self, access=[], context=None, audience=TEST_AUDIENCE, user=TEST_USER, iat=None, - exp=None, nbf=None, iss=None): - - _, subject = build_context_and_subject(user=user) - return { - 'iss': iss or instance_keys.service_name, - 'aud': audience, - 'nbf': nbf if nbf is not None else int(time.time()), - 'iat': iat if iat is not None else int(time.time()), - 'exp': exp if exp is not None else int(time.time() + TOKEN_VALIDITY_LIFETIME_S), - 'sub': subject, - 'access': access, - 'context': context, - } - - def _generate_token(self, token_data, key_id=None, private_key=None, skip_header=False, alg=None): - key_id = key_id or instance_keys.local_key_id - private_key = private_key or instance_keys.local_private_key - - if alg == "none": - private_key = None - - token_headers = { - 'kid': key_id, - } - - if skip_header: - token_headers = {} - - token_data = jwt.encode(token_data, private_key, alg or 'RS256', headers=token_headers) - return 'Bearer {0}'.format(token_data) - - def _parse_token(self, token): - return identity_from_bearer_token(token)[0] - - def test_accepted_token(self): - token = self._generate_token(self._generate_token_data()) - identity = self._parse_token(token) - self.assertEqual(identity.id, TEST_USER.username) - self.assertEqual(0, len(identity.provides)) - - anon_token = self._generate_token(self._generate_token_data(user=None)) - anon_identity = self._parse_token(anon_token) - self.assertEqual(anon_identity.id, ANONYMOUS_SUB) - self.assertEqual(0, len(identity.provides)) - - def test_token_with_access(self): - valid_access = [ - [ - { - 'type': 'repository', - 'name': 'somens/somerepo', - 'actions': ['pull', 'push'], - } - ], - [ - { - 'type': 'repository', - 'name': 'somens/somerepo', - 'actions': ['pull', '*'], - } - ], - [ - { - 'type': 'repository', - 'name': 'somens/somerepo', - 'actions': ['*', 'push'], - } - ], - [ - { - 'type': 'repository', - 'name': 'somens/somerepo', - 'actions': ['*'], - } - ], - [ - { - 'type': 'repository', - 'name': 'somens/somerepo', - 'actions': ['pull', '*', 'push'], - } - ], - ] - for access in valid_access: - token = self._generate_token(self._generate_token_data(access=access)) - identity = self._parse_token(token) - self.assertEqual(identity.id, TEST_USER.username) - self.assertEqual(1, len(identity.provides)) - role = list(identity.provides)[0][3] - if "*" in access[0]['actions']: - self.assertEqual(role, 'admin') - elif "push" in access[0]['actions']: - self.assertEqual(role, 'write') - elif "pull" in access[0]['actions']: - self.assertEqual(role, 'read') - - def test_malformed_access(self): - access = [ - { - 'toipe': 'repository', - 'namesies': 'somens/somerepo', - 'akshuns': ['pull', 'push', '*'], - } - ] - token = self._generate_token(self._generate_token_data(access=access)) - with self.assertRaises(InvalidJWTException): - self._parse_token(token) - - def test_audience(self): - token_data = self._generate_token_data(audience='someotherapp') - token = self._generate_token(token_data) - with self.assertRaises(InvalidJWTException): - self._parse_token(token) - - token_data.pop('aud') - no_aud = self._generate_token(token_data) - with self.assertRaises(InvalidJWTException): - self._parse_token(no_aud) - - def test_nbf(self): - future = int(time.time()) + 60 - token_data = self._generate_token_data(nbf=future) - - token = self._generate_token(token_data) - with self.assertRaises(InvalidJWTException): - self._parse_token(token) - - token_data.pop('nbf') - no_nbf_token = self._generate_token(token_data) - with self.assertRaises(InvalidJWTException): - self._parse_token(no_nbf_token) - - def test_iat(self): - future = int(time.time()) + 60 - token_data = self._generate_token_data(iat=future) - - token = self._generate_token(token_data) - with self.assertRaises(InvalidJWTException): - self._parse_token(token) - - token_data.pop('iat') - no_iat_token = self._generate_token(token_data) - with self.assertRaises(InvalidJWTException): - self._parse_token(no_iat_token) - - def test_exp(self): - too_far = int(time.time()) + MAX_SIGNED_S * 2 - token_data = self._generate_token_data(exp=too_far) - - token = self._generate_token(token_data) - with self.assertRaises(InvalidJWTException): - self._parse_token(token) - - past = int(time.time()) - 60 - token_data['exp'] = past - expired_token = self._generate_token(token_data) - with self.assertRaises(InvalidJWTException): - self._parse_token(expired_token) - - token_data.pop('exp') - no_exp_token = self._generate_token(token_data) - with self.assertRaises(InvalidJWTException): - self._parse_token(no_exp_token) - - def test_no_sub(self): - token_data = self._generate_token_data() - token_data.pop('sub') - token = self._generate_token(token_data) - with self.assertRaises(InvalidJWTException): - self._parse_token(token) - - def test_iss(self): - token_data = self._generate_token_data(iss='badissuer') - token = self._generate_token(token_data) - with self.assertRaises(InvalidJWTException): - self._parse_token(token) - - token_data.pop('iss') - no_iss_token = self._generate_token(token_data) - with self.assertRaises(InvalidJWTException): - self._parse_token(no_iss_token) - - def test_missing_header(self): - token_data = self._generate_token_data() - missing_header_token = self._generate_token(token_data, skip_header=True) - with self.assertRaises(InvalidJWTException): - self._parse_token(missing_header_token) - - def test_invalid_key(self): - token_data = self._generate_token_data() - invalid_key_token = self._generate_token(token_data, key_id='someunknownkey') - with self.assertRaises(InvalidJWTException): - self._parse_token(invalid_key_token) - - def test_expired_key(self): - token_data = self._generate_token_data() - expired_key_token = self._generate_token(token_data, key_id='kid7') - with self.assertRaises(InvalidJWTException): - self._parse_token(expired_key_token) - - def test_mixing_keys(self): - token_data = self._generate_token_data() - - # Create a new key for testing. - p, key = model.service_keys.generate_service_key(instance_keys.service_name, None, kid='newkey', - name='newkey', metadata={}) - - private_key = p.exportKey('PEM') - - # Test first with the new valid, but unapproved key. - unapproved_key_token = self._generate_token(token_data, key_id='newkey', private_key=private_key) - with self.assertRaises(InvalidJWTException): - self._parse_token(unapproved_key_token) - - # Approve the key and try again. - admin_user = model.user.get_user('devtable') - model.service_keys.approve_service_key(key.kid, admin_user, ServiceKeyApprovalType.SUPERUSER) - - valid_token = self._generate_token(token_data, key_id='newkey', private_key=private_key) - - identity = self._parse_token(valid_token) - self.assertEqual(identity.id, TEST_USER.username) - self.assertEqual(0, len(identity.provides)) - - # Try using a different private key with the existing key ID. - bad_private_token = self._generate_token(token_data, key_id='newkey', private_key=instance_keys.local_private_key) - with self.assertRaises(InvalidJWTException): - self._parse_token(bad_private_token) - - # Try using a different key ID with the existing private key. - kid_mismatch_token = self._generate_token(token_data, key_id=instance_keys.local_key_id, private_key=private_key) - with self.assertRaises(InvalidJWTException): - self._parse_token(kid_mismatch_token) - - # Delete the new key. - key.delete_instance(recursive=True) - - # Ensure it still works (via the cache.) - deleted_key_token = self._generate_token(token_data, key_id='newkey', private_key=private_key) - identity = self._parse_token(deleted_key_token) - self.assertEqual(identity.id, TEST_USER.username) - self.assertEqual(0, len(identity.provides)) - - # Break the cache. - instance_keys.clear_cache() - - # Ensure the key no longer works. - with self.assertRaises(InvalidJWTException): - self._parse_token(deleted_key_token) - - def test_bad_token(self): - with self.assertRaises(InvalidJWTException): - self._parse_token("some random token here") - - def test_bad_bearer_token(self): - with self.assertRaises(InvalidJWTException): - self._parse_token("Bearer: sometokenhere") - - def test_bad_bearer_newline_token(self): - with self.assertRaises(InvalidJWTException): - self._parse_token("\nBearer: dGVzdA") - - def test_ensure_no_none(self): - token_data = self._generate_token_data() - none_token = self._generate_token(token_data, alg='none', private_key=None) - - with self.assertRaises(InvalidJWTException): - self._parse_token(none_token) - - -if __name__ == '__main__': - unittest.main() -