189 lines
4.5 KiB
Python
189 lines
4.5 KiB
Python
import logging
|
|
import requests
|
|
|
|
from flask import (abort, redirect, request, url_for, render_template,
|
|
make_response)
|
|
from flask.ext.login import login_user, UserMixin, login_required
|
|
from flask.ext.principal import identity_changed, Identity, AnonymousIdentity
|
|
|
|
from data import model
|
|
from app import app, login_manager, mixpanel
|
|
from auth.permissions import QuayDeferredPermissionUser
|
|
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
|
|
class _LoginWrappedDBUser(UserMixin):
|
|
def __init__(self, db_username, db_user=None):
|
|
|
|
self._db_username = db_username
|
|
self._db_user = db_user
|
|
|
|
def db_user(self):
|
|
if not self._db_user:
|
|
self._db_user = model.get_user(self._db_username)
|
|
return self._db_user
|
|
|
|
def is_authenticated(self):
|
|
return self.db_user() is not None
|
|
|
|
def is_active(self):
|
|
return self.db_user().verified
|
|
|
|
def get_id(self):
|
|
return unicode(self._db_username)
|
|
|
|
|
|
@login_manager.user_loader
|
|
def load_user(username):
|
|
logger.debug('Loading user: %s' % username)
|
|
return _LoginWrappedDBUser(username)
|
|
|
|
|
|
@app.route('/', methods=['GET'], defaults={'path': ''})
|
|
@app.route('/repository/<path:path>', methods=['GET'])
|
|
def index(path):
|
|
return render_template('index.html')
|
|
|
|
|
|
@app.route('/plans/')
|
|
def plans():
|
|
return index('')
|
|
|
|
|
|
@app.route('/guide/')
|
|
def guide():
|
|
return index('')
|
|
|
|
|
|
@app.route('/user/')
|
|
def user():
|
|
return index('')
|
|
|
|
|
|
@app.route('/signin/')
|
|
def signin():
|
|
return index('')
|
|
|
|
|
|
@app.route('/repository/')
|
|
def repository():
|
|
return index('')
|
|
|
|
|
|
@app.route('/status', methods=['GET'])
|
|
def status():
|
|
return make_response('Healthy')
|
|
|
|
|
|
@app.route('/tos', methods=['GET'])
|
|
def tos():
|
|
return render_template('tos.html')
|
|
|
|
|
|
@app.route('/privacy', methods=['GET'])
|
|
def privacy():
|
|
return render_template('privacy.html')
|
|
|
|
|
|
def common_login(db_user):
|
|
if login_user(_LoginWrappedDBUser(db_user.username, db_user)):
|
|
logger.debug('Successfully signed in as: %s' % db_user.username)
|
|
new_identity = QuayDeferredPermissionUser(db_user.username, 'username')
|
|
identity_changed.send(app, identity=new_identity)
|
|
return True
|
|
else:
|
|
logger.debug('User could not be logged in, inactive?.')
|
|
return False
|
|
|
|
|
|
@app.route('/oauth2/github/callback', methods=['GET'])
|
|
def github_oauth_callback():
|
|
code = request.args.get('code')
|
|
payload = {
|
|
'client_id': app.config['GITHUB_CLIENT_ID'],
|
|
'client_secret': app.config['GITHUB_CLIENT_SECRET'],
|
|
'code': code,
|
|
}
|
|
headers = {
|
|
'Accept': 'application/json'
|
|
}
|
|
|
|
get_access_token = requests.post(app.config['GITHUB_TOKEN_URL'],
|
|
params=payload, headers=headers)
|
|
|
|
token = get_access_token.json()['access_token']
|
|
|
|
token_param = {
|
|
'access_token': token,
|
|
}
|
|
get_user = requests.get(app.config['GITHUB_USER_URL'], params=token_param)
|
|
|
|
user_data = get_user.json()
|
|
username = user_data['login']
|
|
github_id = user_data['id']
|
|
|
|
v3_media_type = {
|
|
'Accept': 'application/vnd.github.v3'
|
|
}
|
|
get_email = requests.get(app.config['GITHUB_USER_EMAILS'],
|
|
params=token_param, headers=v3_media_type)
|
|
|
|
# We will accept any email, but we prefer the primary
|
|
found_email = None
|
|
for user_email in get_email.json():
|
|
found_email = user_email['email']
|
|
if user_email['primary']:
|
|
break
|
|
|
|
to_login = model.verify_federated_login('github', github_id)
|
|
if not to_login:
|
|
# try to create the user
|
|
try:
|
|
to_login = model.create_federated_user(username, found_email, 'github',
|
|
github_id)
|
|
|
|
# Success, tell mixpanel
|
|
mixpanel.track(to_login.username, 'register', {'service': 'github'})
|
|
|
|
state = request.args.get('state', None)
|
|
if state:
|
|
logger.debug('Aliasing with state: %s' % state)
|
|
mixpanel.alias(to_login.username, state)
|
|
|
|
except model.DataModelException, ex:
|
|
return render_template('githuberror.html', error_message=ex.message)
|
|
|
|
if common_login(to_login):
|
|
return redirect(url_for('index'))
|
|
|
|
# TODO something bad happened, we need to tell the user somehow
|
|
return render_template('githuberror.html')
|
|
|
|
|
|
@app.route('/confirm', methods=['GET'])
|
|
def confirm_email():
|
|
code = request.values['code']
|
|
user = model.confirm_user_email(code)
|
|
|
|
common_login(user)
|
|
|
|
return redirect(url_for('index'))
|
|
|
|
|
|
@app.route('/recovery', methods=['GET'])
|
|
def confirm_recovery():
|
|
code = request.values['code']
|
|
user = model.validate_reset_code(code)
|
|
|
|
if user:
|
|
common_login(user)
|
|
return redirect(url_for('user'))
|
|
else:
|
|
abort(403)
|
|
|
|
|
|
@app.route('/reset', methods=['GET'])
|
|
def password_reset():
|
|
pass
|