This repository has been archived on 2020-03-24. You can view files and clone it, but cannot push or open issues or pull requests.

100 lines
2.7 KiB
Raw Normal View History

import logging
from functools import wraps
from flask import request, make_response, _request_ctx_stack, abort
from flask.ext.principal import identity_changed, Identity
from base64 import b64decode
import model
from app import app
from util import parse_namespace_repository
logger = logging.getLogger(__name__)
def get_authenticated_user():
return getattr(, 'authenticated_user', None)
def get_validated_token():
return getattr(, 'validated_token', None)
def process_basic_auth():
auth = request.headers.get('authorization', '')
normalized = [part.strip() for part in auth.split(' ') if part]
if normalized[0].lower() != 'basic' or len(normalized) != 2:
logger.debug('Invalid basic auth format.')
return False
credentials = b64decode(normalized[1]).split(':')
if len(credentials) != 2:
logger.debug('Invalid basic auth credential formet.')
authenticated = model.verify_user(credentials[0], credentials[1])
if authenticated:
logger.debug('Successfully validated user: %s' % authenticated.username)
ctx =
ctx.authenticated_user = authenticated
identity_changed.send(app, identity=Identity(authenticated.username))
return True
# We weren't able to authenticate via basic auth.
return False
def process_token():
auth = request.headers.get('authorization', '')
logger.debug('Validating auth token: %s' % auth)
normalized = [part.strip() for part in auth.split(' ') if part]
if normalized[0].lower() != 'token' or len(normalized) != 3:
logger.debug('Invalid token format.')
return False
token_details = normalized[2].split(',')
if len(token_details) != 3:
logger.debug('Invalid token format.')
return False
token_vals = {val[0]: val[1] for val in
(detail.split('=') for detail in token_details)}
if ('signature' not in token_vals or 'access' not in token_vals or
'repository' not in token_vals):
logger.debug('Invalid token components.')
return False
unquoted = token_vals['repository'][1:-1]
namespace, repository = parse_namespace_repository(unquoted)
logger.debug('Validing signature: %s' % token_vals['signature'])
validated = model.verify_token(token_vals['signature'])
if validated:
logger.debug('Successfully validated token: %s' % validated.code)
ctx =
ctx.validated_token = validated
identity_changed.send(app, identity=Identity(validated.code))
return True
# WE weren't able to authenticate the token
logger.debug('Token could not be validated.')
return False
def process_auth(f):
def wrapper(*args, **kwargs):
return f(*args, **kwargs)
return wrapper