import unittest import time import jwt from app import app, instance_keys from data import model from data.database import ServiceKeyApprovalType from initdb import setup_database_for_testing, finished_database_for_testing from endpoints.v2.v2auth import TOKEN_VALIDITY_LIFETIME_S from auth.registry_jwt_auth import identity_from_bearer_token, InvalidJWTException from util.morecollections import AttrDict from util.security.registry_jwt import (ANONYMOUS_SUB, build_context_and_subject) TEST_AUDIENCE = app.config['SERVER_HOSTNAME'] TEST_USER = AttrDict({'username': 'joeuser'}) MAX_SIGNED_S = 3660 class TestRegistryV2Auth(unittest.TestCase): def setUp(self): setup_database_for_testing(self) def tearDown(self): finished_database_for_testing(self) def _generate_token_data(self, access=[], context=None, audience=TEST_AUDIENCE, user=TEST_USER, iat=None, exp=None, nbf=None, iss=None): _, subject = build_context_and_subject(user=user) return { 'iss': iss or instance_keys.service_name, 'aud': audience, 'nbf': nbf if nbf is not None else int(time.time()), 'iat': iat if iat is not None else int(time.time()), 'exp': exp if exp is not None else int(time.time() + TOKEN_VALIDITY_LIFETIME_S), 'sub': subject, 'access': access, 'context': context, } def _generate_token(self, token_data, key_id=None, private_key=None, skip_header=False, alg=None): key_id = key_id or instance_keys.local_key_id private_key = private_key or instance_keys.local_private_key if alg == "none": private_key = None token_headers = { 'kid': key_id, } if skip_header: token_headers = {} token_data = jwt.encode(token_data, private_key, alg or 'RS256', headers=token_headers) return 'Bearer {0}'.format(token_data) def _parse_token(self, token): return identity_from_bearer_token(token)[0] def test_mixing_keys(self): token_data = self._generate_token_data() # Create a new key for testing. p, key = model.service_keys.generate_service_key(instance_keys.service_name, None, kid='newkey', name='newkey', metadata={}) private_key = p.exportKey('PEM') # Test first with the new valid, but unapproved key. unapproved_key_token = self._generate_token(token_data, key_id='newkey', private_key=private_key) with self.assertRaises(InvalidJWTException): self._parse_token(unapproved_key_token) # Approve the key and try again. admin_user = model.user.get_user('devtable') model.service_keys.approve_service_key(key.kid, admin_user, ServiceKeyApprovalType.SUPERUSER) valid_token = self._generate_token(token_data, key_id='newkey', private_key=private_key) identity = self._parse_token(valid_token) self.assertEqual(identity.id, TEST_USER.username) self.assertEqual(0, len(identity.provides)) # Try using a different private key with the existing key ID. bad_private_token = self._generate_token(token_data, key_id='newkey', private_key=instance_keys.local_private_key) with self.assertRaises(InvalidJWTException): self._parse_token(bad_private_token) # Try using a different key ID with the existing private key. kid_mismatch_token = self._generate_token(token_data, key_id=instance_keys.local_key_id, private_key=private_key) with self.assertRaises(InvalidJWTException): self._parse_token(kid_mismatch_token) # Delete the new key. key.delete_instance(recursive=True) # Ensure it still works (via the cache.) deleted_key_token = self._generate_token(token_data, key_id='newkey', private_key=private_key) identity = self._parse_token(deleted_key_token) self.assertEqual(identity.id, TEST_USER.username) self.assertEqual(0, len(identity.provides)) # Break the cache. instance_keys.clear_cache() # Ensure the key no longer works. with self.assertRaises(InvalidJWTException): self._parse_token(deleted_key_token) if __name__ == '__main__': unittest.main()