Add pagination tests for LDAP
This commit is contained in:
parent
541aa722c2
commit
8c07f733eb
2 changed files with 67 additions and 2 deletions
|
@ -333,7 +333,7 @@ class LDAPUsers(FederatedUsers):
|
||||||
cookie = lc.cookie = pctrls[0].cookie
|
cookie = lc.cookie = pctrls[0].cookie
|
||||||
if cookie:
|
if cookie:
|
||||||
msgid = conn.search_ext(user_search_dn, ldap.SCOPE_SUBTREE, search_flt,
|
msgid = conn.search_ext(user_search_dn, ldap.SCOPE_SUBTREE, search_flt,
|
||||||
serverctrls=[lc])
|
serverctrls=[lc], attrlist=attributes)
|
||||||
continue
|
continue
|
||||||
else:
|
else:
|
||||||
# No additional results.
|
# No additional results.
|
||||||
|
|
|
@ -1,5 +1,7 @@
|
||||||
import unittest
|
import unittest
|
||||||
|
|
||||||
|
import ldap
|
||||||
|
|
||||||
from app import app
|
from app import app
|
||||||
from initdb import setup_database_for_testing, finished_database_for_testing
|
from initdb import setup_database_for_testing, finished_database_for_testing
|
||||||
from data.users import LDAPUsers
|
from data.users import LDAPUsers
|
||||||
|
@ -19,7 +21,7 @@ def _create_ldap(requires_email=True):
|
||||||
|
|
||||||
ldap = LDAPUsers('ldap://localhost', base_dn, admin_dn, admin_passwd, user_rdn,
|
ldap = LDAPUsers('ldap://localhost', base_dn, admin_dn, admin_passwd, user_rdn,
|
||||||
uid_attr, email_attr, secondary_user_rdns=secondary_user_rdns,
|
uid_attr, email_attr, secondary_user_rdns=secondary_user_rdns,
|
||||||
requires_email=requires_email, force_no_pagination=True)
|
requires_email=requires_email)
|
||||||
return ldap
|
return ldap
|
||||||
|
|
||||||
@contextmanager
|
@contextmanager
|
||||||
|
@ -123,6 +125,45 @@ def mock_ldap(requires_email=True):
|
||||||
obj.search_s.seed('ou=employees,dc=quay,dc=io', 2, '(|(uid=unknown*)(mail=unknown*))')([])
|
obj.search_s.seed('ou=employees,dc=quay,dc=io', 2, '(|(uid=unknown*)(mail=unknown*))')([])
|
||||||
obj.search_s.seed('ou=otheremployees,dc=quay,dc=io', 2,
|
obj.search_s.seed('ou=otheremployees,dc=quay,dc=io', 2,
|
||||||
'(|(uid=unknown*)(mail=unknown*))')([])
|
'(|(uid=unknown*)(mail=unknown*))')([])
|
||||||
|
|
||||||
|
obj._results = {}
|
||||||
|
|
||||||
|
def result3(messageid):
|
||||||
|
if messageid is None:
|
||||||
|
return None, [], None, None
|
||||||
|
|
||||||
|
return obj._results[messageid]
|
||||||
|
|
||||||
|
def search_ext(user_search_dn, scope, search_flt, serverctrls=None, attrlist=None):
|
||||||
|
if scope != ldap.SCOPE_SUBTREE:
|
||||||
|
return None
|
||||||
|
|
||||||
|
if not serverctrls:
|
||||||
|
return None
|
||||||
|
|
||||||
|
page_control = serverctrls[0]
|
||||||
|
if page_control.controlType != ldap.controls.SimplePagedResultsControl.controlType:
|
||||||
|
return None
|
||||||
|
|
||||||
|
msgid = obj.search(user_search_dn, scope, search_flt, attrlist=attrlist)
|
||||||
|
_, rdata = obj.result(msgid)
|
||||||
|
|
||||||
|
msgid = 'messageid'
|
||||||
|
cookie = int(page_control.cookie) if page_control.cookie else 0
|
||||||
|
|
||||||
|
results = rdata[cookie:cookie+page_control.size]
|
||||||
|
cookie = cookie + page_control.size
|
||||||
|
if cookie > len(results):
|
||||||
|
page_control.cookie = None
|
||||||
|
else:
|
||||||
|
page_control.cookie = cookie
|
||||||
|
|
||||||
|
obj._results['messageid'] = (None, results, None, [page_control])
|
||||||
|
return msgid
|
||||||
|
|
||||||
|
obj.search_ext = search_ext
|
||||||
|
obj.result3 = result3
|
||||||
|
|
||||||
return obj
|
return obj
|
||||||
|
|
||||||
mockldap.start()
|
mockldap.start()
|
||||||
|
@ -333,6 +374,30 @@ class TestLDAP(unittest.TestCase):
|
||||||
self.assertEquals('someuser', someuser.username)
|
self.assertEquals('someuser', someuser.username)
|
||||||
self.assertEquals('foo@bar.com', someuser.email)
|
self.assertEquals('foo@bar.com', someuser.email)
|
||||||
|
|
||||||
|
def test_iterate_group_members_with_pagination(self):
|
||||||
|
with mock_ldap() as ldap:
|
||||||
|
(it, err) = ldap.iterate_group_members({'group_dn': 'cn=AwesomeFolk'}, page_size=1)
|
||||||
|
self.assertIsNone(err)
|
||||||
|
|
||||||
|
results = list(it)
|
||||||
|
self.assertEquals(2, len(results))
|
||||||
|
|
||||||
|
first = results[0][0]
|
||||||
|
second = results[1][0]
|
||||||
|
|
||||||
|
if first.id == 'testy':
|
||||||
|
testy, someuser = first, second
|
||||||
|
else:
|
||||||
|
testy, someuser = second, first
|
||||||
|
|
||||||
|
self.assertEquals('testy', testy.id)
|
||||||
|
self.assertEquals('testy', testy.username)
|
||||||
|
self.assertEquals('bar@baz.com', testy.email)
|
||||||
|
|
||||||
|
self.assertEquals('someuser', someuser.id)
|
||||||
|
self.assertEquals('someuser', someuser.username)
|
||||||
|
self.assertEquals('foo@bar.com', someuser.email)
|
||||||
|
|
||||||
def test_check_group_lookup_args(self):
|
def test_check_group_lookup_args(self):
|
||||||
with mock_ldap() as ldap:
|
with mock_ldap() as ldap:
|
||||||
(result, err) = ldap.check_group_lookup_args({'group_dn': 'cn=invalid'},
|
(result, err) = ldap.check_group_lookup_args({'group_dn': 'cn=invalid'},
|
||||||
|
|
Reference in a new issue