This repository has been archived on 2020-03-24. You can view files and clone it, but cannot push or open issues or pull requests.
quay/endpoints/decorators.py

163 lines
5.8 KiB
Python

""" Various decorators for endpoint and API handlers. """
import os
import logging
from functools import wraps
from flask import abort, request, make_response
import features
from app import app, ip_resolver, model_cache
from auth.auth_context import get_authenticated_context
from data.registry_model import registry_model
from util.names import parse_namespace_repository, ImplicitLibraryNamespaceNotAllowed
from util.http import abort
logger = logging.getLogger(__name__)
def parse_repository_name(include_tag=False,
ns_kwarg_name='namespace_name',
repo_kwarg_name='repo_name',
tag_kwarg_name='tag_name',
incoming_repo_kwarg='repository'):
""" Decorator which parses the repository name found in the incoming_repo_kwarg argument,
and applies its pieces to the decorated function.
"""
def inner(func):
@wraps(func)
def wrapper(*args, **kwargs):
try:
repo_name_components = parse_namespace_repository(kwargs[incoming_repo_kwarg],
app.config['LIBRARY_NAMESPACE'],
include_tag=include_tag,
allow_library=features.LIBRARY_SUPPORT)
except ImplicitLibraryNamespaceNotAllowed:
abort(400, message='A namespace must be specified explicitly')
del kwargs[incoming_repo_kwarg]
kwargs[ns_kwarg_name] = repo_name_components[0]
kwargs[repo_kwarg_name] = repo_name_components[1]
if include_tag:
kwargs[tag_kwarg_name] = repo_name_components[2]
return func(*args, **kwargs)
return wrapper
return inner
def param_required(param_name, allow_body=False):
""" Marks a route as requiring a parameter with the given name to exist in the request's arguments
or (if allow_body=True) in its body values. If the parameter is not present, the request will
fail with a 400.
"""
def wrapper(wrapped):
@wraps(wrapped)
def decorated(*args, **kwargs):
if param_name not in request.args:
if not allow_body or param_name not in request.values:
abort(400, message='Required param: %s' % param_name)
return wrapped(*args, **kwargs)
return decorated
return wrapper
def anon_allowed(func):
""" Marks a method to allow anonymous access where it would otherwise be disallowed. """
func.__anon_allowed = True
return func
def anon_protect(func):
""" Marks a method as requiring some form of valid user auth before it can be executed. """
func.__anon_protected = True
return check_anon_protection(func)
def check_anon_protection(func):
""" Validates a method as requiring some form of valid user auth before it can be executed. """
@wraps(func)
def wrapper(*args, **kwargs):
# Skip if anonymous access is allowed.
if features.ANONYMOUS_ACCESS or '__anon_allowed' in dir(func):
return func(*args, **kwargs)
# Check for validated context. If none exists, fail with a 401.
if get_authenticated_context() and not get_authenticated_context().is_anonymous:
return func(*args, **kwargs)
abort(401, message='Anonymous access is not allowed')
return wrapper
def route_show_if(value):
""" Adds/shows the decorated route if the given value is True. """
def decorator(f):
@wraps(f)
def decorated_function(*args, **kwargs):
if not value:
abort(404)
return f(*args, **kwargs)
return decorated_function
return decorator
def require_xhr_from_browser(func):
""" Requires that API GET calls made from browsers are made via XHR, in order to prevent
reflected text attacks.
"""
@wraps(func)
def wrapper(*args, **kwargs):
if app.config.get('BROWSER_API_CALLS_XHR_ONLY', False):
if request.method == 'GET' and request.user_agent.browser:
has_xhr_header = request.headers.get('X-Requested-With') == 'XMLHttpRequest'
if not has_xhr_header and not app.config.get('DEBUGGING') == True:
logger.warning('Disallowed possible RTA to URL %s with user agent %s',
request.path, request.user_agent)
abort(400, message='API calls must be invoked with an X-Requested-With header ' +
'if called from a browser')
return func(*args, **kwargs)
return wrapper
def check_region_blacklisted(error_class=None, namespace_name_kwarg=None):
""" Decorator which checks if the incoming request is from a region geo IP blocked
for the current namespace. The first argument to the wrapped function must be
the namespace name.
"""
def wrapper(wrapped):
@wraps(wrapped)
def decorated(*args, **kwargs):
if namespace_name_kwarg:
namespace_name = kwargs[namespace_name_kwarg]
else:
namespace_name = args[0]
region_blacklist = registry_model.get_cached_namespace_region_blacklist(model_cache,
namespace_name)
if region_blacklist:
# Resolve the IP information and block if on the namespace's blacklist.
remote_addr = request.remote_addr
if os.getenv('TEST', 'false').lower() == 'true':
remote_addr = request.headers.get('X-Override-Remote-Addr-For-Testing', remote_addr)
resolved_ip_info = ip_resolver.resolve_ip(remote_addr)
logger.debug('Resolved IP information for IP %s: %s', remote_addr, resolved_ip_info)
if (resolved_ip_info and
resolved_ip_info.country_iso_code and
resolved_ip_info.country_iso_code in region_blacklist):
if error_class:
raise error_class()
abort(403, 'Pulls of this data have been restricted geographically')
return wrapped(*args, **kwargs)
return decorated
return wrapper