This repository has been archived on 2020-03-24. You can view files and clone it, but cannot push or open issues or pull requests.
quay/endpoints/common.py
Joseph Schorr fda203e4d7 Add proper and tested OIDC support on the server
Note that this will still not work on the client side; the followup CL for the client side is right after this one.
2017-01-23 17:53:34 -05:00

254 lines
9.4 KiB
Python

import logging
import json
import string
import datetime
import os
import re
from random import SystemRandom
from functools import wraps
from cachetools import lru_cache
from flask import make_response, render_template, request, abort, session
from flask_login import login_user
from flask_principal import identity_changed
import endpoints.decorated # Register the various exceptions via decorators.
import features
from app import app, oauth_apps, oauth_login, LoginWrappedDBUser, user_analytics, license_validator
from auth import scopes
from auth.permissions import QuayDeferredPermissionUser
from config import frontend_visible_config
from external_libraries import get_external_javascript, get_external_css
from util.names import parse_namespace_repository
from util.secscan import PRIORITY_LEVELS
from util.timedeltastring import convert_to_timedelta
# Module-level logger, named after this module.
logger = logging.getLogger(__name__)

# NOTE(review): no function visible in this section reads this global, and
# render_page_template takes its own `route_data` parameter that shadows it.
# Confirm whether other modules rely on it before removing.
route_data = None

# Relative path of the JSON file that holds the cache-buster hashes.
CACHE_BUSTERS_JSON = 'static/dist/cachebusters.json'

# Parsed contents of CACHE_BUSTERS_JSON; None until get_cache_busters() has
# successfully loaded the file.
CACHE_BUSTERS = None
def get_cache_busters():
    """ Returns the cache-buster hashes, loading them from disk on first use.

    The parsed contents of CACHE_BUSTERS_JSON are memoized in the module-level
    CACHE_BUSTERS. When the file does not exist an empty dict is returned and
    nothing is memoized, so a later call looks for the file again.
    """
    global CACHE_BUSTERS

    if CACHE_BUSTERS is None:
        if not os.path.exists(CACHE_BUSTERS_JSON):
            return {}

        with open(CACHE_BUSTERS_JSON, 'r') as busters_file:
            CACHE_BUSTERS = json.load(busters_file)

    return CACHE_BUSTERS
def parse_repository_name(include_tag=False,
                          ns_kwarg_name='namespace_name',
                          repo_kwarg_name='repo_name',
                          tag_kwarg_name='tag_name',
                          incoming_repo_kwarg='repository'):
    """ Decorator factory that splits a combined repository kwarg into its parts.

    The decorated function is called without the `incoming_repo_kwarg` keyword
    argument. In its place it receives the first two components returned by
    parse_namespace_repository under `ns_kwarg_name` and `repo_kwarg_name`,
    plus the third component under `tag_kwarg_name` when `include_tag` is set.
    """
    def decorate(view):
        @wraps(view)
        def with_parsed_name(*args, **kwargs):
            # Raises KeyError when the incoming kwarg was not supplied.
            full_name = kwargs.pop(incoming_repo_kwarg)
            components = parse_namespace_repository(full_name,
                                                    app.config['LIBRARY_NAMESPACE'],
                                                    include_tag=include_tag)

            kwargs[ns_kwarg_name] = components[0]
            kwargs[repo_kwarg_name] = components[1]
            if include_tag:
                kwargs[tag_kwarg_name] = components[2]

            return view(*args, **kwargs)
        return with_parsed_name
    return decorate
def route_show_if(value):
    """ Decorator factory: the route is served only while `value` is truthy.

    `value` is captured when the decorator is created; every call to the
    decorated function aborts with a 404 if it is falsy.
    """
    def wrap(view):
        @wraps(view)
        def shown_view(*args, **kwargs):
            if not value:
                abort(404)

            return view(*args, **kwargs)
        return shown_view
    return wrap
def route_hide_if(value):
    """ Decorator factory: the route is hidden (404) while `value` is truthy.

    `value` is captured when the decorator is created; every call to the
    decorated function aborts with a 404 if it is truthy.
    """
    def wrap(view):
        @wraps(view)
        def hidden_view(*args, **kwargs):
            if value:
                abort(404)

            return view(*args, **kwargs)
        return hidden_view
    return wrap
def truthy_param(param):
    """ Returns False for the recognized "false" spellings, True for anything else.

    Because the check is set membership, the integer 0 also counts as false
    (it compares equal to False), while None is not in the set and therefore
    counts as true. Unhashable values raise TypeError.
    """
    falsy_values = {False, 'false', 'False', '0', 'FALSE', '', 'null'}
    return not (param in falsy_values)
def param_required(param_name, allow_body=False):
    """ Decorator factory that rejects requests lacking the parameter `param_name`.

    The parameter must be present in `request.args`; when `allow_body` is set,
    presence in `request.values` is accepted as well. Otherwise the request is
    aborted with a 400 response naming the missing parameter.
    """
    def decorate(view):
        @wraps(view)
        def checked_view(*args, **kwargs):
            # request.values is only consulted when the parameter is missing
            # from the query string and body parameters are allowed.
            supplied = (param_name in request.args or
                        (allow_body and param_name in request.values))
            if not supplied:
                abort(make_response('Required param: %s' % param_name, 400))

            return view(*args, **kwargs)
        return checked_view
    return decorate
def common_login(db_user, permanent_session=True):
    """ Logs `db_user` into the current session and returns whether it succeeded.

    On success the new identity is broadcast via `identity_changed`, the login
    time is stored in the session, the session is made permanent (when
    `permanent_session` is set and the PERMANENT_SESSIONS feature is on) and
    user analytics is told about the user. Returns False, doing none of that,
    when `login_user` refuses the user.
    """
    wrapped_user = LoginWrappedDBUser(db_user.uuid, db_user)
    if not login_user(wrapped_user):
        logger.debug('User could not be logged in, inactive?.')
        return False

    logger.debug('Successfully signed in as: %s (%s)' % (db_user.username, db_user.uuid))
    identity_changed.send(app, identity=QuayDeferredPermissionUser.for_user(db_user))
    session['login_time'] = datetime.datetime.now()

    if permanent_session and features.PERMANENT_SESSIONS:
        # Lifetime comes from SESSION_TIMEOUT, falling back to '31d'.
        timeout_setting = app.config.get('SESSION_TIMEOUT', '31d')
        session.permanent = True
        session.permanent_session_lifetime = convert_to_timedelta(timeout_setting)

    # Inform our user analytics that we have a new "lead"
    user_analytics.create_lead(db_user.email, db_user.username, db_user.given_name,
                               db_user.family_name, db_user.company)
    return True
def random_string():
    """ Returns 8 characters drawn from uppercase ASCII letters and digits.

    Characters are picked with SystemRandom, i.e. from the operating system's
    randomness source.
    """
    alphabet = string.ascii_uppercase + string.digits
    rng = SystemRandom()
    return ''.join(rng.choice(alphabet) for _ in range(8))
def list_files(path, extension):
    """ Lists files with the given extension found anywhere under `static/<path>`.

    Returned paths are relative to `static/` (the prefix is stripped here and
    added back in the template). A file is skipped when the second
    dot-separated component of its name is `spec`, e.g. `foo.spec.js`. The
    order of the result is whatever os.walk yields.
    """
    static_prefix = 'static/'

    def is_wanted(filename):
        if os.path.splitext(filename)[1] != '.' + extension:
            return False
        return filename.split(os.path.extsep)[1] != 'spec'

    found = []
    for dirpath, _dirnames, filenames in os.walk(static_prefix + path):
        for filename in filenames:
            if is_wanted(filename):
                # Remove the static/ prefix. It is added in the template.
                found.append(os.path.join(dirpath, filename)[len(static_prefix):])
    return found
@lru_cache(maxsize=1)
def _get_version_number():
    """ Returns the version string found on the first line of CHANGELOG.md.

    The first `vMAJOR.MINOR.PATCH` occurrence on that line is returned. An
    empty string is returned when the file cannot be opened, and also when the
    first line carries no version, so a reworded changelog header cannot make
    callers fail with an AttributeError. The result is memoized by the
    `lru_cache` decorator.
    """
    try:
        with open('CHANGELOG.md') as f:
            first_line = f.readline()
    except IOError:
        return ''

    # Raw string: `\.` is not a valid escape sequence in a plain string literal.
    found = re.search(r'(v[0-9]+\.[0-9]+\.[0-9]+)', first_line)
    if found is None:
        return ''

    return found.group(0)
def render_page_template(name, route_data=None, **kwargs):
    """ Renders the template `name` with the shared frontend context.

    The context carries the style and script lists (each paired with a
    cache-buster value), feature and config sets, OAuth and external-login
    configuration, assorted keys read from `app.config`, and any extra
    `kwargs` from the caller. The response is returned with the
    `X-FRAME-OPTIONS: DENY` header set.
    """
    debugging = app.config.get('DEBUGGING', False)
    if debugging:
        # If DEBUGGING is enabled, then we load the full set of individual JS
        # and CSS files from the file system, in sorted order.
        library_styles = sorted(list_files('lib', 'css'))
        main_styles = sorted(list_files('css', 'css'))
        library_scripts = sorted(list_files('lib', 'js'))
        main_scripts = sorted(list_files('js', 'js'))
    else:
        library_styles, library_scripts = [], []
        main_styles = ['dist/quay-frontend.css']
        main_scripts = ['dist/quay-frontend.min.js']

    # A `use_cdn` query parameter, when present, overrides the configured value.
    cdn_override = request.args.get('use_cdn')
    if cdn_override is None:
        use_cdn = app.config.get('USE_CDN', True)
    else:
        use_cdn = cdn_override == 'true'

    external_styles = get_external_css(local=not use_cdn)
    external_scripts = get_external_javascript(local=not use_cdn)

    # Add Stripe checkout if billing is enabled.
    if features.BILLING:
        external_scripts.append('//checkout.stripe.com/checkout.js')

    def add_cachebusters(filenames):
        # Generator: nothing below runs until the result is iterated.
        known_hashes = get_cache_busters()
        for filename in filenames:
            if debugging:
                yield (filename, 'debugging')
            else:
                # Files without a recorded hash get a random cache buster.
                yield (filename, known_hashes.get(filename, random_string()))

    def get_external_login_config():
        return [{
            'id': login_service.service_id(),
            'title': login_service.service_name(),
            'config': login_service.get_public_config(),
            'icon': login_service.get_icon(),
        } for login_service in oauth_login.services]

    def get_oauth_config():
        return {oauth_app.key_name: oauth_app.get_public_config() for oauth_app in oauth_apps}

    # A contact link is only offered when exactly one contact entry is configured.
    contact_info = app.config.get('CONTACT_INFO', [])
    contact_href = contact_info[0] if len(contact_info) == 1 else None

    # The version number is only included when billing is disabled.
    version_number = '' if features.BILLING else ' - ' + _get_version_number()

    # NOTE: `google_anaytics_key` is spelled exactly as in the original call;
    # it is the context variable name handed to the template.
    rendered = render_template(name,
                               route_data=route_data,
                               external_styles=external_styles,
                               external_scripts=external_scripts,
                               main_styles=add_cachebusters(main_styles),
                               library_styles=add_cachebusters(library_styles),
                               main_scripts=add_cachebusters(main_scripts),
                               library_scripts=add_cachebusters(library_scripts),
                               feature_set=features.get_features(),
                               config_set=frontend_visible_config(app.config),
                               oauth_set=get_oauth_config(),
                               external_login_set=get_external_login_config(),
                               scope_set=scopes.app_scopes(app.config),
                               vuln_priority_set=PRIORITY_LEVELS,
                               enterprise_logo=app.config.get('ENTERPRISE_LOGO_URL', ''),
                               mixpanel_key=app.config.get('MIXPANEL_KEY', ''),
                               munchkin_key=app.config.get('MARKETO_MUNCHKIN_ID', ''),
                               recaptcha_key=app.config.get('RECAPTCHA_SITE_KEY', ''),
                               google_tagmanager_key=app.config.get('GOOGLE_TAGMANAGER_KEY', ''),
                               google_anaytics_key=app.config.get('GOOGLE_ANALYTICS_KEY', ''),
                               sentry_public_dsn=app.config.get('SENTRY_PUBLIC_DSN', ''),
                               is_debug=str(app.config.get('DEBUGGING', False)).lower(),
                               show_chat=features.SUPPORT_CHAT,
                               aci_conversion=features.ACI_CONVERSION,
                               has_billing=features.BILLING,
                               contact_href=contact_href,
                               hostname=app.config['SERVER_HOSTNAME'],
                               preferred_scheme=app.config['PREFERRED_URL_SCHEME'],
                               version_number=version_number,
                               license_insufficient=license_validator.insufficient,
                               license_expiring=license_validator.expiring_soon,
                               **kwargs)

    response = make_response(rendered)
    response.headers['X-FRAME-OPTIONS'] = 'DENY'
    return response