Make sure to escape LDAP queries
Fixes an issue in team sync around group names that contain *s Fixes https://www.pivotaltracker.com/story/show/144628235
This commit is contained in:
parent
02c4d75634
commit
30a681343f
2 changed files with 46 additions and 26 deletions
|
@ -3,6 +3,7 @@ import logging
|
||||||
import os
|
import os
|
||||||
|
|
||||||
from ldap.controls import SimplePagedResultsControl
|
from ldap.controls import SimplePagedResultsControl
|
||||||
|
from ldap.filter import filter_format, escape_filter_chars
|
||||||
|
|
||||||
from collections import namedtuple
|
from collections import namedtuple
|
||||||
from data.users.federated import FederatedUsers, UserInformation
|
from data.users.federated import FederatedUsers, UserInformation
|
||||||
|
@ -109,9 +110,10 @@ class LDAPUsers(FederatedUsers):
|
||||||
referral_dn = referral_uri[len('ldap:///'):]
|
referral_dn = referral_uri[len('ldap:///'):]
|
||||||
return referral_dn
|
return referral_dn
|
||||||
|
|
||||||
def _ldap_user_search_with_rdn(self, conn, username_or_email, user_search_dn):
|
def _ldap_user_search_with_rdn(self, conn, username_or_email, user_search_dn, suffix=''):
|
||||||
query = u'(|({0}={2})({1}={2}))'.format(self._uid_attr, self._email_attr,
|
query = u'(|({0}={2}{3})({1}={2}{3}))'.format(self._uid_attr, self._email_attr,
|
||||||
username_or_email)
|
escape_filter_chars(username_or_email),
|
||||||
|
suffix)
|
||||||
logger.debug('Conducting user search: %s under %s', query, user_search_dn)
|
logger.debug('Conducting user search: %s under %s', query, user_search_dn)
|
||||||
try:
|
try:
|
||||||
return (conn.search_s(user_search_dn, ldap.SCOPE_SUBTREE, query.encode('utf-8')), None)
|
return (conn.search_s(user_search_dn, ldap.SCOPE_SUBTREE, query.encode('utf-8')), None)
|
||||||
|
@ -131,7 +133,7 @@ class LDAPUsers(FederatedUsers):
|
||||||
logger.debug('LDAP search exception')
|
logger.debug('LDAP search exception')
|
||||||
return (None, 'Username not found')
|
return (None, 'Username not found')
|
||||||
|
|
||||||
def _ldap_user_search(self, username_or_email, limit=20):
|
def _ldap_user_search(self, username_or_email, limit=20, suffix=''):
|
||||||
if not username_or_email:
|
if not username_or_email:
|
||||||
return (None, 'Empty username/email')
|
return (None, 'Empty username/email')
|
||||||
|
|
||||||
|
@ -147,7 +149,8 @@ class LDAPUsers(FederatedUsers):
|
||||||
logger.debug('Incoming username or email param: %s', username_or_email.__repr__())
|
logger.debug('Incoming username or email param: %s', username_or_email.__repr__())
|
||||||
|
|
||||||
for user_search_dn in self._user_dns:
|
for user_search_dn in self._user_dns:
|
||||||
(pairs, err_msg) = self._ldap_user_search_with_rdn(conn, username_or_email, user_search_dn)
|
(pairs, err_msg) = self._ldap_user_search_with_rdn(conn, username_or_email, user_search_dn,
|
||||||
|
suffix=suffix)
|
||||||
if pairs is not None and len(pairs) > 0:
|
if pairs is not None and len(pairs) > 0:
|
||||||
break
|
break
|
||||||
|
|
||||||
|
@ -207,7 +210,7 @@ class LDAPUsers(FederatedUsers):
|
||||||
return (None, self.federated_service, 'Empty query')
|
return (None, self.federated_service, 'Empty query')
|
||||||
|
|
||||||
logger.debug('Got query %s with limit %s', query, limit)
|
logger.debug('Got query %s with limit %s', query, limit)
|
||||||
(results, err_msg) = self._ldap_user_search(query + '*', limit=limit)
|
(results, err_msg) = self._ldap_user_search(query, limit=limit, suffix='*')
|
||||||
if err_msg is not None:
|
if err_msg is not None:
|
||||||
return (None, self.federated_service, err_msg)
|
return (None, self.federated_service, err_msg)
|
||||||
|
|
||||||
|
@ -296,7 +299,7 @@ class LDAPUsers(FederatedUsers):
|
||||||
lc = ldap.controls.libldap.SimplePagedResultsControl(criticality=True, size=page_size,
|
lc = ldap.controls.libldap.SimplePagedResultsControl(criticality=True, size=page_size,
|
||||||
cookie='')
|
cookie='')
|
||||||
|
|
||||||
search_flt = '(memberOf=%s,%s)' % (group_dn, self._base_dn)
|
search_flt = filter_format('(memberOf=%s,%s)', (group_dn, self._base_dn))
|
||||||
attributes = [self._uid_attr, self._email_attr]
|
attributes = [self._uid_attr, self._email_attr]
|
||||||
|
|
||||||
for user_search_dn in self._user_dns:
|
for user_search_dn in self._user_dns:
|
||||||
|
|
|
@ -46,7 +46,7 @@ def mock_ldap(requires_email=True):
|
||||||
'uid': ['testy'],
|
'uid': ['testy'],
|
||||||
'userPassword': ['password'],
|
'userPassword': ['password'],
|
||||||
'mail': ['bar@baz.com'],
|
'mail': ['bar@baz.com'],
|
||||||
'memberOf': ['cn=AwesomeFolk,dc=quay,dc=io'],
|
'memberOf': ['cn=AwesomeFolk,dc=quay,dc=io', 'cn=*Guys,dc=quay,dc=io'],
|
||||||
},
|
},
|
||||||
'uid=someuser,ou=employees,dc=quay,dc=io': {
|
'uid=someuser,ou=employees,dc=quay,dc=io': {
|
||||||
'dc': ['quay', 'io'],
|
'dc': ['quay', 'io'],
|
||||||
|
@ -54,7 +54,7 @@ def mock_ldap(requires_email=True):
|
||||||
'uid': ['someuser'],
|
'uid': ['someuser'],
|
||||||
'userPassword': ['somepass'],
|
'userPassword': ['somepass'],
|
||||||
'mail': ['foo@bar.com'],
|
'mail': ['foo@bar.com'],
|
||||||
'memberOf': ['cn=AwesomeFolk,dc=quay,dc=io'],
|
'memberOf': ['cn=AwesomeFolk,dc=quay,dc=io', 'cn=*Guys,dc=quay,dc=io'],
|
||||||
},
|
},
|
||||||
'uid=nomail,ou=employees,dc=quay,dc=io': {
|
'uid=nomail,ou=employees,dc=quay,dc=io': {
|
||||||
'dc': ['quay', 'io'],
|
'dc': ['quay', 'io'],
|
||||||
|
@ -235,7 +235,6 @@ class TestLDAP(unittest.TestCase):
|
||||||
self.assertIsNone(response)
|
self.assertIsNone(response)
|
||||||
self.assertEquals(err_msg, 'Invalid user')
|
self.assertEquals(err_msg, 'Invalid user')
|
||||||
|
|
||||||
|
|
||||||
def test_login_secondary(self):
|
def test_login_secondary(self):
|
||||||
with mock_ldap() as ldap:
|
with mock_ldap() as ldap:
|
||||||
# Verify we can login.
|
# Verify we can login.
|
||||||
|
@ -246,6 +245,18 @@ class TestLDAP(unittest.TestCase):
|
||||||
(response, _) = ldap.confirm_existing_user('secondaryuser', 'somepass')
|
(response, _) = ldap.confirm_existing_user('secondaryuser', 'somepass')
|
||||||
self.assertEquals(response.username, 'secondaryuser')
|
self.assertEquals(response.username, 'secondaryuser')
|
||||||
|
|
||||||
|
def test_invalid_wildcard(self):
|
||||||
|
with mock_ldap() as ldap:
|
||||||
|
# Verify we cannot login with a wildcard.
|
||||||
|
(response, err_msg) = ldap.verify_and_link_user('some*', 'somepass')
|
||||||
|
self.assertIsNone(response)
|
||||||
|
self.assertEquals(err_msg, 'Username not found')
|
||||||
|
|
||||||
|
# Verify we cannot confirm the user.
|
||||||
|
(response, err_msg) = ldap.confirm_existing_user('some*', 'somepass')
|
||||||
|
self.assertIsNone(response)
|
||||||
|
self.assertEquals(err_msg, 'Invalid user')
|
||||||
|
|
||||||
def test_invalid_password(self):
|
def test_invalid_password(self):
|
||||||
with mock_ldap() as ldap:
|
with mock_ldap() as ldap:
|
||||||
# Verify we cannot login with an invalid password.
|
# Verify we cannot login with an invalid password.
|
||||||
|
@ -401,27 +412,28 @@ class TestLDAP(unittest.TestCase):
|
||||||
|
|
||||||
def test_iterate_group_members_with_pagination(self):
|
def test_iterate_group_members_with_pagination(self):
|
||||||
with mock_ldap() as ldap:
|
with mock_ldap() as ldap:
|
||||||
(it, err) = ldap.iterate_group_members({'group_dn': 'cn=AwesomeFolk'}, page_size=1)
|
for dn in ['cn=AwesomeFolk', 'cn=*Guys']:
|
||||||
self.assertIsNone(err)
|
(it, err) = ldap.iterate_group_members({'group_dn': dn}, page_size=1)
|
||||||
|
self.assertIsNone(err)
|
||||||
|
|
||||||
results = list(it)
|
results = list(it)
|
||||||
self.assertEquals(2, len(results))
|
self.assertEquals(2, len(results))
|
||||||
|
|
||||||
first = results[0][0]
|
first = results[0][0]
|
||||||
second = results[1][0]
|
second = results[1][0]
|
||||||
|
|
||||||
if first.id == 'testy':
|
if first.id == 'testy':
|
||||||
testy, someuser = first, second
|
testy, someuser = first, second
|
||||||
else:
|
else:
|
||||||
testy, someuser = second, first
|
testy, someuser = second, first
|
||||||
|
|
||||||
self.assertEquals('testy', testy.id)
|
self.assertEquals('testy', testy.id)
|
||||||
self.assertEquals('testy', testy.username)
|
self.assertEquals('testy', testy.username)
|
||||||
self.assertEquals('bar@baz.com', testy.email)
|
self.assertEquals('bar@baz.com', testy.email)
|
||||||
|
|
||||||
self.assertEquals('someuser', someuser.id)
|
self.assertEquals('someuser', someuser.id)
|
||||||
self.assertEquals('someuser', someuser.username)
|
self.assertEquals('someuser', someuser.username)
|
||||||
self.assertEquals('foo@bar.com', someuser.email)
|
self.assertEquals('foo@bar.com', someuser.email)
|
||||||
|
|
||||||
def test_check_group_lookup_args(self):
|
def test_check_group_lookup_args(self):
|
||||||
with mock_ldap() as ldap:
|
with mock_ldap() as ldap:
|
||||||
|
@ -435,6 +447,11 @@ class TestLDAP(unittest.TestCase):
|
||||||
self.assertTrue(result)
|
self.assertTrue(result)
|
||||||
self.assertIsNone(err)
|
self.assertIsNone(err)
|
||||||
|
|
||||||
|
(result, err) = ldap.check_group_lookup_args({'group_dn': 'cn=*Guys'},
|
||||||
|
disable_pagination=True)
|
||||||
|
self.assertTrue(result)
|
||||||
|
self.assertIsNone(err)
|
||||||
|
|
||||||
def test_metadata(self):
|
def test_metadata(self):
|
||||||
with mock_ldap() as ldap:
|
with mock_ldap() as ldap:
|
||||||
assert 'base_dn' in ldap.service_metadata()
|
assert 'base_dn' in ldap.service_metadata()
|
||||||
|
|
Reference in a new issue