diff --git a/data/users/externalldap.py b/data/users/externalldap.py index 6e740a25a..d792b6eb4 100644 --- a/data/users/externalldap.py +++ b/data/users/externalldap.py @@ -3,6 +3,7 @@ import logging import os from ldap.controls import SimplePagedResultsControl +from ldap.filter import filter_format, escape_filter_chars from collections import namedtuple from data.users.federated import FederatedUsers, UserInformation @@ -109,9 +110,10 @@ class LDAPUsers(FederatedUsers): referral_dn = referral_uri[len('ldap:///'):] return referral_dn - def _ldap_user_search_with_rdn(self, conn, username_or_email, user_search_dn): - query = u'(|({0}={2})({1}={2}))'.format(self._uid_attr, self._email_attr, - username_or_email) + def _ldap_user_search_with_rdn(self, conn, username_or_email, user_search_dn, suffix=''): + query = u'(|({0}={2}{3})({1}={2}{3}))'.format(self._uid_attr, self._email_attr, + escape_filter_chars(username_or_email), + suffix) logger.debug('Conducting user search: %s under %s', query, user_search_dn) try: return (conn.search_s(user_search_dn, ldap.SCOPE_SUBTREE, query.encode('utf-8')), None) @@ -131,7 +133,7 @@ class LDAPUsers(FederatedUsers): logger.debug('LDAP search exception') return (None, 'Username not found') - def _ldap_user_search(self, username_or_email, limit=20): + def _ldap_user_search(self, username_or_email, limit=20, suffix=''): if not username_or_email: return (None, 'Empty username/email') @@ -147,7 +149,8 @@ class LDAPUsers(FederatedUsers): logger.debug('Incoming username or email param: %s', username_or_email.__repr__()) for user_search_dn in self._user_dns: - (pairs, err_msg) = self._ldap_user_search_with_rdn(conn, username_or_email, user_search_dn) + (pairs, err_msg) = self._ldap_user_search_with_rdn(conn, username_or_email, user_search_dn, + suffix=suffix) if pairs is not None and len(pairs) > 0: break @@ -207,7 +210,7 @@ class LDAPUsers(FederatedUsers): return (None, self.federated_service, 'Empty query') logger.debug('Got query %s with limit %s', query, limit) - (results, err_msg) = self._ldap_user_search(query + '*', limit=limit) + (results, err_msg) = self._ldap_user_search(query, limit=limit, suffix='*') if err_msg is not None: return (None, self.federated_service, err_msg) @@ -296,7 +299,7 @@ class LDAPUsers(FederatedUsers): lc = ldap.controls.libldap.SimplePagedResultsControl(criticality=True, size=page_size, cookie='') - search_flt = '(memberOf=%s,%s)' % (group_dn, self._base_dn) + search_flt = filter_format('(memberOf=%s,%s)', (group_dn, self._base_dn)) attributes = [self._uid_attr, self._email_attr] for user_search_dn in self._user_dns: diff --git a/test/test_ldap.py b/test/test_ldap.py index 4e35fd667..2b71b58ce 100644 --- a/test/test_ldap.py +++ b/test/test_ldap.py @@ -46,7 +46,7 @@ def mock_ldap(requires_email=True): 'uid': ['testy'], 'userPassword': ['password'], 'mail': ['bar@baz.com'], - 'memberOf': ['cn=AwesomeFolk,dc=quay,dc=io'], + 'memberOf': ['cn=AwesomeFolk,dc=quay,dc=io', 'cn=*Guys,dc=quay,dc=io'], }, 'uid=someuser,ou=employees,dc=quay,dc=io': { 'dc': ['quay', 'io'], @@ -54,7 +54,7 @@ def mock_ldap(requires_email=True): 'uid': ['someuser'], 'userPassword': ['somepass'], 'mail': ['foo@bar.com'], - 'memberOf': ['cn=AwesomeFolk,dc=quay,dc=io'], + 'memberOf': ['cn=AwesomeFolk,dc=quay,dc=io', 'cn=*Guys,dc=quay,dc=io'], }, 'uid=nomail,ou=employees,dc=quay,dc=io': { 'dc': ['quay', 'io'], @@ -235,7 +235,6 @@ class TestLDAP(unittest.TestCase): self.assertIsNone(response) self.assertEquals(err_msg, 'Invalid user') - def test_login_secondary(self): with mock_ldap() as ldap: # Verify we can login. @@ -246,6 +245,18 @@ class TestLDAP(unittest.TestCase): (response, _) = ldap.confirm_existing_user('secondaryuser', 'somepass') self.assertEquals(response.username, 'secondaryuser') + def test_invalid_wildcard(self): + with mock_ldap() as ldap: + # Verify we cannot login with a wildcard. + (response, err_msg) = ldap.verify_and_link_user('some*', 'somepass') + self.assertIsNone(response) + self.assertEquals(err_msg, 'Username not found') + + # Verify we cannot confirm the user. + (response, err_msg) = ldap.confirm_existing_user('some*', 'somepass') + self.assertIsNone(response) + self.assertEquals(err_msg, 'Invalid user') + def test_invalid_password(self): with mock_ldap() as ldap: # Verify we cannot login with an invalid password. @@ -401,27 +412,28 @@ class TestLDAP(unittest.TestCase): def test_iterate_group_members_with_pagination(self): with mock_ldap() as ldap: - (it, err) = ldap.iterate_group_members({'group_dn': 'cn=AwesomeFolk'}, page_size=1) - self.assertIsNone(err) + for dn in ['cn=AwesomeFolk', 'cn=*Guys']: + (it, err) = ldap.iterate_group_members({'group_dn': dn}, page_size=1) + self.assertIsNone(err) - results = list(it) - self.assertEquals(2, len(results)) + results = list(it) + self.assertEquals(2, len(results)) - first = results[0][0] - second = results[1][0] + first = results[0][0] + second = results[1][0] - if first.id == 'testy': - testy, someuser = first, second - else: - testy, someuser = second, first + if first.id == 'testy': + testy, someuser = first, second + else: + testy, someuser = second, first - self.assertEquals('testy', testy.id) - self.assertEquals('testy', testy.username) - self.assertEquals('bar@baz.com', testy.email) + self.assertEquals('testy', testy.id) + self.assertEquals('testy', testy.username) + self.assertEquals('bar@baz.com', testy.email) - self.assertEquals('someuser', someuser.id) - self.assertEquals('someuser', someuser.username) - self.assertEquals('foo@bar.com', someuser.email) + self.assertEquals('someuser', someuser.id) + self.assertEquals('someuser', someuser.username) + self.assertEquals('foo@bar.com', someuser.email) def test_check_group_lookup_args(self): with mock_ldap() as ldap: @@ -435,6 +447,11 @@ class TestLDAP(unittest.TestCase): self.assertTrue(result) self.assertIsNone(err) + (result, err) = ldap.check_group_lookup_args({'group_dn': 'cn=*Guys'}, + disable_pagination=True) + self.assertTrue(result) + self.assertIsNone(err) + def test_metadata(self): with mock_ldap() as ldap: assert 'base_dn' in ldap.service_metadata()