diff --git a/data/users/externalldap.py b/data/users/externalldap.py index 33d58c813..c56b24eed 100644 --- a/data/users/externalldap.py +++ b/data/users/externalldap.py @@ -1,9 +1,10 @@ import ldap import logging import os +import itertools from collections import namedtuple -from data.users.federated import FederatedUsers, VerifiedCredentials +from data.users.federated import FederatedUsers, UserInformation logger = logging.getLogger(__name__) @@ -25,11 +26,11 @@ class LDAPConnection(object): self._user_dn = user_dn self._user_pw = user_pw self._allow_tls_fallback = allow_tls_fallback - self._conn = None def __enter__(self): trace_level = 2 if os.environ.get('USERS_DEBUG') == '1' else 0 + self._conn = ldap.initialize(self._ldap_uri, trace_level=trace_level) self._conn.set_option(ldap.OPT_REFERRALS, 1) @@ -43,6 +44,10 @@ class LDAPConnection(object): def __exit__(self, exc_type, value, tb): self._conn.unbind_s() +def _take(n, iterable): + "Return first n items of the iterable as a list" + return list(itertools.islice(iterable, n)) + class LDAPUsers(FederatedUsers): _LDAPResult = namedtuple('LDAPResult', ['dn', 'attrs']) @@ -51,6 +56,7 @@ class LDAPUsers(FederatedUsers): allow_tls_fallback=False, secondary_user_rdns=None): super(LDAPUsers, self).__init__('ldap') + self._ldap = LDAPConnectionBuilder(ldap_uri, admin_dn, admin_passwd, allow_tls_fallback) self._ldap_uri = ldap_uri self._uid_attr = uid_attr @@ -109,7 +115,10 @@ class LDAPUsers(FederatedUsers): logger.exception('LDAP search exception') return (None, 'Username not found') - def _ldap_user_search(self, username_or_email): + def _ldap_user_search(self, username_or_email, limit=20): + if not username_or_email: + return (None, 'Empty username/email') + # Verify the admin connection works first. We do this here to avoid wrapping # the entire block in the INVALID CREDENTIALS check. try: @@ -130,22 +139,72 @@ class LDAPUsers(FederatedUsers): return (None, err_msg) logger.debug('Found matching pairs: %s', pairs) - results = [LDAPUsers._LDAPResult(*pair) for pair in pairs] + results = [LDAPUsers._LDAPResult(*pair) for pair in _take(limit, pairs)] - # Filter out pairs without DNs. Some LDAP impls will return such - # pairs. + # Filter out pairs without DNs. Some LDAP impls will return such pairs. with_dns = [result for result in results if result.dn] - if len(with_dns) < 1: - return (None, 'Username not found') + return (with_dns, None) - # If we have found a single pair, then return it. - if len(with_dns) == 1: - return (with_dns[0], None) + def _ldap_single_user_search(self, username_or_email): + with_dns, err_msg = self._ldap_user_search(username_or_email) + if err_msg is not None: + return (None, err_msg) - # Otherwise, there are multiple pairs with DNs, so find the one with the mail - # attribute (if any). - with_mail = [result for result in results if result.attrs.get(self._email_attr)] - return (with_mail[0] if with_mail else with_dns[0], None) + # Make sure we have at least one result. + if len(with_dns) < 1: + return (None, 'Username not found') + + # If we have found a single pair, then return it. + if len(with_dns) == 1: + return (with_dns[0], None) + + # Otherwise, there are multiple pairs with DNs, so find the one with the mail + # attribute (if any). + with_mail = [result for result in with_dns if result.attrs.get(self._email_attr)] + return (with_mail[0] if with_mail else with_dns[0], None) + + def _credential_for_user(self, response): + if not response.get(self._uid_attr): + return (None, 'Missing uid field "%s" in user record' % self._uid_attr) + + if not response.get(self._email_attr): + return (None, 'Missing mail field "%s" in user record' % self._email_attr) + + username = response[self._uid_attr][0].decode('utf-8') + email = response[self._email_attr][0] + return (UserInformation(username=username, email=email, id=username), None) + + def get_user(self, username_or_email): + """ Looks up a username or email in LDAP. """ + logger.debug('Looking up LDAP username or email %s', username_or_email) + (found_user, err_msg) = self._ldap_single_user_search(username_or_email) + if err_msg is not None: + return (None, err_msg) + + logger.debug('Found user for LDAP username or email %s', username_or_email) + _, found_response = found_user + return self._credential_for_user(found_response) + + def query_users(self, query, limit=20): + """ Queries LDAP for matching users. """ + if not query: + return (None, 'Empty query') + + logger.debug('Got query %s with limit %s', query, limit) + (results, err_msg) = self._ldap_user_search(query + '*', limit=limit) + if err_msg is not None: + return (None, err_msg) + + final_results = [] + for result in results[0:limit]: + credentials, err_msg = self._credential_for_user(result.attrs) + if err_msg is not None: + continue + + final_results.append(credentials) + + logger.debug('For query %s found results %s', query, final_results) + return (final_results, None) def verify_credentials(self, username_or_email, password): """ Verify the credentials with LDAP. """ @@ -153,7 +212,7 @@ class LDAPUsers(FederatedUsers): if not password: return (None, 'Anonymous binding not allowed') - (found_user, err_msg) = self._ldap_user_search(username_or_email) + (found_user, err_msg) = self._ldap_single_user_search(username_or_email) if found_user is None: return (None, err_msg) @@ -183,14 +242,4 @@ class LDAPUsers(FederatedUsers): logger.exception('Invalid LDAP credentials') return (None, 'Invalid password') - # Now check if we have a federated login for this user - if not found_response.get(self._uid_attr): - return (None, 'Missing uid field "%s" in user record' % self._uid_attr) - - if not found_response.get(self._email_attr): - return (None, 'Missing mail field "%s" in user record' % self._email_attr) - - username = found_response[self._uid_attr][0].decode('utf-8') - email = found_response[self._email_attr][0] - return (VerifiedCredentials(username=username, email=email), None) - + return self._credential_for_user(found_response) diff --git a/test/test_ldap.py b/test/test_ldap.py index ed3217ed2..d6d9abf62 100644 --- a/test/test_ldap.py +++ b/test/test_ldap.py @@ -4,6 +4,7 @@ from app import app from initdb import setup_database_for_testing, finished_database_for_testing from data.users import LDAPUsers from mockldap import MockLdap +from mock import patch class TestLDAP(unittest.TestCase): def setUp(self): @@ -198,6 +199,60 @@ class TestLDAP(unittest.TestCase): (response, _) = self.ldap.confirm_existing_user('someuser', 'somepass') self.assertEquals(response.username, 'someuser') + def test_link_user(self): + # Link someuser. + user, error_message = self.ldap.link_user('someuser') + self.assertIsNone(error_message) + self.assertIsNotNone(user) + self.assertEquals('someuser', user.username) + + # Link again. Should return the same user record. + user_again, _ = self.ldap.link_user('someuser') + self.assertEquals(user_again.id, user.id) + + # Confirm someuser. + result, _ = self.ldap.confirm_existing_user('someuser', 'somepass') + self.assertIsNotNone(result) + self.assertEquals('someuser', result.username) + + def test_query(self): + def initializer(uri, trace_level=0): + obj = self.mockldap[uri] + + # Seed to "support" wildcard queries, which MockLDAP does not support natively. + obj.search_s.seed('ou=employees,dc=quay,dc=io', 2, '(|(uid=cool*)(mail=cool*))')([ + ('uid=cool.user,ou=employees,dc=quay,dc=io', { + 'dc': ['quay', 'io'], + 'ou': 'employees', + 'uid': ['cool.user', 'referred'], + 'userPassword': ['somepass'], + 'mail': ['foo@bar.com'] + }) + ]) + + obj.search_s.seed('ou=otheremployees,dc=quay,dc=io', 2, '(|(uid=cool*)(mail=cool*))')([]) + + obj.search_s.seed('ou=employees,dc=quay,dc=io', 2, '(|(uid=unknown*)(mail=unknown*))')([]) + obj.search_s.seed('ou=otheremployees,dc=quay,dc=io', 2, + '(|(uid=unknown*)(mail=unknown*))')([]) + return obj + + with patch('ldap.initialize', new=initializer): + # Lookup cool. + (response, error_message) = self.ldap.query_users('cool') + self.assertIsNone(error_message) + self.assertEquals(1, len(response)) + + user_info = response[0] + self.assertEquals("cool.user", user_info.username) + self.assertEquals("foo@bar.com", user_info.email) + + # Lookup unknown. + (response, error_message) = self.ldap.query_users('unknown') + self.assertIsNone(error_message) + self.assertEquals(0, len(response)) + + if __name__ == '__main__': unittest.main()