7df8ed4a60
Change SecScanAPI to use a uri creation func instead of test context Pass config provider through validator context Remove app config dependency for validators
69 lines
2.7 KiB
Python
69 lines
2.7 KiB
Python
import os
|
|
import ldap
|
|
import subprocess
|
|
|
|
from data.users import LDAP_CERT_FILENAME
|
|
from data.users.externalldap import LDAPConnection, LDAPUsers
|
|
from util.config.validators import BaseValidator, ConfigValidationException
|
|
|
|
class LDAPValidator(BaseValidator):
|
|
name = "ldap"
|
|
|
|
@classmethod
|
|
def validate(cls, validator_context):
|
|
""" Validates the LDAP connection. """
|
|
config = validator_context.config
|
|
user = validator_context.user
|
|
user_password = validator_context.user_password
|
|
config_provider = validator_context.config_provider
|
|
|
|
if config.get('AUTHENTICATION_TYPE', 'Database') != 'LDAP':
|
|
return
|
|
|
|
# If there is a custom LDAP certificate, then reinstall the certificates for the container.
|
|
if config_provider.volume_file_exists(LDAP_CERT_FILENAME):
|
|
subprocess.check_call([os.path.join(config_provider.get_config_root(), '../init/certs_install.sh')])
|
|
|
|
# Note: raises ldap.INVALID_CREDENTIALS on failure
|
|
admin_dn = config.get('LDAP_ADMIN_DN')
|
|
admin_passwd = config.get('LDAP_ADMIN_PASSWD')
|
|
|
|
if not admin_dn:
|
|
raise ConfigValidationException('Missing Admin DN for LDAP configuration')
|
|
|
|
if not admin_passwd:
|
|
raise ConfigValidationException('Missing Admin Password for LDAP configuration')
|
|
|
|
ldap_uri = config.get('LDAP_URI', 'ldap://localhost')
|
|
if not ldap_uri.startswith('ldap://') and not ldap_uri.startswith('ldaps://'):
|
|
raise ConfigValidationException('LDAP URI must start with ldap:// or ldaps://')
|
|
|
|
allow_tls_fallback = config.get('LDAP_ALLOW_INSECURE_FALLBACK', False)
|
|
|
|
try:
|
|
with LDAPConnection(ldap_uri, admin_dn, admin_passwd, allow_tls_fallback):
|
|
pass
|
|
except ldap.LDAPError as ex:
|
|
values = ex.args[0] if ex.args else {}
|
|
if not isinstance(values, dict):
|
|
raise ConfigValidationException(str(ex.args))
|
|
|
|
raise ConfigValidationException(values.get('desc', 'Unknown error'))
|
|
|
|
# Verify that the superuser exists. If not, raise an exception.
|
|
base_dn = config.get('LDAP_BASE_DN')
|
|
user_rdn = config.get('LDAP_USER_RDN', [])
|
|
uid_attr = config.get('LDAP_UID_ATTR', 'uid')
|
|
email_attr = config.get('LDAP_EMAIL_ATTR', 'mail')
|
|
requires_email = config.get('FEATURE_MAILING', True)
|
|
|
|
users = LDAPUsers(ldap_uri, base_dn, admin_dn, admin_passwd, user_rdn, uid_attr, email_attr,
|
|
allow_tls_fallback, requires_email=requires_email)
|
|
|
|
username = user.username
|
|
(result, err_msg) = users.verify_credentials(username, user_password)
|
|
if not result:
|
|
msg = ('Verification of superuser %s failed: %s. \n\nThe user either does not exist ' +
|
|
'in the remote authentication system ' +
|
|
'OR LDAP auth is misconfigured.') % (username, err_msg)
|
|
raise ConfigValidationException(msg)
|