diff --git a/data/users/externalldap.py b/data/users/externalldap.py index 9a488b283..1fef02396 100644 --- a/data/users/externalldap.py +++ b/data/users/externalldap.py @@ -9,6 +9,16 @@ from data.users.federated import FederatedUsers, VerifiedCredentials logger = logging.getLogger(__name__) +class LDAPConnectionBuilder(object): + def __init__(self, ldap_uri, user_dn, user_pw): + self._ldap_uri = ldap_uri + self._user_dn = user_dn + self._user_pw = user_pw + + def get_connection(self): + return LDAPConnection(self._ldap_uri, self._user_dn, self._user_pw) + + class LDAPConnection(object): def __init__(self, ldap_uri, user_dn, user_pw): self._ldap_uri = ldap_uri @@ -20,13 +30,7 @@ class LDAPConnection(object): trace_level = 2 if os.environ.get('USERS_DEBUG') == '1' else 0 self._conn = ldap.initialize(self._ldap_uri, trace_level=trace_level) self._conn.set_option(ldap.OPT_REFERRALS, 1) - - try: - self._conn.simple_bind_s(self._user_dn, self._user_pw) - except ldap.INVALID_CREDENTIALS: - logger.exception('LDAP admin dn or password are invalid') - return None - + self._conn.simple_bind_s(self._user_dn, self._user_pw) return self._conn def __exit__(self, exc_type, value, tb): @@ -38,7 +42,7 @@ class LDAPUsers(FederatedUsers): def __init__(self, ldap_uri, base_dn, admin_dn, admin_passwd, user_rdn, uid_attr, email_attr): super(LDAPUsers, self).__init__('ldap') - self._ldap_conn = LDAPConnection(ldap_uri, admin_dn, admin_passwd) + self._ldap = LDAPConnectionBuilder(ldap_uri, admin_dn, admin_passwd) self._ldap_uri = ldap_uri self._base_dn = base_dn self._user_rdn = user_rdn @@ -65,10 +69,15 @@ class LDAPUsers(FederatedUsers): return referral_dn def _ldap_user_search(self, username_or_email): - with self._ldap_conn as conn: - if conn is None: - return (None, 'LDAP Admin dn or password is invalid') + # Verify the admin connection works first. We do this here to avoid wrapping + # the entire block in the INVALID CREDENTIALS check. + try: + with self._ldap.get_connection(): + pass + except ldap.INVALID_CREDENTIALS: + return (None, 'LDAP Admin dn or password is invalid') + with self._ldap.get_connection() as conn: logger.debug('Incoming username or email param: %s', username_or_email.__repr__()) user_search_dn = ','.join(self._user_rdn + self._base_dn) query = u'(|({0}={2})({1}={2}))'.format(self._uid_attr, self._email_attr, diff --git a/test/test_external_jwt_authn.py b/test/test_external_jwt_authn.py index b911e8078..0352e8822 100644 --- a/test/test_external_jwt_authn.py +++ b/test/test_external_jwt_authn.py @@ -114,6 +114,9 @@ class JWTAuthTestCase(LiveServerTestCase): self.assertIsNotNone(result) # Confirm a user with the same internal and external username. + result, _ = self.jwt_auth.confirm_existing_user('cooluser', 'invalidpassword') + self.assertIsNone(result) + result, _ = self.jwt_auth.confirm_existing_user('cooluser', 'password') self.assertIsNotNone(result) self.assertEquals('cooluser', result.username) diff --git a/test/test_ldap.py b/test/test_ldap.py index df8342620..3c1252f05 100644 --- a/test/test_ldap.py +++ b/test/test_ldap.py @@ -112,6 +112,17 @@ class TestLDAP(unittest.TestCase): (response, _) = self.ldap.confirm_existing_user('someuser', 'somepass') self.assertEquals(response.username, 'someuser') + def test_invalid_password(self): + # Verify we cannot login with an invalid password. + (response, err_msg) = self.ldap.verify_and_link_user('someuser', 'invalidpass') + self.assertIsNone(response) + self.assertEquals(err_msg, 'Invalid password') + + # Verify we cannot confirm the user. + (response, err_msg) = self.ldap.confirm_existing_user('someuser', 'invalidpass') + self.assertIsNone(response) + self.assertEquals(err_msg, 'Invalid user') + def test_missing_mail(self): (response, err_msg) = self.ldap.verify_and_link_user('nomail', 'somepass') self.assertIsNone(response)