import json
import logging
import os
from datetime import datetime, timedelta

import jwt

from data.users.federated import FederatedUsers, VerifiedCredentials

logger = logging.getLogger(__name__)


class ExternalJWTAuthN(FederatedUsers):
    """ Delegates authentication to a REST endpoint that returns JWTs. """

    PUBLIC_KEY_FILENAME = 'jwt-authn.cert'

    # (claim name, label used in the error message) for every claim that the
    # decoded JWT must carry, in the order in which they are checked.
    _REQUIRED_CLAIMS = (('sub', 'username'), ('email', 'email'), ('exp', 'exp'))

    def __init__(self, verify_url, issuer, override_config_dir, http_client, max_fresh_s,
                 public_key_path=None):
        """ Stores the endpoint settings and loads the public key used to verify JWTs.

        Args:
          verify_url: URL queried (HTTP GET with basic auth) to verify credentials.
          issuer: Value the `iss` claim of returned JWTs is checked against.
          override_config_dir: Directory searched for PUBLIC_KEY_FILENAME when no
            explicit `public_key_path` is given.
          http_client: Object whose `get(url, timeout=..., auth=...)` performs the
            request and returns a response with `status_code` and `text`.
          max_fresh_s: Maximum number of seconds in the future that a returned
            JWT's expiration may lie.
          public_key_path: Optional explicit path to the public key file.

        Raises:
          Exception: If the public key file does not exist.
        """
        super(ExternalJWTAuthN, self).__init__('jwtauthn')
        self.verify_url = verify_url
        self.issuer = issuer
        self.client = http_client
        self.max_fresh_s = max_fresh_s

        default_key_path = os.path.join(override_config_dir,
                                        ExternalJWTAuthN.PUBLIC_KEY_FILENAME)
        key_path = public_key_path or default_key_path
        if not os.path.exists(key_path):
            if public_key_path:
                # An explicit path was requested, so report that path rather than
                # the default file name and directory, which were never consulted.
                error_message = ('JWT Authentication public key file "%s" not found' %
                                 public_key_path)
            else:
                error_message = ('JWT Authentication public key file "%s" not found in directory %s' %
                                 (ExternalJWTAuthN.PUBLIC_KEY_FILENAME, override_config_dir))

            raise Exception(error_message)

        with open(key_path) as public_key_file:
            self.public_key = public_key_file.read()

    def verify_credentials(self, username_or_email, password):
        """ Verifies the given credentials against the external JWT endpoint.

        Returns:
          A `(credentials, error_message)` tuple: `(VerifiedCredentials, None)` on
          success, or `(None, message)` when the credentials are rejected or the
          returned token fails validation.

        Raises:
          Exception: If the endpoint's 200 response is not a JSON object, or if the
            decoded JWT lacks one of the `sub`, `email` or `exp` claims.
        """
        result = self.client.get(self.verify_url, timeout=2,
                                 auth=(username_or_email, password))
        if result.status_code != 200:
            return (None, result.text or 'Invalid username or password')

        try:
            result_data = json.loads(result.text)
        except ValueError:
            raise Exception('Returned JWT Authentication body does not contain JSON')

        # Valid JSON that is not an object (a list, a string, ...) has no `token`
        # member; fail with an explicit error instead of an AttributeError.
        if not isinstance(result_data, dict):
            raise Exception('Returned JWT Authentication body is not a JSON object')

        # Load the JWT returned.
        encoded = result_data.get('token', '')
        try:
            payload = jwt.decode(encoded, self.public_key, algorithms=['RS256'],
                                 audience='quay.io/jwtauthn', issuer=self.issuer)
        except jwt.InvalidTokenError:
            logger.exception('Exception when decoding returned JWT')
            return (None, 'Invalid username or password')

        for claim, label in self._REQUIRED_CLAIMS:
            if claim not in payload:
                raise Exception('Missing %s field in JWT' % label)

        # Verify that the expiration is no more than self.max_fresh_s seconds in the future.
        try:
            expiration = datetime.utcfromtimestamp(payload['exp'])
        except (OverflowError, ValueError, OSError):
            # A timestamp too large to convert is necessarily outside the window.
            logger.debug('Payload expiration is outside of the %s second window: %s',
                         self.max_fresh_s, payload['exp'])
            return (None, 'Invalid username or password')

        if expiration > datetime.utcnow() + timedelta(seconds=self.max_fresh_s):
            logger.debug('Payload expiration is outside of the %s second window: %s',
                         self.max_fresh_s, payload['exp'])
            return (None, 'Invalid username or password')

        # Parse out the username and email.
        return (VerifiedCredentials(username=payload['sub'], email=payload['email']), None)