This repository has been archived on 2020-03-24. You can view files and clone it, but cannot push or open issues or pull requests.
quay/endpoints/common.py

111 lines
No EOL
2.7 KiB
Python

import logging
import os
import base64
from flask import session, make_response, render_template
from flask.ext.login import login_user, UserMixin
from flask.ext.principal import identity_changed
from data import model
from app import app, login_manager
from auth.permissions import QuayDeferredPermissionUser
# Module-level logger, named after this module.
logger = logging.getLogger(__name__)
# Cache for the route description built by get_route_data(); stays None
# until that function has computed it once.
route_data = None
def get_route_data():
    """Return a description of every registered ``api.*`` route.

    The description is built on the first call, stored in the module-level
    ``route_data`` variable, and returned unchanged on later calls.

    Returns:
        A dict of the form ``{'endpoints': [...]}``. Each entry holds the
        endpoint 'name' (with the leading ``api.`` removed), its 'methods'
        (HEAD and OPTIONS excluded), its URL 'path', its 'parameters', and,
        when the view function carries a ``__user_call`` attribute, that
        attribute's value under 'user_method'.
    """
    global route_data
    if route_data:
        return route_data

    routes = []
    for rule in app.url_map.iter_rules():
        # Only API endpoints are described.
        if not rule.endpoint.startswith('api.'):
            continue

        endpoint_method = app.view_functions[rule.endpoint]
        route = {
            'name': rule.endpoint[4:],  # strip the 'api.' prefix
            'methods': list(rule.methods.difference(['HEAD', 'OPTIONS'])),
            'path': rule.rule,
            'parameters': list(rule.arguments),
        }

        # Look the marker attribute up directly instead of scanning dir().
        if hasattr(endpoint_method, '__user_call'):
            route['user_method'] = getattr(endpoint_method, '__user_call')

        routes.append(route)

    route_data = {'endpoints': routes}
    return route_data
@login_manager.user_loader
def load_user(username):
    """Build the login wrapper for ``username`` (registered as the user loader).

    No database lookup happens here; the returned wrapper fetches the user
    record the first time it is needed.

    Args:
        username: The user id stored in the login session.

    Returns:
        A ``_LoginWrappedDBUser`` for the given username.
    """
    # Pass the argument separately so formatting only happens if the debug
    # record is actually emitted.
    logger.debug('Loading user: %s', username)
    return _LoginWrappedDBUser(username)
class _LoginWrappedDBUser(UserMixin):
    """Login-session user object that loads its database record lazily.

    Only the username is needed to construct the wrapper; the record is
    fetched through ``model.get_user`` the first time it is required.
    """

    def __init__(self, db_username, db_user=None):
        """Remember the username and, optionally, an already-loaded record.

        Args:
            db_username: Username used for the lookup and as the login id.
            db_user: Optional pre-fetched user record; when supplied (and
                truthy) no lookup is performed.
        """
        self._db_username = db_username
        self._db_user = db_user

    def db_user(self):
        """Return the database user, fetching it on first use.

        A falsy lookup result is not cached, so the lookup is retried on
        the next call.
        """
        if not self._db_user:
            self._db_user = model.get_user(self._db_username)
        return self._db_user

    def is_authenticated(self):
        """Return True when a database record exists for the username."""
        return self.db_user() is not None

    def is_active(self):
        """Return the user's ``verified`` flag, or False if there is no user.

        Without the None guard, a wrapper whose lookup finds no record
        (presumably an unknown or deleted user - confirm against
        ``model.get_user``) would raise AttributeError here.
        """
        user = self.db_user()
        return user is not None and user.verified

    def get_id(self):
        """Return the username as a unicode string, used as the session id."""
        return unicode(self._db_username)
def common_login(db_user):
    """Log ``db_user`` in and announce the new identity.

    Args:
        db_user: A loaded user record; its ``username`` attribute is read.

    Returns:
        True when ``login_user`` accepted the user (the identity change is
        then broadcast via ``identity_changed``), False otherwise.
    """
    wrapped_user = _LoginWrappedDBUser(db_user.username, db_user)
    if not login_user(wrapped_user):
        logger.debug('User could not be logged in, inactive?.')
        return False

    # Lazy logging args: the message is only formatted if it is emitted.
    logger.debug('Successfully signed in as: %s', db_user.username)
    new_identity = QuayDeferredPermissionUser(db_user.username, 'username')
    identity_changed.send(app, identity=new_identity)
    return True
@app.errorhandler(model.DataModelException)
def handle_dme(ex):
    """Answer a ``DataModelException`` with its message and HTTP status 400."""
    status_code = 400
    body = ex.message
    return make_response(body, status_code)
@app.errorhandler(KeyError)
def handle_dme_key_error(ex):
    """Answer an unhandled ``KeyError`` with its message and HTTP status 400."""
    status_code = 400
    body = ex.message
    return make_response(body, status_code)
def generate_csrf_token():
    """Return the session's CSRF token, creating and storing one if absent.

    A new token is the base64 encoding of 48 bytes from ``os.urandom``.
    """
    key = '_csrf_token'
    if key in session:
        return session[key]

    session[key] = base64.b64encode(os.urandom(48))
    return session[key]
# Expose the token generator to Jinja templates under the global name
# 'csrf_token'.
app.jinja_env.globals['csrf_token'] = generate_csrf_token
def render_page_template(name, **kwargs):
    """Render template ``name`` into a response that forbids framing.

    The route description from ``get_route_data()`` is passed to the
    template as ``route_data`` alongside any extra keyword arguments, and
    the response carries an ``X-FRAME-OPTIONS: DENY`` header.

    Args:
        name: Template name handed to ``render_template``.
        **kwargs: Additional template context values.

    Returns:
        The response object wrapping the rendered template.
    """
    routes = get_route_data()
    rendered = render_template(name, route_data=routes, **kwargs)
    response = make_response(rendered)
    response.headers['X-FRAME-OPTIONS'] = 'DENY'
    return response