"""Tests for the LDAPUsers federated login backend, run against a mocked LDAP directory."""

import unittest

import ldap

from app import app
from initdb import setup_database_for_testing, finished_database_for_testing
from data.users import LDAPUsers
from data import model
from mockldap import MockLdap
from mock import patch
from contextlib import contextmanager


def _create_ldap(requires_email=True):
    """Return an LDAPUsers instance configured for the directory built by mock_ldap().

    Args:
        requires_email: forwarded to LDAPUsers as its `requires_email` option.
    """
    base_dn = ['dc=quay', 'dc=io']
    admin_dn = 'uid=testy,ou=employees,dc=quay,dc=io'
    admin_passwd = 'password'
    user_rdn = ['ou=employees']
    uid_attr = 'uid'
    email_attr = 'mail'
    secondary_user_rdns = ['ou=otheremployees']

    return LDAPUsers('ldap://localhost', base_dn, admin_dn, admin_passwd, user_rdn,
                     uid_attr, email_attr, secondary_user_rdns=secondary_user_rdns,
                     requires_email=requires_email)


@contextmanager
def mock_ldap(requires_email=True):
    """Context manager that patches `ldap.initialize` with a MockLdap-backed directory.

    Yields the LDAPUsers instance produced by _create_ldap(). The mock directory is
    always stopped on exit, even when the body of the `with` block raises.

    Args:
        requires_email: when False, the 'mail' attribute is stripped from every mock
            entry and the yielded LDAPUsers is created with requires_email=False.
    """
    mock_data = {
        'dc=quay,dc=io': {'dc': ['quay', 'io']},
        'ou=employees,dc=quay,dc=io': {
            'dc': ['quay', 'io'],
            'ou': 'employees'
        },
        'ou=otheremployees,dc=quay,dc=io': {
            'dc': ['quay', 'io'],
            'ou': 'otheremployees'
        },
        'cn=AwesomeFolk,dc=quay,dc=io': {
            'dc': ['quay', 'io'],
            'cn': 'AwesomeFolk'
        },
        'uid=testy,ou=employees,dc=quay,dc=io': {
            'dc': ['quay', 'io'],
            'ou': 'employees',
            'uid': ['testy'],
            'userPassword': ['password'],
            'mail': ['bar@baz.com'],
            'memberOf': ['cn=AwesomeFolk,dc=quay,dc=io', 'cn=*Guys,dc=quay,dc=io'],
        },
        'uid=someuser,ou=employees,dc=quay,dc=io': {
            'dc': ['quay', 'io'],
            'ou': 'employees',
            'uid': ['someuser'],
            'userPassword': ['somepass'],
            'mail': ['foo@bar.com'],
            'memberOf': ['cn=AwesomeFolk,dc=quay,dc=io', 'cn=*Guys,dc=quay,dc=io'],
        },
        'uid=nomail,ou=employees,dc=quay,dc=io': {
            'dc': ['quay', 'io'],
            'ou': 'employees',
            'uid': ['nomail'],
            'userPassword': ['somepass']
        },
        'uid=cool.user,ou=employees,dc=quay,dc=io': {
            'dc': ['quay', 'io'],
            'ou': 'employees',
            'uid': ['cool.user', 'referred'],
            'userPassword': ['somepass'],
            'mail': ['foo@bar.com']
        },
        'uid=referred,ou=employees,dc=quay,dc=io': {
            'uid': ['referred'],
            '_referral': 'ldap:///uid=cool.user,ou=employees,dc=quay,dc=io'
        },
        'uid=invalidreferred,ou=employees,dc=quay,dc=io': {
            'uid': ['invalidreferred'],
            '_referral': 'ldap:///uid=someinvaliduser,ou=employees,dc=quay,dc=io'
        },
        'uid=multientry,ou=subgroup1,ou=employees,dc=quay,dc=io': {
            'uid': ['multientry'],
            'mail': ['foo@bar.com'],
            'userPassword': ['somepass'],
        },
        'uid=multientry,ou=subgroup2,ou=employees,dc=quay,dc=io': {
            'uid': ['multientry'],
            'another': ['key']
        },
        'uid=secondaryuser,ou=otheremployees,dc=quay,dc=io': {
            'dc': ['quay', 'io'],
            'ou': 'otheremployees',
            'uid': ['secondaryuser'],
            'userPassword': ['somepass'],
            'mail': ['foosecondary@bar.com']
        },
    }

    if not requires_email:
        for path in mock_data:
            mock_data[path].pop('mail', None)

    mockldap = MockLdap(mock_data)

    def initializer(uri, trace_level=0):
        """Replacement for ldap.initialize: returns the mock connection for `uri`."""
        obj = mockldap[uri]

        # Seed to "support" wildcard queries, which MockLDAP does not support natively.
        cool_block = {
            'dc': ['quay', 'io'],
            'ou': 'employees',
            'uid': ['cool.user', 'referred'],
            'userPassword': ['somepass'],
            'mail': ['foo@bar.com']
        }

        if not requires_email:
            cool_block.pop('mail', None)

        obj.search_s.seed('ou=employees,dc=quay,dc=io', 2, '(|(uid=cool*)(mail=cool*))')([
            ('uid=cool.user,ou=employees,dc=quay,dc=io', cool_block)
        ])

        obj.search_s.seed('ou=otheremployees,dc=quay,dc=io', 2, '(|(uid=cool*)(mail=cool*))')([])

        obj.search_s.seed('ou=employees,dc=quay,dc=io', 2, '(|(uid=unknown*)(mail=unknown*))')([])
        obj.search_s.seed('ou=otheremployees,dc=quay,dc=io', 2,
                          '(|(uid=unknown*)(mail=unknown*))')([])

        # Storage for the page produced by the most recent search_ext call.
        obj._results = {}

        def result3(messageid):
            """Return the stored paged result for `messageid` (empty result for None)."""
            if messageid is None:
                return None, [], None, None

            return obj._results[messageid]

        def search_ext(user_search_dn, scope, search_flt, serverctrls=None, attrlist=None):
            """Emulate a paged subtree search on top of the mock's plain search.

            Returns None unless the scope is SCOPE_SUBTREE and the first server
            control is a SimplePagedResultsControl. Otherwise stores one page of
            results (retrievable via result3) and returns its message id.
            """
            if scope != ldap.SCOPE_SUBTREE:
                return None

            if not serverctrls:
                return None

            page_control = serverctrls[0]
            if page_control.controlType != ldap.controls.SimplePagedResultsControl.controlType:
                return None

            msgid = obj.search(user_search_dn, scope, search_flt, attrlist=attrlist)
            _, rdata = obj.result(msgid)
            msgid = 'messageid'

            # The cookie is used as the integer offset of the next page.
            offset = int(page_control.cookie) if page_control.cookie else 0
            results = rdata[offset:offset + page_control.size]
            offset = offset + page_control.size

            # Compare against the full result set, not the current page: comparing
            # against the page length ends pagination after at most two pages.
            if offset >= len(rdata):
                page_control.cookie = None
            else:
                page_control.cookie = offset

            obj._results['messageid'] = (None, results, None, [page_control])
            return msgid

        obj.search_ext = search_ext
        obj.result3 = result3
        return obj

    mockldap.start()
    try:
        with patch('ldap.initialize', new=initializer):
            yield _create_ldap(requires_email=requires_email)
    finally:
        # Always undo the mock, even if the test body (or setup) raised.
        mockldap.stop()


class TestLDAP(unittest.TestCase):
    """Exercises LDAPUsers login, linking, querying and group iteration."""

    def setUp(self):
        setup_database_for_testing(self)
        self.app = app.test_client()
        self.ctx = app.test_request_context()
        self.ctx.__enter__()

    def tearDown(self):
        finished_database_for_testing(self)
        # NOTE(review): the first __exit__ slot is exc_type; passing True looks
        # unintentional -- confirm before changing.
        self.ctx.__exit__(True, None, None)

    def _assert_testy_and_someuser(self, results):
        """Assert that `results` holds exactly the testy and someuser entries, in any order."""
        self.assertEqual(2, len(results))

        first = results[0][0]
        second = results[1][0]

        if first.id == 'testy':
            testy, someuser = first, second
        else:
            testy, someuser = second, first

        self.assertEqual('testy', testy.id)
        self.assertEqual('testy', testy.username)
        self.assertEqual('bar@baz.com', testy.email)

        self.assertEqual('someuser', someuser.id)
        self.assertEqual('someuser', someuser.username)
        self.assertEqual('foo@bar.com', someuser.email)

    def test_invalid_admin_password(self):
        base_dn = ['dc=quay', 'dc=io']
        admin_dn = 'uid=testy,ou=employees,dc=quay,dc=io'
        admin_passwd = 'INVALIDPASSWORD'
        user_rdn = ['ou=employees']
        uid_attr = 'uid'
        email_attr = 'mail'

        with mock_ldap():
            ldap_users = LDAPUsers('ldap://localhost', base_dn, admin_dn, admin_passwd,
                                   user_rdn, uid_attr, email_attr)

            # Try to login.
            (response, err_msg) = ldap_users.verify_and_link_user('someuser', 'somepass')
            self.assertIsNone(response)
            self.assertEqual('LDAP Admin dn or password is invalid', err_msg)

    def test_login(self):
        with mock_ldap() as ldap_users:
            # Verify we can login.
            (response, _) = ldap_users.verify_and_link_user('someuser', 'somepass')
            self.assertEqual(response.username, 'someuser')
            self.assertTrue(model.user.has_user_prompt(response, 'confirm_username'))

            # Verify we can confirm the user.
            (response, _) = ldap_users.confirm_existing_user('someuser', 'somepass')
            self.assertEqual(response.username, 'someuser')

    def test_login_empty_password(self):
        with mock_ldap() as ldap_users:
            # Verify we cannot login.
            (response, err_msg) = ldap_users.verify_and_link_user('someuser', '')
            self.assertIsNone(response)
            self.assertEqual(err_msg, 'Anonymous binding not allowed')

            # Verify we cannot confirm the user.
            (response, err_msg) = ldap_users.confirm_existing_user('someuser', '')
            self.assertIsNone(response)
            self.assertEqual(err_msg, 'Invalid user')

    def test_login_whitespace_password(self):
        with mock_ldap() as ldap_users:
            # Verify we cannot login.
            (response, err_msg) = ldap_users.verify_and_link_user('someuser', ' ')
            self.assertIsNone(response)
            self.assertEqual(err_msg, 'Invalid password')

            # Verify we cannot confirm the user.
            (response, err_msg) = ldap_users.confirm_existing_user('someuser', ' ')
            self.assertIsNone(response)
            self.assertEqual(err_msg, 'Invalid user')

    def test_login_secondary(self):
        with mock_ldap() as ldap_users:
            # Verify we can login.
            (response, _) = ldap_users.verify_and_link_user('secondaryuser', 'somepass')
            self.assertEqual(response.username, 'secondaryuser')

            # Verify we can confirm the user.
            (response, _) = ldap_users.confirm_existing_user('secondaryuser', 'somepass')
            self.assertEqual(response.username, 'secondaryuser')

    def test_invalid_wildcard(self):
        with mock_ldap() as ldap_users:
            # Verify we cannot login with a wildcard.
            (response, err_msg) = ldap_users.verify_and_link_user('some*', 'somepass')
            self.assertIsNone(response)
            self.assertEqual(err_msg, 'Username not found')

            # Verify we cannot confirm the user.
            (response, err_msg) = ldap_users.confirm_existing_user('some*', 'somepass')
            self.assertIsNone(response)
            self.assertEqual(err_msg, 'Invalid user')

    def test_invalid_password(self):
        with mock_ldap() as ldap_users:
            # Verify we cannot login with an invalid password.
            (response, err_msg) = ldap_users.verify_and_link_user('someuser', 'invalidpass')
            self.assertIsNone(response)
            self.assertEqual(err_msg, 'Invalid password')

            # Verify we cannot confirm the user.
            (response, err_msg) = ldap_users.confirm_existing_user('someuser', 'invalidpass')
            self.assertIsNone(response)
            self.assertEqual(err_msg, 'Invalid user')

    def test_missing_mail(self):
        with mock_ldap() as ldap_users:
            (response, err_msg) = ldap_users.get_user('nomail')
            self.assertIsNone(response)
            self.assertEqual('Missing mail field "mail" in user record', err_msg)

    def test_missing_mail_allowed(self):
        with mock_ldap(requires_email=False) as ldap_users:
            (response, _) = ldap_users.get_user('nomail')
            self.assertEqual(response.username, 'nomail')

    def test_confirm_different_username(self):
        with mock_ldap() as ldap_users:
            # Verify that the user is logged in and their username was adjusted.
            (response, _) = ldap_users.verify_and_link_user('cool.user', 'somepass')
            self.assertEqual(response.username, 'cool_user')

            # Verify we can confirm the user's quay username.
            (response, _) = ldap_users.confirm_existing_user('cool_user', 'somepass')
            self.assertEqual(response.username, 'cool_user')

            # Verify that we *cannot* confirm the LDAP username.
            (response, _) = ldap_users.confirm_existing_user('cool.user', 'somepass')
            self.assertIsNone(response)

    def test_referral(self):
        with mock_ldap() as ldap_users:
            (response, _) = ldap_users.verify_and_link_user('referred', 'somepass')
            self.assertEqual(response.username, 'cool_user')

            # Verify we can confirm the user's quay username.
            (response, _) = ldap_users.confirm_existing_user('cool_user', 'somepass')
            self.assertEqual(response.username, 'cool_user')

    def test_invalid_referral(self):
        with mock_ldap() as ldap_users:
            (response, _) = ldap_users.verify_and_link_user('invalidreferred', 'somepass')
            self.assertIsNone(response)

    def test_multientry(self):
        with mock_ldap() as ldap_users:
            (response, _) = ldap_users.verify_and_link_user('multientry', 'somepass')
            self.assertEqual(response.username, 'multientry')

    def test_login_empty_userdn(self):
        with mock_ldap():
            base_dn = ['ou=employees', 'dc=quay', 'dc=io']
            admin_dn = 'uid=testy,ou=employees,dc=quay,dc=io'
            admin_passwd = 'password'
            user_rdn = []
            uid_attr = 'uid'
            email_attr = 'mail'
            secondary_user_rdns = ['ou=otheremployees']

            ldap_users = LDAPUsers('ldap://localhost', base_dn, admin_dn, admin_passwd,
                                   user_rdn, uid_attr, email_attr,
                                   secondary_user_rdns=secondary_user_rdns)

            # Verify we can login.
            (response, _) = ldap_users.verify_and_link_user('someuser', 'somepass')
            self.assertEqual(response.username, 'someuser')

            # Verify we can confirm the user.
            (response, _) = ldap_users.confirm_existing_user('someuser', 'somepass')
            self.assertEqual(response.username, 'someuser')

    def test_link_user(self):
        with mock_ldap() as ldap_users:
            # Link someuser.
            user, error_message = ldap_users.link_user('someuser')
            self.assertIsNone(error_message)
            self.assertIsNotNone(user)
            self.assertEqual('someuser', user.username)

            # Link again. Should return the same user record.
            user_again, _ = ldap_users.link_user('someuser')
            self.assertEqual(user_again.id, user.id)

            # Confirm someuser.
            result, _ = ldap_users.confirm_existing_user('someuser', 'somepass')
            self.assertIsNotNone(result)
            self.assertEqual('someuser', result.username)
            self.assertTrue(model.user.has_user_prompt(user, 'confirm_username'))

    def test_query(self):
        with mock_ldap() as ldap_users:
            # Lookup cool.
            (response, federated_id, error_message) = ldap_users.query_users('cool')
            self.assertIsNone(error_message)
            self.assertEqual(1, len(response))
            self.assertEqual('ldap', federated_id)

            user_info = response[0]
            self.assertEqual("cool.user", user_info.username)
            self.assertEqual("foo@bar.com", user_info.email)

            # Lookup unknown.
            (response, federated_id, error_message) = ldap_users.query_users('unknown')
            self.assertIsNone(error_message)
            self.assertEqual(0, len(response))
            self.assertEqual('ldap', federated_id)

    def test_timeout(self):
        base_dn = ['dc=quay', 'dc=io']
        admin_dn = 'uid=testy,ou=employees,dc=quay,dc=io'
        admin_passwd = 'password'
        user_rdn = ['ou=employees']
        uid_attr = 'uid'
        email_attr = 'mail'
        secondary_user_rdns = ['ou=otheremployees']

        # assertRaisesRegexp was renamed to assertRaisesRegex in newer Python
        # versions; prefer the new name and fall back to the old one.
        assert_raises_regex = getattr(self, 'assertRaisesRegex', None) or self.assertRaisesRegexp

        # No mock_ldap() here: this runs against the unpatched ldap module.
        with assert_raises_regex(Exception, "Can't contact LDAP server"):
            ldap_users = LDAPUsers('ldap://localhost', base_dn, admin_dn, admin_passwd,
                                   user_rdn, uid_attr, email_attr,
                                   secondary_user_rdns=secondary_user_rdns,
                                   requires_email=False, timeout=5)
            ldap_users.query_users('cool')

    def test_iterate_group_members(self):
        with mock_ldap() as ldap_users:
            (it, err) = ldap_users.iterate_group_members({'group_dn': 'cn=AwesomeFolk'},
                                                         disable_pagination=True)
            self.assertIsNone(err)

            self._assert_testy_and_someuser(list(it))

    def test_iterate_group_members_with_pagination(self):
        with mock_ldap() as ldap_users:
            for dn in ['cn=AwesomeFolk', 'cn=*Guys']:
                (it, err) = ldap_users.iterate_group_members({'group_dn': dn}, page_size=1)
                self.assertIsNone(err)

                self._assert_testy_and_someuser(list(it))

    def test_check_group_lookup_args(self):
        with mock_ldap() as ldap_users:
            (result, err) = ldap_users.check_group_lookup_args({'group_dn': 'cn=invalid'},
                                                               disable_pagination=True)
            self.assertFalse(result)
            self.assertIsNotNone(err)

            (result, err) = ldap_users.check_group_lookup_args({'group_dn': 'cn=AwesomeFolk'},
                                                               disable_pagination=True)
            self.assertTrue(result)
            self.assertIsNone(err)

            (result, err) = ldap_users.check_group_lookup_args({'group_dn': 'cn=*Guys'},
                                                               disable_pagination=True)
            self.assertTrue(result)
            self.assertIsNone(err)

    def test_metadata(self):
        with mock_ldap() as ldap_users:
            # assertIn instead of a bare assert, which is stripped under `python -O`.
            self.assertIn('base_dn', ldap_users.service_metadata())


if __name__ == '__main__':
    unittest.main()