651666b60b
Breaks out the validation code from the auth context modification calls, makes decorators easier to define and adds testing for each individual piece. Will be the basis of better error messaging in the following change.
100 lines
3.9 KiB
Python
100 lines
3.9 KiB
Python
import logging
|
|
|
|
from base64 import b64decode
|
|
from flask import request
|
|
|
|
from app import authentication
|
|
from auth.oauth import validate_oauth_token
|
|
from auth.validateresult import ValidateResult, AuthKind
|
|
from data import model
|
|
from util.names import parse_robot_username
|
|
|
|
# Module-level logger, named after this module via __name__.
logger = logging.getLogger(__name__)
|
|
|
|
# Basic-auth username that signals the password field carries an access token
# rather than a user password (see validate_basic_auth).
ACCESS_TOKEN_USERNAME = '$token'
|
|
# Basic-auth username that signals the password field carries an OAuth token,
# which validate_basic_auth hands off to validate_oauth_token.
OAUTH_TOKEN_USERNAME = '$oauthtoken'
|
|
|
|
|
|
def has_basic_auth(username):
    """ Checks whether the current request carries basic auth credentials for `username`.

        The request's `authorization` header (empty string when absent) is run through
        validate_basic_auth. The outcome is truthy only when that validation produced a
        user AND the user's username equals `username`. Any failure (missing header,
        invalid header, invalid credentials, non-user credentials, different user) yields
        a falsy value.
    """
    outcome = validate_basic_auth(request.headers.get('authorization', ''))

    has_user = outcome.has_user
    if not has_user:
        # Hand back the falsy indicator itself, exactly as a short-circuiting `and` would.
        return has_user

    return outcome.user.username == username
|
|
|
|
|
|
def validate_basic_auth(auth_header):
    """ Validates the specified basic auth header, returning whether its credentials point
        to a valid user or token.

        Args:
          auth_header: raw value of the HTTP Authorization header; may be empty.

        Returns:
          A ValidateResult. Every branch except the OAuth one builds it with AuthKind.basic:
            - missing=True when the header is empty or is not a parseable basic auth header;
            - token=..., robot=... or user=... when the credentials validate;
            - error_message=... when the credentials are rejected.
          For the OAuth token username the result of validate_oauth_token() is returned as-is.

        This function does not write the raw header, the password or the submitted token to
        the log; it logs only usernames, token ids and failure reasons.
    """
    if not auth_header:
        return ValidateResult(AuthKind.basic, missing=True)

    logger.debug('Attempt to process basic auth header')

    # Parse the basic auth header.
    credentials, err = _parse_basic_auth_header(auth_header)
    if err is not None:
        # Log the parse failure reason, not the header: the header is only base64-encoded
        # and may still contain a real username/password pair.
        logger.debug('Got invalid basic auth header: %s', err)
        return ValidateResult(AuthKind.basic, missing=True)

    auth_username, auth_password_or_token = credentials

    # Check for access tokens.
    if auth_username == ACCESS_TOKEN_USERNAME:
        logger.debug('Found basic auth header for access token')
        try:
            token = model.token.load_token_data(auth_password_or_token)
            logger.debug('Successfully validated basic auth for access token %s', token.id)
            return ValidateResult(AuthKind.basic, token=token)
        except model.DataModelException:
            # The rejected value is a secret (possibly a mistyped or expired real token),
            # so it is intentionally left out of the log line.
            logger.warning('Failed to validate basic auth for access token')
            return ValidateResult(AuthKind.basic, error_message='Invalid access token')

    # Check for OAuth tokens.
    if auth_username == OAUTH_TOKEN_USERNAME:
        return validate_oauth_token(auth_password_or_token)

    # Check for robots and users.
    is_robot = parse_robot_username(auth_username)
    if is_robot:
        logger.debug('Found basic auth header for robot %s', auth_username)
        try:
            robot = model.user.verify_robot(auth_username, auth_password_or_token)

            logger.debug('Successfully validated basic auth for robot %s', auth_username)
            return ValidateResult(AuthKind.basic, robot=robot)
        except model.InvalidRobotException as ire:
            logger.warning('Failed to validate basic auth for robot %s: %s', auth_username,
                           ire.message)
            return ValidateResult(AuthKind.basic, error_message=ire.message)

    # Otherwise, treat as a standard user.
    (authenticated, err) = authentication.verify_and_link_user(auth_username,
                                                               auth_password_or_token,
                                                               basic_auth=True)
    if authenticated:
        logger.debug('Successfully validated basic auth for user %s', authenticated.username)
        return ValidateResult(AuthKind.basic, user=authenticated)
    else:
        logger.warning('Failed to validate basic auth for user %s: %s', auth_username, err)
        return ValidateResult(AuthKind.basic, error_message=err)
|
|
|
|
|
|
def _parse_basic_auth_header(auth):
|
|
""" Parses the given basic auth header, returning the credentials found inside.
|
|
"""
|
|
normalized = [part.strip() for part in auth.split(' ') if part]
|
|
if normalized[0].lower() != 'basic' or len(normalized) != 2:
|
|
return None, 'Invalid basic auth header'
|
|
|
|
try:
|
|
credentials = [part.decode('utf-8') for part in b64decode(normalized[1]).split(':', 1)]
|
|
except TypeError:
|
|
logger.exception('Exception when parsing basic auth header: %s', auth)
|
|
return None, 'Could not parse basic auth header'
|
|
|
|
if len(credentials) != 2:
|
|
return None, 'Unexpected number of credentials found in basic auth header'
|
|
|
|
return credentials, None
|