Merge pull request #45 from coreos-inc/ldapreferfix
Fix LDAP referral and multiple pair handling
This commit is contained in:
commit
491de200f6
4 changed files with 117 additions and 48 deletions
|
@ -9,6 +9,7 @@ import os
|
|||
from util.aes import AESCipher
|
||||
from util.validation import generate_valid_usernames
|
||||
from data import model
|
||||
from collections import namedtuple
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
if os.environ.get('LDAP_DEBUG') == '1':
|
||||
|
@ -55,6 +56,8 @@ class LDAPConnection(object):
|
|||
|
||||
|
||||
class LDAPUsers(object):
|
||||
_LDAPResult = namedtuple('LDAPResult', ['dn', 'attrs'])
|
||||
|
||||
def __init__(self, ldap_uri, base_dn, admin_dn, admin_passwd, user_rdn, uid_attr, email_attr):
|
||||
self._ldap_conn = LDAPConnection(ldap_uri, admin_dn, admin_passwd)
|
||||
self._ldap_uri = ldap_uri
|
||||
|
@ -63,6 +66,25 @@ class LDAPUsers(object):
|
|||
self._uid_attr = uid_attr
|
||||
self._email_attr = email_attr
|
||||
|
||||
def _get_ldap_referral_dn(self, referral_exception):
|
||||
logger.debug('Got referral: %s', referral_exception.args[0])
|
||||
if not referral_exception.args[0] or not referral_exception.args[0].get('info'):
|
||||
logger.debug('LDAP referral missing info block')
|
||||
return None
|
||||
|
||||
referral_info = referral_exception.args[0]['info']
|
||||
if not referral_info.startswith('Referral:\n'):
|
||||
logger.debug('LDAP referral missing Referral header')
|
||||
return None
|
||||
|
||||
referral_uri = referral_info[len('Referral:\n'):]
|
||||
if not referral_uri.startswith('ldap:///'):
|
||||
logger.debug('LDAP referral URI does not start with ldap:///')
|
||||
return None
|
||||
|
||||
referral_dn = referral_uri[len('ldap:///'):]
|
||||
return referral_dn
|
||||
|
||||
def _ldap_user_search(self, username_or_email):
|
||||
with self._ldap_conn as conn:
|
||||
logger.debug('Incoming username or email param: %s', username_or_email.__repr__())
|
||||
|
@ -73,20 +95,40 @@ class LDAPUsers(object):
|
|||
logger.debug('Conducting user search: %s under %s', query, user_search_dn)
|
||||
try:
|
||||
pairs = conn.search_s(user_search_dn, ldap.SCOPE_SUBTREE, query.encode('utf-8'))
|
||||
except ldap.REFERRAL as re:
|
||||
referral_dn = self._get_ldap_referral_dn(re)
|
||||
if not referral_dn:
|
||||
return None
|
||||
|
||||
try:
|
||||
subquery = u'(%s=%s)' % (self._uid_attr, username_or_email)
|
||||
pairs = conn.search_s(referral_dn, ldap.SCOPE_BASE, subquery)
|
||||
except ldap.LDAPError:
|
||||
logger.exception('LDAP referral search exception')
|
||||
return None
|
||||
|
||||
except ldap.LDAPError:
|
||||
logger.exception('LDAP search exception')
|
||||
return None
|
||||
|
||||
logger.debug('Found matching pairs: %s', pairs)
|
||||
if len(pairs) < 1:
|
||||
|
||||
results = [LDAPUsers._LDAPResult(*pair) for pair in pairs]
|
||||
|
||||
# Filter out pairs without DNs. Some LDAP impls will return such
|
||||
# pairs.
|
||||
with_dns = [result for result in results if result.dn]
|
||||
if len(with_dns) < 1:
|
||||
return None
|
||||
|
||||
for pair in pairs:
|
||||
if pair[0] is not None:
|
||||
logger.debug('Found user: %s', pair)
|
||||
return pair
|
||||
# If we have found a single pair, then return it.
|
||||
if len(with_dns) == 1:
|
||||
return with_dns[0]
|
||||
|
||||
return None
|
||||
# Otherwise, there are multiple pairs with DNs, so find the one with the mail
|
||||
# attribute (if any).
|
||||
with_mail = [result for result in results if result.attrs.get(self._email_attr)]
|
||||
return with_mail[0] if with_mail else with_dns[0]
|
||||
|
||||
def confirm_existing_user(self, username, password):
|
||||
""" Verify the username and password by looking up the *LDAP* username and confirming the
|
||||
|
@ -111,17 +153,29 @@ class LDAPUsers(object):
|
|||
return (None, 'Anonymous binding not allowed')
|
||||
|
||||
found_user = self._ldap_user_search(username_or_email)
|
||||
|
||||
if found_user is None:
|
||||
return (None, 'Username not found')
|
||||
|
||||
found_dn, found_response = found_user
|
||||
logger.debug('Found user for LDAP username %s; validating password', username_or_email)
|
||||
logger.debug('DN %s found: %s', found_dn, found_response)
|
||||
|
||||
# First validate the password by binding as the user
|
||||
logger.debug('Found user %s; validating password', username_or_email)
|
||||
try:
|
||||
with LDAPConnection(self._ldap_uri, found_dn, password.encode('utf-8')):
|
||||
pass
|
||||
except ldap.REFERRAL as re:
|
||||
referral_dn = self._get_ldap_referral_dn(re)
|
||||
if not referral_dn:
|
||||
return (None, 'Invalid username')
|
||||
|
||||
try:
|
||||
with LDAPConnection(self._ldap_uri, referral_dn, password.encode('utf-8')):
|
||||
pass
|
||||
except ldap.INVALID_CREDENTIALS:
|
||||
logger.exception('Invalid LDAP credentials')
|
||||
return (None, 'Invalid password')
|
||||
|
||||
except ldap.INVALID_CREDENTIALS:
|
||||
logger.exception('Invalid LDAP credentials')
|
||||
return (None, 'Invalid password')
|
||||
|
|
|
@ -42,6 +42,7 @@ git+https://github.com/DevTable/container-cloud-config.git
|
|||
git+https://github.com/DevTable/python-etcd.git
|
||||
git+https://github.com/coreos/py-bitbucket.git
|
||||
git+https://github.com/coreos/pyapi-gitlab.git
|
||||
git+https://github.com/coreos/mockldap.git
|
||||
gipc
|
||||
pyOpenSSL
|
||||
pygpgme
|
||||
|
@ -49,6 +50,5 @@ cachetools
|
|||
mock
|
||||
psutil
|
||||
stringscore
|
||||
mockldap
|
||||
python-swiftclient
|
||||
python-keystoneclient
|
|
@ -44,7 +44,6 @@ jsonschema==2.4.0
|
|||
marisa-trie==0.7.2
|
||||
mixpanel-py==4.0.2
|
||||
mock==1.0.1
|
||||
mockldap==0.2.4
|
||||
msgpack-python==0.4.6
|
||||
netaddr==0.7.14
|
||||
netifaces==0.10.4
|
||||
|
@ -96,3 +95,4 @@ git+https://github.com/DevTable/python-etcd.git
|
|||
git+https://github.com/NateFerrero/oauth2lib.git
|
||||
git+https://github.com/coreos/py-bitbucket.git
|
||||
git+https://github.com/coreos/pyapi-gitlab.git
|
||||
git+https://github.com/coreos/mockldap.git
|
|
@ -42,76 +42,91 @@ class TestLDAP(unittest.TestCase):
|
|||
'uid=cool.user,ou=employees,dc=quay,dc=io': {
|
||||
'dc': ['quay', 'io'],
|
||||
'ou': 'employees',
|
||||
'uid': ['cool.user'],
|
||||
'uid': ['cool.user', 'referred'],
|
||||
'userPassword': ['somepass'],
|
||||
'mail': ['foo@bar.com']
|
||||
}
|
||||
},
|
||||
'uid=referred,ou=employees,dc=quay,dc=io': {
|
||||
'uid': ['referred'],
|
||||
'_referral': 'ldap:///uid=cool.user,ou=employees,dc=quay,dc=io'
|
||||
},
|
||||
'uid=invalidreferred,ou=employees,dc=quay,dc=io': {
|
||||
'uid': ['invalidreferred'],
|
||||
'_referral': 'ldap:///uid=someinvaliduser,ou=employees,dc=quay,dc=io'
|
||||
},
|
||||
'uid=multientry,ou=subgroup1,ou=employees,dc=quay,dc=io': {
|
||||
'uid': ['multientry'],
|
||||
'mail': ['foo@bar.com'],
|
||||
'userPassword': ['somepass'],
|
||||
},
|
||||
'uid=multientry,ou=subgroup2,ou=employees,dc=quay,dc=io': {
|
||||
'uid': ['multientry'],
|
||||
'another': ['key']
|
||||
},
|
||||
})
|
||||
|
||||
self.mockldap.start()
|
||||
|
||||
base_dn = ['dc=quay', 'dc=io']
|
||||
admin_dn = 'uid=testy,ou=employees,dc=quay,dc=io'
|
||||
admin_passwd = 'password'
|
||||
user_rdn = ['ou=employees']
|
||||
uid_attr = 'uid'
|
||||
email_attr = 'mail'
|
||||
|
||||
ldap = LDAPUsers('ldap://localhost', base_dn, admin_dn, admin_passwd, user_rdn,
|
||||
uid_attr, email_attr)
|
||||
|
||||
self.ldap = ldap
|
||||
|
||||
|
||||
def tearDown(self):
|
||||
self.mockldap.stop()
|
||||
finished_database_for_testing(self)
|
||||
self.ctx.__exit__(True, None, None)
|
||||
|
||||
def test_login(self):
|
||||
base_dn = ['dc=quay', 'dc=io']
|
||||
admin_dn = 'uid=testy,ou=employees,dc=quay,dc=io'
|
||||
admin_passwd = 'password'
|
||||
user_rdn = ['ou=employees']
|
||||
uid_attr = 'uid'
|
||||
email_attr = 'mail'
|
||||
|
||||
ldap = LDAPUsers('ldap://localhost', base_dn, admin_dn, admin_passwd, user_rdn,
|
||||
uid_attr, email_attr)
|
||||
|
||||
# Verify we can login.
|
||||
(response, _) = ldap.verify_user('someuser', 'somepass')
|
||||
(response, _) = self.ldap.verify_user('someuser', 'somepass')
|
||||
self.assertEquals(response.username, 'someuser')
|
||||
|
||||
# Verify we can confirm the user.
|
||||
(response, _) = ldap.confirm_existing_user('someuser', 'somepass')
|
||||
(response, _) = self.ldap.confirm_existing_user('someuser', 'somepass')
|
||||
self.assertEquals(response.username, 'someuser')
|
||||
|
||||
def test_missing_mail(self):
|
||||
base_dn = ['dc=quay', 'dc=io']
|
||||
admin_dn = 'uid=testy,ou=employees,dc=quay,dc=io'
|
||||
admin_passwd = 'password'
|
||||
user_rdn = ['ou=employees']
|
||||
uid_attr = 'uid'
|
||||
email_attr = 'mail'
|
||||
|
||||
ldap = LDAPUsers('ldap://localhost', base_dn, admin_dn, admin_passwd, user_rdn,
|
||||
uid_attr, email_attr)
|
||||
|
||||
(response, err_msg) = ldap.verify_user('nomail', 'somepass')
|
||||
(response, err_msg) = self.ldap.verify_user('nomail', 'somepass')
|
||||
self.assertIsNone(response)
|
||||
self.assertEquals('Missing mail field "mail" in user record', err_msg)
|
||||
|
||||
def test_confirm_different_username(self):
|
||||
base_dn = ['dc=quay', 'dc=io']
|
||||
admin_dn = 'uid=testy,ou=employees,dc=quay,dc=io'
|
||||
admin_passwd = 'password'
|
||||
user_rdn = ['ou=employees']
|
||||
uid_attr = 'uid'
|
||||
email_attr = 'mail'
|
||||
|
||||
ldap = LDAPUsers('ldap://localhost', base_dn, admin_dn, admin_passwd, user_rdn,
|
||||
uid_attr, email_attr)
|
||||
|
||||
# Verify that the user is logged in and their username was adjusted.
|
||||
(response, _) = ldap.verify_user('cool.user', 'somepass')
|
||||
(response, _) = self.ldap.verify_user('cool.user', 'somepass')
|
||||
self.assertEquals(response.username, 'cool_user')
|
||||
|
||||
# Verify we can confirm the user's quay username.
|
||||
(response, _) = ldap.confirm_existing_user('cool_user', 'somepass')
|
||||
(response, _) = self.ldap.confirm_existing_user('cool_user', 'somepass')
|
||||
self.assertEquals(response.username, 'cool_user')
|
||||
|
||||
# Verify that we *cannot* confirm the LDAP username.
|
||||
(response, _) = ldap.confirm_existing_user('cool.user', 'somepass')
|
||||
(response, _) = self.ldap.confirm_existing_user('cool.user', 'somepass')
|
||||
self.assertIsNone(response)
|
||||
|
||||
def test_referral(self):
|
||||
(response, _) = self.ldap.verify_user('referred', 'somepass')
|
||||
self.assertEquals(response.username, 'cool_user')
|
||||
|
||||
# Verify we can confirm the user's quay username.
|
||||
(response, _) = self.ldap.confirm_existing_user('cool_user', 'somepass')
|
||||
self.assertEquals(response.username, 'cool_user')
|
||||
|
||||
def test_invalid_referral(self):
|
||||
(response, _) = self.ldap.verify_user('invalidreferred', 'somepass')
|
||||
self.assertIsNone(response)
|
||||
|
||||
def test_multientry(self):
|
||||
(response, _) = self.ldap.verify_user('multientry', 'somepass')
|
||||
self.assertEquals(response.username, 'multientry')
|
||||
|
||||
if __name__ == '__main__':
|
||||
unittest.main()
|
||||
|
|
Reference in a new issue