This repository has been archived on 2020-03-24. You can view files and clone it, but cannot push or open issues or pull requests.
quay/oauth/test/test_oidc.py

343 lines
12 KiB
Python
Raw Normal View History

2019-11-12 16:09:47 +00:00
# pylint: disable=redefined-outer-name, unused-argument, invalid-name, missing-docstring, too-many-arguments
import json
import time
import urlparse
import jwt
import pytest
import requests
from httmock import urlmatch, HTTMock
from Crypto.PublicKey import RSA
from jwkest.jwk import RSAKey
from oauth.oidc import OIDCLoginService, OAuthLoginException
from util.config import URLSchemeAndHostname
@pytest.fixture(scope='module')  # Slow to generate, only do it once.
def signing_key():
    """Generate one 2048-bit RSA key pair for the whole test module.

    Returns a dict holding the key id (``id``), the PEM export of the
    private key (``private_key``) and the serialized public key as a JWK
    (``jwk``).
    """
    rsa_key = RSA.generate(2048)
    public_jwk = RSAKey(key=rsa_key.publickey()).serialize()
    private_pem = rsa_key.exportKey('PEM')
    return dict(id='somekey', private_key=private_pem, jwk=public_jwk)
@pytest.fixture(scope="module")
def http_client():
    """Return a module-wide ``requests`` session with pooled adapters.

    A single HTTPAdapter (100 pool connections, pool size 100) is mounted
    for both the ``http://`` and ``https://`` prefixes.
    """
    session = requests.Session()
    pooled_adapter = requests.adapters.HTTPAdapter(pool_connections=100, pool_maxsize=100)
    for prefix in ('http://', 'https://'):
        session.mount(prefix, pooled_adapter)
    return session
@pytest.fixture(scope="module")
def valid_code():
    """Return the one authorization code the fake token endpoint accepts."""
    accepted_code = 'validcode'
    return accepted_code
@pytest.fixture(params=[True, False])
def mailing_feature(request):
    """Parametrize dependants over the mailing feature being on and off."""
    enabled = request.param
    return enabled
@pytest.fixture(params=[True, False])
def email_verified(request):
    """Parametrize dependants over the ``email_verified`` claim being true and false."""
    verified = request.param
    return verified
@pytest.fixture(params=[True, False])
def userinfo_supported(request):
    """Parametrize dependants over the userinfo endpoint being advertised or not."""
    supported = request.param
    return supported
@pytest.fixture(params=["someusername", "foo@bar.com", None])
def preferred_username(request):
    """Parametrize dependants over a plain, an email-shaped and a missing username."""
    username = request.param
    return username
@pytest.fixture()
def app_config(http_client, mailing_feature):
    """Assemble the application config dict used to build the OIDC services.

    It carries the URL scheme and hostname, the parametrized mailing flag,
    three OIDC login configurations that all point at ``http://fakeoidc``
    and the shared HTTP client under ``HTTPCLIENT``.
    """
    # Plain configuration: no explicit scopes, no custom endpoint parameters.
    some_oidc = {
        'CLIENT_ID': 'foo',
        'CLIENT_SECRET': 'bar',
        'SERVICE_NAME': 'Some Cool Service',
        'SERVICE_ICON': 'http://some/icon',
        'OIDC_SERVER': 'http://fakeoidc',
        'DEBUGGING': True,
    }

    # Configuration that lists its login scopes explicitly.
    another_oidc = {
        'CLIENT_ID': 'foo',
        'CLIENT_SECRET': 'bar',
        'SERVICE_NAME': 'Some Other Service',
        'SERVICE_ICON': 'http://some/icon',
        'OIDC_SERVER': 'http://fakeoidc',
        'LOGIN_SCOPES': ['openid'],
        'DEBUGGING': True,
    }

    # Configuration that adds a custom parameter for the authorization endpoint.
    oidc_with_params = {
        'CLIENT_ID': 'foo',
        'CLIENT_SECRET': 'bar',
        'SERVICE_NAME': 'Some Other Service',
        'SERVICE_ICON': 'http://some/icon',
        'OIDC_SERVER': 'http://fakeoidc',
        'DEBUGGING': True,
        'OIDC_ENDPOINT_CUSTOM_PARAMS': {
            'authorization_endpoint': {
                'some': 'param',
            },
        },
    }

    config = {
        'PREFERRED_URL_SCHEME': 'http',
        'SERVER_HOSTNAME': 'localhost',
        'FEATURE_MAILING': mailing_feature,
        'SOMEOIDC_LOGIN_CONFIG': some_oidc,
        'ANOTHEROIDC_LOGIN_CONFIG': another_oidc,
        'OIDCWITHPARAMS_LOGIN_CONFIG': oidc_with_params,
        'HTTPCLIENT': http_client,
    }
    return config
@pytest.fixture()
def oidc_service(app_config):
    """Return an OIDCLoginService built from the ``SOMEOIDC_LOGIN_CONFIG`` entry."""
    config_key = 'SOMEOIDC_LOGIN_CONFIG'
    return OIDCLoginService(app_config, config_key)
@pytest.fixture()
def another_oidc_service(app_config):
    """Return an OIDCLoginService built from the ``ANOTHEROIDC_LOGIN_CONFIG`` entry."""
    config_key = 'ANOTHEROIDC_LOGIN_CONFIG'
    return OIDCLoginService(app_config, config_key)
@pytest.fixture()
def oidc_withparams_service(app_config):
    """Return an OIDCLoginService built from the ``OIDCWITHPARAMS_LOGIN_CONFIG`` entry."""
    config_key = 'OIDCWITHPARAMS_LOGIN_CONFIG'
    return OIDCLoginService(app_config, config_key)
@pytest.fixture()
def discovery_content(userinfo_supported):
    """Return the discovery document the fake OIDC server hands out.

    ``userinfo_endpoint`` is ``None`` when ``userinfo_supported`` is false.
    """
    if userinfo_supported:
        userinfo_endpoint = 'http://fakeoidc/userinfo'
    else:
        userinfo_endpoint = None

    document = {
        'scopes_supported': ['openid', 'profile', 'somescope'],
        'authorization_endpoint': 'http://fakeoidc/authorize',
        'token_endpoint': 'http://fakeoidc/token',
        'userinfo_endpoint': userinfo_endpoint,
        'jwks_uri': 'http://fakeoidc/jwks',
    }
    return document
@pytest.fixture()
def userinfo_content(preferred_username, email_verified):
    """Return the user claims for subject ``cooluser``.

    The preferred username and the email-verified flag come from the
    parametrized fixtures; the email address is fixed.
    """
    claims = dict(
        sub='cooluser',
        preferred_username=preferred_username,
        email='foo@example.com',
        email_verified=email_verified,
    )
    return claims
@pytest.fixture()
def id_token(oidc_service, signing_key, userinfo_content, app_config):
    """Return an RS256-signed JWT id token for the test user.

    The claims use the service's OIDC server as issuer and its client id as
    audience, are valid from now for 600 seconds, and are then overlaid with
    ``userinfo_content``. The JWT header carries the signing key's id as ``kid``.
    """
    claims = {
        'iss': oidc_service.config['OIDC_SERVER'],
        'aud': oidc_service.client_id(),
        'nbf': int(time.time()),
        'iat': int(time.time()),
        'exp': int(time.time() + 600),
        'sub': 'cooluser',
    }
    # User claims are applied last, so they win over the defaults above.
    claims.update(userinfo_content)

    headers = {'kid': signing_key['id']}
    private_pem = signing_key['private_key']
    return jwt.encode(claims, private_pem, algorithm='RS256', headers=headers)
@pytest.fixture()
def discovery_handler(discovery_content):
    """Return an HTTMock handler serving ``discovery_content`` as JSON.

    It matches requests to host ``fakeoidc`` whose path matches ``.+openid.+``.
    """
    @urlmatch(netloc=r'fakeoidc', path=r'.+openid.+')
    def serve_discovery(_url, _request):
        body = json.dumps(discovery_content)
        return body

    return serve_discovery
@pytest.fixture()
def authorize_handler(discovery_content):
    """Return an HTTMock handler for ``fakeoidc/authorize``.

    The handler echoes the first ``scope`` and ``state`` query parameters of
    the request back in a JSON body, together with ``authorized: true``.
    """
    @urlmatch(netloc=r'fakeoidc', path=r'/authorize')
    def echo_authorize(_url, request):
        query_string = urlparse.urlparse(request.url).query
        query = urlparse.parse_qs(query_string)
        payload = {
            'authorized': True,
            'scope': query['scope'][0],
            'state': query['state'][0],
        }
        return json.dumps(payload)

    return echo_authorize
@pytest.fixture()
def token_handler(oidc_service, id_token, valid_code):
    """Return an HTTMock handler emulating the ``fakeoidc/token`` endpoint.

    The handler validates the form-encoded request body field by field
    (redirect URI, client id, client secret, code, grant type) and answers
    with a 400/401 error on the first mismatch. A field that is absent from
    the body is treated like a wrong value instead of crashing the handler.
    On success it returns a JSON body with a fixed access token and the
    signed ``id_token``.
    """
    @urlmatch(netloc=r'fakeoidc', path=r'/token')
    def handler(_, request):
        # Guard against a request without a body before parsing it.
        params = urlparse.parse_qs(request.body or '')

        def first(name):
            # parse_qs leaves out absent fields; fall back to None so the
            # comparisons below fail cleanly rather than indexing None.
            return params.get(name, [None])[0]

        if first('redirect_uri') != 'http://localhost/oauth2/someoidc/callback':
            return {'status_code': 400, 'content': 'Invalid redirect URI'}

        if first('client_id') != oidc_service.client_id():
            return {'status_code': 401, 'content': 'Invalid client id'}

        if first('client_secret') != oidc_service.client_secret():
            return {'status_code': 401, 'content': 'Invalid client secret'}

        if first('code') != valid_code:
            return {'status_code': 401, 'content': 'Invalid code'}

        if first('grant_type') != 'authorization_code':
            return {'status_code': 400, 'content': 'Invalid authorization type'}

        content = {
            'access_token': 'sometoken',
            'id_token': id_token,
        }
        return {'status_code': 200, 'content': json.dumps(content)}

    return handler
@pytest.fixture()
def jwks_handler(signing_key):
    """Return an HTTMock handler for ``fakeoidc/jwks``.

    The handler answers with a key set containing the signing key's JWK,
    tagged with the signing key's id as ``kid``.
    """
    @urlmatch(netloc=r'fakeoidc', path=r'/jwks')
    def serve_keys(_url, _request):
        # Build a fresh dict so the shared module-scoped JWK is never mutated.
        tagged_jwk = dict(signing_key['jwk'], kid=signing_key['id'])
        key_set = {'keys': [tagged_jwk]}
        return {'status_code': 200, 'content': json.dumps(key_set)}

    return serve_keys
@pytest.fixture()
def emptykeys_jwks_handler():
    """Return an HTTMock handler for ``fakeoidc/jwks`` that serves an empty key set."""
    @urlmatch(netloc=r'fakeoidc', path=r'/jwks')
    def serve_no_keys(_url, _request):
        empty_key_set = json.dumps({'keys': []})
        return {'status_code': 200, 'content': empty_key_set}

    return serve_no_keys
@pytest.fixture
def userinfo_handler(oidc_service, userinfo_content):
    """Return an HTTMock handler for ``fakeoidc/userinfo``.

    The handler answers 401 unless the request carries the header
    ``Authorization: Bearer sometoken``; otherwise it serves
    ``userinfo_content`` as JSON.
    """
    @urlmatch(netloc=r'fakeoidc', path=r'/userinfo')
    def serve_userinfo(_url, req):
        authorized = req.headers.get('Authorization') == 'Bearer sometoken'
        if authorized:
            return {'status_code': 200, 'content': json.dumps(userinfo_content)}

        return {'status_code': 401, 'content': 'Missing expected header'}

    return serve_userinfo
@pytest.fixture()
def invalidsub_userinfo_handler(oidc_service):
    """Return an HTTMock handler for ``fakeoidc/userinfo`` reporting subject ``invalidsub``."""
    @urlmatch(netloc=r'fakeoidc', path=r'/userinfo')
    def serve_wrong_subject(_url, _request):
        body = json.dumps({'sub': 'invalidsub'})
        return {'status_code': 200, 'content': body}

    return serve_wrong_subject
def test_basic_config(oidc_service):
    """The service exposes the id, display name and icon from its configuration."""
    service_id = oidc_service.service_id()
    assert service_id == 'someoidc'

    service_name = oidc_service.service_name()
    assert service_name == 'Some Cool Service'

    icon = oidc_service.get_icon()
    assert icon == 'http://some/icon'
def test_discovery(oidc_service, http_client, discovery_content, discovery_handler):
    """Endpoints and login scopes reported by the service mirror the discovery document."""
    with HTTMock(discovery_handler):
        expected_authorize = discovery_content['authorization_endpoint'] + '?response_type=code'
        assert oidc_service.authorize_endpoint().to_url() == expected_authorize

        expected_token = discovery_content['token_endpoint']
        assert oidc_service.token_endpoint().to_url() == expected_token

        # The userinfo endpoint is optional in the discovery document.
        if discovery_content['userinfo_endpoint'] is not None:
            assert oidc_service.user_endpoint().to_url() == discovery_content['userinfo_endpoint']
        else:
            assert oidc_service.user_endpoint() is None

        # Scope order is irrelevant, so compare as sets.
        reported_scopes = set(oidc_service.get_login_scopes())
        assert reported_scopes == set(discovery_content['scopes_supported'])
def test_discovery_with_params(oidc_withparams_service, http_client, discovery_content,
                               discovery_handler):
    """The configured custom parameter shows up in the authorization endpoint URL."""
    with HTTMock(discovery_handler):
        authorize_url = oidc_withparams_service.authorize_endpoint().to_url()
        assert 'some=param' in authorize_url
def test_filtered_discovery(another_oidc_service, http_client, discovery_content,
                            discovery_handler):
    """A service configured with ``LOGIN_SCOPES`` of ``['openid']`` reports exactly that scope."""
    with HTTMock(discovery_handler):
        login_scopes = another_oidc_service.get_login_scopes()
        assert login_scopes == ['openid']
def test_public_config(oidc_service, discovery_handler):
    """The public config flags OIDC and exposes the client id but never the client secret."""
    with HTTMock(discovery_handler):
        # The config is fetched afresh for every assertion, as a caller would.
        assert oidc_service.get_public_config()['OIDC']
        assert oidc_service.get_public_config()['CLIENT_ID'] == 'foo'

        # The secret must leak neither as a key nor as a value.
        assert 'CLIENT_SECRET' not in oidc_service.get_public_config()
        assert 'bar' not in oidc_service.get_public_config().values()
def test_auth_url(oidc_service, discovery_handler, http_client, authorize_handler):
    """The generated auth URL carries the CSRF token as state and the scopes space-joined."""
    server_config = {'PREFERRED_URL_SCHEME': 'https', 'SERVER_HOSTNAME': 'someserver'}

    with HTTMock(discovery_handler, authorize_handler):
        scheme_and_host = URLSchemeAndHostname.from_app_config(server_config)
        auth_url = oidc_service.get_auth_url(scheme_and_host, '', 'some csrf token',
                                             ['one', 'two'])

        # Hit the URL and ensure it works.
        response = http_client.get(auth_url)
        echoed = response.json()
        assert echoed['state'] == 'some csrf token'
        assert echoed['scope'] == 'one two'
def test_exchange_code_invalidcode(oidc_service, discovery_handler, app_config, http_client,
                                   token_handler):
    """Exchanging a code the token endpoint does not accept raises OAuthLoginException."""
    rejected_code = 'testcode'
    with HTTMock(token_handler, discovery_handler), pytest.raises(OAuthLoginException):
        oidc_service.exchange_code_for_login(app_config, http_client, rejected_code, '')
def test_exchange_code_invalidsub(oidc_service, discovery_handler, app_config, http_client,
                                  token_handler, invalidsub_userinfo_handler, jwks_handler,
                                  valid_code, userinfo_supported):
    """A userinfo response whose ``sub`` differs from the id token's is rejected.

    The scenario only applies when the discovery document advertises a
    userinfo endpoint. Otherwise the test is reported as skipped, so the
    not-applicable case does not show up as a pass.
    """
    if not userinfo_supported:
        pytest.skip('userinfo endpoint is not supported in this parametrization')

    with HTTMock(jwks_handler, token_handler, invalidsub_userinfo_handler, discovery_handler):
        # Should fail because the sub of the user info doesn't match that returned by the id_token.
        with pytest.raises(OAuthLoginException):
            oidc_service.exchange_code_for_login(app_config, http_client, valid_code, '')
def test_exchange_code_missingkey(oidc_service, discovery_handler, app_config, http_client,
                                  token_handler, userinfo_handler, emptykeys_jwks_handler,
                                  valid_code):
    """The exchange raises OAuthLoginException when the JWKS endpoint serves no keys."""
    handlers = (emptykeys_jwks_handler, token_handler, userinfo_handler, discovery_handler)
    # Should fail because the key is missing.
    with HTTMock(*handlers), pytest.raises(OAuthLoginException):
        oidc_service.exchange_code_for_login(app_config, http_client, valid_code, '')
def test_exchange_code_validcode(oidc_service, discovery_handler, app_config, http_client,
                                 token_handler, userinfo_handler, jwks_handler, valid_code,
                                 preferred_username, mailing_feature, email_verified):
    """Exchange a valid code and check the returned id, username and email.

    With mailing enabled and an unverified email the exchange must raise.
    Otherwise the id is ``cooluser``, the email is returned only when
    verified, and the username is the preferred username cut at the first
    ``@``, or the id when no preferred username was given.
    """
    with HTTMock(jwks_handler, token_handler, userinfo_handler, discovery_handler):
        if mailing_feature and not email_verified:
            # Should fail because there isn't a verified email address.
            with pytest.raises(OAuthLoginException):
                oidc_service.exchange_code_for_login(app_config, http_client, valid_code, '')
            return

        # Should succeed.
        login = oidc_service.exchange_code_for_login(app_config, http_client, valid_code, '')
        login_id, login_username, login_email = login

        assert login_id == 'cooluser'

        if email_verified:
            assert login_email == 'foo@example.com'
        else:
            assert login_email is None

        if preferred_username is None:
            expected_username = login_id
        else:
            # Only the part before the first '@' is expected as the username.
            expected_username = preferred_username.partition('@')[0]

        assert login_username == expected_username