# pylint: disable=redefined-outer-name, unused-argument, invalid-name, missing-docstring, too-many-arguments
"""Tests for ``OIDCLoginService`` run against a mocked OIDC provider at ``http://fakeoidc``."""

import json
import time
import urlparse

import jwt
import pytest

from httmock import urlmatch, HTTMock
from Crypto.PublicKey import RSA
from jwkest.jwk import RSAKey

from oauth.oidc import OIDCLoginService, OAuthLoginException


@pytest.fixture(params=[True, False])
def app_config(http_client, request):
    """Return the application config used to build the service under test.

    Parametrized so that every dependent test runs once with ``FEATURE_MAILING``
    enabled and once with it disabled. ``http_client`` is a fixture that is not
    defined in this module (presumably supplied by a conftest).
    """
    return {
        'PREFERRED_URL_SCHEME': 'http',
        'SERVER_HOSTNAME': 'localhost',
        'FEATURE_MAILING': request.param,

        'SOMEOIDC_TEST_SERVICE': {
            'CLIENT_ID': 'foo',
            'CLIENT_SECRET': 'bar',
            'SERVICE_NAME': 'Some Cool Service',
            'SERVICE_ICON': 'http://some/icon',
            'OIDC_SERVER': 'http://fakeoidc',
            'DEBUGGING': True,
        },

        'HTTPCLIENT': http_client,
    }


@pytest.fixture()
def oidc_service(app_config):
    """Return an ``OIDCLoginService`` built from the ``SOMEOIDC_TEST_SERVICE`` config key."""
    return OIDCLoginService(app_config, 'SOMEOIDC_TEST_SERVICE')


@pytest.fixture()
def discovery_content():
    """Return the discovery document served by the fake provider."""
    return {
        'scopes_supported': ['profile'],
        'authorization_endpoint': 'http://fakeoidc/authorize',
        'token_endpoint': 'http://fakeoidc/token',
        'userinfo_endpoint': 'http://fakeoidc/userinfo',
        'jwks_uri': 'http://fakeoidc/jwks',
    }


@pytest.fixture()
def discovery_handler(discovery_content):
    """Return an HTTMock handler serving ``discovery_content`` as JSON.

    The handler matches any ``fakeoidc`` path containing ``openid``.
    """
    @urlmatch(netloc=r'fakeoidc', path=r'.+openid.+')
    def handler(_, __):
        return json.dumps(discovery_content)

    return handler


@pytest.fixture(scope="module")  # Slow to generate, only do it once.
def signing_key():
    """Generate a 2048-bit RSA key pair shared by all tests in this module.

    Returns a dict holding the key id, the PEM-encoded private key and the
    serialized JWK of the matching public key.
    """
    private_key = RSA.generate(2048)
    jwk = RSAKey(key=private_key.publickey()).serialize()
    return {
        'id': 'somekey',
        'private_key': private_key.exportKey('PEM'),
        'jwk': jwk,
    }


@pytest.fixture()
def id_token(oidc_service, signing_key, app_config):
    """Return an RS256-signed ID token for subject ``cooluser``, valid for 600 seconds."""
    # Take a single clock reading so that nbf, iat and exp are mutually consistent.
    now = int(time.time())
    token_data = {
        'iss': oidc_service.config['OIDC_SERVER'],
        'aud': oidc_service.client_id(),
        'nbf': now,
        'iat': now,
        'exp': now + 600,
        'sub': 'cooluser',
    }

    token_headers = {
        'kid': signing_key['id'],
    }

    return jwt.encode(token_data, signing_key['private_key'], 'RS256', headers=token_headers)


@pytest.fixture()
def valid_code():
    """Return the only authorization code accepted by ``token_handler``."""
    return 'validcode'


@pytest.fixture()
def token_handler(oidc_service, id_token, valid_code):
    """Return an HTTMock handler emulating the provider's ``/token`` endpoint.

    The handler validates the redirect URI, client id, client secret,
    authorization code and grant type found in the form-encoded request body.
    The first failing check produces a 400 or 401 response; otherwise the
    handler answers 200 with an access token and ``id_token``. A parameter that
    is missing from the body is treated like a mismatching one.
    """
    def first_value(params, name):
        # parse_qs maps each name to a list of values. Return None for an absent
        # name so the checks below reject the request instead of raising
        # TypeError on ``None[0]``.
        values = params.get(name)
        return values[0] if values else None

    @urlmatch(netloc=r'fakeoidc', path=r'/token')
    def handler(_, request):
        # A request without a body is parsed as having no parameters at all.
        params = urlparse.parse_qs(request.body or '')

        if first_value(params, 'redirect_uri') != 'http://localhost/oauth2/someoidc/callback':
            return {'status_code': 400, 'content': 'Invalid redirect URI'}

        if first_value(params, 'client_id') != oidc_service.client_id():
            return {'status_code': 401, 'content': 'Invalid client id'}

        if first_value(params, 'client_secret') != oidc_service.client_secret():
            return {'status_code': 401, 'content': 'Invalid client secret'}

        if first_value(params, 'code') != valid_code:
            return {'status_code': 401, 'content': 'Invalid code'}

        if first_value(params, 'grant_type') != 'authorization_code':
            return {'status_code': 400, 'content': 'Invalid authorization type'}

        content = {
            'access_token': 'sometoken',
            'id_token': id_token,
        }
        return {'status_code': 200, 'content': json.dumps(content)}

    return handler


@pytest.fixture()
def jwks_handler(signing_key):
    """Return an HTTMock handler serving the signing key's JWK (with its ``kid``) at ``/jwks``."""
    def jwk_with_kid(kid, jwk):
        # Copy first so the module-scoped signing_key dict is never mutated.
        jwk = jwk.copy()
        jwk.update({'kid': kid})
        return jwk

    @urlmatch(netloc=r'fakeoidc', path=r'/jwks')
    def handler(_, __):
        content = {'keys': [jwk_with_kid(signing_key['id'], signing_key['jwk'])]}
        return {'status_code': 200, 'content': json.dumps(content)}

    return handler


@pytest.fixture()
def emptykeys_jwks_handler():
    """Return an HTTMock handler serving an empty key set at ``/jwks``."""
    @urlmatch(netloc=r'fakeoidc', path=r'/jwks')
    def handler(_, __):
        content = {'keys': []}
        return {'status_code': 200, 'content': json.dumps(content)}

    return handler


@pytest.fixture(params=["someusername", None])
def preferred_username(request):
    """Return the ``preferred_username`` reported by userinfo: a name, or ``None``."""
    return request.param


@pytest.fixture
def userinfo_handler(oidc_service, preferred_username):
    """Return an HTTMock handler serving a complete userinfo response for ``cooluser``."""
    @urlmatch(netloc=r'fakeoidc', path=r'/userinfo')
    def handler(_, __):
        content = {
            'sub': 'cooluser',
            'preferred_username': preferred_username,
            'email': 'foo@example.com',
            'email_verified': True,
        }

        return {'status_code': 200, 'content': json.dumps(content)}

    return handler


@pytest.fixture()
def invalidsub_userinfo_handler(oidc_service):
    """Return an HTTMock userinfo handler whose ``sub`` differs from the ID token's ``sub``."""
    @urlmatch(netloc=r'fakeoidc', path=r'/userinfo')
    def handler(_, __):
        content = {
            'sub': 'invalidsub',
            'preferred_username': 'someusername',
            'email': 'foo@example.com',
            'email_verified': True,
        }

        return {'status_code': 200, 'content': json.dumps(content)}

    return handler


@pytest.fixture()
def missingemail_userinfo_handler(oidc_service, preferred_username):
    """Return an HTTMock userinfo handler whose response carries no email fields."""
    @urlmatch(netloc=r'fakeoidc', path=r'/userinfo')
    def handler(_, __):
        content = {
            'sub': 'cooluser',
            'preferred_username': preferred_username,
        }

        return {'status_code': 200, 'content': json.dumps(content)}

    return handler


def test_basic_config(oidc_service):
    """The service id, display name and icon are derived from the config."""
    assert oidc_service.service_id() == 'someoidc'
    assert oidc_service.service_name() == 'Some Cool Service'
    assert oidc_service.get_icon() == 'http://some/icon'


def test_discovery(oidc_service, http_client, discovery_handler):
    """Endpoints and login scopes are taken from the discovery document."""
    with HTTMock(discovery_handler):
        assert oidc_service.authorize_endpoint() == 'http://fakeoidc/authorize?response_type=code&'
        assert oidc_service.token_endpoint() == 'http://fakeoidc/token'
        assert oidc_service.user_endpoint() == 'http://fakeoidc/userinfo'
        assert oidc_service.get_login_scopes() == ['profile']


def test_public_config(oidc_service, discovery_handler):
    """The public config exposes the client id but never the client secret."""
    with HTTMock(discovery_handler):
        assert oidc_service.get_public_config()['OIDC']
        assert oidc_service.get_public_config()['CLIENT_ID'] == 'foo'

        assert 'CLIENT_SECRET' not in oidc_service.get_public_config()
        assert 'bar' not in oidc_service.get_public_config().values()


def test_exchange_code_invalidcode(oidc_service, discovery_handler, app_config, http_client,
                                   token_handler):
    """Exchanging a code the token endpoint rejects raises ``OAuthLoginException``."""
    with HTTMock(token_handler, discovery_handler):
        with pytest.raises(OAuthLoginException):
            oidc_service.exchange_code_for_login(app_config, http_client, 'testcode', '')


def test_exchange_code_validcode(oidc_service, discovery_handler, app_config, http_client,
                                 token_handler, userinfo_handler, jwks_handler, valid_code,
                                 preferred_username):
    """A valid code yields the user's id, username and email.

    The username is the preferred username when one is supplied and the subject
    id otherwise.
    """
    with HTTMock(jwks_handler, token_handler, userinfo_handler, discovery_handler):
        lid, lusername, lemail = oidc_service.exchange_code_for_login(app_config, http_client,
                                                                      valid_code, '')

        assert lid == 'cooluser'
        assert lemail == 'foo@example.com'

        if preferred_username is not None:
            assert lusername == preferred_username
        else:
            assert lusername == lid


def test_exchange_code_missingemail(oidc_service, discovery_handler, app_config, http_client,
                                    token_handler, missingemail_userinfo_handler, jwks_handler,
                                    valid_code, preferred_username):
    """A userinfo response without an email fails only when mailing is enabled."""
    with HTTMock(jwks_handler, token_handler, missingemail_userinfo_handler, discovery_handler):
        if app_config['FEATURE_MAILING']:
            # Should fail because there is no valid email address.
            with pytest.raises(OAuthLoginException):
                oidc_service.exchange_code_for_login(app_config, http_client, valid_code, '')
        else:
            # Should succeed because, while there is no valid email address, it isn't
            # necessary with mailing disabled.
            lid, lusername, lemail = oidc_service.exchange_code_for_login(app_config, http_client,
                                                                          valid_code, '')

            assert lid == 'cooluser'
            assert lemail is None

            if preferred_username is not None:
                assert lusername == preferred_username
            else:
                assert lusername == lid


def test_exchange_code_invalidsub(oidc_service, discovery_handler, app_config, http_client,
                                  token_handler, invalidsub_userinfo_handler, jwks_handler,
                                  valid_code):
    """A userinfo ``sub`` that differs from the ID token's ``sub`` is rejected."""
    with HTTMock(jwks_handler, token_handler, invalidsub_userinfo_handler, discovery_handler):
        # Should fail because the sub of the user info doesn't match that returned by the
        # id_token.
        with pytest.raises(OAuthLoginException):
            oidc_service.exchange_code_for_login(app_config, http_client, valid_code, '')


def test_exchange_code_missingkey(oidc_service, discovery_handler, app_config, http_client,
                                  token_handler, userinfo_handler, emptykeys_jwks_handler,
                                  valid_code):
    """An ID token whose signing key is absent from the JWKS endpoint is rejected."""
    with HTTMock(emptykeys_jwks_handler, token_handler, userinfo_handler, discovery_handler):
        # Should fail because the key is missing.
        with pytest.raises(OAuthLoginException):
            oidc_service.exchange_code_for_login(app_config, http_client, valid_code, '')