This repository has been archived on 2020-03-24. You can view files and clone it, but cannot push or open issues or pull requests.
quay/endpoints/callbacks.py

141 lines
4.3 KiB
Python
Raw Normal View History

import logging
from flask import request, redirect, url_for, Blueprint
from flask.ext.login import current_user
from endpoints.common import render_page_template, common_login, route_show_if
from app import app, analytics
from data import model
from util.names import parse_repository_name
from util.http import abort
from auth.permissions import AdministerRepositoryPermission
from auth.auth import require_session_login
import features
# Module-level logger named after this module.
logger = logging.getLogger(__name__)
# HTTP client object read from the application config; the functions below
# use it for every outbound request to the GitHub endpoints.
client = app.config['HTTPCLIENT']
# Blueprint on which the OAuth callback routes of this module are registered.
callback = Blueprint('callback', __name__)
def exchange_github_code_for_token(code, for_login=True):
    """Exchange a GitHub OAuth authorization code for an access token.

    Args:
        code: the authorization code GitHub handed to the callback URL.
        for_login: when true, the GITHUB_LOGIN_CLIENT_ID/SECRET config values
            are sent; otherwise GITHUB_CLIENT_ID/SECRET are sent.

    Returns:
        The 'access_token' value from the JSON response, or '' when the
        response has no JSON payload or no such key.
    """
    # NOTE: the code supplied by the caller is used as-is. It used to be
    # overwritten with request.args.get('code'), which silently ignored the
    # argument and tied this helper to the active request.
    if for_login:
        id_key, secret_key = 'GITHUB_LOGIN_CLIENT_ID', 'GITHUB_LOGIN_CLIENT_SECRET'
    else:
        id_key, secret_key = 'GITHUB_CLIENT_ID', 'GITHUB_CLIENT_SECRET'

    payload = {
        'client_id': app.config[id_key],
        'client_secret': app.config[secret_key],
        'code': code,
    }
    # Ask for a JSON response body so that it can be parsed with .json().
    headers = {
        'Accept': 'application/json'
    }

    get_access_token = client.post(app.config['GITHUB_TOKEN_URL'],
                                   params=payload, headers=headers)

    json_data = get_access_token.json()
    if not json_data:
        return ''

    return json_data.get('access_token', '')
def get_github_user(token):
    """Look up the GitHub user that belongs to an access token.

    Sends a GET to the configured GITHUB_USER_URL with the token as the
    'access_token' query parameter and returns the result of calling
    .json() on the response.
    """
    response = client.get(app.config['GITHUB_USER_URL'],
                          params={'access_token': token})
    return response.json()
@callback.route('/github/callback', methods=['GET'])
@route_show_if(features.GITHUB_LOGIN)
def github_oauth_callback():
    """Handle the GitHub OAuth redirect used for logging in.

    Exchanges the 'code' query argument for a token, loads the GitHub user
    and its email addresses, then logs in the matching federated user,
    creating it first when no federated login exists yet.

    Returns:
        A redirect to 'web.index' when the login succeeds, otherwise the
        rendered 'githuberror.html' page.
    """
    error = request.args.get('error', None)
    if error:
        return render_page_template('githuberror.html', error_message=error)

    token = exchange_github_code_for_token(request.args.get('code'))
    user_data = get_github_user(token)

    # When the code exchange failed or the token was rejected, the payload
    # does not carry the profile fields; show the error page instead of
    # failing with a KeyError below.
    if (not isinstance(user_data, dict) or 'login' not in user_data or
            'id' not in user_data):
        return render_page_template('githuberror.html',
                                    error_message='Could not load user data from GitHub')

    username = user_data['login']
    github_id = user_data['id']

    v3_media_type = {
        'Accept': 'application/vnd.github.v3'
    }

    token_param = {
        'access_token': token,
    }
    get_email = client.get(app.config['GITHUB_USER_EMAILS'], params=token_param,
                           headers=v3_media_type)

    # Anything other than a list is treated as "no addresses" so that the
    # loop below never indexes into an unexpected payload.
    user_emails = get_email.json()
    if not isinstance(user_emails, list):
        user_emails = []

    # We will accept any email, but we prefer the primary
    found_email = None
    for user_email in user_emails:
        found_email = user_email['email']
        if user_email['primary']:
            break

    to_login = model.verify_federated_login('github', github_id)
    if not to_login:
        # try to create the user
        try:
            to_login = model.create_federated_user(username, found_email, 'github',
                                                   github_id)

            # Success, tell analytics
            analytics.track(to_login.username, 'register', {'service': 'github'})

            state = request.args.get('state', None)
            if state:
                logger.debug('Aliasing with state: %s', state)
                analytics.alias(to_login.username, state)

        except model.DataModelException as ex:
            return render_page_template('githuberror.html', error_message=ex.message)

    if common_login(to_login):
        return redirect(url_for('web.index'))

    return render_page_template('githuberror.html')
@callback.route('/github/callback/attach', methods=['GET'])
@route_show_if(features.GITHUB_LOGIN)
@require_session_login
def github_oauth_attach():
    """Attach a GitHub identity to the currently logged-in user.

    Exchanges the 'code' query argument for a token, loads the GitHub user
    and records its id as a 'github' federated login on the current user.

    Returns:
        A redirect to 'web.user' on success, or the rendered
        'githuberror.html' page when the GitHub user could not be loaded.
    """
    token = exchange_github_code_for_token(request.args.get('code'))
    user_data = get_github_user(token)

    # A failed code exchange or rejected token yields a payload without an
    # 'id'; show the error page instead of failing with a KeyError.
    if not isinstance(user_data, dict) or 'id' not in user_data:
        return render_page_template('githuberror.html',
                                    error_message='Could not load user data from GitHub')

    github_id = user_data['id']
    user_obj = current_user.db_user()
    model.attach_federated_login(user_obj, 'github', github_id)
    return redirect(url_for('web.user'))
@callback.route('/github/callback/trigger/<path:repository>', methods=['GET'])
@require_session_login
@parse_repository_name
def attach_github_build_trigger(namespace, repository):
    """Create a GitHub build trigger for a repository after the OAuth redirect.

    Requires administer permission on namespace/repository; otherwise
    abort(403) is called. With permission, the 'code' query argument is
    exchanged for a token using the non-login client credentials, a 'github'
    build trigger is created for the repository, and the response redirects
    to the repository admin page with the new trigger's uuid in the query
    string. abort(404) is called when the repository lookup returns nothing.
    """
    if AdministerRepositoryPermission(namespace, repository).can():
        oauth_code = request.args.get('code')
        token = exchange_github_code_for_token(oauth_code, for_login=False)

        repo = model.get_repository(namespace, repository)
        if not repo:
            abort(404, message='Invalid repository: %s/%s' % (namespace, repository))

        trigger = model.create_build_trigger(repo, 'github', token,
                                             current_user.db_user())

        # Assemble the admin page URL first, then append the trigger query.
        admin_path = '%s/%s/%s' % (namespace, repository, 'admin')
        admin_url = url_for('web.repository', path=admin_path)
        full_url = '%s%s%s' % (admin_url, '?tab=trigger&new_trigger=', trigger.uuid)

        logger.debug('Redirecting to full url: %s', full_url)
        return redirect(full_url)

    abort(403)