This repository has been archived on 2020-03-24. You can view files and clone it, but cannot push or open issues or pull requests.
quay/auth/auth.py

120 lines
3.2 KiB
Python

import logging
from functools import wraps
from flask import request, make_response, _request_ctx_stack, abort
from flask.ext.principal import identity_changed, Identity
from flask.ext.login import UserMixin
from base64 import b64decode
from data import model
from app import app, login_manager
from util import parse_namespace_repository
# Module-level logger, named after this module, used for auth debug output.
logger = logging.getLogger(__name__)
class _LoginWrappedDBUser(UserMixin):
    """Wrap a database user record behind the ``UserMixin`` interface."""

    def __init__(self, db_user):
        """Keep a reference to the wrapped database user as ``db_user``."""
        self.db_user = db_user

    def is_active(self):
        """Return the wrapped user's ``verified`` attribute."""
        verified = self.db_user.verified
        return verified

    def get_id(self):
        """Return the wrapped user's username as a unicode string."""
        username = self.db_user.username
        return unicode(username)
@login_manager.user_loader
def load_user(username):
    """Look up ``username`` via ``model.get_user`` and wrap the result.

    Returns a ``_LoginWrappedDBUser`` around the record when the lookup
    yields a truthy value, otherwise ``None``.
    """
    record = model.get_user(username)
    if not record:
        return None
    return _LoginWrappedDBUser(record)
def get_authenticated_user():
    """Return ``authenticated_user`` from the top request context, or None."""
    current_ctx = _request_ctx_stack.top
    return getattr(current_ctx, 'authenticated_user', None)
def get_validated_token():
    """Return ``validated_token`` from the top request context, or None."""
    current_ctx = _request_ctx_stack.top
    return getattr(current_ctx, 'validated_token', None)
def process_basic_auth():
    """Try to authenticate the current request using HTTP basic auth.

    Reads the ``Authorization`` request header and expects the form
    ``Basic <base64(username:password)>``. When ``model.verify_user``
    returns a truthy user, that user is stored on the top request context
    as ``authenticated_user`` and ``identity_changed`` is sent with an
    ``Identity`` built from the user's username.

    Returns:
        True if a user was authenticated; False if the header is missing,
        malformed, or the credentials were not accepted.
    """
    auth = request.headers.get('authorization', '')
    normalized = [part.strip() for part in auth.split(' ') if part]

    # Check the length first: a missing or empty header produces an empty
    # list, and indexing it would raise IndexError.
    if len(normalized) != 2 or normalized[0].lower() != 'basic':
        logger.debug('Invalid basic auth format.')
        return False

    try:
        decoded = b64decode(normalized[1])
    except (TypeError, ValueError):
        # Malformed base64 surfaces as TypeError on Python 2 and as
        # binascii.Error (a ValueError subclass) on Python 3.
        logger.debug('Invalid basic auth format.')
        return False

    # Split on the first colon only, so passwords may themselves contain ':'.
    credentials = decoded.split(':', 1)
    if len(credentials) != 2:
        logger.debug('Invalid basic auth credential formet.')
        # Without a separator there is no password to verify.
        return False

    username, password = credentials
    authenticated = model.verify_user(username, password)
    if authenticated:
        logger.debug('Successfully validated user: %s', authenticated.username)
        ctx = _request_ctx_stack.top
        ctx.authenticated_user = authenticated
        identity_changed.send(app, identity=Identity(authenticated.username))
        return True

    # We weren't able to authenticate via basic auth.
    return False
def process_token():
    """Try to authenticate the current request using a signed token.

    Reads the ``Authorization`` request header and expects exactly three
    space-separated parts: the first must be ``Token`` (case-insensitive)
    and the third must be a comma-separated list of exactly three
    ``key=value`` pairs providing ``signature``, ``repository`` and
    ``access``. When ``model.verify_token`` returns a truthy token for the
    signature, it is stored on the top request context as
    ``validated_token`` and ``identity_changed`` is sent with an
    ``Identity`` built from the token's ``code``.

    Returns:
        True if a token was validated; False if the header is missing,
        malformed, or the signature was not accepted.
    """
    auth = request.headers.get('authorization', '')
    # NOTE(review): this logs the raw Authorization header, which can carry
    # credentials (including basic-auth headers). Consider redacting.
    logger.debug('Validating auth token: %s', auth)

    normalized = [part.strip() for part in auth.split(' ') if part]

    # Check the length first: a missing or empty header produces an empty
    # list, and indexing it would raise IndexError.
    if len(normalized) != 3 or normalized[0].lower() != 'token':
        logger.debug('Invalid token format.')
        return False

    token_details = normalized[2].split(',')
    if len(token_details) != 3:
        logger.debug('Invalid token format.')
        return False

    token_vals = {}
    for detail in token_details:
        # Partition on the first '=' so values may contain '=' themselves,
        # and so a pair lacking '=' is rejected instead of raising.
        key, separator, value = detail.partition('=')
        if not separator:
            logger.debug('Invalid token format.')
            return False
        token_vals[key] = value

    required = ('signature', 'access', 'repository')
    if not all(key in token_vals for key in required):
        logger.debug('Invalid token components.')
        return False

    # Drop the first and last character of the repository value
    # (presumably surrounding quotes; confirm against the token issuer).
    unquoted = token_vals['repository'][1:-1]
    # NOTE(review): namespace and repository are parsed but not used below,
    # so the token is not checked against the repository here. Confirm
    # whether that is intended.
    namespace, repository = parse_namespace_repository(unquoted)

    logger.debug('Validing signature: %s', token_vals['signature'])
    validated = model.verify_token(token_vals['signature'])
    if validated:
        logger.debug('Successfully validated token: %s', validated.code)
        ctx = _request_ctx_stack.top
        ctx.validated_token = validated
        identity_changed.send(app, identity=Identity(validated.code))
        return True

    # We weren't able to authenticate the token.
    logger.debug('Token could not be validated.')
    return False
def process_auth(f):
    """Decorate ``f`` so auth processing runs before each call.

    The returned wrapper calls ``process_token()`` and then
    ``process_basic_auth()``, ignores both return values, and then calls
    ``f`` with the original arguments, returning its result.
    """
    def _with_auth(*args, **kwargs):
        process_token()
        process_basic_auth()
        return f(*args, **kwargs)

    # Apply functools.wraps explicitly to carry over f's metadata.
    return wraps(f)(_with_auth)