import time import jwt from cachetools import lru_cache ANONYMOUS_SUB = '(anonymous)' def generate_jwt_object(audience, subject, context, access, lifetime_s, app_config): """ Generates a compact encoded JWT with the values specified. """ token_data = { 'iss': app_config['JWT_AUTH_TOKEN_ISSUER'], 'aud': audience, 'nbf': int(time.time()), 'iat': int(time.time()), 'exp': int(time.time() + lifetime_s), 'sub': subject, 'access': access, 'context': context, } certificate = _load_certificate_bytes(app_config['JWT_AUTH_CERTIFICATE_PATH']) token_headers = { 'x5c': [certificate], } private_key = _load_private_key(app_config['JWT_AUTH_PRIVATE_KEY_PATH']) return jwt.encode(token_data, private_key, 'RS256', headers=token_headers) def build_context_and_subject(user, token, oauthtoken): """ Builds the custom context field for the JWT signed token and returns it, along with the subject for the JWT signed token. """ if oauthtoken: context = { 'kind': 'oauth', 'user': user.username, 'oauth': oauthtoken.uuid, } return (context, user.username) if user: context = { 'kind': 'user', 'user': user.username, } return (context, user.username) if token: context = { 'kind': 'token', 'token': token.code, } return (context, None) context = { 'kind': 'anonymous', } return (context, ANONYMOUS_SUB) @lru_cache(maxsize=1) def _load_certificate_bytes(certificate_file_path): with open(certificate_file_path) as cert_file: cert_lines = cert_file.readlines()[1:-1] return ''.join([cert_line.rstrip('\n') for cert_line in cert_lines]) @lru_cache(maxsize=1) def _load_private_key(private_key_file_path): with open(private_key_file_path) as private_key_file: return private_key_file.read()