import ldap import logging import json import itertools import uuid import os import jwt from collections import namedtuple from datetime import datetime, timedelta import features from data import model from util.aes import AESCipher from util.validation import generate_valid_usernames logger = logging.getLogger(__name__) if os.environ.get('LDAP_DEBUG') == '1': logger.setLevel(logging.DEBUG) ch = logging.StreamHandler() ch.setLevel(logging.DEBUG) logger.addHandler(ch) def _get_federated_user(username, email, federated_service, create_new_user): db_user = model.user.verify_federated_login(federated_service, username) if not db_user: if not create_new_user: return (None, 'Invalid user') # We must create the user in our db valid_username = None for valid_username in generate_valid_usernames(username): if model.user.is_username_unique(valid_username): break if not valid_username: logger.error('Unable to pick a username for user: %s', username) return (None, 'Unable to pick a username. Please report this to your administrator.') db_user = model.user.create_federated_user(valid_username, email, federated_service, username, set_password_notification=False) else: # Update the db attributes from ldap db_user.email = email db_user.save() return (db_user, None) class ExternalJWTAuthN(object): """ Delegates authentication to a REST endpoint that returns JWTs. """ PUBLIC_KEY_FILENAME = 'jwt-authn.cert' def __init__(self, verify_url, issuer, override_config_dir, http_client, max_fresh_s, public_key_path=None): self.verify_url = verify_url self.issuer = issuer self.client = http_client self.max_fresh_s = max_fresh_s default_key_path = os.path.join(override_config_dir, ExternalJWTAuthN.PUBLIC_KEY_FILENAME) public_key_path = public_key_path or default_key_path if not os.path.exists(public_key_path): error_message = ('JWT Authentication public key file "%s" not found in directory %s' % (ExternalJWTAuthN.PUBLIC_KEY_FILENAME, override_config_dir)) raise Exception(error_message) with open(public_key_path) as public_key_file: self.public_key = public_key_file.read() def verify_user(self, username_or_email, password, create_new_user=True): result = self.client.get(self.verify_url, timeout=2, auth=(username_or_email, password)) if result.status_code != 200: return (None, result.text or 'Invalid username or password') try: result_data = json.loads(result.text) except ValueError: raise Exception('Returned JWT Authentication body does not contain JSON') # Load the JWT returned. encoded = result_data.get('token', '') try: payload = jwt.decode(encoded, self.public_key, algorithms=['RS256'], audience='quay.io/jwtauthn', issuer=self.issuer) except jwt.InvalidTokenError: logger.exception('Exception when decoding returned JWT') return (None, 'Invalid username or password') if not 'sub' in payload: raise Exception('Missing username field in JWT') if not 'email' in payload: raise Exception('Missing email field in JWT') if not 'exp' in payload: raise Exception('Missing exp field in JWT') # Verify that the expiration is no more than self.max_fresh_s seconds in the future. expiration = datetime.utcfromtimestamp(payload['exp']) if expiration > datetime.utcnow() + timedelta(seconds=self.max_fresh_s): logger.debug('Payload expiration is outside of the %s second window: %s', self.max_fresh_s, payload['exp']) return (None, 'Invalid username or password') # Parse out the username and email. return _get_federated_user(payload['sub'], payload['email'], 'jwtauthn', create_new_user) def confirm_existing_user(self, username, password): db_user = model.user.get_user(username) if not db_user: return (None, 'Invalid user') federated_login = model.user.lookup_federated_login(db_user, 'jwtauthn') if not federated_login: return (None, 'Invalid user') return self.verify_user(federated_login.service_ident, password, create_new_user=False) class DatabaseUsers(object): def verify_user(self, username_or_email, password): """ Simply delegate to the model implementation. """ result = model.user.verify_user(username_or_email, password) if not result: return (None, 'Invalid Username or Password') return (result, None) def confirm_existing_user(self, username, password): return self.verify_user(username, password) class LDAPConnection(object): def __init__(self, ldap_uri, user_dn, user_pw): self._ldap_uri = ldap_uri self._user_dn = user_dn self._user_pw = user_pw self._conn = None def __enter__(self): trace_level = 2 if os.environ.get('LDAP_DEBUG') == '1' else 0 self._conn = ldap.initialize(self._ldap_uri, trace_level=trace_level) self._conn.set_option(ldap.OPT_REFERRALS, 1) try: self._conn.simple_bind_s(self._user_dn, self._user_pw) except ldap.INVALID_CREDENTIALS: logger.exception('LDAP admin dn or password are invalid') return None return self._conn def __exit__(self, exc_type, value, tb): self._conn.unbind_s() class LDAPUsers(object): _LDAPResult = namedtuple('LDAPResult', ['dn', 'attrs']) def __init__(self, ldap_uri, base_dn, admin_dn, admin_passwd, user_rdn, uid_attr, email_attr): self._ldap_conn = LDAPConnection(ldap_uri, admin_dn, admin_passwd) self._ldap_uri = ldap_uri self._base_dn = base_dn self._user_rdn = user_rdn self._uid_attr = uid_attr self._email_attr = email_attr def _get_ldap_referral_dn(self, referral_exception): logger.debug('Got referral: %s', referral_exception.args[0]) if not referral_exception.args[0] or not referral_exception.args[0].get('info'): logger.debug('LDAP referral missing info block') return None referral_info = referral_exception.args[0]['info'] if not referral_info.startswith('Referral:\n'): logger.debug('LDAP referral missing Referral header') return None referral_uri = referral_info[len('Referral:\n'):] if not referral_uri.startswith('ldap:///'): logger.debug('LDAP referral URI does not start with ldap:///') return None referral_dn = referral_uri[len('ldap:///'):] return referral_dn def _ldap_user_search(self, username_or_email): with self._ldap_conn as conn: if conn is None: return (None, 'LDAP Admin dn or password is invalid') logger.debug('Incoming username or email param: %s', username_or_email.__repr__()) user_search_dn = ','.join(self._user_rdn + self._base_dn) query = u'(|({0}={2})({1}={2}))'.format(self._uid_attr, self._email_attr, username_or_email) logger.debug('Conducting user search: %s under %s', query, user_search_dn) try: pairs = conn.search_s(user_search_dn, ldap.SCOPE_SUBTREE, query.encode('utf-8')) except ldap.REFERRAL as re: referral_dn = self._get_ldap_referral_dn(re) if not referral_dn: return (None, 'Failed to follow referral when looking up username') try: subquery = u'(%s=%s)' % (self._uid_attr, username_or_email) pairs = conn.search_s(referral_dn, ldap.SCOPE_BASE, subquery) except ldap.LDAPError: logger.exception('LDAP referral search exception') return (None, 'Username not found') except ldap.LDAPError: logger.exception('LDAP search exception') return (None, 'Username not found') logger.debug('Found matching pairs: %s', pairs) results = [LDAPUsers._LDAPResult(*pair) for pair in pairs] # Filter out pairs without DNs. Some LDAP impls will return such # pairs. with_dns = [result for result in results if result.dn] if len(with_dns) < 1: return (None, 'Username not found') # If we have found a single pair, then return it. if len(with_dns) == 1: return (with_dns[0], None) # Otherwise, there are multiple pairs with DNs, so find the one with the mail # attribute (if any). with_mail = [result for result in results if result.attrs.get(self._email_attr)] return (with_mail[0] if with_mail else with_dns[0], None) def confirm_existing_user(self, username, password): """ Verify the username and password by looking up the *LDAP* username and confirming the password. """ db_user = model.user.get_user(username) if not db_user: return (None, 'Invalid user') federated_login = model.user.lookup_federated_login(db_user, 'ldap') if not federated_login: return (None, 'Invalid user') return self.verify_user(federated_login.service_ident, password, create_new_user=False) def verify_user(self, username_or_email, password, create_new_user=True): """ Verify the credentials with LDAP and if they are valid, create or update the user in our database. """ # Make sure that even if the server supports anonymous binds, we don't allow it if not password: return (None, 'Anonymous binding not allowed') (found_user, err_msg) = self._ldap_user_search(username_or_email) if found_user is None: return (None, err_msg) found_dn, found_response = found_user logger.debug('Found user for LDAP username %s; validating password', username_or_email) logger.debug('DN %s found: %s', found_dn, found_response) # First validate the password by binding as the user try: with LDAPConnection(self._ldap_uri, found_dn, password.encode('utf-8')): pass except ldap.REFERRAL as re: referral_dn = self._get_ldap_referral_dn(re) if not referral_dn: return (None, 'Invalid username') try: with LDAPConnection(self._ldap_uri, referral_dn, password.encode('utf-8')): pass except ldap.INVALID_CREDENTIALS: logger.exception('Invalid LDAP credentials') return (None, 'Invalid password') except ldap.INVALID_CREDENTIALS: logger.exception('Invalid LDAP credentials') return (None, 'Invalid password') # Now check if we have a federated login for this user if not found_response.get(self._uid_attr): return (None, 'Missing uid field "%s" in user record' % self._uid_attr) if not found_response.get(self._email_attr): return (None, 'Missing mail field "%s" in user record' % self._email_attr) username = found_response[self._uid_attr][0].decode('utf-8') email = found_response[self._email_attr][0] return _get_federated_user(username, email, 'ldap', create_new_user) class UserAuthentication(object): def __init__(self, app=None, override_config_dir=None): self.app_secret_key = None self.app = app if app is not None: self.state = self.init_app(app, override_config_dir) else: self.state = None def init_app(self, app, override_config_dir): self.app_secret_key = app.config['SECRET_KEY'] authentication_type = app.config.get('AUTHENTICATION_TYPE', 'Database') if authentication_type == 'Database': users = DatabaseUsers() elif authentication_type == 'LDAP': ldap_uri = app.config.get('LDAP_URI', 'ldap://localhost') base_dn = app.config.get('LDAP_BASE_DN') admin_dn = app.config.get('LDAP_ADMIN_DN') admin_passwd = app.config.get('LDAP_ADMIN_PASSWD') user_rdn = app.config.get('LDAP_USER_RDN', []) uid_attr = app.config.get('LDAP_UID_ATTR', 'uid') email_attr = app.config.get('LDAP_EMAIL_ATTR', 'mail') users = LDAPUsers(ldap_uri, base_dn, admin_dn, admin_passwd, user_rdn, uid_attr, email_attr) elif authentication_type == 'JWT': verify_url = app.config.get('JWT_VERIFY_ENDPOINT') issuer = app.config.get('JWT_AUTH_ISSUER') max_fresh_s = app.config.get('JWT_AUTH_MAX_FRESH_S', 300) users = ExternalJWTAuthN(verify_url, issuer, override_config_dir, max_fresh_s, app.config['HTTPCLIENT']) else: raise RuntimeError('Unknown authentication type: %s' % authentication_type) # register extension with app app.extensions = getattr(app, 'extensions', {}) app.extensions['authentication'] = users return users def _get_secret_key(self): """ Returns the secret key to use for encrypting and decrypting. """ secret_key = None # First try parsing the key as an int. try: big_int = int(self.app_secret_key) secret_key = str(bytearray.fromhex('{:02x}'.format(big_int))) except ValueError: pass # Next try parsing it as an UUID. if secret_key is None: try: secret_key = uuid.UUID(self.app_secret_key).bytes except ValueError: pass if secret_key is None: secret_key = str(bytearray(map(ord, self.app_secret_key))) # Otherwise, use the bytes directly. return ''.join(itertools.islice(itertools.cycle(secret_key), 32)) def encrypt_user_password(self, password): """ Returns an encrypted version of the user's password. """ data = { 'password': password } message = json.dumps(data) cipher = AESCipher(self._get_secret_key()) return cipher.encrypt(message) def _decrypt_user_password(self, encrypted): """ Attempts to decrypt the given password and returns it. """ cipher = AESCipher(self._get_secret_key()) try: message = cipher.decrypt(encrypted) except ValueError: return None except TypeError: return None try: data = json.loads(message) except ValueError: return None return data.get('password', encrypted) def confirm_existing_user(self, username, password): """ Verifies that the given password matches to the given DB username. Unlike verify_user, this call first translates the DB user via the FederatedLogin table (where applicable). """ return self.state.confirm_existing_user(username, password) def verify_user(self, username_or_email, password, basic_auth=False): # First try to decode the password as a signed token. if basic_auth: decrypted = self._decrypt_user_password(password) if decrypted is None: # This is a normal password. if features.REQUIRE_ENCRYPTED_BASIC_AUTH: msg = ('Client login with unecrypted passwords is disabled. Please generate an ' + 'encrypted password in the user admin panel for use here.') return (None, msg) else: password = decrypted (result, err_msg) = self.state.verify_user(username_or_email, password) if not result: return (result, err_msg) if not result.enabled: return (None, 'This user has been disabled. Please contact your administrator.') return (result, err_msg) def __getattr__(self, name): return getattr(self.state, name, None)