import logging

from calendar import timegm
from email.utils import formatdate
from functools import partial, wraps

# NOTE(review): the ``flask.ext.*`` aliases only exist in older Flask
# releases; they are kept as-is here so the module keeps working with the
# Flask version this project is pinned to -- confirm before upgrading.
from flask import Blueprint, request
from flask.ext.restful import Resource, abort, Api, reqparse
from flask.ext.restful.utils.cors import crossdomain
from flask.ext.login import current_user
from jsonschema import validate, ValidationError

from data import model
from util.names import parse_namespace_repository
from auth.permissions import (ReadRepositoryPermission,
                              ModifyRepositoryPermission,
                              AdministerRepositoryPermission)


logger = logging.getLogger(__name__)

api_bp = Blueprint('api', __name__)
api = Api(api_bp)
# Every resource registered on this Api is wrapped with a permissive CORS
# decorator.
api.decorators = [crossdomain(origin='*')]

# Values that truthy_bool() treats as "false". Built once instead of on
# every call. Note that 0 and 0.0 also match because they compare equal to
# False.
_FALSY_VALUES = frozenset([False, 'false', 'False', '0', 'FALSE', '', 'null'])


def resource(*urls, **kwargs):
    """ Class decorator that registers a resource class on the module Api.

    The positional ``urls`` and any keyword arguments are forwarded to
    ``api.add_resource``. The decorated class is returned unchanged.
    """
    def wrapper(api_resource):
        api.add_resource(api_resource, *urls, **kwargs)
        return api_resource
    return wrapper


def truthy_bool(param):
    """ Return False if ``param`` is one of the known falsy spellings
    (False, 'false', 'False', 'FALSE', '0', '' or 'null'), True otherwise.
    """
    return param not in _FALSY_VALUES


def format_date(date):
    """ Output an RFC822 date format.

    ``date`` must provide ``utctimetuple()``; the resulting UTC time tuple is
    converted to a timestamp and formatted with ``email.utils.formatdate``.
    """
    return formatdate(timegm(date.utctimetuple()))


def add_method_metadata(name, value):
    """ Build a decorator that stores ``value`` under ``name`` in the
    decorated function's ``__api_metadata`` dict.

    The dict is created on first use. The function itself is returned, not a
    wrapper.
    """
    def modifier(func):
        if not hasattr(func, '__api_metadata'):
            func.__api_metadata = {}
        func.__api_metadata[name] = value
        return func
    return modifier


def method_metadata(func, name):
    """ Return the metadata value stored under ``name`` on ``func``, or None
    if the function carries no metadata or has no entry for that name.
    """
    if hasattr(func, '__api_metadata'):
        return func.__api_metadata.get(name, None)
    return None


# Shorthand decorator: @nickname('x') records 'x' as the 'nickname' metadata.
nickname = partial(add_method_metadata, 'nickname')


def query_param(name, help_str, type=reqparse.text_type, default=None,
                choices=(), required=False):
    """ Build a decorator that declares one query parameter on a method.

    Each declaration is appended to the function's ``__api_query_params``
    list as a dict of keyword arguments; ``parse_args`` later passes each
    dict to ``RequestParser.add_argument``. The function itself is returned,
    not a wrapper.
    """
    def add_param(func):
        if not hasattr(func, '__api_query_params'):
            func.__api_query_params = []
        func.__api_query_params.append({
            'name': name,
            'type': type,
            'help': help_str,
            'default': default,
            'choices': choices,
            'required': required,
        })
        return func
    return add_param


def parse_args(func):
    """ Decorator that parses the query parameters declared with
    ``query_param`` and passes the result to ``func`` as the first argument
    after ``self``.

    Aborts with a 400 if ``func`` has no declared query parameters.
    """
    @wraps(func)
    def wrapper(self, *args, **kwargs):
        # Checked at call time, exactly like the declaration list itself is
        # read at call time.
        if not hasattr(func, '__api_query_params'):
            abort(400)

        parser = reqparse.RequestParser()
        for arg_spec in func.__api_query_params:
            parser.add_argument(**arg_spec)

        parsed_args = parser.parse_args()
        return func(self, parsed_args, *args, **kwargs)
    return wrapper


def parse_repository_name(func):
    """ Decorator that splits the incoming ``repository`` argument with
    ``parse_namespace_repository`` and calls ``func`` with the resulting
    namespace and repository as its first two positional arguments.
    """
    @wraps(func)
    def wrapper(repository, *args, **kwargs):
        (namespace, repository) = parse_namespace_repository(repository)
        return func(namespace, repository, *args, **kwargs)
    return wrapper


class RepositoryParamResource(Resource):
    """ Base resource whose handler methods are all wrapped with
    ``parse_repository_name``.
    """
    method_decorators = [parse_repository_name]


def require_repo_permission(permission_class, allow_public=False):
    """ Build a method decorator that enforces a repository permission.

    The wrapped method runs only if ``permission_class(namespace,
    repository).can()`` is true, or if ``allow_public`` is set and
    ``model.repository_is_public`` reports the repository as public.
    Otherwise the request is aborted with a 403.
    """
    def wrapper(func):
        @wraps(func)
        def wrapped(self, namespace, repository, *args, **kwargs):
            permission = permission_class(namespace, repository)
            # Short-circuit order matters: the public lookup only happens
            # when the permission check fails and public access is allowed.
            allowed = (permission.can() or
                       (allow_public and
                        model.repository_is_public(namespace, repository)))
            if not allowed:
                abort(403)
            return func(self, namespace, repository, *args, **kwargs)
        return wrapped
    return wrapper


require_repo_read = require_repo_permission(ReadRepositoryPermission, True)
require_repo_write = require_repo_permission(ModifyRepositoryPermission)
require_repo_admin = require_repo_permission(AdministerRepositoryPermission)


def validate_json_request(schema_name):
    """ Build a method decorator that validates the request's JSON body
    against ``self.schemas[schema_name]`` before calling the method.

    The schema name is also recorded as the 'request_schema' metadata of the
    wrapped method. A ``ValidationError`` aborts the request with a 400
    carrying the error's message.
    """
    def wrapper(func):
        @add_method_metadata('request_schema', schema_name)
        @wraps(func)
        def wrapped(self, namespace, repository, *args, **kwargs):
            schema = self.schemas[schema_name]
            try:
                validate(request.get_json(), schema)
                # NOTE: the handler call is inside the try block, so a
                # ValidationError raised by the handler itself is also
                # reported as a 400.
                return func(self, namespace, repository, *args, **kwargs)
            except ValidationError as ex:
                abort(400, message=ex.message)
        return wrapped
    return wrapper


def log_action(kind, user_or_orgname, metadata=None, repo=None):
    """ Record an action through ``model.log_action``, attributed to the
    current user and the remote address of the current request.

    ``metadata`` defaults to a fresh empty dict on every call (None is
    treated as "no metadata"), so no dict is shared between calls.
    """
    if metadata is None:
        metadata = {}
    performer = current_user.db_user()
    model.log_action(kind, user_or_orgname, performer=performer,
                     ip=request.remote_addr, metadata=metadata,
                     repository=repo)


# These imports stay at the bottom of the module, after every name above has
# been defined, matching their position in the original file.
# NOTE(review): presumably the endpoint modules import helpers from this
# module and register their resources on import -- confirm before moving.
import endpoints.api.legacy
import endpoints.api.repository
import endpoints.api.discovery