keyserver: tests!

This commit is contained in:
Jimmy Zelinskie 2016-04-06 20:03:48 -04:00 committed by Jimmy Zelinskie
parent d19eb16b45
commit cfc15746a6
2 changed files with 121 additions and 12 deletions

View file

@ -26,13 +26,10 @@ JWT_HEADER_NAME = 'Authorization'
JWT_AUDIENCE = app.config['PREFERRED_URL_SCHEME'] + '://' + app.config['SERVER_HOSTNAME'] JWT_AUDIENCE = app.config['PREFERRED_URL_SCHEME'] + '://' + app.config['SERVER_HOSTNAME']
def _validate_jwk(jwk, kid): def _validate_jwk(jwk):
if 'kty' not in jwk: if 'kty' not in jwk:
abort(400) abort(400)
if 'kid' not in jwk or jwk['kid'] != kid:
abort(400)
if jwk['kty'] == 'EC': if jwk['kty'] == 'EC':
if 'x' not in jwk or 'y' not in jwk: if 'x' not in jwk or 'y' not in jwk:
abort(400) abort(400)
@ -129,7 +126,7 @@ def put_service_key(service, kid):
encoded_jwt = match.group(1) encoded_jwt = match.group(1)
_validate_jwk(jwk, kid) _validate_jwk(jwk)
signer_kid = _signer_kid(encoded_jwt) signer_kid = _signer_kid(encoded_jwt)
@ -196,10 +193,10 @@ def delete_service_key(service, kid):
_validate_jwt(encoded_jwt, signer_key.jwk, service) _validate_jwt(encoded_jwt, signer_key.jwk, service)
try: try:
data.model.service_keys.delete_service_key(service, kid) data.model.service_keys.delete_service_key(kid)
except data.model.ServiceKeyDoesNotExist: except data.model.ServiceKeyDoesNotExist:
abort(404) abort(404)
return make_response('', 200) return make_response('', 204)
abort(403) abort(403)

View file

@ -1,15 +1,22 @@
# coding=utf-8 # coding=utf-8
import unittest
import json as py_json import json as py_json
import time
import unittest
from urllib import urlencode from urllib import urlencode
from urlparse import urlparse, urlunparse, parse_qs from urlparse import urlparse, urlunparse, parse_qs
import jwt
from Crypto.PublicKey import RSA
from flask import url_for from flask import url_for
from jwkest.jwk import RSAKey
from app import app from app import app
from data import model from data import model
from data.database import ServiceKeyApprovalType
from endpoints import key_server
from endpoints.api import api, api_bp from endpoints.api import api, api_bp
from endpoints.api.user import Signin from endpoints.api.user import Signin
from endpoints.web import web as web_bp from endpoints.web import web as web_bp
@ -18,6 +25,7 @@ from initdb import setup_database_for_testing, finished_database_for_testing
try: try:
app.register_blueprint(web_bp, url_prefix='') app.register_blueprint(web_bp, url_prefix='')
app.register_blueprint(key_server.key_server, url_prefix='')
except ValueError: except ValueError:
# This blueprint was already registered # This blueprint was already registered
pass pass
@ -32,6 +40,7 @@ except ValueError:
CSRF_TOKEN_KEY = '_csrf_token' CSRF_TOKEN_KEY = '_csrf_token'
CSRF_TOKEN = '123csrfforme' CSRF_TOKEN = '123csrfforme'
class EndpointTestCase(unittest.TestCase): class EndpointTestCase(unittest.TestCase):
maxDiff = None maxDiff = None
@ -62,6 +71,19 @@ class EndpointTestCase(unittest.TestCase):
self.assertEquals(rv.status_code, expected_code) self.assertEquals(rv.status_code, expected_code)
return rv.data return rv.data
def deleteResponse(self, resource_name, headers=None, expected_code=204, **kwargs):
headers = headers or {}
rv = self.app.delete(url_for(resource_name, **kwargs), headers=headers)
self.assertEquals(rv.status_code, expected_code)
return rv.data
def putResponse(self, resource_name, headers=None, data=None, expected_code=204, **kwargs):
headers = headers or {}
data = data or {}
rv = self.app.put(url_for(resource_name, **kwargs), headers=headers, data=py_json.dumps(data))
self.assertEquals(rv.status_code, expected_code)
return rv.data
def login(self, username, password): def login(self, username, password):
rv = self.app.post(EndpointTestCase._add_csrf(api.url_for(Signin)), rv = self.app.post(EndpointTestCase._add_csrf(api.url_for(Signin)),
data=py_json.dumps(dict(username=username, password=password)), data=py_json.dumps(dict(username=username, password=password)),
@ -171,17 +193,107 @@ class WebEndpointTestCase(EndpointTestCase):
class KeyServerTestCase(EndpointTestCase): class KeyServerTestCase(EndpointTestCase):
_test_jwt_payload = {
'iss': 'sample_service',
'aud': key_server.JWT_AUDIENCE,
'exp': int(time.time()) + 60,
'iat': int(time.time()),
'nbf': int(time.time()),
'kid': 'kid123',
}
def test_list_service_keys(self): def test_list_service_keys(self):
pass unapproved_key = model.service_keys.get_service_key(kid='kid3')
rv = self.getResponse('key_server.list_service_keys', service='sample_service')
jwkset = py_json.loads(rv)
# Make sure the unapproved key isn't returned in our results
for jwk in jwkset['keys']:
self.assertTrue(jwk != unapproved_key.jwk)
def test_get_service_key(self): def test_get_service_key(self):
pass # 200 for an approved key
self.getResponse('key_server.get_service_key', service='sample_service', kid='kid1')
# 409 for an unapproved key
self.getResponse('key_server.get_service_key', service='sample_service', kid='kid3',
expected_code=409)
# 404 for a non-existant key
self.getResponse('key_server.get_service_key', service='sample_service', kid='kid9999',
expected_code=404)
def test_put_service_key(self): def test_put_service_key(self):
pass # No Authorization header should yield a 400
self.putResponse('key_server.put_service_key', service='sample_service', kid='kid420',
expected_code=400)
# Mint a JWT with our test payload
private_key = RSA.generate(2048)
jwk = RSAKey(key=private_key.publickey()).serialize()
payload = self._test_jwt_payload
payload.pop('kid')
token = jwt.encode(payload, private_key.exportKey('PEM'), 'RS256')
# Publish a new key
self.putResponse('key_server.put_service_key', service='sample_service', kid='kid420',
headers={
'Authorization': 'Bearer %s' % token,
'Content-Type': 'application/json',
}, data=jwk, expected_code=202)
# Rotate that new key
payload['kid'] = 'kid420'
token = jwt.encode(payload, private_key.exportKey('PEM'), 'RS256')
self.putResponse('key_server.put_service_key', service='sample_service', kid='kid6969',
headers={
'Authorization': 'Bearer %s' % token,
'Content-Type': 'application/json',
}, data=jwk, expected_code=200)
# Rotation should only work when signed by the previous key
private_key = RSA.generate(2048)
jwk = RSAKey(key=private_key.publickey()).serialize()
token = jwt.encode(payload, private_key.exportKey('PEM'), 'RS256')
self.putResponse('key_server.put_service_key', service='sample_service', kid='kid6969',
headers={
'Authorization': 'Bearer %s' % token,
'Content-Type': 'application/json',
}, data=jwk, expected_code=403)
def test_delete_service_key(self): def test_delete_service_key(self):
pass # No Authorization header should yield a 400
self.deleteResponse('key_server.delete_service_key', expected_code=400,
service='sample_service', kid='kid1')
# Generate two keys and approve one
private_key, _ = model.service_keys.generate_service_key('sample_service', None, kid='kid123')
model.service_keys.generate_service_key('sample_service', None, kid='kid321')
model.service_keys.approve_service_key('kid123', 1, ServiceKeyApprovalType.SUPERUSER)
# Mint a JWT with our test payload
token = jwt.encode(self._test_jwt_payload, private_key.exportKey('PEM'), 'RS256')
# Using the credentials of our approved key, delete our unapproved key
self.deleteResponse('key_server.delete_service_key',
headers={'Authorization': 'Bearer %s' % token},
expected_code=204, service='sample_service', kid='kid321')
# Attempt to delete a key signed by a key from a different service
bad_payload = self._test_jwt_payload
bad_payload['kid'] = 'kid5'
bad_token = jwt.encode(self._test_jwt_payload, private_key.exportKey('PEM'), 'RS256')
self.deleteResponse('key_server.delete_service_key',
headers={'Authorization': 'Bearer %s' % bad_token},
expected_code=403, service='sample_service', kid='kid123')
# Delete a self-signed, approved key
self.deleteResponse('key_server.delete_service_key',
headers={'Authorization': 'Bearer %s' % token},
expected_code=204, service='sample_service', kid='kid123')
if __name__ == '__main__': if __name__ == '__main__':
unittest.main() unittest.main()