This repository has been archived on 2020-03-24. You can view files and clone it, but cannot push or open issues or pull requests.
quay/data/users/externaljwt.py

72 lines
2.7 KiB
Python

import logging
import json
import os
import jwt
from datetime import datetime, timedelta
from data.users.federated import FederatedUsers, VerifiedCredentials
logger = logging.getLogger(__name__)
class ExternalJWTAuthN(FederatedUsers):
""" Delegates authentication to a REST endpoint that returns JWTs. """
PUBLIC_KEY_FILENAME = 'jwt-authn.cert'
def __init__(self, verify_url, issuer, override_config_dir, http_client, max_fresh_s,
public_key_path=None):
super(ExternalJWTAuthN, self).__init__('jwtauthn')
self.verify_url = verify_url
self.issuer = issuer
self.client = http_client
self.max_fresh_s = max_fresh_s
default_key_path = os.path.join(override_config_dir, ExternalJWTAuthN.PUBLIC_KEY_FILENAME)
public_key_path = public_key_path or default_key_path
if not os.path.exists(public_key_path):
error_message = ('JWT Authentication public key file "%s" not found in directory %s' %
(ExternalJWTAuthN.PUBLIC_KEY_FILENAME, override_config_dir))
raise Exception(error_message)
with open(public_key_path) as public_key_file:
self.public_key = public_key_file.read()
def verify_credentials(self, username_or_email, password):
result = self.client.get(self.verify_url, timeout=2, auth=(username_or_email, password))
if result.status_code != 200:
return (None, result.text or 'Invalid username or password')
try:
result_data = json.loads(result.text)
except ValueError:
raise Exception('Returned JWT Authentication body does not contain JSON')
# Load the JWT returned.
encoded = result_data.get('token', '')
try:
payload = jwt.decode(encoded, self.public_key, algorithms=['RS256'],
audience='quay.io/jwtauthn', issuer=self.issuer)
except jwt.InvalidTokenError:
logger.exception('Exception when decoding returned JWT')
return (None, 'Invalid username or password')
if not 'sub' in payload:
raise Exception('Missing username field in JWT')
if not 'email' in payload:
raise Exception('Missing email field in JWT')
if not 'exp' in payload:
raise Exception('Missing exp field in JWT')
# Verify that the expiration is no more than self.max_fresh_s seconds in the future.
expiration = datetime.utcfromtimestamp(payload['exp'])
if expiration > datetime.utcnow() + timedelta(seconds=self.max_fresh_s):
logger.debug('Payload expiration is outside of the %s second window: %s', self.max_fresh_s,
payload['exp'])
return (None, 'Invalid username or password')
# Parse out the username and email.
return (VerifiedCredentials(username=payload['sub'], email=payload['email']), None)