Fix LDAP referral and multiple pair handling

Fixes two issues found with our LDAP handling code. First, we now follow referrals in both LDAP calls, as some LDAP systems will return a referral instead of the original record. Second, we now make sure to handle multiple search result pairs properly by further filtering based on the presence of the 'mail' attribute when we have multiple valid pairs. This CL also adds tests for all of the above cases.
This commit is contained in:
Joseph Schorr 2015-05-26 15:46:35 -04:00
parent 0633db973e
commit 1aff701bc7
4 changed files with 113 additions and 49 deletions

View file

@ -63,6 +63,25 @@ class LDAPUsers(object):
self._uid_attr = uid_attr
self._email_attr = email_attr
def _get_ldap_referral_dn(self, referral_exception):
logger.debug('Got referral: %s', referral_exception.args[0])
if not referral_exception.args[0] or not referral_exception.args[0].get('info'):
logger.debug('LDAP referral missing info block')
return None
referral_info = referral_exception.args[0]['info']
if not referral_info.startswith('Referral:\n'):
logger.debug('LDAP referral missing Referral header')
return None
referral_uri = referral_info[len('Referral:\n'):]
if not referral_uri.startswith('ldap:///'):
logger.debug('LDAP referral URI does not start with ldap:///')
return None
referral_dn = referral_uri[len('ldap:///'):]
return referral_dn
def _ldap_user_search(self, username_or_email):
with self._ldap_conn as conn:
logger.debug('Incoming username or email param: %s', username_or_email.__repr__())
@ -73,20 +92,38 @@ class LDAPUsers(object):
logger.debug('Conducting user search: %s under %s', query, user_search_dn)
try:
pairs = conn.search_s(user_search_dn, ldap.SCOPE_SUBTREE, query.encode('utf-8'))
except ldap.REFERRAL as re:
referral_dn = self._get_ldap_referral_dn(re)
if not referral_dn:
return None
try:
subquery = u'(%s=%s)' % (self._uid_attr, username_or_email)
pairs = conn.search_s(referral_dn, ldap.SCOPE_BASE, subquery)
except ldap.LDAPError:
logger.exception('LDAP referral search exception')
return None
except ldap.LDAPError:
logger.exception('LDAP search exception')
return None
logger.debug('Found matching pairs: %s', pairs)
if len(pairs) < 1:
# Filter out pairs without DNs. Some LDAP impls will return such
# pairs.
with_dns = [pair for pair in pairs if pair[0]]
if len(with_dns) < 1:
return None
for pair in pairs:
if pair[0] is not None:
logger.debug('Found user: %s', pair)
return pair
# If we have found a single pair, then return it.
if len(with_dns) == 1:
return with_dns[0]
return None
# Otherwise, there are multiple pairs with DNs, so find the one with the mail
# attribute (if any).
with_mail = [pair for pair in pairs if pair[1].get(self._email_attr)]
return with_mail[0] if with_mail else with_dns[0]
def confirm_existing_user(self, username, password):
""" Verify the username and password by looking up the *LDAP* username and confirming the
@ -111,17 +148,29 @@ class LDAPUsers(object):
return (None, 'Anonymous binding not allowed')
found_user = self._ldap_user_search(username_or_email)
if found_user is None:
return (None, 'Username not found')
found_dn, found_response = found_user
logger.debug('Found user for LDAP username %s; validating password', username_or_email)
logger.debug('DN %s found: %s', found_dn, found_response)
# First validate the password by binding as the user
logger.debug('Found user %s; validating password', username_or_email)
try:
with LDAPConnection(self._ldap_uri, found_dn, password.encode('utf-8')):
pass
except ldap.REFERRAL as re:
referral_dn = self._get_ldap_referral_dn(re)
if not referral_dn:
return (None, 'Invalid username')
try:
with LDAPConnection(self._ldap_uri, referral_dn, password.encode('utf-8')):
pass
except ldap.INVALID_CREDENTIALS:
logger.exception('Invalid LDAP credentials')
return (None, 'Invalid password')
except ldap.INVALID_CREDENTIALS:
logger.exception('Invalid LDAP credentials')
return (None, 'Invalid password')

View file

@ -44,11 +44,11 @@ git+https://github.com/DevTable/container-cloud-config.git
git+https://github.com/DevTable/python-etcd.git
git+https://github.com/coreos/py-bitbucket.git
git+https://github.com/coreos/pyapi-gitlab.git
git+https://github.com/coreos/mockldap.git
gipc
pyOpenSSL
pygpgme
cachetools
mock
psutil
stringscore
mockldap
stringscore

View file

@ -37,7 +37,6 @@ jsonschema==2.4.0
marisa-trie==0.7
mixpanel-py==3.2.1
mock==1.0.1
mockldap==0.2.4
paramiko==1.15.2
peewee==2.4.7
psutil==2.2.1
@ -74,3 +73,4 @@ git+https://github.com/DevTable/python-etcd.git
git+https://github.com/NateFerrero/oauth2lib.git
git+https://github.com/coreos/py-bitbucket.git
git+https://github.com/coreos/pyapi-gitlab.git
git+https://github.com/coreos/mockldap.git

View file

@ -42,76 +42,91 @@ class TestLDAP(unittest.TestCase):
'uid=cool.user,ou=employees,dc=quay,dc=io': {
'dc': ['quay', 'io'],
'ou': 'employees',
'uid': ['cool.user'],
'uid': ['cool.user', 'referred'],
'userPassword': ['somepass'],
'mail': ['foo@bar.com']
}
},
'uid=referred,ou=employees,dc=quay,dc=io': {
'uid': ['referred'],
'_referral': 'ldap:///uid=cool.user,ou=employees,dc=quay,dc=io'
},
'uid=invalidreferred,ou=employees,dc=quay,dc=io': {
'uid': ['invalidreferred'],
'_referral': 'ldap:///uid=someinvaliduser,ou=employees,dc=quay,dc=io'
},
'uid=multientry,ou=subgroup1,ou=employees,dc=quay,dc=io': {
'uid': ['multientry'],
'mail': ['foo@bar.com'],
'userPassword': ['somepass'],
},
'uid=multientry,ou=subgroup2,ou=employees,dc=quay,dc=io': {
'uid': ['multientry'],
'another': ['key']
},
})
self.mockldap.start()
base_dn = ['dc=quay', 'dc=io']
admin_dn = 'uid=testy,ou=employees,dc=quay,dc=io'
admin_passwd = 'password'
user_rdn = ['ou=employees']
uid_attr = 'uid'
email_attr = 'mail'
ldap = LDAPUsers('ldap://localhost', base_dn, admin_dn, admin_passwd, user_rdn,
uid_attr, email_attr)
self.ldap = ldap
def tearDown(self):
self.mockldap.stop()
finished_database_for_testing(self)
self.ctx.__exit__(True, None, None)
def test_login(self):
base_dn = ['dc=quay', 'dc=io']
admin_dn = 'uid=testy,ou=employees,dc=quay,dc=io'
admin_passwd = 'password'
user_rdn = ['ou=employees']
uid_attr = 'uid'
email_attr = 'mail'
ldap = LDAPUsers('ldap://localhost', base_dn, admin_dn, admin_passwd, user_rdn,
uid_attr, email_attr)
# Verify we can login.
(response, _) = ldap.verify_user('someuser', 'somepass')
(response, _) = self.ldap.verify_user('someuser', 'somepass')
self.assertEquals(response.username, 'someuser')
# Verify we can confirm the user.
(response, _) = ldap.confirm_existing_user('someuser', 'somepass')
(response, _) = self.ldap.confirm_existing_user('someuser', 'somepass')
self.assertEquals(response.username, 'someuser')
def test_missing_mail(self):
base_dn = ['dc=quay', 'dc=io']
admin_dn = 'uid=testy,ou=employees,dc=quay,dc=io'
admin_passwd = 'password'
user_rdn = ['ou=employees']
uid_attr = 'uid'
email_attr = 'mail'
ldap = LDAPUsers('ldap://localhost', base_dn, admin_dn, admin_passwd, user_rdn,
uid_attr, email_attr)
(response, err_msg) = ldap.verify_user('nomail', 'somepass')
(response, err_msg) = self.ldap.verify_user('nomail', 'somepass')
self.assertIsNone(response)
self.assertEquals('Missing mail field "mail" in user record', err_msg)
def test_confirm_different_username(self):
base_dn = ['dc=quay', 'dc=io']
admin_dn = 'uid=testy,ou=employees,dc=quay,dc=io'
admin_passwd = 'password'
user_rdn = ['ou=employees']
uid_attr = 'uid'
email_attr = 'mail'
ldap = LDAPUsers('ldap://localhost', base_dn, admin_dn, admin_passwd, user_rdn,
uid_attr, email_attr)
# Verify that the user is logged in and their username was adjusted.
(response, _) = ldap.verify_user('cool.user', 'somepass')
(response, _) = self.ldap.verify_user('cool.user', 'somepass')
self.assertEquals(response.username, 'cool_user')
# Verify we can confirm the user's quay username.
(response, _) = ldap.confirm_existing_user('cool_user', 'somepass')
(response, _) = self.ldap.confirm_existing_user('cool_user', 'somepass')
self.assertEquals(response.username, 'cool_user')
# Verify that we *cannot* confirm the LDAP username.
(response, _) = ldap.confirm_existing_user('cool.user', 'somepass')
(response, _) = self.ldap.confirm_existing_user('cool.user', 'somepass')
self.assertIsNone(response)
def test_referral(self):
(response, _) = self.ldap.verify_user('referred', 'somepass')
self.assertEquals(response.username, 'cool_user')
# Verify we can confirm the user's quay username.
(response, _) = self.ldap.confirm_existing_user('cool_user', 'somepass')
self.assertEquals(response.username, 'cool_user')
def test_invalid_referral(self):
(response, _) = self.ldap.verify_user('invalidreferred', 'somepass')
self.assertIsNone(response)
def test_multientry(self):
(response, _) = self.ldap.verify_user('multientry', 'somepass')
self.assertEquals(response.username, 'multientry')
if __name__ == '__main__':
unittest.main()