This repository has been archived on 2020-03-24. You can view files and clone it, but cannot push or open issues or pull requests.
quay/test/test_endpoints.py
Joseph Schorr 651666b60b Refactor our auth handling code to be cleaner
Breaks out the validation code from the auth context modification calls, makes decorators easier to define and adds testing for each individual piece. Will be the basis of better error messaging in the following change.
2017-03-23 15:42:45 -04:00

633 lines
25 KiB
Python

# coding=utf-8
import json as py_json
import time
import unittest
import base64
from urllib import urlencode
from urlparse import urlparse, urlunparse, parse_qs
from datetime import datetime, timedelta
import jwt
from Crypto.PublicKey import RSA
from flask import url_for
from jwkest.jwk import RSAKey
from app import app
from data import model
from data.database import ServiceKeyApprovalType
from endpoints import keyserver
from endpoints.api import api, api_bp
from endpoints.api.user import Signin
from endpoints.keyserver import jwk_with_kid
from endpoints.csrf import OAUTH_CSRF_TOKEN_NAME
from endpoints.web import web as web_bp
from endpoints.webhooks import webhooks as webhooks_bp
from initdb import setup_database_for_testing, finished_database_for_testing
from test.helpers import assert_action_logged
try:
app.register_blueprint(web_bp, url_prefix='')
except ValueError:
# This blueprint was already registered
pass
try:
app.register_blueprint(webhooks_bp, url_prefix='/webhooks')
except ValueError:
# This blueprint was already registered
pass
try:
app.register_blueprint(keyserver.key_server, url_prefix='')
except ValueError:
# This blueprint was already registered
pass
try:
app.register_blueprint(api_bp, url_prefix='/api')
except ValueError:
# This blueprint was already registered
pass
CSRF_TOKEN_KEY = '_csrf_token'
CSRF_TOKEN = '123csrfforme'
class EndpointTestCase(unittest.TestCase):
maxDiff = None
@staticmethod
def _add_csrf(without_csrf):
parts = urlparse(without_csrf)
query = parse_qs(parts[4])
query[CSRF_TOKEN_KEY] = CSRF_TOKEN
return urlunparse(list(parts[0:4]) + [urlencode(query)] + list(parts[5:]))
def setUp(self):
setup_database_for_testing(self)
self.app = app.test_client()
self.ctx = app.test_request_context()
self.ctx.__enter__()
self.setCsrfToken(CSRF_TOKEN)
def tearDown(self):
finished_database_for_testing(self)
self.ctx.__exit__(True, None, None)
def setCsrfToken(self, token):
with self.app.session_transaction() as sess:
sess[CSRF_TOKEN_KEY] = token
sess[OAUTH_CSRF_TOKEN_NAME] = 'someoauthtoken'
def getResponse(self, resource_name, expected_code=200, **kwargs):
rv = self.app.get(url_for(resource_name, **kwargs))
self.assertEquals(rv.status_code, expected_code)
return rv.data
def deleteResponse(self, resource_name, headers=None, expected_code=200, **kwargs):
headers = headers or {}
rv = self.app.delete(url_for(resource_name, **kwargs), headers=headers)
self.assertEquals(rv.status_code, expected_code)
return rv.data
def deleteEmptyResponse(self, resource_name, headers=None, expected_code=204, **kwargs):
headers = headers or {}
rv = self.app.delete(url_for(resource_name, **kwargs), headers=headers)
self.assertEquals(rv.status_code, expected_code)
self.assertEquals(rv.data, '') # ensure response body empty
return
def putResponse(self, resource_name, headers=None, data=None, expected_code=200, **kwargs):
headers = headers or {}
data = data or {}
rv = self.app.put(url_for(resource_name, **kwargs), headers=headers, data=py_json.dumps(data))
self.assertEquals(rv.status_code, expected_code)
return rv.data
def postResponse(self, resource_name, headers=None, data=None, form=None, with_csrf=True, expected_code=200, **kwargs):
headers = headers or {}
form = form or {}
url = url_for(resource_name, **kwargs)
if with_csrf:
url = EndpointTestCase._add_csrf(url)
post_data = None
if form:
post_data = form
elif data:
post_data = py_json.dumps(data)
rv = self.app.post(url, headers=headers, data=post_data)
if expected_code is not None:
self.assertEquals(rv.status_code, expected_code)
return rv
def login(self, username, password):
rv = self.app.post(EndpointTestCase._add_csrf(api.url_for(Signin)),
data=py_json.dumps(dict(username=username, password=password)),
headers={"Content-Type": "application/json"})
self.assertEquals(rv.status_code, 200)
class WebhookEndpointTestCase(EndpointTestCase):
def test_invalid_build_trigger_webhook(self):
self.postResponse('webhooks.build_trigger_webhook', trigger_uuid='invalidtrigger',
expected_code=404)
def test_valid_build_trigger_webhook_invalid_auth(self):
trigger = list(model.build.list_build_triggers('devtable', 'building'))[0]
self.postResponse('webhooks.build_trigger_webhook', trigger_uuid=trigger.uuid,
expected_code=403)
def test_valid_build_trigger_webhook_cookie_auth(self):
self.login('devtable', 'password')
# Cookie auth is not supported, so this should 403
trigger = list(model.build.list_build_triggers('devtable', 'building'))[0]
self.postResponse('webhooks.build_trigger_webhook', trigger_uuid=trigger.uuid,
expected_code=403)
def test_valid_build_trigger_webhook_missing_payload(self):
auth_header = 'Basic %s' % (base64.b64encode('devtable:password'))
trigger = list(model.build.list_build_triggers('devtable', 'building'))[0]
self.postResponse('webhooks.build_trigger_webhook', trigger_uuid=trigger.uuid,
expected_code=400, headers={'Authorization': auth_header})
def test_valid_build_trigger_webhook_invalid_payload(self):
auth_header = 'Basic %s' % (base64.b64encode('devtable:password'))
trigger = list(model.build.list_build_triggers('devtable', 'building'))[0]
self.postResponse('webhooks.build_trigger_webhook', trigger_uuid=trigger.uuid,
expected_code=400,
headers={'Authorization': auth_header, 'Content-Type': 'application/json'},
data={'invalid': 'payload'})
class WebEndpointTestCase(EndpointTestCase):
def test_index(self):
self.getResponse('web.index')
def test_robots(self):
self.getResponse('web.robots')
def test_sitemap(self):
self.getResponse('web.sitemap')
def test_repo_view(self):
self.getResponse('web.repository', path='devtable/simple')
def test_unicode_repo_view(self):
self.getResponse('web.repository', path='%E2%80%8Bcoreos/hyperkube%E2%80%8B')
def test_org_view(self):
self.getResponse('web.org_view', path='buynlarge')
def test_user_view(self):
self.getResponse('web.user_view', path='devtable')
def test_confirm_repo_email(self):
code = model.repository.create_email_authorization_for_repo('devtable', 'simple', 'foo@bar.com')
self.getResponse('web.confirm_repo_email', code=code.code)
found = model.repository.get_email_authorized_for_repo('devtable', 'simple', 'foo@bar.com')
self.assertTrue(found.confirmed)
def test_confirm_email(self):
user = model.user.get_user('devtable')
self.assertNotEquals(user.email, 'foo@bar.com')
code = model.user.create_confirm_email_code(user, 'foo@bar.com')
self.getResponse('web.confirm_email', code=code.code, expected_code=302)
user = model.user.get_user('devtable')
self.assertEquals(user.email, 'foo@bar.com')
def test_confirm_recovery(self):
# Try for an invalid code.
self.getResponse('web.confirm_recovery', code='someinvalidcode', expected_code=200)
# Create a valid code and try.
user = model.user.get_user('devtable')
code = model.user.create_reset_password_email_code(user.email)
self.getResponse('web.confirm_recovery', code=code.code, expected_code=302)
def test_confirm_recovery_verified(self):
# Create a valid code and try.
user = model.user.get_user('devtable')
user.verified = False
user.save()
code = model.user.create_reset_password_email_code(user.email)
self.getResponse('web.confirm_recovery', code=code.code, expected_code=302)
# Ensure the current user is the expected user and that they are verified.
user = model.user.get_user('devtable')
self.assertTrue(user.verified)
self.getResponse('web.receipt', expected_code=404) # Will 401 if no user.
def test_build_status_badge(self):
# Try for an invalid repository.
self.getResponse('web.build_status_badge', repository='foo/bar', expected_code=404)
# Try for a public repository.
self.getResponse('web.build_status_badge', repository='public/publicrepo')
# Try for an private repository.
self.getResponse('web.build_status_badge', repository='devtable/simple',
expected_code=404)
# Try for an private repository with an invalid token.
self.getResponse('web.build_status_badge', repository='devtable/simple',
token='sometoken', expected_code=404)
# Try for an private repository with a valid token.
repository = model.repository.get_repository('devtable', 'simple')
self.getResponse('web.build_status_badge', repository='devtable/simple',
token=repository.badge_token)
def test_attach_custom_build_trigger(self):
self.getResponse('web.attach_custom_build_trigger', repository='foo/bar', expected_code=401)
self.getResponse('web.attach_custom_build_trigger', repository='devtable/simple', expected_code=401)
self.login('freshuser', 'password')
self.getResponse('web.attach_custom_build_trigger', repository='devtable/simple', expected_code=403)
self.login('devtable', 'password')
self.getResponse('web.attach_custom_build_trigger', repository='devtable/simple', expected_code=302)
def test_redirect_to_repository(self):
self.getResponse('web.redirect_to_repository', repository='foo/bar', expected_code=404)
self.getResponse('web.redirect_to_repository', repository='public/publicrepo', expected_code=302)
self.getResponse('web.redirect_to_repository', repository='devtable/simple', expected_code=403)
self.login('devtable', 'password')
self.getResponse('web.redirect_to_repository', repository='devtable/simple', expected_code=302)
def test_redirect_to_namespace(self):
self.getResponse('web.redirect_to_namespace', namespace='unknown', expected_code=404)
self.getResponse('web.redirect_to_namespace', namespace='devtable', expected_code=302)
self.getResponse('web.redirect_to_namespace', namespace='buynlarge', expected_code=302)
class OAuthTestCase(EndpointTestCase):
def test_authorize_nologin(self):
form = {
'client_id': 'someclient',
'redirect_uri': 'http://localhost:5000/foobar',
'scope': 'user:admin',
}
self.postResponse('web.authorize_application', form=form, with_csrf=True, expected_code=401)
def test_authorize_invalidclient(self):
self.login('devtable', 'password')
form = {
'client_id': 'someclient',
'redirect_uri': 'http://localhost:5000/foobar',
'scope': 'user:admin',
}
resp = self.postResponse('web.authorize_application', form=form, with_csrf=True, expected_code=302)
self.assertEquals('http://localhost:5000/foobar?error=unauthorized_client', resp.headers['Location'])
def test_authorize_invalidscope(self):
self.login('devtable', 'password')
form = {
'client_id': 'deadbeef',
'redirect_uri': 'http://localhost:8000/o2c.html',
'scope': 'invalid:scope',
}
resp = self.postResponse('web.authorize_application', form=form, with_csrf=True, expected_code=302)
self.assertEquals('http://localhost:8000/o2c.html?error=invalid_scope', resp.headers['Location'])
def test_authorize_invalidredirecturi(self):
self.login('devtable', 'password')
# Note: Defined in initdb.py
form = {
'client_id': 'deadbeef',
'redirect_uri': 'http://some/invalid/uri',
'scope': 'user:admin',
}
self.postResponse('web.authorize_application', form=form, with_csrf=True, expected_code=400)
def test_authorize_success(self):
self.login('devtable', 'password')
# Note: Defined in initdb.py
form = {
'client_id': 'deadbeef',
'redirect_uri': 'http://localhost:8000/o2c.html',
'scope': 'user:admin',
}
resp = self.postResponse('web.authorize_application', form=form, with_csrf=True, expected_code=302)
self.assertTrue('access_token=' in resp.headers['Location'])
def test_authorize_nocsrf(self):
self.login('devtable', 'password')
# Note: Defined in initdb.py
form = {
'client_id': 'deadbeef',
'redirect_uri': 'http://localhost:8000/o2c.html',
'scope': 'user:admin',
}
self.postResponse('web.authorize_application', form=form, with_csrf=False, expected_code=403)
def test_authorize_nocsrf_withinvalidheader(self):
# Note: Defined in initdb.py
form = {
'client_id': 'deadbeef',
'redirect_uri': 'http://localhost:8000/o2c.html',
'scope': 'user:admin',
}
headers = dict(authorization='Some random header')
self.postResponse('web.authorize_application', headers=headers, form=form, with_csrf=False, expected_code=401)
def test_authorize_nocsrf_withbadheader(self):
# Note: Defined in initdb.py
form = {
'client_id': 'deadbeef',
'redirect_uri': 'http://localhost:8000/o2c.html',
'scope': 'user:admin',
}
headers = dict(authorization='Basic ' + base64.b64encode('devtable:invalidpassword'))
self.postResponse('web.authorize_application', headers=headers, form=form, with_csrf=False,
expected_code=401)
def test_authorize_nocsrf_correctheader(self):
# Note: Defined in initdb.py
form = {
'client_id': 'deadbeef',
'redirect_uri': 'http://localhost:8000/o2c.html',
'scope': 'user:admin',
}
# Try without the client id being in the whitelist.
headers = dict(authorization='Basic ' + base64.b64encode('devtable:password'))
self.postResponse('web.authorize_application', headers=headers, form=form, with_csrf=False,
expected_code=403)
# Add the client ID to the whitelist and try again.
app.config['DIRECT_OAUTH_CLIENTID_WHITELIST'] = ['deadbeef']
headers = dict(authorization='Basic ' + base64.b64encode('devtable:password'))
resp = self.postResponse('web.authorize_application', headers=headers, form=form,
with_csrf=True, expected_code=302)
self.assertTrue('access_token=' in resp.headers['Location'])
def test_authorize_nocsrf_ratelimiting(self):
# Note: Defined in initdb.py
form = {
'client_id': 'deadbeef',
'redirect_uri': 'http://localhost:8000/o2c.html',
'scope': 'user:admin',
}
# Try without the client id being in the whitelist a few times, making sure we eventually get rate limited.
headers = dict(authorization='Basic ' + base64.b64encode('devtable:invalidpassword'))
self.postResponse('web.authorize_application', headers=headers, form=form, with_csrf=False, expected_code=401)
counter = 0
while True:
r = self.postResponse('web.authorize_application', headers=headers, form=form, with_csrf=False, expected_code=None)
self.assertNotEquals(200, r.status_code)
counter = counter + 1
if counter > 5:
self.fail('Exponential backoff did not fire')
if r.status_code == 429:
break
class KeyServerTestCase(EndpointTestCase):
def _get_test_jwt_payload(self):
return {
'iss': 'sample_service',
'aud': keyserver.JWT_AUDIENCE,
'exp': int(time.time()) + 60,
'iat': int(time.time()),
'nbf': int(time.time()),
}
def test_list_service_keys(self):
# Retrieve all the keys.
all_keys = model.service_keys.list_all_keys()
visible_jwks = [jwk_with_kid(key) for key in model.service_keys.list_service_keys('sample_service')]
invisible_jwks = []
for key in all_keys:
is_expired = key.expiration_date and key.expiration_date <= datetime.utcnow()
if key.service != 'sample_service' or key.approval is None or is_expired:
invisible_jwks.append(key.jwk)
rv = self.getResponse('key_server.list_service_keys', service='sample_service')
jwkset = py_json.loads(rv)
# Make sure the hidden keys are not returned and the visible ones are returned.
self.assertTrue(len(visible_jwks) > 0)
self.assertTrue(len(invisible_jwks) > 0)
self.assertEquals(len(visible_jwks), len(jwkset['keys']))
for jwk in jwkset['keys']:
self.assertIn(jwk, visible_jwks)
self.assertNotIn(jwk, invisible_jwks)
def test_get_service_key(self):
# 200 for an approved key
self.getResponse('key_server.get_service_key', service='sample_service', kid='kid1')
# 409 for an unapproved key
self.getResponse('key_server.get_service_key', service='sample_service', kid='kid3',
expected_code=409)
# 404 for a non-existant key
self.getResponse('key_server.get_service_key', service='sample_service', kid='kid9999',
expected_code=404)
# 403 for an approved but expired key that is inside of the 2 week window.
self.getResponse('key_server.get_service_key', service='sample_service', kid='kid6',
expected_code=403)
# 404 for an approved, expired key that is outside of the 2 week window.
self.getResponse('key_server.get_service_key', service='sample_service', kid='kid7',
expected_code=404)
def test_put_service_key(self):
# No Authorization header should yield a 400
self.putResponse('key_server.put_service_key', service='sample_service', kid='kid420',
expected_code=400)
# Mint a JWT with our test payload
private_key = RSA.generate(2048)
jwk = RSAKey(key=private_key.publickey()).serialize()
payload = self._get_test_jwt_payload()
token = jwt.encode(payload, private_key.exportKey('PEM'), 'RS256')
# Invalid service name should yield a 400.
self.putResponse('key_server.put_service_key', service='sample service', kid='kid420',
headers={
'Authorization': 'Bearer %s' % token,
'Content-Type': 'application/json',
}, data=jwk, expected_code=400)
# Publish a new key
with assert_action_logged('service_key_create'):
self.putResponse('key_server.put_service_key', service='sample_service', kid='kid420',
headers={
'Authorization': 'Bearer %s' % token,
'Content-Type': 'application/json',
}, data=jwk, expected_code=202)
# Ensure that the key exists but is unapproved.
self.getResponse('key_server.get_service_key', service='sample_service', kid='kid420',
expected_code=409)
# Attempt to rotate the key. Since not approved, it will fail.
token = jwt.encode(payload, private_key.exportKey('PEM'), 'RS256', headers={'kid': 'kid420'})
self.putResponse('key_server.put_service_key', service='sample_service', kid='kid6969',
headers={
'Authorization': 'Bearer %s' % token,
'Content-Type': 'application/json',
}, data=jwk, expected_code=403)
# Approve the key.
model.service_keys.approve_service_key('kid420', 1, ServiceKeyApprovalType.SUPERUSER)
# Rotate that new key
with assert_action_logged('service_key_rotate'):
token = jwt.encode(payload, private_key.exportKey('PEM'), 'RS256', headers={'kid': 'kid420'})
self.putResponse('key_server.put_service_key', service='sample_service', kid='kid6969',
headers={
'Authorization': 'Bearer %s' % token,
'Content-Type': 'application/json',
}, data=jwk, expected_code=200)
# Rotation should only work when signed by the previous key
private_key = RSA.generate(2048)
jwk = RSAKey(key=private_key.publickey()).serialize()
token = jwt.encode(payload, private_key.exportKey('PEM'), 'RS256', headers={'kid': 'kid420'})
self.putResponse('key_server.put_service_key', service='sample_service', kid='kid6969',
headers={
'Authorization': 'Bearer %s' % token,
'Content-Type': 'application/json',
}, data=jwk, expected_code=403)
def test_attempt_delete_service_key_with_no_kid_signer(self):
# Generate two keys, approving the first.
private_key, _ = model.service_keys.generate_service_key('sample_service', None, kid='first')
# Mint a JWT with our test payload but *no kid*.
token = jwt.encode(self._get_test_jwt_payload(), private_key.exportKey('PEM'), 'RS256',
headers={})
# Using the credentials of our key, attempt to delete our unapproved key
self.deleteResponse('key_server.delete_service_key',
headers={'Authorization': 'Bearer %s' % token},
expected_code=400, service='sample_service', kid='first')
def test_attempt_delete_service_key_with_expired_key(self):
# Generate two keys, approving the first.
private_key, _ = model.service_keys.generate_service_key('sample_service', None, kid='first')
model.service_keys.approve_service_key('first', 1, ServiceKeyApprovalType.SUPERUSER)
model.service_keys.generate_service_key('sample_service', None, kid='second')
# Mint a JWT with our test payload
token = jwt.encode(self._get_test_jwt_payload(), private_key.exportKey('PEM'), 'RS256',
headers={'kid': 'first'})
# Set the expiration of the first to now - some time.
model.service_keys.set_key_expiration('first', datetime.utcnow() - timedelta(seconds=100))
# Using the credentials of our second key, attempt to delete our unapproved key
self.deleteResponse('key_server.delete_service_key',
headers={'Authorization': 'Bearer %s' % token},
expected_code=403, service='sample_service', kid='second')
# Set the expiration to the future and delete the key.
model.service_keys.set_key_expiration('first', datetime.utcnow() + timedelta(seconds=100))
with assert_action_logged('service_key_delete'):
self.deleteEmptyResponse('key_server.delete_service_key',
headers={'Authorization': 'Bearer %s' % token},
expected_code=204, service='sample_service', kid='second')
def test_delete_unapproved_service_key(self):
# No Authorization header should yield a 400
self.deleteResponse('key_server.delete_service_key', expected_code=400,
service='sample_service', kid='kid1')
# Generate an unapproved key.
private_key, _ = model.service_keys.generate_service_key('sample_service', None,
kid='unapprovedkeyhere')
# Mint a JWT with our test payload
token = jwt.encode(self._get_test_jwt_payload(), private_key.exportKey('PEM'), 'RS256',
headers={'kid': 'unapprovedkeyhere'})
# Delete our unapproved key with itself.
with assert_action_logged('service_key_delete'):
self.deleteEmptyResponse('key_server.delete_service_key',
headers={'Authorization': 'Bearer %s' % token},
expected_code=204, service='sample_service', kid='unapprovedkeyhere')
def test_delete_chained_service_key(self):
# No Authorization header should yield a 400
self.deleteResponse('key_server.delete_service_key', expected_code=400,
service='sample_service', kid='kid1')
# Generate two keys.
private_key, _ = model.service_keys.generate_service_key('sample_service', None, kid='kid123')
model.service_keys.generate_service_key('sample_service', None, kid='kid321')
# Mint a JWT with our test payload
token = jwt.encode(self._get_test_jwt_payload(), private_key.exportKey('PEM'), 'RS256',
headers={'kid': 'kid123'})
# Using the credentials of our second key, attempt tp delete our unapproved key
self.deleteResponse('key_server.delete_service_key',
headers={'Authorization': 'Bearer %s' % token},
expected_code=403, service='sample_service', kid='kid321')
# Approve the second key.
model.service_keys.approve_service_key('kid123', 1, ServiceKeyApprovalType.SUPERUSER)
# Using the credentials of our approved key, delete our unapproved key
with assert_action_logged('service_key_delete'):
self.deleteEmptyResponse('key_server.delete_service_key',
headers={'Authorization': 'Bearer %s' % token},
expected_code=204, service='sample_service', kid='kid321')
# Attempt to delete a key signed by a key from a different service
bad_token = jwt.encode(self._get_test_jwt_payload(), private_key.exportKey('PEM'), 'RS256',
headers={'kid': 'kid5'})
self.deleteResponse('key_server.delete_service_key',
headers={'Authorization': 'Bearer %s' % bad_token},
expected_code=403, service='sample_service', kid='kid123')
# Delete a self-signed, approved key
with assert_action_logged('service_key_delete'):
self.deleteEmptyResponse('key_server.delete_service_key',
headers={'Authorization': 'Bearer %s' % token},
expected_code=204, service='sample_service', kid='kid123')
if __name__ == '__main__':
unittest.main()