import logging from base64 import b64encode import cnr from cnr.api.impl import registry as cnr_registry from cnr.api.registry import repo_name, _pull from cnr.exception import (CnrException, InvalidUsage, InvalidParams, InvalidRelease, UnableToLockResource, UnauthorizedAccess, Unsupported, ChannelNotFound, PackageAlreadyExists, PackageNotFound, PackageReleaseNotFound) from flask import request, jsonify from auth.process import process_auth from auth.auth_context import get_authenticated_user from auth.permissions import CreateRepositoryPermission, ModifyRepositoryPermission from endpoints.appr import appr_bp, require_app_repo_read, require_app_repo_write from endpoints.appr.cnr_backend import Package, Channel, Blob from endpoints.decorators import anon_allowed, anon_protect logger = logging.getLogger(__name__) @appr_bp.errorhandler(Unsupported) @appr_bp.errorhandler(PackageAlreadyExists) @appr_bp.errorhandler(InvalidRelease) @appr_bp.errorhandler(UnableToLockResource) @appr_bp.errorhandler(UnauthorizedAccess) @appr_bp.errorhandler(PackageNotFound) @appr_bp.errorhandler(PackageReleaseNotFound) @appr_bp.errorhandler(CnrException) @appr_bp.errorhandler(InvalidUsage) @appr_bp.errorhandler(InvalidParams) @appr_bp.errorhandler(ChannelNotFound) def render_error(error): response = jsonify({"error": error.to_dict()}) response.status_code = error.status_code return response @appr_bp.route("/version") @anon_allowed def version(): return jsonify({"cnr-api": cnr.__version__}) @appr_bp.route("/api/v1/users/login", methods=['POST']) @anon_allowed def login(): """ Todo: * Implement better login protocol """ values = request.get_json(force=True, silent=True) return jsonify({'token': "basic " + b64encode("%s:%s" % (values['user']['username'], values['user']['password']))}) # @TODO: Redirect to S3 url @appr_bp.route( "/api/v1/packages///blobs/sha256/", methods=['GET'], strict_slashes=False, ) def blobs(namespace, package_name, digest): reponame = repo_name(namespace, package_name) data = cnr_registry.pull_blob(reponame, digest, blob_class=Blob) json_format = request.args.get('format', None) == 'json' return _pull(data, json_format=json_format) @appr_bp.route("/api/v1/packages", methods=['GET'], strict_slashes=False) @process_auth @anon_protect def list_packages(): namespace = request.args.get('namespace', None) media_type = request.args.get('media_type', None) query = request.args.get('query', None) user = get_authenticated_user() username = None if user: username = user.username result_data = cnr_registry.list_packages(namespace, package_class=Package, search=query, media_type=media_type, username=username) return jsonify(result_data) @appr_bp.route( "/api/v1/packages////", methods=['DELETE'], strict_slashes=False) @process_auth @require_app_repo_write @anon_protect def delete_package(namespace, package_name, release, media_type): reponame = repo_name(namespace, package_name) result = cnr_registry.delete_package(reponame, release, media_type, package_class=Package) return jsonify(result) @appr_bp.route( "/api/v1/packages////", methods=['GET'], strict_slashes=False ) def show_package(namespace, package_name, release, media_type): reponame = repo_name(namespace, package_name) result = cnr_registry.show_package(reponame, release, media_type, channel_class=Channel, package_class=Package) return jsonify(result) @appr_bp.route("/api/v1/packages//", methods=['GET'], strict_slashes=False) @process_auth @require_app_repo_read @anon_protect def show_package_releases(namespace, package_name): reponame = repo_name(namespace, package_name) media_type = request.args.get('media_type', None) result = cnr_registry.show_package_releases(reponame, media_type=media_type, package_class=Package) return jsonify(result) @appr_bp.route("/api/v1/packages///", methods=['GET'], strict_slashes=False) @process_auth @require_app_repo_read @anon_protect def show_package_releasse_manifests(namespace, package_name, release): reponame = repo_name(namespace, package_name) result = cnr_registry.show_package_manifests(reponame, release, package_class=Package) return jsonify(result) @appr_bp.route( "/api/v1/packages/////pull", methods=['GET'], strict_slashes=False, ) @process_auth @require_app_repo_read @anon_protect def pull(namespace, package_name, release, media_type): reponame = repo_name(namespace, package_name) logger.info("pull %s", reponame) data = cnr_registry.pull(reponame, release, media_type, Package, blob_class=Blob) return _pull(data) @appr_bp.route("/api/v1/packages//", methods=['POST'], strict_slashes=False) @process_auth @anon_protect def push(namespace, package_name): reponame = repo_name(namespace, package_name) values = request.get_json(force=True, silent=True) release_version = values['release'] media_type = values['media_type'] force = request.args.get('force', 'false') == 'true' private = values.get('visibility', 'public') owner = get_authenticated_user() if not Package.exists(reponame): if not CreateRepositoryPermission(namespace).can(): raise UnauthorizedAccess("Unauthorized access for: %s" % reponame, {"package": reponame, "scopes": ['create']}) Package.create_repository(reponame, private, owner) if not ModifyRepositoryPermission(namespace, package_name).can(): raise UnauthorizedAccess("Unauthorized access for: %s" % reponame, {"package": reponame, "scopes": ['push']}) blob = Blob(reponame, values['blob']) app_release = cnr_registry.push(reponame, release_version, media_type, blob, force, package_class=Package, user=owner, visibility=private) return jsonify(app_release) @appr_bp.route("/api/v1/packages/search", methods=['GET'], strict_slashes=False) @process_auth @anon_protect def search_packages(): query = request.args.get("q") user = get_authenticated_user() username = None if user: username = user.username search_results = cnr_registry.search(query, Package, username=username) return jsonify(search_results) # CHANNELS @appr_bp.route("/api/v1/packages///channels", methods=['GET'], strict_slashes=False) @process_auth @require_app_repo_read @anon_protect def list_channels(namespace, package_name): reponame = repo_name(namespace, package_name) return jsonify(cnr_registry.list_channels(reponame, channel_class=Channel)) @appr_bp.route("/api/v1/packages///channels/", methods=['GET'], strict_slashes=False) @process_auth @require_app_repo_read @anon_protect def show_channel(namespace, package_name, channel_name): reponame = repo_name(namespace, package_name) channel = cnr_registry.show_channel(reponame, channel_name, channel_class=Channel) return jsonify(channel) @appr_bp.route( "/api/v1/packages///channels//", methods=['POST'], strict_slashes=False, ) @process_auth @require_app_repo_write @anon_protect def add_channel_release(namespace, package_name, channel_name, release): reponame = repo_name(namespace, package_name) result = cnr_registry.add_channel_release(reponame, channel_name, release, channel_class=Channel, package_class=Package) return jsonify(result) @appr_bp.route( "/api/v1/packages///channels//", methods=['DELETE'], strict_slashes=False, ) @process_auth @require_app_repo_write @anon_protect def delete_channel_release(namespace, package_name, channel_name, release): reponame = repo_name(namespace, package_name) result = cnr_registry.delete_channel_release(reponame, channel_name, release, channel_class=Channel, package_class=Package) return jsonify(result) @appr_bp.route( "/api/v1/packages///channels/", methods=['DELETE'], strict_slashes=False, ) @process_auth @require_app_repo_write @anon_protect def delete_channel(namespace, package_name, channel_name): reponame = repo_name(namespace, package_name) result = cnr_registry.delete_channel(reponame, channel_name, channel_class=Channel) return jsonify(result)