77adf9dd77
Now we raise a 400 as expected
123 lines
4 KiB
Python
123 lines
4 KiB
Python
""" Various decorators for endpoint and API handlers. """
|
|
|
|
import logging
|
|
|
|
from functools import wraps
|
|
from flask import abort, request, make_response
|
|
|
|
import features
|
|
|
|
from app import app
|
|
from auth.auth_context import get_authenticated_context
|
|
from util.names import parse_namespace_repository, ImplicitLibraryNamespaceNotAllowed
|
|
from util.http import abort
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
|
|
def parse_repository_name(include_tag=False,
                          ns_kwarg_name='namespace_name',
                          repo_kwarg_name='repo_name',
                          tag_kwarg_name='tag_name',
                          incoming_repo_kwarg='repository'):
    """ Decorator factory which splits the repository reference passed in the
        `incoming_repo_kwarg` keyword argument and hands its pieces to the decorated
        function as separate keyword arguments.

        The incoming kwarg is removed and replaced by `ns_kwarg_name` (parsed component 0)
        and `repo_kwarg_name` (component 1); when `include_tag` is set, `tag_kwarg_name`
        (component 2) is supplied as well. If parsing raises
        ImplicitLibraryNamespaceNotAllowed, the request is aborted with a 400.
    """
    def inner(func):
        @wraps(func)
        def wrapper(*args, **kwargs):
            raw_reference = kwargs[incoming_repo_kwarg]
            try:
                components = parse_namespace_repository(raw_reference,
                                                        app.config['LIBRARY_NAMESPACE'],
                                                        include_tag=include_tag,
                                                        allow_library=features.LIBRARY_SUPPORT)
            except ImplicitLibraryNamespaceNotAllowed:
                abort(400)

            # Swap the combined kwarg for its parsed pieces before calling through.
            del kwargs[incoming_repo_kwarg]
            kwargs[ns_kwarg_name] = components[0]
            kwargs[repo_kwarg_name] = components[1]
            if include_tag:
                kwargs[tag_kwarg_name] = components[2]

            return func(*args, **kwargs)
        return wrapper
    return inner
|
|
|
|
|
|
def param_required(param_name, allow_body=False):
    """ Decorator factory which makes a route require the parameter `param_name`.

        The parameter must be present in `request.args` or, when `allow_body` is True,
        in `request.values`. Otherwise the request is aborted with a 400 response whose
        body names the missing parameter.
    """
    def wrapper(wrapped):
        @wraps(wrapped)
        def decorated(*args, **kwargs):
            missing = param_name not in request.args and (
                not allow_body or param_name not in request.values)
            if missing:
                # NOTE(review): `abort` is imported from both flask and util.http at the top
                # of this file and the later import wins -- confirm that util.http.abort
                # accepts a Response object here.
                abort(make_response('Required param: %s' % param_name, 400))
            return wrapped(*args, **kwargs)
        return decorated
    return wrapper
|
|
|
|
|
|
def anon_allowed(func):
    """ Flags `func` as permitting anonymous access by setting its `__anon_allowed`
        attribute to True, then returns the same function object (no wrapping).
    """
    setattr(func, '__anon_allowed', True)
    return func
|
|
|
|
|
|
def anon_protect(func):
    """ Flags `func` with `__anon_protected = True` and returns it wrapped by
        check_anon_protection, so the auth check runs before the function executes.
    """
    setattr(func, '__anon_protected', True)
    return check_anon_protection(func)
|
|
|
|
|
|
def check_anon_protection(func):
    """ Wraps `func` so that it only runs when anonymous access is permitted or the
        request carries a non-anonymous authenticated context.

        The wrapped function is called straight through when `features.ANONYMOUS_ACCESS`
        is truthy or `func` carries the `__anon_allowed` marker. Otherwise the
        authenticated context is fetched once; if it is missing or anonymous, the
        request is aborted with a 401.
    """
    @wraps(func)
    def wrapper(*args, **kwargs):
        # Skip if anonymous access is allowed. hasattr() is used instead of scanning
        # dir(), which is not guaranteed to be a rigorous listing of attributes.
        if features.ANONYMOUS_ACCESS or hasattr(func, '__anon_allowed'):
            return func(*args, **kwargs)

        # Check for validated context. Fetch it once so both conditions inspect the
        # same object. If none exists (or it is anonymous), fail with a 401.
        context = get_authenticated_context()
        if context and not context.is_anonymous:
            return func(*args, **kwargs)

        abort(401)
    return wrapper
|
|
|
|
|
|
def route_show_if(value):
    """ Decorator factory which exposes the decorated route only while `value` is truthy;
        when it is falsy, each call to the route aborts with a 404 first.
    """
    def apply(view):
        @wraps(view)
        def guarded(*args, **kwargs):
            # `value` is captured when the decorator is applied and checked on every call.
            if not value:
                abort(404)

            return view(*args, **kwargs)
        return guarded
    return apply
|
|
|
|
|
|
def require_xhr_from_browser(func):
    """ Requires that API GET calls made from browsers are made via XHR, in order to
        prevent reflected text attacks.

        Only enforced when the `BROWSER_API_CALLS_XHR_ONLY` app config flag is truthy.
        A GET request with a recognized browser user agent and no
        `X-Requested-With: XMLHttpRequest` header is logged and aborted with a 400.
    """
    @wraps(func)
    def wrapper(*args, **kwargs):
        enforce = app.config.get('BROWSER_API_CALLS_XHR_ONLY', False)
        # NOTE(review): newer Werkzeug releases may no longer populate
        # `user_agent.browser` by default, in which case this check would never fire --
        # confirm against the pinned Werkzeug version.
        if enforce and request.method == 'GET' and request.user_agent.browser:
            if request.headers.get('X-Requested-With') != 'XMLHttpRequest':
                logger.warning('Disallowed possible RTA to URL %s with user agent %s',
                               request.path, request.user_agent)
                abort(400)

        return func(*args, **kwargs)
    return wrapper
|