import logging
import json

from flask import Blueprint, request, make_response
from flask.ext.restful import Resource, abort, Api, reqparse
from flask.ext.restful.utils.cors import crossdomain
from calendar import timegm
from email.utils import formatdate
from functools import partial, wraps
from jsonschema import validate, ValidationError

from data import model
from util.names import parse_namespace_repository
from auth.permissions import (ReadRepositoryPermission,
                              ModifyRepositoryPermission,
                              AdministerRepositoryPermission)
from auth import scopes
from auth.auth_context import get_authenticated_user
from auth.auth import process_oauth


logger = logging.getLogger(__name__)

api_bp = Blueprint('api', __name__)
api = Api(api_bp)
# Decorators applied to every resource registered on this Api.
api.decorators = [process_oauth,
                  crossdomain(origin='*',
                              headers=['Authorization', 'Content-Type'])]


def resource(*urls, **kwargs):
    """ Class decorator that registers the decorated resource on `api`.

    All positional and keyword arguments are forwarded to
    `api.add_resource`; the class itself is returned unchanged.
    """
    def wrapper(api_resource):
        api.add_resource(api_resource, *urls, **kwargs)
        return api_resource
    return wrapper


def truthy_bool(param):
    """ Return False if `param` is one of the recognised "false" spellings.

    The false values are False, 'false', 'False', 'FALSE', '0', '' and
    'null'. Values that compare equal to False (such as the integer 0) match
    as well. Anything else, including None, yields True.
    """
    return param not in {False, 'false', 'False', '0', 'FALSE', '', 'null'}


def format_date(date):
    """ Output an RFC822 date format.

    `date` must provide `utctimetuple()`; it is converted to seconds since
    the epoch with `timegm` and then rendered with `email.utils.formatdate`.
    """
    return formatdate(timegm(date.utctimetuple()))


def add_method_metadata(name, value):
    """ Build a decorator that records `value` under `name` on a function.

    The metadata lives in a dict stored as the `__api_metadata` attribute of
    the decorated function, created on first use. The function itself is
    returned, not a wrapper.
    """
    def modifier(func):
        if not hasattr(func, '__api_metadata'):
            func.__api_metadata = {}
        func.__api_metadata[name] = value
        return func
    return modifier


def method_metadata(func, name):
    """ Return the metadata stored under `name` on `func`, or None.

    None is returned both when `func` carries no metadata at all and when it
    has no entry for `name`.
    """
    if hasattr(func, '__api_metadata'):
        return func.__api_metadata.get(name, None)
    return None


# `nickname('foo')` builds the decorator that stores 'foo' under 'nickname'.
nickname = partial(add_method_metadata, 'nickname')
# `internal_api` is already a decorator: it stores True under 'internal_api'.
internal_api = add_method_metadata('internal_api', True)


def query_param(name, help_str, type=reqparse.text_type, default=None,
                choices=(), required=False):
    """ Build a decorator that declares one query parameter on a function.

    Each declaration is appended, as a dict of keyword arguments for
    `RequestParser.add_argument`, to the function's `__api_query_params`
    list (created on first use). The declarations are consumed by
    `parse_args`.
    """
    def add_param(func):
        if not hasattr(func, '__api_query_params'):
            func.__api_query_params = []
        func.__api_query_params.append({
            'name': name,
            'type': type,
            'help': help_str,
            'default': default,
            'choices': choices,
            'required': required,
        })
        return func
    return add_param


def parse_args(func):
    """ Decorator that parses the declared query parameters for a method.

    At call time the specs recorded on `func` by `query_param` are fed to a
    fresh `reqparse.RequestParser`; the parsed result is passed to `func` as
    the first argument after `self`. If `func` has no declared parameters
    the request is aborted with a 400.
    """
    @wraps(func)
    def wrapper(self, *args, **kwargs):
        if not hasattr(func, '__api_query_params'):
            abort(400)

        parser = reqparse.RequestParser()
        for arg_spec in func.__api_query_params:
            parser.add_argument(**arg_spec)
        parsed_args = parser.parse_args()

        return func(self, parsed_args, *args, **kwargs)
    return wrapper


def parse_repository_name(func):
    """ Decorator that splits a combined repository path before calling func.

    The wrapper receives the combined path as its first argument, runs it
    through `parse_namespace_repository`, and calls `func` with the
    resulting namespace and repository name as the first two arguments.
    """
    @wraps(func)
    def wrapper(repository, *args, **kwargs):
        (namespace, repository) = parse_namespace_repository(repository)
        return func(namespace, repository, *args, **kwargs)
    return wrapper


class ApiResource(Resource):
    """ Base class for API resources; answers OPTIONS with an empty 200. """

    def options(self):
        return None, 200


class RepositoryParamResource(ApiResource):
    """ Resource whose methods are wrapped with `parse_repository_name`. """
    method_decorators = [parse_repository_name]


def require_repo_permission(permission_class, scope, allow_public=False):
    """ Build a method decorator that enforces a repository permission.

    The decorated method is only invoked when
    `permission_class(namespace, repository).can()` is true, or when
    `allow_public` is set and `model.repository_is_public` reports the
    repository as public. Otherwise the request is aborted with a 403.
    `scope` is recorded on the wrapper as its 'oauth2_scope' metadata.
    """
    def wrapper(func):
        @add_method_metadata('oauth2_scope', scope)
        @wraps(func)
        def wrapped(self, namespace, repository, *args, **kwargs):
            logger.debug('Checking permission %s for repo: %s/%s',
                         permission_class, namespace, repository)
            permission = permission_class(namespace, repository)
            # The public lookup is only performed when the permission check
            # fails and public access is allowed for this decorator.
            if (permission.can() or
                    (allow_public and
                     model.repository_is_public(namespace, repository))):
                return func(self, namespace, repository, *args, **kwargs)
            abort(403)
        return wrapped
    return wrapper


require_repo_read = require_repo_permission(ReadRepositoryPermission,
                                            scopes.READ_REPO, True)
require_repo_write = require_repo_permission(ModifyRepositoryPermission,
                                             scopes.WRITE_REPO)
require_repo_admin = require_repo_permission(AdministerRepositoryPermission,
                                             scopes.ADMIN_REPO)


def require_scope(scope_object):
    """ Build a decorator that records the OAuth2 scope for a method.

    Only `scope_object['scope']` is stored (as 'oauth2_scope' metadata); the
    wrapper itself performs no check and simply delegates to the method.
    """
    def wrapper(func):
        @add_method_metadata('oauth2_scope', scope_object['scope'])
        @wraps(func)
        def wrapped(*args, **kwargs):
            return func(*args, **kwargs)
        return wrapped
    return wrapper


def validate_json_request(schema_name):
    """ Build a method decorator that validates the JSON request body.

    The schema is looked up as `self.schemas[schema_name]` and the name is
    recorded as the 'request_schema' metadata. A `ValidationError` aborts the
    request with a 400 carrying the error's message.
    """
    def wrapper(func):
        @add_method_metadata('request_schema', schema_name)
        @wraps(func)
        def wrapped(self, *args, **kwargs):
            schema = self.schemas[schema_name]
            try:
                validate(request.get_json(), schema)
                # The handler runs inside the try block, so a ValidationError
                # raised by the handler itself is also reported as a 400.
                return func(self, *args, **kwargs)
            except ValidationError as ex:
                abort(400, message=ex.message)
        return wrapped
    return wrapper


def request_error(exception=None, **kwargs):
    """ Return a `(json_body, 400)` pair describing a bad request.

    The body is the JSON encoding of `kwargs`; when `exception` is given its
    `message` attribute is added under the 'message' key, overriding any
    'message' keyword argument.
    """
    data = kwargs.copy()
    if exception is not None:
        data['message'] = exception.message
    return json.dumps(data), 400


def log_action(kind, user_or_orgname, metadata=None, repo=None):
    """ Record an action through `model.log_action`.

    The performer is the currently authenticated user and the IP is taken
    from the current request. `metadata` defaults to a new empty dict on
    every call, so no dict is shared between calls.
    """
    if metadata is None:
        metadata = {}
    performer = get_authenticated_user()
    model.log_action(kind, user_or_orgname, performer=performer,
                     ip=request.remote_addr, metadata=metadata,
                     repository=repo)


# Imported last, after `api` and the decorators above have been defined.
# NOTE(review): presumably these modules import from this module and register
# their resources on import -- confirm before moving them to the top.
import endpoints.api.legacy
import endpoints.api.repository
import endpoints.api.discovery
import endpoints.api.user
import endpoints.api.search
import endpoints.api.build
import endpoints.api.webhook
import endpoints.api.trigger