import logging

from collections import namedtuple

from data import model
from util.validation import generate_valid_usernames

logger = logging.getLogger(__name__)

# Identity record that federated backends return from get_user() and
# verify_credentials().
UserInformation = namedtuple('UserInformation', ['username', 'email', 'id'])


class FederatedUsers(object):
    """ Base class for all federated users systems.

        Subclasses supply the service-specific lookups (get_user and
        verify_credentials); this class links the results to database users.
        Methods report their outcome as a (result, error_message) tuple in which
        exactly one element is None.
    """

    def __init__(self, federated_service, requires_email):
        """ Stores the federated service identifier and whether an email is
            required when creating a database user for this service.
        """
        self._federated_service = federated_service
        self._requires_email = requires_email

    @property
    def federated_service(self):
        """ The federated service identifier given at construction. """
        return self._federated_service

    def get_user(self, username_or_email):
        """ Retrieves the user with the given username or email, returning a tuple
            containing a UserInformation (on success) and the error message
            (on failure).

            Must be overridden; the base implementation raises NotImplementedError.
        """
        raise NotImplementedError

    def verify_credentials(self, username_or_email, password):
        """ Verifies the given credentials against the backing federated service,
            returning a tuple containing a UserInformation (on success) and the
            error message (on failure).

            Must be overridden; the base implementation raises NotImplementedError.
        """
        raise NotImplementedError

    def query_users(self, query, limit=20):
        """ Optional hook. If implemented, get_user must be implemented as well.

            The base implementation returns (None, 'Not supported').
        """
        return (None, 'Not supported')

    def _get_federated_user(self, username, email):
        """ Returns a (db_user, error_message) tuple for the given federated
            username, creating the database user if no federated login exists
            for it yet.

            When the login already exists and an email is supplied, the database
            user's email is overwritten with it and saved.
        """
        db_user = model.user.verify_federated_login(self._federated_service, username)
        if not db_user:
            # We must create the user in our db. Only accept a candidate once it
            # has been confirmed unique: if the generator runs out without a
            # match, valid_username stays None rather than holding the last
            # (already taken) candidate.
            valid_username = None
            for candidate in generate_valid_usernames(username):
                if model.user.is_username_unique(candidate):
                    valid_username = candidate
                    break

            if not valid_username:
                logger.error('Unable to pick a username for user: %s', username)
                return (None, 'Unable to pick a username. Please report this to your administrator.')

            db_user = model.user.create_federated_user(valid_username, email,
                                                       self._federated_service, username,
                                                       set_password_notification=False,
                                                       email_required=self._requires_email)
        else:
            # Update the db attributes from the federated service.
            if email:
                db_user.email = email
                db_user.save()

        return (db_user, None)

    def link_user(self, username_or_email):
        """ Looks up the given user via get_user and, if found, creates/links a
            database user to the associated federated service. No password check
            is performed. Returns a (db_user, error_message) tuple.
        """
        (credentials, err_msg) = self.get_user(username_or_email)
        if credentials is None:
            return (None, err_msg)

        return self._get_federated_user(credentials.username, credentials.email)

    def verify_and_link_user(self, username_or_email, password):
        """ Verifies the given credentials and, if valid, creates/links a database
            user to the associated federated service. Returns a
            (db_user, error_message) tuple.
        """
        (credentials, err_msg) = self.verify_credentials(username_or_email, password)
        if credentials is None:
            return (None, err_msg)

        return self._get_federated_user(credentials.username, credentials.email)

    def confirm_existing_user(self, username, password):
        """ Confirms that the given *database* username and service password are
            valid for the linked service. This method is used when the federated
            service's username is not known.

            Returns (db_user, None) on success. Returns (None, 'Invalid user') when
            the database user or its federated login for this service is missing,
            and (None, error_message) when credential verification fails.
        """
        db_user = model.user.get_user(username)
        if not db_user:
            return (None, 'Invalid user')

        federated_login = model.user.lookup_federated_login(db_user, self._federated_service)
        if not federated_login:
            return (None, 'Invalid user')

        # Verify against the service using the identity stored on the linked
        # login, not the database username.
        (credentials, err_msg) = self.verify_credentials(federated_login.service_ident, password)
        if credentials is None:
            return (None, err_msg)

        return (db_user, None)