import logging

from data.users.federated import FederatedUsers, UserInformation
from oauth.loginmanager import OAuthLoginManager
from oauth.oidc import PublicKeyLoadException
from util.security.jwtutil import InvalidTokenError

logger = logging.getLogger(__name__)


class OIDCInternalAuth(FederatedUsers):
    """
    Handles authentication by delegating authentication to a signed OIDC JWT
    produced by the configured OIDC service.
    """

    def __init__(self, config, login_service_id, requires_email):
        """
        Look up the OIDC login service used to decode user tokens.

        :param config: configuration handed to ``OAuthLoginManager``.
        :param login_service_id: ID of the login service to look up.
        :param requires_email: forwarded unchanged to ``FederatedUsers``.
        :raises Exception: if no login service exists for ``login_service_id``.
        """
        super(OIDCInternalAuth, self).__init__('oidc', requires_email)
        login_manager = OAuthLoginManager(config)

        self.login_service = login_manager.get_service(login_service_id)
        if self.login_service is None:
            raise Exception('Unknown OIDC login service %s' % login_service_id)

    @property
    def supports_encrypted_credentials(self):
        """ Always False for OIDC. """
        # Since the "password" is already a signed JWT.
        return False

    def get_user(self, username_or_email):
        """ Not supported for OIDC; always returns ``(None, error_message)``. """
        return (None, 'Cannot retrieve users for OIDC')

    def query_users(self, query, limit=20):
        """ Not supported for OIDC; always returns ``(None, error_message)``. """
        return (None, 'Cannot query users for OIDC')

    def verify_credentials(self, username_or_email, id_token):
        """
        Verify the given ID token via the login service and build the user
        information from its ``sub`` claim.

        :param username_or_email: unused; the identity comes from the token.
        :param id_token: the token passed to ``decode_user_jwt``.
        :return: ``(UserInformation, None)`` on success, or
                 ``(None, error_message)`` if the token could not be decoded.
        """
        try:
            payload = self.login_service.decode_user_jwt(id_token)
        except InvalidTokenError as ite:
            # Log the exception object rather than `.message`: that attribute
            # does not exist on Python 3 exceptions, and reading it here would
            # raise AttributeError instead of returning the error tuple.
            logger.exception('Got invalid token error on OIDC decode: %s', ite)
            return (None, 'Could not validate OIDC token')
        except PublicKeyLoadException as pke:
            logger.exception('Could not load public key during OIDC decode: %s', pke)
            return (None, 'Could not validate OIDC token')

        # The `sub` claim is used as both the username and the ID.
        subject = payload['sub']
        user_info = UserInformation(username=subject, id=subject, email=None)
        return (user_info, None)