This repository has been archived on 2020-03-24. You can view files and clone it, but cannot push or open issues or pull requests.
quay/oauth/test/test_oidc.py
Joseph Schorr c0cc574ca2 Add Authorization header with token to user info call
While not required, it is recommended to send the token as an Authorization header to the UserInfo call in OIDC: http://openid.net/specs/openid-connect-core-1_0.html#UserInfo

Some implementations expect this and will fail if not present
2017-04-27 11:24:12 -04:00

276 lines
9.3 KiB
Python

# pylint: disable=redefined-outer-name, unused-argument, invalid-name, missing-docstring, too-many-arguments
import json
import time
import urlparse
import jwt
import pytest
import requests
from httmock import urlmatch, HTTMock
from Crypto.PublicKey import RSA
from jwkest.jwk import RSAKey
from oauth.oidc import OIDCLoginService, OAuthLoginException
@pytest.fixture()
def http_client():
sess = requests.Session()
adapter = requests.adapters.HTTPAdapter(pool_connections=100,
pool_maxsize=100)
sess.mount('http://', adapter)
sess.mount('https://', adapter)
return sess
@pytest.fixture(params=[True, False])
def app_config(http_client, request):
return {
'PREFERRED_URL_SCHEME': 'http',
'SERVER_HOSTNAME': 'localhost',
'FEATURE_MAILING': request.param,
'SOMEOIDC_TEST_SERVICE': {
'CLIENT_ID': 'foo',
'CLIENT_SECRET': 'bar',
'SERVICE_NAME': 'Some Cool Service',
'SERVICE_ICON': 'http://some/icon',
'OIDC_SERVER': 'http://fakeoidc',
'DEBUGGING': True,
},
'HTTPCLIENT': http_client,
}
@pytest.fixture()
def oidc_service(app_config):
return OIDCLoginService(app_config, 'SOMEOIDC_TEST_SERVICE')
@pytest.fixture()
def discovery_content():
return {
'scopes_supported': ['profile'],
'authorization_endpoint': 'http://fakeoidc/authorize',
'token_endpoint': 'http://fakeoidc/token',
'userinfo_endpoint': 'http://fakeoidc/userinfo',
'jwks_uri': 'http://fakeoidc/jwks',
}
@pytest.fixture()
def discovery_handler(discovery_content):
@urlmatch(netloc=r'fakeoidc', path=r'.+openid.+')
def handler(_, __):
return json.dumps(discovery_content)
return handler
@pytest.fixture(scope="module") # Slow to generate, only do it once.
def signing_key():
private_key = RSA.generate(2048)
jwk = RSAKey(key=private_key.publickey()).serialize()
return {
'id': 'somekey',
'private_key': private_key.exportKey('PEM'),
'jwk': jwk,
}
@pytest.fixture()
def id_token(oidc_service, signing_key, app_config):
token_data = {
'iss': oidc_service.config['OIDC_SERVER'],
'aud': oidc_service.client_id(),
'nbf': int(time.time()),
'iat': int(time.time()),
'exp': int(time.time() + 600),
'sub': 'cooluser',
}
token_headers = {
'kid': signing_key['id'],
}
return jwt.encode(token_data, signing_key['private_key'], 'RS256', headers=token_headers)
@pytest.fixture()
def valid_code():
return 'validcode'
@pytest.fixture()
def token_handler(oidc_service, id_token, valid_code):
@urlmatch(netloc=r'fakeoidc', path=r'/token')
def handler(_, request):
params = urlparse.parse_qs(request.body)
if params.get('redirect_uri')[0] != 'http://localhost/oauth2/someoidc/callback':
return {'status_code': 400, 'content': 'Invalid redirect URI'}
if params.get('client_id')[0] != oidc_service.client_id():
return {'status_code': 401, 'content': 'Invalid client id'}
if params.get('client_secret')[0] != oidc_service.client_secret():
return {'status_code': 401, 'content': 'Invalid client secret'}
if params.get('code')[0] != valid_code:
return {'status_code': 401, 'content': 'Invalid code'}
if params.get('grant_type')[0] != 'authorization_code':
return {'status_code': 400, 'content': 'Invalid authorization type'}
content = {
'access_token': 'sometoken',
'id_token': id_token,
}
return {'status_code': 200, 'content': json.dumps(content)}
return handler
@pytest.fixture()
def jwks_handler(signing_key):
def jwk_with_kid(kid, jwk):
jwk = jwk.copy()
jwk.update({'kid': kid})
return jwk
@urlmatch(netloc=r'fakeoidc', path=r'/jwks')
def handler(_, __):
content = {'keys': [jwk_with_kid(signing_key['id'], signing_key['jwk'])]}
return {'status_code': 200, 'content': json.dumps(content)}
return handler
@pytest.fixture()
def emptykeys_jwks_handler():
@urlmatch(netloc=r'fakeoidc', path=r'/jwks')
def handler(_, __):
content = {'keys': []}
return {'status_code': 200, 'content': json.dumps(content)}
return handler
@pytest.fixture(params=["someusername", None])
def preferred_username(request):
return request.param
@pytest.fixture
def userinfo_handler(oidc_service, preferred_username):
@urlmatch(netloc=r'fakeoidc', path=r'/userinfo')
def handler(_, req):
if req.headers.get('Authorization') != 'Bearer sometoken':
return {'status_code': 401, 'content': 'Missing expected header'}
content = {
'sub': 'cooluser',
'preferred_username':preferred_username,
'email': 'foo@example.com',
'email_verified': True,
}
return {'status_code': 200, 'content': json.dumps(content)}
return handler
@pytest.fixture()
def invalidsub_userinfo_handler(oidc_service):
@urlmatch(netloc=r'fakeoidc', path=r'/userinfo')
def handler(_, __):
content = {
'sub': 'invalidsub',
'preferred_username': 'someusername',
'email': 'foo@example.com',
'email_verified': True,
}
return {'status_code': 200, 'content': json.dumps(content)}
return handler
@pytest.fixture()
def missingemail_userinfo_handler(oidc_service, preferred_username):
@urlmatch(netloc=r'fakeoidc', path=r'/userinfo')
def handler(_, __):
content = {
'sub': 'cooluser',
'preferred_username': preferred_username,
}
return {'status_code': 200, 'content': json.dumps(content)}
return handler
def test_basic_config(oidc_service):
assert oidc_service.service_id() == 'someoidc'
assert oidc_service.service_name() == 'Some Cool Service'
assert oidc_service.get_icon() == 'http://some/icon'
def test_discovery(oidc_service, http_client, discovery_handler):
with HTTMock(discovery_handler):
assert oidc_service.authorize_endpoint() == 'http://fakeoidc/authorize?response_type=code&'
assert oidc_service.token_endpoint() == 'http://fakeoidc/token'
assert oidc_service.user_endpoint() == 'http://fakeoidc/userinfo'
assert oidc_service.get_login_scopes() == ['profile']
def test_public_config(oidc_service, discovery_handler):
with HTTMock(discovery_handler):
assert oidc_service.get_public_config()['OIDC']
assert oidc_service.get_public_config()['CLIENT_ID'] == 'foo'
assert 'CLIENT_SECRET' not in oidc_service.get_public_config()
assert 'bar' not in oidc_service.get_public_config().values()
def test_exchange_code_invalidcode(oidc_service, discovery_handler, app_config, http_client,
token_handler):
with HTTMock(token_handler, discovery_handler):
with pytest.raises(OAuthLoginException):
oidc_service.exchange_code_for_login(app_config, http_client, 'testcode', '')
def test_exchange_code_validcode(oidc_service, discovery_handler, app_config, http_client,
token_handler, userinfo_handler, jwks_handler, valid_code,
preferred_username):
with HTTMock(jwks_handler, token_handler, userinfo_handler, discovery_handler):
lid, lusername, lemail = oidc_service.exchange_code_for_login(app_config, http_client,
valid_code, '')
assert lid == 'cooluser'
assert lemail == 'foo@example.com'
if preferred_username is not None:
assert lusername == preferred_username
else:
assert lusername == lid
def test_exchange_code_missingemail(oidc_service, discovery_handler, app_config, http_client,
token_handler, missingemail_userinfo_handler, jwks_handler,
valid_code, preferred_username):
with HTTMock(jwks_handler, token_handler, missingemail_userinfo_handler, discovery_handler):
if app_config['FEATURE_MAILING']:
# Should fail because there is no valid email address.
with pytest.raises(OAuthLoginException):
oidc_service.exchange_code_for_login(app_config, http_client, valid_code, '')
else:
# Should succeed because, while there is no valid email address, it isn't necessary with
# mailing disabled.
lid, lusername, lemail = oidc_service.exchange_code_for_login(app_config, http_client,
valid_code, '')
assert lid == 'cooluser'
assert lemail is None
if preferred_username is not None:
assert lusername == preferred_username
else:
assert lusername == lid
def test_exchange_code_invalidsub(oidc_service, discovery_handler, app_config, http_client,
token_handler, invalidsub_userinfo_handler, jwks_handler,
valid_code):
with HTTMock(jwks_handler, token_handler, invalidsub_userinfo_handler, discovery_handler):
# Should fail because the sub of the user info doesn't match that returned by the id_token.
with pytest.raises(OAuthLoginException):
oidc_service.exchange_code_for_login(app_config, http_client, valid_code, '')
def test_exchange_code_missingkey(oidc_service, discovery_handler, app_config, http_client,
token_handler, userinfo_handler, emptykeys_jwks_handler,
valid_code):
with HTTMock(emptykeys_jwks_handler, token_handler, userinfo_handler, discovery_handler):
# Should fail because the key is missing.
with pytest.raises(OAuthLoginException):
oidc_service.exchange_code_for_login(app_config, http_client, valid_code, '')