import logging

from flask import request, redirect, url_for, Blueprint
from flask.ext.login import current_user

from endpoints.common import render_page_template, common_login, route_show_if
from app import app, analytics, get_app_url
from data import model
from util.names import parse_repository_name
from util.validation import generate_valid_usernames
from util.http import abort
from auth.permissions import AdministerRepositoryPermission
from auth.auth import require_session_login
from peewee import IntegrityError

import features


logger = logging.getLogger(__name__)

# HTTP client used for every outbound call to the OAuth providers.
client = app.config['HTTPCLIENT']

callback = Blueprint('callback', __name__)


def render_ologin_error(service_name,
                        error_message='Could not load user data. The token may have expired.'):
    """Render the 'ologinerror.html' page for a failed external login.

    Args:
        service_name: Display name of the OAuth provider (e.g. 'GitHub').
        error_message: Text shown to the user; defaults to a generic
            "could not load user data" message.

    Returns:
        The result of render_page_template for the error page.
    """
    return render_page_template('ologinerror.html',
                                service_name=service_name,
                                error_message=error_message,
                                service_url=get_app_url())


def exchange_code_for_token(code, service_name='GITHUB', for_login=True,
                            form_encode=False, redirect_suffix=''):
    """Exchange an OAuth authorization code for an access token.

    Args:
        code: The authorization code handed back by the provider.
        service_name: Upper-case config prefix of the provider
            ('GITHUB', 'GOOGLE', ...).
        for_login: When true, the '<SERVICE>_LOGIN_CLIENT_ID/SECRET' config
            keys are used; otherwise '<SERVICE>_CLIENT_ID/SECRET'.
        form_encode: When true, the payload is sent as the POST body
            (``data=``); otherwise it is sent as query parameters
            (``params=``).
        redirect_suffix: Extra path appended to the callback redirect URI
            (e.g. '/attach').

    Returns:
        The access token string, or '' when the response carried no JSON
        payload or no 'access_token' entry.
    """
    # The caller-supplied code is used as-is; it is no longer silently
    # replaced by re-reading it from the current request.
    if for_login:
        id_config = service_name + '_LOGIN_CLIENT_ID'
        secret_config = service_name + '_LOGIN_CLIENT_SECRET'
    else:
        id_config = service_name + '_CLIENT_ID'
        secret_config = service_name + '_CLIENT_SECRET'

    redirect_uri = '%s://%s/oauth2/%s/callback%s' % (app.config['PREFERRED_URL_SCHEME'],
                                                     app.config['SERVER_HOSTNAME'],
                                                     service_name.lower(),
                                                     redirect_suffix)

    payload = {
        'client_id': app.config[id_config],
        'client_secret': app.config[secret_config],
        'code': code,
        'grant_type': 'authorization_code',
        'redirect_uri': redirect_uri,
    }

    headers = {
        'Accept': 'application/json',
    }

    token_url = app.config[service_name + '_TOKEN_URL']
    if form_encode:
        get_access_token = client.post(token_url, data=payload, headers=headers)
    else:
        get_access_token = client.post(token_url, params=payload, headers=headers)

    json_data = get_access_token.json()
    if not json_data:
        return ''

    return json_data.get('access_token', '')


def get_github_user(token):
    """Fetch the decoded JSON from GITHUB_USER_URL for the given access token."""
    token_param = {
        'access_token': token,
    }
    get_user = client.get(app.config['GITHUB_USER_URL'], params=token_param)
    return get_user.json()


def get_google_user(token):
    """Fetch the decoded JSON from GOOGLE_USER_URL for the given access token."""
    token_param = {
        'access_token': token,
        'alt': 'json',
    }
    get_user = client.get(app.config['GOOGLE_USER_URL'], params=token_param)
    return get_user.json()


def conduct_oauth_login(service_name, user_id, username, email, metadata=None):
    """Log in the user tied to a federated identity, creating one if needed.

    Args:
        service_name: Provider name; lower-cased for model and analytics
            calls, passed unchanged to the error page.
        user_id: The user's identifier at the provider.
        username: Preferred username used as the seed for
            generate_valid_usernames.
        email: Email address given to the newly created user.
        metadata: Optional dict stored with the federated login; a fresh
            empty dict is used when omitted.

    Returns:
        A redirect to 'web.index' when common_login succeeds, otherwise the
        rendered login error page.
    """
    # Avoid sharing one dict instance across calls (mutable default pitfall).
    if metadata is None:
        metadata = {}

    service_id = service_name.lower()
    to_login = model.verify_federated_login(service_id, user_id)
    if not to_login:
        # No existing federated login: try to create the user.
        try:
            valid = next(generate_valid_usernames(username))
            to_login = model.create_federated_user(valid, email, service_id, user_id,
                                                   set_password_notification=True,
                                                   metadata=metadata)

            # Success, tell analytics.
            analytics.track(to_login.username, 'register', {'service': service_id})

            state = request.args.get('state', None)
            if state:
                logger.debug('Aliasing with state: %s', state)
                analytics.alias(to_login.username, state)
        except model.DataModelException as ex:
            return render_ologin_error(service_name, ex.message)

    if common_login(to_login):
        return redirect(url_for('web.index'))

    return render_ologin_error(service_name)


def get_google_username(user_data):
    """Return the part of user_data['email'] before '@'.

    The full email value is returned unchanged when it has no '@' or when
    '@' is its first character.
    """
    username = user_data['email']
    at = username.find('@')
    if at > 0:
        username = username[0:at]

    return username


@callback.route('/google/callback', methods=['GET'])
@route_show_if(features.GOOGLE_LOGIN)
def google_oauth_callback():
    """Handle the Google OAuth login callback and log the user in."""
    error = request.args.get('error', None)
    if error:
        return render_ologin_error('Google', error)

    token = exchange_code_for_token(request.args.get('code'), service_name='GOOGLE',
                                    form_encode=True)
    user_data = get_google_user(token)
    if not user_data or not user_data.get('id', None) or not user_data.get('email', None):
        return render_ologin_error('Google')

    username = get_google_username(user_data)
    metadata = {
        'service_username': user_data['email'],
    }

    return conduct_oauth_login('Google', user_data['id'], username, user_data['email'],
                               metadata=metadata)


@callback.route('/github/callback', methods=['GET'])
@route_show_if(features.GITHUB_LOGIN)
def github_oauth_callback():
    """Handle the GitHub OAuth login callback and log the user in."""
    error = request.args.get('error', None)
    if error:
        return render_ologin_error('GitHub', error)

    token = exchange_code_for_token(request.args.get('code'), service_name='GITHUB')
    user_data = get_github_user(token)

    # A non-empty payload without 'login'/'id' (e.g. an error body) must be
    # rejected here rather than raising KeyError below.
    if not user_data or not user_data.get('login', None) or not user_data.get('id', None):
        return render_ologin_error('GitHub')

    username = user_data['login']
    github_id = user_data['id']

    v3_media_type = {
        'Accept': 'application/vnd.github.v3',
    }

    token_param = {
        'access_token': token,
    }
    get_email = client.get(app.config['GITHUB_USER_EMAILS'], params=token_param,
                           headers=v3_media_type)

    # We will accept any email, but we prefer the primary.
    found_email = None
    for user_email in get_email.json():
        found_email = user_email['email']
        if user_email['primary']:
            break

    metadata = {
        'service_username': username,
    }

    # 'GitHub' matches the display name used by the other GitHub error paths;
    # conduct_oauth_login lower-cases it for model and analytics calls.
    return conduct_oauth_login('GitHub', github_id, username, found_email, metadata=metadata)


@callback.route('/google/callback/attach', methods=['GET'])
@route_show_if(features.GOOGLE_LOGIN)
@require_session_login
def google_oauth_attach():
    """Attach a Google identity to the currently logged-in user."""
    token = exchange_code_for_token(request.args.get('code'), service_name='GOOGLE',
                                    redirect_suffix='/attach', form_encode=True)

    user_data = get_google_user(token)

    # 'email' is read below for both the username and the metadata, so it is
    # required here just like in google_oauth_callback.
    if not user_data or not user_data.get('id', None) or not user_data.get('email', None):
        return render_ologin_error('Google')

    google_id = user_data['id']
    user_obj = current_user.db_user()

    username = get_google_username(user_data)
    metadata = {
        'service_username': user_data['email'],
    }

    try:
        model.attach_federated_login(user_obj, 'google', google_id, metadata=metadata)
    except IntegrityError:
        err = 'Google account %s is already attached to a %s account' % (
            username, app.config['REGISTRY_TITLE_SHORT'])
        return render_ologin_error('Google', err)

    return redirect(url_for('web.user'))


@callback.route('/github/callback/attach', methods=['GET'])
@route_show_if(features.GITHUB_LOGIN)
@require_session_login
def github_oauth_attach():
    """Attach a GitHub identity to the currently logged-in user."""
    token = exchange_code_for_token(request.args.get('code'), service_name='GITHUB')
    user_data = get_github_user(token)

    # Reject payloads lacking 'login'/'id' instead of raising KeyError below.
    if not user_data or not user_data.get('login', None) or not user_data.get('id', None):
        return render_ologin_error('GitHub')

    github_id = user_data['id']
    user_obj = current_user.db_user()

    username = user_data['login']
    metadata = {
        'service_username': username,
    }

    try:
        model.attach_federated_login(user_obj, 'github', github_id, metadata=metadata)
    except IntegrityError:
        err = 'Github account %s is already attached to a %s account' % (
            username, app.config['REGISTRY_TITLE_SHORT'])
        return render_ologin_error('GitHub', err)

    return redirect(url_for('web.user'))


# NOTE(review): this rule carries no URL variable, yet the view receives
# namespace/repository through parse_repository_name -- confirm the rule was
# not meant to end with a path converter for the repository.
@callback.route('/github/callback/trigger/', methods=['GET'])
@route_show_if(features.GITHUB_BUILD)
@require_session_login
@parse_repository_name
def attach_github_build_trigger(namespace, repository):
    """Create a GitHub build trigger for a repository the user administers.

    Exchanges the authorization code using the non-login client credentials,
    creates the trigger, and redirects to the repository admin page with the
    new trigger's uuid in the query string. Calls abort(404) when the
    repository does not exist and abort(403) when the current user lacks
    admin permission on it.
    """
    permission = AdministerRepositoryPermission(namespace, repository)
    if permission.can():
        token = exchange_code_for_token(request.args.get('code'), service_name='GITHUB',
                                        for_login=False)
        repo = model.get_repository(namespace, repository)
        if not repo:
            msg = 'Invalid repository: %s/%s' % (namespace, repository)
            abort(404, message=msg)

        trigger = model.create_build_trigger(repo, 'github', token, current_user.db_user())
        admin_path = '%s/%s/%s' % (namespace, repository, 'admin')
        full_url = '%s%s%s' % (url_for('web.repository', path=admin_path),
                               '?tab=trigger&new_trigger=', trigger.uuid)
        logger.debug('Redirecting to full url: %s', full_url)
        return redirect(full_url)

    abort(403)