This repository has been archived on 2020-03-24. You can view files and clone it, but cannot push or open issues or pull requests.
quay/auth/auth.py

133 lines
4.3 KiB
Python
Raw Normal View History

import logging
from functools import wraps
from flask import request, _request_ctx_stack, session
from flask.ext.principal import identity_changed, Identity
from base64 import b64decode
from data import model
from app import app
from permissions import QuayDeferredPermissionUser
from util.names import parse_namespace_repository
from util.http import abort
logger = logging.getLogger(__name__)
def process_basic_auth(auth):
normalized = [part.strip() for part in auth.split(' ') if part]
if normalized[0].lower() != 'basic' or len(normalized) != 2:
logger.debug('Invalid basic auth format.')
return
credentials = b64decode(normalized[1]).split(':', 1)
if len(credentials) != 2:
logger.debug('Invalid basic auth credential format.')
elif credentials[0] == '$token':
# Use as token auth
try:
token = model.load_token_data(credentials[1])
logger.debug('Successfully validated token: %s' % credentials[1])
ctx = _request_ctx_stack.top
ctx.validated_token = token
identity_changed.send(app, identity=Identity(token.code, 'token'))
return
except model.DataModelException:
logger.debug('Invalid token: %s' % credentials[1])
elif '+' in credentials[0]:
logger.debug('Trying robot auth with credentials %s' % str(credentials))
# Use as robot auth
try:
robot = model.verify_robot(credentials[0], credentials[1])
logger.debug('Successfully validated robot: %s' % credentials[0])
ctx = _request_ctx_stack.top
ctx.authenticated_user = robot
identity_changed.send(app, identity=Identity(robot.username, 'username'))
return
except model.InvalidRobotException:
logger.debug('Invalid robot or password for robot: %s' % credentials[0])
else:
authenticated = model.verify_user(credentials[0], credentials[1])
if authenticated:
logger.debug('Successfully validated user: %s' % authenticated.username)
ctx = _request_ctx_stack.top
ctx.authenticated_user = authenticated
new_identity = QuayDeferredPermissionUser(authenticated.username,
'username')
identity_changed.send(app, identity=new_identity)
return
# We weren't able to authenticate via basic auth.
logger.debug('Basic auth present but could not be validated.')
def process_token(auth):
normalized = [part.strip() for part in auth.split(' ') if part]
if normalized[0].lower() != 'token' or len(normalized) != 2:
logger.debug('Not an auth token: %s' % auth)
return
token_details = normalized[1].split(',')
if len(token_details) != 1:
logger.warning('Invalid token format: %s' % auth)
abort(401, message="Invalid token format: %(auth)", issue='invalid-auth-token', auth=auth)
token_vals = {val[0]: val[1] for val in
(detail.split('=') for detail in token_details)}
if 'signature' not in token_vals:
logger.warning('Token does not contain signature: %s' % auth)
abort(401, message="Token does not contain a valid signature: %(auth)", issue='invalid-auth-token', auth=auth)
try:
token_data = model.load_token_data(token_vals['signature'])
except model.InvalidTokenException:
logger.warning('Token could not be validated: %s' %
token_vals['signature'])
abort(401, message="Token could not be validated: %(auth)", issue='invalid-auth-token', auth=auth)
logger.debug('Successfully validated token: %s' % token_data.code)
ctx = _request_ctx_stack.top
ctx.validated_token = token_data
identity_changed.send(app, identity=Identity(token_data.code, 'token'))
def process_auth(f):
@wraps(f)
def wrapper(*args, **kwargs):
auth = request.headers.get('authorization', '')
if auth:
logger.debug('Validating auth header: %s' % auth)
process_token(auth)
process_basic_auth(auth)
else:
logger.debug('No auth header.')
return f(*args, **kwargs)
return wrapper
def extract_namespace_repo_from_session(f):
@wraps(f)
def wrapper(*args, **kwargs):
if 'namespace' not in session or 'repository' not in session:
logger.error('Unable to load namespace or repository from session: %s' %
session)
abort(400, message="Missing namespace in request")
return f(session['namespace'], session['repository'], *args, **kwargs)
return wrapper