"""Base support for linking database users to an external federated service."""

import logging

from collections import namedtuple

from data import model
from util.validation import generate_valid_usernames

logger = logging.getLogger(__name__)

# Result of a successful credential check against the federated service.
VerifiedCredentials = namedtuple('VerifiedCredentials', ['username', 'email'])


class FederatedUsers(object):
    """ Base class for all federated users systems.

        Subclasses must override `verify_credentials`; the remaining methods
        build on it to create, link and confirm database users.
    """

    def __init__(self, federated_service):
        """ Stores the identifier of the federated service, which is passed to
            the `model.user` lookups and creation calls made by this class.
        """
        self._federated_service = federated_service

    def verify_credentials(self, username_or_email, password):
        """ Verifies the given credentials against the backing federated service,
            returning a tuple containing a VerifiedCredentials (if success) and the
            error message (if failed).

            Raises NotImplementedError in this base class.
        """
        raise NotImplementedError

    def _get_federated_user(self, username, email):
        """ Returns the database user linked to the federated `username`,
            creating one if no link exists yet.

            Returns a tuple of (db_user, None) on success, or (None, error
            message) if no unique database username could be picked.
        """
        db_user = model.user.verify_federated_login(self._federated_service, username)
        if not db_user:
            # We must create the user in our db. Only accept a candidate that was
            # confirmed unique: a plain `for ... break` loop would leave the last
            # (non-unique) candidate bound if the generator ran out, and that
            # stale value would then be used to create the user.
            valid_username = next(
                (candidate for candidate in generate_valid_usernames(username)
                 if model.user.is_username_unique(candidate)),
                None)

            if not valid_username:
                logger.error('Unable to pick a username for user: %s', username)
                return (None, 'Unable to pick a username. Please report this to your administrator.')

            db_user = model.user.create_federated_user(valid_username, email,
                                                       self._federated_service, username,
                                                       set_password_notification=False)
        else:
            # Update the db attributes from the federated service.
            db_user.email = email
            db_user.save()

        return (db_user, None)

    def verify_and_link_user(self, username_or_email, password):
        """ Verifies the given credentials and, if valid, creates/links a database
            user to the associated federated service.

            Returns a tuple of (db_user, None) on success, or (None, error
            message) on failure.
        """
        (credentials, err_msg) = self.verify_credentials(username_or_email, password)
        if credentials is None:
            return (None, err_msg)

        return self._get_federated_user(credentials.username, credentials.email)

    def confirm_existing_user(self, username, password):
        """ Confirms that the given *database* username and service password are
            valid for the linked service. This method is used when the federated
            service's username is not known.

            Returns a tuple of (db_user, None) on success, or (None, error
            message) if the user is unknown, is not linked to this service, or
            the credentials are rejected.
        """
        db_user = model.user.get_user(username)
        if not db_user:
            return (None, 'Invalid user')

        federated_login = model.user.lookup_federated_login(db_user, self._federated_service)
        if not federated_login:
            return (None, 'Invalid user')

        # Verify against the service using the identity recorded on the link,
        # not the database username.
        (credentials, err_msg) = self.verify_credentials(federated_login.service_ident,
                                                         password)
        if credentials is None:
            return (None, err_msg)

        return (db_user, None)