303 lines
12 KiB
Python
303 lines
12 KiB
Python
# coding=utf-8
|
|
|
|
import json as py_json
|
|
import time
|
|
import unittest
|
|
|
|
from urllib import urlencode
|
|
from urlparse import urlparse, urlunparse, parse_qs
|
|
|
|
import jwt
|
|
|
|
from Crypto.PublicKey import RSA
|
|
from flask import url_for
|
|
from jwkest.jwk import RSAKey
|
|
|
|
from app import app
|
|
from data import model
|
|
from data.database import ServiceKeyApprovalType
|
|
from endpoints import key_server
|
|
from endpoints.api import api, api_bp
|
|
from endpoints.api.user import Signin
|
|
from endpoints.web import web as web_bp
|
|
from initdb import setup_database_for_testing, finished_database_for_testing
|
|
from test.helpers import assert_action_logged
|
|
|
|
|
|
try:
|
|
app.register_blueprint(web_bp, url_prefix='')
|
|
app.register_blueprint(key_server.key_server, url_prefix='')
|
|
except ValueError:
|
|
# This blueprint was already registered
|
|
pass
|
|
|
|
try:
|
|
app.register_blueprint(api_bp, url_prefix='/api')
|
|
except ValueError:
|
|
# This blueprint was already registered
|
|
pass
|
|
|
|
|
|
CSRF_TOKEN_KEY = '_csrf_token'
|
|
CSRF_TOKEN = '123csrfforme'
|
|
|
|
|
|
class EndpointTestCase(unittest.TestCase):
|
|
maxDiff = None
|
|
|
|
@staticmethod
|
|
def _add_csrf(without_csrf):
|
|
parts = urlparse(without_csrf)
|
|
query = parse_qs(parts[4])
|
|
query[CSRF_TOKEN_KEY] = CSRF_TOKEN
|
|
return urlunparse(list(parts[0:4]) + [urlencode(query)] + list(parts[5:]))
|
|
|
|
def setUp(self):
|
|
setup_database_for_testing(self)
|
|
self.app = app.test_client()
|
|
self.ctx = app.test_request_context()
|
|
self.ctx.__enter__()
|
|
self.setCsrfToken(CSRF_TOKEN)
|
|
|
|
def tearDown(self):
|
|
finished_database_for_testing(self)
|
|
self.ctx.__exit__(True, None, None)
|
|
|
|
def setCsrfToken(self, token):
|
|
with self.app.session_transaction() as sess:
|
|
sess[CSRF_TOKEN_KEY] = token
|
|
|
|
def getResponse(self, resource_name, expected_code=200, **kwargs):
|
|
rv = self.app.get(url_for(resource_name, **kwargs))
|
|
self.assertEquals(rv.status_code, expected_code)
|
|
return rv.data
|
|
|
|
def deleteResponse(self, resource_name, headers=None, expected_code=204, **kwargs):
|
|
headers = headers or {}
|
|
rv = self.app.delete(url_for(resource_name, **kwargs), headers=headers)
|
|
self.assertEquals(rv.status_code, expected_code)
|
|
return rv.data
|
|
|
|
def putResponse(self, resource_name, headers=None, data=None, expected_code=204, **kwargs):
|
|
headers = headers or {}
|
|
data = data or {}
|
|
rv = self.app.put(url_for(resource_name, **kwargs), headers=headers, data=py_json.dumps(data))
|
|
self.assertEquals(rv.status_code, expected_code)
|
|
return rv.data
|
|
|
|
def login(self, username, password):
|
|
rv = self.app.post(EndpointTestCase._add_csrf(api.url_for(Signin)),
|
|
data=py_json.dumps(dict(username=username, password=password)),
|
|
headers={"Content-Type": "application/json"})
|
|
self.assertEquals(rv.status_code, 200)
|
|
|
|
|
|
class WebEndpointTestCase(EndpointTestCase):
|
|
def test_index(self):
|
|
self.getResponse('web.index')
|
|
|
|
def test_repo_view(self):
|
|
self.getResponse('web.repository', path='devtable/simple')
|
|
|
|
def test_org_view(self):
|
|
self.getResponse('web.org_view', path='buynlarge')
|
|
|
|
def test_user_view(self):
|
|
self.getResponse('web.user_view', path='devtable')
|
|
|
|
def test_confirm_repo_email(self):
|
|
code = model.repository.create_email_authorization_for_repo('devtable', 'simple', 'foo@bar.com')
|
|
self.getResponse('web.confirm_repo_email', code=code.code)
|
|
|
|
found = model.repository.get_email_authorized_for_repo('devtable', 'simple', 'foo@bar.com')
|
|
self.assertTrue(found.confirmed)
|
|
|
|
def test_confirm_email(self):
|
|
user = model.user.get_user('devtable')
|
|
self.assertNotEquals(user.email, 'foo@bar.com')
|
|
|
|
code = model.user.create_confirm_email_code(user, 'foo@bar.com')
|
|
self.getResponse('web.confirm_email', code=code.code, expected_code=302)
|
|
|
|
user = model.user.get_user('devtable')
|
|
self.assertEquals(user.email, 'foo@bar.com')
|
|
|
|
def test_confirm_recovery(self):
|
|
# Try for an invalid code.
|
|
self.getResponse('web.confirm_recovery', code='someinvalidcode', expected_code=200)
|
|
|
|
# Create a valid code and try.
|
|
user = model.user.get_user('devtable')
|
|
code = model.user.create_reset_password_email_code(user.email)
|
|
self.getResponse('web.confirm_recovery', code=code.code, expected_code=302)
|
|
|
|
def test_confirm_recovery_verified(self):
|
|
# Create a valid code and try.
|
|
user = model.user.get_user('devtable')
|
|
user.verified = False
|
|
user.save()
|
|
|
|
code = model.user.create_reset_password_email_code(user.email)
|
|
self.getResponse('web.confirm_recovery', code=code.code, expected_code=302)
|
|
|
|
# Ensure the current user is the expected user and that they are verified.
|
|
user = model.user.get_user('devtable')
|
|
self.assertTrue(user.verified)
|
|
|
|
self.getResponse('web.receipt', expected_code=404) # Will 401 if no user.
|
|
|
|
def test_build_status_badge(self):
|
|
# Try for an invalid repository.
|
|
self.getResponse('web.build_status_badge', repository='foo/bar', expected_code=404)
|
|
|
|
# Try for a public repository.
|
|
self.getResponse('web.build_status_badge', repository='public/publicrepo')
|
|
|
|
# Try for an private repository.
|
|
self.getResponse('web.build_status_badge', repository='devtable/simple',
|
|
expected_code=404)
|
|
|
|
# Try for an private repository with an invalid token.
|
|
self.getResponse('web.build_status_badge', repository='devtable/simple',
|
|
token='sometoken', expected_code=404)
|
|
|
|
# Try for an private repository with a valid token.
|
|
repository = model.repository.get_repository('devtable', 'simple')
|
|
self.getResponse('web.build_status_badge', repository='devtable/simple',
|
|
token=repository.badge_token)
|
|
|
|
def test_attach_custom_build_trigger(self):
|
|
self.getResponse('web.attach_custom_build_trigger', repository='foo/bar', expected_code=401)
|
|
self.getResponse('web.attach_custom_build_trigger', repository='devtable/simple', expected_code=401)
|
|
|
|
self.login('freshuser', 'password')
|
|
self.getResponse('web.attach_custom_build_trigger', repository='devtable/simple', expected_code=403)
|
|
|
|
self.login('devtable', 'password')
|
|
self.getResponse('web.attach_custom_build_trigger', repository='devtable/simple', expected_code=302)
|
|
|
|
def test_redirect_to_repository(self):
|
|
self.getResponse('web.redirect_to_repository', repository='foo/bar', expected_code=404)
|
|
self.getResponse('web.redirect_to_repository', repository='public/publicrepo', expected_code=302)
|
|
self.getResponse('web.redirect_to_repository', repository='devtable/simple', expected_code=404)
|
|
|
|
self.login('devtable', 'password')
|
|
self.getResponse('web.redirect_to_repository', repository='devtable/simple', expected_code=302)
|
|
|
|
def test_redirect_to_namespace(self):
|
|
self.getResponse('web.redirect_to_namespace', namespace='unknown', expected_code=404)
|
|
self.getResponse('web.redirect_to_namespace', namespace='devtable', expected_code=302)
|
|
self.getResponse('web.redirect_to_namespace', namespace='buynlarge', expected_code=302)
|
|
|
|
|
|
class KeyServerTestCase(EndpointTestCase):
|
|
def _get_test_jwt_payload(self):
|
|
return {
|
|
'iss': 'sample_service',
|
|
'aud': key_server.JWT_AUDIENCE,
|
|
'exp': int(time.time()) + 60,
|
|
'iat': int(time.time()),
|
|
'nbf': int(time.time()),
|
|
}
|
|
|
|
def test_list_service_keys(self):
|
|
unapproved_key = model.service_keys.get_service_key(kid='kid3')
|
|
rv = self.getResponse('key_server.list_service_keys', service='sample_service')
|
|
jwkset = py_json.loads(rv)
|
|
|
|
# Make sure the unapproved key isn't returned in our results
|
|
self.assertTrue(len(jwkset['keys']) > 0)
|
|
for jwk in jwkset['keys']:
|
|
self.assertNotEquals(jwk, unapproved_key.jwk)
|
|
|
|
def test_get_service_key(self):
|
|
# 200 for an approved key
|
|
self.getResponse('key_server.get_service_key', service='sample_service', kid='kid1')
|
|
|
|
# 409 for an unapproved key
|
|
self.getResponse('key_server.get_service_key', service='sample_service', kid='kid3',
|
|
expected_code=409)
|
|
|
|
# 404 for a non-existant key
|
|
self.getResponse('key_server.get_service_key', service='sample_service', kid='kid9999',
|
|
expected_code=404)
|
|
|
|
def test_put_service_key(self):
|
|
# No Authorization header should yield a 400
|
|
self.putResponse('key_server.put_service_key', service='sample_service', kid='kid420',
|
|
expected_code=400)
|
|
|
|
# Mint a JWT with our test payload
|
|
private_key = RSA.generate(2048)
|
|
jwk = RSAKey(key=private_key.publickey()).serialize()
|
|
payload = self._get_test_jwt_payload()
|
|
token = jwt.encode(payload, private_key.exportKey('PEM'), 'RS256')
|
|
|
|
# Publish a new key
|
|
with assert_action_logged('service_key_create'):
|
|
self.putResponse('key_server.put_service_key', service='sample_service', kid='kid420',
|
|
headers={
|
|
'Authorization': 'Bearer %s' % token,
|
|
'Content-Type': 'application/json',
|
|
}, data=jwk, expected_code=202)
|
|
|
|
# Ensure that the key exists but is unapproved.
|
|
self.getResponse('key_server.get_service_key', service='sample_service', kid='kid420',
|
|
expected_code=409)
|
|
|
|
# Rotate that new key
|
|
with assert_action_logged('service_key_rotate'):
|
|
token = jwt.encode(payload, private_key.exportKey('PEM'), 'RS256', headers={'kid': 'kid420'})
|
|
self.putResponse('key_server.put_service_key', service='sample_service', kid='kid6969',
|
|
headers={
|
|
'Authorization': 'Bearer %s' % token,
|
|
'Content-Type': 'application/json',
|
|
}, data=jwk, expected_code=200)
|
|
|
|
# Rotation should only work when signed by the previous key
|
|
private_key = RSA.generate(2048)
|
|
jwk = RSAKey(key=private_key.publickey()).serialize()
|
|
token = jwt.encode(payload, private_key.exportKey('PEM'), 'RS256', headers={'kid': 'kid420'})
|
|
self.putResponse('key_server.put_service_key', service='sample_service', kid='kid6969',
|
|
headers={
|
|
'Authorization': 'Bearer %s' % token,
|
|
'Content-Type': 'application/json',
|
|
}, data=jwk, expected_code=403)
|
|
|
|
|
|
def test_delete_service_key(self):
|
|
# No Authorization header should yield a 400
|
|
self.deleteResponse('key_server.delete_service_key', expected_code=400,
|
|
service='sample_service', kid='kid1')
|
|
|
|
# Generate two keys and approve one
|
|
private_key, _ = model.service_keys.generate_service_key('sample_service', None, kid='kid123')
|
|
model.service_keys.generate_service_key('sample_service', None, kid='kid321')
|
|
model.service_keys.approve_service_key('kid123', 1, ServiceKeyApprovalType.SUPERUSER)
|
|
|
|
# Mint a JWT with our test payload
|
|
token = jwt.encode(self._get_test_jwt_payload(), private_key.exportKey('PEM'), 'RS256',
|
|
headers={'kid': 'kid123'})
|
|
|
|
# Using the credentials of our approved key, delete our unapproved key
|
|
with assert_action_logged('service_key_delete'):
|
|
self.deleteResponse('key_server.delete_service_key',
|
|
headers={'Authorization': 'Bearer %s' % token},
|
|
expected_code=204, service='sample_service', kid='kid321')
|
|
|
|
# Attempt to delete a key signed by a key from a different service
|
|
bad_token = jwt.encode(self._get_test_jwt_payload(), private_key.exportKey('PEM'), 'RS256',
|
|
headers={'kid': 'kid5'})
|
|
self.deleteResponse('key_server.delete_service_key',
|
|
headers={'Authorization': 'Bearer %s' % bad_token},
|
|
expected_code=403, service='sample_service', kid='kid123')
|
|
|
|
# Delete a self-signed, approved key
|
|
with assert_action_logged('service_key_delete'):
|
|
self.deleteResponse('key_server.delete_service_key',
|
|
headers={'Authorization': 'Bearer %s' % token},
|
|
expected_code=204, service='sample_service', kid='kid123')
|
|
|
|
|
|
if __name__ == '__main__':
|
|
unittest.main()
|