import ldap import logging import json import itertools import uuid import struct from util.aes import AESCipher from util.validation import generate_valid_usernames from data import model logger = logging.getLogger(__name__) class DatabaseUsers(object): def verify_user(self, username_or_email, password): """ Simply delegate to the model implementation. """ return model.verify_user(username_or_email, password) def user_exists(self, username): return model.get_user(username) is not None class LDAPConnection(object): def __init__(self, ldap_uri, user_dn, user_pw): self._ldap_uri = ldap_uri self._user_dn = user_dn self._user_pw = user_pw self._conn = None def __enter__(self): self._conn = ldap.initialize(self._ldap_uri, trace_level=1) self._conn.simple_bind_s(self._user_dn, self._user_pw) return self._conn def __exit__(self, exc_type, value, tb): self._conn.unbind_s() class LDAPUsers(object): def __init__(self, ldap_uri, base_dn, admin_dn, admin_passwd, user_rdn, uid_attr, email_attr): self._ldap_conn = LDAPConnection(ldap_uri, admin_dn, admin_passwd) self._ldap_uri = ldap_uri self._base_dn = base_dn self._user_rdn = user_rdn self._uid_attr = uid_attr self._email_attr = email_attr def _ldap_user_search(self, username_or_email): with self._ldap_conn as conn: logger.debug('Incoming username or email param: %s', username_or_email.__repr__()) user_search_dn = ','.join(self._user_rdn + self._base_dn) query = u'(|({0}={2})({1}={2}))'.format(self._uid_attr, self._email_attr, username_or_email) user = conn.search_s(user_search_dn, ldap.SCOPE_SUBTREE, query.encode('utf-8')) if len(user) != 1: return None return user[0] def verify_user(self, username_or_email, password): """ Verify the credentials with LDAP and if they are valid, create or update the user in our database. """ # Make sure that even if the server supports anonymous binds, we don't allow it if not password: return None found_user = self._ldap_user_search(username_or_email) if found_user is None: return None found_dn, found_response = found_user # First validate the password by binding as the user try: with LDAPConnection(self._ldap_uri, found_dn, password.encode('utf-8')): pass except ldap.INVALID_CREDENTIALS: return None # Now check if we have a federated login for this user username = found_response[self._uid_attr][0].decode('utf-8') email = found_response[self._email_attr][0] db_user = model.verify_federated_login('ldap', username) if not db_user: # We must create the user in our db valid_username = None for valid_username in generate_valid_usernames(username): if model.is_username_unique(valid_username): break if not valid_username: logger.error('Unable to pick a username for user: %s', username) return None db_user = model.create_federated_user(valid_username, email, 'ldap', username, set_password_notification=False) else: # Update the db attributes from ldap db_user.email = email db_user.save() return db_user def user_exists(self, username): found_user = self._ldap_user_search(username) return found_user is not None class UserAuthentication(object): def __init__(self, app=None): self.app = app if app is not None: self.state = self.init_app(app) else: self.state = None def init_app(self, app): authentication_type = app.config.get('AUTHENTICATION_TYPE', 'Database') if authentication_type == 'Database': users = DatabaseUsers() elif authentication_type == 'LDAP': ldap_uri = app.config.get('LDAP_URI', 'ldap://localhost') base_dn = app.config.get('LDAP_BASE_DN') admin_dn = app.config.get('LDAP_ADMIN_DN') admin_passwd = app.config.get('LDAP_ADMIN_PASSWD') user_rdn = app.config.get('LDAP_USER_RDN', []) uid_attr = app.config.get('LDAP_UID_ATTR', 'uid') email_attr = app.config.get('LDAP_EMAIL_ATTR', 'mail') users = LDAPUsers(ldap_uri, base_dn, admin_dn, admin_passwd, user_rdn, uid_attr, email_attr) else: raise RuntimeError('Unknown authentication type: %s' % authentication_type) # register extension with app app.extensions = getattr(app, 'extensions', {}) app.extensions['authentication'] = users return users def _get_secret_key(self): """ Returns the secret key to use for encrypting and decrypting. """ from app import app app_secret_key = app.config['SECRET_KEY'] # First try parsing the key as an int. try: big_int = int(app_secret_key) secret_key = bytearray.fromhex('{:02x}'.format(big_int)) except ValueError: secret_key = app_secret_key # Next try parsing it as an UUID. try: secret_key = uuid.UUID(app_secret_key).bytes except ValueError: secret_key = app_secret_key # Otherwise, use the bytes directly. return ''.join(itertools.islice(itertools.cycle(secret_key), 32)) def encrypt_user_password(self, password): """ Returns an encrypted version of the user's password. """ data = { 'password': password } message = json.dumps(data) cipher = AESCipher(self._get_secret_key()) return cipher.encrypt(message) def _decrypt_user_password(self, encrypted): """ Attempts to decrypt the given password and returns it. """ cipher = AESCipher(self._get_secret_key()) try: message = cipher.decrypt(encrypted) except ValueError: return None except TypeError: return None try: data = json.loads(message) except ValueError: return None return data.get('password', encrypted) def verify_user(self, username_or_email, password, basic_auth=False): # First try to decode the password as a signed token. if basic_auth: import features decrypted = self._decrypt_user_password(password) if decrypted is None: # This is a normal password. if features.REQUIRE_ENCRYPTED_BASIC_AUTH: msg = ('Client login with passwords is disabled. Please generate a client token ' + 'and use it in place of your password.') return (None, msg) else: password = decrypted result = self.state.verify_user(username_or_email, password) if result: return (result, '') else: return (result, 'Invalid password.') def __getattr__(self, name): return getattr(self.state, name, None)