This repository has been archived on 2020-03-24. You can view files and clone it, but cannot push or open issues or pull requests.
quay/data/users/oidc.py
2017-10-12 16:25:21 -04:00

87 lines
2.8 KiB
Python

import logging
from data import model
from oauth.loginmanager import OAuthLoginManager
from oauth.oidc import PublicKeyLoadException
from util.security.jwtutil import InvalidTokenError
logger = logging.getLogger(__name__)
class UnknownServiceException(Exception):
pass
class OIDCInternalAuth(object):
""" Handles authentication by delegating authentication to a signed OIDC JWT produced by the
configured OIDC service.
"""
def __init__(self, config, login_service_id, requires_email):
login_manager = OAuthLoginManager(config)
self.login_service_id = login_service_id
self.login_service = login_manager.get_service(login_service_id)
if self.login_service is None:
raise UnknownServiceException('Unknown OIDC login service %s' % login_service_id)
@property
def federated_service(self):
return None
@property
def supports_encrypted_credentials(self):
# Since the "password" is already a signed JWT.
return False
def verify_credentials(self, username_or_email, id_token):
# Parse the ID token.
try:
payload = self.login_service.decode_user_jwt(id_token)
except InvalidTokenError as ite:
logger.exception('Got invalid token error on OIDC decode: %s. Token: %s', ite.message, id_token)
return (None, 'Could not validate OIDC token')
except PublicKeyLoadException as pke:
logger.exception('Could not load public key during OIDC decode: %s. Token: %s', pke.message, id_token)
return (None, 'Could not validate OIDC token')
# Find the user ID.
user_id = payload['sub']
# Lookup the federated login and user record with that matching ID and service.
user_found = model.user.verify_federated_login(self.login_service_id, user_id)
if user_found is None:
return (None, 'User does not exist')
if not user_found.enabled:
return (None, 'User account is disabled. Please contact your administrator.')
return (user_found, None)
def verify_and_link_user(self, username_or_email, password):
return self.verify_credentials(username_or_email, password)
def confirm_existing_user(self, username, password):
return self.verify_credentials(username, password)
def link_user(self, username_or_email):
return (None, 'Unsupported for this authentication system')
def get_and_link_federated_user_info(self, user_info):
return (None, 'Unsupported for this authentication system')
def query_users(self, query, limit):
return (None, '', '')
def check_group_lookup_args(self, group_lookup_args):
return (False, 'Not supported')
def iterate_group_members(self, group_lookup_args, page_size=None, disable_pagination=False):
return (None, 'Not supported')
def service_metadata(self):
return {}
def ping(self):
""" Always assumed to be working. If the DB is broken, other checks will handle it. """
return (True, None)