import logging import json import string import datetime import os import re from random import SystemRandom from functools import wraps from cachetools import lru_cache from flask import make_response, render_template, request, abort, session from flask_login import login_user from flask_principal import identity_changed import endpoints.decorated # Register the various exceptions via decorators. import features from app import app, oauth_apps, oauth_login, LoginWrappedDBUser, user_analytics, license_validator from auth import scopes from auth.permissions import QuayDeferredPermissionUser from config import frontend_visible_config from external_libraries import get_external_javascript, get_external_css from util.names import parse_namespace_repository from util.secscan import PRIORITY_LEVELS from util.saas.useranalytics import build_error_callback from util.timedeltastring import convert_to_timedelta logger = logging.getLogger(__name__) route_data = None CACHE_BUSTERS_JSON = 'static/dist/cachebusters.json' CACHE_BUSTERS = None def get_cache_busters(): """ Retrieves the cache busters hashes. """ global CACHE_BUSTERS if CACHE_BUSTERS is not None: return CACHE_BUSTERS if not os.path.exists(CACHE_BUSTERS_JSON): return {} with open(CACHE_BUSTERS_JSON, 'r') as f: CACHE_BUSTERS = json.loads(f.read()) return CACHE_BUSTERS def parse_repository_name(include_tag=False, ns_kwarg_name='namespace_name', repo_kwarg_name='repo_name', tag_kwarg_name='tag_name', incoming_repo_kwarg='repository'): def inner(func): @wraps(func) def wrapper(*args, **kwargs): repo_name_components = parse_namespace_repository(kwargs[incoming_repo_kwarg], app.config['LIBRARY_NAMESPACE'], include_tag=include_tag) del kwargs[incoming_repo_kwarg] kwargs[ns_kwarg_name] = repo_name_components[0] kwargs[repo_kwarg_name] = repo_name_components[1] if include_tag: kwargs[tag_kwarg_name] = repo_name_components[2] return func(*args, **kwargs) return wrapper return inner def route_show_if(value): def decorator(f): @wraps(f) def decorated_function(*args, **kwargs): if not value: abort(404) return f(*args, **kwargs) return decorated_function return decorator def route_hide_if(value): def decorator(f): @wraps(f) def decorated_function(*args, **kwargs): if value: abort(404) return f(*args, **kwargs) return decorated_function return decorator def truthy_param(param): return param not in {False, 'false', 'False', '0', 'FALSE', '', 'null'} def param_required(param_name, allow_body=False): def wrapper(wrapped): @wraps(wrapped) def decorated(*args, **kwargs): if param_name not in request.args: if not allow_body or param_name not in request.values: abort(make_response('Required param: %s' % param_name, 400)) return wrapped(*args, **kwargs) return decorated return wrapper def common_login(db_user, permanent_session=True): if login_user(LoginWrappedDBUser(db_user.uuid, db_user)): logger.debug('Successfully signed in as: %s (%s)' % (db_user.username, db_user.uuid)) new_identity = QuayDeferredPermissionUser.for_user(db_user) identity_changed.send(app, identity=new_identity) session['login_time'] = datetime.datetime.now() if permanent_session and features.PERMANENT_SESSIONS: session_timeout_str = app.config.get('SESSION_TIMEOUT', '31d') session.permanent = True session.permanent_session_lifetime = convert_to_timedelta(session_timeout_str) # Inform our user analytics that we have a new "lead" create_lead_future = user_analytics.create_lead( db_user.email, db_user.username, db_user.given_name, db_user.family_name, db_user.company, ) create_lead_future.add_done_callback(build_error_callback('Create lead failed')) return True else: logger.debug('User could not be logged in, inactive?.') return False def random_string(): random = SystemRandom() return ''.join([random.choice(string.ascii_uppercase + string.digits) for _ in range(8)]) def list_files(path, extension): import os def matches(f): return os.path.splitext(f)[1] == '.' + extension and f.split(os.path.extsep)[1] != 'spec' def join_path(dp, f): # Remove the static/ prefix. It is added in the template. return os.path.join(dp, f)[len('static/'):] filepath = 'static/' + path return [join_path(dp, f) for dp, dn, files in os.walk(filepath) for f in files if matches(f)] @lru_cache(maxsize=1) def _get_version_number(): try: with open('CHANGELOG.md') as f: return re.search('(v[0-9]+\.[0-9]+\.[0-9]+)', f.readline()).group(0) except IOError: return '' def render_page_template(name, route_data=None, **kwargs): debugging = app.config.get('DEBUGGING', False) library_styles = [] main_styles = [] library_scripts = [] main_scripts = ['build/quay-frontend.js'] use_cdn = app.config.get('USE_CDN', True) if request.args.get('use_cdn') is not None: use_cdn = request.args.get('use_cdn') == 'true' external_styles = get_external_css(local=not use_cdn) external_scripts = get_external_javascript(local=not use_cdn) # Add Stripe checkout if billing is enabled. if features.BILLING: external_scripts.append('//checkout.stripe.com/checkout.js') def add_cachebusters(filenames): cachebusters = get_cache_busters() for filename in filenames: cache_buster = cachebusters.get(filename, random_string()) if not debugging else 'debugging' yield (filename, cache_buster) def get_external_login_config(): login_config = [] for login_service in oauth_login.services: login_config.append({ 'id': login_service.service_id(), 'title': login_service.service_name(), 'config': login_service.get_public_config(), 'icon': login_service.get_icon(), }) return login_config def get_oauth_config(): oauth_config = {} for oauth_app in oauth_apps: oauth_config[oauth_app.key_name] = oauth_app.get_public_config() return oauth_config contact_href = None if len(app.config.get('CONTACT_INFO', [])) == 1: contact_href = app.config['CONTACT_INFO'][0] version_number = '' if not features.BILLING: version_number = 'Quay %s' % _get_version_number() resp = make_response(render_template(name, route_data=route_data, external_styles=external_styles, external_scripts=external_scripts, main_styles=add_cachebusters(main_styles), library_styles=add_cachebusters(library_styles), main_scripts=add_cachebusters(main_scripts), library_scripts=add_cachebusters(library_scripts), feature_set=features.get_features(), config_set=frontend_visible_config(app.config), oauth_set=get_oauth_config(), external_login_set=get_external_login_config(), scope_set=scopes.app_scopes(app.config), vuln_priority_set=PRIORITY_LEVELS, enterprise_logo=app.config.get('ENTERPRISE_LOGO_URL', ''), mixpanel_key=app.config.get('MIXPANEL_KEY', ''), munchkin_key=app.config.get('MARKETO_MUNCHKIN_ID', ''), recaptcha_key=app.config.get('RECAPTCHA_SITE_KEY', ''), google_tagmanager_key=app.config.get('GOOGLE_TAGMANAGER_KEY', ''), google_anaytics_key=app.config.get('GOOGLE_ANALYTICS_KEY', ''), sentry_public_dsn=app.config.get('SENTRY_PUBLIC_DSN', ''), is_debug=str(app.config.get('DEBUGGING', False)).lower(), show_chat=features.SUPPORT_CHAT, aci_conversion=features.ACI_CONVERSION, has_billing=features.BILLING, contact_href=contact_href, hostname=app.config['SERVER_HOSTNAME'], preferred_scheme=app.config['PREFERRED_URL_SCHEME'], version_number=version_number, license_insufficient=license_validator.insufficient, license_expiring=license_validator.expiring_soon, current_year=datetime.datetime.now().year, **kwargs)) resp.headers['X-FRAME-OPTIONS'] = 'DENY' return resp