This repository has been archived on 2020-03-24. You can view files and clone it, but cannot push or open issues or pull requests.
quay/oauth/test/test_oidc.py

277 lines
9.3 KiB
Python
Raw Normal View History

2017-01-26 17:00:54 +00:00
# pylint: disable=redefined-outer-name, unused-argument, invalid-name, missing-docstring, too-many-arguments
import json
import time
import urlparse
import jwt
import pytest
import requests
from httmock import urlmatch, HTTMock
from Crypto.PublicKey import RSA
from jwkest.jwk import RSAKey
from oauth.oidc import OIDCLoginService, OAuthLoginException
@pytest.fixture()
def http_client():
    """Return a ``requests`` session with one pooled adapter mounted for http and https."""
    session = requests.Session()
    pooled_adapter = requests.adapters.HTTPAdapter(pool_connections=100, pool_maxsize=100)
    # The same adapter instance backs both schemes.
    for scheme_prefix in ('http://', 'https://'):
        session.mount(scheme_prefix, pooled_adapter)
    return session
@pytest.fixture(params=[True, False])
def app_config(http_client, request):
    """Return the application config dict handed to the OIDC service.

    Parametrized over ``FEATURE_MAILING`` (``True`` then ``False``), so every test that
    depends on this fixture runs once per value.
    """
    # Settings for the fake OIDC provider, stored under 'SOMEOIDC_TEST_SERVICE'.
    service_settings = {
        'CLIENT_ID': 'foo',
        'CLIENT_SECRET': 'bar',
        'SERVICE_NAME': 'Some Cool Service',
        'SERVICE_ICON': 'http://some/icon',
        'OIDC_SERVER': 'http://fakeoidc',
        'DEBUGGING': True,
    }
    config = {
        'PREFERRED_URL_SCHEME': 'http',
        'SERVER_HOSTNAME': 'localhost',
        'FEATURE_MAILING': request.param,
        'SOMEOIDC_TEST_SERVICE': service_settings,
        'HTTPCLIENT': http_client,
    }
    return config
@pytest.fixture()
def oidc_service(app_config):
    """Build the ``OIDCLoginService`` under test from the 'SOMEOIDC_TEST_SERVICE' config key."""
    service = OIDCLoginService(app_config, 'SOMEOIDC_TEST_SERVICE')
    return service
@pytest.fixture()
def discovery_content():
    """Return the discovery document (as a dict) that the fake provider advertises."""
    document = dict(
        scopes_supported=['profile'],
        authorization_endpoint='http://fakeoidc/authorize',
        token_endpoint='http://fakeoidc/token',
        userinfo_endpoint='http://fakeoidc/userinfo',
        jwks_uri='http://fakeoidc/jwks',
    )
    return document
@pytest.fixture()
def discovery_handler(discovery_content):
    """Return an HTTMock handler that serves ``discovery_content`` as JSON.

    The handler matches requests to ``fakeoidc`` whose path matches ``.+openid.+``.
    """
    @urlmatch(netloc=r'fakeoidc', path=r'.+openid.+')
    def serve_discovery(_url, _request):
        # Serialized per request, so the body reflects the dict's state at call time.
        body = json.dumps(discovery_content)
        return body

    return serve_discovery
@pytest.fixture(scope="module")  # Slow to generate, only do it once.
def signing_key():
    """Generate a 2048-bit RSA key and return it as a dict.

    The dict holds the key id (``'id'``), the PEM export of the private key
    (``'private_key'``) and the serialized ``RSAKey`` of the public half (``'jwk'``).
    """
    rsa_key = RSA.generate(2048)
    public_jwk = RSAKey(key=rsa_key.publickey()).serialize()
    private_pem = rsa_key.exportKey('PEM')
    return {'id': 'somekey', 'private_key': private_pem, 'jwk': public_jwk}
@pytest.fixture()
def id_token(oidc_service, signing_key, app_config):
    """Return an RS256-signed ID token for subject ``'cooluser'``.

    The issuer and audience are taken from ``oidc_service``; the token is signed with the
    private key from ``signing_key`` and carries that key's id in the ``kid`` header.
    ``app_config`` is accepted but not read in this fixture.
    """
    # Read the clock once so nbf, iat and exp are mutually consistent; separate reads
    # could straddle a second boundary.
    issued_at = int(time.time())
    claims = {
        'iss': oidc_service.config['OIDC_SERVER'],
        'aud': oidc_service.client_id(),
        'nbf': issued_at,
        'iat': issued_at,
        'exp': issued_at + 600,  # Valid for ten minutes.
        'sub': 'cooluser',
    }
    headers = {'kid': signing_key['id']}
    return jwt.encode(claims, signing_key['private_key'], 'RS256', headers=headers)
@pytest.fixture()
def valid_code():
    """Return the authorization code used by tests as the valid one."""
    accepted_code = 'validcode'
    return accepted_code
@pytest.fixture()
def token_handler(oidc_service, id_token, valid_code):
    """Return an HTTMock handler emulating the provider's ``/token`` endpoint.

    The handler validates the form-encoded request body field by field and answers with a
    4xx response on the first mismatch. When every field matches it returns a 200 whose
    JSON body carries the access token ``'sometoken'`` and the ``id_token`` fixture value.
    """
    @urlmatch(netloc=r'fakeoidc', path=r'/token')
    def handler(_, request):
        # Treat an absent body as an empty form instead of failing inside parse_qs.
        params = urlparse.parse_qs(request.body or '')

        # (form field, expected value, status code, error body), checked in this order.
        checks = (
            ('redirect_uri', 'http://localhost/oauth2/someoidc/callback', 400,
             'Invalid redirect URI'),
            ('client_id', oidc_service.client_id(), 401, 'Invalid client id'),
            ('client_secret', oidc_service.client_secret(), 401, 'Invalid client secret'),
            ('code', valid_code, 401, 'Invalid code'),
            ('grant_type', 'authorization_code', 400, 'Invalid authorization type'),
        )
        for field, expected, status_code, message in checks:
            # parse_qs omits fields that were not sent; defaulting to [None] makes a
            # missing field produce the error response rather than a TypeError.
            if params.get(field, [None])[0] != expected:
                return {'status_code': status_code, 'content': message}

        content = {
            'access_token': 'sometoken',
            'id_token': id_token,
        }
        return {'status_code': 200, 'content': json.dumps(content)}

    return handler
@pytest.fixture()
def jwks_handler(signing_key):
    """Return an HTTMock handler for ``/jwks`` that publishes the signing key's JWK."""
    @urlmatch(netloc=r'fakeoidc', path=r'/jwks')
    def serve_keys(_url, _request):
        # Work on a copy so the module-scoped signing key's JWK dict is never mutated.
        published_key = signing_key['jwk'].copy()
        published_key['kid'] = signing_key['id']
        payload = {'keys': [published_key]}
        return {'status_code': 200, 'content': json.dumps(payload)}

    return serve_keys
@pytest.fixture()
def emptykeys_jwks_handler():
    """Return an HTTMock handler for ``/jwks`` that publishes an empty key list."""
    @urlmatch(netloc=r'fakeoidc', path=r'/jwks')
    def serve_no_keys(_url, _request):
        payload = json.dumps({'keys': []})
        return {'status_code': 200, 'content': payload}

    return serve_no_keys
@pytest.fixture(params=["someusername", None])
def preferred_username(request):
    """Return the parametrized preferred username: ``"someusername"`` or ``None``."""
    username = request.param
    return username
@pytest.fixture
def userinfo_handler(oidc_service, preferred_username):
    """Return an HTTMock handler for ``/userinfo`` that serves a complete user profile.

    Requests whose ``Authorization`` header is not ``Bearer sometoken`` get a 401.
    ``oidc_service`` is accepted but not read in this fixture.
    """
    @urlmatch(netloc=r'fakeoidc', path=r'/userinfo')
    def serve_userinfo(_url, req):
        authorized = req.headers.get('Authorization') == 'Bearer sometoken'
        if not authorized:
            return {'status_code': 401, 'content': 'Missing expected header'}

        profile = {
            'sub': 'cooluser',
            'preferred_username': preferred_username,
            'email': 'foo@example.com',
            'email_verified': True,
        }
        return {'status_code': 200, 'content': json.dumps(profile)}

    return serve_userinfo
@pytest.fixture()
def invalidsub_userinfo_handler(oidc_service):
    """Return an HTTMock handler for ``/userinfo`` whose profile has ``sub`` 'invalidsub'.

    ``oidc_service`` is accepted but not read in this fixture.
    """
    @urlmatch(netloc=r'fakeoidc', path=r'/userinfo')
    def serve_userinfo(_url, _request):
        profile = {
            'sub': 'invalidsub',
            'preferred_username': 'someusername',
            'email': 'foo@example.com',
            'email_verified': True,
        }
        body = json.dumps(profile)
        return {'status_code': 200, 'content': body}

    return serve_userinfo
@pytest.fixture()
def missingemail_userinfo_handler(oidc_service, preferred_username):
    """Return an HTTMock handler for ``/userinfo`` whose profile has no email fields.

    ``oidc_service`` is accepted but not read in this fixture.
    """
    @urlmatch(netloc=r'fakeoidc', path=r'/userinfo')
    def serve_userinfo(_url, _request):
        # Only the subject and preferred username are reported.
        profile = {'sub': 'cooluser', 'preferred_username': preferred_username}
        return {'status_code': 200, 'content': json.dumps(profile)}

    return serve_userinfo
def test_basic_config(oidc_service):
    """The service id, display name and icon reflect the configured values."""
    service_id = oidc_service.service_id()
    assert service_id == 'someoidc'

    service_name = oidc_service.service_name()
    assert service_name == 'Some Cool Service'

    icon = oidc_service.get_icon()
    assert icon == 'http://some/icon'
def test_discovery(oidc_service, http_client, discovery_handler):
    """Endpoints and login scopes are taken from the mocked discovery document."""
    expected_authorize = 'http://fakeoidc/authorize?response_type=code&'
    expected_token = 'http://fakeoidc/token'
    expected_userinfo = 'http://fakeoidc/userinfo'

    with HTTMock(discovery_handler):
        assert oidc_service.authorize_endpoint() == expected_authorize
        assert oidc_service.token_endpoint() == expected_token
        assert oidc_service.user_endpoint() == expected_userinfo
        assert oidc_service.get_login_scopes() == ['profile']
def test_public_config(oidc_service, discovery_handler):
    """The public config exposes the client id but neither the secret's key nor its value."""
    with HTTMock(discovery_handler):
        # The public config is fetched afresh for each check.
        oidc_flag = oidc_service.get_public_config()['OIDC']
        assert oidc_flag

        client_id = oidc_service.get_public_config()['CLIENT_ID']
        assert client_id == 'foo'

        exposed = oidc_service.get_public_config()
        assert 'CLIENT_SECRET' not in exposed

        exposed_values = oidc_service.get_public_config().values()
        assert 'bar' not in exposed_values
def test_exchange_code_invalidcode(oidc_service, discovery_handler, app_config, http_client,
                                   token_handler):
    """Exchanging a code other than the valid one raises ``OAuthLoginException``."""
    wrong_code = 'testcode'
    with HTTMock(token_handler, discovery_handler):
        with pytest.raises(OAuthLoginException):
            oidc_service.exchange_code_for_login(app_config, http_client, wrong_code, '')
def test_exchange_code_validcode(oidc_service, discovery_handler, app_config, http_client,
                                 token_handler, userinfo_handler, jwks_handler, valid_code,
                                 preferred_username):
    """A valid code yields the user's id, username and email.

    The username is expected to equal the id when no preferred username is supplied.
    """
    with HTTMock(jwks_handler, token_handler, userinfo_handler, discovery_handler):
        login = oidc_service.exchange_code_for_login(app_config, http_client, valid_code, '')
        lid, lusername, lemail = login

        assert lid == 'cooluser'
        assert lemail == 'foo@example.com'

        expected_username = lid if preferred_username is None else preferred_username
        assert lusername == expected_username
def test_exchange_code_missingemail(oidc_service, discovery_handler, app_config, http_client,
                                    token_handler, missingemail_userinfo_handler, jwks_handler,
                                    valid_code, preferred_username):
    """A profile without an email fails only when ``FEATURE_MAILING`` is enabled."""
    with HTTMock(jwks_handler, token_handler, missingemail_userinfo_handler, discovery_handler):
        if app_config['FEATURE_MAILING']:
            # Should fail because there is no valid email address.
            with pytest.raises(OAuthLoginException):
                oidc_service.exchange_code_for_login(app_config, http_client, valid_code, '')
            return

        # Should succeed because, while there is no valid email address, it isn't necessary
        # with mailing disabled.
        login = oidc_service.exchange_code_for_login(app_config, http_client, valid_code, '')
        lid, lusername, lemail = login

        assert lid == 'cooluser'
        assert lemail is None

        expected_username = lid if preferred_username is None else preferred_username
        assert lusername == expected_username
def test_exchange_code_invalidsub(oidc_service, discovery_handler, app_config, http_client,
                                  token_handler, invalidsub_userinfo_handler, jwks_handler,
                                  valid_code):
    """A user-info ``sub`` that differs from the id_token's ``sub`` is rejected."""
    handlers = (jwks_handler, token_handler, invalidsub_userinfo_handler, discovery_handler)
    with HTTMock(*handlers):
        # Should fail because the sub of the user info doesn't match that returned by the
        # id_token.
        with pytest.raises(OAuthLoginException):
            oidc_service.exchange_code_for_login(app_config, http_client, valid_code, '')
def test_exchange_code_missingkey(oidc_service, discovery_handler, app_config, http_client,
                                  token_handler, userinfo_handler, emptykeys_jwks_handler,
                                  valid_code):
    """The exchange is rejected when the JWKS endpoint publishes no keys."""
    handlers = (emptykeys_jwks_handler, token_handler, userinfo_handler, discovery_handler)
    with HTTMock(*handlers):
        # Should fail because the key is missing.
        with pytest.raises(OAuthLoginException):
            oidc_service.exchange_code_for_login(app_config, http_client, valid_code, '')