This repository has been archived on 2020-03-24. You can view files and clone it, but cannot push or open issues or pull requests.
quay/endpoints/common.py

256 lines
9.5 KiB
Python
Raw Normal View History

import logging
import json
import string
import datetime
import os
import re
from random import SystemRandom
from functools import wraps
from cachetools import lru_cache
from flask import make_response, render_template, request, abort, session
from flask_login import login_user
from flask_principal import identity_changed
import endpoints.decorated # Register the various exceptions via decorators.
import features
from app import app, oauth_apps, oauth_login, LoginWrappedDBUser, user_analytics, license_validator
from auth import scopes
from auth.permissions import QuayDeferredPermissionUser
from config import frontend_visible_config
from external_libraries import get_external_javascript, get_external_css
from util.names import parse_namespace_repository
from util.secscan import PRIORITY_LEVELS
2016-10-10 17:00:59 +00:00
from util.timedeltastring import convert_to_timedelta
logger = logging.getLogger(__name__)
route_data = None
CACHE_BUSTERS_JSON = 'static/dist/cachebusters.json'
CACHE_BUSTERS = None
def get_cache_busters():
""" Retrieves the cache busters hashes. """
global CACHE_BUSTERS
if CACHE_BUSTERS is not None:
return CACHE_BUSTERS
if not os.path.exists(CACHE_BUSTERS_JSON):
return {}
with open(CACHE_BUSTERS_JSON, 'r') as f:
CACHE_BUSTERS = json.loads(f.read())
return CACHE_BUSTERS
2016-03-09 21:20:28 +00:00
def parse_repository_name(include_tag=False,
ns_kwarg_name='namespace_name',
repo_kwarg_name='repo_name',
tag_kwarg_name='tag_name',
incoming_repo_kwarg='repository'):
def inner(func):
@wraps(func)
def wrapper(*args, **kwargs):
repo_name_components = parse_namespace_repository(kwargs[incoming_repo_kwarg],
app.config['LIBRARY_NAMESPACE'],
include_tag=include_tag)
2016-03-09 21:20:28 +00:00
del kwargs[incoming_repo_kwarg]
kwargs[ns_kwarg_name] = repo_name_components[0]
kwargs[repo_kwarg_name] = repo_name_components[1]
2016-03-09 21:20:28 +00:00
if include_tag:
kwargs[tag_kwarg_name] = repo_name_components[2]
2016-03-09 21:20:28 +00:00
return func(*args, **kwargs)
return wrapper
return inner
def route_show_if(value):
def decorator(f):
@wraps(f)
def decorated_function(*args, **kwargs):
if not value:
abort(404)
return f(*args, **kwargs)
return decorated_function
return decorator
def route_hide_if(value):
def decorator(f):
@wraps(f)
def decorated_function(*args, **kwargs):
if value:
abort(404)
return f(*args, **kwargs)
return decorated_function
return decorator
def truthy_param(param):
return param not in {False, 'false', 'False', '0', 'FALSE', '', 'null'}
def param_required(param_name, allow_body=False):
def wrapper(wrapped):
@wraps(wrapped)
def decorated(*args, **kwargs):
if param_name not in request.args:
2015-07-20 21:04:06 +00:00
if not allow_body or param_name not in request.values:
abort(make_response('Required param: %s' % param_name, 400))
return wrapped(*args, **kwargs)
return decorated
return wrapper
2016-10-10 17:00:59 +00:00
def common_login(db_user, permanent_session=True):
if login_user(LoginWrappedDBUser(db_user.uuid, db_user)):
logger.debug('Successfully signed in as: %s (%s)' % (db_user.username, db_user.uuid))
new_identity = QuayDeferredPermissionUser.for_user(db_user)
identity_changed.send(app, identity=new_identity)
session['login_time'] = datetime.datetime.now()
2016-10-13 17:48:35 +00:00
2016-10-10 17:00:59 +00:00
if permanent_session and features.PERMANENT_SESSIONS:
session_timeout_str = app.config.get('SESSION_TIMEOUT', '31d')
session.permanent = True
session.permanent_session_lifetime = convert_to_timedelta(session_timeout_str)
2016-10-13 17:48:35 +00:00
# Inform our user analytics that we have a new "lead"
user_analytics.create_lead(db_user.email, db_user.username, db_user.given_name,
db_user.family_name, db_user.company)
return True
else:
logger.debug('User could not be logged in, inactive?.')
return False
2013-12-28 19:07:44 +00:00
def random_string():
random = SystemRandom()
return ''.join([random.choice(string.ascii_uppercase + string.digits) for _ in range(8)])
def list_files(path, extension):
import os
def matches(f):
2017-01-07 08:28:02 +00:00
return os.path.splitext(f)[1] == '.' + extension and f.split(os.path.extsep)[1] != 'spec'
def join_path(dp, f):
# Remove the static/ prefix. It is added in the template.
return os.path.join(dp, f)[len('static/'):]
filepath = 'static/' + path
return [join_path(dp, f) for dp, dn, files in os.walk(filepath) for f in files if matches(f)]
@lru_cache(maxsize=1)
def _get_version_number():
try:
with open('CHANGELOG.md') as f:
return re.search('(v[0-9]+\.[0-9]+\.[0-9]+)', f.readline()).group(0)
except IOError:
return ''
def render_page_template(name, route_data=None, **kwargs):
debugging = app.config.get('DEBUGGING', False)
if debugging:
# If DEBUGGING is enabled, then we load the full set of individual JS and CSS files
# from the file system.
library_styles = list_files('lib', 'css')
main_styles = list_files('css', 'css')
library_scripts = list_files('lib', 'js')
# Ensure Webpack bundle is first script on page
main_scripts = ['build/bundle.js'] + list_files('js', 'js')
file_lists = [library_styles, main_styles, library_scripts, main_scripts]
for file_list in file_lists:
file_list.sort()
else:
library_styles = []
main_styles = ['dist/quay-frontend.css']
library_scripts = []
main_scripts = ['dist/quay-frontend.min.js']
use_cdn = app.config.get('USE_CDN', True)
if request.args.get('use_cdn') is not None:
use_cdn = request.args.get('use_cdn') == 'true'
external_styles = get_external_css(local=not use_cdn)
external_scripts = get_external_javascript(local=not use_cdn)
# Add Stripe checkout if billing is enabled.
if features.BILLING:
external_scripts.append('//checkout.stripe.com/checkout.js')
def add_cachebusters(filenames):
cachebusters = get_cache_busters()
for filename in filenames:
cache_buster = cachebusters.get(filename, random_string()) if not debugging else 'debugging'
yield (filename, cache_buster)
def get_external_login_config():
login_config = []
for login_service in oauth_login.services:
login_config.append({
'id': login_service.service_id(),
'title': login_service.service_name(),
'config': login_service.get_public_config(),
'icon': login_service.get_icon(),
})
return login_config
def get_oauth_config():
oauth_config = {}
for oauth_app in oauth_apps:
oauth_config[oauth_app.key_name] = oauth_app.get_public_config()
return oauth_config
contact_href = None
if len(app.config.get('CONTACT_INFO', [])) == 1:
contact_href = app.config['CONTACT_INFO'][0]
version_number = ''
if not features.BILLING:
version_number = ' - ' + _get_version_number()
resp = make_response(render_template(name,
route_data=route_data,
external_styles=external_styles,
external_scripts=external_scripts,
main_styles=add_cachebusters(main_styles),
library_styles=add_cachebusters(library_styles),
main_scripts=add_cachebusters(main_scripts),
library_scripts=add_cachebusters(library_scripts),
feature_set=features.get_features(),
config_set=frontend_visible_config(app.config),
oauth_set=get_oauth_config(),
external_login_set=get_external_login_config(),
scope_set=scopes.app_scopes(app.config),
vuln_priority_set=PRIORITY_LEVELS,
enterprise_logo=app.config.get('ENTERPRISE_LOGO_URL', ''),
mixpanel_key=app.config.get('MIXPANEL_KEY', ''),
2016-10-13 17:48:35 +00:00
munchkin_key=app.config.get('MARKETO_MUNCHKIN_ID', ''),
recaptcha_key=app.config.get('RECAPTCHA_SITE_KEY', ''),
2016-04-06 20:15:26 +00:00
google_tagmanager_key=app.config.get('GOOGLE_TAGMANAGER_KEY', ''),
2016-08-16 18:35:39 +00:00
google_anaytics_key=app.config.get('GOOGLE_ANALYTICS_KEY', ''),
2014-04-28 22:59:22 +00:00
sentry_public_dsn=app.config.get('SENTRY_PUBLIC_DSN', ''),
is_debug=str(app.config.get('DEBUGGING', False)).lower(),
2016-08-08 22:18:35 +00:00
show_chat=features.SUPPORT_CHAT,
aci_conversion=features.ACI_CONVERSION,
has_billing=features.BILLING,
contact_href=contact_href,
hostname=app.config['SERVER_HOSTNAME'],
preferred_scheme=app.config['PREFERRED_URL_SCHEME'],
version_number=version_number,
license_insufficient=license_validator.insufficient,
license_expiring=license_validator.expiring_soon,
**kwargs))
resp.headers['X-FRAME-OPTIONS'] = 'DENY'
return resp