This repository has been archived on 2020-03-24. You can view files and clone it, but cannot push or open issues or pull requests.
quay/endpoints/web.py

305 lines
7.5 KiB
Python

import logging
import stripe
import os
from flask import (abort, redirect, request, url_for, make_response, Response,
Blueprint)
from flask.ext.login import current_user
from urlparse import urlparse
from data import model
from data.model.oauth import DatabaseAuthorizationProvider
from app import app
from auth.permissions import AdministerOrganizationPermission
from util.invoice import renderInvoiceToPdf
from util.seo import render_snapshot
from util.cache import no_cache
from endpoints.common import common_login, render_page_template, generate_csrf_token
from util.names import parse_repository_name
from util.gravatar import compute_hash
from auth import scopes
logger = logging.getLogger(__name__)
web = Blueprint('web', __name__)
STATUS_TAGS = app.config['STATUS_TAGS']
@web.route('/', methods=['GET'], defaults={'path': ''})
@web.route('/organization/<path:path>', methods=['GET'])
@no_cache
def index(path):
return render_page_template('index.html')
@web.route('/snapshot', methods=['GET'])
@web.route('/snapshot/', methods=['GET'])
@web.route('/snapshot/<path:path>', methods=['GET'])
def snapshot(path = ''):
parsed = urlparse(request.url)
final_url = '%s://%s/%s' % (parsed.scheme, 'localhost', path)
result = render_snapshot(final_url)
if result:
return result
abort(404)
@web.route('/plans/')
@no_cache
def plans():
return index('')
@web.route('/guide/')
@no_cache
def guide():
return index('')
@web.route('/tutorial/')
@no_cache
def tutorial():
return index('')
@web.route('/organizations/')
@web.route('/organizations/new/')
@no_cache
def organizations():
return index('')
@web.route('/user/')
@no_cache
def user():
return index('')
@web.route('/signin/')
@no_cache
def signin():
return index('')
@web.route('/contact/')
@no_cache
def contact():
return index('')
@web.route('/about/')
@no_cache
def about():
return index('')
@web.route('/new/')
@no_cache
def new():
return index('')
@web.route('/repository/', defaults={'path': ''})
@web.route('/repository/<path:path>', methods=['GET'])
@no_cache
def repository(path):
return index('')
@web.route('/security/')
@no_cache
def security():
return index('')
@web.route('/v1')
@web.route('/v1/')
@no_cache
def v1():
return index('')
@web.route('/status', methods=['GET'])
@no_cache
def status():
return make_response('Healthy')
@web.route('/tos', methods=['GET'])
@no_cache
def tos():
return render_page_template('tos.html')
@web.route('/disclaimer', methods=['GET'])
@no_cache
def disclaimer():
return render_page_template('disclaimer.html')
@web.route('/privacy', methods=['GET'])
@no_cache
def privacy():
return render_page_template('privacy.html')
@web.route('/receipt', methods=['GET'])
def receipt():
if not current_user.is_authenticated():
abort(401)
return
id = request.args.get('id')
if id:
invoice = stripe.Invoice.retrieve(id)
if invoice:
user_or_org = model.get_user_or_org_by_customer_id(invoice.customer)
if user_or_org:
if user_or_org.organization:
admin_org = AdministerOrganizationPermission(user_or_org.username)
if not admin_org.can():
abort(404)
return
else:
if not user_or_org.username == current_user.db_user().username:
abort(404)
return
file_data = renderInvoiceToPdf(invoice, user_or_org)
return Response(file_data,
mimetype="application/pdf",
headers={"Content-Disposition": "attachment;filename=receipt.pdf"})
abort(404)
@web.route('/confirm', methods=['GET'])
def confirm_email():
code = request.values['code']
user = None
new_email = None
try:
user, new_email = model.confirm_user_email(code)
except model.DataModelException as ex:
return render_page_template('confirmerror.html', error_message=ex.message)
common_login(user)
return redirect(url_for('web.user', tab='email')
if new_email else url_for('web.index'))
@web.route('/recovery', methods=['GET'])
def confirm_recovery():
code = request.values['code']
user = model.validate_reset_code(code)
if user:
common_login(user)
return redirect(url_for('web.user'))
else:
abort(403)
@web.route('/repository/<path:repository>/status', methods=['GET'])
@parse_repository_name
@no_cache
def build_status_badge(namespace, repository):
token = request.args.get('token', None)
is_public = model.repository_is_public(namespace, repository)
if not is_public:
repo = model.get_repository(namespace, repository)
if not repo or token != repo.badge_token:
abort(404)
# Lookup the tags for the repository.
tags = model.list_repository_tags(namespace, repository)
is_empty = len(list(tags)) == 0
build = model.get_recent_repository_build(namespace, repository)
if not is_empty and (not build or build.phase == 'complete'):
status_name = 'ready'
elif build and build.phase == 'error':
status_name = 'failed'
elif build and build.phase != 'complete':
status_name = 'building'
else:
status_name = 'none'
response = make_response(STATUS_TAGS[status_name])
response.content_type = 'image/svg+xml'
return response
class FlaskAuthorizationProvider(DatabaseAuthorizationProvider):
def get_authorized_user(self):
return current_user.db_user()
def _make_response(self, body='', headers=None, status_code=200):
return make_response(body, status_code, headers)
@web.route('/oauth/authorizeapp', methods=['POST'])
def authorize_application():
if not current_user.is_authenticated():
abort(401)
return
provider = FlaskAuthorizationProvider()
client_id = request.form.get('client_id', None)
redirect_uri = request.form.get('redirect_uri', None)
scope = request.form.get('scope', None)
csrf = request.form.get('csrf', None)
# Verify the csrf token.
if csrf != generate_csrf_token():
abort(404)
return
# Add the access token.
return provider.get_token_response('token', client_id, redirect_uri, scope=scope)
@web.route('/oauth/authorize', methods=['GET'])
@no_cache
def request_authorization_code():
provider = FlaskAuthorizationProvider()
response_type = request.args.get('response_type', 'code')
client_id = request.args.get('client_id', None)
redirect_uri = request.args.get('redirect_uri', None)
scope = request.args.get('scope', None)
if not provider.validate_has_scopes(client_id, scope):
if not provider.validate_redirect_uri(client_id, redirect_uri):
abort(404)
return
# Load the scope information.
scope_info = scopes.get_scope_information(scope)
if not scope_info:
abort(404)
return
# Load the application information.
oauth_app = provider.get_application_for_client_id(client_id)
oauth_app_view = {
'name': oauth_app.name,
'description': oauth_app.description,
'url': oauth_app.application_uri,
'organization': {
'name': oauth_app.organization.username,
'gravatar': compute_hash(oauth_app.organization.email)
}
}
# Show the authorization page.
return render_page_template('oauthorize.html', scopes=scope_info, application=oauth_app_view,
enumerate=enumerate, client_id=client_id, redirect_uri=redirect_uri,
scope=scope, csrf_token_val=generate_csrf_token())
if response_type == 'token':
return provider.get_token_response(response_type, client_id, redirect_uri, scope=scope)
else:
return provider.get_authorization_code(response_type, client_id, redirect_uri, scope=scope)