import logging

import ldap
import ldap.filter

logger = logging.getLogger(__name__)

try:
    _STRING_TYPES = (basestring,)  # noqa: F821 -- only defined on Python 2
except NameError:
    _STRING_TYPES = (str,)


def _dn_components(value):
    """
    Normalise a DN fragment from the configuration to a list of components.

    ``None`` and the empty string give an empty list, a single string gives
    a one-element list and any other iterable is copied into a list.
    """
    if value is None:
        return []
    if isinstance(value, _STRING_TYPES):
        return [value] if value else []
    return list(value)


class DatabaseUsers(object):
    """
    Authentication backend that checks credentials against the application
    database object handed to the constructor.
    """

    def __init__(self, app_db):
        self._app_db = app_db

    def verify_user(self, username_or_email, password):
        """
        Simply delegate to the model implementation.
        """
        return self._app_db.verify_user(username_or_email, password)


class LDAPConnection(object):
    """
    Context manager that opens an LDAP connection bound as the configured
    user on entry and unbinds it on exit.
    """

    def __init__(self, ldap_uri, user_dn, user_pw):
        self._ldap_uri = ldap_uri
        self._user_dn = user_dn
        self._user_pw = user_pw
        self._conn = None

    def __enter__(self):
        """
        Initialize the connection, bind with the stored credentials and
        return the bound connection object.
        """
        conn = ldap.initialize(self._ldap_uri)
        try:
            conn.simple_bind_s(self._user_dn, self._user_pw)
        except Exception:
            # __exit__ is not invoked when __enter__ raises, so release the
            # half-open connection here before propagating the bind error.
            try:
                conn.unbind_s()
            except ldap.LDAPError:
                # Best-effort cleanup: the bind failure is the error the
                # caller needs to see.
                pass
            raise
        self._conn = conn
        return conn

    def __exit__(self, exc_type, value, tb):
        """
        Unbind the connection opened by ``__enter__``. Exceptions raised in
        the ``with`` body are not suppressed.
        """
        conn, self._conn = self._conn, None
        if conn is not None:
            conn.unbind_s()


class LDAPUsers(object):
    """
    Authentication backend that verifies credentials against an LDAP
    directory and mirrors verified users into the application database.
    """

    def __init__(self, app_db, ldap_uri, base_dn, admin_dn, admin_passwd,
                 user_rdn, uid_attr, email_attr, passwd_attr):
        self._app_db = app_db
        self._ldap_conn = LDAPConnection(ldap_uri, admin_dn, admin_passwd)
        self._base_dn = base_dn
        self._user_rdn = user_rdn
        self._uid_attr = uid_attr
        self._email_attr = email_attr
        self._passwd_attr = passwd_attr

    def verify_user(self, username_or_email, password):
        """
        Verify the credentials with LDAP and if they are valid, create or
        update the user in our database.

        Returns the database user on success. Returns None when either
        credential is empty, when the search does not match exactly one
        entry or when the password comparison fails.
        """
        if not username_or_email or not password:
            return None

        # user_rdn and base_dn may each be configured as a string or as a
        # list of components; normalise both before joining.
        user_search_dn = ','.join(
            _dn_components(self._user_rdn) + _dn_components(self._base_dn))

        # The login comes from the user: escape it so that characters such
        # as '*', '(' and ')' cannot alter the search filter.
        login = ldap.filter.escape_filter_chars(username_or_email)
        query = '(|({0}={2})({1}={2}))'.format(self._uid_attr,
                                               self._email_attr,
                                               login)

        with self._ldap_conn as conn:
            # Only request the attributes used below, so the password
            # attribute is neither fetched nor written to the debug log.
            matches = conn.search_s(user_search_dn, ldap.SCOPE_SUBTREE, query,
                                    [self._uid_attr, self._email_attr])
            if len(matches) != 1:
                return None

            found_dn, found_response = matches[0]

            # First validate the password
            # NOTE(review): this compares the submitted value with the
            # stored attribute as-is; confirm this matches how the directory
            # stores passwords.
            valid_passwd = conn.compare_s(found_dn, self._passwd_attr,
                                          password) == 1
            if not valid_passwd:
                return None

        logger.debug('LDAP Response: %s', found_response)

        # Now check if we have the same username in our DB
        username = found_response[self._uid_attr][0]
        email = found_response[self._email_attr][0]
        db_user = self._app_db.get_user(username)
        logger.debug('Email: %s', email)

        if not db_user:
            # We must create the user in our db
            db_user = self._app_db.create_user(username, 'password_from_ldap',
                                               email)
            db_user.verified = True
        else:
            # Update the db attributes from ldap
            db_user.email = email

        db_user.save()
        return db_user


class UserAuthentication(object):
    """
    Extension object that selects the authentication backend from the
    application configuration and forwards attribute access to it.
    """

    def __init__(self, app=None, model=None):
        self.app = app
        if app is not None:
            self.state = self.init_app(app, model)
        else:
            self.state = None

    def init_app(self, app, model):
        """
        Build the backend named by the ``AUTHENTICATION_TYPE`` setting
        ('Database' by default, or 'LDAP'), register it under
        ``app.extensions['authentication']`` and return it.

        Raises RuntimeError for any other authentication type.
        """
        config = app.config
        authentication_type = config.get('AUTHENTICATION_TYPE', 'Database')

        if authentication_type == 'Database':
            users = DatabaseUsers(model)
        elif authentication_type == 'LDAP':
            ldap_uri = config.get('LDAP_URI', 'ldap://localhost')
            base_dn = config.get('LDAP_BASE_DN')
            admin_dn = config.get('LDAP_ADMIN_DN')
            admin_passwd = config.get('LDAP_ADMIN_PASSWD')
            user_rdn = config.get('LDAP_USER_RDN', [])
            uid_attr = config.get('LDAP_UID_ATTR', 'uid')
            email_attr = config.get('LDAP_EMAIL_ATTR', 'mail')
            passwd_attr = config.get('LDAP_PASSWD_ATTR', 'userPassword')
            users = LDAPUsers(model, ldap_uri, base_dn, admin_dn,
                              admin_passwd, user_rdn, uid_attr, email_attr,
                              passwd_attr)
        else:
            raise RuntimeError('Unknown authentication type: %s' %
                               authentication_type)

        # register extension with app
        app.extensions = getattr(app, 'extensions', {})
        app.extensions['authentication'] = users

        # Keep the proxy usable when init_app is called after construction
        # instead of through __init__.
        self.state = users
        return users

    def __getattr__(self, name):
        """
        Forward unknown attributes to the active backend; attributes the
        backend does not have resolve to None.
        """
        if name == 'state':
            # 'state' is missing from the instance dict (e.g. __init__ has
            # not run); looking it up through self.state would recurse.
            raise AttributeError(name)
        return getattr(self.state, name, None)