Pull out ldap validation into validator class
This commit is contained in:
parent
2d64cf3000
commit
c55ddf7341
3 changed files with 75 additions and 61 deletions
|
@ -1,11 +1,9 @@
|
|||
import logging
|
||||
import subprocess
|
||||
import time
|
||||
|
||||
from StringIO import StringIO
|
||||
from hashlib import sha1
|
||||
|
||||
import ldap
|
||||
import peewee
|
||||
|
||||
from flask import Flask
|
||||
|
@ -32,6 +30,7 @@ from util.config.validators.validate_database import DatabaseValidator
|
|||
from util.config.validators.validate_redis import RedisValidator
|
||||
from util.config.validators.validate_storage import StorageValidator
|
||||
from util.config.validators.validate_email import EmailValidator
|
||||
from util.config.validators.validate_ldap import LDAPValidator
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
@ -239,60 +238,6 @@ def _validate_ssl(config, user_obj, _):
|
|||
raise ConfigValidationException('SSL private key failed to validate: %s' % kie.message)
|
||||
|
||||
|
||||
def _validate_ldap(config, user_obj, password):
|
||||
""" Validates the LDAP connection. """
|
||||
if config.get('AUTHENTICATION_TYPE', 'Database') != 'LDAP':
|
||||
return
|
||||
|
||||
# If there is a custom LDAP certificate, then reinstall the certificates for the container.
|
||||
if config_provider.volume_file_exists(LDAP_CERT_FILENAME):
|
||||
subprocess.check_call(['/conf/init/certs_install.sh'])
|
||||
|
||||
# Note: raises ldap.INVALID_CREDENTIALS on failure
|
||||
admin_dn = config.get('LDAP_ADMIN_DN')
|
||||
admin_passwd = config.get('LDAP_ADMIN_PASSWD')
|
||||
|
||||
if not admin_dn:
|
||||
raise ConfigValidationException('Missing Admin DN for LDAP configuration')
|
||||
|
||||
if not admin_passwd:
|
||||
raise ConfigValidationException('Missing Admin Password for LDAP configuration')
|
||||
|
||||
ldap_uri = config.get('LDAP_URI', 'ldap://localhost')
|
||||
if not ldap_uri.startswith('ldap://') and not ldap_uri.startswith('ldaps://'):
|
||||
raise ConfigValidationException('LDAP URI must start with ldap:// or ldaps://')
|
||||
|
||||
allow_tls_fallback = config.get('LDAP_ALLOW_INSECURE_FALLBACK', False)
|
||||
|
||||
try:
|
||||
with LDAPConnection(ldap_uri, admin_dn, admin_passwd, allow_tls_fallback):
|
||||
pass
|
||||
except ldap.LDAPError as ex:
|
||||
values = ex.args[0] if ex.args else {}
|
||||
if not isinstance(values, dict):
|
||||
raise ConfigValidationException(str(ex.args))
|
||||
|
||||
raise ConfigValidationException(values.get('desc', 'Unknown error'))
|
||||
|
||||
# Verify that the superuser exists. If not, raise an exception.
|
||||
base_dn = config.get('LDAP_BASE_DN')
|
||||
user_rdn = config.get('LDAP_USER_RDN', [])
|
||||
uid_attr = config.get('LDAP_UID_ATTR', 'uid')
|
||||
email_attr = config.get('LDAP_EMAIL_ATTR', 'mail')
|
||||
requires_email = config.get('FEATURE_MAILING', True)
|
||||
|
||||
users = LDAPUsers(ldap_uri, base_dn, admin_dn, admin_passwd, user_rdn, uid_attr, email_attr,
|
||||
allow_tls_fallback, requires_email=requires_email)
|
||||
|
||||
username = user_obj.username
|
||||
(result, err_msg) = users.verify_credentials(username, password)
|
||||
if not result:
|
||||
msg = ('Verification of superuser %s failed: %s. \n\nThe user either does not exist ' +
|
||||
'in the remote authentication system ' +
|
||||
'OR LDAP auth is misconfigured.') % (username, err_msg)
|
||||
raise ConfigValidationException(msg)
|
||||
|
||||
|
||||
def _validate_jwt(config, user_obj, password):
|
||||
""" Validates the JWT authentication system. """
|
||||
if config.get('AUTHENTICATION_TYPE', 'Database') != 'JWT':
|
||||
|
@ -473,7 +418,7 @@ VALIDATORS = {
|
|||
'bitbucket-trigger': _validate_bitbucket,
|
||||
'google-login': _validate_google_login,
|
||||
'ssl': _validate_ssl,
|
||||
'ldap': _validate_ldap,
|
||||
LDAPValidator.name: LDAPValidator.validate,
|
||||
'jwt': _validate_jwt,
|
||||
'keystone': _validate_keystone,
|
||||
'signer': _validate_signer,
|
||||
|
|
64
util/config/validators/test/test_validate_ldap.py
Normal file
64
util/config/validators/test/test_validate_ldap.py
Normal file
|
@ -0,0 +1,64 @@
|
|||
import pytest
|
||||
|
||||
from util.config.validators import ConfigValidationException
|
||||
from util.config.validators.validate_ldap import LDAPValidator
|
||||
from util.morecollections import AttrDict
|
||||
|
||||
from test.test_ldap import mock_ldap
|
||||
|
||||
|
||||
@pytest.mark.parametrize('unvalidated_config', [
|
||||
({}),
|
||||
({'AUTHENTICATION_TYPE': 'Database'}),
|
||||
])
|
||||
def test_validate_noop(unvalidated_config):
|
||||
LDAPValidator.validate(unvalidated_config, None, None)
|
||||
|
||||
@pytest.mark.parametrize('unvalidated_config', [
|
||||
({'AUTHENTICATION_TYPE': 'LDAP'}),
|
||||
({'AUTHENTICATION_TYPE': 'LDAP', 'LDAP_ADMIN_DN': 'foo'}),
|
||||
])
|
||||
def test_invalid_config(unvalidated_config):
|
||||
with pytest.raises(ConfigValidationException):
|
||||
LDAPValidator.validate(unvalidated_config, None, None)
|
||||
|
||||
|
||||
@pytest.mark.parametrize('uri', [
|
||||
'foo',
|
||||
'http://foo',
|
||||
'ldap:foo',
|
||||
])
|
||||
def test_invalid_uri(uri):
|
||||
config = {}
|
||||
config['AUTHENTICATION_TYPE'] = 'LDAP'
|
||||
config['LDAP_BASE_DN'] = ['dc=quay', 'dc=io']
|
||||
config['LDAP_ADMIN_DN'] = 'uid=testy,ou=employees,dc=quay,dc=io'
|
||||
config['LDAP_ADMIN_PASSWD'] = 'password'
|
||||
config['LDAP_USER_RDN'] = ['ou=employees']
|
||||
config['LDAP_URI'] = uri
|
||||
|
||||
with pytest.raises(ConfigValidationException):
|
||||
LDAPValidator.validate(config, None, None)
|
||||
|
||||
|
||||
@pytest.mark.parametrize('username, password, expected_exception', [
|
||||
('invaliduser', 'invalidpass', ConfigValidationException),
|
||||
('someuser', 'invalidpass', ConfigValidationException),
|
||||
('invaliduser', 'somepass', ConfigValidationException),
|
||||
('someuser', 'somepass', None),
|
||||
])
|
||||
def test_validated_ldap(username, password, expected_exception):
|
||||
config = {}
|
||||
config['AUTHENTICATION_TYPE'] = 'LDAP'
|
||||
config['LDAP_BASE_DN'] = ['dc=quay', 'dc=io']
|
||||
config['LDAP_ADMIN_DN'] = 'uid=testy,ou=employees,dc=quay,dc=io'
|
||||
config['LDAP_ADMIN_PASSWD'] = 'password'
|
||||
config['LDAP_USER_RDN'] = ['ou=employees']
|
||||
|
||||
if expected_exception is not None:
|
||||
with pytest.raises(ConfigValidationException):
|
||||
with mock_ldap():
|
||||
LDAPValidator.validate(config, AttrDict(dict(username=username)), password)
|
||||
else:
|
||||
with mock_ldap():
|
||||
LDAPValidator.validate(config, AttrDict(dict(username=username)), password)
|
|
@ -1,5 +1,10 @@
|
|||
from app import app
|
||||
from util.config.validators import BaseValidator
|
||||
import ldap
|
||||
import subprocess
|
||||
|
||||
from app import app, config_provider
|
||||
from data.users import LDAP_CERT_FILENAME
|
||||
from data.users.externalldap import LDAPConnection, LDAPUsers
|
||||
from util.config.validators import BaseValidator, ConfigValidationException
|
||||
|
||||
class LDAPValidator(BaseValidator):
|
||||
name = "ldap"
|
||||
|
@ -50,8 +55,8 @@ class LDAPValidator(BaseValidator):
|
|||
users = LDAPUsers(ldap_uri, base_dn, admin_dn, admin_passwd, user_rdn, uid_attr, email_attr,
|
||||
allow_tls_fallback, requires_email=requires_email)
|
||||
|
||||
username = user_obj.username
|
||||
(result, err_msg) = users.verify_credentials(username, password)
|
||||
username = user.username
|
||||
(result, err_msg) = users.verify_credentials(username, user_password)
|
||||
if not result:
|
||||
msg = ('Verification of superuser %s failed: %s. \n\nThe user either does not exist ' +
|
||||
'in the remote authentication system ' +
|
||||
|
|
Reference in a new issue