f6fea27c12
Currently, we use the Quay username via `verify_user` when we go to create the encrypted password. This is only correct if Quay has not generated its own different username for the LDAP user, and fails if it has. We therefore add a new method `confirm_existing_user`, which looks up the federated login for the LDAP user and then runs the auth flow using that username.
280 lines
8.9 KiB
Python
280 lines
8.9 KiB
Python
import ldap
|
|
import logging
|
|
import json
|
|
import itertools
|
|
import uuid
|
|
import struct
|
|
import os
|
|
|
|
from util.aes import AESCipher
|
|
from util.validation import generate_valid_usernames
|
|
from data import model
|
|
|
|
# Module-level logger for this file.
logger = logging.getLogger(__name__)

# Opt-in debugging: when the LDAP_DEBUG environment variable is exactly '1',
# lower this logger to DEBUG and attach a stream handler (also at DEBUG) so the
# debug messages are actually emitted.
# NOTE(review): the extent of this `if` body is reconstructed; the handler setup
# is assumed to belong inside the LDAP_DEBUG branch -- confirm against history.
if os.environ.get('LDAP_DEBUG') == '1':
    logger.setLevel(logging.DEBUG)

    ch = logging.StreamHandler()
    ch.setLevel(logging.DEBUG)

    logger.addHandler(ch)
|
|
|
|
class DatabaseUsers(object):
    """ User backend that checks credentials directly against the database model. """

    def verify_user(self, username_or_email, password):
        """ Check the credentials through the model layer.

        Returns a `(user, error_message)` tuple: `(user, None)` when the model
        returns a truthy user, otherwise `(None, 'Invalid Username or Password')`.
        """
        matched = model.verify_user(username_or_email, password)
        if matched:
            return (matched, None)

        return (None, 'Invalid Username or Password')

    def confirm_existing_user(self, username, password):
        """ Confirm the password of an existing user; identical to `verify_user` here. """
        return self.verify_user(username, password)

    def user_exists(self, username):
        """ Return True when the model has a user with the given username. """
        existing = model.get_user(username)
        return existing is not None
|
|
|
|
|
|
class LDAPConnection(object):
    """ Context manager that opens an LDAP connection bound as a given DN.

    Entering the context initializes a connection to `ldap_uri`, enables
    referral chasing and performs a synchronous simple bind with
    `user_dn` / `user_pw`. Leaving the context unbinds the connection.
    """

    def __init__(self, ldap_uri, user_dn, user_pw):
        self._ldap_uri = ldap_uri
        self._user_dn = user_dn
        self._user_pw = user_pw
        # The live connection while inside the `with` block, otherwise None.
        self._conn = None

    def __enter__(self):
        """ Open and bind the connection, returning the bound connection object.

        Any exception raised while configuring or binding (for example
        `ldap.INVALID_CREDENTIALS`) propagates to the caller unchanged.
        """
        trace_level = 2 if os.environ.get('LDAP_DEBUG') == '1' else 0
        conn = ldap.initialize(self._ldap_uri, trace_level=trace_level)

        # __exit__ is not called when __enter__ raises, so a failed bind has to
        # release the connection here or it is leaked. A flag plus `finally` is
        # used (instead of except/re-raise) so the original exception is the one
        # that propagates even if the cleanup itself fails.
        bound = False
        try:
            conn.set_option(ldap.OPT_REFERRALS, 1)
            conn.simple_bind_s(self._user_dn, self._user_pw)
            bound = True
        finally:
            if not bound:
                try:
                    conn.unbind_s()
                except ldap.LDAPError:
                    # Best-effort cleanup; the bind failure is what matters.
                    pass

        self._conn = conn
        return conn

    def __exit__(self, exc_type, value, tb):
        """ Unbind the connection if one is open; exceptions are not suppressed. """
        conn, self._conn = self._conn, None
        if conn is not None:
            conn.unbind_s()
|
|
|
|
|
|
class LDAPUsers(object):
    """ User backend that authenticates against LDAP and mirrors users into the database.

    Users are located with an admin-bound search and their password is checked
    by binding as the located DN. Database users are linked to LDAP users
    through federated logins for the 'ldap' service.
    """

    def __init__(self, ldap_uri, base_dn, admin_dn, admin_passwd, user_rdn, uid_attr, email_attr):
        # Connection (bound as the admin DN) used for user searches.
        self._ldap_conn = LDAPConnection(ldap_uri, admin_dn, admin_passwd)
        self._ldap_uri = ldap_uri
        self._base_dn = base_dn
        self._user_rdn = user_rdn
        self._uid_attr = uid_attr
        self._email_attr = email_attr

    def _ldap_user_search(self, username_or_email):
        """ Find the LDAP entry whose uid or email attribute equals the given value.

        Returns the first search result (a `(dn, attributes)` pair) whose DN is
        not None, or None when the value is empty, the search raises an LDAP
        error, or nothing matches.
        """
        if not username_or_email:
            # Nothing to search for; avoid building a degenerate filter.
            return None

        # Imported locally because the file only imports the top-level `ldap` package.
        from ldap.filter import escape_filter_chars

        with self._ldap_conn as conn:
            logger.debug('Incoming username or email param: %s', repr(username_or_email))

            # NOTE(review): assumes user_rdn and base_dn are both lists of DN
            # components (they are concatenated with `+`) -- confirm against config.
            user_search_dn = ','.join(self._user_rdn + self._base_dn)

            # The value comes from the login request, so filter metacharacters
            # such as `*`, `(` and `)` must be escaped before interpolation.
            query = u'(|({0}={2})({1}={2}))'.format(self._uid_attr, self._email_attr,
                                                    escape_filter_chars(username_or_email))

            logger.debug('Conducting user search: %s under %s', query, user_search_dn)
            try:
                pairs = conn.search_s(user_search_dn, ldap.SCOPE_SUBTREE, query.encode('utf-8'))
            except ldap.LDAPError:
                logger.exception('LDAP search exception')
                return None

            logger.debug('Found matching pairs: %s', pairs)
            if not pairs:
                return None

            # Skip results without a DN (presumably referrals -- confirm).
            for pair in pairs:
                if pair[0] is not None:
                    logger.debug('Found user: %s', pair)
                    return pair

            return None

    def confirm_existing_user(self, username, password):
        """ Verify the username and password by looking up the *LDAP* username and confirming the
            password.

            `username` is the database username; it is translated to the LDAP
            identifier through the user's 'ldap' federated login. Returns a
            `(user, error_message)` tuple and never creates a new database user.
        """
        db_user = model.get_user(username)
        if not db_user:
            return (None, 'Invalid user')

        federated_login = model.lookup_federated_login(db_user, 'ldap')
        if not federated_login:
            return (None, 'Invalid user')

        return self.verify_user(federated_login.service_ident, password, create_new_user=False)

    def verify_user(self, username_or_email, password, create_new_user=True):
        """ Verify the credentials with LDAP and if they are valid, create or update the user
            in our database.

            Returns `(db_user, None)` on success or `(None, error_message)` on
            failure. When `create_new_user` is False, a valid LDAP user without
            a federated login yields `(None, 'Invalid user')` instead of being
            created.
        """
        # Make sure that even if the server supports anonymous binds, we don't allow it
        if not password:
            return (None, 'Anonymous binding not allowed')

        found_user = self._ldap_user_search(username_or_email)
        if found_user is None:
            return (None, 'Username not found')

        found_dn, found_response = found_user

        # First validate the password by binding as the user
        logger.debug('Found user %s; validating password', username_or_email)
        try:
            with LDAPConnection(self._ldap_uri, found_dn, password.encode('utf-8')):
                pass
        except ldap.INVALID_CREDENTIALS:
            logger.exception('Invalid LDAP credentials')
            return (None, 'Invalid password')

        # The record must carry both the uid and the email attribute.
        if not found_response.get(self._uid_attr):
            return (None, 'Missing uid field "%s" in user record' % self._uid_attr)

        if not found_response.get(self._email_attr):
            return (None, 'Missing mail field "%s" in user record' % self._email_attr)

        username = found_response[self._uid_attr][0].decode('utf-8')
        email = found_response[self._email_attr][0]

        # Now check if we have a federated login for this user
        db_user = model.verify_federated_login('ldap', username)

        if not db_user:
            if not create_new_user:
                return (None, 'Invalid user')

            # We must create the user in our db. Only accept a candidate that
            # is actually unique; if the generator runs out, valid_username
            # stays None rather than holding the last (taken) candidate.
            valid_username = None
            for candidate in generate_valid_usernames(username):
                if model.is_username_unique(candidate):
                    valid_username = candidate
                    break

            if not valid_username:
                logger.error('Unable to pick a username for user: %s', username)
                return (None, 'Unable to pick a username. Please report this to your administrator.')

            db_user = model.create_federated_user(valid_username, email, 'ldap', username,
                                                  set_password_notification=False)
        else:
            # Update the db attributes from ldap
            db_user.email = email
            db_user.save()

        return (db_user, None)

    def user_exists(self, username):
        """ Return True when an LDAP entry matches the given username (or email). """
        found_user = self._ldap_user_search(username)
        return found_user is not None
|
|
|
|
|
|
|
|
class UserAuthentication(object):
    """ Application extension that selects the user backend and handles encrypted passwords.

    `state` holds the configured backend (`DatabaseUsers` or `LDAPUsers`), or
    None when no app was supplied. Attributes not defined on this class are
    looked up on the backend.
    """

    def __init__(self, app=None):
        self.app = app
        if app is not None:
            self.state = self.init_app(app)
        else:
            self.state = None

    def init_app(self, app):
        """ Build the backend named by the AUTHENTICATION_TYPE config value and register it.

        The backend is stored in `app.extensions['authentication']` and returned.
        Raises RuntimeError for an unknown authentication type.
        """
        authentication_type = app.config.get('AUTHENTICATION_TYPE', 'Database')

        if authentication_type == 'Database':
            users = DatabaseUsers()
        elif authentication_type == 'LDAP':
            ldap_uri = app.config.get('LDAP_URI', 'ldap://localhost')
            base_dn = app.config.get('LDAP_BASE_DN')
            admin_dn = app.config.get('LDAP_ADMIN_DN')
            admin_passwd = app.config.get('LDAP_ADMIN_PASSWD')
            user_rdn = app.config.get('LDAP_USER_RDN', [])
            uid_attr = app.config.get('LDAP_UID_ATTR', 'uid')
            email_attr = app.config.get('LDAP_EMAIL_ATTR', 'mail')

            users = LDAPUsers(ldap_uri, base_dn, admin_dn, admin_passwd, user_rdn, uid_attr,
                              email_attr)
        else:
            raise RuntimeError('Unknown authentication type: %s' % authentication_type)

        # register extension with app
        app.extensions = getattr(app, 'extensions', {})
        app.extensions['authentication'] = users
        return users

    def _get_secret_key(self):
        """ Returns the secret key to use for encrypting and decrypting.

        The key is derived from the app's SECRET_KEY and is always 32 items
        long: the derived bytes are repeated cyclically and truncated.
        NOTE(review): the derivation is deliberately left untouched; any change
        would alter the key and invalidate previously encrypted passwords.
        """
        from app import app
        app_secret_key = app.config['SECRET_KEY']
        secret_key = None

        # First try parsing the key as an int.
        try:
            big_int = int(app_secret_key)
            secret_key = str(bytearray.fromhex('{:02x}'.format(big_int)))
        except ValueError:
            pass

        # Next try parsing it as an UUID.
        if secret_key is None:
            try:
                secret_key = uuid.UUID(app_secret_key).bytes
            except ValueError:
                pass

        # Otherwise, use the bytes directly.
        if secret_key is None:
            secret_key = str(bytearray(map(ord, app_secret_key)))

        # Cycle the key material to exactly 32 items.
        return ''.join(itertools.islice(itertools.cycle(secret_key), 32))

    def encrypt_user_password(self, password):
        """ Returns an encrypted version of the user's password. """
        data = {
            'password': password
        }

        message = json.dumps(data)
        cipher = AESCipher(self._get_secret_key())
        return cipher.encrypt(message)

    def _decrypt_user_password(self, encrypted):
        """ Attempts to decrypt the given password and returns it.

        Returns None when the value cannot be decrypted or does not decode to
        a JSON object, which callers treat as "this is a plain password".
        """
        cipher = AESCipher(self._get_secret_key())

        try:
            message = cipher.decrypt(encrypted)
        except (ValueError, TypeError):
            return None

        try:
            data = json.loads(message)
        except ValueError:
            return None

        # A payload that is valid JSON but not an object was not produced by
        # encrypt_user_password; calling .get on it would raise AttributeError.
        if not isinstance(data, dict):
            return None

        return data.get('password', encrypted)

    def confirm_existing_user(self, username, password):
        """ Verifies that the given password matches to the given DB username. Unlike verify_user, this
            call first translates the DB user via the FederatedLogin table (where applicable).
        """
        return self.state.confirm_existing_user(username, password)

    def verify_user(self, username_or_email, password, basic_auth=False):
        """ Verify the credentials with the configured backend.

        With `basic_auth` set, the password is first treated as an encrypted
        password: if it decrypts, the decrypted value is used; if it does not
        and the REQUIRE_ENCRYPTED_BASIC_AUTH feature is on, the login is
        rejected. Returns a `(user, error_message)` tuple.
        """
        # First try to decode the password as a signed token.
        if basic_auth:
            import features

            decrypted = self._decrypt_user_password(password)
            if decrypted is None:
                # This is a normal password.
                if features.REQUIRE_ENCRYPTED_BASIC_AUTH:
                    msg = ('Client login with unecrypted passwords is disabled. Please generate an ' +
                           'encrypted password in the user admin panel for use here.')
                    return (None, msg)
            else:
                password = decrypted

        return self.state.verify_user(username_or_email, password)

    def __getattr__(self, name):
        """ Delegate unknown attributes to the backend; missing ones resolve to None. """
        # Read `state` from the instance dict directly: going through
        # `self.state` would re-enter __getattr__ and recurse forever whenever
        # `state` has not been assigned yet.
        state = self.__dict__.get('state')
        return getattr(state, name, None)
|