Merge pull request #1905 from coreos-inc/external-auth-search

Add support for entity search against external auth users not yet linked
This commit is contained in:
josephschorr 2016-10-27 16:06:42 -04:00 committed by GitHub
commit 934cdecbd6
16 changed files with 817 additions and 100 deletions

Binary file not shown.

View file

@ -53,6 +53,58 @@ class JWTAuthTestCase(LiveServerTestCase):
data = base64.b64decode(request.headers['Authorization'][len('Basic '):])
return data.split(':', 1)
@jwt_app.route('/user/query', methods=['GET'])
def query_users():
query = request.args.get('query')
results = []
for user in users:
if user['name'].startswith(query):
results.append({
'username': user['name'],
'email': user['email'],
})
token_data = {
'iss': 'authy',
'aud': 'quay.io/jwtauthn/query',
'nbf': datetime.utcnow(),
'iat': datetime.utcnow(),
'exp': datetime.utcnow() + timedelta(seconds=60),
'results': results,
}
encoded = jwt.encode(token_data, private_key, 'RS256')
return jsonify({
'token': encoded
})
@jwt_app.route('/user/get', methods=['GET'])
def get_user():
username = request.args.get('username')
if username == 'disabled':
return make_response('User is currently disabled', 401)
for user in users:
if user['name'] == username or user['email'] == username:
token_data = {
'iss': 'authy',
'aud': 'quay.io/jwtauthn/getuser',
'nbf': datetime.utcnow(),
'iat': datetime.utcnow(),
'exp': datetime.utcnow() + timedelta(seconds=60),
'sub': user['name'],
'email': user['email']
}
encoded = jwt.encode(token_data, private_key, 'RS256')
return jsonify({
'token': encoded
})
return make_response('Invalid username or password', 404)
@jwt_app.route('/user/verify', methods=['GET'])
def verify_user():
username, password = _get_basic_auth()
@ -80,7 +132,7 @@ class JWTAuthTestCase(LiveServerTestCase):
'token': encoded
})
return make_response('', 404)
return make_response('Invalid username or password', 404)
jwt_app.config['TESTING'] = True
return jwt_app
@ -94,7 +146,11 @@ class JWTAuthTestCase(LiveServerTestCase):
self.session = requests.Session()
self.jwt_auth = ExternalJWTAuthN(self.get_server_url() + '/user/verify', 'authy', '',
verify_url = self.get_server_url() + '/user/verify'
query_url = self.get_server_url() + '/user/query'
getuser_url = self.get_server_url() + '/user/get'
self.jwt_auth = ExternalJWTAuthN(verify_url, query_url, getuser_url, 'authy', '',
app.config['HTTPCLIENT'], 300, JWTAuthTestCase.public_key.name)
def tearDown(self):
@ -142,11 +198,78 @@ class JWTAuthTestCase(LiveServerTestCase):
self.assertIsNotNone(result)
self.assertEquals('some_neat_user', result.username)
def test_disabled_user_custom_erorr(self):
def test_disabled_user_custom_error(self):
result, error_message = self.jwt_auth.verify_and_link_user('disabled', 'password')
self.assertIsNone(result)
self.assertEquals('User is currently disabled', error_message)
def test_query(self):
# Lookup `cool`.
results, identifier, error_message = self.jwt_auth.query_users('cool')
self.assertIsNone(error_message)
self.assertEquals('jwtauthn', identifier)
self.assertEquals(1, len(results))
self.assertEquals('cooluser', results[0].username)
self.assertEquals('user@domain.com', results[0].email)
# Lookup `some`.
results, identifier, error_message = self.jwt_auth.query_users('some')
self.assertIsNone(error_message)
self.assertEquals('jwtauthn', identifier)
self.assertEquals(1, len(results))
self.assertEquals('some.neat.user', results[0].username)
self.assertEquals('neat@domain.com', results[0].email)
# Lookup `unknown`.
results, identifier, error_message = self.jwt_auth.query_users('unknown')
self.assertIsNone(error_message)
self.assertEquals('jwtauthn', identifier)
self.assertEquals(0, len(results))
def test_get_user(self):
# Lookup cooluser.
result, error_message = self.jwt_auth.get_user('cooluser')
self.assertIsNone(error_message)
self.assertIsNotNone(result)
self.assertEquals('cooluser', result.username)
self.assertEquals('user@domain.com', result.email)
# Lookup some.neat.user.
result, error_message = self.jwt_auth.get_user('some.neat.user')
self.assertIsNone(error_message)
self.assertIsNotNone(result)
self.assertEquals('some.neat.user', result.username)
self.assertEquals('neat@domain.com', result.email)
# Lookup unknown user.
result, error_message = self.jwt_auth.get_user('unknownuser')
self.assertIsNone(result)
def test_link_user(self):
# Link cooluser.
user, error_message = self.jwt_auth.link_user('cooluser')
self.assertIsNone(error_message)
self.assertIsNotNone(user)
self.assertEquals('cooluser', user.username)
# Link again. Should return the same user record.
user_again, _ = self.jwt_auth.link_user('cooluser')
self.assertEquals(user_again.id, user.id)
# Confirm cooluser.
result, _ = self.jwt_auth.confirm_existing_user('cooluser', 'password')
self.assertIsNotNone(result)
self.assertEquals('cooluser', result.username)
def test_link_invalid_user(self):
user, error_message = self.jwt_auth.link_user('invaliduser')
self.assertIsNotNone(error_message)
self.assertIsNone(user)
if __name__ == '__main__':
unittest.main()

View file

@ -4,16 +4,15 @@ import unittest
import requests
from flask import Flask, request, abort
from flask import Flask, request, abort, make_response
from flask_testing import LiveServerTestCase
from data.users.keystone import KeystoneUsers
from data.users.keystone import get_keystone_users
from initdb import setup_database_for_testing, finished_database_for_testing
_PORT_NUMBER = 5001
class KeystoneAuthTests(LiveServerTestCase):
class KeystoneAuthTestsMixin():
maxDiff = None
def create_app(self):
@ -44,6 +43,106 @@ class KeystoneAuthTests(LiveServerTestCase):
abort(404)
@ks_app.route('/v3/identity/users/<userid>', methods=['GET'])
def getv3user(userid):
for user in users:
if user['username'] == userid:
return json.dumps({
'user': {
"domain_id": "default",
"enabled": True,
"id": user['username'],
"links": {},
"name": user['username'],
"email": user['username'] + '@example.com',
}
})
abort(404)
@ks_app.route('/v3/identity/users', methods=['GET'])
def v3identity():
returned = []
for user in users:
if not request.args.get('name') or user['username'].startswith(request.args.get('name')):
returned.append({
"domain_id": "default",
"enabled": True,
"id": user['username'],
"links": {},
"name": user['username'],
"email": user['username'] + '@example.com',
})
return json.dumps({"users": returned})
@ks_app.route('/v3/auth/tokens', methods=['POST'])
def v3tokens():
creds = request.json['auth']['identity']['password']['user']
for user in users:
if creds['name'] == user['username'] and creds['password'] == user['password']:
data = json.dumps({
"token": {
"methods": [
"password"
],
"roles": [
{
"id": "9fe2ff9ee4384b1894a90878d3e92bab",
"name": "_member_"
},
{
"id": "c703057be878458588961ce9a0ce686b",
"name": "admin"
}
],
"project": {
"domain": {
"id": "default",
"name": "Default"
},
"id": "8538a3f13f9541b28c2620eb19065e45",
"name": "admin"
},
"catalog": [
{
"endpoints": [
{
"url": self.get_server_url() + '/v3/identity',
"region": "RegionOne",
"interface": "admin",
"id": "29beb2f1567642eb810b042b6719ea88"
},
],
"type": "identity",
"id": "bd73972c0e14fb69bae8ff76e112a90",
"name": "keystone"
}
],
"extras": {
},
"user": {
"domain": {
"id": "default",
"name": "Default"
},
"id": user['username'],
"name": "admin"
},
"audit_ids": [
"yRt0UrxJSs6-WYJgwEMMmg"
],
"issued_at": "2014-06-16T22:24:26.089380",
"expires_at": "2020-06-16T23:24:26Z",
}
})
response = make_response(data, 200)
response.headers['X-Subject-Token'] = 'sometoken'
return response
abort(403)
@ks_app.route('/v2.0/auth/tokens', methods=['POST'])
def tokens():
@ -89,9 +188,15 @@ class KeystoneAuthTests(LiveServerTestCase):
return ks_app
def setUp(self):
setup_database_for_testing(self)
self.session = requests.Session()
self.keystone = KeystoneUsers(self.get_server_url() + '/v2.0/auth', 'adminuser', 'adminpass',
'admintenant')
def tearDown(self):
finished_database_for_testing(self)
@property
def keystone(self):
raise NotImplementedError
def test_invalid_user(self):
(user, _) = self.keystone.verify_credentials('unknownuser', 'password')
@ -111,6 +216,48 @@ class KeystoneAuthTests(LiveServerTestCase):
self.assertEquals(user.username, 'some.neat.user')
self.assertEquals(user.email, 'some.neat.user@example.com')
class KeystoneV2AuthTests(KeystoneAuthTestsMixin, LiveServerTestCase):
@property
def keystone(self):
return get_keystone_users(2, self.get_server_url() + '/v2.0/auth',
'adminuser', 'adminpass', 'admintenant')
class KeystoneV3AuthTests(KeystoneAuthTestsMixin, LiveServerTestCase):
@property
def keystone(self):
return get_keystone_users(3, self.get_server_url() + '/v3',
'adminuser', 'adminpass', 'admintenant')
def test_query(self):
# Lookup cool.
(response, error_message) = self.keystone.query_users('cool')
self.assertIsNone(error_message)
self.assertEquals(1, len(response))
user_info = response[0]
self.assertEquals("cooluser", user_info.username)
# Lookup unknown.
(response, error_message) = self.keystone.query_users('unknown')
self.assertIsNone(error_message)
self.assertEquals(0, len(response))
def test_link_user(self):
# Link someuser.
user, error_message = self.keystone.link_user('cooluser')
self.assertIsNone(error_message)
self.assertIsNotNone(user)
self.assertEquals('cooluser', user.username)
self.assertEquals('cooluser@example.com', user.email)
# Link again. Should return the same user record.
user_again, _ = self.keystone.link_user('cooluser')
self.assertEquals(user_again.id, user.id)
# Confirm someuser.
result, _ = self.keystone.confirm_existing_user('cooluser', 'password')
self.assertIsNotNone(result)
self.assertEquals('cooluser', result.username)
if __name__ == '__main__':
unittest.main()

View file

@ -4,6 +4,7 @@ from app import app
from initdb import setup_database_for_testing, finished_database_for_testing
from data.users import LDAPUsers
from mockldap import MockLdap
from mock import patch
class TestLDAP(unittest.TestCase):
def setUp(self):
@ -198,6 +199,60 @@ class TestLDAP(unittest.TestCase):
(response, _) = self.ldap.confirm_existing_user('someuser', 'somepass')
self.assertEquals(response.username, 'someuser')
def test_link_user(self):
# Link someuser.
user, error_message = self.ldap.link_user('someuser')
self.assertIsNone(error_message)
self.assertIsNotNone(user)
self.assertEquals('someuser', user.username)
# Link again. Should return the same user record.
user_again, _ = self.ldap.link_user('someuser')
self.assertEquals(user_again.id, user.id)
# Confirm someuser.
result, _ = self.ldap.confirm_existing_user('someuser', 'somepass')
self.assertIsNotNone(result)
self.assertEquals('someuser', result.username)
def test_query(self):
def initializer(uri, trace_level=0):
obj = self.mockldap[uri]
# Seed to "support" wildcard queries, which MockLDAP does not support natively.
obj.search_s.seed('ou=employees,dc=quay,dc=io', 2, '(|(uid=cool*)(mail=cool*))')([
('uid=cool.user,ou=employees,dc=quay,dc=io', {
'dc': ['quay', 'io'],
'ou': 'employees',
'uid': ['cool.user', 'referred'],
'userPassword': ['somepass'],
'mail': ['foo@bar.com']
})
])
obj.search_s.seed('ou=otheremployees,dc=quay,dc=io', 2, '(|(uid=cool*)(mail=cool*))')([])
obj.search_s.seed('ou=employees,dc=quay,dc=io', 2, '(|(uid=unknown*)(mail=unknown*))')([])
obj.search_s.seed('ou=otheremployees,dc=quay,dc=io', 2,
'(|(uid=unknown*)(mail=unknown*))')([])
return obj
with patch('ldap.initialize', new=initializer):
# Lookup cool.
(response, error_message) = self.ldap.query_users('cool')
self.assertIsNone(error_message)
self.assertEquals(1, len(response))
user_info = response[0]
self.assertEquals("cool.user", user_info.username)
self.assertEquals("foo@bar.com", user_info.email)
# Lookup unknown.
(response, error_message) = self.ldap.query_users('unknown')
self.assertIsNone(error_message)
self.assertEquals(0, len(response))
if __name__ == '__main__':
unittest.main()