import logging import json import os from data.users.federated import FederatedUsers, VerifiedCredentials from util.security import jwtutil logger = logging.getLogger(__name__) class ExternalJWTAuthN(FederatedUsers): """ Delegates authentication to a REST endpoint that returns JWTs. """ PUBLIC_KEY_FILENAME = 'jwt-authn.cert' def __init__(self, verify_url, issuer, override_config_dir, http_client, max_fresh_s, public_key_path=None): super(ExternalJWTAuthN, self).__init__('jwtauthn') self.verify_url = verify_url self.issuer = issuer self.client = http_client self.max_fresh_s = max_fresh_s default_key_path = os.path.join(override_config_dir, ExternalJWTAuthN.PUBLIC_KEY_FILENAME) public_key_path = public_key_path or default_key_path if not os.path.exists(public_key_path): error_message = ('JWT Authentication public key file "%s" not found in directory %s' % (ExternalJWTAuthN.PUBLIC_KEY_FILENAME, override_config_dir)) raise Exception(error_message) with open(public_key_path) as public_key_file: self.public_key = public_key_file.read() def verify_credentials(self, username_or_email, password): result = self.client.get(self.verify_url, timeout=2, auth=(username_or_email, password)) if result.status_code != 200: return (None, result.text or 'Invalid username or password') try: result_data = json.loads(result.text) except ValueError: raise Exception('Returned JWT Authentication body does not contain JSON') # Load the JWT returned. encoded = result_data.get('token', '') exp_limit_options = jwtutil.exp_max_s_option(self.max_fresh_s) try: payload = jwtutil.decode(encoded, self.public_key, algorithms=['RS256'], audience='quay.io/jwtauthn', issuer=self.issuer, options=exp_limit_options) except jwtutil.InvalidTokenError: logger.exception('Exception when decoding returned JWT') return (None, 'Invalid username or password') if not 'sub' in payload: raise Exception('Missing username field in JWT') if not 'email' in payload: raise Exception('Missing email field in JWT') # Parse out the username and email. return (VerifiedCredentials(username=payload['sub'], email=payload['email']), None)