Add support to Keystone Auth for external user linking

Also adds Keystone V3 support
This commit is contained in:
Joseph Schorr 2016-10-27 15:35:52 -04:00
parent fbb524e34e
commit b3d1d7227c
5 changed files with 262 additions and 17 deletions

View file

@ -4,16 +4,15 @@ import unittest
import requests
from flask import Flask, request, abort
from flask import Flask, request, abort, make_response
from flask_testing import LiveServerTestCase
from data.users.keystone import KeystoneUsers
from data.users.keystone import get_keystone_users
from initdb import setup_database_for_testing, finished_database_for_testing
_PORT_NUMBER = 5001
class KeystoneAuthTests(LiveServerTestCase):
class KeystoneAuthTestsMixin():
maxDiff = None
def create_app(self):
@ -44,6 +43,106 @@ class KeystoneAuthTests(LiveServerTestCase):
abort(404)
@ks_app.route('/v3/identity/users/<userid>', methods=['GET'])
def getv3user(userid):
for user in users:
if user['username'] == userid:
return json.dumps({
'user': {
"domain_id": "default",
"enabled": True,
"id": user['username'],
"links": {},
"name": user['username'],
"email": user['username'] + '@example.com',
}
})
abort(404)
@ks_app.route('/v3/identity/users', methods=['GET'])
def v3identity():
returned = []
for user in users:
if not request.args.get('name') or user['username'].startswith(request.args.get('name')):
returned.append({
"domain_id": "default",
"enabled": True,
"id": user['username'],
"links": {},
"name": user['username'],
"email": user['username'] + '@example.com',
})
return json.dumps({"users": returned})
@ks_app.route('/v3/auth/tokens', methods=['POST'])
def v3tokens():
creds = request.json['auth']['identity']['password']['user']
for user in users:
if creds['name'] == user['username'] and creds['password'] == user['password']:
data = json.dumps({
"token": {
"methods": [
"password"
],
"roles": [
{
"id": "9fe2ff9ee4384b1894a90878d3e92bab",
"name": "_member_"
},
{
"id": "c703057be878458588961ce9a0ce686b",
"name": "admin"
}
],
"project": {
"domain": {
"id": "default",
"name": "Default"
},
"id": "8538a3f13f9541b28c2620eb19065e45",
"name": "admin"
},
"catalog": [
{
"endpoints": [
{
"url": self.get_server_url() + '/v3/identity',
"region": "RegionOne",
"interface": "admin",
"id": "29beb2f1567642eb810b042b6719ea88"
},
],
"type": "identity",
"id": "bd73972c0e14fb69bae8ff76e112a90",
"name": "keystone"
}
],
"extras": {
},
"user": {
"domain": {
"id": "default",
"name": "Default"
},
"id": user['username'],
"name": "admin"
},
"audit_ids": [
"yRt0UrxJSs6-WYJgwEMMmg"
],
"issued_at": "2014-06-16T22:24:26.089380",
"expires_at": "2020-06-16T23:24:26Z",
}
})
response = make_response(data, 200)
response.headers['X-Subject-Token'] = 'sometoken'
return response
abort(403)
@ks_app.route('/v2.0/auth/tokens', methods=['POST'])
def tokens():
@ -89,9 +188,15 @@ class KeystoneAuthTests(LiveServerTestCase):
return ks_app
def setUp(self):
setup_database_for_testing(self)
self.session = requests.Session()
self.keystone = KeystoneUsers(self.get_server_url() + '/v2.0/auth', 'adminuser', 'adminpass',
'admintenant')
def tearDown(self):
finished_database_for_testing(self)
@property
def keystone(self):
raise NotImplementedError
def test_invalid_user(self):
(user, _) = self.keystone.verify_credentials('unknownuser', 'password')
@ -111,6 +216,48 @@ class KeystoneAuthTests(LiveServerTestCase):
self.assertEquals(user.username, 'some.neat.user')
self.assertEquals(user.email, 'some.neat.user@example.com')
class KeystoneV2AuthTests(KeystoneAuthTestsMixin, LiveServerTestCase):
@property
def keystone(self):
return get_keystone_users(2, self.get_server_url() + '/v2.0/auth',
'adminuser', 'adminpass', 'admintenant')
class KeystoneV3AuthTests(KeystoneAuthTestsMixin, LiveServerTestCase):
@property
def keystone(self):
return get_keystone_users(3, self.get_server_url() + '/v3',
'adminuser', 'adminpass', 'admintenant')
def test_query(self):
# Lookup cool.
(response, error_message) = self.keystone.query_users('cool')
self.assertIsNone(error_message)
self.assertEquals(1, len(response))
user_info = response[0]
self.assertEquals("cooluser", user_info.username)
# Lookup unknown.
(response, error_message) = self.keystone.query_users('unknown')
self.assertIsNone(error_message)
self.assertEquals(0, len(response))
def test_link_user(self):
# Link someuser.
user, error_message = self.keystone.link_user('cooluser')
self.assertIsNone(error_message)
self.assertIsNotNone(user)
self.assertEquals('cooluser', user.username)
self.assertEquals('cooluser@example.com', user.email)
# Link again. Should return the same user record.
user_again, _ = self.keystone.link_user('cooluser')
self.assertEquals(user_again.id, user.id)
# Confirm someuser.
result, _ = self.keystone.confirm_existing_user('cooluser', 'password')
self.assertIsNotNone(result)
self.assertEquals('cooluser', result.username)
if __name__ == '__main__':
unittest.main()