import logging
import requests

from flask import request, redirect, url_for, Blueprint
from flask_login import current_user
from peewee import IntegrityError

import features

from app import app, analytics, get_app_url, github_login, google_login, dex_login
from auth.process import require_session_login
from data import model
from endpoints.common import common_login, route_show_if
from endpoints.web import render_page_template_with_routedata
from util.security.jwtutil import decode, InvalidTokenError
from util.validation import generate_valid_usernames

logger = logging.getLogger(__name__)
client = app.config['HTTPCLIENT']
oauthlogin = Blueprint('oauthlogin', __name__)


def render_ologin_error(service_name,
                        error_message='Could not load user data. The token may have expired.'):
    """Render the 'ologinerror.html' page for a failed external login.

    Args:
      service_name: Display name of the login service, passed to the template.
      error_message: Message passed to the template; defaults to a generic
        "could not load user data" message.

    Returns:
      Whatever `render_page_template_with_routedata` returns for the template.
    """
    # The template is told whether user creation applies; that is only the case
    # when both user creation and direct login are enabled.
    user_creation = features.USER_CREATION and features.DIRECT_LOGIN
    return render_page_template_with_routedata('ologinerror.html',
                                               service_name=service_name,
                                               error_message=error_message,
                                               service_url=get_app_url(),
                                               user_creation=user_creation)


def get_user(service, token):
    """Fetch the user record from the service's user endpoint.

    Args:
      service: Login service object exposing `user_endpoint()`.
      token: Access token sent as the `access_token` query parameter.

    Returns:
      The decoded JSON body on an OK response, otherwise an empty dict.
    """
    token_param = {
        'access_token': token,
        'alt': 'json',
    }

    got_user = client.get(service.user_endpoint(), params=token_param)
    if got_user.status_code != requests.codes.ok:
        return {}

    return got_user.json()


def conduct_oauth_login(service, user_id, username, email, metadata=None):
    """Log in the federated user, creating a new account first if needed.

    Args:
      service: Login service object exposing `service_name()`.
      user_id: Identifier of the user within the external service.
      username: Preferred username, used as the seed for username generation.
      email: E-mail address to register a newly created user with.
      metadata: Optional dict stored alongside the federated login. Defaults to
        a fresh empty dict per call.

    Returns:
      A redirect on successful login, or the rendered login error page.
    """
    # A fresh dict per call; a `{}` default would be shared between requests.
    if metadata is None:
        metadata = {}

    service_name = service.service_name()
    service_key = service_name.lower()

    to_login = model.user.verify_federated_login(service_key, user_id)
    if not to_login:
        # See if we can create a new user.
        if not features.USER_CREATION:
            error_message = 'User creation is disabled. Please contact your administrator'
            return render_ologin_error(service_name, error_message)

        # Try to create the user.
        try:
            # Pick the first generated username not already taken by a user or org.
            new_username = None
            for valid in generate_valid_usernames(username):
                if model.user.get_user_or_org(valid):
                    continue

                new_username = valid
                break

            prompts = model.user.get_default_user_prompts(features)
            to_login = model.user.create_federated_user(new_username, email, service_key,
                                                        user_id,
                                                        set_password_notification=True,
                                                        metadata=metadata,
                                                        prompts=prompts)

            # Success, tell analytics.
            analytics.track(to_login.username, 'register', {'service': service_key})

            state = request.args.get('state', None)
            if state:
                logger.debug('Aliasing with state: %s', state)
                analytics.alias(to_login.username, state)

        except model.InvalidEmailAddressException:
            message = "The e-mail address %s is already associated " % (email, )
            message = message + "with an existing %s account." % (
                app.config['REGISTRY_TITLE_SHORT'], )
            message = message + "\nPlease log in with your username and password and "
            message = message + "associate your %s account to use it in the future." % (
                service_name, )
            return render_ologin_error(service_name, message)

        except model.DataModelException as ex:
            # Built-in exceptions have no `.message` attribute on Python 3; fall
            # back to str(ex) so this handler cannot itself raise AttributeError.
            return render_ologin_error(service_name, getattr(ex, 'message', None) or str(ex))

    if common_login(to_login):
        if model.user.has_user_prompts(to_login):
            return redirect(url_for('web.updateuser'))
        return redirect(url_for('web.index'))

    return render_ologin_error(service_name)


def get_email_username(user_data):
    """Return the part of `user_data['email']` before the first '@'.

    The full address is returned unchanged when it contains no '@' or when the
    '@' is the first character.
    """
    username = user_data['email']
    at = username.find('@')
    if at > 0:
        username = username[0:at]

    return username


@oauthlogin.route('/google/callback', methods=['GET'])
@route_show_if(features.GOOGLE_LOGIN)
def google_oauth_callback():
    """Handle the Google OAuth callback: exchange the code and log the user in."""
    error = request.args.get('error', None)
    if error:
        return render_ologin_error('Google', error)

    code = request.args.get('code')
    token = google_login.exchange_code_for_token(app.config, client, code, form_encode=True)
    user_data = get_user(google_login, token)
    if not user_data or not user_data.get('id', None) or not user_data.get('email', None):
        return render_ologin_error('Google')

    if not user_data.get('verified_email', False):
        return render_ologin_error(
            'Google',
            'A verified e-mail address is required for login. Please verify your ' +
            'e-mail address in Google and try again.',
        )

    username = get_email_username(user_data)
    metadata = {
        'service_username': user_data['email']
    }

    return conduct_oauth_login(google_login, user_data['id'], username, user_data['email'],
                               metadata=metadata)


@oauthlogin.route('/github/callback', methods=['GET'])
@route_show_if(features.GITHUB_LOGIN)
def github_oauth_callback():
    """Handle the GitHub OAuth callback: exchange the code and log the user in.

    When the service has an allowed-organizations list, the user must belong to
    at least one of them. The e-mail used is the last acceptable address seen,
    stopping early at the primary one.
    """
    error = request.args.get('error', None)
    if error:
        return render_ologin_error('GitHub', error)

    # Exchange the OAuth code.
    code = request.args.get('code')
    token = github_login.exchange_code_for_token(app.config, client, code)

    # Retrieve the user's information. Both fields are required below, so a
    # record missing either one is treated as a failed lookup.
    user_data = get_user(github_login, token)
    if not user_data or 'login' not in user_data or 'id' not in user_data:
        return render_ologin_error('GitHub')

    username = user_data['login']
    github_id = user_data['id']

    v3_media_type = {
        'Accept': 'application/vnd.github.v3'
    }

    token_param = {
        'access_token': token,
    }

    # Retrieve the user's organizations (if organization filtering is turned on).
    allowed_organizations = github_login.allowed_organizations()
    if allowed_organizations is not None:
        get_orgs = client.get(github_login.orgs_endpoint(), params=token_param,
                              headers={'Accept': 'application/vnd.github.moondragon+json'})
        # An error body is not a list of orgs; bail out instead of crashing on it.
        if get_orgs.status_code != requests.codes.ok:
            return render_ologin_error('GitHub')

        organizations = {org.get('login').lower() for org in get_orgs.json()}
        if not (organizations & set(allowed_organizations)):
            err = ('You are not a member of an allowed GitHub organization. '
                   'Please contact your system administrator if you believe this is in error.')
            return render_ologin_error('GitHub', err)

    # Find the e-mail address for the user: we will accept any email, but we
    # prefer the primary.
    get_email = client.get(github_login.email_endpoint(), params=token_param,
                           headers=v3_media_type)
    if get_email.status_code != requests.codes.ok:
        return render_ologin_error('GitHub')

    found_email = None
    for user_email in get_email.json():
        # Unverified addresses are only accepted for enterprise installs.
        if not github_login.is_enterprise() and not user_email['verified']:
            continue

        found_email = user_email['email']
        if user_email['primary']:
            break

    if found_email is None:
        err = 'There is no verified e-mail address attached to the GitHub account.'
        return render_ologin_error('GitHub', err)

    metadata = {
        'service_username': username
    }

    return conduct_oauth_login(github_login, github_id, username, found_email,
                               metadata=metadata)


@oauthlogin.route('/google/callback/attach', methods=['GET'])
@route_show_if(features.GOOGLE_LOGIN)
@require_session_login
def google_oauth_attach():
    """Attach a Google account to the currently logged-in user."""
    code = request.args.get('code')
    token = google_login.exchange_code_for_token(app.config, client, code,
                                                 redirect_suffix='/attach', form_encode=True)

    user_data = get_user(google_login, token)
    # The e-mail is read below for the metadata and the error message, so it is
    # required here just as it is in the login callback.
    if not user_data or not user_data.get('id', None) or not user_data.get('email', None):
        return render_ologin_error('Google')

    if not user_data.get('verified_email', False):
        return render_ologin_error(
            'Google',
            'A verified e-mail address is required for login. Please verify your ' +
            'e-mail address in Google and try again.',
        )

    google_id = user_data['id']
    user_obj = current_user.db_user()

    username = get_email_username(user_data)
    metadata = {
        'service_username': user_data['email']
    }

    try:
        model.user.attach_federated_login(user_obj, 'google', google_id, metadata=metadata)
    except IntegrityError:
        err = 'Google account %s is already attached to a %s account' % (
            username, app.config['REGISTRY_TITLE_SHORT'])
        return render_ologin_error('Google', err)

    return redirect(url_for('web.user_view', path=user_obj.username, tab='external'))


@oauthlogin.route('/github/callback/attach', methods=['GET'])
@route_show_if(features.GITHUB_LOGIN)
@require_session_login
def github_oauth_attach():
    """Attach a GitHub account to the currently logged-in user."""
    code = request.args.get('code')
    token = github_login.exchange_code_for_token(app.config, client, code)
    user_data = get_user(github_login, token)
    # Both fields are indexed below; treat a record missing either as a failure.
    if not user_data or 'login' not in user_data or 'id' not in user_data:
        return render_ologin_error('GitHub')

    github_id = user_data['id']
    user_obj = current_user.db_user()

    username = user_data['login']
    metadata = {
        'service_username': username
    }

    try:
        model.user.attach_federated_login(user_obj, 'github', github_id, metadata=metadata)
    except IntegrityError:
        err = 'Github account %s is already attached to a %s account' % (
            username, app.config['REGISTRY_TITLE_SHORT'])
        return render_ologin_error('GitHub', err)

    return redirect(url_for('web.user_view', path=user_obj.username, tab='external'))


def decode_user_jwt(token, oidc_provider):
    """Decode an RS256 JWT issued by `oidc_provider`.

    Decoding is attempted with the provider's current public key; on
    InvalidTokenError it is retried once with a force-refreshed key.

    Raises:
      InvalidTokenError: If the retry with the refreshed key also fails.
    """
    try:
        return decode(token, oidc_provider.get_public_key(), algorithms=['RS256'],
                      audience=oidc_provider.client_id(),
                      issuer=oidc_provider.issuer)
    except InvalidTokenError:
        # Public key may have expired. Try to retrieve an updated public key and
        # use it to decode.
        return decode(token, oidc_provider.get_public_key(force_refresh=True),
                      algorithms=['RS256'],
                      audience=oidc_provider.client_id(),
                      issuer=oidc_provider.issuer)


@oauthlogin.route('/dex/callback', methods=['GET', 'POST'])
@route_show_if(features.DEX_LOGIN)
def dex_oauth_callback():
    """Handle the Dex OAuth callback: exchange the code and log the user in."""
    error = request.values.get('error', None)
    if error:
        return render_ologin_error(dex_login.public_title, error)

    code = request.values.get('code')
    if not code:
        return render_ologin_error(dex_login.public_title, 'Missing OAuth code')

    token = dex_login.exchange_code_for_token(app.config, client, code, client_auth=True,
                                              form_encode=True)
    # Same guard as the attach handler: do not try to decode a missing token.
    if not token:
        return render_ologin_error(dex_login.public_title)

    try:
        payload = decode_user_jwt(token, dex_login)
    except InvalidTokenError:
        logger.exception('Exception when decoding returned JWT')
        return render_ologin_error(
            dex_login.public_title,
            'Could not decode response. Please contact your system administrator about this error.',
        )

    # Both claims are indexed below; a payload missing either cannot be used.
    if not payload.get('sub') or not payload.get('email'):
        return render_ologin_error(dex_login.public_title)

    if not payload.get('email_verified', False):
        return render_ologin_error(
            dex_login.public_title,
            'A verified e-mail address is required for login. Please verify your ' +
            'e-mail address in %s and try again.' % dex_login.public_title,
        )

    username = get_email_username(payload)
    metadata = {}

    dex_id = payload['sub']
    email_address = payload['email']

    return conduct_oauth_login(dex_login, dex_id, username, email_address,
                               metadata=metadata)


@oauthlogin.route('/dex/callback/attach', methods=['GET', 'POST'])
@route_show_if(features.DEX_LOGIN)
@require_session_login
def dex_oauth_attach():
    """Attach a Dex account to the currently logged-in user."""
    # The route accepts POST as well as GET, so read the code from either the
    # query string or the form body, as the login callback does.
    code = request.values.get('code')
    token = dex_login.exchange_code_for_token(app.config, client, code,
                                              redirect_suffix='/attach',
                                              client_auth=True, form_encode=True)
    if not token:
        return render_ologin_error(dex_login.public_title)

    try:
        payload = decode_user_jwt(token, dex_login)
    except InvalidTokenError:
        logger.exception('Exception when decoding returned JWT')
        return render_ologin_error(
            dex_login.public_title,
            'Could not decode response. Please contact your system administrator about this error.',
        )

    # The subject claim identifies the external account; it is required.
    if not payload.get('sub'):
        return render_ologin_error(dex_login.public_title)

    user_obj = current_user.db_user()
    dex_id = payload['sub']
    metadata = {}

    try:
        model.user.attach_federated_login(user_obj, 'dex', dex_id, metadata=metadata)
    except IntegrityError:
        err = '%s account is already attached to a %s account' % (
            dex_login.public_title, app.config['REGISTRY_TITLE_SHORT'])
        return render_ologin_error(dex_login.public_title, err)

    return redirect(url_for('web.user_view', path=user_obj.username, tab='external'))