Move OAuth login into its own endpoints module
This commit is contained in:
parent
0167e1e7bf
commit
198bdf88bc
6 changed files with 32 additions and 31 deletions
0
endpoints/oauth/__init__.py
Normal file
0
endpoints/oauth/__init__.py
Normal file
|
@ -23,17 +23,17 @@ oauthlogin = Blueprint('oauthlogin', __name__)
|
|||
oauthlogin_csrf_protect = csrf_protect(OAUTH_CSRF_TOKEN_NAME, 'state', all_methods=True)
|
||||
|
||||
|
||||
OAuthResult = namedtuple('oauthresult', ['to_login', 'service_name', 'error_message',
|
||||
OAuthResult = namedtuple('oauthresult', ['user_obj', 'service_name', 'error_message',
|
||||
'register_redirect'])
|
||||
|
||||
def _oauthresult(to_login=None, service_name=None, error_message=None, register_redirect=False):
|
||||
return OAuthResult(to_login, service_name, error_message, register_redirect)
|
||||
def _oauthresult(user_obj=None, service_name=None, error_message=None, register_redirect=False):
|
||||
return OAuthResult(user_obj, service_name, error_message, register_redirect)
|
||||
|
||||
def _get_response(result):
|
||||
if result.error_message is not None:
|
||||
return _render_ologin_error(result.service_name, result.error_message, result.register_redirect)
|
||||
|
||||
return _perform_login(result.to_login, result.service_name)
|
||||
return _perform_login(result.user_obj, result.service_name)
|
||||
|
||||
def _conduct_oauth_login(auth_system, login_service, lid, lusername, lemail, metadata=None):
|
||||
""" Conducts login from the result of an OAuth service's login flow and returns
|
||||
|
@ -43,9 +43,9 @@ def _conduct_oauth_login(auth_system, login_service, lid, lusername, lemail, met
|
|||
|
||||
# Check for an existing account *bound to this service*. If found, conduct login of that account
|
||||
# and redirect.
|
||||
to_login = model.user.verify_federated_login(service_id, lid)
|
||||
if to_login is not None:
|
||||
return _oauthresult(to_login=to_login, service_name=service_name)
|
||||
user_obj = model.user.verify_federated_login(service_id, lid)
|
||||
if user_obj is not None:
|
||||
return _oauthresult(user_obj=user_obj, service_name=service_name)
|
||||
|
||||
# If the login service has a bound field name, and we have a defined internal auth type that is
|
||||
# not the database, then search for an existing account with that matching field. This allows
|
||||
|
@ -78,7 +78,7 @@ def _conduct_oauth_login(auth_system, login_service, lid, lusername, lemail, met
|
|||
if result.error_message is not None:
|
||||
return result
|
||||
|
||||
return _oauthresult(to_login=user_obj, service_name=service_name)
|
||||
return _oauthresult(user_obj=user_obj, service_name=service_name)
|
||||
|
||||
# Otherwise, we need to create a new user account.
|
||||
if not features.USER_CREATION:
|
||||
|
@ -97,21 +97,22 @@ def _conduct_oauth_login(auth_system, login_service, lid, lusername, lemail, met
|
|||
break
|
||||
|
||||
prompts = model.user.get_default_user_prompts(features)
|
||||
to_login = model.user.create_federated_user(new_username, lemail, service_id, lid,
|
||||
user_obj = model.user.create_federated_user(new_username, lemail, service_id, lid,
|
||||
set_password_notification=True,
|
||||
metadata=metadata or {},
|
||||
prompts=prompts,
|
||||
email_required=features.MAILING)
|
||||
|
||||
# Success, tell analytics
|
||||
analytics.track(to_login.username, 'register', {'service': service_name.lower()})
|
||||
return _oauthresult(to_login=to_login, service_name=service_name)
|
||||
analytics.track(user_obj.username, 'register', {'service': service_name.lower()})
|
||||
return _oauthresult(user_obj=user_obj, service_name=service_name)
|
||||
|
||||
except model.InvalidEmailAddressException:
|
||||
message = "The e-mail address %s is already associated " % (lemail, )
|
||||
message = message + "with an existing %s account." % (app.config['REGISTRY_TITLE_SHORT'], )
|
||||
message = message + "\nPlease log in with your username and password and "
|
||||
message = message + "associate your %s account to use it in the future." % (service_name, )
|
||||
message = ("The e-mail address {0} is already associated "
|
||||
"with an existing {1} account. \n"
|
||||
"Please log in with your username and password and "
|
||||
"associate your {2} account to use it in the future.")
|
||||
message = message.format(lemail, app.config['REGISTRY_TITLE_SHORT'], service_name)
|
||||
return _oauthresult(service_name=service_name, error_message=message,
|
||||
register_redirect=True)
|
||||
|
||||
|
@ -135,11 +136,11 @@ def _render_ologin_error(service_name, error_message=None, register_redirect=Fal
|
|||
resp.status_code = 400
|
||||
return resp
|
||||
|
||||
def _perform_login(to_login, service_name):
|
||||
def _perform_login(user_obj, service_name):
|
||||
""" Attempts to login the given user, returning the Flask result of whether the login succeeded.
|
||||
"""
|
||||
if common_login(to_login):
|
||||
if model.user.has_user_prompts(to_login):
|
||||
if common_login(user_obj):
|
||||
if model.user.has_user_prompts(user_obj):
|
||||
return redirect(url_for('web.updateuser'))
|
||||
else:
|
||||
return redirect(url_for('web.index'))
|
||||
|
@ -157,7 +158,7 @@ def _attach_service(login_service, user_obj, lid, lusername):
|
|||
try:
|
||||
model.user.attach_federated_login(user_obj, login_service.service_id(), lid,
|
||||
metadata=metadata)
|
||||
return _oauthresult(to_login=user_obj)
|
||||
return _oauthresult(user_obj=user_obj)
|
||||
except IntegrityError:
|
||||
err = '%s account %s is already attached to a %s account' % (
|
||||
login_service.service_name(), lusername, app.config['REGISTRY_TITLE_SHORT'])
|
|
@ -1,6 +1,6 @@
|
|||
import pytest
|
||||
|
||||
from endpoints.oauthlogin import _conduct_oauth_login
|
||||
from endpoints.oauth.login import _conduct_oauth_login
|
||||
from endpoints.test.fixtures import app, appconfig, database_uri, init_db_path, sqlitedb_file
|
||||
|
||||
from oauth.services.github import GithubOAuthService
|
||||
|
@ -47,7 +47,7 @@ def test_existing_account(auth_system, login_service):
|
|||
login_service_lid, login_service_lid,
|
||||
'example@example.com')
|
||||
|
||||
assert result.to_login == created_user
|
||||
assert result.user_obj == created_user
|
||||
|
||||
# Ensure that no addtional users were created.
|
||||
current_user_count = database.User.select().count()
|
||||
|
@ -63,7 +63,7 @@ def test_new_account_via_database(login_service):
|
|||
# a new user should be created and bound to the external service.
|
||||
result = _conduct_oauth_login(internal_auth, login_service, login_service_lid, login_service_lid,
|
||||
'example@example.com')
|
||||
assert result.to_login is not None
|
||||
assert result.user_obj is not None
|
||||
|
||||
current_user_count = database.User.select().count()
|
||||
assert current_user_count == existing_user_count + 1
|
||||
|
@ -128,14 +128,14 @@ def test_new_account_via_ldap(binding_field, lid, lusername, lemail, expected_er
|
|||
# Ensure that the new user was created and that it is bound to both the
|
||||
# external login service and to LDAP (if a binding_field was given).
|
||||
assert current_user_count == existing_user_count + 1
|
||||
assert result.to_login is not None
|
||||
assert result.user_obj is not None
|
||||
|
||||
# Check the service bindings.
|
||||
external_login = model.user.lookup_federated_login(result.to_login,
|
||||
external_login = model.user.lookup_federated_login(result.user_obj,
|
||||
external_auth.service_id())
|
||||
assert external_login is not None
|
||||
|
||||
internal_login = model.user.lookup_federated_login(result.to_login,
|
||||
internal_login = model.user.lookup_federated_login(result.user_obj,
|
||||
internal_auth.federated_service)
|
||||
if binding_field is not None:
|
||||
assert internal_login is not None
|
||||
|
@ -168,17 +168,17 @@ def test_existing_account_in_ldap(app):
|
|||
|
||||
# Ensure that the same user was returned, and that it is now bound to the Github account
|
||||
# as well.
|
||||
assert result.to_login.id == bound_user.id
|
||||
assert result.user_obj.id == bound_user.id
|
||||
|
||||
# Ensure that no additional users were created.
|
||||
current_user_count = database.User.select().count()
|
||||
assert current_user_count == existing_user_count
|
||||
|
||||
# Check the service bindings.
|
||||
external_login = model.user.lookup_federated_login(result.to_login,
|
||||
external_login = model.user.lookup_federated_login(result.user_obj,
|
||||
external_auth.service_id())
|
||||
assert external_login is not None
|
||||
|
||||
internal_login = model.user.lookup_federated_login(result.to_login,
|
||||
internal_login = model.user.lookup_federated_login(result.user_obj,
|
||||
internal_auth.federated_service)
|
||||
assert internal_login is not None
|
|
@ -24,7 +24,7 @@ class OAuthLoginManager(object):
|
|||
if custom_service.login_enabled(config):
|
||||
self.services.append(custom_service)
|
||||
else:
|
||||
prefix = key[0:len(key) - len('_LOGIN_CONFIG')].lower()
|
||||
prefix = key.rstrip('_LOGIN_CONFIG').lower()
|
||||
if prefix in PREFIX_BLACKLIST:
|
||||
raise Exception('Cannot use reserved config name %s' % key)
|
||||
|
||||
|
|
|
@ -11,7 +11,7 @@ from jwkest.jwk import RSAKey
|
|||
|
||||
from app import app, authentication
|
||||
from data import model
|
||||
from endpoints.oauthlogin import oauthlogin as oauthlogin_bp
|
||||
from endpoints.oauth.login import oauthlogin as oauthlogin_bp
|
||||
from test.test_endpoints import EndpointTestCase
|
||||
from test.test_ldap import mock_ldap
|
||||
|
||||
|
|
2
web.py
2
web.py
|
@ -8,7 +8,7 @@ from endpoints.bitbuckettrigger import bitbuckettrigger
|
|||
from endpoints.githubtrigger import githubtrigger
|
||||
from endpoints.gitlabtrigger import gitlabtrigger
|
||||
from endpoints.keyserver import key_server
|
||||
from endpoints.oauthlogin import oauthlogin
|
||||
from endpoints.oauth.login import oauthlogin
|
||||
from endpoints.realtime import realtime
|
||||
from endpoints.web import web
|
||||
from endpoints.webhooks import webhooks
|
||||
|
|
Reference in a new issue