Refactor the users class into their own files, add a common base class for federated users and add a verify_credentials
method which only does the verification, without the linking. We use this in the superuser verification pass
This commit is contained in:
parent
1245385808
commit
33b54218cc
13 changed files with 541 additions and 495 deletions
71
data/users/federated.py
Normal file
71
data/users/federated.py
Normal file
|
@ -0,0 +1,71 @@
|
|||
import logging
|
||||
|
||||
from collections import namedtuple
|
||||
|
||||
from data import model
|
||||
from util.validation import generate_valid_usernames
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
VerifiedCredentials = namedtuple('VerifiedCredentials', ['username', 'email'])
|
||||
|
||||
class FederatedUsers(object):
|
||||
""" Base class for all federated users systems. """
|
||||
|
||||
def __init__(self, federated_service):
|
||||
self._federated_service = federated_service
|
||||
|
||||
def verify_credentials(self, username_or_email, password):
|
||||
""" Verifies the given credentials against the backing federated service, returning
|
||||
a tuple containing a VerifiedCredentials (if success) and the error message (if failed). """
|
||||
raise NotImplementedError
|
||||
|
||||
def _get_federated_user(self, username, email):
|
||||
db_user = model.user.verify_federated_login(self._federated_service, username)
|
||||
if not db_user:
|
||||
# We must create the user in our db
|
||||
valid_username = None
|
||||
for valid_username in generate_valid_usernames(username):
|
||||
if model.user.is_username_unique(valid_username):
|
||||
break
|
||||
|
||||
if not valid_username:
|
||||
logger.error('Unable to pick a username for user: %s', username)
|
||||
return (None, 'Unable to pick a username. Please report this to your administrator.')
|
||||
|
||||
db_user = model.user.create_federated_user(valid_username, email, self._federated_service,
|
||||
username, set_password_notification=False)
|
||||
else:
|
||||
# Update the db attributes from the federated service.
|
||||
db_user.email = email
|
||||
db_user.save()
|
||||
|
||||
return (db_user, None)
|
||||
|
||||
def verify_and_link_user(self, username_or_email, password):
|
||||
""" Verifies the given credentials and, if valid, creates/links a database user to the
|
||||
associated federated service.
|
||||
"""
|
||||
(credentials, err_msg) = self.verify_credentials(username_or_email, password)
|
||||
if credentials is None:
|
||||
return (None, err_msg)
|
||||
|
||||
return self._get_federated_user(credentials.username, credentials.email)
|
||||
|
||||
def confirm_existing_user(self, username, password):
|
||||
""" Confirms that the given *database* username and service password are valid for the linked
|
||||
service. This method is used when the federated service's username is not known.
|
||||
"""
|
||||
db_user = model.user.get_user(username)
|
||||
if not db_user:
|
||||
return (None, 'Invalid user')
|
||||
|
||||
federated_login = model.user.lookup_federated_login(db_user, self._federated_service)
|
||||
if not federated_login:
|
||||
return (None, 'Invalid user')
|
||||
|
||||
(credentials, err_msg) = self.verify_credentials(federated_login.service_ident, password)
|
||||
if credentials is None:
|
||||
return (None, err_msg)
|
||||
|
||||
return (db_user, None)
|
Reference in a new issue