Merge pull request #2047 from coreos-inc/external-auth-email-optional

Make email addresses optional in external auth if email feature is turned off
This commit is contained in:
josephschorr 2016-10-31 14:16:33 -04:00 committed by GitHub
commit 840ea4e768
18 changed files with 206 additions and 93 deletions

View file

@ -5,10 +5,10 @@ from data.model import (user, team, DataModelException, InvalidOrganizationExcep
InvalidUsernameException, db_transaction, _basequery)
def create_organization(name, email, creating_user):
def create_organization(name, email, creating_user, email_required=True):
try:
# Create the org
new_org = user.create_user_noverify(name, email)
new_org = user.create_user_noverify(name, email, email_required=email_required)
new_org.organization = True
new_org.save()

View file

@ -1,6 +1,7 @@
import bcrypt
import logging
import json
import uuid
from peewee import JOIN_LEFT_OUTER, IntegrityError, fn
from uuid import uuid4
@ -31,13 +32,12 @@ def hash_password(password, salt=None):
salt = salt or bcrypt.gensalt()
return bcrypt.hashpw(password.encode('utf-8'), salt)
def create_user(username, password, email, auto_verify=False):
def create_user(username, password, email, auto_verify=False, email_required=True):
""" Creates a regular user, if allowed. """
if not validate_password(password):
raise InvalidPasswordException(INVALID_PASSWORD_MESSAGE)
created = create_user_noverify(username, email)
created = create_user_noverify(username, email, email_required=email_required)
created.password_hash = hash_password(password)
created.verified = auto_verify
created.save()
@ -45,9 +45,14 @@ def create_user(username, password, email, auto_verify=False):
return created
def create_user_noverify(username, email):
if not validate_email(email):
raise InvalidEmailAddressException('Invalid email address: %s' % email)
def create_user_noverify(username, email, email_required=True):
if email_required:
if not validate_email(email):
raise InvalidEmailAddressException('Invalid email address: %s' % email)
else:
# If email addresses are not required and none was specified, then we just use a unique
# ID to ensure that the database consistency check remains intact.
email = email or str(uuid.uuid4())
(username_valid, username_issue) = validate_username(username)
if not username_valid:
@ -300,8 +305,8 @@ def list_entity_robot_permission_teams(entity_name, include_permissions=False):
def create_federated_user(username, email, service_name, service_ident,
set_password_notification, metadata={}):
new_user = create_user_noverify(username, email)
set_password_notification, metadata={}, email_required=True):
new_user = create_user_noverify(username, email, email_required=email_required)
new_user.verified = True
new_user.save()

View file

@ -29,7 +29,7 @@ def get_federated_service_name(authentication_type):
LDAP_CERT_FILENAME = 'ldap.crt'
def get_users_handler(config, config_provider, override_config_dir):
def get_users_handler(config, _, override_config_dir):
""" Returns a users handler for the authentication configured in the given config object. """
authentication_type = config.get('AUTHENTICATION_TYPE', 'Database')
@ -48,7 +48,8 @@ def get_users_handler(config, config_provider, override_config_dir):
allow_tls_fallback = config.get('LDAP_ALLOW_INSECURE_FALLBACK', False)
return LDAPUsers(ldap_uri, base_dn, admin_dn, admin_passwd, user_rdn, uid_attr, email_attr,
allow_tls_fallback, secondary_user_rdns=secondary_user_rdns)
allow_tls_fallback, secondary_user_rdns=secondary_user_rdns,
requires_email=features.MAILING)
if authentication_type == 'JWT':
verify_url = config.get('JWT_VERIFY_ENDPOINT')
@ -59,7 +60,8 @@ def get_users_handler(config, config_provider, override_config_dir):
getuser_url = config.get('JWT_GETUSER_ENDPOINT', None)
return ExternalJWTAuthN(verify_url, query_url, getuser_url, issuer, override_config_dir,
config['HTTPCLIENT'], max_fresh_s)
config['HTTPCLIENT'], max_fresh_s,
requires_email=features.MAILING)
if authentication_type == 'Keystone':
auth_url = config.get('KEYSTONE_AUTH_URL')
@ -68,9 +70,9 @@ def get_users_handler(config, config_provider, override_config_dir):
keystone_admin_username = config.get('KEYSTONE_ADMIN_USERNAME')
keystone_admin_password = config.get('KEYSTONE_ADMIN_PASSWORD')
keystone_admin_tenant = config.get('KEYSTONE_ADMIN_TENANT')
return get_keystone_users(auth_version, auth_url, keystone_admin_username,
keystone_admin_password, keystone_admin_tenant, timeout)
keystone_admin_password, keystone_admin_tenant, timeout,
requires_email=features.MAILING)
raise RuntimeError('Unknown authentication type: %s' % authentication_type)

View file

@ -14,8 +14,8 @@ class ExternalJWTAuthN(FederatedUsers):
PUBLIC_KEY_FILENAME = 'jwt-authn.cert'
def __init__(self, verify_url, query_url, getuser_url, issuer, override_config_dir, http_client,
max_fresh_s, public_key_path=None):
super(ExternalJWTAuthN, self).__init__('jwtauthn')
max_fresh_s, public_key_path=None, requires_email=True):
super(ExternalJWTAuthN, self).__init__('jwtauthn', requires_email)
self.verify_url = verify_url
self.query_url = query_url
self.getuser_url = getuser_url
@ -23,6 +23,7 @@ class ExternalJWTAuthN(FederatedUsers):
self.issuer = issuer
self.client = http_client
self.max_fresh_s = max_fresh_s
self.requires_email = requires_email
default_key_path = os.path.join(override_config_dir, ExternalJWTAuthN.PUBLIC_KEY_FILENAME)
public_key_path = public_key_path or default_key_path
@ -48,11 +49,12 @@ class ExternalJWTAuthN(FederatedUsers):
if not 'sub' in payload:
raise Exception('Missing sub field in JWT')
if not 'email' in payload:
if self.requires_email and not 'email' in payload:
raise Exception('Missing email field in JWT')
# Parse out the username and email.
user_info = UserInformation(username=payload['sub'], email=payload['email'], id=payload['sub'])
user_info = UserInformation(username=payload['sub'], email=payload.get('email'),
id=payload['sub'])
return (user_info, None)
@ -67,7 +69,7 @@ class ExternalJWTAuthN(FederatedUsers):
query_results = []
for result in payload['results'][0:limit]:
user_info = UserInformation(username=result['username'], email=result['email'],
user_info = UserInformation(username=result['username'], email=result.get('email'),
id=result['username'])
query_results.append(user_info)
@ -83,10 +85,11 @@ class ExternalJWTAuthN(FederatedUsers):
if not 'sub' in payload:
raise Exception('Missing sub field in JWT')
if not 'email' in payload:
if self.requires_email and not 'email' in payload:
raise Exception('Missing email field in JWT')
user_info = UserInformation(username=payload['sub'], email=payload['email'], id=payload['sub'])
user_info = UserInformation(username=payload['sub'], email=payload.get('email'),
id=payload['sub'])
return (user_info, None)

View file

@ -53,15 +53,14 @@ class LDAPUsers(FederatedUsers):
_LDAPResult = namedtuple('LDAPResult', ['dn', 'attrs'])
def __init__(self, ldap_uri, base_dn, admin_dn, admin_passwd, user_rdn, uid_attr, email_attr,
allow_tls_fallback=False, secondary_user_rdns=None):
super(LDAPUsers, self).__init__('ldap')
allow_tls_fallback=False, secondary_user_rdns=None, requires_email=True):
super(LDAPUsers, self).__init__('ldap', requires_email)
self._ldap = LDAPConnectionBuilder(ldap_uri, admin_dn, admin_passwd, allow_tls_fallback)
self._ldap_uri = ldap_uri
self._uid_attr = uid_attr
self._email_attr = email_attr
self._allow_tls_fallback = allow_tls_fallback
self._requires_email = requires_email
# Note: user_rdn is a list of RDN pieces (for historical reasons), and secondary_user_rds
# is a list of RDN strings.
@ -167,11 +166,11 @@ class LDAPUsers(FederatedUsers):
if not response.get(self._uid_attr):
return (None, 'Missing uid field "%s" in user record' % self._uid_attr)
if not response.get(self._email_attr):
if self._requires_email and not response.get(self._email_attr):
return (None, 'Missing mail field "%s" in user record' % self._email_attr)
username = response[self._uid_attr][0].decode('utf-8')
email = response[self._email_attr][0]
email = response.get(self._email_attr, [None])[0]
return (UserInformation(username=username, email=email, id=username), None)
def get_user(self, username_or_email):

View file

@ -12,8 +12,9 @@ UserInformation = namedtuple('UserInformation', ['username', 'email', 'id'])
class FederatedUsers(object):
""" Base class for all federated users systems. """
def __init__(self, federated_service):
def __init__(self, federated_service, requires_email):
self._federated_service = federated_service
self._requires_email = requires_email
@property
def federated_service(self):
@ -50,11 +51,13 @@ class FederatedUsers(object):
db_user = model.user.create_federated_user(valid_username, email, self._federated_service,
username,
set_password_notification=False)
set_password_notification=False,
email_required=self._requires_email)
else:
# Update the db attributes from the federated service.
db_user.email = email
db_user.save()
if email:
db_user.email = email
db_user.save()
return (db_user, None)

View file

@ -18,23 +18,27 @@ def _take(n, iterable):
def get_keystone_users(auth_version, auth_url, admin_username, admin_password, admin_tenant,
timeout=None):
timeout=None, requires_email=True):
if auth_version == 3:
return KeystoneV3Users(auth_url, admin_username, admin_password, admin_tenant, timeout)
return KeystoneV3Users(auth_url, admin_username, admin_password, admin_tenant, timeout,
requires_email)
else:
return KeystoneV2Users(auth_url, admin_username, admin_password, admin_tenant, timeout)
return KeystoneV2Users(auth_url, admin_username, admin_password, admin_tenant, timeout,
requires_email)
class KeystoneV2Users(FederatedUsers):
""" Delegates authentication to OpenStack Keystone V2. """
def __init__(self, auth_url, admin_username, admin_password, admin_tenant, timeout=None):
super(KeystoneV2Users, self).__init__('keystone')
def __init__(self, auth_url, admin_username, admin_password, admin_tenant, timeout=None,
requires_email=True):
super(KeystoneV2Users, self).__init__('keystone', requires_email)
self.auth_url = auth_url
self.admin_username = admin_username
self.admin_password = admin_password
self.admin_tenant = admin_tenant
self.timeout = timeout or DEFAULT_TIMEOUT
self.debug = os.environ.get('USERS_DEBUG') == '1'
self.requires_email = requires_email
def verify_credentials(self, username_or_email, password):
try:
@ -58,7 +62,11 @@ class KeystoneV2Users(FederatedUsers):
logger.exception('Keystone unauthorized admin')
return (None, 'Keystone admin credentials are invalid: %s' % kut.message)
return (UserInformation(username=username_or_email, email=user.email, id=user_id), None)
if self.requires_email and not hasattr(user, 'email'):
return (None, 'Missing email field for user %s' % user_id)
email = user.email if hasattr(user, 'email') else None
return (UserInformation(username=username_or_email, email=email, id=user_id), None)
def query_users(self, query, limit=20):
return (None, 'Unsupported in Keystone V2')
@ -69,14 +77,16 @@ class KeystoneV2Users(FederatedUsers):
class KeystoneV3Users(FederatedUsers):
""" Delegates authentication to OpenStack Keystone V3. """
def __init__(self, auth_url, admin_username, admin_password, admin_tenant, timeout=None):
super(KeystoneV3Users, self).__init__('keystone')
def __init__(self, auth_url, admin_username, admin_password, admin_tenant, timeout=None,
requires_email=True):
super(KeystoneV3Users, self).__init__('keystone', requires_email)
self.auth_url = auth_url
self.admin_username = admin_username
self.admin_password = admin_password
self.admin_tenant = admin_tenant
self.timeout = timeout or DEFAULT_TIMEOUT
self.debug = os.environ.get('USERS_DEBUG') == '1'
self.requires_email = requires_email
def verify_credentials(self, username_or_email, password):
try:
@ -85,6 +95,10 @@ class KeystoneV3Users(FederatedUsers):
debug=self.debug)
user_id = keystone_client.user_id
user = keystone_client.users.get(user_id)
if self.requires_email and not hasattr(user, 'email'):
return (None, 'Missing email field for user %s' % user_id)
return (self._user_info(user), None)
except KeystoneAuthorizationFailure as kaf:
logger.exception('Keystone auth failure for user: %s', username_or_email)
@ -101,12 +115,15 @@ class KeystoneV3Users(FederatedUsers):
if len(users_found) != 1:
return (None, 'Single user not found')
return (users_found[0], None)
user = users_found[0]
if self.requires_email and not user.email:
return (None, 'Missing email field for user %s' % user.id)
return (user, None)
@staticmethod
def _user_info(user):
# Because Keystone uses defined attributes...
email = user.email if hasattr(user, 'email') else ''
email = user.email if hasattr(user, 'email') else None
return UserInformation(user.name, email, user.id)
def query_users(self, query, limit=20):

View file

@ -69,7 +69,6 @@ class OrganizationList(ApiResource):
'description': 'Description of a new organization.',
'required': [
'name',
'email',
],
'properties': {
'name': {
@ -105,8 +104,12 @@ class OrganizationList(ApiResource):
msg = 'A user or organization with this name already exists'
raise request_error(message=msg)
if features.MAILING and not org_data.get('email'):
raise request_error(message='Email address is required')
try:
model.organization.create_organization(org_data['name'], org_data['email'], user)
model.organization.create_organization(org_data['name'], org_data.get('email'), user,
email_required=features.MAILING)
return 'Created', 201
except model.DataModelException as ex:
raise request_error(exception=ex)

View file

@ -210,7 +210,7 @@ class SuperUserList(ApiResource):
'CreateInstallUser': {
'id': 'CreateInstallUser',
'description': 'Data for creating a user',
'required': ['username', 'email'],
'required': ['username'],
'properties': {
'username': {
'type': 'string',
@ -253,15 +253,15 @@ class SuperUserList(ApiResource):
user_information = request.get_json()
if SuperUserPermission().can():
username = user_information['username']
email = user_information['email']
# Generate a temporary password for the user.
random = SystemRandom()
password = ''.join([random.choice(string.ascii_uppercase + string.digits) for _ in range(32)])
# Create the user.
user = model.user.create_user(username, password, email, auto_verify=not features.MAILING)
username = user_information['username']
email = user_information.get('email')
user = model.user.create_user(username, password, email, auto_verify=not features.MAILING,
email_required=features.MAILING)
# If mailing is turned on, send the user a verification email.
if features.MAILING:

View file

@ -163,7 +163,6 @@ class User(ApiResource):
'required': [
'username',
'password',
'email',
],
'properties': {
'username': {
@ -355,9 +354,14 @@ class User(ApiResource):
if existing_user:
raise request_error(message='The username already exists')
if features.MAILING and not user_data.get('email'):
raise request_error(message='Email address is required')
try:
new_user = model.user.create_user(user_data['username'], user_data['password'],
user_data['email'], auto_verify=not features.MAILING)
user_data.get('email'),
auto_verify=not features.MAILING,
email_required=features.MAILING)
email_address_confirmed = handle_invite_code(invite_code, new_user)
if features.MAILING and not email_address_confirmed:

View file

@ -10,7 +10,7 @@
})
}]);
function OrgViewCtrl($scope, $routeParams, $timeout, ApiService, UIService, AvatarService) {
function OrgViewCtrl($scope, $routeParams, $timeout, ApiService, UIService, AvatarService, Config, Features) {
var orgname = $routeParams.orgname;
$scope.namespace = orgname;
@ -22,6 +22,9 @@
$scope.changeEmailInfo = null;
$scope.context = {};
$scope.Config = Config;
$scope.Features = Features;
$scope.orgScope = {
'changingOrganization': false,
'organizationEmail': ''

View file

@ -54,7 +54,7 @@
<span class="description">This will also be the namespace for your repositories. Must be alphanumeric, all lowercase and at least four characters long.</span>
</div>
<div class="form-group nested">
<div class="form-group nested" quay-require="['MAILING']">
<label for="orgName">Organization Email</label>
<div class="field-row">
<span class="field-container">

View file

@ -106,7 +106,8 @@
<td>
<span class="avatar" size="48" data="organization.avatar"></span>
<div class="help-text" ng-if="Config.AVATAR_KIND == 'local'">Avatar is generated based off the organization's name.</div>
<div class="help-text" ng-if="Config.AVATAR_KIND == 'gravatar'">Avatar is served by <a href="http://gravatar.com" rel="nofollow" target="_blank">Gravatar</a> based on the {{ organization.email }} e-mail address.</div>
<div class="help-text" ng-if="Config.AVATAR_KIND == 'gravatar' && Features.MAILING">Avatar is served by <a href="http://gravatar.com" rel="nofollow" target="_blank">Gravatar</a> based on the {{ organization.email }} e-mail address.</div>
<div class="help-text" ng-if="Config.AVATAR_KIND == 'gravatar' && !Features.MAILING">Avatar is served by <a href="http://gravatar.com" rel="nofollow" target="_blank">Gravatar</a> based on the unique ID: {{ organization.email }}.</div>
</td>
</tr>
<tr quay-show="Features.MAILING">

View file

@ -18,10 +18,13 @@ from initdb import setup_database_for_testing, finished_database_for_testing
_PORT_NUMBER = 5001
class JWTAuthTestCase(LiveServerTestCase):
class JWTAuthTestMixin(object):
maxDiff = None
@property
def emails(self):
raise NotImplementedError
@classmethod
def setUpClass(cls):
public_key = NamedTemporaryFile(delete=True)
@ -60,10 +63,14 @@ class JWTAuthTestCase(LiveServerTestCase):
for user in users:
if user['name'].startswith(query):
results.append({
result = {
'username': user['name'],
'email': user['email'],
})
}
if self.emails:
result['email'] = user['email']
results.append(result)
token_data = {
'iss': 'authy',
@ -95,7 +102,7 @@ class JWTAuthTestCase(LiveServerTestCase):
'iat': datetime.utcnow(),
'exp': datetime.utcnow() + timedelta(seconds=60),
'sub': user['name'],
'email': user['email']
'email': user['email'],
}
encoded = jwt.encode(token_data, private_key, 'RS256')
@ -124,7 +131,7 @@ class JWTAuthTestCase(LiveServerTestCase):
'iat': datetime.utcnow(),
'exp': datetime.utcnow() + timedelta(seconds=60),
'sub': user['name'],
'email': user['email']
'email': user['email'],
}
encoded = jwt.encode(token_data, private_key, 'RS256')
@ -151,7 +158,8 @@ class JWTAuthTestCase(LiveServerTestCase):
getuser_url = self.get_server_url() + '/user/get'
self.jwt_auth = ExternalJWTAuthN(verify_url, query_url, getuser_url, 'authy', '',
app.config['HTTPCLIENT'], 300, JWTAuthTestCase.public_key.name)
app.config['HTTPCLIENT'], 300, JWTAuthTestCase.public_key.name,
requires_email=self.emails)
def tearDown(self):
finished_database_for_testing(self)
@ -211,7 +219,7 @@ class JWTAuthTestCase(LiveServerTestCase):
self.assertEquals(1, len(results))
self.assertEquals('cooluser', results[0].username)
self.assertEquals('user@domain.com', results[0].email)
self.assertEquals('user@domain.com' if self.emails else None, results[0].email)
# Lookup `some`.
results, identifier, error_message = self.jwt_auth.query_users('some')
@ -220,7 +228,7 @@ class JWTAuthTestCase(LiveServerTestCase):
self.assertEquals(1, len(results))
self.assertEquals('some.neat.user', results[0].username)
self.assertEquals('neat@domain.com', results[0].email)
self.assertEquals('neat@domain.com' if self.emails else None, results[0].email)
# Lookup `unknown`.
results, identifier, error_message = self.jwt_auth.query_users('unknown')
@ -271,5 +279,17 @@ class JWTAuthTestCase(LiveServerTestCase):
self.assertIsNone(user)
class JWTAuthNoEmailTestCase(JWTAuthTestMixin, LiveServerTestCase):
@property
def emails(self):
return False
class JWTAuthTestCase(JWTAuthTestMixin, LiveServerTestCase):
@property
def emails(self):
return True
if __name__ == '__main__':
unittest.main()

View file

@ -15,6 +15,10 @@ _PORT_NUMBER = 5001
class KeystoneAuthTestsMixin():
maxDiff = None
@property
def emails(self):
raise NotImplementedError
def create_app(self):
global _PORT_NUMBER
_PORT_NUMBER = _PORT_NUMBER + 1
@ -35,10 +39,12 @@ class KeystoneAuthTestsMixin():
def getuser(userid):
for user in users:
if user['username'] == userid:
user_data = {}
if self.emails:
user_data['email'] = userid + '@example.com'
return json.dumps({
'user': {
'email': userid + '@example.com',
}
'user': user_data
})
abort(404)
@ -47,15 +53,19 @@ class KeystoneAuthTestsMixin():
def getv3user(userid):
for user in users:
if user['username'] == userid:
user_data = {
"domain_id": "default",
"enabled": True,
"id": user['username'],
"links": {},
"name": user['username'],
}
if self.emails:
user_data['email'] = user['username'] + '@example.com'
return json.dumps({
'user': {
"domain_id": "default",
"enabled": True,
"id": user['username'],
"links": {},
"name": user['username'],
"email": user['username'] + '@example.com',
}
'user': user_data
})
abort(404)
@ -209,24 +219,54 @@ class KeystoneAuthTestsMixin():
def test_cooluser(self):
(user, _) = self.keystone.verify_credentials('cooluser', 'password')
self.assertEquals(user.username, 'cooluser')
self.assertEquals(user.email, 'cooluser@example.com')
self.assertEquals(user.email, 'cooluser@example.com' if self.emails else None)
def test_neatuser(self):
(user, _) = self.keystone.verify_credentials('some.neat.user', 'foobar')
self.assertEquals(user.username, 'some.neat.user')
self.assertEquals(user.email, 'some.neat.user@example.com')
self.assertEquals(user.email, 'some.neat.user@example.com' if self.emails else None)
class KeystoneV2AuthNoEmailTests(KeystoneAuthTestsMixin, LiveServerTestCase):
@property
def keystone(self):
return get_keystone_users(2, self.get_server_url() + '/v2.0/auth',
'adminuser', 'adminpass', 'admintenant',
requires_email=False)
@property
def emails(self):
return False
class KeystoneV3AuthNoEmailTests(KeystoneAuthTestsMixin, LiveServerTestCase):
@property
def keystone(self):
return get_keystone_users(3, self.get_server_url() + '/v3',
'adminuser', 'adminpass', 'admintenant',
requires_email=False)
@property
def emails(self):
return False
class KeystoneV2AuthTests(KeystoneAuthTestsMixin, LiveServerTestCase):
@property
def keystone(self):
return get_keystone_users(2, self.get_server_url() + '/v2.0/auth',
'adminuser', 'adminpass', 'admintenant')
'adminuser', 'adminpass', 'admintenant',
requires_email=True)
@property
def emails(self):
return True
class KeystoneV3AuthTests(KeystoneAuthTestsMixin, LiveServerTestCase):
@property
def keystone(self):
return get_keystone_users(3, self.get_server_url() + '/v3',
'adminuser', 'adminpass', 'admintenant')
'adminuser', 'adminpass', 'admintenant',
requires_email=True)
def emails(self):
return True
def test_query(self):
# Lookup cool.

View file

@ -76,7 +76,14 @@ class TestLDAP(unittest.TestCase):
})
self.mockldap.start()
self.ldap = self._create_ldap(requires_email=True)
def tearDown(self):
self.mockldap.stop()
finished_database_for_testing(self)
self.ctx.__exit__(True, None, None)
def _create_ldap(self, requires_email=True):
base_dn = ['dc=quay', 'dc=io']
admin_dn = 'uid=testy,ou=employees,dc=quay,dc=io'
admin_passwd = 'password'
@ -86,15 +93,9 @@ class TestLDAP(unittest.TestCase):
secondary_user_rdns = ['ou=otheremployees']
ldap = LDAPUsers('ldap://localhost', base_dn, admin_dn, admin_passwd, user_rdn,
uid_attr, email_attr, secondary_user_rdns=secondary_user_rdns)
self.ldap = ldap
def tearDown(self):
self.mockldap.stop()
finished_database_for_testing(self)
self.ctx.__exit__(True, None, None)
uid_attr, email_attr, secondary_user_rdns=secondary_user_rdns,
requires_email=requires_email)
return ldap
def test_invalid_admin_password(self):
base_dn = ['dc=quay', 'dc=io']
@ -144,10 +145,15 @@ class TestLDAP(unittest.TestCase):
self.assertEquals(err_msg, 'Invalid user')
def test_missing_mail(self):
(response, err_msg) = self.ldap.verify_and_link_user('nomail', 'somepass')
(response, err_msg) = self.ldap.get_user('nomail')
self.assertIsNone(response)
self.assertEquals('Missing mail field "mail" in user record', err_msg)
def test_missing_mail_allowed(self):
ldap = self._create_ldap(requires_email=False)
(response, _) = ldap.get_user('nomail')
self.assertEquals(response.username, 'nomail')
def test_confirm_different_username(self):
# Verify that the user is logged in and their username was adjusted.
(response, _) = self.ldap.verify_and_link_user('cool.user', 'somepass')

View file

@ -354,9 +354,10 @@ def _validate_ldap(config, password):
user_rdn = config.get('LDAP_USER_RDN', [])
uid_attr = config.get('LDAP_UID_ATTR', 'uid')
email_attr = config.get('LDAP_EMAIL_ATTR', 'mail')
requires_email = config.get('FEATURE_MAILING', True)
users = LDAPUsers(ldap_uri, base_dn, admin_dn, admin_passwd, user_rdn, uid_attr, email_attr,
allow_tls_fallback)
allow_tls_fallback, requires_email=requires_email)
username = get_authenticated_user().username
(result, err_msg) = users.verify_credentials(username, password)
@ -388,7 +389,8 @@ def _validate_jwt(config, password):
users = ExternalJWTAuthN(verify_endpoint, query_endpoint, getuser_endpoint, issuer,
OVERRIDE_CONFIG_DIRECTORY,
app.config['HTTPCLIENT'],
app.config.get('JWT_AUTH_MAX_FRESH_S', 300))
app.config.get('JWT_AUTH_MAX_FRESH_S', 300),
requires_email=config.get('FEATURE_MAILING', True))
# Verify that the superuser exists. If not, raise an exception.
username = get_authenticated_user().username
@ -439,7 +441,9 @@ def _validate_keystone(config, password):
if not admin_tenant:
raise Exception('Missing admin tenant')
users = get_keystone_users(auth_version, auth_url, admin_username, admin_password, admin_tenant)
requires_email = config.get('FEATURE_MAILING', True)
users = get_keystone_users(auth_version, auth_url, admin_username, admin_password, admin_tenant,
requires_email)
# Verify that the superuser exists. If not, raise an exception.
username = get_authenticated_user().username

View file

@ -26,6 +26,9 @@ def validate_label_key(label_key):
def validate_email(email_address):
if not email_address:
return False
return bool(re.match(r'[^@]+@[^@]+\.[^@]+', email_address))