19f7acf575
Moves all the external login services into a set of classes that share as much code as possible. These services are then registered on both the client and server, allowing us in the followup change to dynamically register new handlers
684 lines
26 KiB
Python
684 lines
26 KiB
Python
# coding=utf-8
|
|
|
|
import json as py_json
|
|
import time
|
|
import unittest
|
|
import base64
|
|
|
|
from urllib import urlencode
|
|
from urlparse import urlparse, urlunparse, parse_qs
|
|
from datetime import datetime, timedelta
|
|
from httmock import urlmatch, HTTMock
|
|
|
|
import jwt
|
|
|
|
from Crypto.PublicKey import RSA
|
|
from flask import url_for
|
|
from jwkest.jwk import RSAKey
|
|
|
|
from app import app, oauth_login
|
|
from data import model
|
|
from data.database import ServiceKeyApprovalType
|
|
from endpoints import keyserver
|
|
from endpoints.api import api, api_bp
|
|
from endpoints.api.user import Signin
|
|
from endpoints.keyserver import jwk_with_kid
|
|
from endpoints.csrf import OAUTH_CSRF_TOKEN_NAME
|
|
from endpoints.web import web as web_bp
|
|
from endpoints.oauthlogin import oauthlogin as oauthlogin_bp
|
|
from initdb import setup_database_for_testing, finished_database_for_testing
|
|
from test.helpers import assert_action_logged
|
|
|
|
try:
|
|
app.register_blueprint(oauthlogin_bp, url_prefix='/oauth2')
|
|
except ValueError:
|
|
# This blueprint was already registered
|
|
pass
|
|
|
|
try:
|
|
app.register_blueprint(web_bp, url_prefix='')
|
|
except ValueError:
|
|
# This blueprint was already registered
|
|
pass
|
|
|
|
try:
|
|
app.register_blueprint(keyserver.key_server, url_prefix='')
|
|
except ValueError:
|
|
# This blueprint was already registered
|
|
pass
|
|
|
|
try:
|
|
app.register_blueprint(api_bp, url_prefix='/api')
|
|
except ValueError:
|
|
# This blueprint was already registered
|
|
pass
|
|
|
|
|
|
CSRF_TOKEN_KEY = '_csrf_token'
|
|
CSRF_TOKEN = '123csrfforme'
|
|
|
|
|
|
class EndpointTestCase(unittest.TestCase):
|
|
maxDiff = None
|
|
|
|
@staticmethod
|
|
def _add_csrf(without_csrf):
|
|
parts = urlparse(without_csrf)
|
|
query = parse_qs(parts[4])
|
|
query[CSRF_TOKEN_KEY] = CSRF_TOKEN
|
|
return urlunparse(list(parts[0:4]) + [urlencode(query)] + list(parts[5:]))
|
|
|
|
def setUp(self):
|
|
setup_database_for_testing(self)
|
|
self.app = app.test_client()
|
|
self.ctx = app.test_request_context()
|
|
self.ctx.__enter__()
|
|
self.setCsrfToken(CSRF_TOKEN)
|
|
|
|
def tearDown(self):
|
|
finished_database_for_testing(self)
|
|
self.ctx.__exit__(True, None, None)
|
|
|
|
def setCsrfToken(self, token):
|
|
with self.app.session_transaction() as sess:
|
|
sess[CSRF_TOKEN_KEY] = token
|
|
sess[OAUTH_CSRF_TOKEN_NAME] = 'someoauthtoken'
|
|
|
|
def getResponse(self, resource_name, expected_code=200, **kwargs):
|
|
rv = self.app.get(url_for(resource_name, **kwargs))
|
|
self.assertEquals(rv.status_code, expected_code)
|
|
return rv.data
|
|
|
|
def deleteResponse(self, resource_name, headers=None, expected_code=200, **kwargs):
|
|
headers = headers or {}
|
|
rv = self.app.delete(url_for(resource_name, **kwargs), headers=headers)
|
|
self.assertEquals(rv.status_code, expected_code)
|
|
return rv.data
|
|
|
|
def deleteEmptyResponse(self, resource_name, headers=None, expected_code=204, **kwargs):
|
|
headers = headers or {}
|
|
rv = self.app.delete(url_for(resource_name, **kwargs), headers=headers)
|
|
self.assertEquals(rv.status_code, expected_code)
|
|
self.assertEquals(rv.data, '') # ensure response body empty
|
|
return
|
|
|
|
def putResponse(self, resource_name, headers=None, data=None, expected_code=200, **kwargs):
|
|
headers = headers or {}
|
|
data = data or {}
|
|
rv = self.app.put(url_for(resource_name, **kwargs), headers=headers, data=py_json.dumps(data))
|
|
self.assertEquals(rv.status_code, expected_code)
|
|
return rv.data
|
|
|
|
def postResponse(self, resource_name, headers=None, form=None, with_csrf=True, expected_code=200, **kwargs):
|
|
headers = headers or {}
|
|
form = form or {}
|
|
url = url_for(resource_name, **kwargs)
|
|
if with_csrf:
|
|
url = EndpointTestCase._add_csrf(url)
|
|
|
|
rv = self.app.post(url, headers=headers, data=form)
|
|
if expected_code is not None:
|
|
self.assertEquals(rv.status_code, expected_code)
|
|
|
|
return rv
|
|
|
|
def login(self, username, password):
|
|
rv = self.app.post(EndpointTestCase._add_csrf(api.url_for(Signin)),
|
|
data=py_json.dumps(dict(username=username, password=password)),
|
|
headers={"Content-Type": "application/json"})
|
|
self.assertEquals(rv.status_code, 200)
|
|
|
|
|
|
class OAuthLoginTestCase(EndpointTestCase):
|
|
def invoke_oauth_tests(self, callback_endpoint, attach_endpoint, service_name, service_ident,
|
|
new_username):
|
|
# Test callback.
|
|
created = self.invoke_oauth_test(callback_endpoint, service_name, service_ident, new_username)
|
|
|
|
# Delete the created user.
|
|
model.user.delete_user(created, [])
|
|
|
|
# Test attach.
|
|
self.login('devtable', 'password')
|
|
self.invoke_oauth_test(attach_endpoint, service_name, service_ident, 'devtable')
|
|
|
|
def invoke_oauth_test(self, endpoint_name, service_name, service_ident, username):
|
|
# No CSRF.
|
|
self.getResponse('oauthlogin.' + endpoint_name, expected_code=403)
|
|
|
|
# Invalid CSRF.
|
|
self.getResponse('oauthlogin.' + endpoint_name, state='somestate', expected_code=403)
|
|
|
|
# Valid CSRF, invalid code.
|
|
self.getResponse('oauthlogin.' + endpoint_name, state='someoauthtoken',
|
|
code='invalidcode', expected_code=400)
|
|
|
|
# Valid CSRF, valid code.
|
|
self.getResponse('oauthlogin.' + endpoint_name, state='someoauthtoken',
|
|
code='somecode', expected_code=302)
|
|
|
|
# Ensure the user was added/modified.
|
|
found_user = model.user.get_user(username)
|
|
self.assertIsNotNone(found_user)
|
|
|
|
federated_login = model.user.lookup_federated_login(found_user, service_name)
|
|
self.assertIsNotNone(federated_login)
|
|
self.assertEquals(federated_login.service_ident, service_ident)
|
|
return found_user
|
|
|
|
def test_google_oauth(self):
|
|
@urlmatch(netloc=r'accounts.google.com', path='/o/oauth2/token')
|
|
def account_handler(_, request):
|
|
if request.body.find("code=somecode") > 0:
|
|
content = {'access_token': 'someaccesstoken'}
|
|
return py_json.dumps(content)
|
|
else:
|
|
return {'status_code': 400, 'content': '{"message": "Invalid code"}'}
|
|
|
|
@urlmatch(netloc=r'www.googleapis.com', path='/oauth2/v1/userinfo')
|
|
def user_handler(_, __):
|
|
content = {
|
|
'id': 'someid',
|
|
'email': 'someemail@example.com',
|
|
'verified_email': True,
|
|
}
|
|
return py_json.dumps(content)
|
|
|
|
with HTTMock(account_handler, user_handler):
|
|
self.invoke_oauth_tests('google_oauth_callback', 'google_oauth_attach', 'google',
|
|
'someid', 'someemail')
|
|
|
|
def test_github_oauth(self):
|
|
@urlmatch(netloc=r'github.com', path='/login/oauth/access_token')
|
|
def account_handler(url, _):
|
|
if url.query.find("code=somecode") > 0:
|
|
content = {'access_token': 'someaccesstoken'}
|
|
return py_json.dumps(content)
|
|
else:
|
|
return {'status_code': 400, 'content': '{"message": "Invalid code"}'}
|
|
|
|
@urlmatch(netloc=r'github.com', path='/api/v3/user')
|
|
def user_handler(_, __):
|
|
content = {
|
|
'id': 'someid',
|
|
'login': 'someusername'
|
|
}
|
|
return py_json.dumps(content)
|
|
|
|
@urlmatch(netloc=r'github.com', path='/api/v3/user/emails')
|
|
def email_handler(_, __):
|
|
content = [{
|
|
'email': 'someemail@example.com',
|
|
'verified': True,
|
|
'primary': True,
|
|
}]
|
|
return py_json.dumps(content)
|
|
|
|
with HTTMock(account_handler, email_handler, user_handler):
|
|
self.invoke_oauth_tests('github_oauth_callback', 'github_oauth_attach', 'github',
|
|
'someid', 'someusername')
|
|
|
|
|
|
class WebEndpointTestCase(EndpointTestCase):
|
|
def test_index(self):
|
|
self.getResponse('web.index')
|
|
|
|
def test_robots(self):
|
|
self.getResponse('web.robots')
|
|
|
|
def test_sitemap(self):
|
|
self.getResponse('web.sitemap')
|
|
|
|
def test_repo_view(self):
|
|
self.getResponse('web.repository', path='devtable/simple')
|
|
|
|
def test_org_view(self):
|
|
self.getResponse('web.org_view', path='buynlarge')
|
|
|
|
def test_user_view(self):
|
|
self.getResponse('web.user_view', path='devtable')
|
|
|
|
def test_confirm_repo_email(self):
|
|
code = model.repository.create_email_authorization_for_repo('devtable', 'simple', 'foo@bar.com')
|
|
self.getResponse('web.confirm_repo_email', code=code.code)
|
|
|
|
found = model.repository.get_email_authorized_for_repo('devtable', 'simple', 'foo@bar.com')
|
|
self.assertTrue(found.confirmed)
|
|
|
|
def test_confirm_email(self):
|
|
user = model.user.get_user('devtable')
|
|
self.assertNotEquals(user.email, 'foo@bar.com')
|
|
|
|
code = model.user.create_confirm_email_code(user, 'foo@bar.com')
|
|
self.getResponse('web.confirm_email', code=code.code, expected_code=302)
|
|
|
|
user = model.user.get_user('devtable')
|
|
self.assertEquals(user.email, 'foo@bar.com')
|
|
|
|
def test_confirm_recovery(self):
|
|
# Try for an invalid code.
|
|
self.getResponse('web.confirm_recovery', code='someinvalidcode', expected_code=200)
|
|
|
|
# Create a valid code and try.
|
|
user = model.user.get_user('devtable')
|
|
code = model.user.create_reset_password_email_code(user.email)
|
|
self.getResponse('web.confirm_recovery', code=code.code, expected_code=302)
|
|
|
|
def test_confirm_recovery_verified(self):
|
|
# Create a valid code and try.
|
|
user = model.user.get_user('devtable')
|
|
user.verified = False
|
|
user.save()
|
|
|
|
code = model.user.create_reset_password_email_code(user.email)
|
|
self.getResponse('web.confirm_recovery', code=code.code, expected_code=302)
|
|
|
|
# Ensure the current user is the expected user and that they are verified.
|
|
user = model.user.get_user('devtable')
|
|
self.assertTrue(user.verified)
|
|
|
|
self.getResponse('web.receipt', expected_code=404) # Will 401 if no user.
|
|
|
|
def test_build_status_badge(self):
|
|
# Try for an invalid repository.
|
|
self.getResponse('web.build_status_badge', repository='foo/bar', expected_code=404)
|
|
|
|
# Try for a public repository.
|
|
self.getResponse('web.build_status_badge', repository='public/publicrepo')
|
|
|
|
# Try for an private repository.
|
|
self.getResponse('web.build_status_badge', repository='devtable/simple',
|
|
expected_code=404)
|
|
|
|
# Try for an private repository with an invalid token.
|
|
self.getResponse('web.build_status_badge', repository='devtable/simple',
|
|
token='sometoken', expected_code=404)
|
|
|
|
# Try for an private repository with a valid token.
|
|
repository = model.repository.get_repository('devtable', 'simple')
|
|
self.getResponse('web.build_status_badge', repository='devtable/simple',
|
|
token=repository.badge_token)
|
|
|
|
def test_attach_custom_build_trigger(self):
|
|
self.getResponse('web.attach_custom_build_trigger', repository='foo/bar', expected_code=401)
|
|
self.getResponse('web.attach_custom_build_trigger', repository='devtable/simple', expected_code=401)
|
|
|
|
self.login('freshuser', 'password')
|
|
self.getResponse('web.attach_custom_build_trigger', repository='devtable/simple', expected_code=403)
|
|
|
|
self.login('devtable', 'password')
|
|
self.getResponse('web.attach_custom_build_trigger', repository='devtable/simple', expected_code=302)
|
|
|
|
def test_redirect_to_repository(self):
|
|
self.getResponse('web.redirect_to_repository', repository='foo/bar', expected_code=404)
|
|
self.getResponse('web.redirect_to_repository', repository='public/publicrepo', expected_code=302)
|
|
self.getResponse('web.redirect_to_repository', repository='devtable/simple', expected_code=403)
|
|
|
|
self.login('devtable', 'password')
|
|
self.getResponse('web.redirect_to_repository', repository='devtable/simple', expected_code=302)
|
|
|
|
def test_redirect_to_namespace(self):
|
|
self.getResponse('web.redirect_to_namespace', namespace='unknown', expected_code=404)
|
|
self.getResponse('web.redirect_to_namespace', namespace='devtable', expected_code=302)
|
|
self.getResponse('web.redirect_to_namespace', namespace='buynlarge', expected_code=302)
|
|
|
|
|
|
class OAuthTestCase(EndpointTestCase):
|
|
def test_authorize_nologin(self):
|
|
form = {
|
|
'client_id': 'someclient',
|
|
'redirect_uri': 'http://localhost:5000/foobar',
|
|
'scope': 'user:admin',
|
|
}
|
|
|
|
self.postResponse('web.authorize_application', form=form, with_csrf=True, expected_code=401)
|
|
|
|
def test_authorize_invalidclient(self):
|
|
self.login('devtable', 'password')
|
|
|
|
form = {
|
|
'client_id': 'someclient',
|
|
'redirect_uri': 'http://localhost:5000/foobar',
|
|
'scope': 'user:admin',
|
|
}
|
|
|
|
resp = self.postResponse('web.authorize_application', form=form, with_csrf=True, expected_code=302)
|
|
self.assertEquals('http://localhost:5000/foobar?error=unauthorized_client', resp.headers['Location'])
|
|
|
|
def test_authorize_invalidscope(self):
|
|
self.login('devtable', 'password')
|
|
|
|
form = {
|
|
'client_id': 'deadbeef',
|
|
'redirect_uri': 'http://localhost:8000/o2c.html',
|
|
'scope': 'invalid:scope',
|
|
}
|
|
|
|
resp = self.postResponse('web.authorize_application', form=form, with_csrf=True, expected_code=302)
|
|
self.assertEquals('http://localhost:8000/o2c.html?error=invalid_scope', resp.headers['Location'])
|
|
|
|
def test_authorize_invalidredirecturi(self):
|
|
self.login('devtable', 'password')
|
|
|
|
# Note: Defined in initdb.py
|
|
form = {
|
|
'client_id': 'deadbeef',
|
|
'redirect_uri': 'http://some/invalid/uri',
|
|
'scope': 'user:admin',
|
|
}
|
|
|
|
self.postResponse('web.authorize_application', form=form, with_csrf=True, expected_code=400)
|
|
|
|
def test_authorize_success(self):
|
|
self.login('devtable', 'password')
|
|
|
|
# Note: Defined in initdb.py
|
|
form = {
|
|
'client_id': 'deadbeef',
|
|
'redirect_uri': 'http://localhost:8000/o2c.html',
|
|
'scope': 'user:admin',
|
|
}
|
|
|
|
resp = self.postResponse('web.authorize_application', form=form, with_csrf=True, expected_code=302)
|
|
self.assertTrue('access_token=' in resp.headers['Location'])
|
|
|
|
def test_authorize_nocsrf(self):
|
|
self.login('devtable', 'password')
|
|
|
|
# Note: Defined in initdb.py
|
|
form = {
|
|
'client_id': 'deadbeef',
|
|
'redirect_uri': 'http://localhost:8000/o2c.html',
|
|
'scope': 'user:admin',
|
|
}
|
|
|
|
self.postResponse('web.authorize_application', form=form, with_csrf=False, expected_code=403)
|
|
|
|
def test_authorize_nocsrf_withinvalidheader(self):
|
|
self.login('devtable', 'password')
|
|
|
|
# Note: Defined in initdb.py
|
|
form = {
|
|
'client_id': 'deadbeef',
|
|
'redirect_uri': 'http://localhost:8000/o2c.html',
|
|
'scope': 'user:admin',
|
|
}
|
|
|
|
headers = dict(authorization='Some random header')
|
|
self.postResponse('web.authorize_application', headers=headers, form=form, with_csrf=False, expected_code=401)
|
|
|
|
def test_authorize_nocsrf_withbadheader(self):
|
|
self.login('devtable', 'password')
|
|
|
|
# Note: Defined in initdb.py
|
|
form = {
|
|
'client_id': 'deadbeef',
|
|
'redirect_uri': 'http://localhost:8000/o2c.html',
|
|
'scope': 'user:admin',
|
|
}
|
|
|
|
headers = dict(authorization='Basic ' + base64.b64encode('devtable:invalidpassword'))
|
|
self.postResponse('web.authorize_application', headers=headers, form=form, with_csrf=False, expected_code=401)
|
|
|
|
def test_authorize_nocsrf_correctheader(self):
|
|
# Note: Defined in initdb.py
|
|
form = {
|
|
'client_id': 'deadbeef',
|
|
'redirect_uri': 'http://localhost:8000/o2c.html',
|
|
'scope': 'user:admin',
|
|
}
|
|
|
|
# Try without the client id being in the whitelist.
|
|
headers = dict(authorization='Basic ' + base64.b64encode('devtable:password'))
|
|
self.postResponse('web.authorize_application', headers=headers, form=form, with_csrf=False, expected_code=403)
|
|
|
|
# Add the client ID to the whitelist and try again.
|
|
app.config['DIRECT_OAUTH_CLIENTID_WHITELIST'] = ['deadbeef']
|
|
|
|
headers = dict(authorization='Basic ' + base64.b64encode('devtable:password'))
|
|
resp = self.postResponse('web.authorize_application', headers=headers, form=form, with_csrf=True, expected_code=302)
|
|
self.assertTrue('access_token=' in resp.headers['Location'])
|
|
|
|
def test_authorize_nocsrf_ratelimiting(self):
|
|
# Note: Defined in initdb.py
|
|
form = {
|
|
'client_id': 'deadbeef',
|
|
'redirect_uri': 'http://localhost:8000/o2c.html',
|
|
'scope': 'user:admin',
|
|
}
|
|
|
|
# Try without the client id being in the whitelist a few times, making sure we eventually get rate limited.
|
|
headers = dict(authorization='Basic ' + base64.b64encode('devtable:invalidpassword'))
|
|
self.postResponse('web.authorize_application', headers=headers, form=form, with_csrf=False, expected_code=401)
|
|
|
|
counter = 0
|
|
while True:
|
|
r = self.postResponse('web.authorize_application', headers=headers, form=form, with_csrf=False, expected_code=None)
|
|
self.assertNotEquals(200, r.status_code)
|
|
counter = counter + 1
|
|
if counter > 5:
|
|
self.fail('Exponential backoff did not fire')
|
|
|
|
if r.status_code == 429:
|
|
break
|
|
|
|
|
|
class KeyServerTestCase(EndpointTestCase):
|
|
def _get_test_jwt_payload(self):
|
|
return {
|
|
'iss': 'sample_service',
|
|
'aud': keyserver.JWT_AUDIENCE,
|
|
'exp': int(time.time()) + 60,
|
|
'iat': int(time.time()),
|
|
'nbf': int(time.time()),
|
|
}
|
|
|
|
def test_list_service_keys(self):
|
|
# Retrieve all the keys.
|
|
all_keys = model.service_keys.list_all_keys()
|
|
visible_jwks = [jwk_with_kid(key) for key in model.service_keys.list_service_keys('sample_service')]
|
|
|
|
invisible_jwks = []
|
|
for key in all_keys:
|
|
is_expired = key.expiration_date and key.expiration_date <= datetime.utcnow()
|
|
if key.service != 'sample_service' or key.approval is None or is_expired:
|
|
invisible_jwks.append(key.jwk)
|
|
|
|
rv = self.getResponse('key_server.list_service_keys', service='sample_service')
|
|
jwkset = py_json.loads(rv)
|
|
|
|
# Make sure the hidden keys are not returned and the visible ones are returned.
|
|
self.assertTrue(len(visible_jwks) > 0)
|
|
self.assertTrue(len(invisible_jwks) > 0)
|
|
self.assertEquals(len(visible_jwks), len(jwkset['keys']))
|
|
|
|
for jwk in jwkset['keys']:
|
|
self.assertIn(jwk, visible_jwks)
|
|
self.assertNotIn(jwk, invisible_jwks)
|
|
|
|
|
|
def test_get_service_key(self):
|
|
# 200 for an approved key
|
|
self.getResponse('key_server.get_service_key', service='sample_service', kid='kid1')
|
|
|
|
# 409 for an unapproved key
|
|
self.getResponse('key_server.get_service_key', service='sample_service', kid='kid3',
|
|
expected_code=409)
|
|
|
|
# 404 for a non-existant key
|
|
self.getResponse('key_server.get_service_key', service='sample_service', kid='kid9999',
|
|
expected_code=404)
|
|
|
|
# 403 for an approved but expired key that is inside of the 2 week window.
|
|
self.getResponse('key_server.get_service_key', service='sample_service', kid='kid6',
|
|
expected_code=403)
|
|
|
|
# 404 for an approved, expired key that is outside of the 2 week window.
|
|
self.getResponse('key_server.get_service_key', service='sample_service', kid='kid7',
|
|
expected_code=404)
|
|
|
|
def test_put_service_key(self):
|
|
# No Authorization header should yield a 400
|
|
self.putResponse('key_server.put_service_key', service='sample_service', kid='kid420',
|
|
expected_code=400)
|
|
|
|
# Mint a JWT with our test payload
|
|
private_key = RSA.generate(2048)
|
|
jwk = RSAKey(key=private_key.publickey()).serialize()
|
|
payload = self._get_test_jwt_payload()
|
|
token = jwt.encode(payload, private_key.exportKey('PEM'), 'RS256')
|
|
|
|
# Invalid service name should yield a 400.
|
|
self.putResponse('key_server.put_service_key', service='sample service', kid='kid420',
|
|
headers={
|
|
'Authorization': 'Bearer %s' % token,
|
|
'Content-Type': 'application/json',
|
|
}, data=jwk, expected_code=400)
|
|
|
|
# Publish a new key
|
|
with assert_action_logged('service_key_create'):
|
|
self.putResponse('key_server.put_service_key', service='sample_service', kid='kid420',
|
|
headers={
|
|
'Authorization': 'Bearer %s' % token,
|
|
'Content-Type': 'application/json',
|
|
}, data=jwk, expected_code=202)
|
|
|
|
# Ensure that the key exists but is unapproved.
|
|
self.getResponse('key_server.get_service_key', service='sample_service', kid='kid420',
|
|
expected_code=409)
|
|
|
|
# Attempt to rotate the key. Since not approved, it will fail.
|
|
token = jwt.encode(payload, private_key.exportKey('PEM'), 'RS256', headers={'kid': 'kid420'})
|
|
self.putResponse('key_server.put_service_key', service='sample_service', kid='kid6969',
|
|
headers={
|
|
'Authorization': 'Bearer %s' % token,
|
|
'Content-Type': 'application/json',
|
|
}, data=jwk, expected_code=403)
|
|
|
|
# Approve the key.
|
|
model.service_keys.approve_service_key('kid420', 1, ServiceKeyApprovalType.SUPERUSER)
|
|
|
|
# Rotate that new key
|
|
with assert_action_logged('service_key_rotate'):
|
|
token = jwt.encode(payload, private_key.exportKey('PEM'), 'RS256', headers={'kid': 'kid420'})
|
|
self.putResponse('key_server.put_service_key', service='sample_service', kid='kid6969',
|
|
headers={
|
|
'Authorization': 'Bearer %s' % token,
|
|
'Content-Type': 'application/json',
|
|
}, data=jwk, expected_code=200)
|
|
|
|
# Rotation should only work when signed by the previous key
|
|
private_key = RSA.generate(2048)
|
|
jwk = RSAKey(key=private_key.publickey()).serialize()
|
|
token = jwt.encode(payload, private_key.exportKey('PEM'), 'RS256', headers={'kid': 'kid420'})
|
|
self.putResponse('key_server.put_service_key', service='sample_service', kid='kid6969',
|
|
headers={
|
|
'Authorization': 'Bearer %s' % token,
|
|
'Content-Type': 'application/json',
|
|
}, data=jwk, expected_code=403)
|
|
|
|
|
|
def test_attempt_delete_service_key_with_no_kid_signer(self):
|
|
# Generate two keys, approving the first.
|
|
private_key, _ = model.service_keys.generate_service_key('sample_service', None, kid='first')
|
|
|
|
# Mint a JWT with our test payload but *no kid*.
|
|
token = jwt.encode(self._get_test_jwt_payload(), private_key.exportKey('PEM'), 'RS256',
|
|
headers={})
|
|
|
|
# Using the credentials of our key, attempt to delete our unapproved key
|
|
self.deleteResponse('key_server.delete_service_key',
|
|
headers={'Authorization': 'Bearer %s' % token},
|
|
expected_code=400, service='sample_service', kid='first')
|
|
|
|
|
|
def test_attempt_delete_service_key_with_expired_key(self):
|
|
# Generate two keys, approving the first.
|
|
private_key, _ = model.service_keys.generate_service_key('sample_service', None, kid='first')
|
|
model.service_keys.approve_service_key('first', 1, ServiceKeyApprovalType.SUPERUSER)
|
|
model.service_keys.generate_service_key('sample_service', None, kid='second')
|
|
|
|
# Mint a JWT with our test payload
|
|
token = jwt.encode(self._get_test_jwt_payload(), private_key.exportKey('PEM'), 'RS256',
|
|
headers={'kid': 'first'})
|
|
|
|
# Set the expiration of the first to now - some time.
|
|
model.service_keys.set_key_expiration('first', datetime.utcnow() - timedelta(seconds=100))
|
|
|
|
# Using the credentials of our second key, attempt to delete our unapproved key
|
|
self.deleteResponse('key_server.delete_service_key',
|
|
headers={'Authorization': 'Bearer %s' % token},
|
|
expected_code=403, service='sample_service', kid='second')
|
|
|
|
# Set the expiration to the future and delete the key.
|
|
model.service_keys.set_key_expiration('first', datetime.utcnow() + timedelta(seconds=100))
|
|
|
|
with assert_action_logged('service_key_delete'):
|
|
self.deleteEmptyResponse('key_server.delete_service_key',
|
|
headers={'Authorization': 'Bearer %s' % token},
|
|
expected_code=204, service='sample_service', kid='second')
|
|
|
|
|
|
def test_delete_unapproved_service_key(self):
|
|
# No Authorization header should yield a 400
|
|
self.deleteResponse('key_server.delete_service_key', expected_code=400,
|
|
service='sample_service', kid='kid1')
|
|
|
|
# Generate an unapproved key.
|
|
private_key, _ = model.service_keys.generate_service_key('sample_service', None,
|
|
kid='unapprovedkeyhere')
|
|
|
|
# Mint a JWT with our test payload
|
|
token = jwt.encode(self._get_test_jwt_payload(), private_key.exportKey('PEM'), 'RS256',
|
|
headers={'kid': 'unapprovedkeyhere'})
|
|
|
|
# Delete our unapproved key with itself.
|
|
with assert_action_logged('service_key_delete'):
|
|
self.deleteEmptyResponse('key_server.delete_service_key',
|
|
headers={'Authorization': 'Bearer %s' % token},
|
|
expected_code=204, service='sample_service', kid='unapprovedkeyhere')
|
|
|
|
|
|
def test_delete_chained_service_key(self):
|
|
# No Authorization header should yield a 400
|
|
self.deleteResponse('key_server.delete_service_key', expected_code=400,
|
|
service='sample_service', kid='kid1')
|
|
|
|
# Generate two keys.
|
|
private_key, _ = model.service_keys.generate_service_key('sample_service', None, kid='kid123')
|
|
model.service_keys.generate_service_key('sample_service', None, kid='kid321')
|
|
|
|
# Mint a JWT with our test payload
|
|
token = jwt.encode(self._get_test_jwt_payload(), private_key.exportKey('PEM'), 'RS256',
|
|
headers={'kid': 'kid123'})
|
|
|
|
# Using the credentials of our second key, attempt tp delete our unapproved key
|
|
self.deleteResponse('key_server.delete_service_key',
|
|
headers={'Authorization': 'Bearer %s' % token},
|
|
expected_code=403, service='sample_service', kid='kid321')
|
|
|
|
# Approve the second key.
|
|
model.service_keys.approve_service_key('kid123', 1, ServiceKeyApprovalType.SUPERUSER)
|
|
|
|
# Using the credentials of our approved key, delete our unapproved key
|
|
with assert_action_logged('service_key_delete'):
|
|
self.deleteEmptyResponse('key_server.delete_service_key',
|
|
headers={'Authorization': 'Bearer %s' % token},
|
|
expected_code=204, service='sample_service', kid='kid321')
|
|
|
|
# Attempt to delete a key signed by a key from a different service
|
|
bad_token = jwt.encode(self._get_test_jwt_payload(), private_key.exportKey('PEM'), 'RS256',
|
|
headers={'kid': 'kid5'})
|
|
self.deleteResponse('key_server.delete_service_key',
|
|
headers={'Authorization': 'Bearer %s' % bad_token},
|
|
expected_code=403, service='sample_service', kid='kid123')
|
|
|
|
# Delete a self-signed, approved key
|
|
with assert_action_logged('service_key_delete'):
|
|
self.deleteEmptyResponse('key_server.delete_service_key',
|
|
headers={'Authorization': 'Bearer %s' % token},
|
|
expected_code=204, service='sample_service', kid='kid123')
|
|
|
|
|
|
if __name__ == '__main__':
|
|
unittest.main()
|