diff --git a/data/users/externalldap.py b/data/users/externalldap.py index 60c69a2b0..eae3dcea2 100644 --- a/data/users/externalldap.py +++ b/data/users/externalldap.py @@ -187,12 +187,12 @@ class LDAPUsers(FederatedUsers): def query_users(self, query, limit=20): """ Queries LDAP for matching users. """ if not query: - return (None, 'Empty query') + return (None, self.federated_service, 'Empty query') logger.debug('Got query %s with limit %s', query, limit) (results, err_msg) = self._ldap_user_search(query + '*', limit=limit) if err_msg is not None: - return (None, err_msg) + return (None, self.federated_service, err_msg) final_results = [] for result in results[0:limit]: @@ -203,7 +203,7 @@ class LDAPUsers(FederatedUsers): final_results.append(credentials) logger.debug('For query %s found results %s', query, final_results) - return (final_results, None) + return (final_results, self.federated_service, None) def verify_credentials(self, username_or_email, password): """ Verify the credentials with LDAP. """ diff --git a/data/users/keystone.py b/data/users/keystone.py index c018c83d7..6c8487a35 100644 --- a/data/users/keystone.py +++ b/data/users/keystone.py @@ -69,7 +69,7 @@ class KeystoneV2Users(FederatedUsers): return (UserInformation(username=username_or_email, email=email, id=user_id), None) def query_users(self, query, limit=20): - return (None, 'Unsupported in Keystone V2') + return (None, self.federated_service, 'Unsupported in Keystone V2') def get_user(self, username_or_email): return (None, 'Unsupported in Keystone V2') @@ -108,7 +108,7 @@ class KeystoneV3Users(FederatedUsers): return (None, kut.message or 'Invalid username or password') def get_user(self, username_or_email): - users_found, err_msg = self.query_users(username_or_email) + users_found, _, err_msg = self.query_users(username_or_email) if err_msg is not None: return (None, err_msg) @@ -128,7 +128,7 @@ class KeystoneV3Users(FederatedUsers): def query_users(self, query, limit=20): if len(query) < 3: - return ([], None) + return ([], self.federated_service, None) try: keystone_client = kv3client.Client(username=self.admin_username, password=self.admin_password, @@ -137,13 +137,13 @@ class KeystoneV3Users(FederatedUsers): found_users = list(_take(limit, keystone_client.users.list(name=query))) logger.debug('For Keystone query %s found users: %s', query, found_users) if not found_users: - return ([], None) + return ([], self.federated_service, None) - return ([self._user_info(user) for user in found_users], None) + return ([self._user_info(user) for user in found_users], self.federated_service, None) except KeystoneAuthorizationFailure as kaf: logger.exception('Keystone auth failure for admin user for query %s', query) - return (None, kaf.message or 'Invalid admin username or password') + return (None, self.federated_service, kaf.message or 'Invalid admin username or password') except KeystoneUnauthorized as kut: logger.exception('Keystone unauthorized for admin user for query %s', query) - return (None, kut.message or 'Invalid admin username or password') + return (None, self.federated_service, kut.message or 'Invalid admin username or password') diff --git a/endpoints/api/search.py b/endpoints/api/search.py index 8f0cc82b6..018ab713c 100644 --- a/endpoints/api/search.py +++ b/endpoints/api/search.py @@ -10,6 +10,7 @@ from auth.permissions import (OrganizationMemberPermission, ReadRepositoryPermis from auth.auth_context import get_authenticated_user from auth import scopes from app import avatar, authentication +from flask import abort from operator import itemgetter from stringscore import liquidmetal from util.names import parse_robot_username @@ -17,13 +18,15 @@ from util.names import parse_robot_username import anunidecode # Don't listen to pylint's lies. This import is required. import math -@show_if(authentication.federated_service) # Only enabled for non-DB auth. @resource('/v1/entities/link/') @internal_only class LinkExternalEntity(ApiResource): """ Resource for linking external entities to internal users. """ @nickname('linkExternalUser') def post(self, username): + if not authentication.federated_service: + abort(404) + # Only allowed if there is a logged in user. if not get_authenticated_user(): raise Unauthorized() diff --git a/test/helpers.py b/test/helpers.py index ee35152aa..29cc2ae10 100644 --- a/test/helpers.py +++ b/test/helpers.py @@ -1,4 +1,9 @@ +import multiprocessing +import time +import socket + from data.database import LogEntryKind, LogEntry +from contextlib import contextmanager class assert_action_logged(object): """ Specialized assertion for ensuring that a log entry of a particular kind was added under the @@ -20,3 +25,59 @@ class assert_action_logged(object): updated_count = self._get_log_count() error_msg = 'Missing new log entry of kind %s' % self.log_kind assert self.existing_count == (updated_count - 1), error_msg + +_LIVESERVER_TIMEOUT = 5 + +@contextmanager +def liveserver_app(flask_app, port): + """ + Based on https://github.com/jarus/flask-testing/blob/master/flask_testing/utils.py + + Runs the given Flask app as a live web server locally, on the given port, starting it + when called and terminating after the yield. + + Usage: + with liveserver_app(flask_app, port): + # Code that makes use of the app. + """ + shared = {} + + def _can_ping_server(): + sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) + + try: + sock.connect(('localhost', port)) + except socket.error: + success = False + else: + success = True + finally: + sock.close() + + return success + + def _spawn_live_server(): + worker = lambda app, port: app.run(port=port, use_reloader=False) + shared['process'] = multiprocessing.Process(target=worker, args=(flask_app, port)) + shared['process'].start() + + start_time = time.time() + while True: + elapsed_time = (time.time() - start_time) + if elapsed_time > _LIVESERVER_TIMEOUT: + _terminate_live_server() + raise RuntimeError("Failed to start the server after %d seconds. " % _LIVESERVER_TIMEOUT) + + if _can_ping_server(): + break + + def _terminate_live_server(): + if shared.get('process'): + shared.get('process').terminate() + shared.pop('process') + + try: + _spawn_live_server() + yield + finally: + _terminate_live_server() diff --git a/test/test_endtoend_auth.py b/test/test_endtoend_auth.py new file mode 100644 index 000000000..33df27bda --- /dev/null +++ b/test/test_endtoend_auth.py @@ -0,0 +1,57 @@ +import unittest +from mock import patch + +from endpoints.api.search import EntitySearch, LinkExternalEntity +from test.test_api_usage import ApiTestCase, ADMIN_ACCESS_USER + +from test.test_ldap import mock_ldap +from test.test_external_jwt_authn import fake_jwt +from test.test_keystone_auth import fake_keystone + +class EndToEndAuthMixin: + def test_entity_search(self): + with self.get_authentication() as auth: + with patch('endpoints.api.search.authentication', auth): + # Try an unknown prefix. + json_data = self.getJsonResponse(EntitySearch, params=dict(prefix='unknown')) + results = json_data['results'] + self.assertEquals(0, len(results)) + + # Try a known prefix. + json_data = self.getJsonResponse(EntitySearch, params=dict(prefix='cool')) + results = json_data['results'] + self.assertEquals(1, len(results)) + self.assertEquals('external', results[0]['kind']) + self.assertEquals('cool.user', results[0]['name']) + + def test_link_external_entity(self): + with self.get_authentication() as auth: + with patch('endpoints.api.search.authentication', auth): + self.login(ADMIN_ACCESS_USER) + + # Try an unknown user. + self.postResponse(LinkExternalEntity, params=dict(username='unknownuser'), + expected_code=400) + + # Try a known user. + json_data = self.postJsonResponse(LinkExternalEntity, params=dict(username='cool.user')) + entity = json_data['entity'] + self.assertEquals('cool_user', entity['name']) + self.assertEquals('user', entity['kind']) + + +class TestLDAPEndToEnd(ApiTestCase, EndToEndAuthMixin): + def get_authentication(self): + return mock_ldap() + +class TestJWTEndToEnd(ApiTestCase, EndToEndAuthMixin): + def get_authentication(self): + return fake_jwt() + +class TestKeystoneEndToEnd(ApiTestCase, EndToEndAuthMixin): + def get_authentication(self): + return fake_keystone(3) + + +if __name__ == '__main__': + unittest.main() \ No newline at end of file diff --git a/test/test_external_jwt_authn.py b/test/test_external_jwt_authn.py index 84141fca1..47b1fa4bf 100644 --- a/test/test_external_jwt_authn.py +++ b/test/test_external_jwt_authn.py @@ -3,148 +3,172 @@ import unittest from datetime import datetime, timedelta from tempfile import NamedTemporaryFile +from contextlib import contextmanager import jwt import requests from Crypto.PublicKey import RSA from flask import Flask, jsonify, request, make_response -from flask_testing import LiveServerTestCase from app import app from data.users import ExternalJWTAuthN from initdb import setup_database_for_testing, finished_database_for_testing +from test.helpers import liveserver_app _PORT_NUMBER = 5001 -class JWTAuthTestMixin(object): +@contextmanager +def fake_jwt(requires_email=True): + """ Context manager which instantiates and runs a webserver with a fake JWT implementation, + until the result is yielded. + + Usage: + with fake_jwt() as jwt_auth: + # Make jwt_auth requests. + """ + jwt_app, port, public_key = _create_app(requires_email) + server_url = 'http://' + jwt_app.config['SERVER_HOSTNAME'] + + verify_url = server_url + '/user/verify' + query_url = server_url + '/user/query' + getuser_url = server_url + '/user/get' + + jwt_auth = ExternalJWTAuthN(verify_url, query_url, getuser_url, 'authy', '', + app.config['HTTPCLIENT'], 300, public_key.name, + requires_email=requires_email) + + with liveserver_app(jwt_app, port): + yield jwt_auth + +def _generate_certs(): + public_key = NamedTemporaryFile(delete=True) + + key = RSA.generate(1024) + private_key_data = key.exportKey('PEM') + + pubkey = key.publickey() + public_key.write(pubkey.exportKey('OpenSSH')) + public_key.seek(0) + + return (public_key, private_key_data) + +def _create_app(emails=True): + global _PORT_NUMBER + _PORT_NUMBER = _PORT_NUMBER + 1 + + public_key, private_key_data = _generate_certs() + + users = [ + {'name': 'cool.user', 'email': 'user@domain.com', 'password': 'password'}, + {'name': 'some.neat.user', 'email': 'neat@domain.com', 'password': 'foobar'} + ] + + jwt_app = Flask('testjwt') + jwt_app.config['SERVER_HOSTNAME'] = 'localhost:%s' % _PORT_NUMBER + + def _get_basic_auth(): + data = base64.b64decode(request.headers['Authorization'][len('Basic '):]) + return data.split(':', 1) + + @jwt_app.route('/user/query', methods=['GET']) + def query_users(): + query = request.args.get('query') + results = [] + + for user in users: + if user['name'].startswith(query): + result = { + 'username': user['name'], + } + + if emails: + result['email'] = user['email'] + + results.append(result) + + token_data = { + 'iss': 'authy', + 'aud': 'quay.io/jwtauthn/query', + 'nbf': datetime.utcnow(), + 'iat': datetime.utcnow(), + 'exp': datetime.utcnow() + timedelta(seconds=60), + 'results': results, + } + + encoded = jwt.encode(token_data, private_key_data, 'RS256') + return jsonify({ + 'token': encoded + }) + + @jwt_app.route('/user/get', methods=['GET']) + def get_user(): + username = request.args.get('username') + + if username == 'disabled': + return make_response('User is currently disabled', 401) + + for user in users: + if user['name'] == username or user['email'] == username: + token_data = { + 'iss': 'authy', + 'aud': 'quay.io/jwtauthn/getuser', + 'nbf': datetime.utcnow(), + 'iat': datetime.utcnow(), + 'exp': datetime.utcnow() + timedelta(seconds=60), + 'sub': user['name'], + 'email': user['email'], + } + + encoded = jwt.encode(token_data, private_key_data, 'RS256') + return jsonify({ + 'token': encoded + }) + + return make_response('Invalid username or password', 404) + + @jwt_app.route('/user/verify', methods=['GET']) + def verify_user(): + username, password = _get_basic_auth() + + if username == 'disabled': + return make_response('User is currently disabled', 401) + + for user in users: + if user['name'] == username or user['email'] == username: + if password != user['password']: + return make_response('', 404) + + token_data = { + 'iss': 'authy', + 'aud': 'quay.io/jwtauthn', + 'nbf': datetime.utcnow(), + 'iat': datetime.utcnow(), + 'exp': datetime.utcnow() + timedelta(seconds=60), + 'sub': user['name'], + 'email': user['email'], + } + + encoded = jwt.encode(token_data, private_key_data, 'RS256') + return jsonify({ + 'token': encoded + }) + + return make_response('Invalid username or password', 404) + + jwt_app.config['TESTING'] = True + return jwt_app, _PORT_NUMBER, public_key + + +class JWTAuthTestMixin: + """ Mixin defining all the JWT auth tests. """ maxDiff = None @property def emails(self): raise NotImplementedError - @classmethod - def setUpClass(cls): - public_key = NamedTemporaryFile(delete=True) - - key = RSA.generate(1024) - private_key_data = key.exportKey('PEM') - - pubkey = key.publickey() - public_key.write(pubkey.exportKey('OpenSSH')) - public_key.seek(0) - - JWTAuthTestCase.public_key = public_key - JWTAuthTestCase.private_key_data = private_key_data - - def create_app(self): - global _PORT_NUMBER - _PORT_NUMBER = _PORT_NUMBER + 1 - - users = [ - {'name': 'cooluser', 'email': 'user@domain.com', 'password': 'password'}, - {'name': 'some.neat.user', 'email': 'neat@domain.com', 'password': 'foobar'} - ] - - jwt_app = Flask('testjwt') - private_key = JWTAuthTestCase.private_key_data - jwt_app.config['LIVESERVER_PORT'] = _PORT_NUMBER - - def _get_basic_auth(): - data = base64.b64decode(request.headers['Authorization'][len('Basic '):]) - return data.split(':', 1) - - @jwt_app.route('/user/query', methods=['GET']) - def query_users(): - query = request.args.get('query') - results = [] - - for user in users: - if user['name'].startswith(query): - result = { - 'username': user['name'], - } - - if self.emails: - result['email'] = user['email'] - - results.append(result) - - token_data = { - 'iss': 'authy', - 'aud': 'quay.io/jwtauthn/query', - 'nbf': datetime.utcnow(), - 'iat': datetime.utcnow(), - 'exp': datetime.utcnow() + timedelta(seconds=60), - 'results': results, - } - - encoded = jwt.encode(token_data, private_key, 'RS256') - return jsonify({ - 'token': encoded - }) - - @jwt_app.route('/user/get', methods=['GET']) - def get_user(): - username = request.args.get('username') - - if username == 'disabled': - return make_response('User is currently disabled', 401) - - for user in users: - if user['name'] == username or user['email'] == username: - token_data = { - 'iss': 'authy', - 'aud': 'quay.io/jwtauthn/getuser', - 'nbf': datetime.utcnow(), - 'iat': datetime.utcnow(), - 'exp': datetime.utcnow() + timedelta(seconds=60), - 'sub': user['name'], - 'email': user['email'], - } - - encoded = jwt.encode(token_data, private_key, 'RS256') - return jsonify({ - 'token': encoded - }) - - return make_response('Invalid username or password', 404) - - @jwt_app.route('/user/verify', methods=['GET']) - def verify_user(): - username, password = _get_basic_auth() - - if username == 'disabled': - return make_response('User is currently disabled', 401) - - for user in users: - if user['name'] == username or user['email'] == username: - if password != user['password']: - return make_response('', 404) - - token_data = { - 'iss': 'authy', - 'aud': 'quay.io/jwtauthn', - 'nbf': datetime.utcnow(), - 'iat': datetime.utcnow(), - 'exp': datetime.utcnow() + timedelta(seconds=60), - 'sub': user['name'], - 'email': user['email'], - } - - encoded = jwt.encode(token_data, private_key, 'RS256') - return jsonify({ - 'token': encoded - }) - - return make_response('Invalid username or password', 404) - - jwt_app.config['TESTING'] = True - return jwt_app - - def setUp(self): setup_database_for_testing(self) self.app = app.test_client() @@ -153,139 +177,140 @@ class JWTAuthTestMixin(object): self.session = requests.Session() - verify_url = self.get_server_url() + '/user/verify' - query_url = self.get_server_url() + '/user/query' - getuser_url = self.get_server_url() + '/user/get' - - self.jwt_auth = ExternalJWTAuthN(verify_url, query_url, getuser_url, 'authy', '', - app.config['HTTPCLIENT'], 300, JWTAuthTestCase.public_key.name, - requires_email=self.emails) - def tearDown(self): finished_database_for_testing(self) self.ctx.__exit__(True, None, None) def test_verify_and_link_user(self): - result, error_message = self.jwt_auth.verify_and_link_user('invaliduser', 'foobar') - self.assertEquals('Invalid username or password', error_message) - self.assertIsNone(result) + with fake_jwt(self.emails) as jwt_auth: + result, error_message = jwt_auth.verify_and_link_user('invaliduser', 'foobar') + self.assertEquals('Invalid username or password', error_message) + self.assertIsNone(result) - result, _ = self.jwt_auth.verify_and_link_user('cooluser', 'invalidpassword') - self.assertIsNone(result) + result, _ = jwt_auth.verify_and_link_user('cool.user', 'invalidpassword') + self.assertIsNone(result) - result, _ = self.jwt_auth.verify_and_link_user('cooluser', 'password') - self.assertIsNotNone(result) - self.assertEquals('cooluser', result.username) + result, _ = jwt_auth.verify_and_link_user('cool.user', 'password') + self.assertIsNotNone(result) + self.assertEquals('cool_user', result.username) - result, _ = self.jwt_auth.verify_and_link_user('some.neat.user', 'foobar') - self.assertIsNotNone(result) - self.assertEquals('some_neat_user', result.username) + result, _ = jwt_auth.verify_and_link_user('some.neat.user', 'foobar') + self.assertIsNotNone(result) + self.assertEquals('some_neat_user', result.username) def test_confirm_existing_user(self): - # Create the users in the DB. - result, _ = self.jwt_auth.verify_and_link_user('cooluser', 'password') - self.assertIsNotNone(result) + with fake_jwt(self.emails) as jwt_auth: + # Create the users in the DB. + result, _ = jwt_auth.verify_and_link_user('cool.user', 'password') + self.assertIsNotNone(result) - result, _ = self.jwt_auth.verify_and_link_user('some.neat.user', 'foobar') - self.assertIsNotNone(result) + result, _ = jwt_auth.verify_and_link_user('some.neat.user', 'foobar') + self.assertIsNotNone(result) - # Confirm a user with the same internal and external username. - result, _ = self.jwt_auth.confirm_existing_user('cooluser', 'invalidpassword') - self.assertIsNone(result) + # Confirm a user with the same internal and external username. + result, _ = jwt_auth.confirm_existing_user('cool_user', 'invalidpassword') + self.assertIsNone(result) - result, _ = self.jwt_auth.confirm_existing_user('cooluser', 'password') - self.assertIsNotNone(result) - self.assertEquals('cooluser', result.username) + result, _ = jwt_auth.confirm_existing_user('cool_user', 'password') + self.assertIsNotNone(result) + self.assertEquals('cool_user', result.username) - # Fail to confirm the *external* username, which should return nothing. - result, _ = self.jwt_auth.confirm_existing_user('some.neat.user', 'password') - self.assertIsNone(result) + # Fail to confirm the *external* username, which should return nothing. + result, _ = jwt_auth.confirm_existing_user('some.neat.user', 'password') + self.assertIsNone(result) - # Now confirm the internal username. - result, _ = self.jwt_auth.confirm_existing_user('some_neat_user', 'foobar') - self.assertIsNotNone(result) - self.assertEquals('some_neat_user', result.username) + # Now confirm the internal username. + result, _ = jwt_auth.confirm_existing_user('some_neat_user', 'foobar') + self.assertIsNotNone(result) + self.assertEquals('some_neat_user', result.username) def test_disabled_user_custom_error(self): - result, error_message = self.jwt_auth.verify_and_link_user('disabled', 'password') - self.assertIsNone(result) - self.assertEquals('User is currently disabled', error_message) + with fake_jwt(self.emails) as jwt_auth: + result, error_message = jwt_auth.verify_and_link_user('disabled', 'password') + self.assertIsNone(result) + self.assertEquals('User is currently disabled', error_message) def test_query(self): - # Lookup `cool`. - results, identifier, error_message = self.jwt_auth.query_users('cool') - self.assertIsNone(error_message) - self.assertEquals('jwtauthn', identifier) - self.assertEquals(1, len(results)) + with fake_jwt(self.emails) as jwt_auth: + # Lookup `cool`. + results, identifier, error_message = jwt_auth.query_users('cool') + self.assertIsNone(error_message) + self.assertEquals('jwtauthn', identifier) + self.assertEquals(1, len(results)) - self.assertEquals('cooluser', results[0].username) - self.assertEquals('user@domain.com' if self.emails else None, results[0].email) + self.assertEquals('cool.user', results[0].username) + self.assertEquals('user@domain.com' if self.emails else None, results[0].email) - # Lookup `some`. - results, identifier, error_message = self.jwt_auth.query_users('some') - self.assertIsNone(error_message) - self.assertEquals('jwtauthn', identifier) - self.assertEquals(1, len(results)) + # Lookup `some`. + results, identifier, error_message = jwt_auth.query_users('some') + self.assertIsNone(error_message) + self.assertEquals('jwtauthn', identifier) + self.assertEquals(1, len(results)) - self.assertEquals('some.neat.user', results[0].username) - self.assertEquals('neat@domain.com' if self.emails else None, results[0].email) + self.assertEquals('some.neat.user', results[0].username) + self.assertEquals('neat@domain.com' if self.emails else None, results[0].email) - # Lookup `unknown`. - results, identifier, error_message = self.jwt_auth.query_users('unknown') - self.assertIsNone(error_message) - self.assertEquals('jwtauthn', identifier) - self.assertEquals(0, len(results)) + # Lookup `unknown`. + results, identifier, error_message = jwt_auth.query_users('unknown') + self.assertIsNone(error_message) + self.assertEquals('jwtauthn', identifier) + self.assertEquals(0, len(results)) def test_get_user(self): - # Lookup cooluser. - result, error_message = self.jwt_auth.get_user('cooluser') - self.assertIsNone(error_message) - self.assertIsNotNone(result) + with fake_jwt(self.emails) as jwt_auth: + # Lookup cool.user. + result, error_message = jwt_auth.get_user('cool.user') + self.assertIsNone(error_message) + self.assertIsNotNone(result) - self.assertEquals('cooluser', result.username) - self.assertEquals('user@domain.com', result.email) + self.assertEquals('cool.user', result.username) + self.assertEquals('user@domain.com', result.email) - # Lookup some.neat.user. - result, error_message = self.jwt_auth.get_user('some.neat.user') - self.assertIsNone(error_message) - self.assertIsNotNone(result) + # Lookup some.neat.user. + result, error_message = jwt_auth.get_user('some.neat.user') + self.assertIsNone(error_message) + self.assertIsNotNone(result) - self.assertEquals('some.neat.user', result.username) - self.assertEquals('neat@domain.com', result.email) + self.assertEquals('some.neat.user', result.username) + self.assertEquals('neat@domain.com', result.email) - # Lookup unknown user. - result, error_message = self.jwt_auth.get_user('unknownuser') - self.assertIsNone(result) + # Lookup unknown user. + result, error_message = jwt_auth.get_user('unknownuser') + self.assertIsNone(result) def test_link_user(self): - # Link cooluser. - user, error_message = self.jwt_auth.link_user('cooluser') - self.assertIsNone(error_message) - self.assertIsNotNone(user) - self.assertEquals('cooluser', user.username) + with fake_jwt(self.emails) as jwt_auth: + # Link cool.user. + user, error_message = jwt_auth.link_user('cool.user') + self.assertIsNone(error_message) + self.assertIsNotNone(user) + self.assertEquals('cool_user', user.username) - # Link again. Should return the same user record. - user_again, _ = self.jwt_auth.link_user('cooluser') - self.assertEquals(user_again.id, user.id) + # Link again. Should return the same user record. + user_again, _ = jwt_auth.link_user('cool.user') + self.assertEquals(user_again.id, user.id) - # Confirm cooluser. - result, _ = self.jwt_auth.confirm_existing_user('cooluser', 'password') - self.assertIsNotNone(result) - self.assertEquals('cooluser', result.username) + # Confirm cool.user. + result, _ = jwt_auth.confirm_existing_user('cool_user', 'password') + self.assertIsNotNone(result) + self.assertEquals('cool_user', result.username) def test_link_invalid_user(self): - user, error_message = self.jwt_auth.link_user('invaliduser') - self.assertIsNotNone(error_message) - self.assertIsNone(user) + with fake_jwt(self.emails) as jwt_auth: + user, error_message = jwt_auth.link_user('invaliduser') + self.assertIsNotNone(error_message) + self.assertIsNone(user) -class JWTAuthNoEmailTestCase(JWTAuthTestMixin, LiveServerTestCase): +class JWTAuthNoEmailTestCase(JWTAuthTestMixin, unittest.TestCase): + """ Test cases for JWT auth, with emails disabled. """ @property def emails(self): return False -class JWTAuthTestCase(JWTAuthTestMixin, LiveServerTestCase): +class JWTAuthTestCase(JWTAuthTestMixin, unittest.TestCase): + """ Test cases for JWT auth, with emails enabled. """ @property def emails(self): return True diff --git a/test/test_keystone_auth.py b/test/test_keystone_auth.py index 9d1254016..c70261241 100644 --- a/test/test_keystone_auth.py +++ b/test/test_keystone_auth.py @@ -5,197 +5,225 @@ import unittest import requests from flask import Flask, request, abort, make_response -from flask_testing import LiveServerTestCase +from contextlib import contextmanager +from helpers import liveserver_app from data.users.keystone import get_keystone_users from initdb import setup_database_for_testing, finished_database_for_testing _PORT_NUMBER = 5001 -class KeystoneAuthTestsMixin(): +@contextmanager +def fake_keystone(version, requires_email=True): + """ Context manager which instantiates and runs a webserver with a fake Keystone implementation, + until the result is yielded. + + Usage: + with fake_keystone(version) as keystone_auth: + # Make keystone_auth requests. + """ + keystone_app, port = _create_app(requires_email) + server_url = 'http://' + keystone_app.config['SERVER_HOSTNAME'] + endpoint_url = server_url + '/v3' + if version == 2: + endpoint_url = server_url + '/v2.0/auth' + + keystone_auth = get_keystone_users(version, endpoint_url, + 'adminuser', 'adminpass', 'admintenant', + requires_email=requires_email) + with liveserver_app(keystone_app, port): + yield keystone_auth + + +def _create_app(requires_email=True): + global _PORT_NUMBER + _PORT_NUMBER = _PORT_NUMBER + 1 + + server_url = 'http://localhost:%s' % (_PORT_NUMBER) + + users = [ + {'username': 'adminuser', 'name': 'Admin User', 'password': 'adminpass'}, + {'username': 'cool.user', 'name': 'Cool User', 'password': 'password'}, + {'username': 'some.neat.user', 'name': 'Neat User', 'password': 'foobar'}, + ] + + ks_app = Flask('testks') + ks_app.config['SERVER_HOSTNAME'] = 'localhost:%s' % _PORT_NUMBER + if os.environ.get('DEBUG') == 'true': + ks_app.config['DEBUG'] = True + + @ks_app.route('/v2.0/admin/users/', methods=['GET']) + def getuser(userid): + for user in users: + if user['username'] == userid: + user_data = {} + if requires_email: + user_data['email'] = userid + '@example.com' + + return json.dumps({ + 'user': user_data + }) + + abort(404) + + @ks_app.route('/v3/identity/users/', methods=['GET']) + def getv3user(userid): + for user in users: + if user['username'] == userid: + user_data = { + "domain_id": "default", + "enabled": True, + "id": user['username'], + "links": {}, + "name": user['username'], + } + + if requires_email: + user_data['email'] = user['username'] + '@example.com' + + return json.dumps({ + 'user': user_data + }) + + abort(404) + + @ks_app.route('/v3/identity/users', methods=['GET']) + def v3identity(): + returned = [] + for user in users: + if not request.args.get('name') or user['username'].startswith(request.args.get('name')): + returned.append({ + "domain_id": "default", + "enabled": True, + "id": user['username'], + "links": {}, + "name": user['username'], + "email": user['username'] + '@example.com', + }) + + return json.dumps({"users": returned}) + + @ks_app.route('/v3/auth/tokens', methods=['POST']) + def v3tokens(): + creds = request.json['auth']['identity']['password']['user'] + for user in users: + if creds['name'] == user['username'] and creds['password'] == user['password']: + data = json.dumps({ + "token": { + "methods": [ + "password" + ], + "roles": [ + { + "id": "9fe2ff9ee4384b1894a90878d3e92bab", + "name": "_member_" + }, + { + "id": "c703057be878458588961ce9a0ce686b", + "name": "admin" + } + ], + "project": { + "domain": { + "id": "default", + "name": "Default" + }, + "id": "8538a3f13f9541b28c2620eb19065e45", + "name": "admin" + }, + "catalog": [ + { + "endpoints": [ + { + "url": server_url + '/v3/identity', + "region": "RegionOne", + "interface": "admin", + "id": "29beb2f1567642eb810b042b6719ea88" + }, + ], + "type": "identity", + "id": "bd73972c0e14fb69bae8ff76e112a90", + "name": "keystone" + } + ], + "extras": { + + }, + "user": { + "domain": { + "id": "default", + "name": "Default" + }, + "id": user['username'], + "name": "admin" + }, + "audit_ids": [ + "yRt0UrxJSs6-WYJgwEMMmg" + ], + "issued_at": "2014-06-16T22:24:26.089380", + "expires_at": "2020-06-16T23:24:26Z", + } + }) + + response = make_response(data, 200) + response.headers['X-Subject-Token'] = 'sometoken' + return response + + abort(403) + + @ks_app.route('/v2.0/auth/tokens', methods=['POST']) + def tokens(): + creds = request.json['auth'][u'passwordCredentials'] + for user in users: + if creds['username'] == user['username'] and creds['password'] == user['password']: + return json.dumps({ + "access": { + "token": { + "issued_at": "2014-06-16T22:24:26.089380", + "expires": "2020-06-16T23:24:26Z", + "id": creds['username'], + "tenant": {"id": "sometenant"}, + }, + "serviceCatalog":[ + { + "endpoints": [ + { + "adminURL": server_url + '/v2.0/admin', + } + ], + "endpoints_links": [], + "type": "identity", + "name": "admin", + }, + ], + "user": { + "username": creds['username'], + "roles_links": [], + "id": creds['username'], + "roles": [], + "name": user['name'], + }, + "metadata": { + "is_admin": 0, + "roles": [], + }, + }, + }) + + abort(403) + + return ks_app, _PORT_NUMBER + + +class KeystoneAuthTestsMixin: maxDiff = None @property def emails(self): raise NotImplementedError - def create_app(self): - global _PORT_NUMBER - _PORT_NUMBER = _PORT_NUMBER + 1 - - users = [ - {'username': 'adminuser', 'name': 'Admin User', 'password': 'adminpass'}, - {'username': 'cooluser', 'name': 'Cool User', 'password': 'password'}, - {'username': 'some.neat.user', 'name': 'Neat User', 'password': 'foobar'}, - ] - - ks_app = Flask('testks') - ks_app.config['LIVESERVER_PORT'] = _PORT_NUMBER - - if os.environ.get('DEBUG') == 'true': - ks_app.config['DEBUG'] = True - - @ks_app.route('/v2.0/admin/users/', methods=['GET']) - def getuser(userid): - for user in users: - if user['username'] == userid: - user_data = {} - if self.emails: - user_data['email'] = userid + '@example.com' - - return json.dumps({ - 'user': user_data - }) - - abort(404) - - @ks_app.route('/v3/identity/users/', methods=['GET']) - def getv3user(userid): - for user in users: - if user['username'] == userid: - user_data = { - "domain_id": "default", - "enabled": True, - "id": user['username'], - "links": {}, - "name": user['username'], - } - - if self.emails: - user_data['email'] = user['username'] + '@example.com' - - return json.dumps({ - 'user': user_data - }) - - abort(404) - - @ks_app.route('/v3/identity/users', methods=['GET']) - def v3identity(): - returned = [] - for user in users: - if not request.args.get('name') or user['username'].startswith(request.args.get('name')): - returned.append({ - "domain_id": "default", - "enabled": True, - "id": user['username'], - "links": {}, - "name": user['username'], - "email": user['username'] + '@example.com', - }) - - return json.dumps({"users": returned}) - - @ks_app.route('/v3/auth/tokens', methods=['POST']) - def v3tokens(): - creds = request.json['auth']['identity']['password']['user'] - for user in users: - if creds['name'] == user['username'] and creds['password'] == user['password']: - data = json.dumps({ - "token": { - "methods": [ - "password" - ], - "roles": [ - { - "id": "9fe2ff9ee4384b1894a90878d3e92bab", - "name": "_member_" - }, - { - "id": "c703057be878458588961ce9a0ce686b", - "name": "admin" - } - ], - "project": { - "domain": { - "id": "default", - "name": "Default" - }, - "id": "8538a3f13f9541b28c2620eb19065e45", - "name": "admin" - }, - "catalog": [ - { - "endpoints": [ - { - "url": self.get_server_url() + '/v3/identity', - "region": "RegionOne", - "interface": "admin", - "id": "29beb2f1567642eb810b042b6719ea88" - }, - ], - "type": "identity", - "id": "bd73972c0e14fb69bae8ff76e112a90", - "name": "keystone" - } - ], - "extras": { - - }, - "user": { - "domain": { - "id": "default", - "name": "Default" - }, - "id": user['username'], - "name": "admin" - }, - "audit_ids": [ - "yRt0UrxJSs6-WYJgwEMMmg" - ], - "issued_at": "2014-06-16T22:24:26.089380", - "expires_at": "2020-06-16T23:24:26Z", - } - }) - - response = make_response(data, 200) - response.headers['X-Subject-Token'] = 'sometoken' - return response - - abort(403) - - @ks_app.route('/v2.0/auth/tokens', methods=['POST']) - def tokens(): - creds = request.json['auth'][u'passwordCredentials'] - for user in users: - if creds['username'] == user['username'] and creds['password'] == user['password']: - return json.dumps({ - "access": { - "token": { - "issued_at": "2014-06-16T22:24:26.089380", - "expires": "2020-06-16T23:24:26Z", - "id": creds['username'], - "tenant": {"id": "sometenant"}, - }, - "serviceCatalog":[ - { - "endpoints": [ - { - "adminURL": self.get_server_url() + '/v2.0/admin', - } - ], - "endpoints_links": [], - "type": "identity", - "name": "admin", - }, - ], - "user": { - "username": creds['username'], - "roles_links": [], - "id": creds['username'], - "roles": [], - "name": user['name'], - }, - "metadata": { - "is_admin": 0, - "roles": [], - }, - }, - }) - - abort(403) - - return ks_app + def fake_keystone(self): + raise NotImplementedError def setUp(self): setup_database_for_testing(self) @@ -204,100 +232,95 @@ class KeystoneAuthTestsMixin(): def tearDown(self): finished_database_for_testing(self) - @property - def keystone(self): - raise NotImplementedError - def test_invalid_user(self): - (user, _) = self.keystone.verify_credentials('unknownuser', 'password') - self.assertIsNone(user) + with self.fake_keystone() as keystone: + (user, _) = keystone.verify_credentials('unknownuser', 'password') + self.assertIsNone(user) def test_invalid_password(self): - (user, _) = self.keystone.verify_credentials('cooluser', 'notpassword') - self.assertIsNone(user) + with self.fake_keystone() as keystone: + (user, _) = keystone.verify_credentials('cool.user', 'notpassword') + self.assertIsNone(user) def test_cooluser(self): - (user, _) = self.keystone.verify_credentials('cooluser', 'password') - self.assertEquals(user.username, 'cooluser') - self.assertEquals(user.email, 'cooluser@example.com' if self.emails else None) + with self.fake_keystone() as keystone: + (user, _) = keystone.verify_credentials('cool.user', 'password') + self.assertEquals(user.username, 'cool.user') + self.assertEquals(user.email, 'cool.user@example.com' if self.emails else None) def test_neatuser(self): - (user, _) = self.keystone.verify_credentials('some.neat.user', 'foobar') - self.assertEquals(user.username, 'some.neat.user') - self.assertEquals(user.email, 'some.neat.user@example.com' if self.emails else None) + with self.fake_keystone() as keystone: + (user, _) = keystone.verify_credentials('some.neat.user', 'foobar') + self.assertEquals(user.username, 'some.neat.user') + self.assertEquals(user.email, 'some.neat.user@example.com' if self.emails else None) -class KeystoneV2AuthNoEmailTests(KeystoneAuthTestsMixin, LiveServerTestCase): - @property - def keystone(self): - return get_keystone_users(2, self.get_server_url() + '/v2.0/auth', - 'adminuser', 'adminpass', 'admintenant', - requires_email=False) + +class KeystoneV2AuthNoEmailTests(KeystoneAuthTestsMixin, unittest.TestCase): + def fake_keystone(self): + return fake_keystone(2, requires_email=False) @property def emails(self): return False -class KeystoneV3AuthNoEmailTests(KeystoneAuthTestsMixin, LiveServerTestCase): - @property - def keystone(self): - return get_keystone_users(3, self.get_server_url() + '/v3', - 'adminuser', 'adminpass', 'admintenant', - requires_email=False) +class KeystoneV3AuthNoEmailTests(KeystoneAuthTestsMixin, unittest.TestCase): + def fake_keystone(self): + return fake_keystone(3, requires_email=False) + @property def emails(self): return False -class KeystoneV2AuthTests(KeystoneAuthTestsMixin, LiveServerTestCase): - @property - def keystone(self): - return get_keystone_users(2, self.get_server_url() + '/v2.0/auth', - 'adminuser', 'adminpass', 'admintenant', - requires_email=True) +class KeystoneV2AuthTests(KeystoneAuthTestsMixin, unittest.TestCase): + def fake_keystone(self): + return fake_keystone(2, requires_email=True) @property def emails(self): return True -class KeystoneV3AuthTests(KeystoneAuthTestsMixin, LiveServerTestCase): - @property - def keystone(self): - return get_keystone_users(3, self.get_server_url() + '/v3', - 'adminuser', 'adminpass', 'admintenant', - requires_email=True) +class KeystoneV3AuthTests(KeystoneAuthTestsMixin, unittest.TestCase): + def fake_keystone(self): + return fake_keystone(3, requires_email=True) def emails(self): return True def test_query(self): - # Lookup cool. - (response, error_message) = self.keystone.query_users('cool') - self.assertIsNone(error_message) - self.assertEquals(1, len(response)) + with self.fake_keystone() as keystone: + # Lookup cool. + (response, federated_id, error_message) = keystone.query_users('cool') + self.assertIsNone(error_message) + self.assertEquals(1, len(response)) + self.assertEquals('keystone', federated_id) - user_info = response[0] - self.assertEquals("cooluser", user_info.username) + user_info = response[0] + self.assertEquals("cool.user", user_info.username) - # Lookup unknown. - (response, error_message) = self.keystone.query_users('unknown') - self.assertIsNone(error_message) - self.assertEquals(0, len(response)) + # Lookup unknown. + (response, federated_id, error_message) = keystone.query_users('unknown') + self.assertIsNone(error_message) + self.assertEquals(0, len(response)) + self.assertEquals('keystone', federated_id) def test_link_user(self): - # Link someuser. - user, error_message = self.keystone.link_user('cooluser') - self.assertIsNone(error_message) - self.assertIsNotNone(user) - self.assertEquals('cooluser', user.username) - self.assertEquals('cooluser@example.com', user.email) + with self.fake_keystone() as keystone: + # Link someuser. + user, error_message = keystone.link_user('cool.user') + self.assertIsNone(error_message) + self.assertIsNotNone(user) + self.assertEquals('cool_user', user.username) + self.assertEquals('cool.user@example.com', user.email) - # Link again. Should return the same user record. - user_again, _ = self.keystone.link_user('cooluser') - self.assertEquals(user_again.id, user.id) + # Link again. Should return the same user record. + user_again, _ = keystone.link_user('cool.user') + self.assertEquals(user_again.id, user.id) + + # Confirm someuser. + result, _ = keystone.confirm_existing_user('cool_user', 'password') + self.assertIsNotNone(result) + self.assertEquals('cool_user', result.username) - # Confirm someuser. - result, _ = self.keystone.confirm_existing_user('cooluser', 'password') - self.assertIsNotNone(result) - self.assertEquals('cooluser', result.username) if __name__ == '__main__': unittest.main() diff --git a/test/test_ldap.py b/test/test_ldap.py index bb264a2b2..68b26c083 100644 --- a/test/test_ldap.py +++ b/test/test_ldap.py @@ -6,6 +6,112 @@ from data.users import LDAPUsers from data import model from mockldap import MockLdap from mock import patch +from contextlib import contextmanager + +def _create_ldap(requires_email=True): + base_dn = ['dc=quay', 'dc=io'] + admin_dn = 'uid=testy,ou=employees,dc=quay,dc=io' + admin_passwd = 'password' + user_rdn = ['ou=employees'] + uid_attr = 'uid' + email_attr = 'mail' + secondary_user_rdns = ['ou=otheremployees'] + + ldap = LDAPUsers('ldap://localhost', base_dn, admin_dn, admin_passwd, user_rdn, + uid_attr, email_attr, secondary_user_rdns=secondary_user_rdns, + requires_email=requires_email) + return ldap + +@contextmanager +def mock_ldap(requires_email=True): + mockldap = MockLdap({ + 'dc=quay,dc=io': {'dc': ['quay', 'io']}, + 'ou=employees,dc=quay,dc=io': { + 'dc': ['quay', 'io'], + 'ou': 'employees' + }, + 'ou=otheremployees,dc=quay,dc=io': { + 'dc': ['quay', 'io'], + 'ou': 'otheremployees' + }, + 'uid=testy,ou=employees,dc=quay,dc=io': { + 'dc': ['quay', 'io'], + 'ou': 'employees', + 'uid': 'testy', + 'userPassword': ['password'] + }, + 'uid=someuser,ou=employees,dc=quay,dc=io': { + 'dc': ['quay', 'io'], + 'ou': 'employees', + 'uid': ['someuser'], + 'userPassword': ['somepass'], + 'mail': ['foo@bar.com'] + }, + 'uid=nomail,ou=employees,dc=quay,dc=io': { + 'dc': ['quay', 'io'], + 'ou': 'employees', + 'uid': ['nomail'], + 'userPassword': ['somepass'] + }, + 'uid=cool.user,ou=employees,dc=quay,dc=io': { + 'dc': ['quay', 'io'], + 'ou': 'employees', + 'uid': ['cool.user', 'referred'], + 'userPassword': ['somepass'], + 'mail': ['foo@bar.com'] + }, + 'uid=referred,ou=employees,dc=quay,dc=io': { + 'uid': ['referred'], + '_referral': 'ldap:///uid=cool.user,ou=employees,dc=quay,dc=io' + }, + 'uid=invalidreferred,ou=employees,dc=quay,dc=io': { + 'uid': ['invalidreferred'], + '_referral': 'ldap:///uid=someinvaliduser,ou=employees,dc=quay,dc=io' + }, + 'uid=multientry,ou=subgroup1,ou=employees,dc=quay,dc=io': { + 'uid': ['multientry'], + 'mail': ['foo@bar.com'], + 'userPassword': ['somepass'], + }, + 'uid=multientry,ou=subgroup2,ou=employees,dc=quay,dc=io': { + 'uid': ['multientry'], + 'another': ['key'] + }, + 'uid=secondaryuser,ou=otheremployees,dc=quay,dc=io': { + 'dc': ['quay', 'io'], + 'ou': 'otheremployees', + 'uid': ['secondaryuser'], + 'userPassword': ['somepass'], + 'mail': ['foosecondary@bar.com'] + }, + }) + + def initializer(uri, trace_level=0): + obj = mockldap[uri] + + # Seed to "support" wildcard queries, which MockLDAP does not support natively. + obj.search_s.seed('ou=employees,dc=quay,dc=io', 2, '(|(uid=cool*)(mail=cool*))')([ + ('uid=cool.user,ou=employees,dc=quay,dc=io', { + 'dc': ['quay', 'io'], + 'ou': 'employees', + 'uid': ['cool.user', 'referred'], + 'userPassword': ['somepass'], + 'mail': ['foo@bar.com'] + }) + ]) + + obj.search_s.seed('ou=otheremployees,dc=quay,dc=io', 2, '(|(uid=cool*)(mail=cool*))')([]) + + obj.search_s.seed('ou=employees,dc=quay,dc=io', 2, '(|(uid=unknown*)(mail=unknown*))')([]) + obj.search_s.seed('ou=otheremployees,dc=quay,dc=io', 2, + '(|(uid=unknown*)(mail=unknown*))')([]) + return obj + + mockldap.start() + with patch('ldap.initialize', new=initializer): + yield _create_ldap(requires_email=requires_email) + mockldap.stop() + class TestLDAP(unittest.TestCase): def setUp(self): @@ -14,90 +120,10 @@ class TestLDAP(unittest.TestCase): self.ctx = app.test_request_context() self.ctx.__enter__() - self.mockldap = MockLdap({ - 'dc=quay,dc=io': {'dc': ['quay', 'io']}, - 'ou=employees,dc=quay,dc=io': { - 'dc': ['quay', 'io'], - 'ou': 'employees' - }, - 'ou=otheremployees,dc=quay,dc=io': { - 'dc': ['quay', 'io'], - 'ou': 'otheremployees' - }, - 'uid=testy,ou=employees,dc=quay,dc=io': { - 'dc': ['quay', 'io'], - 'ou': 'employees', - 'uid': 'testy', - 'userPassword': ['password'] - }, - 'uid=someuser,ou=employees,dc=quay,dc=io': { - 'dc': ['quay', 'io'], - 'ou': 'employees', - 'uid': ['someuser'], - 'userPassword': ['somepass'], - 'mail': ['foo@bar.com'] - }, - 'uid=nomail,ou=employees,dc=quay,dc=io': { - 'dc': ['quay', 'io'], - 'ou': 'employees', - 'uid': ['nomail'], - 'userPassword': ['somepass'] - }, - 'uid=cool.user,ou=employees,dc=quay,dc=io': { - 'dc': ['quay', 'io'], - 'ou': 'employees', - 'uid': ['cool.user', 'referred'], - 'userPassword': ['somepass'], - 'mail': ['foo@bar.com'] - }, - 'uid=referred,ou=employees,dc=quay,dc=io': { - 'uid': ['referred'], - '_referral': 'ldap:///uid=cool.user,ou=employees,dc=quay,dc=io' - }, - 'uid=invalidreferred,ou=employees,dc=quay,dc=io': { - 'uid': ['invalidreferred'], - '_referral': 'ldap:///uid=someinvaliduser,ou=employees,dc=quay,dc=io' - }, - 'uid=multientry,ou=subgroup1,ou=employees,dc=quay,dc=io': { - 'uid': ['multientry'], - 'mail': ['foo@bar.com'], - 'userPassword': ['somepass'], - }, - 'uid=multientry,ou=subgroup2,ou=employees,dc=quay,dc=io': { - 'uid': ['multientry'], - 'another': ['key'] - }, - 'uid=secondaryuser,ou=otheremployees,dc=quay,dc=io': { - 'dc': ['quay', 'io'], - 'ou': 'otheremployees', - 'uid': ['secondaryuser'], - 'userPassword': ['somepass'], - 'mail': ['foosecondary@bar.com'] - }, - }) - - self.mockldap.start() - self.ldap = self._create_ldap(requires_email=True) - def tearDown(self): - self.mockldap.stop() finished_database_for_testing(self) self.ctx.__exit__(True, None, None) - def _create_ldap(self, requires_email=True): - base_dn = ['dc=quay', 'dc=io'] - admin_dn = 'uid=testy,ou=employees,dc=quay,dc=io' - admin_passwd = 'password' - user_rdn = ['ou=employees'] - uid_attr = 'uid' - email_attr = 'mail' - secondary_user_rdns = ['ou=otheremployees'] - - ldap = LDAPUsers('ldap://localhost', base_dn, admin_dn, admin_passwd, user_rdn, - uid_attr, email_attr, secondary_user_rdns=secondary_user_rdns, - requires_email=requires_email) - return ldap - def test_invalid_admin_password(self): base_dn = ['dc=quay', 'dc=io'] admin_dn = 'uid=testy,ou=employees,dc=quay,dc=io' @@ -106,160 +132,148 @@ class TestLDAP(unittest.TestCase): uid_attr = 'uid' email_attr = 'mail' - ldap = LDAPUsers('ldap://localhost', base_dn, admin_dn, admin_passwd, user_rdn, - uid_attr, email_attr) + with mock_ldap(): + ldap = LDAPUsers('ldap://localhost', base_dn, admin_dn, admin_passwd, user_rdn, + uid_attr, email_attr) - self.ldap = ldap - - # Try to login. - (response, err_msg) = self.ldap.verify_and_link_user('someuser', 'somepass') - self.assertIsNone(response) - self.assertEquals('LDAP Admin dn or password is invalid', err_msg) + # Try to login. + (response, err_msg) = ldap.verify_and_link_user('someuser', 'somepass') + self.assertIsNone(response) + self.assertEquals('LDAP Admin dn or password is invalid', err_msg) def test_login(self): - # Verify we can login. - (response, _) = self.ldap.verify_and_link_user('someuser', 'somepass') - self.assertEquals(response.username, 'someuser') - self.assertTrue(model.user.has_user_prompt(response, 'confirm_username')) + with mock_ldap() as ldap: + # Verify we can login. + (response, _) = ldap.verify_and_link_user('someuser', 'somepass') + self.assertEquals(response.username, 'someuser') + self.assertTrue(model.user.has_user_prompt(response, 'confirm_username')) - # Verify we can confirm the user. - (response, _) = self.ldap.confirm_existing_user('someuser', 'somepass') - self.assertEquals(response.username, 'someuser') + # Verify we can confirm the user. + (response, _) = ldap.confirm_existing_user('someuser', 'somepass') + self.assertEquals(response.username, 'someuser') def test_login_secondary(self): - # Verify we can login. - (response, _) = self.ldap.verify_and_link_user('secondaryuser', 'somepass') - self.assertEquals(response.username, 'secondaryuser') + with mock_ldap() as ldap: + # Verify we can login. + (response, _) = ldap.verify_and_link_user('secondaryuser', 'somepass') + self.assertEquals(response.username, 'secondaryuser') - # Verify we can confirm the user. - (response, _) = self.ldap.confirm_existing_user('secondaryuser', 'somepass') - self.assertEquals(response.username, 'secondaryuser') + # Verify we can confirm the user. + (response, _) = ldap.confirm_existing_user('secondaryuser', 'somepass') + self.assertEquals(response.username, 'secondaryuser') def test_invalid_password(self): - # Verify we cannot login with an invalid password. - (response, err_msg) = self.ldap.verify_and_link_user('someuser', 'invalidpass') - self.assertIsNone(response) - self.assertEquals(err_msg, 'Invalid password') + with mock_ldap() as ldap: + # Verify we cannot login with an invalid password. + (response, err_msg) = ldap.verify_and_link_user('someuser', 'invalidpass') + self.assertIsNone(response) + self.assertEquals(err_msg, 'Invalid password') - # Verify we cannot confirm the user. - (response, err_msg) = self.ldap.confirm_existing_user('someuser', 'invalidpass') - self.assertIsNone(response) - self.assertEquals(err_msg, 'Invalid user') + # Verify we cannot confirm the user. + (response, err_msg) = ldap.confirm_existing_user('someuser', 'invalidpass') + self.assertIsNone(response) + self.assertEquals(err_msg, 'Invalid user') def test_missing_mail(self): - (response, err_msg) = self.ldap.get_user('nomail') - self.assertIsNone(response) - self.assertEquals('Missing mail field "mail" in user record', err_msg) + with mock_ldap() as ldap: + (response, err_msg) = ldap.get_user('nomail') + self.assertIsNone(response) + self.assertEquals('Missing mail field "mail" in user record', err_msg) def test_missing_mail_allowed(self): - ldap = self._create_ldap(requires_email=False) - (response, _) = ldap.get_user('nomail') - self.assertEquals(response.username, 'nomail') + with mock_ldap(requires_email=False) as ldap: + (response, _) = ldap.get_user('nomail') + self.assertEquals(response.username, 'nomail') def test_confirm_different_username(self): - # Verify that the user is logged in and their username was adjusted. - (response, _) = self.ldap.verify_and_link_user('cool.user', 'somepass') - self.assertEquals(response.username, 'cool_user') + with mock_ldap() as ldap: + # Verify that the user is logged in and their username was adjusted. + (response, _) = ldap.verify_and_link_user('cool.user', 'somepass') + self.assertEquals(response.username, 'cool_user') - # Verify we can confirm the user's quay username. - (response, _) = self.ldap.confirm_existing_user('cool_user', 'somepass') - self.assertEquals(response.username, 'cool_user') + # Verify we can confirm the user's quay username. + (response, _) = ldap.confirm_existing_user('cool_user', 'somepass') + self.assertEquals(response.username, 'cool_user') - # Verify that we *cannot* confirm the LDAP username. - (response, _) = self.ldap.confirm_existing_user('cool.user', 'somepass') - self.assertIsNone(response) + # Verify that we *cannot* confirm the LDAP username. + (response, _) = ldap.confirm_existing_user('cool.user', 'somepass') + self.assertIsNone(response) def test_referral(self): - (response, _) = self.ldap.verify_and_link_user('referred', 'somepass') - self.assertEquals(response.username, 'cool_user') + with mock_ldap() as ldap: + (response, _) = ldap.verify_and_link_user('referred', 'somepass') + self.assertEquals(response.username, 'cool_user') - # Verify we can confirm the user's quay username. - (response, _) = self.ldap.confirm_existing_user('cool_user', 'somepass') - self.assertEquals(response.username, 'cool_user') + # Verify we can confirm the user's quay username. + (response, _) = ldap.confirm_existing_user('cool_user', 'somepass') + self.assertEquals(response.username, 'cool_user') def test_invalid_referral(self): - (response, _) = self.ldap.verify_and_link_user('invalidreferred', 'somepass') - self.assertIsNone(response) + with mock_ldap() as ldap: + (response, _) = ldap.verify_and_link_user('invalidreferred', 'somepass') + self.assertIsNone(response) def test_multientry(self): - (response, _) = self.ldap.verify_and_link_user('multientry', 'somepass') - self.assertEquals(response.username, 'multientry') + with mock_ldap() as ldap: + (response, _) = ldap.verify_and_link_user('multientry', 'somepass') + self.assertEquals(response.username, 'multientry') def test_login_empty_userdn(self): - base_dn = ['ou=employees', 'dc=quay', 'dc=io'] - admin_dn = 'uid=testy,ou=employees,dc=quay,dc=io' - admin_passwd = 'password' - user_rdn = [] - uid_attr = 'uid' - email_attr = 'mail' - secondary_user_rdns = ['ou=otheremployees'] + with mock_ldap(): + base_dn = ['ou=employees', 'dc=quay', 'dc=io'] + admin_dn = 'uid=testy,ou=employees,dc=quay,dc=io' + admin_passwd = 'password' + user_rdn = [] + uid_attr = 'uid' + email_attr = 'mail' + secondary_user_rdns = ['ou=otheremployees'] - ldap = LDAPUsers('ldap://localhost', base_dn, admin_dn, admin_passwd, user_rdn, - uid_attr, email_attr, secondary_user_rdns=secondary_user_rdns) + ldap = LDAPUsers('ldap://localhost', base_dn, admin_dn, admin_passwd, user_rdn, + uid_attr, email_attr, secondary_user_rdns=secondary_user_rdns) - self.ldap = ldap + # Verify we can login. + (response, _) = ldap.verify_and_link_user('someuser', 'somepass') + self.assertEquals(response.username, 'someuser') - # Verify we can login. - (response, _) = self.ldap.verify_and_link_user('someuser', 'somepass') - self.assertEquals(response.username, 'someuser') - - # Verify we can confirm the user. - (response, _) = self.ldap.confirm_existing_user('someuser', 'somepass') - self.assertEquals(response.username, 'someuser') + # Verify we can confirm the user. + (response, _) = ldap.confirm_existing_user('someuser', 'somepass') + self.assertEquals(response.username, 'someuser') def test_link_user(self): - # Link someuser. - user, error_message = self.ldap.link_user('someuser') - self.assertIsNone(error_message) - self.assertIsNotNone(user) - self.assertEquals('someuser', user.username) + with mock_ldap() as ldap: + # Link someuser. + user, error_message = ldap.link_user('someuser') + self.assertIsNone(error_message) + self.assertIsNotNone(user) + self.assertEquals('someuser', user.username) - # Link again. Should return the same user record. - user_again, _ = self.ldap.link_user('someuser') - self.assertEquals(user_again.id, user.id) + # Link again. Should return the same user record. + user_again, _ = ldap.link_user('someuser') + self.assertEquals(user_again.id, user.id) - # Confirm someuser. - result, _ = self.ldap.confirm_existing_user('someuser', 'somepass') - self.assertIsNotNone(result) - self.assertEquals('someuser', result.username) - self.assertTrue(model.user.has_user_prompt(user, 'confirm_username')) + # Confirm someuser. + result, _ = ldap.confirm_existing_user('someuser', 'somepass') + self.assertIsNotNone(result) + self.assertEquals('someuser', result.username) + self.assertTrue(model.user.has_user_prompt(user, 'confirm_username')) def test_query(self): - def initializer(uri, trace_level=0): - obj = self.mockldap[uri] - - # Seed to "support" wildcard queries, which MockLDAP does not support natively. - obj.search_s.seed('ou=employees,dc=quay,dc=io', 2, '(|(uid=cool*)(mail=cool*))')([ - ('uid=cool.user,ou=employees,dc=quay,dc=io', { - 'dc': ['quay', 'io'], - 'ou': 'employees', - 'uid': ['cool.user', 'referred'], - 'userPassword': ['somepass'], - 'mail': ['foo@bar.com'] - }) - ]) - - obj.search_s.seed('ou=otheremployees,dc=quay,dc=io', 2, '(|(uid=cool*)(mail=cool*))')([]) - - obj.search_s.seed('ou=employees,dc=quay,dc=io', 2, '(|(uid=unknown*)(mail=unknown*))')([]) - obj.search_s.seed('ou=otheremployees,dc=quay,dc=io', 2, - '(|(uid=unknown*)(mail=unknown*))')([]) - return obj - - with patch('ldap.initialize', new=initializer): + with mock_ldap() as ldap: # Lookup cool. - (response, error_message) = self.ldap.query_users('cool') + (response, federated_id, error_message) = ldap.query_users('cool') self.assertIsNone(error_message) self.assertEquals(1, len(response)) + self.assertEquals('ldap', federated_id) user_info = response[0] self.assertEquals("cool.user", user_info.username) self.assertEquals("foo@bar.com", user_info.email) # Lookup unknown. - (response, error_message) = self.ldap.query_users('unknown') + (response, federated_id, error_message) = ldap.query_users('unknown') self.assertIsNone(error_message) self.assertEquals(0, len(response)) + self.assertEquals('ldap', federated_id) if __name__ == '__main__':