This repository has been archived on 2020-03-24. You can view files and clone it, but cannot push or open issues or pull requests.
quay/auth/test/test_registry_jwt_e2e.py
Jimmy Zelinskie 7d1bbbfe19 test: convert registry auth test to pytest
This also moves them into the auth package.
2017-07-12 15:14:12 -04:00

114 lines
4 KiB
Python

import unittest
import time
import jwt
from app import app, instance_keys
from data import model
from data.database import ServiceKeyApprovalType
from initdb import setup_database_for_testing, finished_database_for_testing
from endpoints.v2.v2auth import TOKEN_VALIDITY_LIFETIME_S
from auth.registry_jwt_auth import identity_from_bearer_token, InvalidJWTException
from util.morecollections import AttrDict
from util.security.registry_jwt import (ANONYMOUS_SUB, build_context_and_subject)
TEST_AUDIENCE = app.config['SERVER_HOSTNAME']
TEST_USER = AttrDict({'username': 'joeuser'})
MAX_SIGNED_S = 3660
class TestRegistryV2Auth(unittest.TestCase):
def setUp(self):
setup_database_for_testing(self)
def tearDown(self):
finished_database_for_testing(self)
def _generate_token_data(self, access=[], context=None, audience=TEST_AUDIENCE, user=TEST_USER, iat=None,
exp=None, nbf=None, iss=None):
_, subject = build_context_and_subject(user=user)
return {
'iss': iss or instance_keys.service_name,
'aud': audience,
'nbf': nbf if nbf is not None else int(time.time()),
'iat': iat if iat is not None else int(time.time()),
'exp': exp if exp is not None else int(time.time() + TOKEN_VALIDITY_LIFETIME_S),
'sub': subject,
'access': access,
'context': context,
}
def _generate_token(self, token_data, key_id=None, private_key=None, skip_header=False, alg=None):
key_id = key_id or instance_keys.local_key_id
private_key = private_key or instance_keys.local_private_key
if alg == "none":
private_key = None
token_headers = {
'kid': key_id,
}
if skip_header:
token_headers = {}
token_data = jwt.encode(token_data, private_key, alg or 'RS256', headers=token_headers)
return 'Bearer {0}'.format(token_data)
def _parse_token(self, token):
return identity_from_bearer_token(token)[0]
def test_mixing_keys(self):
token_data = self._generate_token_data()
# Create a new key for testing.
p, key = model.service_keys.generate_service_key(instance_keys.service_name, None, kid='newkey',
name='newkey', metadata={})
private_key = p.exportKey('PEM')
# Test first with the new valid, but unapproved key.
unapproved_key_token = self._generate_token(token_data, key_id='newkey', private_key=private_key)
with self.assertRaises(InvalidJWTException):
self._parse_token(unapproved_key_token)
# Approve the key and try again.
admin_user = model.user.get_user('devtable')
model.service_keys.approve_service_key(key.kid, admin_user, ServiceKeyApprovalType.SUPERUSER)
valid_token = self._generate_token(token_data, key_id='newkey', private_key=private_key)
identity = self._parse_token(valid_token)
self.assertEqual(identity.id, TEST_USER.username)
self.assertEqual(0, len(identity.provides))
# Try using a different private key with the existing key ID.
bad_private_token = self._generate_token(token_data, key_id='newkey', private_key=instance_keys.local_private_key)
with self.assertRaises(InvalidJWTException):
self._parse_token(bad_private_token)
# Try using a different key ID with the existing private key.
kid_mismatch_token = self._generate_token(token_data, key_id=instance_keys.local_key_id, private_key=private_key)
with self.assertRaises(InvalidJWTException):
self._parse_token(kid_mismatch_token)
# Delete the new key.
key.delete_instance(recursive=True)
# Ensure it still works (via the cache.)
deleted_key_token = self._generate_token(token_data, key_id='newkey', private_key=private_key)
identity = self._parse_token(deleted_key_token)
self.assertEqual(identity.id, TEST_USER.username)
self.assertEqual(0, len(identity.provides))
# Break the cache.
instance_keys.clear_cache()
# Ensure the key no longer works.
with self.assertRaises(InvalidJWTException):
self._parse_token(deleted_key_token)
if __name__ == '__main__':
unittest.main()