This repository has been archived on 2020-03-24. You can view files and clone it, but cannot push or open issues or pull requests.
quay/endpoints/api/__init__.py

279 lines
8.3 KiB
Python

import logging
import json
from flask import Blueprint, request, make_response, jsonify
from flask.ext.restful import Resource, abort, Api, reqparse
from flask.ext.restful.utils.cors import crossdomain
from werkzeug.exceptions import HTTPException
from calendar import timegm
from email.utils import formatdate
from functools import partial, wraps
from jsonschema import validate, ValidationError
from data import model
from util.names import parse_namespace_repository
from auth.permissions import (ReadRepositoryPermission, ModifyRepositoryPermission,
AdministerRepositoryPermission, UserReadPermission,
UserAdminPermission)
from auth import scopes
from auth.auth_context import get_authenticated_user, get_validated_oauth_token
from auth.auth import process_oauth
from endpoints.csrf import csrf_protect
logger = logging.getLogger(__name__)
api_bp = Blueprint('api', __name__)
api = Api()
api.init_app(api_bp)
api.decorators = [csrf_protect,
process_oauth,
crossdomain(origin='*', headers=['Authorization', 'Content-Type'])]
class ApiException(Exception):
def __init__(self, error_type, status_code, error_description, payload=None):
Exception.__init__(self)
self.error_description = error_description
self.status_code = status_code
self.payload = payload
self.error_type = error_type
def to_dict(self):
rv = dict(self.payload or ())
if self.error_description is not None:
rv['error_description'] = self.error_description
rv['error_type'] = self.error_type
return rv
class InvalidRequest(ApiException):
def __init__(self, error_description, payload=None):
ApiException.__init__(self, 'invalid_request', 400, error_description, payload)
class InvalidToken(ApiException):
def __init__(self, error_description, payload=None):
ApiException.__init__(self, 'invalid_token', 401, error_description, payload)
class Unauthorized(ApiException):
def __init__(self, payload=None):
user = get_authenticated_user()
if user is None or user.organization:
ApiException.__init__(self, 'invalid_token', 401, "Requires authentication", payload)
else:
ApiException.__init__(self, 'insufficient_scope', 403, 'Unauthorized', payload)
class NotFound(ApiException):
def __init__(self, payload=None):
ApiException.__init__(self, None, 404, 'Not Found', payload)
@api_bp.app_errorhandler(ApiException)
@crossdomain(origin='*', headers=['Authorization', 'Content-Type'])
def handle_api_error(error):
response = jsonify(error.to_dict())
response.status_code = error.status_code
if error.error_type is not None:
response.headers['WWW-Authenticate'] = ('Bearer error="%s" error_description="%s"' %
(error.error_type, error.error_description))
return response
def resource(*urls, **kwargs):
def wrapper(api_resource):
api.add_resource(api_resource, *urls, **kwargs)
return api_resource
return wrapper
def truthy_bool(param):
return param not in {False, 'false', 'False', '0', 'FALSE', '', 'null'}
def format_date(date):
""" Output an RFC822 date format. """
return formatdate(timegm(date.utctimetuple()))
def add_method_metadata(name, value):
def modifier(func):
if '__api_metadata' not in dir(func):
func.__api_metadata = {}
func.__api_metadata[name] = value
return func
return modifier
def method_metadata(func, name):
if '__api_metadata' in dir(func):
return func.__api_metadata.get(name, None)
return None
nickname = partial(add_method_metadata, 'nickname')
related_user_resource = partial(add_method_metadata, 'related_user_resource')
internal_only = add_method_metadata('internal', True)
def query_param(name, help_str, type=reqparse.text_type, default=None,
choices=(), required=False):
def add_param(func):
if '__api_query_params' not in dir(func):
func.__api_query_params = []
func.__api_query_params.append({
'name': name,
'type': type,
'help': help_str,
'default': default,
'choices': choices,
'required': required,
})
return func
return add_param
def parse_args(func):
@wraps(func)
def wrapper(self, *args, **kwargs):
if '__api_query_params' not in dir(func):
abort(500)
parser = reqparse.RequestParser()
for arg_spec in func.__api_query_params:
parser.add_argument(**arg_spec)
parsed_args = parser.parse_args()
return func(self, parsed_args, *args, **kwargs)
return wrapper
def parse_repository_name(func):
@wraps(func)
def wrapper(repository, *args, **kwargs):
(namespace, repository) = parse_namespace_repository(repository)
return func(namespace, repository, *args, **kwargs)
return wrapper
class ApiResource(Resource):
def options(self):
return None, 200
class RepositoryParamResource(ApiResource):
method_decorators = [parse_repository_name]
def require_repo_permission(permission_class, scope, allow_public=False):
def wrapper(func):
@add_method_metadata('oauth2_scope', scope)
@wraps(func)
def wrapped(self, namespace, repository, *args, **kwargs):
logger.debug('Checking permission %s for repo: %s/%s', permission_class, namespace,
repository)
permission = permission_class(namespace, repository)
if (permission.can() or
(allow_public and
model.repository_is_public(namespace, repository))):
return func(self, namespace, repository, *args, **kwargs)
raise Unauthorized()
return wrapped
return wrapper
require_repo_read = require_repo_permission(ReadRepositoryPermission, scopes.READ_REPO, True)
require_repo_write = require_repo_permission(ModifyRepositoryPermission, scopes.WRITE_REPO)
require_repo_admin = require_repo_permission(AdministerRepositoryPermission, scopes.ADMIN_REPO)
def require_user_permission(permission_class, scope=None):
def wrapper(func):
@add_method_metadata('oauth2_scope', scope)
@wraps(func)
def wrapped(self, *args, **kwargs):
user = get_authenticated_user()
if not user:
raise Unauthorized()
logger.debug('Checking permission %s for user %s', permission_class, user.username)
permission = permission_class(user.username)
if permission.can():
return func(self, *args, **kwargs)
raise Unauthorized()
return wrapped
return wrapper
require_user_read = require_user_permission(UserReadPermission, scopes.READ_USER)
require_user_admin = require_user_permission(UserAdminPermission, None)
def require_scope(scope_object):
def wrapper(func):
@add_method_metadata('oauth2_scope', scope_object)
@wraps(func)
def wrapped(*args, **kwargs):
return func(*args, **kwargs)
return wrapped
return wrapper
def validate_json_request(schema_name):
def wrapper(func):
@add_method_metadata('request_schema', schema_name)
@wraps(func)
def wrapped(self, *args, **kwargs):
schema = self.schemas[schema_name]
try:
validate(request.get_json(), schema)
return func(self, *args, **kwargs)
except ValidationError as ex:
raise InvalidRequest(ex.message)
return wrapped
return wrapper
def request_error(exception=None, **kwargs):
data = kwargs.copy()
message = 'Request error.'
if exception:
message = exception.message
raise InvalidRequest(message, data)
def log_action(kind, user_or_orgname, metadata=None, repo=None):
if not metadata:
metadata = {}
oauth_token = get_validated_oauth_token()
if oauth_token:
metadata['oauth_token_id'] = oauth_token.id
metadata['oauth_token_application_id'] = oauth_token.application.client_id
metadata['oauth_token_application'] = oauth_token.application.name
performer = get_authenticated_user()
model.log_action(kind, user_or_orgname, performer=performer, ip=request.remote_addr,
metadata=metadata, repository=repo)
import endpoints.api.billing
import endpoints.api.build
import endpoints.api.discovery
import endpoints.api.image
import endpoints.api.logs
import endpoints.api.organization
import endpoints.api.permission
import endpoints.api.prototype
import endpoints.api.repository
import endpoints.api.repotoken
import endpoints.api.robot
import endpoints.api.search
import endpoints.api.tag
import endpoints.api.team
import endpoints.api.trigger
import endpoints.api.user
import endpoints.api.webhook