Fix setup tool when binding to external auth

We now query the external auth provider for the external service's identifier before adding the linking row into the database. This fixes the case where the external service resolves a different identifier for the same username.

Fixes #1477
This commit is contained in:
Joseph Schorr 2016-05-23 15:08:51 -04:00
parent d6b73a41de
commit 60bbca2185
6 changed files with 151 additions and 62 deletions

View file

@ -305,16 +305,7 @@ def list_entity_robot_permission_teams(entity_name, include_permissions=False):
return TupleSelector(query, fields)
def confirm_attached_federated_login(user, service_name):
""" Verifies that the given user has a federated service identity for the specified service.
If none found, a row is added for that service and user.
"""
with db_transaction():
if not lookup_federated_login(user, service_name):
attach_federated_login(user, service_name, user.username)
def create_federated_user(username, email, service_name, service_id,
def create_federated_user(username, email, service_name, service_ident,
set_password_notification, metadata={}):
if not is_create_user_allowed():
raise TooManyUsersException()
@ -325,7 +316,7 @@ def create_federated_user(username, email, service_name, service_id,
service = LoginService.get(LoginService.name == service_name)
FederatedLogin.create(user=new_user, service=service,
service_ident=service_id,
service_ident=service_ident,
metadata_json=json.dumps(metadata))
if set_password_notification:
@ -334,20 +325,20 @@ def create_federated_user(username, email, service_name, service_id,
return new_user
def attach_federated_login(user, service_name, service_id, metadata={}):
def attach_federated_login(user, service_name, service_ident, metadata={}):
service = LoginService.get(LoginService.name == service_name)
FederatedLogin.create(user=user, service=service, service_ident=service_id,
FederatedLogin.create(user=user, service=service, service_ident=service_ident,
metadata_json=json.dumps(metadata))
return user
def verify_federated_login(service_name, service_id):
def verify_federated_login(service_name, service_ident):
try:
found = (FederatedLogin
.select(FederatedLogin, User)
.join(LoginService)
.switch(FederatedLogin).join(User)
.where(FederatedLogin.service_ident == service_id, LoginService.name == service_name)
.where(FederatedLogin.service_ident == service_ident, LoginService.name == service_name)
.get())
return found.user
except FederatedLogin.DoesNotExist: