Fix external auth returns for query_user calls

Adds the missing field on the query_user calls, updates the external auth tests to ensure it is returned properly, and adds new end-to-end tests which call the external auth engines via the *API*, to ensure this doesn't break again
This commit is contained in:
Joseph Schorr 2016-12-05 17:19:38 -05:00
parent f0b19b26c9
commit 3203fd6de1
8 changed files with 834 additions and 651 deletions

View file

@ -187,12 +187,12 @@ class LDAPUsers(FederatedUsers):
def query_users(self, query, limit=20): def query_users(self, query, limit=20):
""" Queries LDAP for matching users. """ """ Queries LDAP for matching users. """
if not query: if not query:
return (None, 'Empty query') return (None, self.federated_service, 'Empty query')
logger.debug('Got query %s with limit %s', query, limit) logger.debug('Got query %s with limit %s', query, limit)
(results, err_msg) = self._ldap_user_search(query + '*', limit=limit) (results, err_msg) = self._ldap_user_search(query + '*', limit=limit)
if err_msg is not None: if err_msg is not None:
return (None, err_msg) return (None, self.federated_service, err_msg)
final_results = [] final_results = []
for result in results[0:limit]: for result in results[0:limit]:
@ -203,7 +203,7 @@ class LDAPUsers(FederatedUsers):
final_results.append(credentials) final_results.append(credentials)
logger.debug('For query %s found results %s', query, final_results) logger.debug('For query %s found results %s', query, final_results)
return (final_results, None) return (final_results, self.federated_service, None)
def verify_credentials(self, username_or_email, password): def verify_credentials(self, username_or_email, password):
""" Verify the credentials with LDAP. """ """ Verify the credentials with LDAP. """

View file

@ -69,7 +69,7 @@ class KeystoneV2Users(FederatedUsers):
return (UserInformation(username=username_or_email, email=email, id=user_id), None) return (UserInformation(username=username_or_email, email=email, id=user_id), None)
def query_users(self, query, limit=20): def query_users(self, query, limit=20):
return (None, 'Unsupported in Keystone V2') return (None, self.federated_service, 'Unsupported in Keystone V2')
def get_user(self, username_or_email): def get_user(self, username_or_email):
return (None, 'Unsupported in Keystone V2') return (None, 'Unsupported in Keystone V2')
@ -108,7 +108,7 @@ class KeystoneV3Users(FederatedUsers):
return (None, kut.message or 'Invalid username or password') return (None, kut.message or 'Invalid username or password')
def get_user(self, username_or_email): def get_user(self, username_or_email):
users_found, err_msg = self.query_users(username_or_email) users_found, _, err_msg = self.query_users(username_or_email)
if err_msg is not None: if err_msg is not None:
return (None, err_msg) return (None, err_msg)
@ -128,7 +128,7 @@ class KeystoneV3Users(FederatedUsers):
def query_users(self, query, limit=20): def query_users(self, query, limit=20):
if len(query) < 3: if len(query) < 3:
return ([], None) return ([], self.federated_service, None)
try: try:
keystone_client = kv3client.Client(username=self.admin_username, password=self.admin_password, keystone_client = kv3client.Client(username=self.admin_username, password=self.admin_password,
@ -137,13 +137,13 @@ class KeystoneV3Users(FederatedUsers):
found_users = list(_take(limit, keystone_client.users.list(name=query))) found_users = list(_take(limit, keystone_client.users.list(name=query)))
logger.debug('For Keystone query %s found users: %s', query, found_users) logger.debug('For Keystone query %s found users: %s', query, found_users)
if not found_users: if not found_users:
return ([], None) return ([], self.federated_service, None)
return ([self._user_info(user) for user in found_users], None) return ([self._user_info(user) for user in found_users], self.federated_service, None)
except KeystoneAuthorizationFailure as kaf: except KeystoneAuthorizationFailure as kaf:
logger.exception('Keystone auth failure for admin user for query %s', query) logger.exception('Keystone auth failure for admin user for query %s', query)
return (None, kaf.message or 'Invalid admin username or password') return (None, self.federated_service, kaf.message or 'Invalid admin username or password')
except KeystoneUnauthorized as kut: except KeystoneUnauthorized as kut:
logger.exception('Keystone unauthorized for admin user for query %s', query) logger.exception('Keystone unauthorized for admin user for query %s', query)
return (None, kut.message or 'Invalid admin username or password') return (None, self.federated_service, kut.message or 'Invalid admin username or password')

View file

@ -10,6 +10,7 @@ from auth.permissions import (OrganizationMemberPermission, ReadRepositoryPermis
from auth.auth_context import get_authenticated_user from auth.auth_context import get_authenticated_user
from auth import scopes from auth import scopes
from app import avatar, authentication from app import avatar, authentication
from flask import abort
from operator import itemgetter from operator import itemgetter
from stringscore import liquidmetal from stringscore import liquidmetal
from util.names import parse_robot_username from util.names import parse_robot_username
@ -17,13 +18,15 @@ from util.names import parse_robot_username
import anunidecode # Don't listen to pylint's lies. This import is required. import anunidecode # Don't listen to pylint's lies. This import is required.
import math import math
@show_if(authentication.federated_service) # Only enabled for non-DB auth.
@resource('/v1/entities/link/<username>') @resource('/v1/entities/link/<username>')
@internal_only @internal_only
class LinkExternalEntity(ApiResource): class LinkExternalEntity(ApiResource):
""" Resource for linking external entities to internal users. """ """ Resource for linking external entities to internal users. """
@nickname('linkExternalUser') @nickname('linkExternalUser')
def post(self, username): def post(self, username):
if not authentication.federated_service:
abort(404)
# Only allowed if there is a logged in user. # Only allowed if there is a logged in user.
if not get_authenticated_user(): if not get_authenticated_user():
raise Unauthorized() raise Unauthorized()

View file

@ -1,4 +1,9 @@
import multiprocessing
import time
import socket
from data.database import LogEntryKind, LogEntry from data.database import LogEntryKind, LogEntry
from contextlib import contextmanager
class assert_action_logged(object): class assert_action_logged(object):
""" Specialized assertion for ensuring that a log entry of a particular kind was added under the """ Specialized assertion for ensuring that a log entry of a particular kind was added under the
@ -20,3 +25,59 @@ class assert_action_logged(object):
updated_count = self._get_log_count() updated_count = self._get_log_count()
error_msg = 'Missing new log entry of kind %s' % self.log_kind error_msg = 'Missing new log entry of kind %s' % self.log_kind
assert self.existing_count == (updated_count - 1), error_msg assert self.existing_count == (updated_count - 1), error_msg
_LIVESERVER_TIMEOUT = 5
@contextmanager
def liveserver_app(flask_app, port):
"""
Based on https://github.com/jarus/flask-testing/blob/master/flask_testing/utils.py
Runs the given Flask app as a live web server locally, on the given port, starting it
when called and terminating after the yield.
Usage:
with liveserver_app(flask_app, port):
# Code that makes use of the app.
"""
shared = {}
def _can_ping_server():
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
try:
sock.connect(('localhost', port))
except socket.error:
success = False
else:
success = True
finally:
sock.close()
return success
def _spawn_live_server():
worker = lambda app, port: app.run(port=port, use_reloader=False)
shared['process'] = multiprocessing.Process(target=worker, args=(flask_app, port))
shared['process'].start()
start_time = time.time()
while True:
elapsed_time = (time.time() - start_time)
if elapsed_time > _LIVESERVER_TIMEOUT:
_terminate_live_server()
raise RuntimeError("Failed to start the server after %d seconds. " % _LIVESERVER_TIMEOUT)
if _can_ping_server():
break
def _terminate_live_server():
if shared.get('process'):
shared.get('process').terminate()
shared.pop('process')
try:
_spawn_live_server()
yield
finally:
_terminate_live_server()

View file

@ -0,0 +1,57 @@
import unittest
from mock import patch
from endpoints.api.search import EntitySearch, LinkExternalEntity
from test.test_api_usage import ApiTestCase, ADMIN_ACCESS_USER
from test.test_ldap import mock_ldap
from test.test_external_jwt_authn import fake_jwt
from test.test_keystone_auth import fake_keystone
class EndToEndAuthMixin:
def test_entity_search(self):
with self.get_authentication() as auth:
with patch('endpoints.api.search.authentication', auth):
# Try an unknown prefix.
json_data = self.getJsonResponse(EntitySearch, params=dict(prefix='unknown'))
results = json_data['results']
self.assertEquals(0, len(results))
# Try a known prefix.
json_data = self.getJsonResponse(EntitySearch, params=dict(prefix='cool'))
results = json_data['results']
self.assertEquals(1, len(results))
self.assertEquals('external', results[0]['kind'])
self.assertEquals('cool.user', results[0]['name'])
def test_link_external_entity(self):
with self.get_authentication() as auth:
with patch('endpoints.api.search.authentication', auth):
self.login(ADMIN_ACCESS_USER)
# Try an unknown user.
self.postResponse(LinkExternalEntity, params=dict(username='unknownuser'),
expected_code=400)
# Try a known user.
json_data = self.postJsonResponse(LinkExternalEntity, params=dict(username='cool.user'))
entity = json_data['entity']
self.assertEquals('cool_user', entity['name'])
self.assertEquals('user', entity['kind'])
class TestLDAPEndToEnd(ApiTestCase, EndToEndAuthMixin):
def get_authentication(self):
return mock_ldap()
class TestJWTEndToEnd(ApiTestCase, EndToEndAuthMixin):
def get_authentication(self):
return fake_jwt()
class TestKeystoneEndToEnd(ApiTestCase, EndToEndAuthMixin):
def get_authentication(self):
return fake_keystone(3)
if __name__ == '__main__':
unittest.main()

View file

@ -3,148 +3,172 @@ import unittest
from datetime import datetime, timedelta from datetime import datetime, timedelta
from tempfile import NamedTemporaryFile from tempfile import NamedTemporaryFile
from contextlib import contextmanager
import jwt import jwt
import requests import requests
from Crypto.PublicKey import RSA from Crypto.PublicKey import RSA
from flask import Flask, jsonify, request, make_response from flask import Flask, jsonify, request, make_response
from flask_testing import LiveServerTestCase
from app import app from app import app
from data.users import ExternalJWTAuthN from data.users import ExternalJWTAuthN
from initdb import setup_database_for_testing, finished_database_for_testing from initdb import setup_database_for_testing, finished_database_for_testing
from test.helpers import liveserver_app
_PORT_NUMBER = 5001 _PORT_NUMBER = 5001
class JWTAuthTestMixin(object): @contextmanager
def fake_jwt(requires_email=True):
""" Context manager which instantiates and runs a webserver with a fake JWT implementation,
until the result is yielded.
Usage:
with fake_jwt() as jwt_auth:
# Make jwt_auth requests.
"""
jwt_app, port, public_key = _create_app(requires_email)
server_url = 'http://' + jwt_app.config['SERVER_HOSTNAME']
verify_url = server_url + '/user/verify'
query_url = server_url + '/user/query'
getuser_url = server_url + '/user/get'
jwt_auth = ExternalJWTAuthN(verify_url, query_url, getuser_url, 'authy', '',
app.config['HTTPCLIENT'], 300, public_key.name,
requires_email=requires_email)
with liveserver_app(jwt_app, port):
yield jwt_auth
def _generate_certs():
public_key = NamedTemporaryFile(delete=True)
key = RSA.generate(1024)
private_key_data = key.exportKey('PEM')
pubkey = key.publickey()
public_key.write(pubkey.exportKey('OpenSSH'))
public_key.seek(0)
return (public_key, private_key_data)
def _create_app(emails=True):
global _PORT_NUMBER
_PORT_NUMBER = _PORT_NUMBER + 1
public_key, private_key_data = _generate_certs()
users = [
{'name': 'cool.user', 'email': 'user@domain.com', 'password': 'password'},
{'name': 'some.neat.user', 'email': 'neat@domain.com', 'password': 'foobar'}
]
jwt_app = Flask('testjwt')
jwt_app.config['SERVER_HOSTNAME'] = 'localhost:%s' % _PORT_NUMBER
def _get_basic_auth():
data = base64.b64decode(request.headers['Authorization'][len('Basic '):])
return data.split(':', 1)
@jwt_app.route('/user/query', methods=['GET'])
def query_users():
query = request.args.get('query')
results = []
for user in users:
if user['name'].startswith(query):
result = {
'username': user['name'],
}
if emails:
result['email'] = user['email']
results.append(result)
token_data = {
'iss': 'authy',
'aud': 'quay.io/jwtauthn/query',
'nbf': datetime.utcnow(),
'iat': datetime.utcnow(),
'exp': datetime.utcnow() + timedelta(seconds=60),
'results': results,
}
encoded = jwt.encode(token_data, private_key_data, 'RS256')
return jsonify({
'token': encoded
})
@jwt_app.route('/user/get', methods=['GET'])
def get_user():
username = request.args.get('username')
if username == 'disabled':
return make_response('User is currently disabled', 401)
for user in users:
if user['name'] == username or user['email'] == username:
token_data = {
'iss': 'authy',
'aud': 'quay.io/jwtauthn/getuser',
'nbf': datetime.utcnow(),
'iat': datetime.utcnow(),
'exp': datetime.utcnow() + timedelta(seconds=60),
'sub': user['name'],
'email': user['email'],
}
encoded = jwt.encode(token_data, private_key_data, 'RS256')
return jsonify({
'token': encoded
})
return make_response('Invalid username or password', 404)
@jwt_app.route('/user/verify', methods=['GET'])
def verify_user():
username, password = _get_basic_auth()
if username == 'disabled':
return make_response('User is currently disabled', 401)
for user in users:
if user['name'] == username or user['email'] == username:
if password != user['password']:
return make_response('', 404)
token_data = {
'iss': 'authy',
'aud': 'quay.io/jwtauthn',
'nbf': datetime.utcnow(),
'iat': datetime.utcnow(),
'exp': datetime.utcnow() + timedelta(seconds=60),
'sub': user['name'],
'email': user['email'],
}
encoded = jwt.encode(token_data, private_key_data, 'RS256')
return jsonify({
'token': encoded
})
return make_response('Invalid username or password', 404)
jwt_app.config['TESTING'] = True
return jwt_app, _PORT_NUMBER, public_key
class JWTAuthTestMixin:
""" Mixin defining all the JWT auth tests. """
maxDiff = None maxDiff = None
@property @property
def emails(self): def emails(self):
raise NotImplementedError raise NotImplementedError
@classmethod
def setUpClass(cls):
public_key = NamedTemporaryFile(delete=True)
key = RSA.generate(1024)
private_key_data = key.exportKey('PEM')
pubkey = key.publickey()
public_key.write(pubkey.exportKey('OpenSSH'))
public_key.seek(0)
JWTAuthTestCase.public_key = public_key
JWTAuthTestCase.private_key_data = private_key_data
def create_app(self):
global _PORT_NUMBER
_PORT_NUMBER = _PORT_NUMBER + 1
users = [
{'name': 'cooluser', 'email': 'user@domain.com', 'password': 'password'},
{'name': 'some.neat.user', 'email': 'neat@domain.com', 'password': 'foobar'}
]
jwt_app = Flask('testjwt')
private_key = JWTAuthTestCase.private_key_data
jwt_app.config['LIVESERVER_PORT'] = _PORT_NUMBER
def _get_basic_auth():
data = base64.b64decode(request.headers['Authorization'][len('Basic '):])
return data.split(':', 1)
@jwt_app.route('/user/query', methods=['GET'])
def query_users():
query = request.args.get('query')
results = []
for user in users:
if user['name'].startswith(query):
result = {
'username': user['name'],
}
if self.emails:
result['email'] = user['email']
results.append(result)
token_data = {
'iss': 'authy',
'aud': 'quay.io/jwtauthn/query',
'nbf': datetime.utcnow(),
'iat': datetime.utcnow(),
'exp': datetime.utcnow() + timedelta(seconds=60),
'results': results,
}
encoded = jwt.encode(token_data, private_key, 'RS256')
return jsonify({
'token': encoded
})
@jwt_app.route('/user/get', methods=['GET'])
def get_user():
username = request.args.get('username')
if username == 'disabled':
return make_response('User is currently disabled', 401)
for user in users:
if user['name'] == username or user['email'] == username:
token_data = {
'iss': 'authy',
'aud': 'quay.io/jwtauthn/getuser',
'nbf': datetime.utcnow(),
'iat': datetime.utcnow(),
'exp': datetime.utcnow() + timedelta(seconds=60),
'sub': user['name'],
'email': user['email'],
}
encoded = jwt.encode(token_data, private_key, 'RS256')
return jsonify({
'token': encoded
})
return make_response('Invalid username or password', 404)
@jwt_app.route('/user/verify', methods=['GET'])
def verify_user():
username, password = _get_basic_auth()
if username == 'disabled':
return make_response('User is currently disabled', 401)
for user in users:
if user['name'] == username or user['email'] == username:
if password != user['password']:
return make_response('', 404)
token_data = {
'iss': 'authy',
'aud': 'quay.io/jwtauthn',
'nbf': datetime.utcnow(),
'iat': datetime.utcnow(),
'exp': datetime.utcnow() + timedelta(seconds=60),
'sub': user['name'],
'email': user['email'],
}
encoded = jwt.encode(token_data, private_key, 'RS256')
return jsonify({
'token': encoded
})
return make_response('Invalid username or password', 404)
jwt_app.config['TESTING'] = True
return jwt_app
def setUp(self): def setUp(self):
setup_database_for_testing(self) setup_database_for_testing(self)
self.app = app.test_client() self.app = app.test_client()
@ -153,139 +177,140 @@ class JWTAuthTestMixin(object):
self.session = requests.Session() self.session = requests.Session()
verify_url = self.get_server_url() + '/user/verify'
query_url = self.get_server_url() + '/user/query'
getuser_url = self.get_server_url() + '/user/get'
self.jwt_auth = ExternalJWTAuthN(verify_url, query_url, getuser_url, 'authy', '',
app.config['HTTPCLIENT'], 300, JWTAuthTestCase.public_key.name,
requires_email=self.emails)
def tearDown(self): def tearDown(self):
finished_database_for_testing(self) finished_database_for_testing(self)
self.ctx.__exit__(True, None, None) self.ctx.__exit__(True, None, None)
def test_verify_and_link_user(self): def test_verify_and_link_user(self):
result, error_message = self.jwt_auth.verify_and_link_user('invaliduser', 'foobar') with fake_jwt(self.emails) as jwt_auth:
self.assertEquals('Invalid username or password', error_message) result, error_message = jwt_auth.verify_and_link_user('invaliduser', 'foobar')
self.assertIsNone(result) self.assertEquals('Invalid username or password', error_message)
self.assertIsNone(result)
result, _ = self.jwt_auth.verify_and_link_user('cooluser', 'invalidpassword') result, _ = jwt_auth.verify_and_link_user('cool.user', 'invalidpassword')
self.assertIsNone(result) self.assertIsNone(result)
result, _ = self.jwt_auth.verify_and_link_user('cooluser', 'password') result, _ = jwt_auth.verify_and_link_user('cool.user', 'password')
self.assertIsNotNone(result) self.assertIsNotNone(result)
self.assertEquals('cooluser', result.username) self.assertEquals('cool_user', result.username)
result, _ = self.jwt_auth.verify_and_link_user('some.neat.user', 'foobar') result, _ = jwt_auth.verify_and_link_user('some.neat.user', 'foobar')
self.assertIsNotNone(result) self.assertIsNotNone(result)
self.assertEquals('some_neat_user', result.username) self.assertEquals('some_neat_user', result.username)
def test_confirm_existing_user(self): def test_confirm_existing_user(self):
# Create the users in the DB. with fake_jwt(self.emails) as jwt_auth:
result, _ = self.jwt_auth.verify_and_link_user('cooluser', 'password') # Create the users in the DB.
self.assertIsNotNone(result) result, _ = jwt_auth.verify_and_link_user('cool.user', 'password')
self.assertIsNotNone(result)
result, _ = self.jwt_auth.verify_and_link_user('some.neat.user', 'foobar') result, _ = jwt_auth.verify_and_link_user('some.neat.user', 'foobar')
self.assertIsNotNone(result) self.assertIsNotNone(result)
# Confirm a user with the same internal and external username. # Confirm a user with the same internal and external username.
result, _ = self.jwt_auth.confirm_existing_user('cooluser', 'invalidpassword') result, _ = jwt_auth.confirm_existing_user('cool_user', 'invalidpassword')
self.assertIsNone(result) self.assertIsNone(result)
result, _ = self.jwt_auth.confirm_existing_user('cooluser', 'password') result, _ = jwt_auth.confirm_existing_user('cool_user', 'password')
self.assertIsNotNone(result) self.assertIsNotNone(result)
self.assertEquals('cooluser', result.username) self.assertEquals('cool_user', result.username)
# Fail to confirm the *external* username, which should return nothing. # Fail to confirm the *external* username, which should return nothing.
result, _ = self.jwt_auth.confirm_existing_user('some.neat.user', 'password') result, _ = jwt_auth.confirm_existing_user('some.neat.user', 'password')
self.assertIsNone(result) self.assertIsNone(result)
# Now confirm the internal username. # Now confirm the internal username.
result, _ = self.jwt_auth.confirm_existing_user('some_neat_user', 'foobar') result, _ = jwt_auth.confirm_existing_user('some_neat_user', 'foobar')
self.assertIsNotNone(result) self.assertIsNotNone(result)
self.assertEquals('some_neat_user', result.username) self.assertEquals('some_neat_user', result.username)
def test_disabled_user_custom_error(self): def test_disabled_user_custom_error(self):
result, error_message = self.jwt_auth.verify_and_link_user('disabled', 'password') with fake_jwt(self.emails) as jwt_auth:
self.assertIsNone(result) result, error_message = jwt_auth.verify_and_link_user('disabled', 'password')
self.assertEquals('User is currently disabled', error_message) self.assertIsNone(result)
self.assertEquals('User is currently disabled', error_message)
def test_query(self): def test_query(self):
# Lookup `cool`. with fake_jwt(self.emails) as jwt_auth:
results, identifier, error_message = self.jwt_auth.query_users('cool') # Lookup `cool`.
self.assertIsNone(error_message) results, identifier, error_message = jwt_auth.query_users('cool')
self.assertEquals('jwtauthn', identifier) self.assertIsNone(error_message)
self.assertEquals(1, len(results)) self.assertEquals('jwtauthn', identifier)
self.assertEquals(1, len(results))
self.assertEquals('cooluser', results[0].username) self.assertEquals('cool.user', results[0].username)
self.assertEquals('user@domain.com' if self.emails else None, results[0].email) self.assertEquals('user@domain.com' if self.emails else None, results[0].email)
# Lookup `some`. # Lookup `some`.
results, identifier, error_message = self.jwt_auth.query_users('some') results, identifier, error_message = jwt_auth.query_users('some')
self.assertIsNone(error_message) self.assertIsNone(error_message)
self.assertEquals('jwtauthn', identifier) self.assertEquals('jwtauthn', identifier)
self.assertEquals(1, len(results)) self.assertEquals(1, len(results))
self.assertEquals('some.neat.user', results[0].username) self.assertEquals('some.neat.user', results[0].username)
self.assertEquals('neat@domain.com' if self.emails else None, results[0].email) self.assertEquals('neat@domain.com' if self.emails else None, results[0].email)
# Lookup `unknown`. # Lookup `unknown`.
results, identifier, error_message = self.jwt_auth.query_users('unknown') results, identifier, error_message = jwt_auth.query_users('unknown')
self.assertIsNone(error_message) self.assertIsNone(error_message)
self.assertEquals('jwtauthn', identifier) self.assertEquals('jwtauthn', identifier)
self.assertEquals(0, len(results)) self.assertEquals(0, len(results))
def test_get_user(self): def test_get_user(self):
# Lookup cooluser. with fake_jwt(self.emails) as jwt_auth:
result, error_message = self.jwt_auth.get_user('cooluser') # Lookup cool.user.
self.assertIsNone(error_message) result, error_message = jwt_auth.get_user('cool.user')
self.assertIsNotNone(result) self.assertIsNone(error_message)
self.assertIsNotNone(result)
self.assertEquals('cooluser', result.username) self.assertEquals('cool.user', result.username)
self.assertEquals('user@domain.com', result.email) self.assertEquals('user@domain.com', result.email)
# Lookup some.neat.user. # Lookup some.neat.user.
result, error_message = self.jwt_auth.get_user('some.neat.user') result, error_message = jwt_auth.get_user('some.neat.user')
self.assertIsNone(error_message) self.assertIsNone(error_message)
self.assertIsNotNone(result) self.assertIsNotNone(result)
self.assertEquals('some.neat.user', result.username) self.assertEquals('some.neat.user', result.username)
self.assertEquals('neat@domain.com', result.email) self.assertEquals('neat@domain.com', result.email)
# Lookup unknown user. # Lookup unknown user.
result, error_message = self.jwt_auth.get_user('unknownuser') result, error_message = jwt_auth.get_user('unknownuser')
self.assertIsNone(result) self.assertIsNone(result)
def test_link_user(self): def test_link_user(self):
# Link cooluser. with fake_jwt(self.emails) as jwt_auth:
user, error_message = self.jwt_auth.link_user('cooluser') # Link cool.user.
self.assertIsNone(error_message) user, error_message = jwt_auth.link_user('cool.user')
self.assertIsNotNone(user) self.assertIsNone(error_message)
self.assertEquals('cooluser', user.username) self.assertIsNotNone(user)
self.assertEquals('cool_user', user.username)
# Link again. Should return the same user record. # Link again. Should return the same user record.
user_again, _ = self.jwt_auth.link_user('cooluser') user_again, _ = jwt_auth.link_user('cool.user')
self.assertEquals(user_again.id, user.id) self.assertEquals(user_again.id, user.id)
# Confirm cooluser. # Confirm cool.user.
result, _ = self.jwt_auth.confirm_existing_user('cooluser', 'password') result, _ = jwt_auth.confirm_existing_user('cool_user', 'password')
self.assertIsNotNone(result) self.assertIsNotNone(result)
self.assertEquals('cooluser', result.username) self.assertEquals('cool_user', result.username)
def test_link_invalid_user(self): def test_link_invalid_user(self):
user, error_message = self.jwt_auth.link_user('invaliduser') with fake_jwt(self.emails) as jwt_auth:
self.assertIsNotNone(error_message) user, error_message = jwt_auth.link_user('invaliduser')
self.assertIsNone(user) self.assertIsNotNone(error_message)
self.assertIsNone(user)
class JWTAuthNoEmailTestCase(JWTAuthTestMixin, LiveServerTestCase): class JWTAuthNoEmailTestCase(JWTAuthTestMixin, unittest.TestCase):
""" Test cases for JWT auth, with emails disabled. """
@property @property
def emails(self): def emails(self):
return False return False
class JWTAuthTestCase(JWTAuthTestMixin, LiveServerTestCase): class JWTAuthTestCase(JWTAuthTestMixin, unittest.TestCase):
""" Test cases for JWT auth, with emails enabled. """
@property @property
def emails(self): def emails(self):
return True return True

View file

@ -5,197 +5,225 @@ import unittest
import requests import requests
from flask import Flask, request, abort, make_response from flask import Flask, request, abort, make_response
from flask_testing import LiveServerTestCase from contextlib import contextmanager
from helpers import liveserver_app
from data.users.keystone import get_keystone_users from data.users.keystone import get_keystone_users
from initdb import setup_database_for_testing, finished_database_for_testing from initdb import setup_database_for_testing, finished_database_for_testing
_PORT_NUMBER = 5001 _PORT_NUMBER = 5001
class KeystoneAuthTestsMixin(): @contextmanager
def fake_keystone(version, requires_email=True):
""" Context manager which instantiates and runs a webserver with a fake Keystone implementation,
until the result is yielded.
Usage:
with fake_keystone(version) as keystone_auth:
# Make keystone_auth requests.
"""
keystone_app, port = _create_app(requires_email)
server_url = 'http://' + keystone_app.config['SERVER_HOSTNAME']
endpoint_url = server_url + '/v3'
if version == 2:
endpoint_url = server_url + '/v2.0/auth'
keystone_auth = get_keystone_users(version, endpoint_url,
'adminuser', 'adminpass', 'admintenant',
requires_email=requires_email)
with liveserver_app(keystone_app, port):
yield keystone_auth
def _create_app(requires_email=True):
global _PORT_NUMBER
_PORT_NUMBER = _PORT_NUMBER + 1
server_url = 'http://localhost:%s' % (_PORT_NUMBER)
users = [
{'username': 'adminuser', 'name': 'Admin User', 'password': 'adminpass'},
{'username': 'cool.user', 'name': 'Cool User', 'password': 'password'},
{'username': 'some.neat.user', 'name': 'Neat User', 'password': 'foobar'},
]
ks_app = Flask('testks')
ks_app.config['SERVER_HOSTNAME'] = 'localhost:%s' % _PORT_NUMBER
if os.environ.get('DEBUG') == 'true':
ks_app.config['DEBUG'] = True
@ks_app.route('/v2.0/admin/users/<userid>', methods=['GET'])
def getuser(userid):
for user in users:
if user['username'] == userid:
user_data = {}
if requires_email:
user_data['email'] = userid + '@example.com'
return json.dumps({
'user': user_data
})
abort(404)
@ks_app.route('/v3/identity/users/<userid>', methods=['GET'])
def getv3user(userid):
for user in users:
if user['username'] == userid:
user_data = {
"domain_id": "default",
"enabled": True,
"id": user['username'],
"links": {},
"name": user['username'],
}
if requires_email:
user_data['email'] = user['username'] + '@example.com'
return json.dumps({
'user': user_data
})
abort(404)
@ks_app.route('/v3/identity/users', methods=['GET'])
def v3identity():
returned = []
for user in users:
if not request.args.get('name') or user['username'].startswith(request.args.get('name')):
returned.append({
"domain_id": "default",
"enabled": True,
"id": user['username'],
"links": {},
"name": user['username'],
"email": user['username'] + '@example.com',
})
return json.dumps({"users": returned})
@ks_app.route('/v3/auth/tokens', methods=['POST'])
def v3tokens():
creds = request.json['auth']['identity']['password']['user']
for user in users:
if creds['name'] == user['username'] and creds['password'] == user['password']:
data = json.dumps({
"token": {
"methods": [
"password"
],
"roles": [
{
"id": "9fe2ff9ee4384b1894a90878d3e92bab",
"name": "_member_"
},
{
"id": "c703057be878458588961ce9a0ce686b",
"name": "admin"
}
],
"project": {
"domain": {
"id": "default",
"name": "Default"
},
"id": "8538a3f13f9541b28c2620eb19065e45",
"name": "admin"
},
"catalog": [
{
"endpoints": [
{
"url": server_url + '/v3/identity',
"region": "RegionOne",
"interface": "admin",
"id": "29beb2f1567642eb810b042b6719ea88"
},
],
"type": "identity",
"id": "bd73972c0e14fb69bae8ff76e112a90",
"name": "keystone"
}
],
"extras": {
},
"user": {
"domain": {
"id": "default",
"name": "Default"
},
"id": user['username'],
"name": "admin"
},
"audit_ids": [
"yRt0UrxJSs6-WYJgwEMMmg"
],
"issued_at": "2014-06-16T22:24:26.089380",
"expires_at": "2020-06-16T23:24:26Z",
}
})
response = make_response(data, 200)
response.headers['X-Subject-Token'] = 'sometoken'
return response
abort(403)
@ks_app.route('/v2.0/auth/tokens', methods=['POST'])
def tokens():
creds = request.json['auth'][u'passwordCredentials']
for user in users:
if creds['username'] == user['username'] and creds['password'] == user['password']:
return json.dumps({
"access": {
"token": {
"issued_at": "2014-06-16T22:24:26.089380",
"expires": "2020-06-16T23:24:26Z",
"id": creds['username'],
"tenant": {"id": "sometenant"},
},
"serviceCatalog":[
{
"endpoints": [
{
"adminURL": server_url + '/v2.0/admin',
}
],
"endpoints_links": [],
"type": "identity",
"name": "admin",
},
],
"user": {
"username": creds['username'],
"roles_links": [],
"id": creds['username'],
"roles": [],
"name": user['name'],
},
"metadata": {
"is_admin": 0,
"roles": [],
},
},
})
abort(403)
return ks_app, _PORT_NUMBER
class KeystoneAuthTestsMixin:
maxDiff = None maxDiff = None
@property @property
def emails(self): def emails(self):
raise NotImplementedError raise NotImplementedError
def create_app(self): def fake_keystone(self):
global _PORT_NUMBER raise NotImplementedError
_PORT_NUMBER = _PORT_NUMBER + 1
users = [
{'username': 'adminuser', 'name': 'Admin User', 'password': 'adminpass'},
{'username': 'cooluser', 'name': 'Cool User', 'password': 'password'},
{'username': 'some.neat.user', 'name': 'Neat User', 'password': 'foobar'},
]
ks_app = Flask('testks')
ks_app.config['LIVESERVER_PORT'] = _PORT_NUMBER
if os.environ.get('DEBUG') == 'true':
ks_app.config['DEBUG'] = True
@ks_app.route('/v2.0/admin/users/<userid>', methods=['GET'])
def getuser(userid):
for user in users:
if user['username'] == userid:
user_data = {}
if self.emails:
user_data['email'] = userid + '@example.com'
return json.dumps({
'user': user_data
})
abort(404)
@ks_app.route('/v3/identity/users/<userid>', methods=['GET'])
def getv3user(userid):
for user in users:
if user['username'] == userid:
user_data = {
"domain_id": "default",
"enabled": True,
"id": user['username'],
"links": {},
"name": user['username'],
}
if self.emails:
user_data['email'] = user['username'] + '@example.com'
return json.dumps({
'user': user_data
})
abort(404)
@ks_app.route('/v3/identity/users', methods=['GET'])
def v3identity():
returned = []
for user in users:
if not request.args.get('name') or user['username'].startswith(request.args.get('name')):
returned.append({
"domain_id": "default",
"enabled": True,
"id": user['username'],
"links": {},
"name": user['username'],
"email": user['username'] + '@example.com',
})
return json.dumps({"users": returned})
@ks_app.route('/v3/auth/tokens', methods=['POST'])
def v3tokens():
creds = request.json['auth']['identity']['password']['user']
for user in users:
if creds['name'] == user['username'] and creds['password'] == user['password']:
data = json.dumps({
"token": {
"methods": [
"password"
],
"roles": [
{
"id": "9fe2ff9ee4384b1894a90878d3e92bab",
"name": "_member_"
},
{
"id": "c703057be878458588961ce9a0ce686b",
"name": "admin"
}
],
"project": {
"domain": {
"id": "default",
"name": "Default"
},
"id": "8538a3f13f9541b28c2620eb19065e45",
"name": "admin"
},
"catalog": [
{
"endpoints": [
{
"url": self.get_server_url() + '/v3/identity',
"region": "RegionOne",
"interface": "admin",
"id": "29beb2f1567642eb810b042b6719ea88"
},
],
"type": "identity",
"id": "bd73972c0e14fb69bae8ff76e112a90",
"name": "keystone"
}
],
"extras": {
},
"user": {
"domain": {
"id": "default",
"name": "Default"
},
"id": user['username'],
"name": "admin"
},
"audit_ids": [
"yRt0UrxJSs6-WYJgwEMMmg"
],
"issued_at": "2014-06-16T22:24:26.089380",
"expires_at": "2020-06-16T23:24:26Z",
}
})
response = make_response(data, 200)
response.headers['X-Subject-Token'] = 'sometoken'
return response
abort(403)
@ks_app.route('/v2.0/auth/tokens', methods=['POST'])
def tokens():
creds = request.json['auth'][u'passwordCredentials']
for user in users:
if creds['username'] == user['username'] and creds['password'] == user['password']:
return json.dumps({
"access": {
"token": {
"issued_at": "2014-06-16T22:24:26.089380",
"expires": "2020-06-16T23:24:26Z",
"id": creds['username'],
"tenant": {"id": "sometenant"},
},
"serviceCatalog":[
{
"endpoints": [
{
"adminURL": self.get_server_url() + '/v2.0/admin',
}
],
"endpoints_links": [],
"type": "identity",
"name": "admin",
},
],
"user": {
"username": creds['username'],
"roles_links": [],
"id": creds['username'],
"roles": [],
"name": user['name'],
},
"metadata": {
"is_admin": 0,
"roles": [],
},
},
})
abort(403)
return ks_app
def setUp(self): def setUp(self):
setup_database_for_testing(self) setup_database_for_testing(self)
@ -204,100 +232,95 @@ class KeystoneAuthTestsMixin():
def tearDown(self): def tearDown(self):
finished_database_for_testing(self) finished_database_for_testing(self)
@property
def keystone(self):
raise NotImplementedError
def test_invalid_user(self): def test_invalid_user(self):
(user, _) = self.keystone.verify_credentials('unknownuser', 'password') with self.fake_keystone() as keystone:
self.assertIsNone(user) (user, _) = keystone.verify_credentials('unknownuser', 'password')
self.assertIsNone(user)
def test_invalid_password(self): def test_invalid_password(self):
(user, _) = self.keystone.verify_credentials('cooluser', 'notpassword') with self.fake_keystone() as keystone:
self.assertIsNone(user) (user, _) = keystone.verify_credentials('cool.user', 'notpassword')
self.assertIsNone(user)
def test_cooluser(self): def test_cooluser(self):
(user, _) = self.keystone.verify_credentials('cooluser', 'password') with self.fake_keystone() as keystone:
self.assertEquals(user.username, 'cooluser') (user, _) = keystone.verify_credentials('cool.user', 'password')
self.assertEquals(user.email, 'cooluser@example.com' if self.emails else None) self.assertEquals(user.username, 'cool.user')
self.assertEquals(user.email, 'cool.user@example.com' if self.emails else None)
def test_neatuser(self): def test_neatuser(self):
(user, _) = self.keystone.verify_credentials('some.neat.user', 'foobar') with self.fake_keystone() as keystone:
self.assertEquals(user.username, 'some.neat.user') (user, _) = keystone.verify_credentials('some.neat.user', 'foobar')
self.assertEquals(user.email, 'some.neat.user@example.com' if self.emails else None) self.assertEquals(user.username, 'some.neat.user')
self.assertEquals(user.email, 'some.neat.user@example.com' if self.emails else None)
class KeystoneV2AuthNoEmailTests(KeystoneAuthTestsMixin, LiveServerTestCase):
@property class KeystoneV2AuthNoEmailTests(KeystoneAuthTestsMixin, unittest.TestCase):
def keystone(self): def fake_keystone(self):
return get_keystone_users(2, self.get_server_url() + '/v2.0/auth', return fake_keystone(2, requires_email=False)
'adminuser', 'adminpass', 'admintenant',
requires_email=False)
@property @property
def emails(self): def emails(self):
return False return False
class KeystoneV3AuthNoEmailTests(KeystoneAuthTestsMixin, LiveServerTestCase): class KeystoneV3AuthNoEmailTests(KeystoneAuthTestsMixin, unittest.TestCase):
@property def fake_keystone(self):
def keystone(self): return fake_keystone(3, requires_email=False)
return get_keystone_users(3, self.get_server_url() + '/v3',
'adminuser', 'adminpass', 'admintenant',
requires_email=False)
@property @property
def emails(self): def emails(self):
return False return False
class KeystoneV2AuthTests(KeystoneAuthTestsMixin, LiveServerTestCase): class KeystoneV2AuthTests(KeystoneAuthTestsMixin, unittest.TestCase):
@property def fake_keystone(self):
def keystone(self): return fake_keystone(2, requires_email=True)
return get_keystone_users(2, self.get_server_url() + '/v2.0/auth',
'adminuser', 'adminpass', 'admintenant',
requires_email=True)
@property @property
def emails(self): def emails(self):
return True return True
class KeystoneV3AuthTests(KeystoneAuthTestsMixin, LiveServerTestCase): class KeystoneV3AuthTests(KeystoneAuthTestsMixin, unittest.TestCase):
@property def fake_keystone(self):
def keystone(self): return fake_keystone(3, requires_email=True)
return get_keystone_users(3, self.get_server_url() + '/v3',
'adminuser', 'adminpass', 'admintenant',
requires_email=True)
def emails(self): def emails(self):
return True return True
def test_query(self): def test_query(self):
# Lookup cool. with self.fake_keystone() as keystone:
(response, error_message) = self.keystone.query_users('cool') # Lookup cool.
self.assertIsNone(error_message) (response, federated_id, error_message) = keystone.query_users('cool')
self.assertEquals(1, len(response)) self.assertIsNone(error_message)
self.assertEquals(1, len(response))
self.assertEquals('keystone', federated_id)
user_info = response[0] user_info = response[0]
self.assertEquals("cooluser", user_info.username) self.assertEquals("cool.user", user_info.username)
# Lookup unknown. # Lookup unknown.
(response, error_message) = self.keystone.query_users('unknown') (response, federated_id, error_message) = keystone.query_users('unknown')
self.assertIsNone(error_message) self.assertIsNone(error_message)
self.assertEquals(0, len(response)) self.assertEquals(0, len(response))
self.assertEquals('keystone', federated_id)
def test_link_user(self): def test_link_user(self):
# Link someuser. with self.fake_keystone() as keystone:
user, error_message = self.keystone.link_user('cooluser') # Link someuser.
self.assertIsNone(error_message) user, error_message = keystone.link_user('cool.user')
self.assertIsNotNone(user) self.assertIsNone(error_message)
self.assertEquals('cooluser', user.username) self.assertIsNotNone(user)
self.assertEquals('cooluser@example.com', user.email) self.assertEquals('cool_user', user.username)
self.assertEquals('cool.user@example.com', user.email)
# Link again. Should return the same user record. # Link again. Should return the same user record.
user_again, _ = self.keystone.link_user('cooluser') user_again, _ = keystone.link_user('cool.user')
self.assertEquals(user_again.id, user.id) self.assertEquals(user_again.id, user.id)
# Confirm someuser.
result, _ = keystone.confirm_existing_user('cool_user', 'password')
self.assertIsNotNone(result)
self.assertEquals('cool_user', result.username)
# Confirm someuser.
result, _ = self.keystone.confirm_existing_user('cooluser', 'password')
self.assertIsNotNone(result)
self.assertEquals('cooluser', result.username)
if __name__ == '__main__': if __name__ == '__main__':
unittest.main() unittest.main()

View file

@ -6,6 +6,112 @@ from data.users import LDAPUsers
from data import model from data import model
from mockldap import MockLdap from mockldap import MockLdap
from mock import patch from mock import patch
from contextlib import contextmanager
def _create_ldap(requires_email=True):
base_dn = ['dc=quay', 'dc=io']
admin_dn = 'uid=testy,ou=employees,dc=quay,dc=io'
admin_passwd = 'password'
user_rdn = ['ou=employees']
uid_attr = 'uid'
email_attr = 'mail'
secondary_user_rdns = ['ou=otheremployees']
ldap = LDAPUsers('ldap://localhost', base_dn, admin_dn, admin_passwd, user_rdn,
uid_attr, email_attr, secondary_user_rdns=secondary_user_rdns,
requires_email=requires_email)
return ldap
@contextmanager
def mock_ldap(requires_email=True):
mockldap = MockLdap({
'dc=quay,dc=io': {'dc': ['quay', 'io']},
'ou=employees,dc=quay,dc=io': {
'dc': ['quay', 'io'],
'ou': 'employees'
},
'ou=otheremployees,dc=quay,dc=io': {
'dc': ['quay', 'io'],
'ou': 'otheremployees'
},
'uid=testy,ou=employees,dc=quay,dc=io': {
'dc': ['quay', 'io'],
'ou': 'employees',
'uid': 'testy',
'userPassword': ['password']
},
'uid=someuser,ou=employees,dc=quay,dc=io': {
'dc': ['quay', 'io'],
'ou': 'employees',
'uid': ['someuser'],
'userPassword': ['somepass'],
'mail': ['foo@bar.com']
},
'uid=nomail,ou=employees,dc=quay,dc=io': {
'dc': ['quay', 'io'],
'ou': 'employees',
'uid': ['nomail'],
'userPassword': ['somepass']
},
'uid=cool.user,ou=employees,dc=quay,dc=io': {
'dc': ['quay', 'io'],
'ou': 'employees',
'uid': ['cool.user', 'referred'],
'userPassword': ['somepass'],
'mail': ['foo@bar.com']
},
'uid=referred,ou=employees,dc=quay,dc=io': {
'uid': ['referred'],
'_referral': 'ldap:///uid=cool.user,ou=employees,dc=quay,dc=io'
},
'uid=invalidreferred,ou=employees,dc=quay,dc=io': {
'uid': ['invalidreferred'],
'_referral': 'ldap:///uid=someinvaliduser,ou=employees,dc=quay,dc=io'
},
'uid=multientry,ou=subgroup1,ou=employees,dc=quay,dc=io': {
'uid': ['multientry'],
'mail': ['foo@bar.com'],
'userPassword': ['somepass'],
},
'uid=multientry,ou=subgroup2,ou=employees,dc=quay,dc=io': {
'uid': ['multientry'],
'another': ['key']
},
'uid=secondaryuser,ou=otheremployees,dc=quay,dc=io': {
'dc': ['quay', 'io'],
'ou': 'otheremployees',
'uid': ['secondaryuser'],
'userPassword': ['somepass'],
'mail': ['foosecondary@bar.com']
},
})
def initializer(uri, trace_level=0):
obj = mockldap[uri]
# Seed to "support" wildcard queries, which MockLDAP does not support natively.
obj.search_s.seed('ou=employees,dc=quay,dc=io', 2, '(|(uid=cool*)(mail=cool*))')([
('uid=cool.user,ou=employees,dc=quay,dc=io', {
'dc': ['quay', 'io'],
'ou': 'employees',
'uid': ['cool.user', 'referred'],
'userPassword': ['somepass'],
'mail': ['foo@bar.com']
})
])
obj.search_s.seed('ou=otheremployees,dc=quay,dc=io', 2, '(|(uid=cool*)(mail=cool*))')([])
obj.search_s.seed('ou=employees,dc=quay,dc=io', 2, '(|(uid=unknown*)(mail=unknown*))')([])
obj.search_s.seed('ou=otheremployees,dc=quay,dc=io', 2,
'(|(uid=unknown*)(mail=unknown*))')([])
return obj
mockldap.start()
with patch('ldap.initialize', new=initializer):
yield _create_ldap(requires_email=requires_email)
mockldap.stop()
class TestLDAP(unittest.TestCase): class TestLDAP(unittest.TestCase):
def setUp(self): def setUp(self):
@ -14,90 +120,10 @@ class TestLDAP(unittest.TestCase):
self.ctx = app.test_request_context() self.ctx = app.test_request_context()
self.ctx.__enter__() self.ctx.__enter__()
self.mockldap = MockLdap({
'dc=quay,dc=io': {'dc': ['quay', 'io']},
'ou=employees,dc=quay,dc=io': {
'dc': ['quay', 'io'],
'ou': 'employees'
},
'ou=otheremployees,dc=quay,dc=io': {
'dc': ['quay', 'io'],
'ou': 'otheremployees'
},
'uid=testy,ou=employees,dc=quay,dc=io': {
'dc': ['quay', 'io'],
'ou': 'employees',
'uid': 'testy',
'userPassword': ['password']
},
'uid=someuser,ou=employees,dc=quay,dc=io': {
'dc': ['quay', 'io'],
'ou': 'employees',
'uid': ['someuser'],
'userPassword': ['somepass'],
'mail': ['foo@bar.com']
},
'uid=nomail,ou=employees,dc=quay,dc=io': {
'dc': ['quay', 'io'],
'ou': 'employees',
'uid': ['nomail'],
'userPassword': ['somepass']
},
'uid=cool.user,ou=employees,dc=quay,dc=io': {
'dc': ['quay', 'io'],
'ou': 'employees',
'uid': ['cool.user', 'referred'],
'userPassword': ['somepass'],
'mail': ['foo@bar.com']
},
'uid=referred,ou=employees,dc=quay,dc=io': {
'uid': ['referred'],
'_referral': 'ldap:///uid=cool.user,ou=employees,dc=quay,dc=io'
},
'uid=invalidreferred,ou=employees,dc=quay,dc=io': {
'uid': ['invalidreferred'],
'_referral': 'ldap:///uid=someinvaliduser,ou=employees,dc=quay,dc=io'
},
'uid=multientry,ou=subgroup1,ou=employees,dc=quay,dc=io': {
'uid': ['multientry'],
'mail': ['foo@bar.com'],
'userPassword': ['somepass'],
},
'uid=multientry,ou=subgroup2,ou=employees,dc=quay,dc=io': {
'uid': ['multientry'],
'another': ['key']
},
'uid=secondaryuser,ou=otheremployees,dc=quay,dc=io': {
'dc': ['quay', 'io'],
'ou': 'otheremployees',
'uid': ['secondaryuser'],
'userPassword': ['somepass'],
'mail': ['foosecondary@bar.com']
},
})
self.mockldap.start()
self.ldap = self._create_ldap(requires_email=True)
def tearDown(self): def tearDown(self):
self.mockldap.stop()
finished_database_for_testing(self) finished_database_for_testing(self)
self.ctx.__exit__(True, None, None) self.ctx.__exit__(True, None, None)
def _create_ldap(self, requires_email=True):
base_dn = ['dc=quay', 'dc=io']
admin_dn = 'uid=testy,ou=employees,dc=quay,dc=io'
admin_passwd = 'password'
user_rdn = ['ou=employees']
uid_attr = 'uid'
email_attr = 'mail'
secondary_user_rdns = ['ou=otheremployees']
ldap = LDAPUsers('ldap://localhost', base_dn, admin_dn, admin_passwd, user_rdn,
uid_attr, email_attr, secondary_user_rdns=secondary_user_rdns,
requires_email=requires_email)
return ldap
def test_invalid_admin_password(self): def test_invalid_admin_password(self):
base_dn = ['dc=quay', 'dc=io'] base_dn = ['dc=quay', 'dc=io']
admin_dn = 'uid=testy,ou=employees,dc=quay,dc=io' admin_dn = 'uid=testy,ou=employees,dc=quay,dc=io'
@ -106,160 +132,148 @@ class TestLDAP(unittest.TestCase):
uid_attr = 'uid' uid_attr = 'uid'
email_attr = 'mail' email_attr = 'mail'
ldap = LDAPUsers('ldap://localhost', base_dn, admin_dn, admin_passwd, user_rdn, with mock_ldap():
uid_attr, email_attr) ldap = LDAPUsers('ldap://localhost', base_dn, admin_dn, admin_passwd, user_rdn,
uid_attr, email_attr)
self.ldap = ldap # Try to login.
(response, err_msg) = ldap.verify_and_link_user('someuser', 'somepass')
# Try to login. self.assertIsNone(response)
(response, err_msg) = self.ldap.verify_and_link_user('someuser', 'somepass') self.assertEquals('LDAP Admin dn or password is invalid', err_msg)
self.assertIsNone(response)
self.assertEquals('LDAP Admin dn or password is invalid', err_msg)
def test_login(self): def test_login(self):
# Verify we can login. with mock_ldap() as ldap:
(response, _) = self.ldap.verify_and_link_user('someuser', 'somepass') # Verify we can login.
self.assertEquals(response.username, 'someuser') (response, _) = ldap.verify_and_link_user('someuser', 'somepass')
self.assertTrue(model.user.has_user_prompt(response, 'confirm_username')) self.assertEquals(response.username, 'someuser')
self.assertTrue(model.user.has_user_prompt(response, 'confirm_username'))
# Verify we can confirm the user. # Verify we can confirm the user.
(response, _) = self.ldap.confirm_existing_user('someuser', 'somepass') (response, _) = ldap.confirm_existing_user('someuser', 'somepass')
self.assertEquals(response.username, 'someuser') self.assertEquals(response.username, 'someuser')
def test_login_secondary(self): def test_login_secondary(self):
# Verify we can login. with mock_ldap() as ldap:
(response, _) = self.ldap.verify_and_link_user('secondaryuser', 'somepass') # Verify we can login.
self.assertEquals(response.username, 'secondaryuser') (response, _) = ldap.verify_and_link_user('secondaryuser', 'somepass')
self.assertEquals(response.username, 'secondaryuser')
# Verify we can confirm the user. # Verify we can confirm the user.
(response, _) = self.ldap.confirm_existing_user('secondaryuser', 'somepass') (response, _) = ldap.confirm_existing_user('secondaryuser', 'somepass')
self.assertEquals(response.username, 'secondaryuser') self.assertEquals(response.username, 'secondaryuser')
def test_invalid_password(self): def test_invalid_password(self):
# Verify we cannot login with an invalid password. with mock_ldap() as ldap:
(response, err_msg) = self.ldap.verify_and_link_user('someuser', 'invalidpass') # Verify we cannot login with an invalid password.
self.assertIsNone(response) (response, err_msg) = ldap.verify_and_link_user('someuser', 'invalidpass')
self.assertEquals(err_msg, 'Invalid password') self.assertIsNone(response)
self.assertEquals(err_msg, 'Invalid password')
# Verify we cannot confirm the user. # Verify we cannot confirm the user.
(response, err_msg) = self.ldap.confirm_existing_user('someuser', 'invalidpass') (response, err_msg) = ldap.confirm_existing_user('someuser', 'invalidpass')
self.assertIsNone(response) self.assertIsNone(response)
self.assertEquals(err_msg, 'Invalid user') self.assertEquals(err_msg, 'Invalid user')
def test_missing_mail(self): def test_missing_mail(self):
(response, err_msg) = self.ldap.get_user('nomail') with mock_ldap() as ldap:
self.assertIsNone(response) (response, err_msg) = ldap.get_user('nomail')
self.assertEquals('Missing mail field "mail" in user record', err_msg) self.assertIsNone(response)
self.assertEquals('Missing mail field "mail" in user record', err_msg)
def test_missing_mail_allowed(self): def test_missing_mail_allowed(self):
ldap = self._create_ldap(requires_email=False) with mock_ldap(requires_email=False) as ldap:
(response, _) = ldap.get_user('nomail') (response, _) = ldap.get_user('nomail')
self.assertEquals(response.username, 'nomail') self.assertEquals(response.username, 'nomail')
def test_confirm_different_username(self): def test_confirm_different_username(self):
# Verify that the user is logged in and their username was adjusted. with mock_ldap() as ldap:
(response, _) = self.ldap.verify_and_link_user('cool.user', 'somepass') # Verify that the user is logged in and their username was adjusted.
self.assertEquals(response.username, 'cool_user') (response, _) = ldap.verify_and_link_user('cool.user', 'somepass')
self.assertEquals(response.username, 'cool_user')
# Verify we can confirm the user's quay username. # Verify we can confirm the user's quay username.
(response, _) = self.ldap.confirm_existing_user('cool_user', 'somepass') (response, _) = ldap.confirm_existing_user('cool_user', 'somepass')
self.assertEquals(response.username, 'cool_user') self.assertEquals(response.username, 'cool_user')
# Verify that we *cannot* confirm the LDAP username. # Verify that we *cannot* confirm the LDAP username.
(response, _) = self.ldap.confirm_existing_user('cool.user', 'somepass') (response, _) = ldap.confirm_existing_user('cool.user', 'somepass')
self.assertIsNone(response) self.assertIsNone(response)
def test_referral(self): def test_referral(self):
(response, _) = self.ldap.verify_and_link_user('referred', 'somepass') with mock_ldap() as ldap:
self.assertEquals(response.username, 'cool_user') (response, _) = ldap.verify_and_link_user('referred', 'somepass')
self.assertEquals(response.username, 'cool_user')
# Verify we can confirm the user's quay username. # Verify we can confirm the user's quay username.
(response, _) = self.ldap.confirm_existing_user('cool_user', 'somepass') (response, _) = ldap.confirm_existing_user('cool_user', 'somepass')
self.assertEquals(response.username, 'cool_user') self.assertEquals(response.username, 'cool_user')
def test_invalid_referral(self): def test_invalid_referral(self):
(response, _) = self.ldap.verify_and_link_user('invalidreferred', 'somepass') with mock_ldap() as ldap:
self.assertIsNone(response) (response, _) = ldap.verify_and_link_user('invalidreferred', 'somepass')
self.assertIsNone(response)
def test_multientry(self): def test_multientry(self):
(response, _) = self.ldap.verify_and_link_user('multientry', 'somepass') with mock_ldap() as ldap:
self.assertEquals(response.username, 'multientry') (response, _) = ldap.verify_and_link_user('multientry', 'somepass')
self.assertEquals(response.username, 'multientry')
def test_login_empty_userdn(self): def test_login_empty_userdn(self):
base_dn = ['ou=employees', 'dc=quay', 'dc=io'] with mock_ldap():
admin_dn = 'uid=testy,ou=employees,dc=quay,dc=io' base_dn = ['ou=employees', 'dc=quay', 'dc=io']
admin_passwd = 'password' admin_dn = 'uid=testy,ou=employees,dc=quay,dc=io'
user_rdn = [] admin_passwd = 'password'
uid_attr = 'uid' user_rdn = []
email_attr = 'mail' uid_attr = 'uid'
secondary_user_rdns = ['ou=otheremployees'] email_attr = 'mail'
secondary_user_rdns = ['ou=otheremployees']
ldap = LDAPUsers('ldap://localhost', base_dn, admin_dn, admin_passwd, user_rdn, ldap = LDAPUsers('ldap://localhost', base_dn, admin_dn, admin_passwd, user_rdn,
uid_attr, email_attr, secondary_user_rdns=secondary_user_rdns) uid_attr, email_attr, secondary_user_rdns=secondary_user_rdns)
self.ldap = ldap # Verify we can login.
(response, _) = ldap.verify_and_link_user('someuser', 'somepass')
self.assertEquals(response.username, 'someuser')
# Verify we can login. # Verify we can confirm the user.
(response, _) = self.ldap.verify_and_link_user('someuser', 'somepass') (response, _) = ldap.confirm_existing_user('someuser', 'somepass')
self.assertEquals(response.username, 'someuser') self.assertEquals(response.username, 'someuser')
# Verify we can confirm the user.
(response, _) = self.ldap.confirm_existing_user('someuser', 'somepass')
self.assertEquals(response.username, 'someuser')
def test_link_user(self): def test_link_user(self):
# Link someuser. with mock_ldap() as ldap:
user, error_message = self.ldap.link_user('someuser') # Link someuser.
self.assertIsNone(error_message) user, error_message = ldap.link_user('someuser')
self.assertIsNotNone(user) self.assertIsNone(error_message)
self.assertEquals('someuser', user.username) self.assertIsNotNone(user)
self.assertEquals('someuser', user.username)
# Link again. Should return the same user record. # Link again. Should return the same user record.
user_again, _ = self.ldap.link_user('someuser') user_again, _ = ldap.link_user('someuser')
self.assertEquals(user_again.id, user.id) self.assertEquals(user_again.id, user.id)
# Confirm someuser. # Confirm someuser.
result, _ = self.ldap.confirm_existing_user('someuser', 'somepass') result, _ = ldap.confirm_existing_user('someuser', 'somepass')
self.assertIsNotNone(result) self.assertIsNotNone(result)
self.assertEquals('someuser', result.username) self.assertEquals('someuser', result.username)
self.assertTrue(model.user.has_user_prompt(user, 'confirm_username')) self.assertTrue(model.user.has_user_prompt(user, 'confirm_username'))
def test_query(self): def test_query(self):
def initializer(uri, trace_level=0): with mock_ldap() as ldap:
obj = self.mockldap[uri]
# Seed to "support" wildcard queries, which MockLDAP does not support natively.
obj.search_s.seed('ou=employees,dc=quay,dc=io', 2, '(|(uid=cool*)(mail=cool*))')([
('uid=cool.user,ou=employees,dc=quay,dc=io', {
'dc': ['quay', 'io'],
'ou': 'employees',
'uid': ['cool.user', 'referred'],
'userPassword': ['somepass'],
'mail': ['foo@bar.com']
})
])
obj.search_s.seed('ou=otheremployees,dc=quay,dc=io', 2, '(|(uid=cool*)(mail=cool*))')([])
obj.search_s.seed('ou=employees,dc=quay,dc=io', 2, '(|(uid=unknown*)(mail=unknown*))')([])
obj.search_s.seed('ou=otheremployees,dc=quay,dc=io', 2,
'(|(uid=unknown*)(mail=unknown*))')([])
return obj
with patch('ldap.initialize', new=initializer):
# Lookup cool. # Lookup cool.
(response, error_message) = self.ldap.query_users('cool') (response, federated_id, error_message) = ldap.query_users('cool')
self.assertIsNone(error_message) self.assertIsNone(error_message)
self.assertEquals(1, len(response)) self.assertEquals(1, len(response))
self.assertEquals('ldap', federated_id)
user_info = response[0] user_info = response[0]
self.assertEquals("cool.user", user_info.username) self.assertEquals("cool.user", user_info.username)
self.assertEquals("foo@bar.com", user_info.email) self.assertEquals("foo@bar.com", user_info.email)
# Lookup unknown. # Lookup unknown.
(response, error_message) = self.ldap.query_users('unknown') (response, federated_id, error_message) = ldap.query_users('unknown')
self.assertIsNone(error_message) self.assertIsNone(error_message)
self.assertEquals(0, len(response)) self.assertEquals(0, len(response))
self.assertEquals('ldap', federated_id)
if __name__ == '__main__': if __name__ == '__main__':