# pylint: disable=redefined-outer-name, unused-argument, invalid-name, missing-docstring, too-many-arguments import json import time import urlparse import jwt import pytest import requests from httmock import urlmatch, HTTMock from Crypto.PublicKey import RSA from jwkest.jwk import RSAKey from oauth.oidc import OIDCLoginService, OAuthLoginException @pytest.fixture() def http_client(): sess = requests.Session() adapter = requests.adapters.HTTPAdapter(pool_connections=100, pool_maxsize=100) sess.mount('http://', adapter) sess.mount('https://', adapter) return sess @pytest.fixture(params=[True, False]) def app_config(http_client, request): return { 'PREFERRED_URL_SCHEME': 'http', 'SERVER_HOSTNAME': 'localhost', 'FEATURE_MAILING': request.param, 'SOMEOIDC_TEST_SERVICE': { 'CLIENT_ID': 'foo', 'CLIENT_SECRET': 'bar', 'SERVICE_NAME': 'Some Cool Service', 'SERVICE_ICON': 'http://some/icon', 'OIDC_SERVER': 'http://fakeoidc', 'DEBUGGING': True, }, 'HTTPCLIENT': http_client, } @pytest.fixture() def oidc_service(app_config): return OIDCLoginService(app_config, 'SOMEOIDC_TEST_SERVICE') @pytest.fixture() def discovery_content(): return { 'scopes_supported': ['profile'], 'authorization_endpoint': 'http://fakeoidc/authorize', 'token_endpoint': 'http://fakeoidc/token', 'userinfo_endpoint': 'http://fakeoidc/userinfo', 'jwks_uri': 'http://fakeoidc/jwks', } @pytest.fixture() def discovery_handler(discovery_content): @urlmatch(netloc=r'fakeoidc', path=r'.+openid.+') def handler(_, __): return json.dumps(discovery_content) return handler @pytest.fixture(scope="module") # Slow to generate, only do it once. def signing_key(): private_key = RSA.generate(2048) jwk = RSAKey(key=private_key.publickey()).serialize() return { 'id': 'somekey', 'private_key': private_key.exportKey('PEM'), 'jwk': jwk, } @pytest.fixture() def id_token(oidc_service, signing_key, app_config): token_data = { 'iss': oidc_service.config['OIDC_SERVER'], 'aud': oidc_service.client_id(), 'nbf': int(time.time()), 'iat': int(time.time()), 'exp': int(time.time() + 600), 'sub': 'cooluser', } token_headers = { 'kid': signing_key['id'], } return jwt.encode(token_data, signing_key['private_key'], 'RS256', headers=token_headers) @pytest.fixture() def valid_code(): return 'validcode' @pytest.fixture() def token_handler(oidc_service, id_token, valid_code): @urlmatch(netloc=r'fakeoidc', path=r'/token') def handler(_, request): params = urlparse.parse_qs(request.body) if params.get('redirect_uri')[0] != 'http://localhost/oauth2/someoidc/callback': return {'status_code': 400, 'content': 'Invalid redirect URI'} if params.get('client_id')[0] != oidc_service.client_id(): return {'status_code': 401, 'content': 'Invalid client id'} if params.get('client_secret')[0] != oidc_service.client_secret(): return {'status_code': 401, 'content': 'Invalid client secret'} if params.get('code')[0] != valid_code: return {'status_code': 401, 'content': 'Invalid code'} if params.get('grant_type')[0] != 'authorization_code': return {'status_code': 400, 'content': 'Invalid authorization type'} content = { 'access_token': 'sometoken', 'id_token': id_token, } return {'status_code': 200, 'content': json.dumps(content)} return handler @pytest.fixture() def jwks_handler(signing_key): def jwk_with_kid(kid, jwk): jwk = jwk.copy() jwk.update({'kid': kid}) return jwk @urlmatch(netloc=r'fakeoidc', path=r'/jwks') def handler(_, __): content = {'keys': [jwk_with_kid(signing_key['id'], signing_key['jwk'])]} return {'status_code': 200, 'content': json.dumps(content)} return handler @pytest.fixture() def emptykeys_jwks_handler(): @urlmatch(netloc=r'fakeoidc', path=r'/jwks') def handler(_, __): content = {'keys': []} return {'status_code': 200, 'content': json.dumps(content)} return handler @pytest.fixture(params=["someusername", None]) def preferred_username(request): return request.param @pytest.fixture def userinfo_handler(oidc_service, preferred_username): @urlmatch(netloc=r'fakeoidc', path=r'/userinfo') def handler(_, __): content = { 'sub': 'cooluser', 'preferred_username':preferred_username, 'email': 'foo@example.com', 'email_verified': True, } return {'status_code': 200, 'content': json.dumps(content)} return handler @pytest.fixture() def invalidsub_userinfo_handler(oidc_service): @urlmatch(netloc=r'fakeoidc', path=r'/userinfo') def handler(_, __): content = { 'sub': 'invalidsub', 'preferred_username': 'someusername', 'email': 'foo@example.com', 'email_verified': True, } return {'status_code': 200, 'content': json.dumps(content)} return handler @pytest.fixture() def missingemail_userinfo_handler(oidc_service, preferred_username): @urlmatch(netloc=r'fakeoidc', path=r'/userinfo') def handler(_, __): content = { 'sub': 'cooluser', 'preferred_username': preferred_username, } return {'status_code': 200, 'content': json.dumps(content)} return handler def test_basic_config(oidc_service): assert oidc_service.service_id() == 'someoidc' assert oidc_service.service_name() == 'Some Cool Service' assert oidc_service.get_icon() == 'http://some/icon' def test_discovery(oidc_service, http_client, discovery_handler): with HTTMock(discovery_handler): assert oidc_service.authorize_endpoint() == 'http://fakeoidc/authorize?response_type=code&' assert oidc_service.token_endpoint() == 'http://fakeoidc/token' assert oidc_service.user_endpoint() == 'http://fakeoidc/userinfo' assert oidc_service.get_login_scopes() == ['profile'] def test_public_config(oidc_service, discovery_handler): with HTTMock(discovery_handler): assert oidc_service.get_public_config()['OIDC'] assert oidc_service.get_public_config()['CLIENT_ID'] == 'foo' assert 'CLIENT_SECRET' not in oidc_service.get_public_config() assert 'bar' not in oidc_service.get_public_config().values() def test_exchange_code_invalidcode(oidc_service, discovery_handler, app_config, http_client, token_handler): with HTTMock(token_handler, discovery_handler): with pytest.raises(OAuthLoginException): oidc_service.exchange_code_for_login(app_config, http_client, 'testcode', '') def test_exchange_code_validcode(oidc_service, discovery_handler, app_config, http_client, token_handler, userinfo_handler, jwks_handler, valid_code, preferred_username): with HTTMock(jwks_handler, token_handler, userinfo_handler, discovery_handler): lid, lusername, lemail = oidc_service.exchange_code_for_login(app_config, http_client, valid_code, '') assert lid == 'cooluser' assert lemail == 'foo@example.com' if preferred_username is not None: assert lusername == preferred_username else: assert lusername == lid def test_exchange_code_missingemail(oidc_service, discovery_handler, app_config, http_client, token_handler, missingemail_userinfo_handler, jwks_handler, valid_code, preferred_username): with HTTMock(jwks_handler, token_handler, missingemail_userinfo_handler, discovery_handler): if app_config['FEATURE_MAILING']: # Should fail because there is no valid email address. with pytest.raises(OAuthLoginException): oidc_service.exchange_code_for_login(app_config, http_client, valid_code, '') else: # Should succeed because, while there is no valid email address, it isn't necessary with # mailing disabled. lid, lusername, lemail = oidc_service.exchange_code_for_login(app_config, http_client, valid_code, '') assert lid == 'cooluser' assert lemail is None if preferred_username is not None: assert lusername == preferred_username else: assert lusername == lid def test_exchange_code_invalidsub(oidc_service, discovery_handler, app_config, http_client, token_handler, invalidsub_userinfo_handler, jwks_handler, valid_code): with HTTMock(jwks_handler, token_handler, invalidsub_userinfo_handler, discovery_handler): # Should fail because the sub of the user info doesn't match that returned by the id_token. with pytest.raises(OAuthLoginException): oidc_service.exchange_code_for_login(app_config, http_client, valid_code, '') def test_exchange_code_missingkey(oidc_service, discovery_handler, app_config, http_client, token_handler, userinfo_handler, emptykeys_jwks_handler, valid_code): with HTTMock(emptykeys_jwks_handler, token_handler, userinfo_handler, discovery_handler): # Should fail because the key is missing. with pytest.raises(OAuthLoginException): oidc_service.exchange_code_for_login(app_config, http_client, valid_code, '')