import logging
import itertools
import json
import uuid

import features

from data import model
from data.users.database import DatabaseUsers
from data.users.externalldap import LDAPUsers
from data.users.externaljwt import ExternalJWTAuthN
from data.users.keystone import KeystoneUsers
from util.security.aes import AESCipher

logger = logging.getLogger(__name__)


def get_federated_service_name(authentication_type):
    """ Returns the federated login service name for the given authentication type.

    Only 'LDAP', 'JWT' and 'Keystone' are recognized; any other value raises an
    Exception.
    """
    if authentication_type == 'LDAP':
        return 'ldap'

    if authentication_type == 'JWT':
        return 'jwtauthn'

    if authentication_type == 'Keystone':
        return 'keystone'

    raise Exception('Unknown auth type: %s' % authentication_type)


LDAP_CERT_FILENAME = 'ldap.crt'


def get_users_handler(config, config_provider, override_config_dir):
    """ Returns a users handler for the authentication configured in the given config object.

    The handler is selected by the AUTHENTICATION_TYPE config entry, which defaults
    to 'Database'. Raises a RuntimeError if the configured type is not one of
    'Database', 'LDAP', 'JWT' or 'Keystone'.

    Note: config_provider is accepted for interface compatibility but is not read
    by this function.
    """
    authentication_type = config.get('AUTHENTICATION_TYPE', 'Database')

    if authentication_type == 'Database':
        return DatabaseUsers()

    if authentication_type == 'LDAP':
        ldap_uri = config.get('LDAP_URI', 'ldap://localhost')
        base_dn = config.get('LDAP_BASE_DN')
        admin_dn = config.get('LDAP_ADMIN_DN')
        admin_passwd = config.get('LDAP_ADMIN_PASSWD')
        user_rdn = config.get('LDAP_USER_RDN', [])
        uid_attr = config.get('LDAP_UID_ATTR', 'uid')
        email_attr = config.get('LDAP_EMAIL_ATTR', 'mail')
        secondary_user_rdns = config.get('LDAP_SECONDARY_USER_RDNS', [])
        allow_tls_fallback = config.get('LDAP_ALLOW_INSECURE_FALLBACK', False)

        return LDAPUsers(ldap_uri, base_dn, admin_dn, admin_passwd, user_rdn, uid_attr,
                         email_attr, allow_tls_fallback,
                         secondary_user_rdns=secondary_user_rdns)

    if authentication_type == 'JWT':
        verify_url = config.get('JWT_VERIFY_ENDPOINT')
        issuer = config.get('JWT_AUTH_ISSUER')
        max_fresh_s = config.get('JWT_AUTH_MAX_FRESH_S', 300)

        # HTTPCLIENT is read with [] rather than .get(): a missing entry is a KeyError.
        return ExternalJWTAuthN(verify_url, issuer, override_config_dir,
                                config['HTTPCLIENT'], max_fresh_s)

    if authentication_type == 'Keystone':
        auth_url = config.get('KEYSTONE_AUTH_URL')
        keystone_admin_username = config.get('KEYSTONE_ADMIN_USERNAME')
        keystone_admin_password = config.get('KEYSTONE_ADMIN_PASSWORD')
        keystone_admin_tenant = config.get('KEYSTONE_ADMIN_TENANT')

        return KeystoneUsers(auth_url, keystone_admin_username, keystone_admin_password,
                             keystone_admin_tenant)

    raise RuntimeError('Unknown authentication type: %s' % authentication_type)


class UserAuthentication(object):
    """ Application extension wrapping the configured users handler.

    The handler returned by get_users_handler is stored as `state`. Attributes not
    defined on this class are looked up on that handler (see __getattr__).
    """

    def __init__(self, app=None, config_provider=None, override_config_dir=None):
        self.app_secret_key = None
        self.app = app
        if app is not None:
            self.state = self.init_app(app, config_provider, override_config_dir)
        else:
            self.state = None

    def init_app(self, app, config_provider, override_config_dir):
        """ Builds the users handler from app.config, registers it under
        app.extensions['authentication'] and returns it.

        Also records app.config['SECRET_KEY'] for password encryption.
        """
        self.app_secret_key = app.config['SECRET_KEY']

        users = get_users_handler(app.config, config_provider, override_config_dir)

        # register extension with app
        app.extensions = getattr(app, 'extensions', {})
        app.extensions['authentication'] = users
        return users

    def _get_secret_key(self):
        """ Returns the secret key to use for encrypting and decrypting.

        The configured secret is interpreted, in order, as a decimal integer, as a
        UUID, and finally as a raw string (one byte per character). The resulting
        bytes are repeated/truncated to exactly 32 bytes and returned as a byte
        string (`str` on Python 2, `bytes` on Python 3).
        """
        secret_key = None

        # First try parsing the key as an int.
        # NOTE: '{:02x}' only pads to two digits, so an integer whose hex form has an
        # odd number of digits (or a negative integer) makes fromhex raise ValueError
        # and falls through to the strategies below. This is kept as-is on purpose:
        # changing it would change the derived key for existing deployments.
        try:
            big_int = int(self.app_secret_key)
            secret_key = bytes(bytearray.fromhex('{:02x}'.format(big_int)))
        except ValueError:
            pass

        # Next try parsing it as an UUID.
        if secret_key is None:
            try:
                secret_key = uuid.UUID(self.app_secret_key).bytes
            except ValueError:
                pass

        # Otherwise, use the bytes directly.
        if secret_key is None:
            secret_key = bytes(bytearray(map(ord, self.app_secret_key)))

        # Cycle over the byte values (via bytearray, which iterates as ints on both
        # Python 2 and 3) so the result is always a real byte string of length 32.
        key_bytes = itertools.islice(itertools.cycle(bytearray(secret_key)), 32)
        return bytes(bytearray(key_bytes))

    def encrypt_user_password(self, password):
        """ Returns an encrypted version of the user's password.

        The password is wrapped in a JSON object ({'password': ...}) before being
        encrypted with the key from _get_secret_key.
        """
        data = {'password': password}
        message = json.dumps(data)
        cipher = AESCipher(self._get_secret_key())
        return cipher.encrypt(message)

    def _decrypt_user_password(self, encrypted):
        """ Attempts to decrypt the given password and returns it.

        Returns None when the value cannot be decrypted, is not valid JSON, or is
        not a JSON object. If the decrypted object has no 'password' entry, the
        original `encrypted` value is returned unchanged.
        """
        cipher = AESCipher(self._get_secret_key())

        try:
            message = cipher.decrypt(encrypted)
        except (ValueError, TypeError):
            return None

        try:
            data = json.loads(message)
        except ValueError:
            return None

        # Valid JSON that is not an object (e.g. a number or list) cannot be one of
        # our encrypted payloads; treat it like any other undecodable input rather
        # than failing on the missing .get().
        if not isinstance(data, dict):
            return None

        return data.get('password', encrypted)

    def confirm_existing_user(self, username, password):
        """ Verifies that the given password matches to the given DB username. Unlike
        verify_credentials, this call first translates the DB user via the
        FederatedLogin table (where applicable).
        """
        return self.state.confirm_existing_user(username, password)

    def verify_credentials(self, username_or_email, password):
        """ Verifies that the given username and password credentials are valid. """
        return self.state.verify_credentials(username_or_email, password)

    def verify_and_link_user(self, username_or_email, password, basic_auth=False):
        """ Verifies that the given username and password credentials are valid and, if
        so, creates or links the database user to the federated identity.

        Returns a (user, error_message) tuple. When basic_auth is set, the password
        is first treated as an encrypted password; if it cannot be decrypted it is
        used as-is, unless features.REQUIRE_ENCRYPTED_BASIC_AUTH is enabled, in which
        case (None, message) is returned. A user whose `enabled` flag is false is
        rejected with (None, message).
        """
        # First try to decode the password as a signed token.
        if basic_auth:
            decrypted = self._decrypt_user_password(password)
            if decrypted is None:
                # This is a normal password.
                if features.REQUIRE_ENCRYPTED_BASIC_AUTH:
                    msg = ('Client login with unencrypted passwords is disabled. '
                           'Please generate an '
                           'encrypted password in the user admin panel for use here.')
                    return (None, msg)
            else:
                password = decrypted

        (result, err_msg) = self.state.verify_and_link_user(username_or_email, password)
        if not result:
            return (result, err_msg)

        if not result.enabled:
            return (None, 'This user has been disabled. Please contact your administrator.')

        return (result, err_msg)

    def __getattr__(self, name):
        """ Delegates unknown attributes to the users handler, yielding None when the
        handler does not define them.
        """
        # Read `state` straight from the instance dict: going through `self.state`
        # would re-enter __getattr__ and recurse forever whenever `state` has not
        # been assigned yet (e.g. while the object is being copied or unpickled).
        state = self.__dict__.get('state')
        return getattr(state, name, None)