Use the instance service key for registry JWT signing
This commit is contained in:
parent
a4aa5cc02a
commit
8887f09ba8
26 changed files with 457 additions and 278 deletions
|
@ -2,15 +2,15 @@ import unittest
|
|||
import time
|
||||
import jwt
|
||||
|
||||
from cryptography.hazmat.backends import default_backend
|
||||
from cryptography.hazmat.primitives.asymmetric import rsa
|
||||
|
||||
from app import app
|
||||
from app import app, instance_keys
|
||||
from data import model
|
||||
from data.database import ServiceKeyApprovalType
|
||||
from initdb import setup_database_for_testing, finished_database_for_testing
|
||||
from endpoints.v2.v2auth import TOKEN_VALIDITY_LIFETIME_S
|
||||
from auth.registry_jwt_auth import identity_from_bearer_token, load_public_key, InvalidJWTException
|
||||
from auth.registry_jwt_auth import identity_from_bearer_token, InvalidJWTException
|
||||
from util.morecollections import AttrDict
|
||||
from util.security.registry_jwt import (_load_certificate_bytes, _load_private_key, ANONYMOUS_SUB,
|
||||
build_context_and_subject)
|
||||
from util.security.registry_jwt import (ANONYMOUS_SUB, build_context_and_subject,
|
||||
decode_bearer_token, generate_bearer_token)
|
||||
|
||||
|
||||
TEST_AUDIENCE = app.config['SERVER_HOSTNAME']
|
||||
|
@ -19,20 +19,18 @@ MAX_SIGNED_S = 3660
|
|||
|
||||
|
||||
class TestRegistryV2Auth(unittest.TestCase):
|
||||
def __init__(self, *args, **kwargs):
|
||||
super(TestRegistryV2Auth, self).__init__(*args, **kwargs)
|
||||
self.public_key = None
|
||||
|
||||
def setUp(self):
|
||||
certificate_file_path = app.config['JWT_AUTH_CERTIFICATE_PATH']
|
||||
self.public_key = load_public_key(certificate_file_path)
|
||||
setup_database_for_testing(self)
|
||||
|
||||
def tearDown(self):
|
||||
finished_database_for_testing(self)
|
||||
|
||||
def _generate_token_data(self, access=[], audience=TEST_AUDIENCE, user=TEST_USER, iat=None,
|
||||
exp=None, nbf=None, iss=app.config['JWT_AUTH_TOKEN_ISSUER']):
|
||||
exp=None, nbf=None, iss=None):
|
||||
|
||||
_, subject = build_context_and_subject(user, None, None)
|
||||
return {
|
||||
'iss': iss,
|
||||
'iss': iss or instance_keys.service_name,
|
||||
'aud': audience,
|
||||
'nbf': nbf if nbf is not None else int(time.time()),
|
||||
'iat': iat if iat is not None else int(time.time()),
|
||||
|
@ -41,28 +39,25 @@ class TestRegistryV2Auth(unittest.TestCase):
|
|||
'access': access,
|
||||
}
|
||||
|
||||
def _generate_token(self, token_data):
|
||||
def _generate_token(self, token_data, key_id=None, private_key=None, skip_header=False, alg=None):
|
||||
key_id = key_id or instance_keys.local_key_id
|
||||
private_key = private_key or instance_keys.local_private_key
|
||||
|
||||
certificate = _load_certificate_bytes(app.config['JWT_AUTH_CERTIFICATE_PATH'])
|
||||
if alg == "none":
|
||||
private_key = None
|
||||
|
||||
token_headers = {
|
||||
'x5c': [certificate],
|
||||
'kid': key_id,
|
||||
}
|
||||
|
||||
private_key = _load_private_key(app.config['JWT_AUTH_PRIVATE_KEY_PATH'])
|
||||
token_data = jwt.encode(token_data, private_key, 'RS256', headers=token_headers)
|
||||
if skip_header:
|
||||
token_headers = {}
|
||||
|
||||
token_data = jwt.encode(token_data, private_key, alg or 'RS256', headers=token_headers)
|
||||
return 'Bearer {0}'.format(token_data)
|
||||
|
||||
def _parse_token(self, token):
|
||||
return identity_from_bearer_token(token, MAX_SIGNED_S, self.public_key)[0]
|
||||
|
||||
def _generate_public_key(self):
|
||||
key = rsa.generate_private_key(
|
||||
public_exponent=65537,
|
||||
key_size=1024,
|
||||
backend=default_backend()
|
||||
)
|
||||
return key.public_key()
|
||||
return identity_from_bearer_token(token)[0]
|
||||
|
||||
def test_accepted_token(self):
|
||||
token = self._generate_token(self._generate_token_data())
|
||||
|
@ -100,12 +95,6 @@ class TestRegistryV2Auth(unittest.TestCase):
|
|||
with self.assertRaises(InvalidJWTException):
|
||||
self._parse_token(token)
|
||||
|
||||
def test_bad_signature(self):
|
||||
token = self._generate_token(self._generate_token_data())
|
||||
other_public_key = self._generate_public_key()
|
||||
with self.assertRaises(InvalidJWTException):
|
||||
identity_from_bearer_token(token, MAX_SIGNED_S, other_public_key)
|
||||
|
||||
def test_audience(self):
|
||||
token_data = self._generate_token_data(audience='someotherapp')
|
||||
token = self._generate_token(token_data)
|
||||
|
@ -171,7 +160,6 @@ class TestRegistryV2Auth(unittest.TestCase):
|
|||
|
||||
def test_iss(self):
|
||||
token_data = self._generate_token_data(iss='badissuer')
|
||||
|
||||
token = self._generate_token(token_data)
|
||||
with self.assertRaises(InvalidJWTException):
|
||||
self._parse_token(token)
|
||||
|
@ -181,9 +169,94 @@ class TestRegistryV2Auth(unittest.TestCase):
|
|||
with self.assertRaises(InvalidJWTException):
|
||||
self._parse_token(no_iss_token)
|
||||
|
||||
def test_missing_header(self):
|
||||
token_data = self._generate_token_data()
|
||||
missing_header_token = self._generate_token(token_data, skip_header=True)
|
||||
with self.assertRaises(InvalidJWTException):
|
||||
self._parse_token(missing_header_token)
|
||||
|
||||
def test_invalid_key(self):
|
||||
token_data = self._generate_token_data()
|
||||
invalid_key_token = self._generate_token(token_data, key_id='someunknownkey')
|
||||
with self.assertRaises(InvalidJWTException):
|
||||
self._parse_token(invalid_key_token)
|
||||
|
||||
def test_expired_key(self):
|
||||
token_data = self._generate_token_data()
|
||||
expired_key_token = self._generate_token(token_data, key_id='kid7')
|
||||
with self.assertRaises(InvalidJWTException):
|
||||
self._parse_token(expired_key_token)
|
||||
|
||||
def test_mixing_keys(self):
|
||||
token_data = self._generate_token_data()
|
||||
|
||||
# Create a new key for testing.
|
||||
p, key = model.service_keys.generate_service_key(instance_keys.service_name, None, kid='newkey',
|
||||
name='newkey', metadata={})
|
||||
|
||||
private_key = p.exportKey('PEM')
|
||||
|
||||
# Test first with the new valid, but unapproved key.
|
||||
unapproved_key_token = self._generate_token(token_data, key_id='newkey', private_key=private_key)
|
||||
with self.assertRaises(InvalidJWTException):
|
||||
self._parse_token(unapproved_key_token)
|
||||
|
||||
# Approve the key and try again.
|
||||
admin_user = model.user.get_user('devtable')
|
||||
model.service_keys.approve_service_key(key.kid, admin_user, ServiceKeyApprovalType.SUPERUSER)
|
||||
|
||||
valid_token = self._generate_token(token_data, key_id='newkey', private_key=private_key)
|
||||
|
||||
identity = self._parse_token(valid_token)
|
||||
self.assertEqual(identity.id, TEST_USER.username)
|
||||
self.assertEqual(0, len(identity.provides))
|
||||
|
||||
# Try using a different private key with the existing key ID.
|
||||
bad_private_token = self._generate_token(token_data, key_id='newkey', private_key=instance_keys.local_private_key)
|
||||
with self.assertRaises(InvalidJWTException):
|
||||
self._parse_token(bad_private_token)
|
||||
|
||||
# Try using a different key ID with the existing private key.
|
||||
kid_mismatch_token = self._generate_token(token_data, key_id=instance_keys.local_key_id, private_key=private_key)
|
||||
with self.assertRaises(InvalidJWTException):
|
||||
self._parse_token(kid_mismatch_token)
|
||||
|
||||
# Delete the new key.
|
||||
key.delete_instance(recursive=True)
|
||||
|
||||
# Ensure it still works (via the cache.)
|
||||
deleted_key_token = self._generate_token(token_data, key_id='newkey', private_key=private_key)
|
||||
identity = self._parse_token(deleted_key_token)
|
||||
self.assertEqual(identity.id, TEST_USER.username)
|
||||
self.assertEqual(0, len(identity.provides))
|
||||
|
||||
# Break the cache.
|
||||
instance_keys.clear_cache()
|
||||
|
||||
# Ensure the key no longer works.
|
||||
with self.assertRaises(InvalidJWTException):
|
||||
self._parse_token(deleted_key_token)
|
||||
|
||||
def test_bad_token(self):
|
||||
with self.assertRaises(InvalidJWTException):
|
||||
self._parse_token("some random token here")
|
||||
|
||||
def test_bad_bearer_token(self):
|
||||
with self.assertRaises(InvalidJWTException):
|
||||
self._parse_token("Bearer: sometokenhere")
|
||||
|
||||
def test_bad_bearer_newline_token(self):
|
||||
with self.assertRaises(InvalidJWTException):
|
||||
self._parse_token("\nBearer: dGVzdA")
|
||||
|
||||
def test_ensure_no_none(self):
|
||||
token_data = self._generate_token_data()
|
||||
none_token = self._generate_token(token_data, alg='none', private_key=None)
|
||||
|
||||
with self.assertRaises(InvalidJWTException):
|
||||
self._parse_token(none_token)
|
||||
|
||||
|
||||
if __name__ == '__main__':
|
||||
import logging
|
||||
logging.basicConfig(level=logging.DEBUG)
|
||||
unittest.main()
|
||||
|
||||
|
|
Reference in a new issue