This repository has been archived on 2020-03-24. You can view files and clone it, but cannot push or open issues or pull requests.
quay/endpoints/callbacks.py
2015-03-13 15:34:28 -07:00

279 lines
9.1 KiB
Python

import logging
import requests
from flask import request, redirect, url_for, Blueprint
from flask.ext.login import current_user
from endpoints.common import render_page_template, common_login, route_show_if
from app import app, analytics, get_app_url, github_login, google_login, github_trigger
from data import model
from util.names import parse_repository_name
from util.validation import generate_valid_usernames
from util.http import abort
from auth.permissions import AdministerRepositoryPermission
from auth.auth import require_session_login
from peewee import IntegrityError
import features
# Module-level logger named after this module.
logger = logging.getLogger(__name__)
# HTTP client object read from the application config; the functions in this
# module use it for their outbound OAuth requests (token, user, email endpoints).
client = app.config['HTTPCLIENT']
# Blueprint on which the OAuth callback routes in this module are registered.
callback = Blueprint('callback', __name__)
def render_ologin_error(service_name,
                        error_message='Could not load user data. The token may have expired.'):
    """Render the external-login error page for a login service.

    Args:
      service_name: Name of the login service, passed through to the template.
      error_message: Message passed to the template; a generic message is used
        when the caller does not supply one.

    Returns:
      Whatever `render_page_template` returns for 'ologinerror.html'.
    """
    template_args = {
        'service_name': service_name,
        'error_message': error_message,
        'service_url': get_app_url(),
        'user_creation': features.USER_CREATION,
    }
    return render_page_template('ologinerror.html', **template_args)
def exchange_code_for_token(code, service, form_encode=False, redirect_suffix=''):
    """Exchange an OAuth authorization code for an access token.

    Args:
      code: The authorization code to exchange. The value passed by the caller
        is used as-is (it is no longer silently replaced by the `code` query
        argument of the current request).
      service: Object providing `client_id()`, `client_secret()`,
        `service_name()` and `token_endpoint()`.
      form_encode: When true the payload is sent as the POST body (`data=`);
        otherwise it is sent as query parameters (`params=`).
      redirect_suffix: Text appended to the redirect URI, e.g. '/attach'.

    Returns:
      The value of 'access_token' in the JSON response, or '' when the response
      is not valid JSON, is empty, or carries no 'access_token' key.
    """
    payload = {
        'client_id': service.client_id(),
        'client_secret': service.client_secret(),
        'code': code,
        'grant_type': 'authorization_code',
        'redirect_uri': '%s://%s/oauth2/%s/callback%s' % (app.config['PREFERRED_URL_SCHEME'],
                                                          app.config['SERVER_HOSTNAME'],
                                                          service.service_name().lower(),
                                                          redirect_suffix)
    }
    headers = {
        'Accept': 'application/json'
    }

    token_url = service.token_endpoint()
    if form_encode:
        response = client.post(token_url, data=payload, headers=headers)
    else:
        response = client.post(token_url, params=payload, headers=headers)

    # NOTE(review): assumes `client` behaves like `requests`, whose `.json()`
    # raises ValueError on a non-JSON body -- confirm for the configured client.
    try:
        json_data = response.json()
    except ValueError:
        logger.debug('Token endpoint %s did not return valid JSON', token_url)
        return ''

    if not json_data:
        return ''

    return json_data.get('access_token', '')
def get_user(service, token):
    """Fetch user data from the service's user endpoint using an access token.

    Args:
      service: Object providing `user_endpoint()`.
      token: Access token sent as the 'access_token' query parameter.

    Returns:
      The decoded JSON body of the response, or {} when the response status is
      not OK.
    """
    query = {
        'access_token': token,
        'alt': 'json',
    }
    response = client.get(service.user_endpoint(), params=query)
    if response.status_code == requests.codes.ok:
        return response.json()

    return {}
def conduct_oauth_login(service, user_id, username, email, metadata=None):
    """Log in the user tied to a federated identity, creating the user if needed.

    Args:
      service: Login service object providing `service_name()`.
      user_id: The user's identifier on the external service.
      username: Preferred username, used as the seed for generating a free one.
      email: E-mail address used when a new user has to be created.
      metadata: Optional dict stored with the federated login; defaults to a
        fresh empty dict.

    Returns:
      A redirect to 'web.index' when the login succeeds, otherwise the rendered
      login error page.
    """
    # A new dict per call: a `{}` default would be one object shared by all calls.
    if metadata is None:
        metadata = {}

    service_name = service.service_name()
    service_id = service_name.lower()

    to_login = model.verify_federated_login(service_id, user_id)
    if not to_login:
        # See if we can create a new user.
        if not features.USER_CREATION:
            error_message = 'User creation is disabled. Please contact your administrator'
            return render_ologin_error(service_name, error_message)

        # Try to create the user
        try:
            # First generated candidate not already taken by a user or organization.
            # NOTE(review): stays None if the generator is exhausted without a
            # free name; None is then passed to create_federated_user as before.
            new_username = next((candidate for candidate in generate_valid_usernames(username)
                                 if not model.get_user_or_org(candidate)), None)

            to_login = model.create_federated_user(new_username, email, service_id,
                                                   user_id, set_password_notification=True,
                                                   metadata=metadata)

            # Success, tell analytics
            analytics.track(to_login.username, 'register', {'service': service_id})

            state = request.args.get('state', None)
            if state:
                logger.debug('Aliasing with state: %s', state)
                analytics.alias(to_login.username, state)

        except model.InvalidEmailAddressException:
            message = "The e-mail address %s is already associated " % (email, )
            message = message + "with an existing %s account." % (app.config['REGISTRY_TITLE_SHORT'], )
            message = message + "\nPlease log in with your username and password and "
            message = message + "associate your %s account to use it in the future." % (service_name, )
            return render_ologin_error(service_name, message)

        except model.DataModelException as ex:
            return render_ologin_error(service_name, ex.message)

    if common_login(to_login):
        return redirect(url_for('web.index'))

    return render_ologin_error(service_name)
def get_google_username(user_data):
    """Derive a username from the 'email' entry of the given user data.

    Returns the part of the address before the first '@'. When there is no '@',
    or it is the very first character, the whole address is returned unchanged.
    """
    email = user_data['email']
    at_index = email.find('@')
    return email[:at_index] if at_index > 0 else email
@callback.route('/google/callback', methods=['GET'])
@route_show_if(features.GOOGLE_LOGIN)
def google_oauth_callback():
    """Handle the Google OAuth login callback.

    Renders the login error page when the request carries an 'error' argument
    or when the fetched user data lacks an 'id' or 'email'; otherwise hands the
    identity over to `conduct_oauth_login`.
    """
    oauth_error = request.args.get('error', None)
    if oauth_error:
        return render_ologin_error('Google', oauth_error)

    access_token = exchange_code_for_token(request.args.get('code'), google_login,
                                           form_encode=True)
    profile = get_user(google_login, access_token)

    has_identity = profile and profile.get('id', None) and profile.get('email', None)
    if not has_identity:
        return render_ologin_error('Google')

    derived_username = get_google_username(profile)
    email = profile['email']
    return conduct_oauth_login(google_login, profile['id'], derived_username, email,
                               metadata={'service_username': email})
@callback.route('/github/callback', methods=['GET'])
@route_show_if(features.GITHUB_LOGIN)
def github_oauth_callback():
    """Handle the GitHub OAuth login callback.

    Renders the login error page when the request carries an 'error' argument,
    when the fetched user data lacks 'login' or 'id', or when the e-mail
    endpoint does not answer with an OK status; otherwise hands the identity
    over to `conduct_oauth_login`.
    """
    error = request.args.get('error', None)
    if error:
        return render_ologin_error('GitHub', error)

    token = exchange_code_for_token(request.args.get('code'), github_login)
    user_data = get_user(github_login, token)
    # Both keys are read below, so both must be present.
    if not user_data or 'login' not in user_data or 'id' not in user_data:
        return render_ologin_error('GitHub')

    username = user_data['login']
    github_id = user_data['id']

    v3_media_type = {
        'Accept': 'application/vnd.github.v3'
    }

    token_param = {
        'access_token': token,
    }
    get_email = client.get(github_login.email_endpoint(), params=token_param,
                           headers=v3_media_type)
    # An error response is not a list of e-mail entries; iterating over it
    # would fail, so bail out with the standard error page instead.
    if get_email.status_code != requests.codes.ok:
        return render_ologin_error('GitHub')

    # We will accept any email, but we prefer the primary
    found_email = None
    for user_email in get_email.json():
        found_email = user_email['email']
        if user_email.get('primary'):
            break

    metadata = {
        'service_username': username
    }

    return conduct_oauth_login(github_login, github_id, username, found_email, metadata=metadata)
@callback.route('/google/callback/attach', methods=['GET'])
@route_show_if(features.GOOGLE_LOGIN)
@require_session_login
def google_oauth_attach():
    """Attach a Google identity to the currently logged-in user.

    Renders the login error page when the fetched user data lacks an 'id' or
    'email', or when attaching raises IntegrityError; otherwise redirects to
    'web.user'.
    """
    token = exchange_code_for_token(request.args.get('code'), google_login,
                                    redirect_suffix='/attach', form_encode=True)

    user_data = get_user(google_login, token)
    # 'email' is read below for the username and metadata, so require it here
    # as well instead of failing with a KeyError.
    if not user_data or not user_data.get('id', None) or not user_data.get('email', None):
        return render_ologin_error('Google')

    google_id = user_data['id']
    user_obj = current_user.db_user()

    username = get_google_username(user_data)
    metadata = {
        'service_username': user_data['email']
    }

    try:
        model.attach_federated_login(user_obj, 'google', google_id, metadata=metadata)
    except IntegrityError:
        err = 'Google account %s is already attached to a %s account' % (
            username, app.config['REGISTRY_TITLE_SHORT'])
        return render_ologin_error('Google', err)

    return redirect(url_for('web.user'))
@callback.route('/github/callback/attach', methods=['GET'])
@route_show_if(features.GITHUB_LOGIN)
@require_session_login
def github_oauth_attach():
    """Attach a GitHub identity to the currently logged-in user.

    Renders the login error page when the fetched user data lacks 'id' or
    'login', or when attaching raises IntegrityError; otherwise redirects to
    'web.user'.
    """
    token = exchange_code_for_token(request.args.get('code'), github_login)
    user_data = get_user(github_login, token)
    # Both keys are read below; a response missing either one gets the error
    # page instead of a KeyError.
    if not user_data or 'id' not in user_data or 'login' not in user_data:
        return render_ologin_error('GitHub')

    github_id = user_data['id']
    user_obj = current_user.db_user()

    username = user_data['login']
    metadata = {
        'service_username': username
    }

    try:
        model.attach_federated_login(user_obj, 'github', github_id, metadata=metadata)
    except IntegrityError:
        err = 'Github account %s is already attached to a %s account' % (
            username, app.config['REGISTRY_TITLE_SHORT'])
        return render_ologin_error('GitHub', err)

    return redirect(url_for('web.user'))
@callback.route('/github/callback/trigger/<path:repository>', methods=['GET'])
@callback.route('/github/callback/trigger/<path:repository>/__new', methods=['GET'])
@route_show_if(features.GITHUB_BUILD)
@require_session_login
@parse_repository_name
def attach_github_build_trigger(namespace, repository):
    """Create a GitHub build trigger for a repository and redirect to its page.

    Requires administer permission on the repository; otherwise `abort(403)`
    is called. Calls `abort(404, ...)` when the repository cannot be found.
    The redirect target depends on whether '__new' appears in the request URL.
    """
    if AdministerRepositoryPermission(namespace, repository).can():
        access_token = exchange_code_for_token(request.args.get('code'), github_trigger)

        target_repo = model.get_repository(namespace, repository)
        if not target_repo:
            abort(404, message='Invalid repository: %s/%s' % (namespace, repository))

        new_trigger = model.create_build_trigger(target_repo, 'github', access_token,
                                                 current_user.db_user())

        if '__new' in request.url:
            page_path = '%s/%s' % (namespace, repository)
            query_prefix = '?tab=builds&newtrigger='
        else:
            # TODO(jschorr): Remove once the new layout is in place.
            page_path = '%s/%s/%s' % (namespace, repository, 'admin')
            query_prefix = '?tab=trigger&new_trigger='

        full_url = '%s%s%s' % (url_for('web.repository', path=page_path), query_prefix,
                               new_trigger.uuid)

        logger.debug('Redirecting to full url: %s' % full_url)
        return redirect(full_url)

    abort(403)