import logging from functools import wraps from flask import request, make_response, _request_ctx_stack, abort from flask.ext.principal import identity_changed, Identity from base64 import b64decode from data import model from app import app from util.names import parse_namespace_repository logger = logging.getLogger(__name__) def get_authenticated_user(): return getattr(_request_ctx_stack.top, 'authenticated_user', None) def get_validated_token(): return getattr(_request_ctx_stack.top, 'validated_token', None) def process_basic_auth(): auth = request.headers.get('authorization', '') normalized = [part.strip() for part in auth.split(' ') if part] if normalized[0].lower() != 'basic' or len(normalized) != 2: logger.debug('Invalid basic auth format.') return False credentials = b64decode(normalized[1]).split(':') if len(credentials) != 2: logger.debug('Invalid basic auth credential formet.') authenticated = model.verify_user(credentials[0], credentials[1]) if authenticated: logger.debug('Successfully validated user: %s' % authenticated.username) ctx = _request_ctx_stack.top ctx.authenticated_user = authenticated identity_changed.send(app, identity=Identity(authenticated.username)) return True # We weren't able to authenticate via basic auth. return False def process_token(): auth = request.headers.get('authorization', '') logger.debug('Validating auth token: %s' % auth) normalized = [part.strip() for part in auth.split(' ') if part] if normalized[0].lower() != 'token' or len(normalized) != 3: logger.debug('Invalid token format.') return False token_details = normalized[2].split(',') if len(token_details) != 3: logger.debug('Invalid token format.') return False token_vals = {val[0]: val[1] for val in (detail.split('=') for detail in token_details)} if ('signature' not in token_vals or 'access' not in token_vals or 'repository' not in token_vals): logger.debug('Invalid token components.') return False unquoted = token_vals['repository'][1:-1] namespace, repository = parse_namespace_repository(unquoted) logger.debug('Validing signature: %s' % token_vals['signature']) validated = model.verify_token(token_vals['signature']) if validated: logger.debug('Successfully validated token: %s' % validated.code) ctx = _request_ctx_stack.top ctx.validated_token = validated identity_changed.send(app, identity=Identity(validated.code)) return True # WE weren't able to authenticate the token logger.debug('Token could not be validated.') return False def process_auth(f): @wraps(f) def wrapper(*args, **kwargs): process_token() process_basic_auth() return f(*args, **kwargs) return wrapper