import json

from flask import request, abort

from app import notification_queue
from endpoints.api import (RepositoryParamResource, nickname, resource, require_repo_admin,
                           log_action, validate_json_request, api, NotFound, request_error)
from endpoints.notificationevent import NotificationEvent
from endpoints.notificationmethod import (NotificationMethod,
                                          CannotValidateNotificationMethodException)
from endpoints.notificationhelper import build_notification_data
from data import model


def notification_view(notification):
    """ Return the dict representation of a repository notification.

    The stored ``config_json`` string is decoded into the ``config`` entry. If
    it is missing or is not valid JSON, ``config`` is an empty dict instead of
    an error being raised.
    """
    try:
        config = json.loads(notification.config_json)
    except (TypeError, ValueError):
        # TypeError: config_json is not a string (e.g. None).
        # ValueError: config_json is not valid JSON.
        # Anything else (KeyboardInterrupt, SystemExit, ...) must propagate.
        config = {}

    return {
        'uuid': notification.uuid,
        'event': notification.event.name,
        'method': notification.method.name,
        'config': config
    }


# NOTE(review): the route below contains an empty path segment ("//"). It looks
# like a URL placeholder for the repository was lost at some point -- confirm
# against the other resources registered by this API before relying on it.
@resource('/v1/repository//notification/')
class RepositoryNotificationList(RepositoryParamResource):
    """ Resource for dealing with listing and creating notifications on a repository. """
    schemas = {
        'NotificationCreateRequest': {
            'id': 'NotificationCreateRequest',
            'type': 'object',
            'description': 'Information for creating a notification on a repository',
            'required': [
                'event',
                'method',
                'config'
            ],
            'properties': {
                'event': {
                    'type': 'string',
                    'description': 'The event on which the notification will respond',
                },
                'method': {
                    'type': 'string',
                    'description': 'The method of notification (such as email or web callback)',
                },
                'config': {
                    'type': 'object',
                    'description': 'JSON config information for the specific method of notification'
                }
            }
        },
    }

    @require_repo_admin
    @nickname('createRepoNotification')
    @validate_json_request('NotificationCreateRequest')
    def post(self, namespace, repository):
        """ Create a new notification for the specified repository. """
        repo = model.get_repository(namespace, repository)

        # Deliberately not named `json`: that would shadow the json module
        # imported at the top of this file.
        payload = request.get_json()

        method_handler = NotificationMethod.get_method(payload['method'])
        if not method_handler:
            raise request_error(message='Unknown method')

        try:
            method_handler.validate(repo, payload['config'])
        except CannotValidateNotificationMethodException as ex:
            # `message` is only guaranteed on Python 2 exceptions; fall back to
            # str(ex) when the attribute is absent so this path cannot itself
            # fail with AttributeError.
            message = ex.message if hasattr(ex, 'message') else str(ex)
            raise request_error(message=message)

        notification = model.create_repo_notification(repo, payload['event'],
                                                      payload['method'], payload['config'])
        resp = notification_view(notification)

        log_action('add_repo_notification', namespace,
                   {'repo': repository, 'notification_id': notification.uuid,
                    'event': payload['event'], 'method': payload['method']},
                   repo=repo)
        return resp, 201

    @require_repo_admin
    @nickname('listRepoNotifications')
    def get(self, namespace, repository):
        """ List the notifications for the specified repository. """
        notifications = model.list_repo_notifications(namespace, repository)
        return {
            'notifications': [notification_view(n) for n in notifications]
        }


# NOTE(review): this route string is identical to the one used for
# RepositoryNotificationList and has no segment for the `uuid` argument the
# handlers take -- a URL placeholder appears to be missing; confirm.
@resource('/v1/repository//notification/')
class RepositoryNotification(RepositoryParamResource):
    """ Resource for dealing with specific notifications. """

    @require_repo_admin
    @nickname('getRepoNotification')
    def get(self, namespace, repository, uuid):
        """ Get information for the specified notification. """
        try:
            notification = model.get_repo_notification(namespace, repository, uuid)
        except model.InvalidNotificationException:
            raise NotFound()

        return notification_view(notification)

    @require_repo_admin
    @nickname('deleteRepoNotification')
    def delete(self, namespace, repository, uuid):
        """ Deletes the specified notification. """
        try:
            notification = model.delete_repo_notification(namespace, repository, uuid)
        except model.InvalidNotificationException:
            # Same mapping the get handler uses for an invalid notification:
            # answer with NotFound rather than letting the error escape.
            raise NotFound()

        log_action('delete_repo_notification', namespace,
                   {'repo': repository, 'notification_id': uuid,
                    'event': notification.event.name,
                    'method': notification.method.name},
                   repo=model.get_repository(namespace, repository))

        return 'No Content', 204


# NOTE(review): as above, the route has empty path segments where the
# repository and notification identifiers would be expected -- confirm.
@resource('/v1/repository//notification//test')
class TestRepositoryNotification(RepositoryParamResource):
    """ Resource for queuing a test of a notification. """

    @require_repo_admin
    @nickname('testRepoNotification')
    def post(self, namespace, repository, uuid):
        """ Queues a test notification for this repository. """
        try:
            notification = model.get_repo_notification(namespace, repository, uuid)
        except model.InvalidNotificationException:
            raise NotFound()

        # Build a payload from the event's sample data and enqueue it under
        # [namespace, repository, event name], serialized as JSON.
        event_info = NotificationEvent.get_event(notification.event.name)
        sample_data = event_info.get_sample_data(repository=notification.repository)
        notification_data = build_notification_data(notification, sample_data)
        notification_queue.put([namespace, repository, notification.event.name],
                               json.dumps(notification_data))

        return {}