Merge pull request #128 from coreos-inc/ldaperrormsg
Have LDAP return a better error message if it fails to connect
This commit is contained in:
		
						commit
						97fba84a95
					
				
					 2 changed files with 35 additions and 9 deletions
				
			
		|  | @ -143,7 +143,12 @@ class LDAPConnection(object): | |||
|     trace_level = 2 if os.environ.get('LDAP_DEBUG') == '1' else 0 | ||||
|     self._conn = ldap.initialize(self._ldap_uri, trace_level=trace_level) | ||||
|     self._conn.set_option(ldap.OPT_REFERRALS, 1) | ||||
|     self._conn.simple_bind_s(self._user_dn, self._user_pw) | ||||
| 
 | ||||
|     try: | ||||
|       self._conn.simple_bind_s(self._user_dn, self._user_pw) | ||||
|     except ldap.INVALID_CREDENTIALS: | ||||
|       logger.exception('LDAP admin dn or password are invalid') | ||||
|       return None | ||||
| 
 | ||||
|     return self._conn | ||||
| 
 | ||||
|  | @ -183,6 +188,9 @@ class LDAPUsers(object): | |||
| 
 | ||||
|   def _ldap_user_search(self, username_or_email): | ||||
|     with self._ldap_conn as conn: | ||||
|       if conn is None: | ||||
|         return (None, 'LDAP Admin dn or password is invalid') | ||||
| 
 | ||||
|       logger.debug('Incoming username or email param: %s', username_or_email.__repr__()) | ||||
|       user_search_dn = ','.join(self._user_rdn + self._base_dn) | ||||
|       query = u'(|({0}={2})({1}={2}))'.format(self._uid_attr, self._email_attr, | ||||
|  | @ -194,18 +202,18 @@ class LDAPUsers(object): | |||
|       except ldap.REFERRAL as re: | ||||
|         referral_dn = self._get_ldap_referral_dn(re) | ||||
|         if not referral_dn: | ||||
|           return None | ||||
|           return (None, 'Failed to follow referral when looking up username') | ||||
| 
 | ||||
|         try: | ||||
|           subquery = u'(%s=%s)' % (self._uid_attr, username_or_email) | ||||
|           pairs = conn.search_s(referral_dn, ldap.SCOPE_BASE, subquery) | ||||
|         except ldap.LDAPError: | ||||
|           logger.exception('LDAP referral search exception') | ||||
|           return None | ||||
|           return (None, 'Username not found') | ||||
| 
 | ||||
|       except ldap.LDAPError: | ||||
|         logger.exception('LDAP search exception') | ||||
|         return None | ||||
|         return (None, 'Username not found') | ||||
| 
 | ||||
|       logger.debug('Found matching pairs: %s', pairs) | ||||
| 
 | ||||
|  | @ -215,16 +223,16 @@ class LDAPUsers(object): | |||
|       # pairs. | ||||
|       with_dns = [result for result in results if result.dn] | ||||
|       if len(with_dns) < 1: | ||||
|         return None | ||||
|         return (None, 'Username not found') | ||||
| 
 | ||||
|       # If we have found a single pair, then return it. | ||||
|       if len(with_dns) == 1: | ||||
|         return with_dns[0] | ||||
|         return (with_dns[0], None) | ||||
| 
 | ||||
|       # Otherwise, there are multiple pairs with DNs, so find the one with the mail | ||||
|       # attribute (if any). | ||||
|       with_mail = [result for result in results if result.attrs.get(self._email_attr)] | ||||
|       return with_mail[0] if with_mail else with_dns[0] | ||||
|       return (with_mail[0] if with_mail else with_dns[0], None) | ||||
| 
 | ||||
|   def confirm_existing_user(self, username, password): | ||||
|     """ Verify the username and password by looking up the *LDAP* username and confirming the | ||||
|  | @ -248,9 +256,9 @@ class LDAPUsers(object): | |||
|     if not password: | ||||
|       return (None, 'Anonymous binding not allowed') | ||||
| 
 | ||||
|     found_user = self._ldap_user_search(username_or_email) | ||||
|     (found_user, err_msg) = self._ldap_user_search(username_or_email) | ||||
|     if found_user is None: | ||||
|       return (None, 'Username not found') | ||||
|       return (None, err_msg) | ||||
| 
 | ||||
|     found_dn, found_response = found_user | ||||
|     logger.debug('Found user for LDAP username %s; validating password', username_or_email) | ||||
|  |  | |||
|  | @ -85,6 +85,24 @@ class TestLDAP(unittest.TestCase): | |||
|     finished_database_for_testing(self) | ||||
|     self.ctx.__exit__(True, None, None) | ||||
| 
 | ||||
|   def test_invalid_admin_password(self): | ||||
|     base_dn = ['dc=quay', 'dc=io'] | ||||
|     admin_dn = 'uid=testy,ou=employees,dc=quay,dc=io' | ||||
|     admin_passwd = 'INVALIDPASSWORD' | ||||
|     user_rdn = ['ou=employees'] | ||||
|     uid_attr = 'uid' | ||||
|     email_attr = 'mail' | ||||
| 
 | ||||
|     ldap = LDAPUsers('ldap://localhost', base_dn, admin_dn, admin_passwd, user_rdn, | ||||
|                      uid_attr, email_attr) | ||||
| 
 | ||||
|     self.ldap = ldap | ||||
| 
 | ||||
|     # Try to login. | ||||
|     (response, err_msg) = self.ldap.verify_user('someuser', 'somepass') | ||||
|     self.assertIsNone(response) | ||||
|     self.assertEquals('LDAP Admin dn or password is invalid', err_msg) | ||||
| 
 | ||||
|   def test_login(self): | ||||
|     # Verify we can login. | ||||
|     (response, _) = self.ldap.verify_user('someuser', 'somepass') | ||||
|  |  | |||
		Reference in a new issue