Fix key server to not list expired keys
Fixes the key server to not list expire keys and by default not return expired or unapproved keys unless explicitly requested. Fixes #1430
This commit is contained in:
parent
f0af2ca9c3
commit
6e2df3b339
4 changed files with 129 additions and 34 deletions
|
@ -6,6 +6,7 @@ import unittest
|
|||
|
||||
from urllib import urlencode
|
||||
from urlparse import urlparse, urlunparse, parse_qs
|
||||
from datetime import datetime, timedelta
|
||||
|
||||
import jwt
|
||||
|
||||
|
@ -201,22 +202,27 @@ class KeyServerTestCase(EndpointTestCase):
|
|||
}
|
||||
|
||||
def test_list_service_keys(self):
|
||||
unapproved_key = model.service_keys.get_service_key(kid='kid3')
|
||||
expired_key = model.service_keys.get_service_key(kid='kid6')
|
||||
# Retrieve all the keys.
|
||||
all_keys = model.service_keys.list_all_keys()
|
||||
visible_jwks = [key.jwk for key in model.service_keys.list_service_keys('sample_service')]
|
||||
|
||||
invisible_jwks = []
|
||||
for key in all_keys:
|
||||
is_expired = key.expiration_date and key.expiration_date <= datetime.utcnow()
|
||||
if key.service != 'sample_service' or key.approval is None or is_expired:
|
||||
invisible_jwks.append(key.jwk)
|
||||
|
||||
rv = self.getResponse('key_server.list_service_keys', service='sample_service')
|
||||
jwkset = py_json.loads(rv)
|
||||
|
||||
# Make sure the hidden keys are not returned and the visible ones are returned.
|
||||
self.assertTrue(len(jwkset['keys']) > 0)
|
||||
expired_key_found = False
|
||||
self.assertTrue(len(visible_jwks) > 0)
|
||||
self.assertTrue(len(invisible_jwks) > 0)
|
||||
self.assertEquals(len(visible_jwks), len(jwkset['keys']))
|
||||
|
||||
for jwk in jwkset['keys']:
|
||||
self.assertNotEquals(jwk, unapproved_key.jwk)
|
||||
|
||||
if expired_key.jwk == jwk:
|
||||
expired_key_found = True
|
||||
|
||||
self.assertTrue(expired_key_found)
|
||||
self.assertIn(jwk, visible_jwks)
|
||||
self.assertNotIn(jwk, invisible_jwks)
|
||||
|
||||
|
||||
def test_get_service_key(self):
|
||||
|
@ -269,6 +275,17 @@ class KeyServerTestCase(EndpointTestCase):
|
|||
self.getResponse('key_server.get_service_key', service='sample_service', kid='kid420',
|
||||
expected_code=409)
|
||||
|
||||
# Attempt to rotate the key. Since not approved, it will fail.
|
||||
token = jwt.encode(payload, private_key.exportKey('PEM'), 'RS256', headers={'kid': 'kid420'})
|
||||
self.putResponse('key_server.put_service_key', service='sample_service', kid='kid6969',
|
||||
headers={
|
||||
'Authorization': 'Bearer %s' % token,
|
||||
'Content-Type': 'application/json',
|
||||
}, data=jwk, expected_code=403)
|
||||
|
||||
# Approve the key.
|
||||
model.service_keys.approve_service_key('kid420', 1, ServiceKeyApprovalType.SUPERUSER)
|
||||
|
||||
# Rotate that new key
|
||||
with assert_action_logged('service_key_rotate'):
|
||||
token = jwt.encode(payload, private_key.exportKey('PEM'), 'RS256', headers={'kid': 'kid420'})
|
||||
|
@ -289,20 +306,88 @@ class KeyServerTestCase(EndpointTestCase):
|
|||
}, data=jwk, expected_code=403)
|
||||
|
||||
|
||||
def test_delete_service_key(self):
|
||||
def test_attempt_delete_service_key_with_no_kid_signer(self):
|
||||
# Generate two keys, approving the first.
|
||||
private_key, _ = model.service_keys.generate_service_key('sample_service', None, kid='first')
|
||||
|
||||
# Mint a JWT with our test payload but *no kid*.
|
||||
token = jwt.encode(self._get_test_jwt_payload(), private_key.exportKey('PEM'), 'RS256',
|
||||
headers={})
|
||||
|
||||
# Using the credentials of our key, attempt to delete our unapproved key
|
||||
self.deleteResponse('key_server.delete_service_key',
|
||||
headers={'Authorization': 'Bearer %s' % token},
|
||||
expected_code=400, service='sample_service', kid='first')
|
||||
|
||||
|
||||
def test_attempt_delete_service_key_with_expired_key(self):
|
||||
# Generate two keys, approving the first.
|
||||
private_key, _ = model.service_keys.generate_service_key('sample_service', None, kid='first')
|
||||
model.service_keys.approve_service_key('first', 1, ServiceKeyApprovalType.SUPERUSER)
|
||||
model.service_keys.generate_service_key('sample_service', None, kid='second')
|
||||
|
||||
# Mint a JWT with our test payload
|
||||
token = jwt.encode(self._get_test_jwt_payload(), private_key.exportKey('PEM'), 'RS256',
|
||||
headers={'kid': 'first'})
|
||||
|
||||
# Set the expiration of the first to now - some time.
|
||||
model.service_keys.set_key_expiration('first', datetime.utcnow() - timedelta(seconds=100))
|
||||
|
||||
# Using the credentials of our second key, attempt to delete our unapproved key
|
||||
self.deleteResponse('key_server.delete_service_key',
|
||||
headers={'Authorization': 'Bearer %s' % token},
|
||||
expected_code=403, service='sample_service', kid='second')
|
||||
|
||||
# Set the expiration to the future and delete the key.
|
||||
model.service_keys.set_key_expiration('first', datetime.utcnow() + timedelta(seconds=100))
|
||||
|
||||
with assert_action_logged('service_key_delete'):
|
||||
self.deleteResponse('key_server.delete_service_key',
|
||||
headers={'Authorization': 'Bearer %s' % token},
|
||||
expected_code=204, service='sample_service', kid='second')
|
||||
|
||||
|
||||
def test_delete_unapproved_service_key(self):
|
||||
# No Authorization header should yield a 400
|
||||
self.deleteResponse('key_server.delete_service_key', expected_code=400,
|
||||
service='sample_service', kid='kid1')
|
||||
|
||||
# Generate two keys and approve one
|
||||
# Generate an unapproved key.
|
||||
private_key, _ = model.service_keys.generate_service_key('sample_service', None,
|
||||
kid='unapprovedkeyhere')
|
||||
|
||||
# Mint a JWT with our test payload
|
||||
token = jwt.encode(self._get_test_jwt_payload(), private_key.exportKey('PEM'), 'RS256',
|
||||
headers={'kid': 'unapprovedkeyhere'})
|
||||
|
||||
# Delete our unapproved key with itself.
|
||||
with assert_action_logged('service_key_delete'):
|
||||
self.deleteResponse('key_server.delete_service_key',
|
||||
headers={'Authorization': 'Bearer %s' % token},
|
||||
expected_code=204, service='sample_service', kid='unapprovedkeyhere')
|
||||
|
||||
|
||||
def test_delete_chained_service_key(self):
|
||||
# No Authorization header should yield a 400
|
||||
self.deleteResponse('key_server.delete_service_key', expected_code=400,
|
||||
service='sample_service', kid='kid1')
|
||||
|
||||
# Generate two keys.
|
||||
private_key, _ = model.service_keys.generate_service_key('sample_service', None, kid='kid123')
|
||||
model.service_keys.generate_service_key('sample_service', None, kid='kid321')
|
||||
model.service_keys.approve_service_key('kid123', 1, ServiceKeyApprovalType.SUPERUSER)
|
||||
|
||||
# Mint a JWT with our test payload
|
||||
token = jwt.encode(self._get_test_jwt_payload(), private_key.exportKey('PEM'), 'RS256',
|
||||
headers={'kid': 'kid123'})
|
||||
|
||||
# Using the credentials of our second key, attempt tp delete our unapproved key
|
||||
self.deleteResponse('key_server.delete_service_key',
|
||||
headers={'Authorization': 'Bearer %s' % token},
|
||||
expected_code=403, service='sample_service', kid='kid321')
|
||||
|
||||
# Approve the second key.
|
||||
model.service_keys.approve_service_key('kid123', 1, ServiceKeyApprovalType.SUPERUSER)
|
||||
|
||||
# Using the credentials of our approved key, delete our unapproved key
|
||||
with assert_action_logged('service_key_delete'):
|
||||
self.deleteResponse('key_server.delete_service_key',
|
||||
|
|
Reference in a new issue