import logging import requests from flask import (abort, redirect, request, url_for, render_template, make_response) from flask.ext.login import login_user, UserMixin, login_required from flask.ext.principal import identity_changed, Identity, AnonymousIdentity from data import model from app import app, login_manager, mixpanel from auth.permissions import QuayDeferredPermissionUser logger = logging.getLogger(__name__) class _LoginWrappedDBUser(UserMixin): def __init__(self, db_username, db_user=None): self._db_username = db_username self._db_user = db_user def db_user(self): if not self._db_user: self._db_user = model.get_user(self._db_username) return self._db_user def is_authenticated(self): return self.db_user() is not None def is_active(self): return self.db_user().verified def get_id(self): return unicode(self._db_username) @login_manager.user_loader def load_user(username): logger.debug('Loading user: %s' % username) return _LoginWrappedDBUser(username) @app.route('/', methods=['GET'], defaults={'path': ''}) @app.route('/repository/', methods=['GET']) def index(path): return render_template('index.html') @app.route('/plans/') def plans(): return index('') @app.route('/guide/') def guide(): return index('') @app.route('/user/') def user(): return index('') @app.route('/signin/') def signin(): return index('') @app.route('/new/') def new(): return index('') @app.route('/repository/') def repository(): return index('') @app.route('/v1') @app.route('/v1/') def v1(): return index('') @app.route('/status', methods=['GET']) def status(): return make_response('Healthy') @app.route('/tos', methods=['GET']) def tos(): return render_template('tos.html') @app.route('/privacy', methods=['GET']) def privacy(): return render_template('privacy.html') def common_login(db_user): if login_user(_LoginWrappedDBUser(db_user.username, db_user)): logger.debug('Successfully signed in as: %s' % db_user.username) new_identity = QuayDeferredPermissionUser(db_user.username, 'username') identity_changed.send(app, identity=new_identity) return True else: logger.debug('User could not be logged in, inactive?.') return False @app.route('/oauth2/github/callback', methods=['GET']) def github_oauth_callback(): code = request.args.get('code') payload = { 'client_id': app.config['GITHUB_CLIENT_ID'], 'client_secret': app.config['GITHUB_CLIENT_SECRET'], 'code': code, } headers = { 'Accept': 'application/json' } get_access_token = requests.post(app.config['GITHUB_TOKEN_URL'], params=payload, headers=headers) token = get_access_token.json()['access_token'] token_param = { 'access_token': token, } get_user = requests.get(app.config['GITHUB_USER_URL'], params=token_param) user_data = get_user.json() username = user_data['login'] github_id = user_data['id'] v3_media_type = { 'Accept': 'application/vnd.github.v3' } get_email = requests.get(app.config['GITHUB_USER_EMAILS'], params=token_param, headers=v3_media_type) # We will accept any email, but we prefer the primary found_email = None for user_email in get_email.json(): found_email = user_email['email'] if user_email['primary']: break to_login = model.verify_federated_login('github', github_id) if not to_login: # try to create the user try: to_login = model.create_federated_user(username, found_email, 'github', github_id) # Success, tell mixpanel mixpanel.track(to_login.username, 'register', {'service': 'github'}) state = request.args.get('state', None) if state: logger.debug('Aliasing with state: %s' % state) mixpanel.alias(to_login.username, state) except model.DataModelException, ex: return render_template('githuberror.html', error_message=ex.message) if common_login(to_login): return redirect(url_for('index')) # TODO something bad happened, we need to tell the user somehow return render_template('githuberror.html') @app.route('/confirm', methods=['GET']) def confirm_email(): code = request.values['code'] user = model.confirm_user_email(code) common_login(user) return redirect(url_for('index')) @app.route('/recovery', methods=['GET']) def confirm_recovery(): code = request.values['code'] user = model.validate_reset_code(code) if user: common_login(user) return redirect(url_for('user')) else: abort(403) @app.route('/reset', methods=['GET']) def password_reset(): pass