Merge pull request #2348 from coreos-inc/oidc-bind

Add support for binding OIDC and external login auth to the backing internal login auth
This commit is contained in:
josephschorr 2017-02-16 16:29:00 -05:00 committed by GitHub
commit 7cb2da4cea
13 changed files with 491 additions and 190 deletions

View file

@ -370,6 +370,13 @@ def update_user_metadata(user, given_name=None, family_name=None, company=None):
remove_user_prompt(user, UserPromptTypes.ENTER_COMPANY)
def _get_login_service(service_id):
try:
return LoginService.get(LoginService.name == service_id)
except LoginService.DoesNotExist:
return LoginService.create(name=service_id)
def create_federated_user(username, email, service_id, service_ident,
set_password_notification, metadata={},
email_required=True, prompts=tuple()):
@ -380,12 +387,7 @@ def create_federated_user(username, email, service_id, service_ident,
new_user.verified = True
new_user.save()
try:
service = LoginService.get(LoginService.name == service_id)
except LoginService.DoesNotExist:
service = LoginService.create(name=service_id)
FederatedLogin.create(user=new_user, service=service,
FederatedLogin.create(user=new_user, service=_get_login_service(service_id),
service_ident=service_ident,
metadata_json=json.dumps(metadata))
@ -395,10 +397,10 @@ def create_federated_user(username, email, service_id, service_ident,
return new_user
def attach_federated_login(user, service_id, service_ident, metadata={}):
service = LoginService.get(LoginService.name == service_id)
def attach_federated_login(user, service_id, service_ident, metadata=None):
service = _get_login_service(service_id)
FederatedLogin.create(user=user, service=service, service_ident=service_ident,
metadata_json=json.dumps(metadata))
metadata_json=json.dumps(metadata or {}))
return user

View file

@ -6,6 +6,7 @@ from data import model
from endpoints.api import api
from endpoints.api.superuser import SuperUserRepositoryBuildLogs, SuperUserRepositoryBuildResource
from endpoints.api.superuser import SuperUserRepositoryBuildStatus
from endpoints.test.fixtures import app, appconfig, database_uri, init_db_path, sqlitedb_file
def client_with_identity(auth_username, client):

View file

231
endpoints/oauth/login.py Normal file
View file

@ -0,0 +1,231 @@
import logging
from collections import namedtuple
from flask import request, redirect, url_for, Blueprint
from peewee import IntegrityError
import features
from app import app, analytics, get_app_url, oauth_login, authentication
from auth.auth_context import get_authenticated_user
from auth.process import require_session_login
from data import model
from endpoints.common import common_login
from endpoints.web import index
from endpoints.csrf import csrf_protect, OAUTH_CSRF_TOKEN_NAME
from oauth.login import OAuthLoginException
from util.validation import generate_valid_usernames
logger = logging.getLogger(__name__)
client = app.config['HTTPCLIENT']
oauthlogin = Blueprint('oauthlogin', __name__)
oauthlogin_csrf_protect = csrf_protect(OAUTH_CSRF_TOKEN_NAME, 'state', all_methods=True)
OAuthResult = namedtuple('oauthresult', ['user_obj', 'service_name', 'error_message',
'register_redirect'])
def _oauthresult(user_obj=None, service_name=None, error_message=None, register_redirect=False):
return OAuthResult(user_obj, service_name, error_message, register_redirect)
def _get_response(result):
if result.error_message is not None:
return _render_ologin_error(result.service_name, result.error_message, result.register_redirect)
return _perform_login(result.user_obj, result.service_name)
def _conduct_oauth_login(auth_system, login_service, lid, lusername, lemail, metadata=None):
""" Conducts login from the result of an OAuth service's login flow and returns
the status of the login, as well as the followup step. """
service_id = login_service.service_id()
service_name = login_service.service_name()
# Check for an existing account *bound to this service*. If found, conduct login of that account
# and redirect.
user_obj = model.user.verify_federated_login(service_id, lid)
if user_obj is not None:
return _oauthresult(user_obj=user_obj, service_name=service_name)
# If the login service has a bound field name, and we have a defined internal auth type that is
# not the database, then search for an existing account with that matching field. This allows
# users to setup SSO while also being backed by something like LDAP.
bound_field_name = login_service.login_binding_field()
if auth_system.federated_service is not None and bound_field_name is not None:
# Perform lookup.
logger.debug('Got oauth bind field name of "%s"', bound_field_name)
lookup_value = None
if bound_field_name == 'sub':
lookup_value = lid
elif bound_field_name == 'username':
lookup_value = lusername
elif bound_field_name == 'email':
lookup_value = lemail
if lookup_value is None:
logger.error('Missing lookup value for OAuth login')
return _oauthresult(service_name=service_name,
error_message='Configuration error in this provider')
(user_obj, err) = auth_system.link_user(lookup_value)
if err is not None:
logger.debug('%s %s not found: %s', bound_field_name, lookup_value, err)
msg = '%s %s not found in backing auth system' % (bound_field_name, lookup_value)
return _oauthresult(service_name=service_name, error_message=msg)
# Found an existing user. Bind their internal auth account to this service as well.
result = _attach_service(login_service, user_obj, lid, lusername)
if result.error_message is not None:
return result
return _oauthresult(user_obj=user_obj, service_name=service_name)
# Otherwise, we need to create a new user account.
if not features.USER_CREATION:
error_message = 'User creation is disabled. Please contact your administrator'
return _oauthresult(service_name=service_name, error_message=error_message)
# Try to create the user
try:
# Generate a valid username.
new_username = None
for valid in generate_valid_usernames(lusername):
if model.user.get_user_or_org(valid):
continue
new_username = valid
break
prompts = model.user.get_default_user_prompts(features)
user_obj = model.user.create_federated_user(new_username, lemail, service_id, lid,
set_password_notification=True,
metadata=metadata or {},
prompts=prompts,
email_required=features.MAILING)
# Success, tell analytics
analytics.track(user_obj.username, 'register', {'service': service_name.lower()})
return _oauthresult(user_obj=user_obj, service_name=service_name)
except model.InvalidEmailAddressException:
message = ("The e-mail address {0} is already associated "
"with an existing {1} account. \n"
"Please log in with your username and password and "
"associate your {2} account to use it in the future.")
message = message.format(lemail, app.config['REGISTRY_TITLE_SHORT'], service_name)
return _oauthresult(service_name=service_name, error_message=message,
register_redirect=True)
except model.DataModelException as ex:
return _oauthresult(service_name=service_name, error_message=ex.message)
def _render_ologin_error(service_name, error_message=None, register_redirect=False):
""" Returns a Flask response indicating an OAuth error. """
user_creation = bool(features.USER_CREATION and features.DIRECT_LOGIN)
error_info = {
'reason': 'ologinerror',
'service_name': service_name,
'error_message': error_message or 'Could not load user data. The token may have expired',
'service_url': get_app_url(),
'user_creation': user_creation,
'register_redirect': register_redirect,
}
resp = index('', error_info=error_info)
resp.status_code = 400
return resp
def _perform_login(user_obj, service_name):
""" Attempts to login the given user, returning the Flask result of whether the login succeeded.
"""
if common_login(user_obj):
if model.user.has_user_prompts(user_obj):
return redirect(url_for('web.updateuser'))
else:
return redirect(url_for('web.index'))
else:
return _render_ologin_error(service_name, 'Could not login. Account may be disabled')
def _attach_service(login_service, user_obj, lid, lusername):
""" Attaches the given user account to the given service, with the given service user ID and
service username.
"""
metadata = {
'service_username': lusername,
}
try:
model.user.attach_federated_login(user_obj, login_service.service_id(), lid,
metadata=metadata)
return _oauthresult(user_obj=user_obj)
except IntegrityError:
err = '%s account %s is already attached to a %s account' % (
login_service.service_name(), lusername, app.config['REGISTRY_TITLE_SHORT'])
return _oauthresult(service_name=login_service.service_name(), error_message=err)
def _register_service(login_service):
""" Registers the given login service, adding its callback and attach routes to the blueprint. """
@oauthlogin_csrf_protect
def callback_func():
# Check for a callback error.
error = request.args.get('error', None)
if error:
return _render_ologin_error(login_service.service_name(), error)
# Exchange the OAuth code for login information.
code = request.args.get('code')
try:
lid, lusername, lemail = login_service.exchange_code_for_login(app.config, client, code, '')
except OAuthLoginException as ole:
logger.exception('Got login exception')
return _render_ologin_error(login_service.service_name(), ole.message)
# Conduct login.
metadata = {
'service_username': lusername,
}
result = _conduct_oauth_login(authentication, login_service, lid, lusername, lemail,
metadata=metadata)
return _get_response(result)
@require_session_login
@oauthlogin_csrf_protect
def attach_func():
# Check for a callback error.
error = request.args.get('error', None)
if error:
return _render_ologin_error(login_service.service_name(), error)
# Exchange the OAuth code for login information.
code = request.args.get('code')
try:
lid, lusername, _ = login_service.exchange_code_for_login(app.config, client, code, '/attach')
except OAuthLoginException as ole:
return _render_ologin_error(login_service.service_name(), ole.message)
# Conduct attach.
user_obj = get_authenticated_user()
result = _attach_service(login_service, user_obj, lid, lusername)
if result.error_message is not None:
return _get_response(result)
return redirect(url_for('web.user_view', path=user_obj.username, tab='external'))
oauthlogin.add_url_rule('/%s/callback' % login_service.service_id(),
'%s_oauth_callback' % login_service.service_id(),
callback_func,
methods=['GET'])
oauthlogin.add_url_rule('/%s/callback/attach' % login_service.service_id(),
'%s_oauth_attach' % login_service.service_id(),
attach_func,
methods=['GET'])
# Register the routes for each of the login services.
for current_service in oauth_login.services:
_register_service(current_service)

View file

@ -0,0 +1,184 @@
import pytest
from endpoints.oauth.login import _conduct_oauth_login
from endpoints.test.fixtures import app, appconfig, database_uri, init_db_path, sqlitedb_file
from oauth.services.github import GithubOAuthService
from data import model, database
from data.users import get_users_handler, DatabaseUsers
from test.test_ldap import mock_ldap
@pytest.fixture(params=[None, 'username', 'email'])
def login_service(request, app):
config = {'GITHUB': {}}
if request is not None:
config['GITHUB']['LOGIN_BINDING_FIELD'] = request.param
return GithubOAuthService(config, 'GITHUB')
@pytest.fixture(params=['Database', 'LDAP'])
def auth_system(request):
return _get_users_handler(request.param)
def _get_users_handler(auth_type):
config = {}
config['AUTHENTICATION_TYPE'] = auth_type
config['LDAP_BASE_DN'] = ['dc=quay', 'dc=io']
config['LDAP_ADMIN_DN'] = 'uid=testy,ou=employees,dc=quay,dc=io'
config['LDAP_ADMIN_PASSWD'] = 'password'
config['LDAP_USER_RDN'] = ['ou=employees']
return get_users_handler(config, None, None)
def test_existing_account(auth_system, login_service):
login_service_lid = 'someexternaluser'
# Create an existing bound federated user.
created_user = model.user.create_federated_user('someuser', 'example@example.com',
login_service.service_id(),
login_service_lid, False)
existing_user_count = database.User.select().count()
with mock_ldap():
result = _conduct_oauth_login(auth_system, login_service,
login_service_lid, login_service_lid,
'example@example.com')
assert result.user_obj == created_user
# Ensure that no addtional users were created.
current_user_count = database.User.select().count()
assert current_user_count == existing_user_count
def test_new_account_via_database(login_service):
existing_user_count = database.User.select().count()
login_service_lid = 'someexternaluser'
internal_auth = DatabaseUsers()
# Conduct login. Since the external user doesn't (yet) bind to a user in the database,
# a new user should be created and bound to the external service.
result = _conduct_oauth_login(internal_auth, login_service, login_service_lid, login_service_lid,
'example@example.com')
assert result.user_obj is not None
current_user_count = database.User.select().count()
assert current_user_count == existing_user_count + 1
# Find the user and ensure it is bound.
new_user = model.user.get_user(login_service_lid)
federated_login = model.user.lookup_federated_login(new_user, login_service.service_id())
assert federated_login is not None
@pytest.mark.parametrize('binding_field, lid, lusername, lemail, expected_error', [
# No binding field + newly seen user -> New unlinked user
(None, 'someid', 'someunknownuser', 'someemail@example.com', None),
# sub binding field + unknown sub -> Error.
('sub', 'someid', 'someuser', 'foo@bar.com',
'sub someid not found in backing auth system'),
# username binding field + unknown username -> Error.
('username', 'someid', 'someunknownuser', 'foo@bar.com',
'username someunknownuser not found in backing auth system'),
# email binding field + unknown email address -> Error.
('email', 'someid', 'someuser', 'someemail@example.com',
'email someemail@example.com not found in backing auth system'),
# No binding field + newly seen user -> New unlinked user.
(None, 'someid', 'someuser', 'foo@bar.com', None),
# username binding field + valid username -> fully bound user.
('username', 'someid', 'someuser', 'foo@bar.com', None),
# sub binding field + valid sub -> fully bound user.
('sub', 'someuser', 'someusername', 'foo@bar.com', None),
# email binding field + valid email -> fully bound user.
('email', 'someid', 'someuser', 'foo@bar.com', None),
# username binding field + valid username + invalid email -> fully bound user.
('username', 'someid', 'someuser', 'another@email.com', None),
# email binding field + valid email + invalid username -> fully bound user.
('email', 'someid', 'someotherusername', 'foo@bar.com', None),
])
def test_new_account_via_ldap(binding_field, lid, lusername, lemail, expected_error, app):
existing_user_count = database.User.select().count()
config = {'GITHUB': {}}
if binding_field is not None:
config['GITHUB']['LOGIN_BINDING_FIELD'] = binding_field
external_auth = GithubOAuthService(config, 'GITHUB')
internal_auth = _get_users_handler('LDAP')
with mock_ldap():
# Conduct OAuth login.
result = _conduct_oauth_login(internal_auth, external_auth, lid, lusername, lemail)
assert result.error_message == expected_error
current_user_count = database.User.select().count()
if expected_error is None:
# Ensure that the new user was created and that it is bound to both the
# external login service and to LDAP (if a binding_field was given).
assert current_user_count == existing_user_count + 1
assert result.user_obj is not None
# Check the service bindings.
external_login = model.user.lookup_federated_login(result.user_obj,
external_auth.service_id())
assert external_login is not None
internal_login = model.user.lookup_federated_login(result.user_obj,
internal_auth.federated_service)
if binding_field is not None:
assert internal_login is not None
else:
assert internal_login is None
else:
# Ensure that no addtional users were created.
assert current_user_count == existing_user_count
def test_existing_account_in_ldap(app):
config = {'GITHUB': {'LOGIN_BINDING_FIELD': 'username'}}
external_auth = GithubOAuthService(config, 'GITHUB')
internal_auth = _get_users_handler('LDAP')
# Add an existing federated user bound to the LDAP account associated with `someuser`.
bound_user = model.user.create_federated_user('someuser', 'foo@bar.com',
internal_auth.federated_service, 'someuser', False)
existing_user_count = database.User.select().count()
with mock_ldap():
# Conduct OAuth login with the same lid and bound field. This should find the existing LDAP
# user (via the `username` binding), and then bind Github to it as well.
result = _conduct_oauth_login(internal_auth, external_auth, bound_user.username,
bound_user.username, bound_user.email)
assert result.error_message is None
# Ensure that the same user was returned, and that it is now bound to the Github account
# as well.
assert result.user_obj.id == bound_user.id
# Ensure that no additional users were created.
current_user_count = database.User.select().count()
assert current_user_count == existing_user_count
# Check the service bindings.
external_login = model.user.lookup_federated_login(result.user_obj,
external_auth.service_id())
assert external_login is not None
internal_login = model.user.lookup_federated_login(result.user_obj,
internal_auth.federated_service)
assert internal_login is not None

View file

@ -1,169 +0,0 @@
import logging
import uuid
from flask import request, redirect, url_for, Blueprint
from peewee import IntegrityError
import features
from app import app, analytics, get_app_url, oauth_login
from auth.auth_context import get_authenticated_user
from auth.process import require_session_login
from data import model
from endpoints.common import common_login
from endpoints.web import index
from endpoints.csrf import csrf_protect, OAUTH_CSRF_TOKEN_NAME
from oauth.login import OAuthLoginException
from util.validation import generate_valid_usernames
logger = logging.getLogger(__name__)
client = app.config['HTTPCLIENT']
oauthlogin = Blueprint('oauthlogin', __name__)
oauthlogin_csrf_protect = csrf_protect(OAUTH_CSRF_TOKEN_NAME, 'state', all_methods=True)
def _render_ologin_error(service_name, error_message=None, register_redirect=False):
""" Returns a Flask response indicating an OAuth error. """
user_creation = bool(features.USER_CREATION and features.DIRECT_LOGIN)
error_info = {
'reason': 'ologinerror',
'service_name': service_name,
'error_message': error_message or 'Could not load user data. The token may have expired',
'service_url': get_app_url(),
'user_creation': user_creation,
'register_redirect': register_redirect,
}
resp = index('', error_info=error_info)
resp.status_code = 400
return resp
def _conduct_oauth_login(service_id, service_name, user_id, username, email, metadata=None):
""" Conducts login from the result of an OAuth service's login flow. """
to_login = model.user.verify_federated_login(service_id, user_id)
if not to_login:
# See if we can create a new user.
if not features.USER_CREATION:
error_message = 'User creation is disabled. Please contact your administrator'
return _render_ologin_error(service_name, error_message)
# Try to create the user
try:
# Generate a valid username.
new_username = None
for valid in generate_valid_usernames(username):
if model.user.get_user_or_org(valid):
continue
new_username = valid
break
# Generate a valid email. If the email is None and the MAILING feature is turned
# off, simply place in a fake email address.
if email is None and not features.MAILING:
email = '%s@fake.example.com' % (str(uuid.uuid4()))
prompts = model.user.get_default_user_prompts(features)
to_login = model.user.create_federated_user(new_username, email, service_id,
user_id, set_password_notification=True,
metadata=metadata or {},
prompts=prompts)
# Success, tell analytics
analytics.track(to_login.username, 'register', {'service': service_name.lower()})
except model.InvalidEmailAddressException:
message = "The e-mail address %s is already associated " % (email, )
message = message + "with an existing %s account." % (app.config['REGISTRY_TITLE_SHORT'], )
message = message + "\nPlease log in with your username and password and "
message = message + "associate your %s account to use it in the future." % (service_name, )
return _render_ologin_error(service_name, message, register_redirect=True)
except model.DataModelException as ex:
return _render_ologin_error(service_name, ex.message)
if common_login(to_login):
if model.user.has_user_prompts(to_login):
return redirect(url_for('web.updateuser'))
else:
return redirect(url_for('web.index'))
return _render_ologin_error(service_name)
def _register_service(login_service):
""" Registers the given login service, adding its callback and attach routes to the blueprint. """
@oauthlogin_csrf_protect
def callback_func():
# Check for a callback error.
error = request.args.get('error', None)
if error:
return _render_ologin_error(login_service.service_name(), error)
# Exchange the OAuth code for login information.
code = request.args.get('code')
try:
lid, lusername, lemail = login_service.exchange_code_for_login(app.config, client, code, '')
except OAuthLoginException as ole:
logger.exception('Got login exception')
return _render_ologin_error(login_service.service_name(), ole.message)
# Conduct login.
metadata = {
'service_username': lusername
}
return _conduct_oauth_login(login_service.service_id(), login_service.service_name(), lid,
lusername, lemail, metadata=metadata)
@require_session_login
@oauthlogin_csrf_protect
def attach_func():
# Check for a callback error.
error = request.args.get('error', None)
if error:
return _render_ologin_error(login_service.service_name(), error)
# Exchange the OAuth code for login information.
code = request.args.get('code')
try:
lid, lusername, _ = login_service.exchange_code_for_login(app.config, client, code, '/attach')
except OAuthLoginException as ole:
return _render_ologin_error(login_service.service_name(), ole.message)
# Conduct attach.
metadata = {
'service_username': lusername
}
user_obj = get_authenticated_user()
try:
model.user.attach_federated_login(user_obj, login_service.service_id(), lid,
metadata=metadata)
except IntegrityError:
err = '%s account %s is already attached to a %s account' % (
login_service.service_name(), lusername, app.config['REGISTRY_TITLE_SHORT'])
return _render_ologin_error(login_service.service_name(), err)
return redirect(url_for('web.user_view', path=user_obj.username, tab='external'))
oauthlogin.add_url_rule('/%s/callback' % login_service.service_id(),
'%s_oauth_callback' % login_service.service_id(),
callback_func,
methods=['GET'])
oauthlogin.add_url_rule('/%s/callback/attach' % login_service.service_id(),
'%s_oauth_attach' % login_service.service_id(),
attach_func,
methods=['GET'])
# Register the routes for each of the login services.
for current_service in oauth_login.services:
_register_service(current_service)

View file

View file

@ -11,8 +11,10 @@ from data import model
from data.database import (close_db_filter, db)
from data.model.user import LoginWrappedDBUser
from endpoints.api import api_bp
from endpoints.web import web
from initdb import initialize_database, populate_database
from path_converters import APIRepositoryPathConverter, RegexConverter
from path_converters import APIRepositoryPathConverter, RegexConverter, RepositoryPathConverter
@pytest.fixture()
@ -33,7 +35,9 @@ def app(appconfig):
app.url_map.converters['regex'] = RegexConverter
app.url_map.converters['apirepopath'] = APIRepositoryPathConverter
app.url_map.converters['repopath'] = RepositoryPathConverter
app.register_blueprint(api_bp, url_prefix='/api')
app.register_blueprint(web, url_prefix='/')
app.config.update(appconfig)
return app

View file

@ -64,6 +64,17 @@ class OAuthService(object):
def client_secret(self):
return self.config.get('CLIENT_SECRET')
def login_binding_field(self):
""" Returns the name of the field (`username` or `email`) used for auto binding an external
login service account to an *internal* login service account. For example, if the external
login service is GitHub and the internal login service is LDAP, a value of `email` here
will cause login-with-Github to conduct a search (via email) in LDAP for a user, an auto
bind the external and internal users together. May return None, in which case no binding
is performing, and login with this external account will simply create a new account in the
database.
"""
return self.config.get('LOGIN_BINDING_FIELD', None)
def get_auth_url(self, app_config, redirect_suffix, csrf_token, scopes):
""" Retrieves the authorization URL for this login service. """
redirect_uri = '%s/oauth2/%s/callback%s' % (get_app_url(app_config), self.service_id(),

View file

@ -1,12 +1,15 @@
from oauth.services.github import GithubOAuthService
from oauth.services.google import GoogleOAuthService
from oauth.oidc import OIDCLoginService
from data.users import UserAuthentication
CUSTOM_LOGIN_SERVICES = {
'GITHUB_LOGIN_CONFIG': GithubOAuthService,
'GOOGLE_LOGIN_CONFIG': GoogleOAuthService,
}
PREFIX_BLACKLIST = ['ldap', 'jwt', 'keystone']
class OAuthLoginManager(object):
""" Helper class which manages all registered OAuth login services. """
def __init__(self, config):
@ -21,6 +24,10 @@ class OAuthLoginManager(object):
if custom_service.login_enabled(config):
self.services.append(custom_service)
else:
prefix = key.rstrip('_LOGIN_CONFIG').lower()
if prefix in PREFIX_BLACKLIST:
raise Exception('Cannot use reserved config name %s' % key)
self.services.append(OIDCLoginService(config, key))
def get_service(self, service_id):

View file

@ -9,10 +9,23 @@ from Crypto.PublicKey import RSA
from httmock import urlmatch, HTTMock
from jwkest.jwk import RSAKey
from app import app
from app import app, authentication
from data import model
from endpoints.oauthlogin import oauthlogin as oauthlogin_bp
from endpoints.oauth.login import oauthlogin as oauthlogin_bp
from test.test_endpoints import EndpointTestCase
from test.test_ldap import mock_ldap
class AuthForTesting(object):
def __init__(self, auth_engine):
self.auth_engine = auth_engine
self.existing_state = None
def __enter__(self):
self.existing_state = authentication.state
authentication.state = self.auth_engine
def __exit__(self, type, value, traceback):
authentication.state = self.existing_state
try:
app.register_blueprint(oauthlogin_bp, url_prefix='/oauth2')
@ -22,16 +35,18 @@ except ValueError:
class OAuthLoginTestCase(EndpointTestCase):
def invoke_oauth_tests(self, callback_endpoint, attach_endpoint, service_name, service_ident,
new_username):
new_username, test_attach=True):
# Test callback.
created = self.invoke_oauth_test(callback_endpoint, service_name, service_ident, new_username)
# Delete the created user.
self.assertNotEquals(created.username, 'devtable')
model.user.delete_user(created, [])
# Test attach.
self.login('devtable', 'password')
self.invoke_oauth_test(attach_endpoint, service_name, service_ident, 'devtable')
if test_attach:
self.login('devtable', 'password')
self.invoke_oauth_test(attach_endpoint, service_name, service_ident, 'devtable')
def invoke_oauth_test(self, endpoint_name, service_name, service_ident, username):
# No CSRF.
@ -111,7 +126,7 @@ class OAuthLoginTestCase(EndpointTestCase):
self.invoke_oauth_tests('github_oauth_callback', 'github_oauth_attach', 'github',
'someid', 'someusername')
def test_oidc_auth(self):
def _get_oidc_mocks(self):
private_key = RSA.generate(2048)
generatedjwk = RSAKey(key=private_key.publickey()).serialize()
kid = 'somekey'
@ -123,7 +138,7 @@ class OAuthLoginTestCase(EndpointTestCase):
'nbf': int(time.time()),
'iat': int(time.time()),
'exp': int(time.time() + 600),
'sub': 'cooluser',
'sub': 'cool.user',
}
token_headers = {
@ -143,7 +158,7 @@ class OAuthLoginTestCase(EndpointTestCase):
@urlmatch(netloc=r'fakeoidc', path='/user')
def user_handler(_, __):
content = {
'sub': 'cooluser',
'sub': 'cool.user',
'preferred_username': 'someusername',
'email': 'someemail@example.com',
'email_verified': True,
@ -169,9 +184,23 @@ class OAuthLoginTestCase(EndpointTestCase):
}
return py_json.dumps(content)
with HTTMock(discovery_handler, jwks_handler, token_handler, user_handler):
return (discovery_handler, jwks_handler, token_handler, user_handler)
def test_oidc_database_auth(self):
oidc_mocks = self._get_oidc_mocks()
with HTTMock(*oidc_mocks):
self.invoke_oauth_tests('testoidc_oauth_callback', 'testoidc_oauth_attach', 'testoidc',
'cooluser', 'someusername')
'cool.user', 'someusername')
def test_oidc_ldap_auth(self):
# Test with database auth.
oidc_mocks = self._get_oidc_mocks()
with mock_ldap() as ldap:
with AuthForTesting(ldap):
with HTTMock(*oidc_mocks):
self.invoke_oauth_tests('testoidc_oauth_callback', 'testoidc_oauth_attach', 'testoidc',
'cool.user', 'cool_user', test_attach=False)
if __name__ == '__main__':
unittest.main()

View file

@ -87,6 +87,7 @@ class TestConfig(DefaultConfig):
'CLIENT_SECRET': 'bar',
'OIDC_SERVER': 'http://fakeoidc',
'DEBUGGING': True,
'LOGIN_BINDING_FIELD': 'sub',
}
RECAPTCHA_SITE_KEY = 'somekey'

2
web.py
View file

@ -8,7 +8,7 @@ from endpoints.bitbuckettrigger import bitbuckettrigger
from endpoints.githubtrigger import githubtrigger
from endpoints.gitlabtrigger import gitlabtrigger
from endpoints.keyserver import key_server
from endpoints.oauthlogin import oauthlogin
from endpoints.oauth.login import oauthlogin
from endpoints.realtime import realtime
from endpoints.web import web
from endpoints.webhooks import webhooks