Merge pull request #2206 from coreos-inc/ldap-user-search-fix

Fix external auth returns for query_user calls
This commit is contained in:
josephschorr 2016-12-07 17:53:04 -05:00 committed by GitHub
commit 111b7b0788
8 changed files with 834 additions and 651 deletions

View file

@ -187,12 +187,12 @@ class LDAPUsers(FederatedUsers):
def query_users(self, query, limit=20):
""" Queries LDAP for matching users. """
if not query:
return (None, 'Empty query')
return (None, self.federated_service, 'Empty query')
logger.debug('Got query %s with limit %s', query, limit)
(results, err_msg) = self._ldap_user_search(query + '*', limit=limit)
if err_msg is not None:
return (None, err_msg)
return (None, self.federated_service, err_msg)
final_results = []
for result in results[0:limit]:
@ -203,7 +203,7 @@ class LDAPUsers(FederatedUsers):
final_results.append(credentials)
logger.debug('For query %s found results %s', query, final_results)
return (final_results, None)
return (final_results, self.federated_service, None)
def verify_credentials(self, username_or_email, password):
""" Verify the credentials with LDAP. """

View file

@ -69,7 +69,7 @@ class KeystoneV2Users(FederatedUsers):
return (UserInformation(username=username_or_email, email=email, id=user_id), None)
def query_users(self, query, limit=20):
return (None, 'Unsupported in Keystone V2')
return (None, self.federated_service, 'Unsupported in Keystone V2')
def get_user(self, username_or_email):
return (None, 'Unsupported in Keystone V2')
@ -108,7 +108,7 @@ class KeystoneV3Users(FederatedUsers):
return (None, kut.message or 'Invalid username or password')
def get_user(self, username_or_email):
users_found, err_msg = self.query_users(username_or_email)
users_found, _, err_msg = self.query_users(username_or_email)
if err_msg is not None:
return (None, err_msg)
@ -128,7 +128,7 @@ class KeystoneV3Users(FederatedUsers):
def query_users(self, query, limit=20):
if len(query) < 3:
return ([], None)
return ([], self.federated_service, None)
try:
keystone_client = kv3client.Client(username=self.admin_username, password=self.admin_password,
@ -137,13 +137,13 @@ class KeystoneV3Users(FederatedUsers):
found_users = list(_take(limit, keystone_client.users.list(name=query)))
logger.debug('For Keystone query %s found users: %s', query, found_users)
if not found_users:
return ([], None)
return ([], self.federated_service, None)
return ([self._user_info(user) for user in found_users], None)
return ([self._user_info(user) for user in found_users], self.federated_service, None)
except KeystoneAuthorizationFailure as kaf:
logger.exception('Keystone auth failure for admin user for query %s', query)
return (None, kaf.message or 'Invalid admin username or password')
return (None, self.federated_service, kaf.message or 'Invalid admin username or password')
except KeystoneUnauthorized as kut:
logger.exception('Keystone unauthorized for admin user for query %s', query)
return (None, kut.message or 'Invalid admin username or password')
return (None, self.federated_service, kut.message or 'Invalid admin username or password')

View file

@ -10,6 +10,7 @@ from auth.permissions import (OrganizationMemberPermission, ReadRepositoryPermis
from auth.auth_context import get_authenticated_user
from auth import scopes
from app import avatar, authentication
from flask import abort
from operator import itemgetter
from stringscore import liquidmetal
from util.names import parse_robot_username
@ -17,13 +18,15 @@ from util.names import parse_robot_username
import anunidecode # Don't listen to pylint's lies. This import is required.
import math
@show_if(authentication.federated_service) # Only enabled for non-DB auth.
@resource('/v1/entities/link/<username>')
@internal_only
class LinkExternalEntity(ApiResource):
""" Resource for linking external entities to internal users. """
@nickname('linkExternalUser')
def post(self, username):
if not authentication.federated_service:
abort(404)
# Only allowed if there is a logged in user.
if not get_authenticated_user():
raise Unauthorized()

View file

@ -1,4 +1,9 @@
import multiprocessing
import time
import socket
from data.database import LogEntryKind, LogEntry
from contextlib import contextmanager
class assert_action_logged(object):
""" Specialized assertion for ensuring that a log entry of a particular kind was added under the
@ -20,3 +25,59 @@ class assert_action_logged(object):
updated_count = self._get_log_count()
error_msg = 'Missing new log entry of kind %s' % self.log_kind
assert self.existing_count == (updated_count - 1), error_msg
_LIVESERVER_TIMEOUT = 5
@contextmanager
def liveserver_app(flask_app, port):
"""
Based on https://github.com/jarus/flask-testing/blob/master/flask_testing/utils.py
Runs the given Flask app as a live web server locally, on the given port, starting it
when called and terminating after the yield.
Usage:
with liveserver_app(flask_app, port):
# Code that makes use of the app.
"""
shared = {}
def _can_ping_server():
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
try:
sock.connect(('localhost', port))
except socket.error:
success = False
else:
success = True
finally:
sock.close()
return success
def _spawn_live_server():
worker = lambda app, port: app.run(port=port, use_reloader=False)
shared['process'] = multiprocessing.Process(target=worker, args=(flask_app, port))
shared['process'].start()
start_time = time.time()
while True:
elapsed_time = (time.time() - start_time)
if elapsed_time > _LIVESERVER_TIMEOUT:
_terminate_live_server()
raise RuntimeError("Failed to start the server after %d seconds. " % _LIVESERVER_TIMEOUT)
if _can_ping_server():
break
def _terminate_live_server():
if shared.get('process'):
shared.get('process').terminate()
shared.pop('process')
try:
_spawn_live_server()
yield
finally:
_terminate_live_server()

View file

@ -0,0 +1,57 @@
import unittest
from mock import patch
from endpoints.api.search import EntitySearch, LinkExternalEntity
from test.test_api_usage import ApiTestCase, ADMIN_ACCESS_USER
from test.test_ldap import mock_ldap
from test.test_external_jwt_authn import fake_jwt
from test.test_keystone_auth import fake_keystone
class EndToEndAuthMixin:
def test_entity_search(self):
with self.get_authentication() as auth:
with patch('endpoints.api.search.authentication', auth):
# Try an unknown prefix.
json_data = self.getJsonResponse(EntitySearch, params=dict(prefix='unknown'))
results = json_data['results']
self.assertEquals(0, len(results))
# Try a known prefix.
json_data = self.getJsonResponse(EntitySearch, params=dict(prefix='cool'))
results = json_data['results']
self.assertEquals(1, len(results))
self.assertEquals('external', results[0]['kind'])
self.assertEquals('cool.user', results[0]['name'])
def test_link_external_entity(self):
with self.get_authentication() as auth:
with patch('endpoints.api.search.authentication', auth):
self.login(ADMIN_ACCESS_USER)
# Try an unknown user.
self.postResponse(LinkExternalEntity, params=dict(username='unknownuser'),
expected_code=400)
# Try a known user.
json_data = self.postJsonResponse(LinkExternalEntity, params=dict(username='cool.user'))
entity = json_data['entity']
self.assertEquals('cool_user', entity['name'])
self.assertEquals('user', entity['kind'])
class TestLDAPEndToEnd(ApiTestCase, EndToEndAuthMixin):
def get_authentication(self):
return mock_ldap()
class TestJWTEndToEnd(ApiTestCase, EndToEndAuthMixin):
def get_authentication(self):
return fake_jwt()
class TestKeystoneEndToEnd(ApiTestCase, EndToEndAuthMixin):
def get_authentication(self):
return fake_keystone(3)
if __name__ == '__main__':
unittest.main()

View file

@ -3,148 +3,172 @@ import unittest
from datetime import datetime, timedelta
from tempfile import NamedTemporaryFile
from contextlib import contextmanager
import jwt
import requests
from Crypto.PublicKey import RSA
from flask import Flask, jsonify, request, make_response
from flask_testing import LiveServerTestCase
from app import app
from data.users import ExternalJWTAuthN
from initdb import setup_database_for_testing, finished_database_for_testing
from test.helpers import liveserver_app
_PORT_NUMBER = 5001
class JWTAuthTestMixin(object):
@contextmanager
def fake_jwt(requires_email=True):
""" Context manager which instantiates and runs a webserver with a fake JWT implementation,
until the result is yielded.
Usage:
with fake_jwt() as jwt_auth:
# Make jwt_auth requests.
"""
jwt_app, port, public_key = _create_app(requires_email)
server_url = 'http://' + jwt_app.config['SERVER_HOSTNAME']
verify_url = server_url + '/user/verify'
query_url = server_url + '/user/query'
getuser_url = server_url + '/user/get'
jwt_auth = ExternalJWTAuthN(verify_url, query_url, getuser_url, 'authy', '',
app.config['HTTPCLIENT'], 300, public_key.name,
requires_email=requires_email)
with liveserver_app(jwt_app, port):
yield jwt_auth
def _generate_certs():
public_key = NamedTemporaryFile(delete=True)
key = RSA.generate(1024)
private_key_data = key.exportKey('PEM')
pubkey = key.publickey()
public_key.write(pubkey.exportKey('OpenSSH'))
public_key.seek(0)
return (public_key, private_key_data)
def _create_app(emails=True):
global _PORT_NUMBER
_PORT_NUMBER = _PORT_NUMBER + 1
public_key, private_key_data = _generate_certs()
users = [
{'name': 'cool.user', 'email': 'user@domain.com', 'password': 'password'},
{'name': 'some.neat.user', 'email': 'neat@domain.com', 'password': 'foobar'}
]
jwt_app = Flask('testjwt')
jwt_app.config['SERVER_HOSTNAME'] = 'localhost:%s' % _PORT_NUMBER
def _get_basic_auth():
data = base64.b64decode(request.headers['Authorization'][len('Basic '):])
return data.split(':', 1)
@jwt_app.route('/user/query', methods=['GET'])
def query_users():
query = request.args.get('query')
results = []
for user in users:
if user['name'].startswith(query):
result = {
'username': user['name'],
}
if emails:
result['email'] = user['email']
results.append(result)
token_data = {
'iss': 'authy',
'aud': 'quay.io/jwtauthn/query',
'nbf': datetime.utcnow(),
'iat': datetime.utcnow(),
'exp': datetime.utcnow() + timedelta(seconds=60),
'results': results,
}
encoded = jwt.encode(token_data, private_key_data, 'RS256')
return jsonify({
'token': encoded
})
@jwt_app.route('/user/get', methods=['GET'])
def get_user():
username = request.args.get('username')
if username == 'disabled':
return make_response('User is currently disabled', 401)
for user in users:
if user['name'] == username or user['email'] == username:
token_data = {
'iss': 'authy',
'aud': 'quay.io/jwtauthn/getuser',
'nbf': datetime.utcnow(),
'iat': datetime.utcnow(),
'exp': datetime.utcnow() + timedelta(seconds=60),
'sub': user['name'],
'email': user['email'],
}
encoded = jwt.encode(token_data, private_key_data, 'RS256')
return jsonify({
'token': encoded
})
return make_response('Invalid username or password', 404)
@jwt_app.route('/user/verify', methods=['GET'])
def verify_user():
username, password = _get_basic_auth()
if username == 'disabled':
return make_response('User is currently disabled', 401)
for user in users:
if user['name'] == username or user['email'] == username:
if password != user['password']:
return make_response('', 404)
token_data = {
'iss': 'authy',
'aud': 'quay.io/jwtauthn',
'nbf': datetime.utcnow(),
'iat': datetime.utcnow(),
'exp': datetime.utcnow() + timedelta(seconds=60),
'sub': user['name'],
'email': user['email'],
}
encoded = jwt.encode(token_data, private_key_data, 'RS256')
return jsonify({
'token': encoded
})
return make_response('Invalid username or password', 404)
jwt_app.config['TESTING'] = True
return jwt_app, _PORT_NUMBER, public_key
class JWTAuthTestMixin:
""" Mixin defining all the JWT auth tests. """
maxDiff = None
@property
def emails(self):
raise NotImplementedError
@classmethod
def setUpClass(cls):
public_key = NamedTemporaryFile(delete=True)
key = RSA.generate(1024)
private_key_data = key.exportKey('PEM')
pubkey = key.publickey()
public_key.write(pubkey.exportKey('OpenSSH'))
public_key.seek(0)
JWTAuthTestCase.public_key = public_key
JWTAuthTestCase.private_key_data = private_key_data
def create_app(self):
global _PORT_NUMBER
_PORT_NUMBER = _PORT_NUMBER + 1
users = [
{'name': 'cooluser', 'email': 'user@domain.com', 'password': 'password'},
{'name': 'some.neat.user', 'email': 'neat@domain.com', 'password': 'foobar'}
]
jwt_app = Flask('testjwt')
private_key = JWTAuthTestCase.private_key_data
jwt_app.config['LIVESERVER_PORT'] = _PORT_NUMBER
def _get_basic_auth():
data = base64.b64decode(request.headers['Authorization'][len('Basic '):])
return data.split(':', 1)
@jwt_app.route('/user/query', methods=['GET'])
def query_users():
query = request.args.get('query')
results = []
for user in users:
if user['name'].startswith(query):
result = {
'username': user['name'],
}
if self.emails:
result['email'] = user['email']
results.append(result)
token_data = {
'iss': 'authy',
'aud': 'quay.io/jwtauthn/query',
'nbf': datetime.utcnow(),
'iat': datetime.utcnow(),
'exp': datetime.utcnow() + timedelta(seconds=60),
'results': results,
}
encoded = jwt.encode(token_data, private_key, 'RS256')
return jsonify({
'token': encoded
})
@jwt_app.route('/user/get', methods=['GET'])
def get_user():
username = request.args.get('username')
if username == 'disabled':
return make_response('User is currently disabled', 401)
for user in users:
if user['name'] == username or user['email'] == username:
token_data = {
'iss': 'authy',
'aud': 'quay.io/jwtauthn/getuser',
'nbf': datetime.utcnow(),
'iat': datetime.utcnow(),
'exp': datetime.utcnow() + timedelta(seconds=60),
'sub': user['name'],
'email': user['email'],
}
encoded = jwt.encode(token_data, private_key, 'RS256')
return jsonify({
'token': encoded
})
return make_response('Invalid username or password', 404)
@jwt_app.route('/user/verify', methods=['GET'])
def verify_user():
username, password = _get_basic_auth()
if username == 'disabled':
return make_response('User is currently disabled', 401)
for user in users:
if user['name'] == username or user['email'] == username:
if password != user['password']:
return make_response('', 404)
token_data = {
'iss': 'authy',
'aud': 'quay.io/jwtauthn',
'nbf': datetime.utcnow(),
'iat': datetime.utcnow(),
'exp': datetime.utcnow() + timedelta(seconds=60),
'sub': user['name'],
'email': user['email'],
}
encoded = jwt.encode(token_data, private_key, 'RS256')
return jsonify({
'token': encoded
})
return make_response('Invalid username or password', 404)
jwt_app.config['TESTING'] = True
return jwt_app
def setUp(self):
setup_database_for_testing(self)
self.app = app.test_client()
@ -153,139 +177,140 @@ class JWTAuthTestMixin(object):
self.session = requests.Session()
verify_url = self.get_server_url() + '/user/verify'
query_url = self.get_server_url() + '/user/query'
getuser_url = self.get_server_url() + '/user/get'
self.jwt_auth = ExternalJWTAuthN(verify_url, query_url, getuser_url, 'authy', '',
app.config['HTTPCLIENT'], 300, JWTAuthTestCase.public_key.name,
requires_email=self.emails)
def tearDown(self):
finished_database_for_testing(self)
self.ctx.__exit__(True, None, None)
def test_verify_and_link_user(self):
result, error_message = self.jwt_auth.verify_and_link_user('invaliduser', 'foobar')
self.assertEquals('Invalid username or password', error_message)
self.assertIsNone(result)
with fake_jwt(self.emails) as jwt_auth:
result, error_message = jwt_auth.verify_and_link_user('invaliduser', 'foobar')
self.assertEquals('Invalid username or password', error_message)
self.assertIsNone(result)
result, _ = self.jwt_auth.verify_and_link_user('cooluser', 'invalidpassword')
self.assertIsNone(result)
result, _ = jwt_auth.verify_and_link_user('cool.user', 'invalidpassword')
self.assertIsNone(result)
result, _ = self.jwt_auth.verify_and_link_user('cooluser', 'password')
self.assertIsNotNone(result)
self.assertEquals('cooluser', result.username)
result, _ = jwt_auth.verify_and_link_user('cool.user', 'password')
self.assertIsNotNone(result)
self.assertEquals('cool_user', result.username)
result, _ = self.jwt_auth.verify_and_link_user('some.neat.user', 'foobar')
self.assertIsNotNone(result)
self.assertEquals('some_neat_user', result.username)
result, _ = jwt_auth.verify_and_link_user('some.neat.user', 'foobar')
self.assertIsNotNone(result)
self.assertEquals('some_neat_user', result.username)
def test_confirm_existing_user(self):
# Create the users in the DB.
result, _ = self.jwt_auth.verify_and_link_user('cooluser', 'password')
self.assertIsNotNone(result)
with fake_jwt(self.emails) as jwt_auth:
# Create the users in the DB.
result, _ = jwt_auth.verify_and_link_user('cool.user', 'password')
self.assertIsNotNone(result)
result, _ = self.jwt_auth.verify_and_link_user('some.neat.user', 'foobar')
self.assertIsNotNone(result)
result, _ = jwt_auth.verify_and_link_user('some.neat.user', 'foobar')
self.assertIsNotNone(result)
# Confirm a user with the same internal and external username.
result, _ = self.jwt_auth.confirm_existing_user('cooluser', 'invalidpassword')
self.assertIsNone(result)
# Confirm a user with the same internal and external username.
result, _ = jwt_auth.confirm_existing_user('cool_user', 'invalidpassword')
self.assertIsNone(result)
result, _ = self.jwt_auth.confirm_existing_user('cooluser', 'password')
self.assertIsNotNone(result)
self.assertEquals('cooluser', result.username)
result, _ = jwt_auth.confirm_existing_user('cool_user', 'password')
self.assertIsNotNone(result)
self.assertEquals('cool_user', result.username)
# Fail to confirm the *external* username, which should return nothing.
result, _ = self.jwt_auth.confirm_existing_user('some.neat.user', 'password')
self.assertIsNone(result)
# Fail to confirm the *external* username, which should return nothing.
result, _ = jwt_auth.confirm_existing_user('some.neat.user', 'password')
self.assertIsNone(result)
# Now confirm the internal username.
result, _ = self.jwt_auth.confirm_existing_user('some_neat_user', 'foobar')
self.assertIsNotNone(result)
self.assertEquals('some_neat_user', result.username)
# Now confirm the internal username.
result, _ = jwt_auth.confirm_existing_user('some_neat_user', 'foobar')
self.assertIsNotNone(result)
self.assertEquals('some_neat_user', result.username)
def test_disabled_user_custom_error(self):
result, error_message = self.jwt_auth.verify_and_link_user('disabled', 'password')
self.assertIsNone(result)
self.assertEquals('User is currently disabled', error_message)
with fake_jwt(self.emails) as jwt_auth:
result, error_message = jwt_auth.verify_and_link_user('disabled', 'password')
self.assertIsNone(result)
self.assertEquals('User is currently disabled', error_message)
def test_query(self):
# Lookup `cool`.
results, identifier, error_message = self.jwt_auth.query_users('cool')
self.assertIsNone(error_message)
self.assertEquals('jwtauthn', identifier)
self.assertEquals(1, len(results))
with fake_jwt(self.emails) as jwt_auth:
# Lookup `cool`.
results, identifier, error_message = jwt_auth.query_users('cool')
self.assertIsNone(error_message)
self.assertEquals('jwtauthn', identifier)
self.assertEquals(1, len(results))
self.assertEquals('cooluser', results[0].username)
self.assertEquals('user@domain.com' if self.emails else None, results[0].email)
self.assertEquals('cool.user', results[0].username)
self.assertEquals('user@domain.com' if self.emails else None, results[0].email)
# Lookup `some`.
results, identifier, error_message = self.jwt_auth.query_users('some')
self.assertIsNone(error_message)
self.assertEquals('jwtauthn', identifier)
self.assertEquals(1, len(results))
# Lookup `some`.
results, identifier, error_message = jwt_auth.query_users('some')
self.assertIsNone(error_message)
self.assertEquals('jwtauthn', identifier)
self.assertEquals(1, len(results))
self.assertEquals('some.neat.user', results[0].username)
self.assertEquals('neat@domain.com' if self.emails else None, results[0].email)
self.assertEquals('some.neat.user', results[0].username)
self.assertEquals('neat@domain.com' if self.emails else None, results[0].email)
# Lookup `unknown`.
results, identifier, error_message = self.jwt_auth.query_users('unknown')
self.assertIsNone(error_message)
self.assertEquals('jwtauthn', identifier)
self.assertEquals(0, len(results))
# Lookup `unknown`.
results, identifier, error_message = jwt_auth.query_users('unknown')
self.assertIsNone(error_message)
self.assertEquals('jwtauthn', identifier)
self.assertEquals(0, len(results))
def test_get_user(self):
# Lookup cooluser.
result, error_message = self.jwt_auth.get_user('cooluser')
self.assertIsNone(error_message)
self.assertIsNotNone(result)
with fake_jwt(self.emails) as jwt_auth:
# Lookup cool.user.
result, error_message = jwt_auth.get_user('cool.user')
self.assertIsNone(error_message)
self.assertIsNotNone(result)
self.assertEquals('cooluser', result.username)
self.assertEquals('user@domain.com', result.email)
self.assertEquals('cool.user', result.username)
self.assertEquals('user@domain.com', result.email)
# Lookup some.neat.user.
result, error_message = self.jwt_auth.get_user('some.neat.user')
self.assertIsNone(error_message)
self.assertIsNotNone(result)
# Lookup some.neat.user.
result, error_message = jwt_auth.get_user('some.neat.user')
self.assertIsNone(error_message)
self.assertIsNotNone(result)
self.assertEquals('some.neat.user', result.username)
self.assertEquals('neat@domain.com', result.email)
self.assertEquals('some.neat.user', result.username)
self.assertEquals('neat@domain.com', result.email)
# Lookup unknown user.
result, error_message = self.jwt_auth.get_user('unknownuser')
self.assertIsNone(result)
# Lookup unknown user.
result, error_message = jwt_auth.get_user('unknownuser')
self.assertIsNone(result)
def test_link_user(self):
# Link cooluser.
user, error_message = self.jwt_auth.link_user('cooluser')
self.assertIsNone(error_message)
self.assertIsNotNone(user)
self.assertEquals('cooluser', user.username)
with fake_jwt(self.emails) as jwt_auth:
# Link cool.user.
user, error_message = jwt_auth.link_user('cool.user')
self.assertIsNone(error_message)
self.assertIsNotNone(user)
self.assertEquals('cool_user', user.username)
# Link again. Should return the same user record.
user_again, _ = self.jwt_auth.link_user('cooluser')
self.assertEquals(user_again.id, user.id)
# Link again. Should return the same user record.
user_again, _ = jwt_auth.link_user('cool.user')
self.assertEquals(user_again.id, user.id)
# Confirm cooluser.
result, _ = self.jwt_auth.confirm_existing_user('cooluser', 'password')
self.assertIsNotNone(result)
self.assertEquals('cooluser', result.username)
# Confirm cool.user.
result, _ = jwt_auth.confirm_existing_user('cool_user', 'password')
self.assertIsNotNone(result)
self.assertEquals('cool_user', result.username)
def test_link_invalid_user(self):
user, error_message = self.jwt_auth.link_user('invaliduser')
self.assertIsNotNone(error_message)
self.assertIsNone(user)
with fake_jwt(self.emails) as jwt_auth:
user, error_message = jwt_auth.link_user('invaliduser')
self.assertIsNotNone(error_message)
self.assertIsNone(user)
class JWTAuthNoEmailTestCase(JWTAuthTestMixin, LiveServerTestCase):
class JWTAuthNoEmailTestCase(JWTAuthTestMixin, unittest.TestCase):
""" Test cases for JWT auth, with emails disabled. """
@property
def emails(self):
return False
class JWTAuthTestCase(JWTAuthTestMixin, LiveServerTestCase):
class JWTAuthTestCase(JWTAuthTestMixin, unittest.TestCase):
""" Test cases for JWT auth, with emails enabled. """
@property
def emails(self):
return True

View file

@ -5,197 +5,225 @@ import unittest
import requests
from flask import Flask, request, abort, make_response
from flask_testing import LiveServerTestCase
from contextlib import contextmanager
from helpers import liveserver_app
from data.users.keystone import get_keystone_users
from initdb import setup_database_for_testing, finished_database_for_testing
_PORT_NUMBER = 5001
class KeystoneAuthTestsMixin():
@contextmanager
def fake_keystone(version, requires_email=True):
""" Context manager which instantiates and runs a webserver with a fake Keystone implementation,
until the result is yielded.
Usage:
with fake_keystone(version) as keystone_auth:
# Make keystone_auth requests.
"""
keystone_app, port = _create_app(requires_email)
server_url = 'http://' + keystone_app.config['SERVER_HOSTNAME']
endpoint_url = server_url + '/v3'
if version == 2:
endpoint_url = server_url + '/v2.0/auth'
keystone_auth = get_keystone_users(version, endpoint_url,
'adminuser', 'adminpass', 'admintenant',
requires_email=requires_email)
with liveserver_app(keystone_app, port):
yield keystone_auth
def _create_app(requires_email=True):
global _PORT_NUMBER
_PORT_NUMBER = _PORT_NUMBER + 1
server_url = 'http://localhost:%s' % (_PORT_NUMBER)
users = [
{'username': 'adminuser', 'name': 'Admin User', 'password': 'adminpass'},
{'username': 'cool.user', 'name': 'Cool User', 'password': 'password'},
{'username': 'some.neat.user', 'name': 'Neat User', 'password': 'foobar'},
]
ks_app = Flask('testks')
ks_app.config['SERVER_HOSTNAME'] = 'localhost:%s' % _PORT_NUMBER
if os.environ.get('DEBUG') == 'true':
ks_app.config['DEBUG'] = True
@ks_app.route('/v2.0/admin/users/<userid>', methods=['GET'])
def getuser(userid):
for user in users:
if user['username'] == userid:
user_data = {}
if requires_email:
user_data['email'] = userid + '@example.com'
return json.dumps({
'user': user_data
})
abort(404)
@ks_app.route('/v3/identity/users/<userid>', methods=['GET'])
def getv3user(userid):
for user in users:
if user['username'] == userid:
user_data = {
"domain_id": "default",
"enabled": True,
"id": user['username'],
"links": {},
"name": user['username'],
}
if requires_email:
user_data['email'] = user['username'] + '@example.com'
return json.dumps({
'user': user_data
})
abort(404)
@ks_app.route('/v3/identity/users', methods=['GET'])
def v3identity():
returned = []
for user in users:
if not request.args.get('name') or user['username'].startswith(request.args.get('name')):
returned.append({
"domain_id": "default",
"enabled": True,
"id": user['username'],
"links": {},
"name": user['username'],
"email": user['username'] + '@example.com',
})
return json.dumps({"users": returned})
@ks_app.route('/v3/auth/tokens', methods=['POST'])
def v3tokens():
creds = request.json['auth']['identity']['password']['user']
for user in users:
if creds['name'] == user['username'] and creds['password'] == user['password']:
data = json.dumps({
"token": {
"methods": [
"password"
],
"roles": [
{
"id": "9fe2ff9ee4384b1894a90878d3e92bab",
"name": "_member_"
},
{
"id": "c703057be878458588961ce9a0ce686b",
"name": "admin"
}
],
"project": {
"domain": {
"id": "default",
"name": "Default"
},
"id": "8538a3f13f9541b28c2620eb19065e45",
"name": "admin"
},
"catalog": [
{
"endpoints": [
{
"url": server_url + '/v3/identity',
"region": "RegionOne",
"interface": "admin",
"id": "29beb2f1567642eb810b042b6719ea88"
},
],
"type": "identity",
"id": "bd73972c0e14fb69bae8ff76e112a90",
"name": "keystone"
}
],
"extras": {
},
"user": {
"domain": {
"id": "default",
"name": "Default"
},
"id": user['username'],
"name": "admin"
},
"audit_ids": [
"yRt0UrxJSs6-WYJgwEMMmg"
],
"issued_at": "2014-06-16T22:24:26.089380",
"expires_at": "2020-06-16T23:24:26Z",
}
})
response = make_response(data, 200)
response.headers['X-Subject-Token'] = 'sometoken'
return response
abort(403)
@ks_app.route('/v2.0/auth/tokens', methods=['POST'])
def tokens():
creds = request.json['auth'][u'passwordCredentials']
for user in users:
if creds['username'] == user['username'] and creds['password'] == user['password']:
return json.dumps({
"access": {
"token": {
"issued_at": "2014-06-16T22:24:26.089380",
"expires": "2020-06-16T23:24:26Z",
"id": creds['username'],
"tenant": {"id": "sometenant"},
},
"serviceCatalog":[
{
"endpoints": [
{
"adminURL": server_url + '/v2.0/admin',
}
],
"endpoints_links": [],
"type": "identity",
"name": "admin",
},
],
"user": {
"username": creds['username'],
"roles_links": [],
"id": creds['username'],
"roles": [],
"name": user['name'],
},
"metadata": {
"is_admin": 0,
"roles": [],
},
},
})
abort(403)
return ks_app, _PORT_NUMBER
class KeystoneAuthTestsMixin:
maxDiff = None
@property
def emails(self):
raise NotImplementedError
def create_app(self):
global _PORT_NUMBER
_PORT_NUMBER = _PORT_NUMBER + 1
users = [
{'username': 'adminuser', 'name': 'Admin User', 'password': 'adminpass'},
{'username': 'cooluser', 'name': 'Cool User', 'password': 'password'},
{'username': 'some.neat.user', 'name': 'Neat User', 'password': 'foobar'},
]
ks_app = Flask('testks')
ks_app.config['LIVESERVER_PORT'] = _PORT_NUMBER
if os.environ.get('DEBUG') == 'true':
ks_app.config['DEBUG'] = True
@ks_app.route('/v2.0/admin/users/<userid>', methods=['GET'])
def getuser(userid):
for user in users:
if user['username'] == userid:
user_data = {}
if self.emails:
user_data['email'] = userid + '@example.com'
return json.dumps({
'user': user_data
})
abort(404)
@ks_app.route('/v3/identity/users/<userid>', methods=['GET'])
def getv3user(userid):
for user in users:
if user['username'] == userid:
user_data = {
"domain_id": "default",
"enabled": True,
"id": user['username'],
"links": {},
"name": user['username'],
}
if self.emails:
user_data['email'] = user['username'] + '@example.com'
return json.dumps({
'user': user_data
})
abort(404)
@ks_app.route('/v3/identity/users', methods=['GET'])
def v3identity():
returned = []
for user in users:
if not request.args.get('name') or user['username'].startswith(request.args.get('name')):
returned.append({
"domain_id": "default",
"enabled": True,
"id": user['username'],
"links": {},
"name": user['username'],
"email": user['username'] + '@example.com',
})
return json.dumps({"users": returned})
@ks_app.route('/v3/auth/tokens', methods=['POST'])
def v3tokens():
creds = request.json['auth']['identity']['password']['user']
for user in users:
if creds['name'] == user['username'] and creds['password'] == user['password']:
data = json.dumps({
"token": {
"methods": [
"password"
],
"roles": [
{
"id": "9fe2ff9ee4384b1894a90878d3e92bab",
"name": "_member_"
},
{
"id": "c703057be878458588961ce9a0ce686b",
"name": "admin"
}
],
"project": {
"domain": {
"id": "default",
"name": "Default"
},
"id": "8538a3f13f9541b28c2620eb19065e45",
"name": "admin"
},
"catalog": [
{
"endpoints": [
{
"url": self.get_server_url() + '/v3/identity',
"region": "RegionOne",
"interface": "admin",
"id": "29beb2f1567642eb810b042b6719ea88"
},
],
"type": "identity",
"id": "bd73972c0e14fb69bae8ff76e112a90",
"name": "keystone"
}
],
"extras": {
},
"user": {
"domain": {
"id": "default",
"name": "Default"
},
"id": user['username'],
"name": "admin"
},
"audit_ids": [
"yRt0UrxJSs6-WYJgwEMMmg"
],
"issued_at": "2014-06-16T22:24:26.089380",
"expires_at": "2020-06-16T23:24:26Z",
}
})
response = make_response(data, 200)
response.headers['X-Subject-Token'] = 'sometoken'
return response
abort(403)
@ks_app.route('/v2.0/auth/tokens', methods=['POST'])
def tokens():
creds = request.json['auth'][u'passwordCredentials']
for user in users:
if creds['username'] == user['username'] and creds['password'] == user['password']:
return json.dumps({
"access": {
"token": {
"issued_at": "2014-06-16T22:24:26.089380",
"expires": "2020-06-16T23:24:26Z",
"id": creds['username'],
"tenant": {"id": "sometenant"},
},
"serviceCatalog":[
{
"endpoints": [
{
"adminURL": self.get_server_url() + '/v2.0/admin',
}
],
"endpoints_links": [],
"type": "identity",
"name": "admin",
},
],
"user": {
"username": creds['username'],
"roles_links": [],
"id": creds['username'],
"roles": [],
"name": user['name'],
},
"metadata": {
"is_admin": 0,
"roles": [],
},
},
})
abort(403)
return ks_app
def fake_keystone(self):
raise NotImplementedError
def setUp(self):
setup_database_for_testing(self)
@ -204,100 +232,95 @@ class KeystoneAuthTestsMixin():
def tearDown(self):
finished_database_for_testing(self)
@property
def keystone(self):
raise NotImplementedError
def test_invalid_user(self):
(user, _) = self.keystone.verify_credentials('unknownuser', 'password')
self.assertIsNone(user)
with self.fake_keystone() as keystone:
(user, _) = keystone.verify_credentials('unknownuser', 'password')
self.assertIsNone(user)
def test_invalid_password(self):
(user, _) = self.keystone.verify_credentials('cooluser', 'notpassword')
self.assertIsNone(user)
with self.fake_keystone() as keystone:
(user, _) = keystone.verify_credentials('cool.user', 'notpassword')
self.assertIsNone(user)
def test_cooluser(self):
(user, _) = self.keystone.verify_credentials('cooluser', 'password')
self.assertEquals(user.username, 'cooluser')
self.assertEquals(user.email, 'cooluser@example.com' if self.emails else None)
with self.fake_keystone() as keystone:
(user, _) = keystone.verify_credentials('cool.user', 'password')
self.assertEquals(user.username, 'cool.user')
self.assertEquals(user.email, 'cool.user@example.com' if self.emails else None)
def test_neatuser(self):
(user, _) = self.keystone.verify_credentials('some.neat.user', 'foobar')
self.assertEquals(user.username, 'some.neat.user')
self.assertEquals(user.email, 'some.neat.user@example.com' if self.emails else None)
with self.fake_keystone() as keystone:
(user, _) = keystone.verify_credentials('some.neat.user', 'foobar')
self.assertEquals(user.username, 'some.neat.user')
self.assertEquals(user.email, 'some.neat.user@example.com' if self.emails else None)
class KeystoneV2AuthNoEmailTests(KeystoneAuthTestsMixin, LiveServerTestCase):
@property
def keystone(self):
return get_keystone_users(2, self.get_server_url() + '/v2.0/auth',
'adminuser', 'adminpass', 'admintenant',
requires_email=False)
class KeystoneV2AuthNoEmailTests(KeystoneAuthTestsMixin, unittest.TestCase):
def fake_keystone(self):
return fake_keystone(2, requires_email=False)
@property
def emails(self):
return False
class KeystoneV3AuthNoEmailTests(KeystoneAuthTestsMixin, LiveServerTestCase):
@property
def keystone(self):
return get_keystone_users(3, self.get_server_url() + '/v3',
'adminuser', 'adminpass', 'admintenant',
requires_email=False)
class KeystoneV3AuthNoEmailTests(KeystoneAuthTestsMixin, unittest.TestCase):
def fake_keystone(self):
return fake_keystone(3, requires_email=False)
@property
def emails(self):
return False
class KeystoneV2AuthTests(KeystoneAuthTestsMixin, LiveServerTestCase):
@property
def keystone(self):
return get_keystone_users(2, self.get_server_url() + '/v2.0/auth',
'adminuser', 'adminpass', 'admintenant',
requires_email=True)
class KeystoneV2AuthTests(KeystoneAuthTestsMixin, unittest.TestCase):
def fake_keystone(self):
return fake_keystone(2, requires_email=True)
@property
def emails(self):
return True
class KeystoneV3AuthTests(KeystoneAuthTestsMixin, LiveServerTestCase):
@property
def keystone(self):
return get_keystone_users(3, self.get_server_url() + '/v3',
'adminuser', 'adminpass', 'admintenant',
requires_email=True)
class KeystoneV3AuthTests(KeystoneAuthTestsMixin, unittest.TestCase):
def fake_keystone(self):
return fake_keystone(3, requires_email=True)
def emails(self):
return True
def test_query(self):
# Lookup cool.
(response, error_message) = self.keystone.query_users('cool')
self.assertIsNone(error_message)
self.assertEquals(1, len(response))
with self.fake_keystone() as keystone:
# Lookup cool.
(response, federated_id, error_message) = keystone.query_users('cool')
self.assertIsNone(error_message)
self.assertEquals(1, len(response))
self.assertEquals('keystone', federated_id)
user_info = response[0]
self.assertEquals("cooluser", user_info.username)
user_info = response[0]
self.assertEquals("cool.user", user_info.username)
# Lookup unknown.
(response, error_message) = self.keystone.query_users('unknown')
self.assertIsNone(error_message)
self.assertEquals(0, len(response))
# Lookup unknown.
(response, federated_id, error_message) = keystone.query_users('unknown')
self.assertIsNone(error_message)
self.assertEquals(0, len(response))
self.assertEquals('keystone', federated_id)
def test_link_user(self):
# Link someuser.
user, error_message = self.keystone.link_user('cooluser')
self.assertIsNone(error_message)
self.assertIsNotNone(user)
self.assertEquals('cooluser', user.username)
self.assertEquals('cooluser@example.com', user.email)
with self.fake_keystone() as keystone:
# Link someuser.
user, error_message = keystone.link_user('cool.user')
self.assertIsNone(error_message)
self.assertIsNotNone(user)
self.assertEquals('cool_user', user.username)
self.assertEquals('cool.user@example.com', user.email)
# Link again. Should return the same user record.
user_again, _ = self.keystone.link_user('cooluser')
self.assertEquals(user_again.id, user.id)
# Link again. Should return the same user record.
user_again, _ = keystone.link_user('cool.user')
self.assertEquals(user_again.id, user.id)
# Confirm someuser.
result, _ = keystone.confirm_existing_user('cool_user', 'password')
self.assertIsNotNone(result)
self.assertEquals('cool_user', result.username)
# Confirm someuser.
result, _ = self.keystone.confirm_existing_user('cooluser', 'password')
self.assertIsNotNone(result)
self.assertEquals('cooluser', result.username)
if __name__ == '__main__':
unittest.main()

View file

@ -6,6 +6,112 @@ from data.users import LDAPUsers
from data import model
from mockldap import MockLdap
from mock import patch
from contextlib import contextmanager
def _create_ldap(requires_email=True):
base_dn = ['dc=quay', 'dc=io']
admin_dn = 'uid=testy,ou=employees,dc=quay,dc=io'
admin_passwd = 'password'
user_rdn = ['ou=employees']
uid_attr = 'uid'
email_attr = 'mail'
secondary_user_rdns = ['ou=otheremployees']
ldap = LDAPUsers('ldap://localhost', base_dn, admin_dn, admin_passwd, user_rdn,
uid_attr, email_attr, secondary_user_rdns=secondary_user_rdns,
requires_email=requires_email)
return ldap
@contextmanager
def mock_ldap(requires_email=True):
mockldap = MockLdap({
'dc=quay,dc=io': {'dc': ['quay', 'io']},
'ou=employees,dc=quay,dc=io': {
'dc': ['quay', 'io'],
'ou': 'employees'
},
'ou=otheremployees,dc=quay,dc=io': {
'dc': ['quay', 'io'],
'ou': 'otheremployees'
},
'uid=testy,ou=employees,dc=quay,dc=io': {
'dc': ['quay', 'io'],
'ou': 'employees',
'uid': 'testy',
'userPassword': ['password']
},
'uid=someuser,ou=employees,dc=quay,dc=io': {
'dc': ['quay', 'io'],
'ou': 'employees',
'uid': ['someuser'],
'userPassword': ['somepass'],
'mail': ['foo@bar.com']
},
'uid=nomail,ou=employees,dc=quay,dc=io': {
'dc': ['quay', 'io'],
'ou': 'employees',
'uid': ['nomail'],
'userPassword': ['somepass']
},
'uid=cool.user,ou=employees,dc=quay,dc=io': {
'dc': ['quay', 'io'],
'ou': 'employees',
'uid': ['cool.user', 'referred'],
'userPassword': ['somepass'],
'mail': ['foo@bar.com']
},
'uid=referred,ou=employees,dc=quay,dc=io': {
'uid': ['referred'],
'_referral': 'ldap:///uid=cool.user,ou=employees,dc=quay,dc=io'
},
'uid=invalidreferred,ou=employees,dc=quay,dc=io': {
'uid': ['invalidreferred'],
'_referral': 'ldap:///uid=someinvaliduser,ou=employees,dc=quay,dc=io'
},
'uid=multientry,ou=subgroup1,ou=employees,dc=quay,dc=io': {
'uid': ['multientry'],
'mail': ['foo@bar.com'],
'userPassword': ['somepass'],
},
'uid=multientry,ou=subgroup2,ou=employees,dc=quay,dc=io': {
'uid': ['multientry'],
'another': ['key']
},
'uid=secondaryuser,ou=otheremployees,dc=quay,dc=io': {
'dc': ['quay', 'io'],
'ou': 'otheremployees',
'uid': ['secondaryuser'],
'userPassword': ['somepass'],
'mail': ['foosecondary@bar.com']
},
})
def initializer(uri, trace_level=0):
obj = mockldap[uri]
# Seed to "support" wildcard queries, which MockLDAP does not support natively.
obj.search_s.seed('ou=employees,dc=quay,dc=io', 2, '(|(uid=cool*)(mail=cool*))')([
('uid=cool.user,ou=employees,dc=quay,dc=io', {
'dc': ['quay', 'io'],
'ou': 'employees',
'uid': ['cool.user', 'referred'],
'userPassword': ['somepass'],
'mail': ['foo@bar.com']
})
])
obj.search_s.seed('ou=otheremployees,dc=quay,dc=io', 2, '(|(uid=cool*)(mail=cool*))')([])
obj.search_s.seed('ou=employees,dc=quay,dc=io', 2, '(|(uid=unknown*)(mail=unknown*))')([])
obj.search_s.seed('ou=otheremployees,dc=quay,dc=io', 2,
'(|(uid=unknown*)(mail=unknown*))')([])
return obj
mockldap.start()
with patch('ldap.initialize', new=initializer):
yield _create_ldap(requires_email=requires_email)
mockldap.stop()
class TestLDAP(unittest.TestCase):
def setUp(self):
@ -14,90 +120,10 @@ class TestLDAP(unittest.TestCase):
self.ctx = app.test_request_context()
self.ctx.__enter__()
self.mockldap = MockLdap({
'dc=quay,dc=io': {'dc': ['quay', 'io']},
'ou=employees,dc=quay,dc=io': {
'dc': ['quay', 'io'],
'ou': 'employees'
},
'ou=otheremployees,dc=quay,dc=io': {
'dc': ['quay', 'io'],
'ou': 'otheremployees'
},
'uid=testy,ou=employees,dc=quay,dc=io': {
'dc': ['quay', 'io'],
'ou': 'employees',
'uid': 'testy',
'userPassword': ['password']
},
'uid=someuser,ou=employees,dc=quay,dc=io': {
'dc': ['quay', 'io'],
'ou': 'employees',
'uid': ['someuser'],
'userPassword': ['somepass'],
'mail': ['foo@bar.com']
},
'uid=nomail,ou=employees,dc=quay,dc=io': {
'dc': ['quay', 'io'],
'ou': 'employees',
'uid': ['nomail'],
'userPassword': ['somepass']
},
'uid=cool.user,ou=employees,dc=quay,dc=io': {
'dc': ['quay', 'io'],
'ou': 'employees',
'uid': ['cool.user', 'referred'],
'userPassword': ['somepass'],
'mail': ['foo@bar.com']
},
'uid=referred,ou=employees,dc=quay,dc=io': {
'uid': ['referred'],
'_referral': 'ldap:///uid=cool.user,ou=employees,dc=quay,dc=io'
},
'uid=invalidreferred,ou=employees,dc=quay,dc=io': {
'uid': ['invalidreferred'],
'_referral': 'ldap:///uid=someinvaliduser,ou=employees,dc=quay,dc=io'
},
'uid=multientry,ou=subgroup1,ou=employees,dc=quay,dc=io': {
'uid': ['multientry'],
'mail': ['foo@bar.com'],
'userPassword': ['somepass'],
},
'uid=multientry,ou=subgroup2,ou=employees,dc=quay,dc=io': {
'uid': ['multientry'],
'another': ['key']
},
'uid=secondaryuser,ou=otheremployees,dc=quay,dc=io': {
'dc': ['quay', 'io'],
'ou': 'otheremployees',
'uid': ['secondaryuser'],
'userPassword': ['somepass'],
'mail': ['foosecondary@bar.com']
},
})
self.mockldap.start()
self.ldap = self._create_ldap(requires_email=True)
def tearDown(self):
self.mockldap.stop()
finished_database_for_testing(self)
self.ctx.__exit__(True, None, None)
def _create_ldap(self, requires_email=True):
base_dn = ['dc=quay', 'dc=io']
admin_dn = 'uid=testy,ou=employees,dc=quay,dc=io'
admin_passwd = 'password'
user_rdn = ['ou=employees']
uid_attr = 'uid'
email_attr = 'mail'
secondary_user_rdns = ['ou=otheremployees']
ldap = LDAPUsers('ldap://localhost', base_dn, admin_dn, admin_passwd, user_rdn,
uid_attr, email_attr, secondary_user_rdns=secondary_user_rdns,
requires_email=requires_email)
return ldap
def test_invalid_admin_password(self):
base_dn = ['dc=quay', 'dc=io']
admin_dn = 'uid=testy,ou=employees,dc=quay,dc=io'
@ -106,160 +132,148 @@ class TestLDAP(unittest.TestCase):
uid_attr = 'uid'
email_attr = 'mail'
ldap = LDAPUsers('ldap://localhost', base_dn, admin_dn, admin_passwd, user_rdn,
uid_attr, email_attr)
with mock_ldap():
ldap = LDAPUsers('ldap://localhost', base_dn, admin_dn, admin_passwd, user_rdn,
uid_attr, email_attr)
self.ldap = ldap
# Try to login.
(response, err_msg) = self.ldap.verify_and_link_user('someuser', 'somepass')
self.assertIsNone(response)
self.assertEquals('LDAP Admin dn or password is invalid', err_msg)
# Try to login.
(response, err_msg) = ldap.verify_and_link_user('someuser', 'somepass')
self.assertIsNone(response)
self.assertEquals('LDAP Admin dn or password is invalid', err_msg)
def test_login(self):
# Verify we can login.
(response, _) = self.ldap.verify_and_link_user('someuser', 'somepass')
self.assertEquals(response.username, 'someuser')
self.assertTrue(model.user.has_user_prompt(response, 'confirm_username'))
with mock_ldap() as ldap:
# Verify we can login.
(response, _) = ldap.verify_and_link_user('someuser', 'somepass')
self.assertEquals(response.username, 'someuser')
self.assertTrue(model.user.has_user_prompt(response, 'confirm_username'))
# Verify we can confirm the user.
(response, _) = self.ldap.confirm_existing_user('someuser', 'somepass')
self.assertEquals(response.username, 'someuser')
# Verify we can confirm the user.
(response, _) = ldap.confirm_existing_user('someuser', 'somepass')
self.assertEquals(response.username, 'someuser')
def test_login_secondary(self):
# Verify we can login.
(response, _) = self.ldap.verify_and_link_user('secondaryuser', 'somepass')
self.assertEquals(response.username, 'secondaryuser')
with mock_ldap() as ldap:
# Verify we can login.
(response, _) = ldap.verify_and_link_user('secondaryuser', 'somepass')
self.assertEquals(response.username, 'secondaryuser')
# Verify we can confirm the user.
(response, _) = self.ldap.confirm_existing_user('secondaryuser', 'somepass')
self.assertEquals(response.username, 'secondaryuser')
# Verify we can confirm the user.
(response, _) = ldap.confirm_existing_user('secondaryuser', 'somepass')
self.assertEquals(response.username, 'secondaryuser')
def test_invalid_password(self):
# Verify we cannot login with an invalid password.
(response, err_msg) = self.ldap.verify_and_link_user('someuser', 'invalidpass')
self.assertIsNone(response)
self.assertEquals(err_msg, 'Invalid password')
with mock_ldap() as ldap:
# Verify we cannot login with an invalid password.
(response, err_msg) = ldap.verify_and_link_user('someuser', 'invalidpass')
self.assertIsNone(response)
self.assertEquals(err_msg, 'Invalid password')
# Verify we cannot confirm the user.
(response, err_msg) = self.ldap.confirm_existing_user('someuser', 'invalidpass')
self.assertIsNone(response)
self.assertEquals(err_msg, 'Invalid user')
# Verify we cannot confirm the user.
(response, err_msg) = ldap.confirm_existing_user('someuser', 'invalidpass')
self.assertIsNone(response)
self.assertEquals(err_msg, 'Invalid user')
def test_missing_mail(self):
(response, err_msg) = self.ldap.get_user('nomail')
self.assertIsNone(response)
self.assertEquals('Missing mail field "mail" in user record', err_msg)
with mock_ldap() as ldap:
(response, err_msg) = ldap.get_user('nomail')
self.assertIsNone(response)
self.assertEquals('Missing mail field "mail" in user record', err_msg)
def test_missing_mail_allowed(self):
ldap = self._create_ldap(requires_email=False)
(response, _) = ldap.get_user('nomail')
self.assertEquals(response.username, 'nomail')
with mock_ldap(requires_email=False) as ldap:
(response, _) = ldap.get_user('nomail')
self.assertEquals(response.username, 'nomail')
def test_confirm_different_username(self):
# Verify that the user is logged in and their username was adjusted.
(response, _) = self.ldap.verify_and_link_user('cool.user', 'somepass')
self.assertEquals(response.username, 'cool_user')
with mock_ldap() as ldap:
# Verify that the user is logged in and their username was adjusted.
(response, _) = ldap.verify_and_link_user('cool.user', 'somepass')
self.assertEquals(response.username, 'cool_user')
# Verify we can confirm the user's quay username.
(response, _) = self.ldap.confirm_existing_user('cool_user', 'somepass')
self.assertEquals(response.username, 'cool_user')
# Verify we can confirm the user's quay username.
(response, _) = ldap.confirm_existing_user('cool_user', 'somepass')
self.assertEquals(response.username, 'cool_user')
# Verify that we *cannot* confirm the LDAP username.
(response, _) = self.ldap.confirm_existing_user('cool.user', 'somepass')
self.assertIsNone(response)
# Verify that we *cannot* confirm the LDAP username.
(response, _) = ldap.confirm_existing_user('cool.user', 'somepass')
self.assertIsNone(response)
def test_referral(self):
(response, _) = self.ldap.verify_and_link_user('referred', 'somepass')
self.assertEquals(response.username, 'cool_user')
with mock_ldap() as ldap:
(response, _) = ldap.verify_and_link_user('referred', 'somepass')
self.assertEquals(response.username, 'cool_user')
# Verify we can confirm the user's quay username.
(response, _) = self.ldap.confirm_existing_user('cool_user', 'somepass')
self.assertEquals(response.username, 'cool_user')
# Verify we can confirm the user's quay username.
(response, _) = ldap.confirm_existing_user('cool_user', 'somepass')
self.assertEquals(response.username, 'cool_user')
def test_invalid_referral(self):
(response, _) = self.ldap.verify_and_link_user('invalidreferred', 'somepass')
self.assertIsNone(response)
with mock_ldap() as ldap:
(response, _) = ldap.verify_and_link_user('invalidreferred', 'somepass')
self.assertIsNone(response)
def test_multientry(self):
(response, _) = self.ldap.verify_and_link_user('multientry', 'somepass')
self.assertEquals(response.username, 'multientry')
with mock_ldap() as ldap:
(response, _) = ldap.verify_and_link_user('multientry', 'somepass')
self.assertEquals(response.username, 'multientry')
def test_login_empty_userdn(self):
base_dn = ['ou=employees', 'dc=quay', 'dc=io']
admin_dn = 'uid=testy,ou=employees,dc=quay,dc=io'
admin_passwd = 'password'
user_rdn = []
uid_attr = 'uid'
email_attr = 'mail'
secondary_user_rdns = ['ou=otheremployees']
with mock_ldap():
base_dn = ['ou=employees', 'dc=quay', 'dc=io']
admin_dn = 'uid=testy,ou=employees,dc=quay,dc=io'
admin_passwd = 'password'
user_rdn = []
uid_attr = 'uid'
email_attr = 'mail'
secondary_user_rdns = ['ou=otheremployees']
ldap = LDAPUsers('ldap://localhost', base_dn, admin_dn, admin_passwd, user_rdn,
uid_attr, email_attr, secondary_user_rdns=secondary_user_rdns)
ldap = LDAPUsers('ldap://localhost', base_dn, admin_dn, admin_passwd, user_rdn,
uid_attr, email_attr, secondary_user_rdns=secondary_user_rdns)
self.ldap = ldap
# Verify we can login.
(response, _) = ldap.verify_and_link_user('someuser', 'somepass')
self.assertEquals(response.username, 'someuser')
# Verify we can login.
(response, _) = self.ldap.verify_and_link_user('someuser', 'somepass')
self.assertEquals(response.username, 'someuser')
# Verify we can confirm the user.
(response, _) = self.ldap.confirm_existing_user('someuser', 'somepass')
self.assertEquals(response.username, 'someuser')
# Verify we can confirm the user.
(response, _) = ldap.confirm_existing_user('someuser', 'somepass')
self.assertEquals(response.username, 'someuser')
def test_link_user(self):
# Link someuser.
user, error_message = self.ldap.link_user('someuser')
self.assertIsNone(error_message)
self.assertIsNotNone(user)
self.assertEquals('someuser', user.username)
with mock_ldap() as ldap:
# Link someuser.
user, error_message = ldap.link_user('someuser')
self.assertIsNone(error_message)
self.assertIsNotNone(user)
self.assertEquals('someuser', user.username)
# Link again. Should return the same user record.
user_again, _ = self.ldap.link_user('someuser')
self.assertEquals(user_again.id, user.id)
# Link again. Should return the same user record.
user_again, _ = ldap.link_user('someuser')
self.assertEquals(user_again.id, user.id)
# Confirm someuser.
result, _ = self.ldap.confirm_existing_user('someuser', 'somepass')
self.assertIsNotNone(result)
self.assertEquals('someuser', result.username)
self.assertTrue(model.user.has_user_prompt(user, 'confirm_username'))
# Confirm someuser.
result, _ = ldap.confirm_existing_user('someuser', 'somepass')
self.assertIsNotNone(result)
self.assertEquals('someuser', result.username)
self.assertTrue(model.user.has_user_prompt(user, 'confirm_username'))
def test_query(self):
def initializer(uri, trace_level=0):
obj = self.mockldap[uri]
# Seed to "support" wildcard queries, which MockLDAP does not support natively.
obj.search_s.seed('ou=employees,dc=quay,dc=io', 2, '(|(uid=cool*)(mail=cool*))')([
('uid=cool.user,ou=employees,dc=quay,dc=io', {
'dc': ['quay', 'io'],
'ou': 'employees',
'uid': ['cool.user', 'referred'],
'userPassword': ['somepass'],
'mail': ['foo@bar.com']
})
])
obj.search_s.seed('ou=otheremployees,dc=quay,dc=io', 2, '(|(uid=cool*)(mail=cool*))')([])
obj.search_s.seed('ou=employees,dc=quay,dc=io', 2, '(|(uid=unknown*)(mail=unknown*))')([])
obj.search_s.seed('ou=otheremployees,dc=quay,dc=io', 2,
'(|(uid=unknown*)(mail=unknown*))')([])
return obj
with patch('ldap.initialize', new=initializer):
with mock_ldap() as ldap:
# Lookup cool.
(response, error_message) = self.ldap.query_users('cool')
(response, federated_id, error_message) = ldap.query_users('cool')
self.assertIsNone(error_message)
self.assertEquals(1, len(response))
self.assertEquals('ldap', federated_id)
user_info = response[0]
self.assertEquals("cool.user", user_info.username)
self.assertEquals("foo@bar.com", user_info.email)
# Lookup unknown.
(response, error_message) = self.ldap.query_users('unknown')
(response, federated_id, error_message) = ldap.query_users('unknown')
self.assertIsNone(error_message)
self.assertEquals(0, len(response))
self.assertEquals('ldap', federated_id)
if __name__ == '__main__':