This repository has been archived on 2020-03-24. You can view files and clone it, but cannot push or open issues or pull requests.
quay/endpoints/web.py
Joseph Schorr 98e57b9d2b Merge branch 'master' into tutorial
Conflicts:
	endpoints/index.py
	static/css/quay.css
	static/js/app.js
	static/js/controllers.js
	test/data/test.db
2014-02-06 21:23:27 -05:00

299 lines
6.8 KiB
Python

import logging
import requests
import stripe
from flask import (abort, redirect, request, url_for, render_template,
make_response, Response, Blueprint)
from flask.ext.login import login_required, current_user
from urlparse import urlparse
from data import model
from app import app, mixpanel
from auth.permissions import AdministerOrganizationPermission
from util.invoice import renderInvoiceToPdf
from util.seo import render_snapshot
from util.cache import no_cache
from endpoints.api import get_route_data
from endpoints.common import common_login
logger = logging.getLogger(__name__)
web = Blueprint('web', __name__)
def render_page_template(name, **kwargs):
resp = make_response(render_template(name, route_data=get_route_data(),
**kwargs))
resp.headers['X-FRAME-OPTIONS'] = 'DENY'
return resp
@web.route('/', methods=['GET'], defaults={'path': ''})
@web.route('/repository/<path:path>', methods=['GET'])
@web.route('/organization/<path:path>', methods=['GET'])
@no_cache
def index(path):
return render_page_template('index.html')
@web.route('/snapshot', methods=['GET'])
@web.route('/snapshot/', methods=['GET'])
@web.route('/snapshot/<path:path>', methods=['GET'])
def snapshot(path = ''):
parsed = urlparse(request.url)
final_url = '%s://%s/%s' % (parsed.scheme, 'localhost', path)
result = render_snapshot(final_url)
if result:
return result
abort(404)
@web.route('/plans/')
@no_cache
def plans():
return index('')
@web.route('/guide/')
@no_cache
def guide():
return index('')
@web.route('/tutorial/')
@no_cache
def tutorial():
return index('')
@web.route('/organizations/')
@web.route('/organizations/new/')
@no_cache
def organizations():
return index('')
@web.route('/user/')
@no_cache
def user():
return index('')
@web.route('/signin/')
@no_cache
def signin():
return index('')
@web.route('/contact/')
@no_cache
def contact():
return index('')
@web.route('/about/')
@no_cache
def about():
return index('')
@web.route('/new/')
@no_cache
def new():
return index('')
@web.route('/repository/')
@no_cache
def repository():
return index('')
@web.route('/security/')
@no_cache
def security():
return index('')
@web.route('/v1')
@web.route('/v1/')
@no_cache
def v1():
return index('')
@web.route('/status', methods=['GET'])
@no_cache
def status():
return make_response('Healthy')
@web.route('/tos', methods=['GET'])
@no_cache
def tos():
return render_page_template('tos.html')
@web.route('/disclaimer', methods=['GET'])
@no_cache
def disclaimer():
return render_page_template('disclaimer.html')
@web.route('/privacy', methods=['GET'])
@no_cache
def privacy():
return render_page_template('privacy.html')
@web.route('/receipt', methods=['GET'])
def receipt():
if not current_user.is_authenticated():
abort(401)
return
id = request.args.get('id')
if id:
invoice = stripe.Invoice.retrieve(id)
if invoice:
user_or_org = model.get_user_or_org_by_customer_id(invoice.customer)
if user_or_org:
if user_or_org.organization:
admin_org = AdministerOrganizationPermission(user_or_org.username)
if not admin_org.can():
abort(404)
return
else:
if not user_or_org.username == current_user.db_user().username:
abort(404)
return
file_data = renderInvoiceToPdf(invoice, user_or_org)
return Response(file_data,
mimetype="application/pdf",
headers={"Content-Disposition": "attachment;filename=receipt.pdf"})
abort(404)
def exchange_github_code_for_token(code):
code = request.args.get('code')
payload = {
'client_id': app.config['GITHUB_CLIENT_ID'],
'client_secret': app.config['GITHUB_CLIENT_SECRET'],
'code': code,
}
headers = {
'Accept': 'application/json'
}
get_access_token = requests.post(app.config['GITHUB_TOKEN_URL'],
params=payload, headers=headers)
token = get_access_token.json()['access_token']
return token
def get_github_user(token):
token_param = {
'access_token': token,
}
get_user = requests.get(app.config['GITHUB_USER_URL'], params=token_param)
return get_user.json()
@web.route('/oauth2/github/callback', methods=['GET'])
def github_oauth_callback():
error = request.args.get('error', None)
if error:
return render_page_template('githuberror.html', error_message=error)
token = exchange_github_code_for_token(request.args.get('code'))
user_data = get_github_user(token)
username = user_data['login']
github_id = user_data['id']
v3_media_type = {
'Accept': 'application/vnd.github.v3'
}
token_param = {
'access_token': token,
}
get_email = requests.get(app.config['GITHUB_USER_EMAILS'],
params=token_param, headers=v3_media_type)
# We will accept any email, but we prefer the primary
found_email = None
for user_email in get_email.json():
found_email = user_email['email']
if user_email['primary']:
break
to_login = model.verify_federated_login('github', github_id)
if not to_login:
# try to create the user
try:
to_login = model.create_federated_user(username, found_email, 'github',
github_id)
# Success, tell mixpanel
mixpanel.track(to_login.username, 'register', {'service': 'github'})
state = request.args.get('state', None)
if state:
logger.debug('Aliasing with state: %s' % state)
mixpanel.alias(to_login.username, state)
except model.DataModelException, ex:
return render_page_template('githuberror.html', error_message=ex.message)
if common_login(to_login):
return redirect(url_for('web.index'))
return render_page_template('githuberror.html')
@web.route('/oauth2/github/callback/attach', methods=['GET'])
@login_required
def github_oauth_attach():
token = exchange_github_code_for_token(request.args.get('code'))
user_data = get_github_user(token)
github_id = user_data['id']
user_obj = current_user.db_user()
model.attach_federated_login(user_obj, 'github', github_id)
return redirect(url_for('web.user'))
@web.route('/confirm', methods=['GET'])
def confirm_email():
code = request.values['code']
user = None
new_email = None
try:
user, new_email = model.confirm_user_email(code)
except model.DataModelException as ex:
return render_page_template('confirmerror.html', error_message=ex.message)
common_login(user)
return redirect(url_for('web.user', tab='email')
if new_email else url_for('web.index'))
@web.route('/recovery', methods=['GET'])
def confirm_recovery():
code = request.values['code']
user = model.validate_reset_code(code)
if user:
common_login(user)
return redirect(url_for('web.user'))
else:
abort(403)