Have LDAP return a better error message if it fails to connect
Currently, the error results in a 500 being raised when a user tries to login.
This commit is contained in:
parent
4aae3b870a
commit
e7915baf8c
2 changed files with 35 additions and 9 deletions
|
@ -143,7 +143,12 @@ class LDAPConnection(object):
|
|||
trace_level = 2 if os.environ.get('LDAP_DEBUG') == '1' else 0
|
||||
self._conn = ldap.initialize(self._ldap_uri, trace_level=trace_level)
|
||||
self._conn.set_option(ldap.OPT_REFERRALS, 1)
|
||||
self._conn.simple_bind_s(self._user_dn, self._user_pw)
|
||||
|
||||
try:
|
||||
self._conn.simple_bind_s(self._user_dn, self._user_pw)
|
||||
except ldap.INVALID_CREDENTIALS:
|
||||
logger.exception('LDAP admin dn or password are invalid')
|
||||
return None
|
||||
|
||||
return self._conn
|
||||
|
||||
|
@ -183,6 +188,9 @@ class LDAPUsers(object):
|
|||
|
||||
def _ldap_user_search(self, username_or_email):
|
||||
with self._ldap_conn as conn:
|
||||
if conn is None:
|
||||
return (None, 'LDAP Admin dn or password is invalid')
|
||||
|
||||
logger.debug('Incoming username or email param: %s', username_or_email.__repr__())
|
||||
user_search_dn = ','.join(self._user_rdn + self._base_dn)
|
||||
query = u'(|({0}={2})({1}={2}))'.format(self._uid_attr, self._email_attr,
|
||||
|
@ -194,18 +202,18 @@ class LDAPUsers(object):
|
|||
except ldap.REFERRAL as re:
|
||||
referral_dn = self._get_ldap_referral_dn(re)
|
||||
if not referral_dn:
|
||||
return None
|
||||
return (None, 'Failed to follow referral when looking up username')
|
||||
|
||||
try:
|
||||
subquery = u'(%s=%s)' % (self._uid_attr, username_or_email)
|
||||
pairs = conn.search_s(referral_dn, ldap.SCOPE_BASE, subquery)
|
||||
except ldap.LDAPError:
|
||||
logger.exception('LDAP referral search exception')
|
||||
return None
|
||||
return (None, 'Username not found')
|
||||
|
||||
except ldap.LDAPError:
|
||||
logger.exception('LDAP search exception')
|
||||
return None
|
||||
return (None, 'Username not found')
|
||||
|
||||
logger.debug('Found matching pairs: %s', pairs)
|
||||
|
||||
|
@ -215,16 +223,16 @@ class LDAPUsers(object):
|
|||
# pairs.
|
||||
with_dns = [result for result in results if result.dn]
|
||||
if len(with_dns) < 1:
|
||||
return None
|
||||
return (None, 'Username not found')
|
||||
|
||||
# If we have found a single pair, then return it.
|
||||
if len(with_dns) == 1:
|
||||
return with_dns[0]
|
||||
return (with_dns[0], None)
|
||||
|
||||
# Otherwise, there are multiple pairs with DNs, so find the one with the mail
|
||||
# attribute (if any).
|
||||
with_mail = [result for result in results if result.attrs.get(self._email_attr)]
|
||||
return with_mail[0] if with_mail else with_dns[0]
|
||||
return (with_mail[0] if with_mail else with_dns[0], None)
|
||||
|
||||
def confirm_existing_user(self, username, password):
|
||||
""" Verify the username and password by looking up the *LDAP* username and confirming the
|
||||
|
@ -248,9 +256,9 @@ class LDAPUsers(object):
|
|||
if not password:
|
||||
return (None, 'Anonymous binding not allowed')
|
||||
|
||||
found_user = self._ldap_user_search(username_or_email)
|
||||
(found_user, err_msg) = self._ldap_user_search(username_or_email)
|
||||
if found_user is None:
|
||||
return (None, 'Username not found')
|
||||
return (None, err_msg)
|
||||
|
||||
found_dn, found_response = found_user
|
||||
logger.debug('Found user for LDAP username %s; validating password', username_or_email)
|
||||
|
|
|
@ -85,6 +85,24 @@ class TestLDAP(unittest.TestCase):
|
|||
finished_database_for_testing(self)
|
||||
self.ctx.__exit__(True, None, None)
|
||||
|
||||
def test_invalid_admin_password(self):
|
||||
base_dn = ['dc=quay', 'dc=io']
|
||||
admin_dn = 'uid=testy,ou=employees,dc=quay,dc=io'
|
||||
admin_passwd = 'INVALIDPASSWORD'
|
||||
user_rdn = ['ou=employees']
|
||||
uid_attr = 'uid'
|
||||
email_attr = 'mail'
|
||||
|
||||
ldap = LDAPUsers('ldap://localhost', base_dn, admin_dn, admin_passwd, user_rdn,
|
||||
uid_attr, email_attr)
|
||||
|
||||
self.ldap = ldap
|
||||
|
||||
# Try to login.
|
||||
(response, err_msg) = self.ldap.verify_user('someuser', 'somepass')
|
||||
self.assertIsNone(response)
|
||||
self.assertEquals('LDAP Admin dn or password is invalid', err_msg)
|
||||
|
||||
def test_login(self):
|
||||
# Verify we can login.
|
||||
(response, _) = self.ldap.verify_user('someuser', 'somepass')
|
||||
|
|
Reference in a new issue