This repository has been archived on 2020-03-24. You can view files and clone it, but cannot push or open issues or pull requests.
quay/test/test_endpoints.py

567 lines
22 KiB
Python
Raw Normal View History

# coding=utf-8
import json as py_json
2016-04-07 00:03:48 +00:00
import time
import unittest
import base64
2016-04-05 16:40:50 +00:00
from urllib import urlencode
from urlparse import urlparse, urlunparse, parse_qs
from datetime import datetime, timedelta
2016-04-05 16:40:50 +00:00
2016-04-07 00:03:48 +00:00
import jwt
from Crypto.PublicKey import RSA
from flask import url_for
2016-04-07 00:03:48 +00:00
from jwkest.jwk import RSAKey
2016-04-05 16:40:50 +00:00
from app import app
2016-04-05 16:40:50 +00:00
from data import model
2016-04-07 00:03:48 +00:00
from data.database import ServiceKeyApprovalType
from endpoints import key_server
from endpoints.api import api, api_bp
from endpoints.api.user import Signin
2016-04-05 16:40:50 +00:00
from endpoints.web import web as web_bp
from initdb import setup_database_for_testing, finished_database_for_testing
from test.helpers import assert_action_logged
def _register_blueprint_once(blueprint, url_prefix):
    """Register ``blueprint`` on ``app``, ignoring a repeated registration.

    A ``ValueError`` raised by ``app.register_blueprint`` is treated as
    "this blueprint was already registered" and swallowed, so that importing
    this module after another module has set up the app does not fail.
    """
    try:
        app.register_blueprint(blueprint, url_prefix=url_prefix)
    except ValueError:
        # This blueprint was already registered
        pass


# Every blueprint gets its own attempt: a ValueError for one of them must not
# prevent the remaining blueprints from being registered.
_register_blueprint_once(web_bp, '')
_register_blueprint_once(key_server.key_server, '')
_register_blueprint_once(api_bp, '/api')

# Session key / query parameter name and the fixed token value used by the
# test cases below when issuing CSRF-protected requests.
CSRF_TOKEN_KEY = '_csrf_token'
CSRF_TOKEN = '123csrfforme'
2016-04-07 00:03:48 +00:00
class EndpointTestCase(unittest.TestCase):
    """Base class with the fixture and HTTP helpers shared by the endpoint tests.

    ``setUp`` prepares the test database, creates a Flask test client, enters
    a test request context (needed by the ``url_for`` calls in the helpers)
    and stores ``CSRF_TOKEN`` in the client session.
    """

    maxDiff = None

    @staticmethod
    def _add_csrf(without_csrf):
        """Return ``without_csrf`` with the test CSRF token in its query string.

        Existing query parameters are preserved; a pre-existing
        ``CSRF_TOKEN_KEY`` parameter is overwritten.
        """
        parts = urlparse(without_csrf)
        query = parse_qs(parts[4])
        query[CSRF_TOKEN_KEY] = CSRF_TOKEN
        # parse_qs maps every key to a *list* of values, so doseq=True is
        # required to re-encode them as ``key=value`` pairs instead of the
        # repr of a list.
        encoded_query = urlencode(query, doseq=True)
        return urlunparse(list(parts[0:4]) + [encoded_query] + list(parts[5:]))

    def setUp(self):
        """Prepare the database, test client, request context and CSRF token."""
        setup_database_for_testing(self)
        self.app = app.test_client()
        self.ctx = app.test_request_context()
        self.ctx.__enter__()
        self.setCsrfToken(CSRF_TOKEN)

    def tearDown(self):
        """Release the test database and leave the request context."""
        finished_database_for_testing(self)
        # NOTE(review): the first argument of __exit__ is normally an exception
        # type or None; ``True`` is kept exactly as the test was written --
        # confirm whether ``None`` was intended.
        self.ctx.__exit__(True, None, None)

    def setCsrfToken(self, token):
        """Store ``token`` under ``CSRF_TOKEN_KEY`` in the test client's session."""
        with self.app.session_transaction() as sess:
            sess[CSRF_TOKEN_KEY] = token

    def getResponse(self, resource_name, expected_code=200, **kwargs):
        """GET the endpoint ``resource_name`` and return the response body.

        ``kwargs`` are passed to ``url_for``. The status code must equal
        ``expected_code``.
        """
        rv = self.app.get(url_for(resource_name, **kwargs))
        self.assertEqual(rv.status_code, expected_code)
        return rv.data

    def deleteResponse(self, resource_name, headers=None, expected_code=204, **kwargs):
        """DELETE the endpoint ``resource_name`` and return the response body.

        ``kwargs`` are passed to ``url_for``. The status code must equal
        ``expected_code``.
        """
        headers = headers or {}
        rv = self.app.delete(url_for(resource_name, **kwargs), headers=headers)
        self.assertEqual(rv.status_code, expected_code)
        return rv.data

    def putResponse(self, resource_name, headers=None, data=None, expected_code=204,
                    **kwargs):
        """PUT ``data`` (serialized as JSON) to ``resource_name``; return the body.

        ``kwargs`` are passed to ``url_for``. The status code must equal
        ``expected_code``.
        """
        headers = headers or {}
        data = data or {}
        rv = self.app.put(url_for(resource_name, **kwargs), headers=headers,
                          data=py_json.dumps(data))
        self.assertEqual(rv.status_code, expected_code)
        return rv.data

    def postResponse(self, resource_name, headers=None, form=None, with_csrf=True,
                     expected_code=200, **kwargs):
        """POST ``form`` to ``resource_name`` and return the response object.

        When ``with_csrf`` is true the CSRF token is appended to the URL.
        Unlike the other helpers this returns the whole response (not just its
        body), and the status check is skipped when ``expected_code`` is None.
        """
        headers = headers or {}
        form = form or {}
        url = url_for(resource_name, **kwargs)
        if with_csrf:
            url = EndpointTestCase._add_csrf(url)

        rv = self.app.post(url, headers=headers, data=form)
        if expected_code is not None:
            self.assertEqual(rv.status_code, expected_code)
        return rv

    def login(self, username, password):
        """Sign in through the ``Signin`` API resource, requiring a 200 response."""
        rv = self.app.post(EndpointTestCase._add_csrf(api.url_for(Signin)),
                           data=py_json.dumps(dict(username=username, password=password)),
                           headers={"Content-Type": "application/json"})
        self.assertEqual(rv.status_code, 200)
class WebEndpointTestCase(EndpointTestCase):
    """Status-code level tests for the endpoints of the ``web`` blueprint."""

    def test_index(self):
        """The index page responds with 200."""
        self.getResponse('web.index')

    def test_repo_view(self):
        """The repository page for devtable/simple responds with 200."""
        self.getResponse('web.repository', path='devtable/simple')

    def test_org_view(self):
        """The organization page for buynlarge responds with 200."""
        self.getResponse('web.org_view', path='buynlarge')

    def test_user_view(self):
        """The user page for devtable responds with 200."""
        self.getResponse('web.user_view', path='devtable')

    def test_confirm_repo_email(self):
        """Visiting the confirmation link marks the repo e-mail authorization confirmed."""
        code = model.repository.create_email_authorization_for_repo('devtable', 'simple',
                                                                    'foo@bar.com')
        self.getResponse('web.confirm_repo_email', code=code.code)

        found = model.repository.get_email_authorized_for_repo('devtable', 'simple',
                                                               'foo@bar.com')
        self.assertTrue(found.confirmed)

    def test_confirm_email(self):
        """Visiting the confirmation link changes the user's e-mail address."""
        user = model.user.get_user('devtable')
        self.assertNotEqual(user.email, 'foo@bar.com')

        code = model.user.create_confirm_email_code(user, 'foo@bar.com')
        self.getResponse('web.confirm_email', code=code.code, expected_code=302)

        # Reload the user to observe the change made by the endpoint.
        user = model.user.get_user('devtable')
        self.assertEqual(user.email, 'foo@bar.com')

    def test_confirm_recovery(self):
        """An invalid recovery code yields 200; a valid one redirects (302)."""
        # Try for an invalid code.
        self.getResponse('web.confirm_recovery', code='someinvalidcode', expected_code=200)

        # Create a valid code and try.
        user = model.user.get_user('devtable')
        code = model.user.create_reset_password_email_code(user.email)
        self.getResponse('web.confirm_recovery', code=code.code, expected_code=302)

    def test_confirm_recovery_verified(self):
        """Confirming recovery for an unverified user leaves that user verified."""
        # Mark the user as unverified, then create a valid code and try.
        user = model.user.get_user('devtable')
        user.verified = False
        user.save()

        code = model.user.create_reset_password_email_code(user.email)
        self.getResponse('web.confirm_recovery', code=code.code, expected_code=302)

        # Ensure the current user is the expected user and that they are verified.
        user = model.user.get_user('devtable')
        self.assertTrue(user.verified)

        self.getResponse('web.receipt', expected_code=404)  # Will 401 if no user.

    def test_build_status_badge(self):
        """The build badge is served for public repos or with a valid badge token."""
        # Try for an invalid repository.
        self.getResponse('web.build_status_badge', repository='foo/bar', expected_code=404)

        # Try for a public repository.
        self.getResponse('web.build_status_badge', repository='public/publicrepo')

        # Try for a private repository.
        self.getResponse('web.build_status_badge', repository='devtable/simple',
                         expected_code=404)

        # Try for a private repository with an invalid token.
        self.getResponse('web.build_status_badge', repository='devtable/simple',
                         token='sometoken', expected_code=404)

        # Try for a private repository with a valid token.
        repository = model.repository.get_repository('devtable', 'simple')
        self.getResponse('web.build_status_badge', repository='devtable/simple',
                         token=repository.badge_token)

    def test_attach_custom_build_trigger(self):
        """Attaching a trigger: 401 when anonymous, 403 for freshuser, 302 for devtable."""
        self.getResponse('web.attach_custom_build_trigger', repository='foo/bar',
                         expected_code=401)
        self.getResponse('web.attach_custom_build_trigger', repository='devtable/simple',
                         expected_code=401)

        self.login('freshuser', 'password')
        self.getResponse('web.attach_custom_build_trigger', repository='devtable/simple',
                         expected_code=403)

        self.login('devtable', 'password')
        self.getResponse('web.attach_custom_build_trigger', repository='devtable/simple',
                         expected_code=302)

    def test_redirect_to_repository(self):
        """Repository redirects: 404 unless the repo is public or the user is logged in."""
        self.getResponse('web.redirect_to_repository', repository='foo/bar',
                         expected_code=404)
        self.getResponse('web.redirect_to_repository', repository='public/publicrepo',
                         expected_code=302)
        self.getResponse('web.redirect_to_repository', repository='devtable/simple',
                         expected_code=404)

        self.login('devtable', 'password')
        self.getResponse('web.redirect_to_repository', repository='devtable/simple',
                         expected_code=302)

    def test_redirect_to_namespace(self):
        """Namespace redirects: 404 for an unknown namespace, 302 for known ones."""
        self.getResponse('web.redirect_to_namespace', namespace='unknown', expected_code=404)
        self.getResponse('web.redirect_to_namespace', namespace='devtable', expected_code=302)
        self.getResponse('web.redirect_to_namespace', namespace='buynlarge',
                         expected_code=302)
2016-04-05 16:40:50 +00:00
class OAuthTestCase(EndpointTestCase):
    """Tests for the ``web.authorize_application`` OAuth authorization endpoint."""

    # Number of follow-up requests the rate-limiting test sends while waiting
    # for a 429 before it gives up.
    _MAX_RATE_LIMIT_ATTEMPTS = 5

    @staticmethod
    def _authorize_form(client_id='deadbeef',
                        redirect_uri='http://localhost:8000/o2c.html',
                        scope='user:admin'):
        """Build a fresh authorization form.

        Note: the default client ('deadbeef') and its redirect URI are defined
        in initdb.py.
        """
        return {
            'client_id': client_id,
            'redirect_uri': redirect_uri,
            'scope': scope,
        }

    @staticmethod
    def _basic_auth_headers(credentials):
        """Return request headers carrying HTTP Basic auth for ``user:password``."""
        return dict(authorization='Basic ' + base64.b64encode(credentials))

    def test_authorize_nologin(self):
        """Without a logged-in user the request is rejected with 401."""
        form = self._authorize_form(client_id='someclient',
                                    redirect_uri='http://localhost:5000/foobar')
        self.postResponse('web.authorize_application', form=form, with_csrf=True,
                          expected_code=401)

    def test_authorize_invalidclient(self):
        """An unknown client id redirects back with ``error=unauthorized_client``."""
        self.login('devtable', 'password')

        form = self._authorize_form(client_id='someclient',
                                    redirect_uri='http://localhost:5000/foobar')
        resp = self.postResponse('web.authorize_application', form=form, with_csrf=True,
                                 expected_code=302)
        self.assertEqual('http://localhost:5000/foobar?error=unauthorized_client',
                         resp.headers['Location'])

    def test_authorize_invalidscope(self):
        """An unknown scope redirects back with ``error=invalid_scope``."""
        self.login('devtable', 'password')

        form = self._authorize_form(scope='invalid:scope')
        resp = self.postResponse('web.authorize_application', form=form, with_csrf=True,
                                 expected_code=302)
        self.assertEqual('http://localhost:8000/o2c.html?error=invalid_scope',
                         resp.headers['Location'])

    def test_authorize_invalidredirecturi(self):
        """A redirect URI that does not belong to the client yields 400."""
        self.login('devtable', 'password')

        form = self._authorize_form(redirect_uri='http://some/invalid/uri')
        self.postResponse('web.authorize_application', form=form, with_csrf=True,
                          expected_code=400)

    def test_authorize_success(self):
        """A valid request redirects with an access token in the Location header."""
        self.login('devtable', 'password')

        form = self._authorize_form()
        resp = self.postResponse('web.authorize_application', form=form, with_csrf=True,
                                 expected_code=302)
        self.assertTrue('access_token=' in resp.headers['Location'])

    def test_authorize_nocsrf(self):
        """A logged-in request without the CSRF token yields 403."""
        self.login('devtable', 'password')

        form = self._authorize_form()
        self.postResponse('web.authorize_application', form=form, with_csrf=False,
                          expected_code=403)

    def test_authorize_nocsrf_withinvalidheader(self):
        """A malformed Authorization header (and no CSRF token) yields 401."""
        self.login('devtable', 'password')

        form = self._authorize_form()
        headers = dict(authorization='Some random header')
        self.postResponse('web.authorize_application', headers=headers, form=form,
                          with_csrf=False, expected_code=401)

    def test_authorize_nocsrf_withbadheader(self):
        """Basic auth with a wrong password (and no CSRF token) yields 401."""
        self.login('devtable', 'password')

        form = self._authorize_form()
        headers = self._basic_auth_headers('devtable:invalidpassword')
        self.postResponse('web.authorize_application', headers=headers, form=form,
                          with_csrf=False, expected_code=401)

    def test_authorize_nocsrf_correctheader(self):
        """Valid Basic auth only authorizes clients listed in the whitelist."""
        form = self._authorize_form()

        # Try without the client id being in the whitelist.
        headers = self._basic_auth_headers('devtable:password')
        self.postResponse('web.authorize_application', headers=headers, form=form,
                          with_csrf=False, expected_code=403)

        # ``app.config`` is shared by every test, so put the previous whitelist
        # (or its absence) back once this test has finished.
        whitelist_key = 'DIRECT_OAUTH_CLIENTID_WHITELIST'
        missing = object()
        previous = app.config.get(whitelist_key, missing)

        def restore_whitelist():
            if previous is missing:
                app.config.pop(whitelist_key, None)
            else:
                app.config[whitelist_key] = previous

        self.addCleanup(restore_whitelist)

        # Add the client ID to the whitelist and try again.
        app.config[whitelist_key] = ['deadbeef']

        headers = self._basic_auth_headers('devtable:password')
        resp = self.postResponse('web.authorize_application', headers=headers, form=form,
                                 with_csrf=True, expected_code=302)
        self.assertTrue('access_token=' in resp.headers['Location'])

    def test_authorize_nocsrf_ratelimiting(self):
        """Repeated failed Basic auth attempts eventually get a 429 response."""
        form = self._authorize_form()

        # Use an invalid password a few times, making sure we eventually get
        # rate limited.
        headers = self._basic_auth_headers('devtable:invalidpassword')
        self.postResponse('web.authorize_application', headers=headers, form=form,
                          with_csrf=False, expected_code=401)

        for _ in range(self._MAX_RATE_LIMIT_ATTEMPTS):
            r = self.postResponse('web.authorize_application', headers=headers, form=form,
                                  with_csrf=False, expected_code=None)
            self.assertNotEqual(200, r.status_code)
            if r.status_code == 429:
                break
        else:
            # No attempt was answered with 429.
            self.fail('Exponential backoff did not fire')
2016-04-05 16:40:50 +00:00
class KeyServerTestCase(EndpointTestCase):
    """Tests for the key server endpoints: list, get, put (create/rotate) and delete."""

    @staticmethod
    def _bearer_headers(token, json_body=False):
        """Return request headers that present ``token`` as a Bearer credential.

        With ``json_body`` set, a JSON ``Content-Type`` header is included too.
        """
        headers = {'Authorization': 'Bearer %s' % token}
        if json_body:
            headers['Content-Type'] = 'application/json'
        return headers

    def _get_test_jwt_payload(self):
        """Return JWT claims for 'sample_service', valid from now for 60 seconds."""
        # Read the clock once so that iat, nbf and exp are mutually consistent.
        now = int(time.time())
        return {
            'iss': 'sample_service',
            'aud': key_server.JWT_AUDIENCE,
            'exp': now + 60,
            'iat': now,
            'nbf': now,
        }

    def test_list_service_keys(self):
        """Listing returns exactly the visible keys of the requested service."""
        # Retrieve all the keys.
        all_keys = model.service_keys.list_all_keys()
        visible_jwks = [key.jwk
                        for key in model.service_keys.list_service_keys('sample_service')]

        # A key is expected to be hidden when it belongs to another service,
        # has no approval, or has an expiration date that has already passed.
        now = datetime.utcnow()
        invisible_jwks = []
        for key in all_keys:
            is_expired = key.expiration_date and key.expiration_date <= now
            if key.service != 'sample_service' or key.approval is None or is_expired:
                invisible_jwks.append(key.jwk)

        rv = self.getResponse('key_server.list_service_keys', service='sample_service')
        jwkset = py_json.loads(rv)

        # Make sure the hidden keys are not returned and the visible ones are returned.
        self.assertTrue(len(visible_jwks) > 0)
        self.assertTrue(len(invisible_jwks) > 0)
        self.assertEqual(len(visible_jwks), len(jwkset['keys']))

        for jwk in jwkset['keys']:
            self.assertIn(jwk, visible_jwks)
            self.assertNotIn(jwk, invisible_jwks)

    def test_get_service_key(self):
        """Fetching a single key returns a status that reflects the key's state."""
        # 200 for an approved key
        self.getResponse('key_server.get_service_key', service='sample_service', kid='kid1')

        # 409 for an unapproved key
        self.getResponse('key_server.get_service_key', service='sample_service', kid='kid3',
                         expected_code=409)

        # 404 for a non-existent key
        self.getResponse('key_server.get_service_key', service='sample_service',
                         kid='kid9999', expected_code=404)

        # 403 for an approved but expired key that is inside of the 2 week window.
        self.getResponse('key_server.get_service_key', service='sample_service', kid='kid6',
                         expected_code=403)

        # 404 for an approved, expired key that is outside of the 2 week window.
        self.getResponse('key_server.get_service_key', service='sample_service', kid='kid7',
                         expected_code=404)

    def test_put_service_key(self):
        """Publishing a key, then rotating it before and after approval."""
        # No Authorization header should yield a 400
        self.putResponse('key_server.put_service_key', service='sample_service',
                         kid='kid420', expected_code=400)

        # Mint a JWT with our test payload
        private_key = RSA.generate(2048)
        jwk = RSAKey(key=private_key.publickey()).serialize()
        payload = self._get_test_jwt_payload()
        token = jwt.encode(payload, private_key.exportKey('PEM'), 'RS256')

        # Invalid service name should yield a 400.
        self.putResponse('key_server.put_service_key', service='sample service',
                         kid='kid420', headers=self._bearer_headers(token, json_body=True),
                         data=jwk, expected_code=400)

        # Publish a new key
        with assert_action_logged('service_key_create'):
            self.putResponse('key_server.put_service_key', service='sample_service',
                             kid='kid420',
                             headers=self._bearer_headers(token, json_body=True),
                             data=jwk, expected_code=202)

        # Ensure that the key exists but is unapproved.
        self.getResponse('key_server.get_service_key', service='sample_service',
                         kid='kid420', expected_code=409)

        # Attempt to rotate the key. Since not approved, it will fail.
        token = jwt.encode(payload, private_key.exportKey('PEM'), 'RS256',
                           headers={'kid': 'kid420'})
        self.putResponse('key_server.put_service_key', service='sample_service',
                         kid='kid6969', headers=self._bearer_headers(token, json_body=True),
                         data=jwk, expected_code=403)

        # Approve the key.
        model.service_keys.approve_service_key('kid420', 1, ServiceKeyApprovalType.SUPERUSER)

        # Rotate that new key
        with assert_action_logged('service_key_rotate'):
            token = jwt.encode(payload, private_key.exportKey('PEM'), 'RS256',
                               headers={'kid': 'kid420'})
            self.putResponse('key_server.put_service_key', service='sample_service',
                             kid='kid6969',
                             headers=self._bearer_headers(token, json_body=True),
                             data=jwk, expected_code=200)

        # Rotation should only work when signed by the previous key: a token
        # signed by a brand new private key that merely claims kid420 is refused.
        private_key = RSA.generate(2048)
        jwk = RSAKey(key=private_key.publickey()).serialize()
        token = jwt.encode(payload, private_key.exportKey('PEM'), 'RS256',
                           headers={'kid': 'kid420'})
        self.putResponse('key_server.put_service_key', service='sample_service',
                         kid='kid6969', headers=self._bearer_headers(token, json_body=True),
                         data=jwk, expected_code=403)

    def test_attempt_delete_service_key_with_no_kid_signer(self):
        """A delete whose JWT carries no ``kid`` header is rejected with 400."""
        # Generate a single key (it is not approved).
        private_key, _ = model.service_keys.generate_service_key('sample_service', None,
                                                                 kid='first')

        # Mint a JWT with our test payload but *no kid*.
        token = jwt.encode(self._get_test_jwt_payload(), private_key.exportKey('PEM'),
                           'RS256', headers={})

        # Using that token, attempt to delete the key itself.
        self.deleteResponse('key_server.delete_service_key',
                            headers=self._bearer_headers(token),
                            expected_code=400, service='sample_service', kid='first')

    def test_attempt_delete_service_key_with_expired_key(self):
        """An expired signing key cannot delete another key until it is valid again."""
        # Generate two keys, approving the first.
        private_key, _ = model.service_keys.generate_service_key('sample_service', None,
                                                                 kid='first')
        model.service_keys.approve_service_key('first', 1, ServiceKeyApprovalType.SUPERUSER)
        model.service_keys.generate_service_key('sample_service', None, kid='second')

        # Mint a JWT with our test payload, signed by the first key.
        token = jwt.encode(self._get_test_jwt_payload(), private_key.exportKey('PEM'),
                           'RS256', headers={'kid': 'first'})

        # Set the expiration of the first to now - some time.
        model.service_keys.set_key_expiration('first',
                                              datetime.utcnow() - timedelta(seconds=100))

        # Using the credentials of the (now expired) first key, attempt to
        # delete the unapproved second key.
        self.deleteResponse('key_server.delete_service_key',
                            headers=self._bearer_headers(token),
                            expected_code=403, service='sample_service', kid='second')

        # Set the expiration to the future and delete the key.
        model.service_keys.set_key_expiration('first',
                                              datetime.utcnow() + timedelta(seconds=100))

        with assert_action_logged('service_key_delete'):
            self.deleteResponse('key_server.delete_service_key',
                                headers=self._bearer_headers(token),
                                expected_code=204, service='sample_service', kid='second')

    def test_delete_unapproved_service_key(self):
        """An unapproved key can delete itself."""
        # No Authorization header should yield a 400
        self.deleteResponse('key_server.delete_service_key', expected_code=400,
                            service='sample_service', kid='kid1')

        # Generate an unapproved key.
        private_key, _ = model.service_keys.generate_service_key('sample_service', None,
                                                                 kid='unapprovedkeyhere')

        # Mint a JWT with our test payload
        token = jwt.encode(self._get_test_jwt_payload(), private_key.exportKey('PEM'),
                           'RS256', headers={'kid': 'unapprovedkeyhere'})

        # Delete our unapproved key with itself.
        with assert_action_logged('service_key_delete'):
            self.deleteResponse('key_server.delete_service_key',
                                headers=self._bearer_headers(token),
                                expected_code=204, service='sample_service',
                                kid='unapprovedkeyhere')

    def test_delete_chained_service_key(self):
        """A key may delete a sibling key only once the signing key is approved."""
        # No Authorization header should yield a 400
        self.deleteResponse('key_server.delete_service_key', expected_code=400,
                            service='sample_service', kid='kid1')

        # Generate two keys.
        private_key, _ = model.service_keys.generate_service_key('sample_service', None,
                                                                 kid='kid123')
        model.service_keys.generate_service_key('sample_service', None, kid='kid321')

        # Mint a JWT with our test payload, signed by the first key (kid123).
        token = jwt.encode(self._get_test_jwt_payload(), private_key.exportKey('PEM'),
                           'RS256', headers={'kid': 'kid123'})

        # Using the credentials of the still unapproved first key, attempt to
        # delete the second key.
        self.deleteResponse('key_server.delete_service_key',
                            headers=self._bearer_headers(token),
                            expected_code=403, service='sample_service', kid='kid321')

        # Approve the first (signing) key.
        model.service_keys.approve_service_key('kid123', 1, ServiceKeyApprovalType.SUPERUSER)

        # Using the credentials of our approved key, delete our unapproved key
        with assert_action_logged('service_key_delete'):
            self.deleteResponse('key_server.delete_service_key',
                                headers=self._bearer_headers(token),
                                expected_code=204, service='sample_service', kid='kid321')

        # Attempt to delete a key signed by a key from a different service
        bad_token = jwt.encode(self._get_test_jwt_payload(), private_key.exportKey('PEM'),
                               'RS256', headers={'kid': 'kid5'})
        self.deleteResponse('key_server.delete_service_key',
                            headers=self._bearer_headers(bad_token),
                            expected_code=403, service='sample_service', kid='kid123')

        # Delete a self-signed, approved key
        with assert_action_logged('service_key_delete'):
            self.deleteResponse('key_server.delete_service_key',
                                headers=self._bearer_headers(token),
                                expected_code=204, service='sample_service', kid='kid123')
2016-04-07 00:03:48 +00:00
2016-04-05 16:40:50 +00:00
# Run every test case in this module when the file is executed as a script.
if __name__ == '__main__':
    unittest.main()