This repository has been archived on 2020-03-24. You can view files and clone it, but cannot push or open issues or pull requests.
quay/endpoints/decorators.py
Joseph Schorr 188ea98441 Add new decorator to prevent reflected text attacks
Instead of disabling repo names with periods in them, we simply disallow calls to the API when they are GET requests, whose path ends in a dot, and that do not have a referrer from the frontend.
2018-02-20 11:33:45 -05:00

118 lines
3.8 KiB
Python

""" Various decorators for endpoint and API handlers. """
import logging
from functools import wraps
from flask import abort, request, make_response
import features
from app import app
from auth.auth_context import get_authenticated_context
from util.names import parse_namespace_repository
from util.http import abort
logger = logging.getLogger(__name__)
def parse_repository_name(include_tag=False,
                          ns_kwarg_name='namespace_name',
                          repo_kwarg_name='repo_name',
                          tag_kwarg_name='tag_name',
                          incoming_repo_kwarg='repository'):
    """ Decorator factory which parses the repository name a handler receives.

        The keyword argument named by `incoming_repo_kwarg` is removed from the call and run
        through `parse_namespace_repository` together with the configured `LIBRARY_NAMESPACE`.
        The first two parsed pieces are passed to the decorated function as `ns_kwarg_name`
        and `repo_kwarg_name`; when `include_tag` is true, the third piece is passed as
        `tag_kwarg_name` as well.
    """
    def decorate(handler):
        @wraps(handler)
        def with_parsed_name(*args, **kwargs):
            # A missing incoming kwarg raises KeyError, just as a plain lookup would.
            raw_name = kwargs.pop(incoming_repo_kwarg)
            parts = parse_namespace_repository(raw_name,
                                               app.config['LIBRARY_NAMESPACE'],
                                               include_tag=include_tag)

            kwargs[ns_kwarg_name] = parts[0]
            kwargs[repo_kwarg_name] = parts[1]
            if include_tag:
                kwargs[tag_kwarg_name] = parts[2]

            return handler(*args, **kwargs)
        return with_parsed_name
    return decorate
def param_required(param_name, allow_body=False):
    """ Marks a route as requiring a parameter with the given name to exist in the request's
        arguments or (if allow_body=True) in its body values. If the parameter is not present,
        the request is aborted with a 400 response whose body names the missing parameter.
    """
    def wrapper(wrapped):
        @wraps(wrapped)
        def decorated(*args, **kwargs):
            found = param_name in request.args
            if not found and allow_body:
                found = param_name in request.values

            if not found:
                # The module-level `abort` name is rebound by the later `util.http` import, and
                # every other call in this module hands it an integer status code. Aborting
                # with a prepared Response object is the Flask `abort` calling convention, so
                # reach for Flask's implementation explicitly here.
                from flask import abort as flask_abort
                flask_abort(make_response('Required param: %s' % param_name, 400))

            return wrapped(*args, **kwargs)
        return decorated
    return wrapper
def anon_allowed(func):
    """ Marks a method as allowing anonymous access by setting the `__anon_allowed` marker
        attribute on it. The very same function object is returned.
    """
    setattr(func, '__anon_allowed', True)
    return func
def anon_protect(func):
    """ Marks a method as requiring some form of valid user auth: the `__anon_protected`
        marker attribute is set on it, and the method is returned wrapped by
        `check_anon_protection`.
    """
    setattr(func, '__anon_protected', True)
    guarded = check_anon_protection(func)
    return guarded
def check_anon_protection(func):
    """ Wraps a method so that it only executes for callers permitted to reach it.

        The wrapped method is invoked when `features.ANONYMOUS_ACCESS` is truthy, when the
        method carries the `__anon_allowed` marker attribute, or when an authenticated context
        exists and is not anonymous. In every other case the request is aborted with a 401.
    """
    @wraps(func)
    def wrapper(*args, **kwargs):
        # Skip if anonymous access is allowed. The marker is checked on every call rather
        # than once at decoration time.
        if features.ANONYMOUS_ACCESS or hasattr(func, '__anon_allowed'):
            return func(*args, **kwargs)

        # Check for a validated context, resolving it only once per call. If none exists (or
        # it is anonymous), fail with a 401.
        context = get_authenticated_context()
        if context and not context.is_anonymous:
            return func(*args, **kwargs)

        abort(401)
    return wrapper
def route_show_if(value):
    """ Exposes the decorated route only while the given value is truthy; calls made while it
        is falsy are aborted with a 404. The value is tested on every call.
    """
    def apply(route):
        @wraps(route)
        def guarded(*args, **kwargs):
            if not value:
                abort(404)

            return route(*args, **kwargs)
        return guarded
    return apply
def require_xhr_from_browser(func):
    """ Rejects GET calls issued by a browser unless they were made via XHR, in order to
        prevent reflected text attacks. The check only applies when the
        BROWSER_API_CALLS_XHR_ONLY config value is set; rejected calls are logged and
        aborted with a 400.
    """
    @wraps(func)
    def wrapper(*args, **kwargs):
        enforce = app.config.get('BROWSER_API_CALLS_XHR_ONLY', False)

        # Only GET requests whose user agent is recognized as a browser are inspected.
        if enforce and request.method == 'GET' and request.user_agent.browser:
            requested_with = request.headers.get('X-Requested-With')
            if requested_with != 'XMLHttpRequest':
                logger.warning('Disallowed possible RTA to URL %s with user agent %s',
                               request.path, request.user_agent)
                abort(400)

        return func(*args, **kwargs)
    return wrapper