diff --git a/endpoints/key_server.py b/endpoints/key_server.py index 8b7ca1596..045976615 100644 --- a/endpoints/key_server.py +++ b/endpoints/key_server.py @@ -26,13 +26,10 @@ JWT_HEADER_NAME = 'Authorization' JWT_AUDIENCE = app.config['PREFERRED_URL_SCHEME'] + '://' + app.config['SERVER_HOSTNAME'] -def _validate_jwk(jwk, kid): +def _validate_jwk(jwk): if 'kty' not in jwk: abort(400) - if 'kid' not in jwk or jwk['kid'] != kid: - abort(400) - if jwk['kty'] == 'EC': if 'x' not in jwk or 'y' not in jwk: abort(400) @@ -129,7 +126,7 @@ def put_service_key(service, kid): encoded_jwt = match.group(1) - _validate_jwk(jwk, kid) + _validate_jwk(jwk) signer_kid = _signer_kid(encoded_jwt) @@ -196,10 +193,10 @@ def delete_service_key(service, kid): _validate_jwt(encoded_jwt, signer_key.jwk, service) try: - data.model.service_keys.delete_service_key(service, kid) + data.model.service_keys.delete_service_key(kid) except data.model.ServiceKeyDoesNotExist: abort(404) - return make_response('', 200) + return make_response('', 204) abort(403) diff --git a/test/test_endpoints.py b/test/test_endpoints.py index aa86318a4..8b80b08c6 100644 --- a/test/test_endpoints.py +++ b/test/test_endpoints.py @@ -1,15 +1,22 @@ # coding=utf-8 -import unittest import json as py_json +import time +import unittest from urllib import urlencode from urlparse import urlparse, urlunparse, parse_qs +import jwt + +from Crypto.PublicKey import RSA from flask import url_for +from jwkest.jwk import RSAKey from app import app from data import model +from data.database import ServiceKeyApprovalType +from endpoints import key_server from endpoints.api import api, api_bp from endpoints.api.user import Signin from endpoints.web import web as web_bp @@ -18,6 +25,7 @@ from initdb import setup_database_for_testing, finished_database_for_testing try: app.register_blueprint(web_bp, url_prefix='') + app.register_blueprint(key_server.key_server, url_prefix='') except ValueError: # This blueprint was already registered pass @@ -32,6 +40,7 @@ except ValueError: CSRF_TOKEN_KEY = '_csrf_token' CSRF_TOKEN = '123csrfforme' + class EndpointTestCase(unittest.TestCase): maxDiff = None @@ -62,6 +71,19 @@ class EndpointTestCase(unittest.TestCase): self.assertEquals(rv.status_code, expected_code) return rv.data + def deleteResponse(self, resource_name, headers=None, expected_code=204, **kwargs): + headers = headers or {} + rv = self.app.delete(url_for(resource_name, **kwargs), headers=headers) + self.assertEquals(rv.status_code, expected_code) + return rv.data + + def putResponse(self, resource_name, headers=None, data=None, expected_code=204, **kwargs): + headers = headers or {} + data = data or {} + rv = self.app.put(url_for(resource_name, **kwargs), headers=headers, data=py_json.dumps(data)) + self.assertEquals(rv.status_code, expected_code) + return rv.data + def login(self, username, password): rv = self.app.post(EndpointTestCase._add_csrf(api.url_for(Signin)), data=py_json.dumps(dict(username=username, password=password)), @@ -171,17 +193,107 @@ class WebEndpointTestCase(EndpointTestCase): class KeyServerTestCase(EndpointTestCase): + _test_jwt_payload = { + 'iss': 'sample_service', + 'aud': key_server.JWT_AUDIENCE, + 'exp': int(time.time()) + 60, + 'iat': int(time.time()), + 'nbf': int(time.time()), + 'kid': 'kid123', + } + def test_list_service_keys(self): - pass + unapproved_key = model.service_keys.get_service_key(kid='kid3') + rv = self.getResponse('key_server.list_service_keys', service='sample_service') + jwkset = py_json.loads(rv) + + # Make sure the unapproved key isn't returned in our results + for jwk in jwkset['keys']: + self.assertTrue(jwk != unapproved_key.jwk) def test_get_service_key(self): - pass + # 200 for an approved key + self.getResponse('key_server.get_service_key', service='sample_service', kid='kid1') + + # 409 for an unapproved key + self.getResponse('key_server.get_service_key', service='sample_service', kid='kid3', + expected_code=409) + + # 404 for a non-existant key + self.getResponse('key_server.get_service_key', service='sample_service', kid='kid9999', + expected_code=404) def test_put_service_key(self): - pass + # No Authorization header should yield a 400 + self.putResponse('key_server.put_service_key', service='sample_service', kid='kid420', + expected_code=400) + + # Mint a JWT with our test payload + private_key = RSA.generate(2048) + jwk = RSAKey(key=private_key.publickey()).serialize() + payload = self._test_jwt_payload + payload.pop('kid') + token = jwt.encode(payload, private_key.exportKey('PEM'), 'RS256') + + # Publish a new key + self.putResponse('key_server.put_service_key', service='sample_service', kid='kid420', + headers={ + 'Authorization': 'Bearer %s' % token, + 'Content-Type': 'application/json', + }, data=jwk, expected_code=202) + + # Rotate that new key + payload['kid'] = 'kid420' + token = jwt.encode(payload, private_key.exportKey('PEM'), 'RS256') + self.putResponse('key_server.put_service_key', service='sample_service', kid='kid6969', + headers={ + 'Authorization': 'Bearer %s' % token, + 'Content-Type': 'application/json', + }, data=jwk, expected_code=200) + + # Rotation should only work when signed by the previous key + private_key = RSA.generate(2048) + jwk = RSAKey(key=private_key.publickey()).serialize() + token = jwt.encode(payload, private_key.exportKey('PEM'), 'RS256') + self.putResponse('key_server.put_service_key', service='sample_service', kid='kid6969', + headers={ + 'Authorization': 'Bearer %s' % token, + 'Content-Type': 'application/json', + }, data=jwk, expected_code=403) def test_delete_service_key(self): - pass + # No Authorization header should yield a 400 + self.deleteResponse('key_server.delete_service_key', expected_code=400, + service='sample_service', kid='kid1') + + # Generate two keys and approve one + private_key, _ = model.service_keys.generate_service_key('sample_service', None, kid='kid123') + model.service_keys.generate_service_key('sample_service', None, kid='kid321') + model.service_keys.approve_service_key('kid123', 1, ServiceKeyApprovalType.SUPERUSER) + + # Mint a JWT with our test payload + token = jwt.encode(self._test_jwt_payload, private_key.exportKey('PEM'), 'RS256') + + # Using the credentials of our approved key, delete our unapproved key + self.deleteResponse('key_server.delete_service_key', + headers={'Authorization': 'Bearer %s' % token}, + expected_code=204, service='sample_service', kid='kid321') + + # Attempt to delete a key signed by a key from a different service + bad_payload = self._test_jwt_payload + bad_payload['kid'] = 'kid5' + bad_token = jwt.encode(self._test_jwt_payload, private_key.exportKey('PEM'), 'RS256') + self.deleteResponse('key_server.delete_service_key', + headers={'Authorization': 'Bearer %s' % bad_token}, + expected_code=403, service='sample_service', kid='kid123') + + # Delete a self-signed, approved key + self.deleteResponse('key_server.delete_service_key', + headers={'Authorization': 'Bearer %s' % token}, + expected_code=204, service='sample_service', kid='kid123') + + + if __name__ == '__main__': unittest.main()