Use the registry API for security scanning

when the storage engine doesn't support direct download url
This commit is contained in:
Jake Moshenko 2016-05-04 17:40:09 -04:00
parent 1ef7008d85
commit 9221a515de
9 changed files with 149 additions and 106 deletions

View file

@ -0,0 +1,76 @@
import time
import jwt
from cachetools import lru_cache
ANONYMOUS_SUB = '(anonymous)'
def generate_jwt_object(audience, subject, context, access, lifetime_s, app_config):
""" Generates a compact encoded JWT with the values specified.
"""
token_data = {
'iss': app_config['JWT_AUTH_TOKEN_ISSUER'],
'aud': audience,
'nbf': int(time.time()),
'iat': int(time.time()),
'exp': int(time.time() + lifetime_s),
'sub': subject,
'access': access,
'context': context,
}
certificate = _load_certificate_bytes(app_config['JWT_AUTH_CERTIFICATE_PATH'])
token_headers = {
'x5c': [certificate],
}
private_key = _load_private_key(app_config['JWT_AUTH_PRIVATE_KEY_PATH'])
return jwt.encode(token_data, private_key, 'RS256', headers=token_headers)
def build_context_and_subject(user, token, oauthtoken):
""" Builds the custom context field for the JWT signed token and returns it,
along with the subject for the JWT signed token. """
if oauthtoken:
context = {
'kind': 'oauth',
'user': user.username,
'oauth': oauthtoken.uuid,
}
return (context, user.username)
if user:
context = {
'kind': 'user',
'user': user.username,
}
return (context, user.username)
if token:
context = {
'kind': 'token',
'token': token.code,
}
return (context, None)
context = {
'kind': 'anonymous',
}
return (context, ANONYMOUS_SUB)
@lru_cache(maxsize=1)
def _load_certificate_bytes(certificate_file_path):
with open(certificate_file_path) as cert_file:
return ''.join(cert_file.readlines()[1:-1]).rstrip('\n')
@lru_cache(maxsize=1)
def _load_private_key(private_key_file_path):
with open(private_key_file_path) as private_key_file:
return private_key_file.read()