import logging from flask import request, make_response, jsonify, abort from flask.ext.login import login_required, current_user from functools import wraps from data import model from app import app from util.email import send_confirmation_email from util.names import parse_repository_name from util.gravatar import compute_hash from auth.permissions import (ReadRepositoryPermission, ModifyRepositoryPermission, AdministerRepositoryPermission) from endpoints import registry logger = logging.getLogger(__name__) def api_login_required(f): @wraps(f) def decorated_view(*args, **kwargs): if not current_user.is_authenticated(): abort(401) return f(*args, **kwargs) return decorated_view @app.errorhandler(model.DataModelException) def handle_dme(ex): return make_response(ex.message, 400) @app.route('/api/') def welcome(): return make_response('welcome', 200) @app.route('/api/user/', methods=['GET']) def get_logged_in_user(): if current_user.is_anonymous(): return jsonify({'anonymous': True}) user = current_user.db_user return jsonify({ 'verified': user.verified, 'anonymous': False, 'username': user.username, 'email': user.email, 'gravatar': compute_hash(user.email), }) @app.route('/api/user/', methods=['POST']) def create_user_api(): user_data = request.get_json() try: new_user = model.create_user(user_data['username'], user_data['password'], user_data['email']) code = model.create_confirm_email_code(new_user) send_confirmation_email(new_user.username, new_user.email, code.code) return make_response('Created', 201) except model.DataModelException as ex: error_resp = jsonify({ 'message': ex.message, }) error_resp.status_code = 400 return error_resp @app.route('/api/users/', methods=['GET']) @api_login_required def get_matching_users(prefix): users = model.get_matching_users(prefix) return jsonify({ 'users': [user.username for user in users] }) @app.route('/api/repository/', methods=['POST']) @api_login_required def create_repo_api(): pass @app.route('/api/repository/find/', methods=['GET']) def match_repos_api(prefix): def repo_view(repo): return { 'namespace': repo.namespace, 'name': repo.name, 'description': repo.description } username = current_user.db_user.username if current_user.is_authenticated() else None matching = model.get_matching_repositories(prefix, username) response = { 'repositories': [repo_view(repo) for repo in matching] } return jsonify(response) @app.route('/api/repository/', methods=['GET']) def list_repos_api(): def repo_view(repo_obj): return { 'namespace': repo_obj.namespace, 'name': repo_obj.name, 'description': repo_obj.description, } username = current_user.db_user.username if current_user.is_authenticated() else None repos = [repo_view(repo) for repo in model.get_visible_repositories(username)] response = { 'repositories': repos } return jsonify(response) @app.route('/api/repository/', methods=['PUT']) @api_login_required @parse_repository_name def update_repo_api(namespace, repository): permission = ModifyRepositoryPermission(namespace, repository) if permission.can(): repo = model.get_repository(namespace, repository) if repo: values = request.get_json() repo.description = values['description'] repo.save() return jsonify({ 'success': True }) abort(404) @app.route('/api/repository//changevisibility', methods=['POST']) @api_login_required @parse_repository_name def change_repo_visibility_api(namespace, repository): permission = AdministerRepositoryPermission(namespace, repository) if permission.can(): repo = model.get_repository(namespace, repository) if repo: values = request.get_json() model.set_repository_visibility(repo, values['visibility']) return jsonify({ 'success': True }) abort(404) @app.route('/api/repository/', methods=['DELETE']) @api_login_required @parse_repository_name def delete_repository(namespace, repository): permission = AdministerRepositoryPermission(namespace, repository) if permission.can(): model.purge_repository(namespace, repository) registry.delete_repository_storage(namespace, repository) return make_response('Deleted', 204) abort(404) def image_view(image): return { 'id': image.docker_image_id, 'created': image.created, 'comment': image.comment, } @app.route('/api/repository/', methods=['GET']) @parse_repository_name def get_repo_api(namespace, repository): logger.debug('Get repo: %s/%s' % (namespace, repository)) def tag_view(tag): image = model.get_tag_image(namespace, repository, tag.name) if not image: return {} return { 'name': tag.name, 'image': image_view(image), } permission = ReadRepositoryPermission(namespace, repository) is_public = model.repository_is_public(namespace, repository) if permission.can() or is_public: repo = model.get_repository(namespace, repository) if repo: tags = model.list_repository_tags(namespace, repository) tag_dict = {tag.name: tag_view(tag) for tag in tags} can_write = ModifyRepositoryPermission(namespace, repository).can() can_admin = AdministerRepositoryPermission(namespace, repository).can() return jsonify({ 'namespace': namespace, 'name': repository, 'description': repo.description, 'tags': tag_dict, 'can_write': can_write, 'can_admin': can_admin, 'is_public': is_public }) abort(404) # Not fount abort(403) # Permission denied def role_view(repo_perm_obj): return { 'role': repo_perm_obj.role.name } @app.route('/api/repository//tag//images', methods=['GET']) @parse_repository_name def list_tag_images(namespace, repository, tag): permission = ReadRepositoryPermission(namespace, repository) if permission.can() or model.repository_is_public(namespace, repository): tag_image = model.get_tag_image(namespace, repository, tag) parent_images = model.get_parent_images(tag_image) parents = list(parent_images) parents.reverse() all_images = [tag_image] + parents return jsonify({ 'images': [image_view(image) for image in all_images] }) abort(403) # Permission denied @app.route('/api/repository//permissions/', methods=['GET']) @api_login_required @parse_repository_name def list_repo_permissions(namespace, repository): permission = AdministerRepositoryPermission(namespace, repository) if permission.can(): repo_perms = model.get_all_repo_users(namespace, repository) return jsonify({ 'permissions': {repo_perm.user.username: role_view(repo_perm) for repo_perm in repo_perms} }) abort(403) # Permission denied @app.route('/api/repository//permissions/', methods=['GET']) @api_login_required @parse_repository_name def get_permissions(namespace, repository, username): logger.debug('Get repo: %s/%s permissions for user %s' % (namespace, repository, username)) permission = AdministerRepositoryPermission(namespace, repository) if permission.can(): perm = model.get_user_reponame_permission(username, namespace, repository) return jsonify(role_view(perm)) abort(403) # Permission denied @app.route('/api/repository//permissions/', methods=['PUT', 'POST']) @api_login_required @parse_repository_name def change_permissions(namespace, repository, username): permission = AdministerRepositoryPermission(namespace, repository) if permission.can(): new_permission = request.get_json() logger.debug('Setting permission to: %s for user %s' % (new_permission['role'], username)) try: perm = model.set_user_repo_permission(username, namespace, repository, new_permission['role']) except model.DataModelException: logger.warning('User tried to remove themselves as admin.') abort(409) resp = jsonify(role_view(perm)) if request.method == 'POST': resp.status_code = 201 return resp abort(403) # Permission denied @app.route('/api/repository//permissions/', methods=['DELETE']) @api_login_required @parse_repository_name def delete_permissions(namespace, repository, username): permission = AdministerRepositoryPermission(namespace, repository) if permission.can(): try: model.delete_user_permission(username, namespace, repository) except model.DataModelException: logger.warning('User tried to remove themselves as admin.') abort(409) return make_response('Deleted', 204) abort(403) # Permission denied