230 lines
		
	
	
	
		
			7.7 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
			
		
		
	
	
			230 lines
		
	
	
	
		
			7.7 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
| import logging
 | |
| import json
 | |
| import string
 | |
| import datetime
 | |
| import os
 | |
| import re
 | |
| 
 | |
| # Register the various exceptions via decorators.
 | |
| import endpoints.decorated
 | |
| 
 | |
| from flask import make_response, render_template, request, abort, session
 | |
| from flask.ext.login import login_user
 | |
| from flask.ext.principal import identity_changed
 | |
| from random import SystemRandom
 | |
| from cachetools import lru_cache
 | |
| 
 | |
| from data import model
 | |
| from app import app, oauth_apps, LoginWrappedDBUser
 | |
| 
 | |
| from auth.permissions import QuayDeferredPermissionUser
 | |
| from auth import scopes
 | |
| from endpoints.api.discovery import swagger_route_data
 | |
| from werkzeug.routing import BaseConverter
 | |
| from functools import wraps
 | |
| from config import frontend_visible_config
 | |
| from external_libraries import get_external_javascript, get_external_css
 | |
| from util.secscan.api import PRIORITY_LEVELS
 | |
| 
 | |
| import features
 | |
| 
 | |
| logger = logging.getLogger(__name__)
 | |
| 
 | |
| route_data = None
 | |
| 
 | |
# Path (relative to the working directory) of the JSON file holding the cache buster hashes.
CACHE_BUSTERS_JSON = 'static/dist/cachebusters.json'

# Parsed contents of CACHE_BUSTERS_JSON; stays None until the first successful load.
CACHE_BUSTERS = None

def get_cache_busters():
  """ Returns the cache buster hashes parsed from CACHE_BUSTERS_JSON.

      The parsed JSON is memoized in the module-level CACHE_BUSTERS after the first successful
      read. While the file does not exist, an empty dict is returned and nothing is memoized, so
      the file is looked for again on the next call.
  """
  global CACHE_BUSTERS
  if CACHE_BUSTERS is None:
    if not os.path.exists(CACHE_BUSTERS_JSON):
      return {}

    with open(CACHE_BUSTERS_JSON, 'r') as busters_file:
      CACHE_BUSTERS = json.load(busters_file)

  return CACHE_BUSTERS
 | |
| 
 | |
| 
 | |
class RepoPathConverter(BaseConverter):
  """ URL converter matching a two-segment path of the form `<segment>/<segment>`, where each
      segment is one or more of the characters `.`, `a-z`, `A-Z`, `0-9`, `_` and `-`.
  """
  # Raw string: `\.` and `\-` are regex escapes, not Python string escapes. The resulting
  # pattern is identical to the non-raw spelling, without the invalid-escape warning.
  regex = r'[\.a-zA-Z0-9_\-]+/[\.a-zA-Z0-9_\-]+'

  # NOTE(review): presumably used by Werkzeug when ordering URL rules -- confirm against the
  # BaseConverter.weight documentation before changing.
  weight = 200

# Makes the converter usable in route rules under the name `repopath`.
app.url_map.converters['repopath'] = RepoPathConverter
 | |
| 
 | |
def route_show_if(value):
  """ Decorator factory: the wrapped route aborts with a 404 unless `value` is truthy.

      `value` is captured when the decorator is created; it is checked on every call to the
      wrapped function.
  """
  def wrap_route(view_func):
    @wraps(view_func)
    def guarded_view(*view_args, **view_kwargs):
      enabled = bool(value)
      if not enabled:
        abort(404)

      return view_func(*view_args, **view_kwargs)

    return guarded_view

  return wrap_route
 | |
| 
 | |
| 
 | |
def route_hide_if(value):
  """ Decorator factory: the wrapped route aborts with a 404 whenever `value` is truthy.

      `value` is captured when the decorator is created; it is checked on every call to the
      wrapped function.
  """
  def wrap_route(view_func):
    @wraps(view_func)
    def guarded_view(*view_args, **view_kwargs):
      hidden = bool(value)
      if hidden:
        abort(404)

      return view_func(*view_args, **view_kwargs)

    return guarded_view

  return wrap_route
 | |
| 
 | |
| 
 | |
def get_route_data():
  """ Returns the route data produced by swagger_route_data(include_internal=True, compact=True).

      The result is stored in the module-level `route_data` and reused on later calls for as
      long as the stored value is truthy.
  """
  global route_data
  if not route_data:
    route_data = swagger_route_data(include_internal=True, compact=True)

  return route_data
 | |
| 
 | |
| 
 | |
def truthy_param(param):
  """ Returns False when `param` is one of the recognized "false" values (False, 'false',
      'False', '0', 'FALSE', '' or 'null') and True for anything else, including None.

      Membership is hash/equality based, so the integer 0 is also treated as false
      (0 == False) and unhashable values raise TypeError.
  """
  falsy_values = frozenset([False, 'false', 'False', '0', 'FALSE', '', 'null'])
  if param in falsy_values:
    return False

  return True
 | |
| 
 | |
| 
 | |
def param_required(param_name, allow_body=False):
  """ Decorator factory: aborts the request with a 400 response when `param_name` is missing.

      The parameter is looked up in request.args; when `allow_body` is set, request.values is
      consulted as a fallback before giving up.
  """
  def wrapper(wrapped):
    @wraps(wrapped)
    def decorated(*args, **kwargs):
      # Short-circuits exactly like the nested checks: request.values is only consulted when
      # the parameter is absent from request.args and body lookup is allowed.
      missing = (param_name not in request.args and
                 not (allow_body and param_name in request.values))
      if missing:
        abort(make_response('Required param: %s' % param_name, 400))

      return wrapped(*args, **kwargs)

    return decorated

  return wrapper
 | |
| 
 | |
| 
 | |
def common_login(db_user):
  """ Attempts to log `db_user` into the current session.

      On success, announces the new identity via identity_changed, records the login time in
      the session and returns True. Returns False when login_user rejects the user.
  """
  wrapped_user = LoginWrappedDBUser(db_user.uuid, db_user)
  if not login_user(wrapped_user):
    logger.debug('User could not be logged in, inactive?.')
    return False

  logger.debug('Successfully signed in as: %s (%s)' % (db_user.username, db_user.uuid))

  # Tell listeners on the app that the active identity is now this user.
  deferred_identity = QuayDeferredPermissionUser.for_user(db_user)
  identity_changed.send(app, identity=deferred_identity)

  session['login_time'] = datetime.datetime.now()
  return True
 | |
| 
 | |
def random_string():
  """ Returns 8 characters drawn from uppercase ASCII letters and digits via SystemRandom. """
  alphabet = string.ascii_uppercase + string.digits
  rng = SystemRandom()
  return ''.join(rng.choice(alphabet) for _ in range(8))
 | |
| 
 | |
def list_files(path, extension):
  """ Walks `static/<path>` and returns every file whose extension equals `extension`.

      The returned paths have the leading `static/` removed (it is added back in the template)
      and appear in os.walk order.
  """
  static_prefix = 'static/'
  wanted_suffix = '.' + extension

  found = []
  for dirpath, _, filenames in os.walk(static_prefix + path):
    for filename in filenames:
      if os.path.splitext(filename)[1] != wanted_suffix:
        continue

      # Remove the static/ prefix. It is added in the template.
      found.append(os.path.join(dirpath, filename)[len(static_prefix):])

  return found
 | |
| 
 | |
@lru_cache(maxsize=1)
def _get_version_number():
  """ Returns the `vX.Y.Z` version found on the first line of CHANGELOG.md.

      Returns an empty string when the file cannot be opened or when its first line contains
      no version of that form. The result is cached by the lru_cache decorator.
  """
  try:
    with open('CHANGELOG.md') as f:
      first_line = f.readline()
  except IOError:
    return ''

  # re.search returns None when nothing matches; without this check an unversioned or empty
  # first line would raise AttributeError instead of falling back to ''.
  found = re.search(r'(v[0-9]+\.[0-9]+\.[0-9]+)', first_line)
  if found is None:
    return ''

  return found.group(0)
 | |
| 
 | |
def render_page_template(name, **kwargs):
  """ Renders the template `name` with the shared frontend context and returns the response.

      The context carries the asset lists (each entry paired with a cache buster), the
      JSON-encoded route, feature, config, OAuth, scope and vulnerability-priority data, and a
      handful of values read from app.config. Extra keyword arguments are forwarded to the
      template. The response is sent with `X-FRAME-OPTIONS: DENY`.
  """
  debugging = app.config.get('DEBUGGING', False)
  if debugging:
    # If DEBUGGING is enabled, then we load the full set of individual JS and CSS files
    # from the file system, in sorted order.
    library_styles = sorted(list_files('lib', 'css'))
    main_styles = sorted(list_files('css', 'css'))
    library_scripts = sorted(list_files('lib', 'js'))
    main_scripts = sorted(list_files('js', 'js'))
  else:
    library_styles = []
    main_styles = ['dist/quay-frontend.css']
    library_scripts = []
    main_scripts = ['dist/quay-frontend.min.js']

  # A `use_cdn` query parameter overrides the configured default; only 'true' enables it.
  use_cdn = app.config.get('USE_CDN', True)
  cdn_override = request.args.get('use_cdn')
  if cdn_override is not None:
    use_cdn = cdn_override == 'true'

  external_styles = get_external_css(local=not use_cdn)
  external_scripts = get_external_javascript(local=not use_cdn)

  def add_cachebusters(filenames):
    # Lazily pairs each filename with its cache buster. When debugging, the fixed marker
    # 'debugging' is used; otherwise unknown files fall back to a random string.
    known_busters = get_cache_busters()
    for filename in filenames:
      if debugging:
        cache_buster = 'debugging'
      else:
        cache_buster = known_busters.get(filename, random_string())

      yield (filename, cache_buster)

  # Only link a contact directly when exactly one contact entry is configured.
  contact_info = app.config.get('CONTACT_INFO', [])
  contact_href = contact_info[0] if len(contact_info) == 1 else None

  # The version suffix is only shown when billing is disabled.
  version_number = '' if features.BILLING else (' - ' + _get_version_number())

  # The context is passed as explicit keywords (not a merged dict) so that a colliding key in
  # `kwargs` keeps raising instead of silently overriding a shared value.
  rendered = render_template(
    name,
    route_data=json.dumps(get_route_data()),
    external_styles=external_styles,
    external_scripts=external_scripts,
    main_styles=add_cachebusters(main_styles),
    library_styles=add_cachebusters(library_styles),
    main_scripts=add_cachebusters(main_scripts),
    library_scripts=add_cachebusters(library_scripts),
    feature_set=json.dumps(features.get_features()),
    config_set=json.dumps(frontend_visible_config(app.config)),
    oauth_set=json.dumps({oauth_app.key_name: oauth_app.get_public_config()
                          for oauth_app in oauth_apps}),
    scope_set=json.dumps(scopes.app_scopes(app.config)),
    vuln_priority_set=json.dumps(PRIORITY_LEVELS),
    mixpanel_key=app.config.get('MIXPANEL_KEY', ''),
    google_analytics_key=app.config.get('GOOGLE_ANALYTICS_KEY', ''),
    sentry_public_dsn=app.config.get('SENTRY_PUBLIC_DSN', ''),
    is_debug=str(app.config.get('DEBUGGING', False)).lower(),
    show_chat=features.OLARK_CHAT,
    has_billing=features.BILLING,
    contact_href=contact_href,
    hostname=app.config['SERVER_HOSTNAME'],
    preferred_scheme=app.config['PREFERRED_URL_SCHEME'],
    version_number=version_number,
    **kwargs)

  resp = make_response(rendered)
  resp.headers['X-FRAME-OPTIONS'] = 'DENY'
  return resp
 | |
| 
 | |
| 
 | |
def check_repository_usage(user_or_org, plan_found):
  """ Keeps the `over_private_usage` notification in sync with the namespace's plan.

      Compares the namespace's private repository count with the `privateRepos` allowance of
      `plan_found` (0 when no plan is given). The notification is created when the count
      exceeds the allowance and deleted otherwise.
  """
  private_repos = model.user.get_private_repo_count(user_or_org.username)
  repos_allowed = 0 if plan_found is None else plan_found['privateRepos']

  over_limit = private_repos > repos_allowed
  if not over_limit:
    model.notification.delete_notifications_by_kind(user_or_org, 'over_private_usage')
    return

  notification_data = {'namespace': user_or_org.username}
  model.notification.create_unique_notification('over_private_usage', user_or_org,
                                                notification_data)
 | |
| 
 |