import unittest

from app import app
from initdb import setup_database_for_testing, finished_database_for_testing
from data.users import LDAPUsers
from mockldap import MockLdap
from mock import patch


class TestLDAP(unittest.TestCase):
    """Exercise LDAPUsers against an in-memory MockLdap directory."""

    def setUp(self):
        """Prepare the test database, a request context and the mock directory."""
        setup_database_for_testing(self)
        self.app = app.test_client()
        self.ctx = app.test_request_context()
        self.ctx.__enter__()

        self.mockldap = MockLdap({
            'dc=quay,dc=io': {'dc': ['quay', 'io']},
            'ou=employees,dc=quay,dc=io': {
                'dc': ['quay', 'io'],
                'ou': 'employees'
            },
            'ou=otheremployees,dc=quay,dc=io': {
                'dc': ['quay', 'io'],
                'ou': 'otheremployees'
            },
            'uid=testy,ou=employees,dc=quay,dc=io': {
                'dc': ['quay', 'io'],
                'ou': 'employees',
                'uid': 'testy',
                'userPassword': ['password']
            },
            'uid=someuser,ou=employees,dc=quay,dc=io': {
                'dc': ['quay', 'io'],
                'ou': 'employees',
                'uid': ['someuser'],
                'userPassword': ['somepass'],
                'mail': ['foo@bar.com']
            },
            'uid=nomail,ou=employees,dc=quay,dc=io': {
                'dc': ['quay', 'io'],
                'ou': 'employees',
                'uid': ['nomail'],
                'userPassword': ['somepass']
            },
            'uid=cool.user,ou=employees,dc=quay,dc=io': {
                'dc': ['quay', 'io'],
                'ou': 'employees',
                'uid': ['cool.user', 'referred'],
                'userPassword': ['somepass'],
                'mail': ['foo@bar.com']
            },
            'uid=referred,ou=employees,dc=quay,dc=io': {
                'uid': ['referred'],
                '_referral': 'ldap:///uid=cool.user,ou=employees,dc=quay,dc=io'
            },
            'uid=invalidreferred,ou=employees,dc=quay,dc=io': {
                'uid': ['invalidreferred'],
                '_referral': 'ldap:///uid=someinvaliduser,ou=employees,dc=quay,dc=io'
            },
            'uid=multientry,ou=subgroup1,ou=employees,dc=quay,dc=io': {
                'uid': ['multientry'],
                'mail': ['foo@bar.com'],
                'userPassword': ['somepass'],
            },
            'uid=multientry,ou=subgroup2,ou=employees,dc=quay,dc=io': {
                'uid': ['multientry'],
                'another': ['key']
            },
            'uid=secondaryuser,ou=otheremployees,dc=quay,dc=io': {
                'dc': ['quay', 'io'],
                'ou': 'otheremployees',
                'uid': ['secondaryuser'],
                'userPassword': ['somepass'],
                'mail': ['foosecondary@bar.com']
            },
        })

        self.mockldap.start()

        self.ldap = self._create_ldap(secondary_user_rdns=['ou=otheremployees'])

    def tearDown(self):
        """Undo everything setUp created."""
        self.mockldap.stop()
        finished_database_for_testing(self)
        # Clean exit: the context-manager protocol passes None for all three
        # arguments when no exception occurred.
        self.ctx.__exit__(None, None, None)

    def _create_ldap(self, base_dn=None, admin_passwd='password', user_rdn=None, **kwargs):
        """Build an LDAPUsers client that uses the `testy` entry as admin DN.

        Args:
            base_dn: list of base DN components; defaults to ['dc=quay', 'dc=io'].
            admin_passwd: password supplied for the admin DN.
            user_rdn: list of RDN components for users; defaults to
                ['ou=employees']. An empty list is passed through unchanged.
            **kwargs: extra keyword arguments forwarded untouched to LDAPUsers
                (e.g. secondary_user_rdns). Nothing is forwarded when omitted.

        Returns:
            The constructed LDAPUsers instance.
        """
        if base_dn is None:
            base_dn = ['dc=quay', 'dc=io']
        # Compare against None (not truthiness) so an explicit [] survives.
        if user_rdn is None:
            user_rdn = ['ou=employees']

        admin_dn = 'uid=testy,ou=employees,dc=quay,dc=io'
        uid_attr = 'uid'
        email_attr = 'mail'
        return LDAPUsers('ldap://localhost', base_dn, admin_dn, admin_passwd, user_rdn,
                         uid_attr, email_attr, **kwargs)

    def test_invalid_admin_password(self):
        """A wrong admin password is reported as an error rather than a login."""
        self.ldap = self._create_ldap(admin_passwd='INVALIDPASSWORD')

        # Try to login.
        (response, err_msg) = self.ldap.verify_and_link_user('someuser', 'somepass')
        self.assertIsNone(response)
        self.assertEqual('LDAP Admin dn or password is invalid', err_msg)

    def test_login(self):
        """A user under the primary user RDN can log in and be confirmed."""
        # Verify we can login.
        (response, _) = self.ldap.verify_and_link_user('someuser', 'somepass')
        self.assertEqual(response.username, 'someuser')

        # Verify we can confirm the user.
        (response, _) = self.ldap.confirm_existing_user('someuser', 'somepass')
        self.assertEqual(response.username, 'someuser')

    def test_login_secondary(self):
        """A user under the secondary user RDN can log in and be confirmed."""
        # Verify we can login.
        (response, _) = self.ldap.verify_and_link_user('secondaryuser', 'somepass')
        self.assertEqual(response.username, 'secondaryuser')

        # Verify we can confirm the user.
        (response, _) = self.ldap.confirm_existing_user('secondaryuser', 'somepass')
        self.assertEqual(response.username, 'secondaryuser')

    def test_invalid_password(self):
        """A wrong user password fails both login and confirmation."""
        # Verify we cannot login with an invalid password.
        (response, err_msg) = self.ldap.verify_and_link_user('someuser', 'invalidpass')
        self.assertIsNone(response)
        self.assertEqual(err_msg, 'Invalid password')

        # Verify we cannot confirm the user.
        (response, err_msg) = self.ldap.confirm_existing_user('someuser', 'invalidpass')
        self.assertIsNone(response)
        self.assertEqual(err_msg, 'Invalid user')

    def test_missing_mail(self):
        """A record without the mail attribute is rejected with an explicit error."""
        (response, err_msg) = self.ldap.verify_and_link_user('nomail', 'somepass')
        self.assertIsNone(response)
        self.assertEqual('Missing mail field "mail" in user record', err_msg)

    def test_confirm_different_username(self):
        """Confirmation works with the adjusted username, not the LDAP one."""
        # Verify that the user is logged in and their username was adjusted.
        (response, _) = self.ldap.verify_and_link_user('cool.user', 'somepass')
        self.assertEqual(response.username, 'cool_user')

        # Verify we can confirm the user's quay username.
        (response, _) = self.ldap.confirm_existing_user('cool_user', 'somepass')
        self.assertEqual(response.username, 'cool_user')

        # Verify that we *cannot* confirm the LDAP username.
        (response, _) = self.ldap.confirm_existing_user('cool.user', 'somepass')
        self.assertIsNone(response)

    def test_referral(self):
        """Logging in through a referral entry yields the referred-to user."""
        (response, _) = self.ldap.verify_and_link_user('referred', 'somepass')
        self.assertEqual(response.username, 'cool_user')

        # Verify we can confirm the user's quay username.
        (response, _) = self.ldap.confirm_existing_user('cool_user', 'somepass')
        self.assertEqual(response.username, 'cool_user')

    def test_invalid_referral(self):
        """A referral pointing at a non-existent entry does not log anyone in."""
        (response, _) = self.ldap.verify_and_link_user('invalidreferred', 'somepass')
        self.assertIsNone(response)

    def test_multientry(self):
        """A uid that appears under two subgroups can still log in."""
        (response, _) = self.ldap.verify_and_link_user('multientry', 'somepass')
        self.assertEqual(response.username, 'multientry')

    def test_login_empty_userdn(self):
        """Login works when the user RDN is empty and the base DN carries the OU."""
        self.ldap = self._create_ldap(base_dn=['ou=employees', 'dc=quay', 'dc=io'],
                                      user_rdn=[],
                                      secondary_user_rdns=['ou=otheremployees'])

        # Verify we can login.
        (response, _) = self.ldap.verify_and_link_user('someuser', 'somepass')
        self.assertEqual(response.username, 'someuser')

        # Verify we can confirm the user.
        (response, _) = self.ldap.confirm_existing_user('someuser', 'somepass')
        self.assertEqual(response.username, 'someuser')

    def test_link_user(self):
        """Linking is repeatable and the linked user can then be confirmed."""
        # Link someuser.
        user, error_message = self.ldap.link_user('someuser')
        self.assertIsNone(error_message)
        self.assertIsNotNone(user)
        self.assertEqual('someuser', user.username)

        # Link again. Should return the same user record.
        user_again, _ = self.ldap.link_user('someuser')
        self.assertEqual(user_again.id, user.id)

        # Confirm someuser.
        result, _ = self.ldap.confirm_existing_user('someuser', 'somepass')
        self.assertIsNotNone(result)
        self.assertEqual('someuser', result.username)

    def test_query(self):
        """query_users returns matching users and an empty list for no match."""
        def initializer(uri, trace_level=0):
            obj = self.mockldap[uri]

            # Seed to "support" wildcard queries, which MockLDAP does not support natively.
            obj.search_s.seed('ou=employees,dc=quay,dc=io', 2, '(|(uid=cool*)(mail=cool*))')([
                ('uid=cool.user,ou=employees,dc=quay,dc=io', {
                    'dc': ['quay', 'io'],
                    'ou': 'employees',
                    'uid': ['cool.user', 'referred'],
                    'userPassword': ['somepass'],
                    'mail': ['foo@bar.com']
                })
            ])

            obj.search_s.seed('ou=otheremployees,dc=quay,dc=io', 2,
                              '(|(uid=cool*)(mail=cool*))')([])

            obj.search_s.seed('ou=employees,dc=quay,dc=io', 2,
                              '(|(uid=unknown*)(mail=unknown*))')([])
            obj.search_s.seed('ou=otheremployees,dc=quay,dc=io', 2,
                              '(|(uid=unknown*)(mail=unknown*))')([])
            return obj

        with patch('ldap.initialize', new=initializer):
            # Lookup cool.
            (response, error_message) = self.ldap.query_users('cool')
            self.assertIsNone(error_message)
            self.assertEqual(1, len(response))

            user_info = response[0]
            self.assertEqual("cool.user", user_info.username)
            self.assertEqual("foo@bar.com", user_info.email)

            # Lookup unknown.
            (response, error_message) = self.ldap.query_users('unknown')
            self.assertIsNone(error_message)
            self.assertEqual(0, len(response))


if __name__ == '__main__':
    unittest.main()