Make email addresses optional in external auth if email feature is turned off
Before this change, external auth such as Keystone would fail if a user without an email address tried to login, even if the email feature was disabled.
This commit is contained in:
parent
934cdecbd6
commit
d7f56350a4
18 changed files with 206 additions and 93 deletions
|
@ -18,10 +18,13 @@ from initdb import setup_database_for_testing, finished_database_for_testing
|
|||
|
||||
_PORT_NUMBER = 5001
|
||||
|
||||
|
||||
class JWTAuthTestCase(LiveServerTestCase):
|
||||
class JWTAuthTestMixin(object):
|
||||
maxDiff = None
|
||||
|
||||
@property
|
||||
def emails(self):
|
||||
raise NotImplementedError
|
||||
|
||||
@classmethod
|
||||
def setUpClass(cls):
|
||||
public_key = NamedTemporaryFile(delete=True)
|
||||
|
@ -60,10 +63,14 @@ class JWTAuthTestCase(LiveServerTestCase):
|
|||
|
||||
for user in users:
|
||||
if user['name'].startswith(query):
|
||||
results.append({
|
||||
result = {
|
||||
'username': user['name'],
|
||||
'email': user['email'],
|
||||
})
|
||||
}
|
||||
|
||||
if self.emails:
|
||||
result['email'] = user['email']
|
||||
|
||||
results.append(result)
|
||||
|
||||
token_data = {
|
||||
'iss': 'authy',
|
||||
|
@ -95,7 +102,7 @@ class JWTAuthTestCase(LiveServerTestCase):
|
|||
'iat': datetime.utcnow(),
|
||||
'exp': datetime.utcnow() + timedelta(seconds=60),
|
||||
'sub': user['name'],
|
||||
'email': user['email']
|
||||
'email': user['email'],
|
||||
}
|
||||
|
||||
encoded = jwt.encode(token_data, private_key, 'RS256')
|
||||
|
@ -124,7 +131,7 @@ class JWTAuthTestCase(LiveServerTestCase):
|
|||
'iat': datetime.utcnow(),
|
||||
'exp': datetime.utcnow() + timedelta(seconds=60),
|
||||
'sub': user['name'],
|
||||
'email': user['email']
|
||||
'email': user['email'],
|
||||
}
|
||||
|
||||
encoded = jwt.encode(token_data, private_key, 'RS256')
|
||||
|
@ -151,7 +158,8 @@ class JWTAuthTestCase(LiveServerTestCase):
|
|||
getuser_url = self.get_server_url() + '/user/get'
|
||||
|
||||
self.jwt_auth = ExternalJWTAuthN(verify_url, query_url, getuser_url, 'authy', '',
|
||||
app.config['HTTPCLIENT'], 300, JWTAuthTestCase.public_key.name)
|
||||
app.config['HTTPCLIENT'], 300, JWTAuthTestCase.public_key.name,
|
||||
requires_email=self.emails)
|
||||
|
||||
def tearDown(self):
|
||||
finished_database_for_testing(self)
|
||||
|
@ -211,7 +219,7 @@ class JWTAuthTestCase(LiveServerTestCase):
|
|||
self.assertEquals(1, len(results))
|
||||
|
||||
self.assertEquals('cooluser', results[0].username)
|
||||
self.assertEquals('user@domain.com', results[0].email)
|
||||
self.assertEquals('user@domain.com' if self.emails else None, results[0].email)
|
||||
|
||||
# Lookup `some`.
|
||||
results, identifier, error_message = self.jwt_auth.query_users('some')
|
||||
|
@ -220,7 +228,7 @@ class JWTAuthTestCase(LiveServerTestCase):
|
|||
self.assertEquals(1, len(results))
|
||||
|
||||
self.assertEquals('some.neat.user', results[0].username)
|
||||
self.assertEquals('neat@domain.com', results[0].email)
|
||||
self.assertEquals('neat@domain.com' if self.emails else None, results[0].email)
|
||||
|
||||
# Lookup `unknown`.
|
||||
results, identifier, error_message = self.jwt_auth.query_users('unknown')
|
||||
|
@ -271,5 +279,17 @@ class JWTAuthTestCase(LiveServerTestCase):
|
|||
self.assertIsNone(user)
|
||||
|
||||
|
||||
class JWTAuthNoEmailTestCase(JWTAuthTestMixin, LiveServerTestCase):
|
||||
@property
|
||||
def emails(self):
|
||||
return False
|
||||
|
||||
|
||||
class JWTAuthTestCase(JWTAuthTestMixin, LiveServerTestCase):
|
||||
@property
|
||||
def emails(self):
|
||||
return True
|
||||
|
||||
|
||||
if __name__ == '__main__':
|
||||
unittest.main()
|
||||
|
|
|
@ -15,6 +15,10 @@ _PORT_NUMBER = 5001
|
|||
class KeystoneAuthTestsMixin():
|
||||
maxDiff = None
|
||||
|
||||
@property
|
||||
def emails(self):
|
||||
raise NotImplementedError
|
||||
|
||||
def create_app(self):
|
||||
global _PORT_NUMBER
|
||||
_PORT_NUMBER = _PORT_NUMBER + 1
|
||||
|
@ -35,10 +39,12 @@ class KeystoneAuthTestsMixin():
|
|||
def getuser(userid):
|
||||
for user in users:
|
||||
if user['username'] == userid:
|
||||
user_data = {}
|
||||
if self.emails:
|
||||
user_data['email'] = userid + '@example.com'
|
||||
|
||||
return json.dumps({
|
||||
'user': {
|
||||
'email': userid + '@example.com',
|
||||
}
|
||||
'user': user_data
|
||||
})
|
||||
|
||||
abort(404)
|
||||
|
@ -47,15 +53,19 @@ class KeystoneAuthTestsMixin():
|
|||
def getv3user(userid):
|
||||
for user in users:
|
||||
if user['username'] == userid:
|
||||
user_data = {
|
||||
"domain_id": "default",
|
||||
"enabled": True,
|
||||
"id": user['username'],
|
||||
"links": {},
|
||||
"name": user['username'],
|
||||
}
|
||||
|
||||
if self.emails:
|
||||
user_data['email'] = user['username'] + '@example.com'
|
||||
|
||||
return json.dumps({
|
||||
'user': {
|
||||
"domain_id": "default",
|
||||
"enabled": True,
|
||||
"id": user['username'],
|
||||
"links": {},
|
||||
"name": user['username'],
|
||||
"email": user['username'] + '@example.com',
|
||||
}
|
||||
'user': user_data
|
||||
})
|
||||
|
||||
abort(404)
|
||||
|
@ -209,24 +219,54 @@ class KeystoneAuthTestsMixin():
|
|||
def test_cooluser(self):
|
||||
(user, _) = self.keystone.verify_credentials('cooluser', 'password')
|
||||
self.assertEquals(user.username, 'cooluser')
|
||||
self.assertEquals(user.email, 'cooluser@example.com')
|
||||
self.assertEquals(user.email, 'cooluser@example.com' if self.emails else None)
|
||||
|
||||
def test_neatuser(self):
|
||||
(user, _) = self.keystone.verify_credentials('some.neat.user', 'foobar')
|
||||
self.assertEquals(user.username, 'some.neat.user')
|
||||
self.assertEquals(user.email, 'some.neat.user@example.com')
|
||||
self.assertEquals(user.email, 'some.neat.user@example.com' if self.emails else None)
|
||||
|
||||
class KeystoneV2AuthNoEmailTests(KeystoneAuthTestsMixin, LiveServerTestCase):
|
||||
@property
|
||||
def keystone(self):
|
||||
return get_keystone_users(2, self.get_server_url() + '/v2.0/auth',
|
||||
'adminuser', 'adminpass', 'admintenant',
|
||||
requires_email=False)
|
||||
|
||||
@property
|
||||
def emails(self):
|
||||
return False
|
||||
|
||||
class KeystoneV3AuthNoEmailTests(KeystoneAuthTestsMixin, LiveServerTestCase):
|
||||
@property
|
||||
def keystone(self):
|
||||
return get_keystone_users(3, self.get_server_url() + '/v3',
|
||||
'adminuser', 'adminpass', 'admintenant',
|
||||
requires_email=False)
|
||||
@property
|
||||
def emails(self):
|
||||
return False
|
||||
|
||||
class KeystoneV2AuthTests(KeystoneAuthTestsMixin, LiveServerTestCase):
|
||||
@property
|
||||
def keystone(self):
|
||||
return get_keystone_users(2, self.get_server_url() + '/v2.0/auth',
|
||||
'adminuser', 'adminpass', 'admintenant')
|
||||
'adminuser', 'adminpass', 'admintenant',
|
||||
requires_email=True)
|
||||
|
||||
@property
|
||||
def emails(self):
|
||||
return True
|
||||
|
||||
class KeystoneV3AuthTests(KeystoneAuthTestsMixin, LiveServerTestCase):
|
||||
@property
|
||||
def keystone(self):
|
||||
return get_keystone_users(3, self.get_server_url() + '/v3',
|
||||
'adminuser', 'adminpass', 'admintenant')
|
||||
'adminuser', 'adminpass', 'admintenant',
|
||||
requires_email=True)
|
||||
|
||||
def emails(self):
|
||||
return True
|
||||
|
||||
def test_query(self):
|
||||
# Lookup cool.
|
||||
|
|
|
@ -76,7 +76,14 @@ class TestLDAP(unittest.TestCase):
|
|||
})
|
||||
|
||||
self.mockldap.start()
|
||||
self.ldap = self._create_ldap(requires_email=True)
|
||||
|
||||
def tearDown(self):
|
||||
self.mockldap.stop()
|
||||
finished_database_for_testing(self)
|
||||
self.ctx.__exit__(True, None, None)
|
||||
|
||||
def _create_ldap(self, requires_email=True):
|
||||
base_dn = ['dc=quay', 'dc=io']
|
||||
admin_dn = 'uid=testy,ou=employees,dc=quay,dc=io'
|
||||
admin_passwd = 'password'
|
||||
|
@ -86,15 +93,9 @@ class TestLDAP(unittest.TestCase):
|
|||
secondary_user_rdns = ['ou=otheremployees']
|
||||
|
||||
ldap = LDAPUsers('ldap://localhost', base_dn, admin_dn, admin_passwd, user_rdn,
|
||||
uid_attr, email_attr, secondary_user_rdns=secondary_user_rdns)
|
||||
|
||||
self.ldap = ldap
|
||||
|
||||
|
||||
def tearDown(self):
|
||||
self.mockldap.stop()
|
||||
finished_database_for_testing(self)
|
||||
self.ctx.__exit__(True, None, None)
|
||||
uid_attr, email_attr, secondary_user_rdns=secondary_user_rdns,
|
||||
requires_email=requires_email)
|
||||
return ldap
|
||||
|
||||
def test_invalid_admin_password(self):
|
||||
base_dn = ['dc=quay', 'dc=io']
|
||||
|
@ -144,10 +145,15 @@ class TestLDAP(unittest.TestCase):
|
|||
self.assertEquals(err_msg, 'Invalid user')
|
||||
|
||||
def test_missing_mail(self):
|
||||
(response, err_msg) = self.ldap.verify_and_link_user('nomail', 'somepass')
|
||||
(response, err_msg) = self.ldap.get_user('nomail')
|
||||
self.assertIsNone(response)
|
||||
self.assertEquals('Missing mail field "mail" in user record', err_msg)
|
||||
|
||||
def test_missing_mail_allowed(self):
|
||||
ldap = self._create_ldap(requires_email=False)
|
||||
(response, _) = ldap.get_user('nomail')
|
||||
self.assertEquals(response.username, 'nomail')
|
||||
|
||||
def test_confirm_different_username(self):
|
||||
# Verify that the user is logged in and their username was adjusted.
|
||||
(response, _) = self.ldap.verify_and_link_user('cool.user', 'somepass')
|
||||
|
|
Reference in a new issue