import pytest

from endpoints.oauthlogin import _conduct_oauth_login
# NOTE(review): `app` is requested as a fixture below; the other names are presumably the
# fixtures it depends on and are imported so pytest can resolve them from this module.
from endpoints.api.test.conftest import app, appconfig, database_uri, init_db_path, sqlitedb_file

from oauth.services.github import GithubOAuthService

from data import model, database
from data.users import get_users_handler, DatabaseUsers

from test.test_ldap import mock_ldap


@pytest.fixture(params=[None, 'username', 'email'])
def login_service(request, app):
    """Return a GitHub OAuth login service, parametrized over the login binding field.

    The fixture runs once with no binding field configured, once bound on
    `username` and once bound on `email`.
    """
    config = {'GITHUB': {}}

    # The pytest `request` object itself is never None; it is the parameter value that
    # decides whether a binding field is configured at all.
    if request.param is not None:
        config['GITHUB']['LOGIN_BINDING_FIELD'] = request.param

    return GithubOAuthService(config, 'GITHUB')


@pytest.fixture(params=['Database', 'LDAP'])
def auth_system(request):
    """Return a users handler for each authentication type under test."""
    return _get_users_handler(request.param)


def _get_users_handler(auth_type):
    """Build a users handler for `auth_type` from a fixed test configuration.

    The LDAP settings are always supplied, whichever `auth_type` is requested.
    """
    config = {
        'AUTHENTICATION_TYPE': auth_type,
        'LDAP_BASE_DN': ['dc=quay', 'dc=io'],
        'LDAP_ADMIN_DN': 'uid=testy,ou=employees,dc=quay,dc=io',
        'LDAP_ADMIN_PASSWD': 'password',
        'LDAP_USER_RDN': ['ou=employees'],
    }
    return get_users_handler(config, None, None)


def test_existing_account(auth_system, login_service):
    """An external id already bound to a user logs in as that user; no user is created."""
    login_service_lid = 'someexternaluser'

    # Create an existing bound federated user.
    created_user = model.user.create_federated_user('someuser', 'example@example.com',
                                                    login_service.service_id(),
                                                    login_service_lid, False)
    existing_user_count = database.User.select().count()

    with mock_ldap():
        result = _conduct_oauth_login(auth_system, login_service,
                                      login_service_lid, login_service_lid,
                                      'example@example.com')

        assert result.to_login == created_user

        # Ensure that no additional users were created.
        current_user_count = database.User.select().count()
        assert current_user_count == existing_user_count


def test_new_account_via_database(login_service):
    """An unknown external user gets a new database user bound to the login service."""
    existing_user_count = database.User.select().count()
    login_service_lid = 'someexternaluser'
    internal_auth = DatabaseUsers()

    # Conduct login. Since the external user doesn't (yet) bind to a user in the database,
    # a new user should be created and bound to the external service.
    result = _conduct_oauth_login(internal_auth, login_service,
                                  login_service_lid, login_service_lid,
                                  'example@example.com')
    assert result.to_login is not None

    current_user_count = database.User.select().count()
    assert current_user_count == existing_user_count + 1

    # Find the user and ensure it is bound.
    new_user = model.user.get_user(login_service_lid)
    federated_login = model.user.lookup_federated_login(new_user, login_service.service_id())
    assert federated_login is not None


@pytest.mark.parametrize('binding_field, lusername, lemail, expected_error', [
    # No binding field + newly seen user -> New unlinked user
    (None, 'someunknownuser', 'someemail@example.com', None),

    # username binding field + unknown username -> Error.
    ('username', 'someunknownuser', 'foo@bar.com',
     'username someunknownuser not found in backing auth system'),

    # email binding field + unknown email address -> Error.
    ('email', 'someuser', 'someemail@example.com',
     'email someemail@example.com not found in backing auth system'),

    # No binding field + newly seen user -> New unlinked user.
    (None, 'someuser', 'foo@bar.com', None),

    # username binding field + valid username -> fully bound user.
    ('username', 'someuser', 'foo@bar.com', None),

    # email binding field + valid email -> fully bound user.
    ('email', 'someuser', 'foo@bar.com', None),

    # username binding field + valid username + invalid email -> fully bound user.
    ('username', 'someuser', 'another@email.com', None),

    # email binding field + valid email + invalid username -> fully bound user.
    ('email', 'someotherusername', 'foo@bar.com', None),
])
def test_new_account_via_ldap(binding_field, lusername, lemail, expected_error, app):
    """Check OAuth login against LDAP for each binding field / identity combination.

    On success exactly one user is created, bound to the external service and, when a
    binding field is configured, to LDAP as well. On error no user is created.
    """
    existing_user_count = database.User.select().count()

    config = {'GITHUB': {}}
    if binding_field is not None:
        config['GITHUB']['LOGIN_BINDING_FIELD'] = binding_field

    external_auth = GithubOAuthService(config, 'GITHUB')
    internal_auth = _get_users_handler('LDAP')

    with mock_ldap():
        # Conduct OAuth login.
        result = _conduct_oauth_login(internal_auth, external_auth, 'someid', lusername, lemail)
        assert result.error_message == expected_error

        current_user_count = database.User.select().count()
        if expected_error is None:
            # Ensure that the new user was created and that it is bound to both the
            # external login service and to LDAP (if a binding_field was given).
            assert current_user_count == existing_user_count + 1
            assert result.to_login is not None

            # Check the service bindings.
            external_login = model.user.lookup_federated_login(result.to_login,
                                                               external_auth.service_id())
            assert external_login is not None

            internal_login = model.user.lookup_federated_login(result.to_login,
                                                               internal_auth.federated_service)
            if binding_field is not None:
                assert internal_login is not None
            else:
                assert internal_login is None
        else:
            # Ensure that no additional users were created.
            assert current_user_count == existing_user_count


def test_existing_account_in_ldap(app):
    """An OAuth login matching an existing LDAP-bound user reuses and binds that user."""
    config = {'GITHUB': {'LOGIN_BINDING_FIELD': 'username'}}

    external_auth = GithubOAuthService(config, 'GITHUB')
    internal_auth = _get_users_handler('LDAP')

    # Add an existing federated user bound to the LDAP account associated with `someuser`.
    bound_user = model.user.create_federated_user('someuser', 'foo@bar.com',
                                                  internal_auth.federated_service,
                                                  'someuser', False)

    existing_user_count = database.User.select().count()

    with mock_ldap():
        # Conduct OAuth login with the same lid and bound field. This should find the existing
        # LDAP user (via the `username` binding), and then bind Github to it as well.
        result = _conduct_oauth_login(internal_auth, external_auth, bound_user.username,
                                      bound_user.username, bound_user.email)
        assert result.error_message is None

        # Ensure that the same user was returned, and that it is now bound to the Github
        # account as well.
        assert result.to_login.id == bound_user.id

        # Ensure that no additional users were created.
        current_user_count = database.User.select().count()
        assert current_user_count == existing_user_count

        # Check the service bindings.
        external_login = model.user.lookup_federated_login(result.to_login,
                                                           external_auth.service_id())
        assert external_login is not None

        internal_login = model.user.lookup_federated_login(result.to_login,
                                                           internal_auth.federated_service)
        assert internal_login is not None