This repository has been archived on 2020-03-24. You can view files and clone it, but cannot push or open issues or pull requests.
quay/endpoints/oauthlogin.py
Joseph Schorr fda203e4d7 Add proper and tested OIDC support on the server
Note that this will still not work on the client side; the followup CL for the client side is right after this one.
2017-01-23 17:53:34 -05:00

169 lines
6.2 KiB
Python

import logging
import uuid
from flask import request, redirect, url_for, Blueprint
from peewee import IntegrityError
import features
from app import app, analytics, get_app_url, oauth_login
from auth.auth_context import get_authenticated_user
from auth.process import require_session_login
from data import model
from endpoints.common import common_login
from endpoints.web import index
from endpoints.csrf import csrf_protect, OAUTH_CSRF_TOKEN_NAME
from oauth.login import OAuthLoginException
from util.validation import generate_valid_usernames
# Module-level logger, named after this module.
logger = logging.getLogger(__name__)
# HTTP client taken from the app configuration; it is handed to each login service
# when an OAuth code is exchanged for login information.
client = app.config['HTTPCLIENT']
# Blueprint that collects the per-service OAuth callback and attach routes.
oauthlogin = Blueprint('oauthlogin', __name__)
# CSRF decorator applied to every OAuth endpoint registered in this module.
# NOTE(review): presumably checks the `state` request parameter against the token stored
# under OAUTH_CSRF_TOKEN_NAME -- confirm against endpoints.csrf.
oauthlogin_csrf_protect = csrf_protect(OAUTH_CSRF_TOKEN_NAME, 'state', all_methods=True)
def _render_ologin_error(service_name, error_message=None, register_redirect=False):
    """ Renders the index page as a 400 response describing a failed OAuth login.

    Args:
      service_name: name of the OAuth service, passed through to the page.
      error_message: text to display; a generic message is substituted when it is falsy.
      register_redirect: flag passed through unchanged in the error info.

    Returns:
      The response produced by `index`, with its status code set to 400.
    """
    default_message = 'Could not load user data. The token may have expired'

    # The page is only told user creation is available when both features are enabled.
    can_create_users = bool(features.USER_CREATION and features.DIRECT_LOGIN)

    response = index('', error_info=dict(
        reason='ologinerror',
        service_name=service_name,
        error_message=error_message or default_message,
        service_url=get_app_url(),
        user_creation=can_create_users,
        register_redirect=register_redirect,
    ))
    response.status_code = 400
    return response
def _conduct_oauth_login(service_id, service_name, user_id, username, email, metadata=None):
    """ Logs in the user linked to an external OAuth identity, creating the user if necessary.

    Args:
      service_id: identifier of the OAuth service, used for the federated login lookup.
      service_name: name of the OAuth service, used in error pages and analytics.
      user_id: the user's identifier within the OAuth service.
      username: username reported by the service; seeds the search for a free local username.
      email: e-mail reported by the service, or None.
      metadata: optional dict stored with the federated login; an empty dict is used when falsy.

    Returns:
      A redirect on successful login (to the user-update page when the user has outstanding
      prompts, otherwise to the index page), or an OAuth error response on failure.
    """
    account = model.user.verify_federated_login(service_id, user_id)

    if not account:
        # No linked user was found, so one must be created -- provided that is permitted.
        if not features.USER_CREATION:
            return _render_ologin_error(
                service_name, 'User creation is disabled. Please contact your administrator')

        try:
            # Pick the first candidate username that no existing user or organization holds.
            free_username = next(
                (candidate for candidate in generate_valid_usernames(username)
                 if not model.user.get_user_or_org(candidate)),
                None)

            # Without an e-mail, and with the MAILING feature off, substitute a
            # unique placeholder address.
            if email is None and not features.MAILING:
                email = '%s@fake.example.com' % str(uuid.uuid4())

            account = model.user.create_federated_user(
                free_username,
                email,
                service_id,
                user_id,
                set_password_notification=True,
                metadata=metadata or {},
                prompts=model.user.get_default_user_prompts(features))

            # Record the successful registration.
            analytics.track(account.username, 'register', {'service': service_name.lower()})
        except model.InvalidEmailAddressException:
            template = ("The e-mail address %s is already associated "
                        "with an existing %s account."
                        "\nPlease log in with your username and password and "
                        "associate your %s account to use it in the future.")
            details = template % (email, app.config['REGISTRY_TITLE_SHORT'], service_name)
            return _render_ologin_error(service_name, details, register_redirect=True)
        except model.DataModelException as ex:
            return _render_ologin_error(service_name, ex.message)

    if not common_login(account):
        return _render_ologin_error(service_name)

    # Users with outstanding prompts are sent to the update page before anything else.
    landing = 'web.updateuser' if model.user.has_user_prompts(account) else 'web.index'
    return redirect(url_for(landing))
def _register_service(login_service):
    """ Registers the given login service, adding its callback and attach routes to the blueprint.

    Two GET routes are added to the `oauthlogin` blueprint, both wrapped in the OAuth
    CSRF check:
      - `/<service_id>/callback` (endpoint `<service_id>_oauth_callback`): logs the user in,
        creating an account if needed.
      - `/<service_id>/callback/attach` (endpoint `<service_id>_oauth_attach`): attaches the
        external identity to the already signed-in user; requires a session login.

    Args:
      login_service: object exposing `service_id()`, `service_name()` and
        `exchange_code_for_login(config, http_client, code, suffix)`.
    """
    @oauthlogin_csrf_protect
    def callback_func():
        """ Completes an OAuth login for the service. """
        # Check for a callback error.
        error = request.args.get('error', None)
        if error:
            return _render_ologin_error(login_service.service_name(), error)

        # Exchange the OAuth code for login information.
        code = request.args.get('code')
        try:
            lid, lusername, lemail = login_service.exchange_code_for_login(app.config, client,
                                                                           code, '')
        except OAuthLoginException as ole:
            logger.exception('Got login exception')
            return _render_ologin_error(login_service.service_name(), ole.message)

        # Conduct login.
        metadata = {
            'service_username': lusername,
        }
        return _conduct_oauth_login(login_service.service_id(), login_service.service_name(),
                                    lid, lusername, lemail, metadata=metadata)

    @require_session_login
    @oauthlogin_csrf_protect
    def attach_func():
        """ Attaches the external identity to the currently authenticated user. """
        # Check for a callback error.
        error = request.args.get('error', None)
        if error:
            return _render_ologin_error(login_service.service_name(), error)

        # Exchange the OAuth code for login information. The e-mail is not needed here.
        # NOTE(review): '/attach' is presumably the redirect suffix matching this route -- confirm.
        code = request.args.get('code')
        try:
            lid, lusername, _ = login_service.exchange_code_for_login(app.config, client,
                                                                      code, '/attach')
        except OAuthLoginException as ole:
            # Log the failure exactly as the login callback does, so attach errors are
            # not lost from the server logs.
            logger.exception('Got login exception')
            return _render_ologin_error(login_service.service_name(), ole.message)

        # Conduct attach.
        metadata = {
            'service_username': lusername,
        }
        user_obj = get_authenticated_user()

        try:
            model.user.attach_federated_login(user_obj, login_service.service_id(), lid,
                                              metadata=metadata)
        except IntegrityError:
            # The attach was rejected with an integrity error; report it as already attached.
            err = '%s account %s is already attached to a %s account' % (
                login_service.service_name(), lusername, app.config['REGISTRY_TITLE_SHORT'])
            return _render_ologin_error(login_service.service_name(), err)

        return redirect(url_for('web.user_view', path=user_obj.username, tab='external'))

    oauthlogin.add_url_rule('/%s/callback' % login_service.service_id(),
                            '%s_oauth_callback' % login_service.service_id(),
                            callback_func,
                            methods=['GET'])

    oauthlogin.add_url_rule('/%s/callback/attach' % login_service.service_id(),
                            '%s_oauth_attach' % login_service.service_id(),
                            attach_func,
                            methods=['GET'])
# Add the callback and attach routes for every configured OAuth login service.
for current_service in oauth_login.services:
    _register_service(current_service)