import logging from data import model from oauth.loginmanager import OAuthLoginManager from oauth.oidc import PublicKeyLoadException from util.security.jwtutil import InvalidTokenError logger = logging.getLogger(__name__) class UnknownServiceException(Exception): pass class OIDCInternalAuth(object): """ Handles authentication by delegating authentication to a signed OIDC JWT produced by the configured OIDC service. """ def __init__(self, config, login_service_id, requires_email): login_manager = OAuthLoginManager(config) self.login_service_id = login_service_id self.login_service = login_manager.get_service(login_service_id) if self.login_service is None: raise UnknownServiceException('Unknown OIDC login service %s' % login_service_id) @property def federated_service(self): return None @property def requires_distinct_cli_password(self): # Since the "password" is the generated ID token. return False @property def supports_encrypted_credentials(self): # Since the "password" is already a signed JWT. return False def verify_credentials(self, username_or_email, id_token): # Parse the ID token. try: payload = self.login_service.decode_user_jwt(id_token) except InvalidTokenError as ite: logger.exception('Got invalid token error on OIDC decode: %s. Token: %s', ite.message, id_token) return (None, 'Could not validate OIDC token') except PublicKeyLoadException as pke: logger.exception('Could not load public key during OIDC decode: %s. Token: %s', pke.message, id_token) return (None, 'Could not validate OIDC token') # Find the user ID. user_id = payload['sub'] # Lookup the federated login and user record with that matching ID and service. user_found = model.user.verify_federated_login(self.login_service_id, user_id) if user_found is None: return (None, 'User does not exist') if not user_found.enabled: return (None, 'User account is disabled. Please contact your administrator.') return (user_found, None) def verify_and_link_user(self, username_or_email, password): return self.verify_credentials(username_or_email, password) def confirm_existing_user(self, username, password): return self.verify_credentials(username, password) def link_user(self, username_or_email): return (None, 'Unsupported for this authentication system') def get_and_link_federated_user_info(self, user_info): return (None, 'Unsupported for this authentication system') def query_users(self, query, limit): return (None, '', '') def check_group_lookup_args(self, group_lookup_args): return (False, 'Not supported') def iterate_group_members(self, group_lookup_args, page_size=None, disable_pagination=False): return (None, 'Not supported') def service_metadata(self): return {} def ping(self): """ Always assumed to be working. If the DB is broken, other checks will handle it. """ return (True, None)