This repository has been archived on 2020-03-24. You can view files and clone it, but cannot push or open issues or pull requests.
quay/test/test_keystone_auth.py
2017-04-03 11:31:29 -04:00

326 lines
9.5 KiB
Python

import json
import os
import unittest
import requests
from flask import Flask, request, abort, make_response
from contextlib import contextmanager
from test.helpers import liveserver_app
from data.users.keystone import get_keystone_users
from initdb import setup_database_for_testing, finished_database_for_testing
# Port counter for the fake Keystone servers. `_create_app` increments this
# before using it, so every application it builds gets a fresh port number.
_PORT_NUMBER = 5001
@contextmanager
def fake_keystone(version=3, requires_email=True):
    """ Context manager which builds a fake Keystone web application, hands it to
        `liveserver_app` for the duration of the `with` block, and yields a Keystone
        auth handler (built by `get_keystone_users`) pointed at that application.

        Args:
          version: Keystone API version to emulate. 2 selects the `/v2.0/auth`
            endpoint; any other value selects `/v3`.
          requires_email: forwarded both to the fake application and to the auth
            handler.

        Usage:
          with fake_keystone(version) as keystone_auth:
            # Make keystone_auth requests.
    """
    keystone_app, port = _create_app(requires_email)

    # The fake application records its own host:port in its config.
    base_url = 'http://' + keystone_app.config['SERVER_HOSTNAME']
    endpoint_path = '/v2.0/auth' if version == 2 else '/v3'

    keystone_auth = get_keystone_users(
        version,
        base_url + endpoint_path,
        'adminuser',
        'adminpass',
        'admintenant',
        requires_email=requires_email,
    )

    with liveserver_app(keystone_app, port):
        yield keystone_auth
def _create_app(requires_email=True):
    """ Builds a Flask application imitating the subset of the Keystone v2.0 and v3
        HTTP APIs that these tests exercise: single-user lookup (v2.0 and v3), user
        listing (v3) and token issuance (v2.0 and v3).

        Every call bumps the module-level `_PORT_NUMBER`, so each application is
        associated with its own port.

        Args:
          requires_email: when true, the single-user lookup endpoints include an
            `email` field (`<username>@example.com`) in their responses. The v3
            user listing always includes the email.

        Returns:
          A `(flask_app, port)` tuple.
    """
    global _PORT_NUMBER
    _PORT_NUMBER += 1
    port = _PORT_NUMBER

    hostname = 'localhost:%s' % port
    server_url = 'http://' + hostname

    users = [
        {'username': 'adminuser', 'name': 'Admin User', 'password': 'adminpass'},
        {'username': 'cool.user', 'name': 'Cool User', 'password': 'password'},
        {'username': 'some.neat.user', 'name': 'Neat User', 'password': 'foobar'},
    ]

    def find_user(username):
        """ Returns the fake user record with the given username, or None. """
        return next((entry for entry in users if entry['username'] == username), None)

    ks_app = Flask('testks')
    ks_app.config['SERVER_HOSTNAME'] = hostname
    if os.environ.get('DEBUG') == 'true':
        ks_app.config['DEBUG'] = True

    @ks_app.route('/v2.0/admin/users/<userid>', methods=['GET'])
    def getuser(userid):
        # v2.0 user lookup: 404 for unknown users, otherwise an (optionally empty) record.
        if find_user(userid) is None:
            abort(404)

        user_data = {'email': userid + '@example.com'} if requires_email else {}
        return json.dumps({'user': user_data})

    @ks_app.route('/v3/identity/users/<userid>', methods=['GET'])
    def getv3user(userid):
        # v3 user lookup: 404 for unknown users.
        found = find_user(userid)
        if found is None:
            abort(404)

        username = found['username']
        user_data = {
            "domain_id": "default",
            "enabled": True,
            "id": username,
            "links": {},
            "name": username,
        }
        if requires_email:
            user_data['email'] = username + '@example.com'

        return json.dumps({'user': user_data})

    @ks_app.route('/v3/identity/users', methods=['GET'])
    def v3identity():
        # v3 user listing, optionally filtered by a username prefix given as `name`.
        prefix = request.args.get('name')
        returned = [
            {
                "domain_id": "default",
                "enabled": True,
                "id": entry['username'],
                "links": {},
                "name": entry['username'],
                "email": entry['username'] + '@example.com',
            }
            for entry in users
            if not prefix or entry['username'].startswith(prefix)
        ]
        return json.dumps({"users": returned})

    @ks_app.route('/v3/auth/tokens', methods=['POST'])
    def v3tokens():
        # v3 token issuance: 403 unless the name/password pair matches a known user.
        creds = request.json['auth']['identity']['password']['user']
        matched = next(
            (entry for entry in users
             if creds['name'] == entry['username'] and creds['password'] == entry['password']),
            None,
        )
        if matched is None:
            abort(403)

        roles = [
            {
                "id": "9fe2ff9ee4384b1894a90878d3e92bab",
                "name": "_member_"
            },
            {
                "id": "c703057be878458588961ce9a0ce686b",
                "name": "admin"
            }
        ]
        project = {
            "domain": {
                "id": "default",
                "name": "Default"
            },
            "id": "8538a3f13f9541b28c2620eb19065e45",
            "name": "admin"
        }
        # The catalog points the client back at this server's v3 identity API.
        catalog = [
            {
                "endpoints": [
                    {
                        "url": server_url + '/v3/identity',
                        "region": "RegionOne",
                        "interface": "admin",
                        "id": "29beb2f1567642eb810b042b6719ea88"
                    },
                ],
                "type": "identity",
                "id": "bd73972c0e14fb69bae8ff76e112a90",
                "name": "keystone"
            }
        ]
        token_user = {
            "domain": {
                "id": "default",
                "name": "Default"
            },
            "id": matched['username'],
            "name": "admin"
        }

        data = json.dumps({
            "token": {
                "methods": [
                    "password"
                ],
                "roles": roles,
                "project": project,
                "catalog": catalog,
                "extras": {
                },
                "user": token_user,
                "audit_ids": [
                    "yRt0UrxJSs6-WYJgwEMMmg"
                ],
                "issued_at": "2014-06-16T22:24:26.089380",
                "expires_at": "2020-06-16T23:24:26Z",
            }
        })

        response = make_response(data, 200)
        response.headers['X-Subject-Token'] = 'sometoken'
        return response

    @ks_app.route('/v2.0/auth/tokens', methods=['POST'])
    def tokens():
        # v2.0 token issuance: 403 unless the username/password pair matches a known user.
        creds = request.json['auth'][u'passwordCredentials']
        matched = next(
            (entry for entry in users
             if creds['username'] == entry['username'] and creds['password'] == entry['password']),
            None,
        )
        if matched is None:
            abort(403)

        token = {
            "issued_at": "2014-06-16T22:24:26.089380",
            "expires": "2020-06-16T23:24:26Z",
            "id": creds['username'],
            "tenant": {"id": "sometenant"},
        }
        # The catalog points the client back at this server's v2.0 admin API.
        service_catalog = [
            {
                "endpoints": [
                    {
                        "adminURL": server_url + '/v2.0/admin',
                    }
                ],
                "endpoints_links": [],
                "type": "identity",
                "name": "admin",
            },
        ]
        access_user = {
            "username": creds['username'],
            "roles_links": [],
            "id": creds['username'],
            "roles": [],
            "name": matched['name'],
        }

        return json.dumps({
            "access": {
                "token": token,
                "serviceCatalog": service_catalog,
                "user": access_user,
                "metadata": {
                    "is_admin": 0,
                    "roles": [],
                },
            },
        })

    return ks_app, port
class KeystoneAuthTestsMixin:
    """ Credential-verification tests shared by all Keystone test cases.

        Concrete test cases mix this in alongside `unittest.TestCase` and provide:
          - `fake_keystone()`: returns a context manager yielding the auth handler
            under test.
          - `emails`: property telling whether verified users are expected to carry
            an `<username>@example.com` email (otherwise the email must be None).
    """
    maxDiff = None

    @property
    def emails(self):
        raise NotImplementedError

    def fake_keystone(self):
        raise NotImplementedError

    def setUp(self):
        setup_database_for_testing(self)
        self.session = requests.Session()

    def tearDown(self):
        # Release the HTTP session opened in setUp so connections do not leak
        # from one test to the next.
        self.session.close()
        finished_database_for_testing(self)

    def test_invalid_user(self):
        with self.fake_keystone() as keystone:
            (user, _) = keystone.verify_credentials('unknownuser', 'password')
            self.assertIsNone(user)

    def test_invalid_password(self):
        with self.fake_keystone() as keystone:
            (user, _) = keystone.verify_credentials('cool.user', 'notpassword')
            self.assertIsNone(user)

    def test_cooluser(self):
        with self.fake_keystone() as keystone:
            (user, _) = keystone.verify_credentials('cool.user', 'password')
            # assertEqual rather than the deprecated assertEquals alias, which no
            # longer exists on Python 3.12+.
            self.assertEqual(user.username, 'cool.user')
            self.assertEqual(user.email, 'cool.user@example.com' if self.emails else None)

    def test_neatuser(self):
        with self.fake_keystone() as keystone:
            (user, _) = keystone.verify_credentials('some.neat.user', 'foobar')
            self.assertEqual(user.username, 'some.neat.user')
            self.assertEqual(user.email, 'some.neat.user@example.com' if self.emails else None)
class KeystoneV2AuthNoEmailTests(KeystoneAuthTestsMixin, unittest.TestCase):
    """ Runs the shared auth tests against a fake Keystone v2 server with emails disabled. """

    @property
    def emails(self):
        return False

    def fake_keystone(self):
        return fake_keystone(version=2, requires_email=False)
class KeystoneV3AuthNoEmailTests(KeystoneAuthTestsMixin, unittest.TestCase):
    """ Runs the shared auth tests against a fake Keystone v3 server with emails disabled. """

    @property
    def emails(self):
        return False

    def fake_keystone(self):
        return fake_keystone(version=3, requires_email=False)
class KeystoneV2AuthTests(KeystoneAuthTestsMixin, unittest.TestCase):
    """ Runs the shared auth tests against a fake Keystone v2 server with emails enabled. """

    @property
    def emails(self):
        return True

    def fake_keystone(self):
        return fake_keystone(version=2, requires_email=True)
class KeystoneV3AuthTests(KeystoneAuthTestsMixin, unittest.TestCase):
    """ Runs the shared auth tests against a fake Keystone v3 server with emails
        enabled, plus v3-only tests for user querying and user linking.
    """

    def fake_keystone(self):
        return fake_keystone(3, requires_email=True)

    @property
    def emails(self):
        # Declared as a property, like the mixin and the sibling test cases.
        # Without the decorator `self.emails` is a bound method, which is truthy
        # only by accident.
        return True

    def test_query(self):
        with self.fake_keystone() as keystone:
            # Lookup cool.
            (response, federated_id, error_message) = keystone.query_users('cool')
            self.assertIsNone(error_message)
            self.assertEqual(1, len(response))
            self.assertEqual('keystone', federated_id)

            user_info = response[0]
            self.assertEqual("cool.user", user_info.username)

            # Lookup unknown.
            (response, federated_id, error_message) = keystone.query_users('unknown')
            self.assertIsNone(error_message)
            self.assertEqual(0, len(response))
            self.assertEqual('keystone', federated_id)

    def test_link_user(self):
        with self.fake_keystone() as keystone:
            # Link cool.user.
            user, error_message = keystone.link_user('cool.user')
            self.assertIsNone(error_message)
            self.assertIsNotNone(user)
            self.assertEqual('cool_user', user.username)
            self.assertEqual('cool.user@example.com', user.email)

            # Link again. Should return the same user record.
            user_again, _ = keystone.link_user('cool.user')
            self.assertEqual(user_again.id, user.id)

            # Confirm the linked user.
            result, _ = keystone.confirm_existing_user('cool_user', 'password')
            self.assertIsNotNone(result)
            self.assertEqual('cool_user', result.username)
# Run the test cases in this module when the file is executed as a script.
if __name__ == '__main__':
    unittest.main()