This repository has been archived on 2020-03-24. You can view files and clone it, but cannot push or open issues or pull requests.
quay/util/security/registry_jwt.py
Jake Moshenko 9221a515de Use the registry API for security scanning
when the storage engine doesn't support direct download url
2016-05-04 18:04:06 -04:00

76 lines
1.8 KiB
Python

import time
import jwt
from cachetools import lru_cache
ANONYMOUS_SUB = '(anonymous)'
def generate_jwt_object(audience, subject, context, access, lifetime_s, app_config):
""" Generates a compact encoded JWT with the values specified.
"""
token_data = {
'iss': app_config['JWT_AUTH_TOKEN_ISSUER'],
'aud': audience,
'nbf': int(time.time()),
'iat': int(time.time()),
'exp': int(time.time() + lifetime_s),
'sub': subject,
'access': access,
'context': context,
}
certificate = _load_certificate_bytes(app_config['JWT_AUTH_CERTIFICATE_PATH'])
token_headers = {
'x5c': [certificate],
}
private_key = _load_private_key(app_config['JWT_AUTH_PRIVATE_KEY_PATH'])
return jwt.encode(token_data, private_key, 'RS256', headers=token_headers)
def build_context_and_subject(user, token, oauthtoken):
""" Builds the custom context field for the JWT signed token and returns it,
along with the subject for the JWT signed token. """
if oauthtoken:
context = {
'kind': 'oauth',
'user': user.username,
'oauth': oauthtoken.uuid,
}
return (context, user.username)
if user:
context = {
'kind': 'user',
'user': user.username,
}
return (context, user.username)
if token:
context = {
'kind': 'token',
'token': token.code,
}
return (context, None)
context = {
'kind': 'anonymous',
}
return (context, ANONYMOUS_SUB)
@lru_cache(maxsize=1)
def _load_certificate_bytes(certificate_file_path):
with open(certificate_file_path) as cert_file:
return ''.join(cert_file.readlines()[1:-1]).rstrip('\n')
@lru_cache(maxsize=1)
def _load_private_key(private_key_file_path):
with open(private_key_file_path) as private_key_file:
return private_key_file.read()