This repository has been archived on 2020-03-24. You can view files and clone it, but cannot push or open issues or pull requests.
quay/data/users.py
Joseph Schorr 8aac3fd86e Add support for an external JWT-based authentication system
This authentication system hits two HTTP endpoints to check and verify the existence of users:

Existence endpoint:
GET http://endpoint/ with Authorization: Basic (username:) =>
    Returns 200 if the username/email exists, 4** otherwise

Verification endpoint:
GET http://endpoint/ with Authorization: Basic (username:password) =>
    Returns 200 and a signed JWT with the user's username and email address if the username+password validates, 4** otherwise with the body containing an optional error message

The JWT produced by the endpoint must be issued with an issuer matching that configured in the config.yaml, and the audience must be "quay.io/jwtauthn". The JWT is signed using a private key and then validated on the Quay.io side with the associated public key, found as "jwt-authn.cert" in the conf/stack directory.
2015-06-05 13:20:10 -04:00

427 lines
14 KiB
Python

import ldap
import logging
import json
import itertools
import uuid
import struct
import os
import urllib
import jwt
from util.aes import AESCipher
from util.validation import generate_valid_usernames
from data import model
from collections import namedtuple
from datetime import datetime, timedelta
logger = logging.getLogger(__name__)

# Setting LDAP_DEBUG=1 in the environment switches this module's logger to
# DEBUG and attaches a dedicated stream handler at the same level.
if os.environ.get('LDAP_DEBUG') == '1':
    ch = logging.StreamHandler()
    ch.setLevel(logging.DEBUG)
    logger.addHandler(ch)
    logger.setLevel(logging.DEBUG)
def _get_federated_user(username, email, federated_service, create_new_user):
    """ Returns the database user linked to the given federated login, creating it if allowed.

        username is the identifier reported by the external service, email is the address it
        reported, and federated_service is the service name the login is stored under.

        Returns a tuple of (db_user, None) on success or (None, error_message) on failure.
    """
    db_user = model.verify_federated_login(federated_service, username)
    if db_user:
        # Known user: refresh the stored email with the one reported by the external service.
        db_user.email = email
        db_user.save()
        return (db_user, None)

    if not create_new_user:
        return (None, 'Invalid user')

    # We must create the user in our db. Only accept a candidate that was actually confirmed
    # to be unique; a bare loop variable would keep the last rejected candidate if the
    # generator runs out without finding a free name.
    valid_username = None
    for candidate in generate_valid_usernames(username):
        if model.is_username_unique(candidate):
            valid_username = candidate
            break

    if not valid_username:
        logger.error('Unable to pick a username for user: %s', username)
        return (None, 'Unable to pick a username. Please report this to your administrator.')

    db_user = model.create_federated_user(valid_username, email, federated_service, username,
                                          set_password_notification=False)
    return (db_user, None)
class JWTAuthUsers(object):
    """ Delegates authentication to a REST endpoint that returns JWTs. """
    PUBLIC_KEY_FILENAME = 'jwt-authn.cert'

    def __init__(self, exists_url, verify_url, issuer, public_key_path=None):
        """ Stores the endpoint URLs and issuer, and loads the public key used to validate JWTs.

            If public_key_path is not given, the key is read from PUBLIC_KEY_FILENAME inside the
            app's OVERRIDE_CONFIG_DIRECTORY. Raises an Exception if the key file does not exist.
        """
        # Imported here rather than at module level, as in the original code.
        from app import OVERRIDE_CONFIG_DIRECTORY

        self.verify_url = verify_url
        self.exists_url = exists_url
        self.issuer = issuer

        default_key_path = os.path.join(OVERRIDE_CONFIG_DIRECTORY,
                                        JWTAuthUsers.PUBLIC_KEY_FILENAME)
        public_key_path = public_key_path or default_key_path
        if not os.path.exists(public_key_path):
            error_message = ('JWT Authentication public key file "%s" not found in directory %s' %
                             (JWTAuthUsers.PUBLIC_KEY_FILENAME, OVERRIDE_CONFIG_DIRECTORY))
            raise Exception(error_message)

        with open(public_key_path) as public_key_file:
            self.public_key = public_key_file.read()

    @staticmethod
    def _expires_too_late(exp_timestamp, max_seconds=300):
        """ Returns True if the given `exp` claim (seconds since the epoch) lies more than
            max_seconds in the future.

            Both sides of the comparison are naive UTC datetimes. Converting the timestamp with
            the local-time `fromtimestamp` would skew the window by the server's UTC offset.
        """
        latest_allowed = datetime.utcnow() + timedelta(seconds=max_seconds)
        return datetime.utcfromtimestamp(exp_timestamp) > latest_allowed

    def verify_user(self, username_or_email, password, create_new_user=True):
        """ Verifies the credentials against the verification endpoint.

            Returns (db_user, None) on success or (None, error_message) when the credentials or
            the returned token are rejected. Raises an Exception when the endpoint's response is
            malformed (non-JSON body or a JWT missing the sub/email/exp fields).
        """
        from app import app
        client = app.config['HTTPCLIENT']
        result = client.get(self.verify_url, timeout=2, auth=(username_or_email, password))
        if result.status_code != 200:
            return (None, result.text or 'Invalid username or password')

        try:
            result_data = json.loads(result.text)
        except ValueError:
            raise Exception('Returned JWT Authentication body does not contain JSON')

        # Load the JWT returned.
        encoded = result_data.get('token', '')
        try:
            payload = jwt.decode(encoded, self.public_key, algorithms=['RS256'],
                                 audience='quay.io/jwtauthn', issuer=self.issuer)
        except jwt.InvalidTokenError:
            logger.exception('Exception when decoding returned JWT')
            return (None, 'Invalid username or password')

        for claim, label in (('sub', 'username'), ('email', 'email'), ('exp', 'exp')):
            if claim not in payload:
                raise Exception('Missing %s field in JWT' % label)

        # Verify that the expiration is no more than 300 seconds in the future.
        if JWTAuthUsers._expires_too_late(payload['exp']):
            logger.debug('Payload expiration is outside of the 300 second window: %s',
                         payload['exp'])
            return (None, 'Invalid username or password')

        # Parse out the username and email.
        return _get_federated_user(payload['sub'], payload['email'], 'jwtauthn', create_new_user)

    def user_exists(self, username):
        """ Asks the existence endpoint whether the given username is known.

            Returns True only for a 200 response. Raises an Exception for 5xx responses so that
            an endpoint failure is not mistaken for a missing user.
        """
        from app import app
        client = app.config['HTTPCLIENT']
        result = client.get(self.exists_url, auth=(username, ''), timeout=2)
        if result.status_code >= 500:
            raise Exception('Internal Error when trying to check if user exists: %s' % result.text)

        return result.status_code == 200

    def confirm_existing_user(self, username, password):
        """ Verifies the password for an existing database user by translating the DB username
            into its 'jwtauthn' federated identity first. Never creates a new user.
        """
        db_user = model.get_user(username)
        if not db_user:
            return (None, 'Invalid user')

        federated_login = model.lookup_federated_login(db_user, 'jwtauthn')
        if not federated_login:
            return (None, 'Invalid user')

        return self.verify_user(federated_login.service_ident, password, create_new_user=False)
class DatabaseUsers(object):
    """ User backend that checks credentials directly through the data model. """

    def verify_user(self, username_or_email, password):
        """ Hands the credentials to the model layer and wraps its answer in the
            (user, error_message) tuple shape used by the other backends.
        """
        found = model.verify_user(username_or_email, password)
        if found:
            return (found, None)

        return (None, 'Invalid Username or Password')

    def confirm_existing_user(self, username, password):
        """ Same check as verify_user; no username translation is performed here. """
        return self.verify_user(username, password)

    def user_exists(self, username):
        """ Returns whether the model has a user with the given username. """
        found = model.get_user(username)
        return found is not None
class LDAPConnection(object):
    """ Context manager that opens an LDAP connection bound as the given DN and unbinds it
        when the `with` block ends.
    """

    def __init__(self, ldap_uri, user_dn, user_pw):
        self._ldap_uri = ldap_uri
        self._user_dn = user_dn
        self._user_pw = user_pw
        self._conn = None

    @staticmethod
    def _discard(conn):
        """ Best-effort release of a connection whose setup failed. Kept in its own function so
            that an error raised here never replaces the original bind error.
        """
        try:
            conn.unbind_s()
        except ldap.LDAPError:
            logger.debug('Ignoring error while releasing LDAP connection', exc_info=True)

    def __enter__(self):
        """ Initializes the connection, enables referral chasing, and performs a simple bind.

            Returns the bound connection object. Any LDAP error raised during setup is
            propagated to the caller after the half-open connection has been released.
        """
        trace_level = 2 if os.environ.get('LDAP_DEBUG') == '1' else 0
        conn = ldap.initialize(self._ldap_uri, trace_level=trace_level)
        try:
            conn.set_option(ldap.OPT_REFERRALS, 1)
            conn.simple_bind_s(self._user_dn, self._user_pw)
        except ldap.LDAPError:
            # __exit__ is not invoked when __enter__ raises, so release the connection here.
            LDAPConnection._discard(conn)
            raise

        self._conn = conn
        return conn

    def __exit__(self, exc_type, value, tb):
        """ Unbinds the connection. Exceptions from the `with` body are not suppressed. """
        conn, self._conn = self._conn, None
        conn.unbind_s()
class LDAPUsers(object):
    """ User backend that authenticates against an LDAP directory and links the matching
        entries to database users through the 'ldap' federated login.
    """
    _LDAPResult = namedtuple('LDAPResult', ['dn', 'attrs'])

    def __init__(self, ldap_uri, base_dn, admin_dn, admin_passwd, user_rdn, uid_attr, email_attr):
        # Searches run over a connection bound with the admin credentials.
        self._ldap_conn = LDAPConnection(ldap_uri, admin_dn, admin_passwd)
        self._ldap_uri = ldap_uri
        self._base_dn = base_dn
        self._user_rdn = user_rdn
        self._uid_attr = uid_attr
        self._email_attr = email_attr

    def _get_ldap_referral_dn(self, referral_exception):
        """ Extracts the DN from an ldap.REFERRAL exception whose info block has the form
            'Referral:\\nldap:///<dn>'. Returns None if the exception does not match that shape.
        """
        details = referral_exception.args[0]
        logger.debug('Got referral: %s', details)
        if not details or not details.get('info'):
            logger.debug('LDAP referral missing info block')
            return None

        referral_info = details['info']
        if not referral_info.startswith('Referral:\n'):
            logger.debug('LDAP referral missing Referral header')
            return None

        referral_uri = referral_info[len('Referral:\n'):]
        if not referral_uri.startswith('ldap:///'):
            logger.debug('LDAP referral URI does not start with ldap:///')
            return None

        return referral_uri[len('ldap:///'):]

    def _select_result(self, pairs):
        """ Picks the entry to use from the raw (dn, attrs) pairs returned by a search.

            Returns None if no pair has a DN. With several candidates, the first one carrying
            the email attribute wins, otherwise the first one with a DN.
        """
        results = [LDAPUsers._LDAPResult(*pair) for pair in pairs]

        # Filter out pairs without DNs. Some LDAP impls will return such pairs.
        with_dns = [result for result in results if result.dn]
        if not with_dns:
            return None

        # If we have found a single pair, then return it.
        if len(with_dns) == 1:
            return with_dns[0]

        # Otherwise, there are multiple pairs with DNs, so find the one with the mail attribute
        # (if any). Only DN-bearing entries are inspected; the attrs of a DN-less pair are not
        # guaranteed to be a dict.
        with_mail = [result for result in with_dns if result.attrs.get(self._email_attr)]
        return with_mail[0] if with_mail else with_dns[0]

    def _ldap_user_search(self, username_or_email):
        """ Searches the directory for an entry whose uid or email attribute equals the given
            value. Returns an _LDAPResult, or None if nothing usable was found or the search
            failed with an LDAP error.
        """
        from ldap.filter import escape_filter_chars

        logger.debug('Incoming username or email param: %s', repr(username_or_email))

        # The value comes straight from a login form; escape it so filter metacharacters
        # such as '*', '(' and ')' are matched literally instead of altering the query.
        escaped = escape_filter_chars(username_or_email)
        user_search_dn = ','.join(self._user_rdn + self._base_dn)
        query = u'(|({0}={2})({1}={2}))'.format(self._uid_attr, self._email_attr, escaped)

        with self._ldap_conn as conn:
            logger.debug('Conducting user search: %s under %s', query, user_search_dn)
            try:
                pairs = conn.search_s(user_search_dn, ldap.SCOPE_SUBTREE, query.encode('utf-8'))
            except ldap.REFERRAL as referral:
                referral_dn = self._get_ldap_referral_dn(referral)
                if not referral_dn:
                    return None

                try:
                    # Encoded like the primary query so non-ASCII usernames behave the same.
                    subquery = u'(%s=%s)' % (self._uid_attr, escaped)
                    pairs = conn.search_s(referral_dn, ldap.SCOPE_BASE, subquery.encode('utf-8'))
                except ldap.LDAPError:
                    logger.exception('LDAP referral search exception')
                    return None

            except ldap.LDAPError:
                logger.exception('LDAP search exception')
                return None

        logger.debug('Found matching pairs: %s', pairs)
        return self._select_result(pairs)

    def confirm_existing_user(self, username, password):
        """ Verify the username and password by looking up the *LDAP* username and confirming the
            password.
        """
        db_user = model.get_user(username)
        if not db_user:
            return (None, 'Invalid user')

        federated_login = model.lookup_federated_login(db_user, 'ldap')
        if not federated_login:
            return (None, 'Invalid user')

        return self.verify_user(federated_login.service_ident, password, create_new_user=False)

    def verify_user(self, username_or_email, password, create_new_user=True):
        """ Verify the credentials with LDAP and if they are valid, create or update the user
            in our database. Returns (db_user, None) or (None, error_message).
        """
        # Make sure that even if the server supports anonymous binds, we don't allow it
        if not password:
            return (None, 'Anonymous binding not allowed')

        found_user = self._ldap_user_search(username_or_email)
        if found_user is None:
            return (None, 'Username not found')

        found_dn, found_response = found_user
        logger.debug('Found user for LDAP username %s; validating password', username_or_email)
        logger.debug('DN %s found: %s', found_dn, found_response)

        # First validate the password by binding as the user
        try:
            with LDAPConnection(self._ldap_uri, found_dn, password.encode('utf-8')):
                pass
        except ldap.REFERRAL as referral:
            referral_dn = self._get_ldap_referral_dn(referral)
            if not referral_dn:
                return (None, 'Invalid username')

            # Retry the bind against the DN the server referred us to.
            try:
                with LDAPConnection(self._ldap_uri, referral_dn, password.encode('utf-8')):
                    pass
            except ldap.INVALID_CREDENTIALS:
                logger.exception('Invalid LDAP credentials')
                return (None, 'Invalid password')

        except ldap.INVALID_CREDENTIALS:
            logger.exception('Invalid LDAP credentials')
            return (None, 'Invalid password')

        # Now check if we have a federated login for this user
        if not found_response.get(self._uid_attr):
            return (None, 'Missing uid field "%s" in user record' % self._uid_attr)

        if not found_response.get(self._email_attr):
            return (None, 'Missing mail field "%s" in user record' % self._email_attr)

        # Attribute values are lists; the first value of each is used.
        username = found_response[self._uid_attr][0].decode('utf-8')
        email = found_response[self._email_attr][0]
        return _get_federated_user(username, email, 'ldap', create_new_user)

    def user_exists(self, username):
        """ Returns whether a directory search for the given username finds an entry. """
        found_user = self._ldap_user_search(username)
        return found_user is not None
class UserAuthentication(object):
    """ App extension that picks the user backend named by AUTHENTICATION_TYPE and forwards
        authentication calls to it, transparently handling encrypted basic-auth passwords.
    """

    def __init__(self, app=None):
        self.app = app
        if app is not None:
            self.state = self.init_app(app)
        else:
            self.state = None

    def init_app(self, app):
        """ Builds the backend selected by the app config, registers it under
            app.extensions['authentication'], and returns it.

            Raises RuntimeError for an unrecognized AUTHENTICATION_TYPE.
        """
        authentication_type = app.config.get('AUTHENTICATION_TYPE', 'Database')

        if authentication_type == 'Database':
            users = DatabaseUsers()
        elif authentication_type == 'LDAP':
            ldap_uri = app.config.get('LDAP_URI', 'ldap://localhost')
            base_dn = app.config.get('LDAP_BASE_DN')
            admin_dn = app.config.get('LDAP_ADMIN_DN')
            admin_passwd = app.config.get('LDAP_ADMIN_PASSWD')
            user_rdn = app.config.get('LDAP_USER_RDN', [])
            uid_attr = app.config.get('LDAP_UID_ATTR', 'uid')
            email_attr = app.config.get('LDAP_EMAIL_ATTR', 'mail')

            users = LDAPUsers(ldap_uri, base_dn, admin_dn, admin_passwd, user_rdn, uid_attr,
                              email_attr)
        elif authentication_type == 'JWT':
            verify_url = app.config.get('JWT_VERIFY_ENDPOINT')
            exists_url = app.config.get('JWT_EXISTS_ENDPOINT')
            issuer = app.config.get('JWT_AUTH_ISSUER')
            users = JWTAuthUsers(exists_url, verify_url, issuer)
        else:
            raise RuntimeError('Unknown authentication type: %s' % authentication_type)

        # register extension with app
        app.extensions = getattr(app, 'extensions', {})
        app.extensions['authentication'] = users
        return users

    def _get_secret_key(self):
        """ Returns the secret key to use for encrypting and decrypting.

            The key is derived from the app's SECRET_KEY. The derivation must stay stable:
            changing it would make previously encrypted passwords undecryptable.
        """
        from app import app
        app_secret_key = app.config['SECRET_KEY']
        secret_key = None

        # First try parsing the key as an int.
        # NOTE(review): str(bytearray(...)) relies on Python 2 semantics (raw bytes as str).
        try:
            big_int = int(app_secret_key)
            secret_key = str(bytearray.fromhex('{:02x}'.format(big_int)))
        except ValueError:
            pass

        # Next try parsing it as an UUID.
        if secret_key is None:
            try:
                secret_key = uuid.UUID(app_secret_key).bytes
            except ValueError:
                pass

        # Otherwise, use the bytes directly.
        if secret_key is None:
            secret_key = str(bytearray(map(ord, app_secret_key)))

        # Cycle through the key material and keep the first 32 items.
        return ''.join(itertools.islice(itertools.cycle(secret_key), 32))

    def encrypt_user_password(self, password):
        """ Returns an encrypted version of the user's password. """
        data = {
            'password': password
        }

        message = json.dumps(data)
        cipher = AESCipher(self._get_secret_key())
        return cipher.encrypt(message)

    def _decrypt_user_password(self, encrypted):
        """ Attempts to decrypt the given password and returns it.

            Returns None when the value cannot be decrypted or does not contain the JSON object
            written by encrypt_user_password, i.e. when it is most likely a plain password.
        """
        cipher = AESCipher(self._get_secret_key())

        try:
            message = cipher.decrypt(encrypted)
        except (ValueError, TypeError):
            return None

        try:
            data = json.loads(message)
        except ValueError:
            return None

        # Valid JSON that is not an object cannot have come from encrypt_user_password.
        if not isinstance(data, dict):
            return None

        return data.get('password', encrypted)

    def confirm_existing_user(self, username, password):
        """ Verifies that the given password matches to the given DB username. Unlike verify_user,
            this call first translates the DB user via the FederatedLogin table (where
            applicable).
        """
        return self.state.confirm_existing_user(username, password)

    def verify_user(self, username_or_email, password, basic_auth=False):
        """ Verifies the credentials with the configured backend.

            For basic auth, the password is first treated as an encrypted password; if it cannot
            be decrypted it is used as-is, unless REQUIRE_ENCRYPTED_BASIC_AUTH is enabled, in
            which case the login is rejected.
        """
        # First try to decode the password as a signed token.
        if basic_auth:
            import features

            decrypted = self._decrypt_user_password(password)
            if decrypted is None:
                # This is a normal password.
                if features.REQUIRE_ENCRYPTED_BASIC_AUTH:
                    msg = ('Client login with unecrypted passwords is disabled. Please generate an ' +
                           'encrypted password in the user admin panel for use here.')
                    return (None, msg)
            else:
                password = decrypted

        return self.state.verify_user(username_or_email, password)

    def __getattr__(self, name):
        """ Forwards unknown attribute lookups to the backend, yielding None when the backend
            lacks the attribute too.
        """
        # __getattr__ only runs when normal lookup fails. If `state` itself is missing (an
        # instance built without __init__, e.g. by copy or pickle), looking it up below would
        # re-enter this method forever, so report it as a plain missing attribute instead.
        if name == 'state':
            raise AttributeError(name)

        return getattr(self.state, name, None)