import json import os import unittest import requests from flask import Flask, request, abort, make_response from flask_testing import LiveServerTestCase from data.users.keystone import get_keystone_users from initdb import setup_database_for_testing, finished_database_for_testing _PORT_NUMBER = 5001 class KeystoneAuthTestsMixin(): maxDiff = None @property def emails(self): raise NotImplementedError def create_app(self): global _PORT_NUMBER _PORT_NUMBER = _PORT_NUMBER + 1 users = [ {'username': 'adminuser', 'name': 'Admin User', 'password': 'adminpass'}, {'username': 'cooluser', 'name': 'Cool User', 'password': 'password'}, {'username': 'some.neat.user', 'name': 'Neat User', 'password': 'foobar'}, ] ks_app = Flask('testks') ks_app.config['LIVESERVER_PORT'] = _PORT_NUMBER if os.environ.get('DEBUG') == 'true': ks_app.config['DEBUG'] = True @ks_app.route('/v2.0/admin/users/', methods=['GET']) def getuser(userid): for user in users: if user['username'] == userid: user_data = {} if self.emails: user_data['email'] = userid + '@example.com' return json.dumps({ 'user': user_data }) abort(404) @ks_app.route('/v3/identity/users/', methods=['GET']) def getv3user(userid): for user in users: if user['username'] == userid: user_data = { "domain_id": "default", "enabled": True, "id": user['username'], "links": {}, "name": user['username'], } if self.emails: user_data['email'] = user['username'] + '@example.com' return json.dumps({ 'user': user_data }) abort(404) @ks_app.route('/v3/identity/users', methods=['GET']) def v3identity(): returned = [] for user in users: if not request.args.get('name') or user['username'].startswith(request.args.get('name')): returned.append({ "domain_id": "default", "enabled": True, "id": user['username'], "links": {}, "name": user['username'], "email": user['username'] + '@example.com', }) return json.dumps({"users": returned}) @ks_app.route('/v3/auth/tokens', methods=['POST']) def v3tokens(): creds = request.json['auth']['identity']['password']['user'] for user in users: if creds['name'] == user['username'] and creds['password'] == user['password']: data = json.dumps({ "token": { "methods": [ "password" ], "roles": [ { "id": "9fe2ff9ee4384b1894a90878d3e92bab", "name": "_member_" }, { "id": "c703057be878458588961ce9a0ce686b", "name": "admin" } ], "project": { "domain": { "id": "default", "name": "Default" }, "id": "8538a3f13f9541b28c2620eb19065e45", "name": "admin" }, "catalog": [ { "endpoints": [ { "url": self.get_server_url() + '/v3/identity', "region": "RegionOne", "interface": "admin", "id": "29beb2f1567642eb810b042b6719ea88" }, ], "type": "identity", "id": "bd73972c0e14fb69bae8ff76e112a90", "name": "keystone" } ], "extras": { }, "user": { "domain": { "id": "default", "name": "Default" }, "id": user['username'], "name": "admin" }, "audit_ids": [ "yRt0UrxJSs6-WYJgwEMMmg" ], "issued_at": "2014-06-16T22:24:26.089380", "expires_at": "2020-06-16T23:24:26Z", } }) response = make_response(data, 200) response.headers['X-Subject-Token'] = 'sometoken' return response abort(403) @ks_app.route('/v2.0/auth/tokens', methods=['POST']) def tokens(): creds = request.json['auth'][u'passwordCredentials'] for user in users: if creds['username'] == user['username'] and creds['password'] == user['password']: return json.dumps({ "access": { "token": { "issued_at": "2014-06-16T22:24:26.089380", "expires": "2020-06-16T23:24:26Z", "id": creds['username'], "tenant": {"id": "sometenant"}, }, "serviceCatalog":[ { "endpoints": [ { "adminURL": self.get_server_url() + '/v2.0/admin', } ], "endpoints_links": [], "type": "identity", "name": "admin", }, ], "user": { "username": creds['username'], "roles_links": [], "id": creds['username'], "roles": [], "name": user['name'], }, "metadata": { "is_admin": 0, "roles": [], }, }, }) abort(403) return ks_app def setUp(self): setup_database_for_testing(self) self.session = requests.Session() def tearDown(self): finished_database_for_testing(self) @property def keystone(self): raise NotImplementedError def test_invalid_user(self): (user, _) = self.keystone.verify_credentials('unknownuser', 'password') self.assertIsNone(user) def test_invalid_password(self): (user, _) = self.keystone.verify_credentials('cooluser', 'notpassword') self.assertIsNone(user) def test_cooluser(self): (user, _) = self.keystone.verify_credentials('cooluser', 'password') self.assertEquals(user.username, 'cooluser') self.assertEquals(user.email, 'cooluser@example.com' if self.emails else None) def test_neatuser(self): (user, _) = self.keystone.verify_credentials('some.neat.user', 'foobar') self.assertEquals(user.username, 'some.neat.user') self.assertEquals(user.email, 'some.neat.user@example.com' if self.emails else None) class KeystoneV2AuthNoEmailTests(KeystoneAuthTestsMixin, LiveServerTestCase): @property def keystone(self): return get_keystone_users(2, self.get_server_url() + '/v2.0/auth', 'adminuser', 'adminpass', 'admintenant', requires_email=False) @property def emails(self): return False class KeystoneV3AuthNoEmailTests(KeystoneAuthTestsMixin, LiveServerTestCase): @property def keystone(self): return get_keystone_users(3, self.get_server_url() + '/v3', 'adminuser', 'adminpass', 'admintenant', requires_email=False) @property def emails(self): return False class KeystoneV2AuthTests(KeystoneAuthTestsMixin, LiveServerTestCase): @property def keystone(self): return get_keystone_users(2, self.get_server_url() + '/v2.0/auth', 'adminuser', 'adminpass', 'admintenant', requires_email=True) @property def emails(self): return True class KeystoneV3AuthTests(KeystoneAuthTestsMixin, LiveServerTestCase): @property def keystone(self): return get_keystone_users(3, self.get_server_url() + '/v3', 'adminuser', 'adminpass', 'admintenant', requires_email=True) def emails(self): return True def test_query(self): # Lookup cool. (response, error_message) = self.keystone.query_users('cool') self.assertIsNone(error_message) self.assertEquals(1, len(response)) user_info = response[0] self.assertEquals("cooluser", user_info.username) # Lookup unknown. (response, error_message) = self.keystone.query_users('unknown') self.assertIsNone(error_message) self.assertEquals(0, len(response)) def test_link_user(self): # Link someuser. user, error_message = self.keystone.link_user('cooluser') self.assertIsNone(error_message) self.assertIsNotNone(user) self.assertEquals('cooluser', user.username) self.assertEquals('cooluser@example.com', user.email) # Link again. Should return the same user record. user_again, _ = self.keystone.link_user('cooluser') self.assertEquals(user_again.id, user.id) # Confirm someuser. result, _ = self.keystone.confirm_existing_user('cooluser', 'password') self.assertIsNotNone(result) self.assertEquals('cooluser', result.username) if __name__ == '__main__': unittest.main()