Add support to LDAP for external user linking

This commit is contained in:
Joseph Schorr 2016-10-27 15:32:01 -04:00
parent d145222812
commit f9ee8d2bef
2 changed files with 131 additions and 27 deletions

View file

@ -1,9 +1,10 @@
import ldap import ldap
import logging import logging
import os import os
import itertools
from collections import namedtuple from collections import namedtuple
from data.users.federated import FederatedUsers, VerifiedCredentials from data.users.federated import FederatedUsers, UserInformation
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
@ -25,11 +26,11 @@ class LDAPConnection(object):
self._user_dn = user_dn self._user_dn = user_dn
self._user_pw = user_pw self._user_pw = user_pw
self._allow_tls_fallback = allow_tls_fallback self._allow_tls_fallback = allow_tls_fallback
self._conn = None self._conn = None
def __enter__(self): def __enter__(self):
trace_level = 2 if os.environ.get('USERS_DEBUG') == '1' else 0 trace_level = 2 if os.environ.get('USERS_DEBUG') == '1' else 0
self._conn = ldap.initialize(self._ldap_uri, trace_level=trace_level) self._conn = ldap.initialize(self._ldap_uri, trace_level=trace_level)
self._conn.set_option(ldap.OPT_REFERRALS, 1) self._conn.set_option(ldap.OPT_REFERRALS, 1)
@ -43,6 +44,10 @@ class LDAPConnection(object):
def __exit__(self, exc_type, value, tb): def __exit__(self, exc_type, value, tb):
self._conn.unbind_s() self._conn.unbind_s()
def _take(n, iterable):
"Return first n items of the iterable as a list"
return list(itertools.islice(iterable, n))
class LDAPUsers(FederatedUsers): class LDAPUsers(FederatedUsers):
_LDAPResult = namedtuple('LDAPResult', ['dn', 'attrs']) _LDAPResult = namedtuple('LDAPResult', ['dn', 'attrs'])
@ -51,6 +56,7 @@ class LDAPUsers(FederatedUsers):
allow_tls_fallback=False, secondary_user_rdns=None): allow_tls_fallback=False, secondary_user_rdns=None):
super(LDAPUsers, self).__init__('ldap') super(LDAPUsers, self).__init__('ldap')
self._ldap = LDAPConnectionBuilder(ldap_uri, admin_dn, admin_passwd, allow_tls_fallback) self._ldap = LDAPConnectionBuilder(ldap_uri, admin_dn, admin_passwd, allow_tls_fallback)
self._ldap_uri = ldap_uri self._ldap_uri = ldap_uri
self._uid_attr = uid_attr self._uid_attr = uid_attr
@ -109,7 +115,10 @@ class LDAPUsers(FederatedUsers):
logger.exception('LDAP search exception') logger.exception('LDAP search exception')
return (None, 'Username not found') return (None, 'Username not found')
def _ldap_user_search(self, username_or_email): def _ldap_user_search(self, username_or_email, limit=20):
if not username_or_email:
return (None, 'Empty username/email')
# Verify the admin connection works first. We do this here to avoid wrapping # Verify the admin connection works first. We do this here to avoid wrapping
# the entire block in the INVALID CREDENTIALS check. # the entire block in the INVALID CREDENTIALS check.
try: try:
@ -130,22 +139,72 @@ class LDAPUsers(FederatedUsers):
return (None, err_msg) return (None, err_msg)
logger.debug('Found matching pairs: %s', pairs) logger.debug('Found matching pairs: %s', pairs)
results = [LDAPUsers._LDAPResult(*pair) for pair in pairs] results = [LDAPUsers._LDAPResult(*pair) for pair in _take(limit, pairs)]
# Filter out pairs without DNs. Some LDAP impls will return such # Filter out pairs without DNs. Some LDAP impls will return such pairs.
# pairs.
with_dns = [result for result in results if result.dn] with_dns = [result for result in results if result.dn]
if len(with_dns) < 1: return (with_dns, None)
return (None, 'Username not found')
# If we have found a single pair, then return it. def _ldap_single_user_search(self, username_or_email):
if len(with_dns) == 1: with_dns, err_msg = self._ldap_user_search(username_or_email)
return (with_dns[0], None) if err_msg is not None:
return (None, err_msg)
# Otherwise, there are multiple pairs with DNs, so find the one with the mail # Make sure we have at least one result.
# attribute (if any). if len(with_dns) < 1:
with_mail = [result for result in results if result.attrs.get(self._email_attr)] return (None, 'Username not found')
return (with_mail[0] if with_mail else with_dns[0], None)
# If we have found a single pair, then return it.
if len(with_dns) == 1:
return (with_dns[0], None)
# Otherwise, there are multiple pairs with DNs, so find the one with the mail
# attribute (if any).
with_mail = [result for result in with_dns if result.attrs.get(self._email_attr)]
return (with_mail[0] if with_mail else with_dns[0], None)
def _credential_for_user(self, response):
if not response.get(self._uid_attr):
return (None, 'Missing uid field "%s" in user record' % self._uid_attr)
if not response.get(self._email_attr):
return (None, 'Missing mail field "%s" in user record' % self._email_attr)
username = response[self._uid_attr][0].decode('utf-8')
email = response[self._email_attr][0]
return (UserInformation(username=username, email=email, id=username), None)
def get_user(self, username_or_email):
""" Looks up a username or email in LDAP. """
logger.debug('Looking up LDAP username or email %s', username_or_email)
(found_user, err_msg) = self._ldap_single_user_search(username_or_email)
if err_msg is not None:
return (None, err_msg)
logger.debug('Found user for LDAP username or email %s', username_or_email)
_, found_response = found_user
return self._credential_for_user(found_response)
def query_users(self, query, limit=20):
""" Queries LDAP for matching users. """
if not query:
return (None, 'Empty query')
logger.debug('Got query %s with limit %s', query, limit)
(results, err_msg) = self._ldap_user_search(query + '*', limit=limit)
if err_msg is not None:
return (None, err_msg)
final_results = []
for result in results[0:limit]:
credentials, err_msg = self._credential_for_user(result.attrs)
if err_msg is not None:
continue
final_results.append(credentials)
logger.debug('For query %s found results %s', query, final_results)
return (final_results, None)
def verify_credentials(self, username_or_email, password): def verify_credentials(self, username_or_email, password):
""" Verify the credentials with LDAP. """ """ Verify the credentials with LDAP. """
@ -153,7 +212,7 @@ class LDAPUsers(FederatedUsers):
if not password: if not password:
return (None, 'Anonymous binding not allowed') return (None, 'Anonymous binding not allowed')
(found_user, err_msg) = self._ldap_user_search(username_or_email) (found_user, err_msg) = self._ldap_single_user_search(username_or_email)
if found_user is None: if found_user is None:
return (None, err_msg) return (None, err_msg)
@ -183,14 +242,4 @@ class LDAPUsers(FederatedUsers):
logger.exception('Invalid LDAP credentials') logger.exception('Invalid LDAP credentials')
return (None, 'Invalid password') return (None, 'Invalid password')
# Now check if we have a federated login for this user return self._credential_for_user(found_response)
if not found_response.get(self._uid_attr):
return (None, 'Missing uid field "%s" in user record' % self._uid_attr)
if not found_response.get(self._email_attr):
return (None, 'Missing mail field "%s" in user record' % self._email_attr)
username = found_response[self._uid_attr][0].decode('utf-8')
email = found_response[self._email_attr][0]
return (VerifiedCredentials(username=username, email=email), None)

View file

@ -4,6 +4,7 @@ from app import app
from initdb import setup_database_for_testing, finished_database_for_testing from initdb import setup_database_for_testing, finished_database_for_testing
from data.users import LDAPUsers from data.users import LDAPUsers
from mockldap import MockLdap from mockldap import MockLdap
from mock import patch
class TestLDAP(unittest.TestCase): class TestLDAP(unittest.TestCase):
def setUp(self): def setUp(self):
@ -198,6 +199,60 @@ class TestLDAP(unittest.TestCase):
(response, _) = self.ldap.confirm_existing_user('someuser', 'somepass') (response, _) = self.ldap.confirm_existing_user('someuser', 'somepass')
self.assertEquals(response.username, 'someuser') self.assertEquals(response.username, 'someuser')
def test_link_user(self):
# Link someuser.
user, error_message = self.ldap.link_user('someuser')
self.assertIsNone(error_message)
self.assertIsNotNone(user)
self.assertEquals('someuser', user.username)
# Link again. Should return the same user record.
user_again, _ = self.ldap.link_user('someuser')
self.assertEquals(user_again.id, user.id)
# Confirm someuser.
result, _ = self.ldap.confirm_existing_user('someuser', 'somepass')
self.assertIsNotNone(result)
self.assertEquals('someuser', result.username)
def test_query(self):
def initializer(uri, trace_level=0):
obj = self.mockldap[uri]
# Seed to "support" wildcard queries, which MockLDAP does not support natively.
obj.search_s.seed('ou=employees,dc=quay,dc=io', 2, '(|(uid=cool*)(mail=cool*))')([
('uid=cool.user,ou=employees,dc=quay,dc=io', {
'dc': ['quay', 'io'],
'ou': 'employees',
'uid': ['cool.user', 'referred'],
'userPassword': ['somepass'],
'mail': ['foo@bar.com']
})
])
obj.search_s.seed('ou=otheremployees,dc=quay,dc=io', 2, '(|(uid=cool*)(mail=cool*))')([])
obj.search_s.seed('ou=employees,dc=quay,dc=io', 2, '(|(uid=unknown*)(mail=unknown*))')([])
obj.search_s.seed('ou=otheremployees,dc=quay,dc=io', 2,
'(|(uid=unknown*)(mail=unknown*))')([])
return obj
with patch('ldap.initialize', new=initializer):
# Lookup cool.
(response, error_message) = self.ldap.query_users('cool')
self.assertIsNone(error_message)
self.assertEquals(1, len(response))
user_info = response[0]
self.assertEquals("cool.user", user_info.username)
self.assertEquals("foo@bar.com", user_info.email)
# Lookup unknown.
(response, error_message) = self.ldap.query_users('unknown')
self.assertIsNone(error_message)
self.assertEquals(0, len(response))
if __name__ == '__main__': if __name__ == '__main__':
unittest.main() unittest.main()