This repository was archived on 2020-03-24. You can view and clone its files, but you cannot push, open issues, or open pull requests.
quay/endpoints/api/repositorynotification.py

134 lines
4.6 KiB
Python

import json
from flask import request
from app import notification_queue
from endpoints.api import (RepositoryParamResource, nickname, resource, require_repo_admin,
log_action, validate_json_request, api, NotFound)
from endpoints.notificationevent import NotificationEvent
from data import model
def notification_view(notification):
    """ Build the dictionary view of a repository notification.

    Reads ``uuid``, ``event.name``, ``method.name`` and ``config_json`` from
    ``notification``. ``config_json`` is decoded with ``json.loads``; if the
    decode fails for any ordinary reason the config is reported as an empty
    dict instead of failing the whole view.

    Returns a dict with the keys ``uuid``, ``event``, ``method`` and ``config``.
    """
    try:
        config = json.loads(notification.config_json)
    except Exception:
        # Best-effort decode: a missing or malformed config must not break the
        # view. Catching Exception (rather than a bare except) lets SystemExit
        # and KeyboardInterrupt propagate.
        config = {}

    return {
        'uuid': notification.uuid,
        'event': notification.event.name,
        'method': notification.method.name,
        'config': config,
    }
@resource('/v1/repository/<repopath:repository>/notification/')
class RepositoryNotificationList(RepositoryParamResource):
    """ Resource for dealing with listing and creating notifications on a repository. """

    # JSON schema referenced by name from validate_json_request on post().
    schemas = {
        'NotificationCreateRequest': {
            'id': 'NotificationCreateRequest',
            'type': 'object',
            'description': 'Information for creating a notification on a repository',
            'required': [
                'event',
                'method',
                'config'
            ],
            'properties': {
                'event': {
                    'type': 'string',
                    'description': 'The event on which the notification will respond',
                },
                'method': {
                    'type': 'string',
                    'description': 'The method of notification (such as email or web callback)',
                },
                'config': {
                    'type': 'object',
                    'description': 'JSON config information for the specific method of notification'
                }
            }
        },
    }

    @require_repo_admin
    @nickname('createRepoNotification')
    @validate_json_request('NotificationCreateRequest')
    def post(self, namespace, repository):
        """ Create a new notification for the specified repository. """
        repo = model.get_repository(namespace, repository)

        # Named "payload" so the local does not hide the imported json module.
        payload = request.get_json()
        event_name = payload['event']
        method_name = payload['method']

        created = model.create_repo_notification(repo, event_name, method_name,
                                                 payload['config'])
        view = notification_view(created)

        # Record the creation in the action log with the identifying details.
        log_metadata = {
            'repo': repository,
            'notification_id': created.uuid,
            'event': event_name,
            'method': method_name,
        }
        log_action('add_repo_notification', namespace, log_metadata, repo=repo)

        return view, 201

    @require_repo_admin
    @nickname('listRepoNotifications')
    def get(self, namespace, repository):
        """ List the notifications for the specified repository. """
        views = []
        for entry in model.list_repo_notifications(namespace, repository):
            views.append(notification_view(entry))

        return {'notifications': views}
@resource('/v1/repository/<repopath:repository>/notification/<uuid>')
class RepositoryNotification(RepositoryParamResource):
    """ Resource for dealing with specific notifications. """

    @require_repo_admin
    @nickname('getRepoNotification')
    def get(self, namespace, repository, uuid):
        """ Get information for the specified notification. """
        try:
            notification = model.get_repo_notification(namespace, repository, uuid)
        except model.InvalidNotificationException:
            # An unknown uuid is reported to the client as "not found".
            raise NotFound()

        return notification_view(notification)

    @require_repo_admin
    @nickname('deleteRepoNotification')
    def delete(self, namespace, repository, uuid):
        """ Deletes the specified notification. """
        try:
            notification = model.delete_repo_notification(namespace, repository, uuid)
        except model.InvalidNotificationException:
            # Same mapping as get(): an unknown uuid becomes NotFound.
            # NOTE(review): assumes delete_repo_notification raises
            # InvalidNotificationException for a missing uuid - confirm
            # against the model layer.
            raise NotFound()

        # The deleted notification object is still used to describe what was
        # removed in the action log.
        log_action('delete_repo_notification', namespace,
                   {'repo': repository, 'notification_id': uuid,
                    'event': notification.event.name,
                    'method': notification.method.name},
                   repo=model.get_repository(namespace, repository))

        return 'No Content', 204
@resource('/v1/repository/<repopath:repository>/notification/<uuid>/test')
class TestRepositoryNotification(RepositoryParamResource):
    """ Resource for queuing a test of a notification. """

    @require_repo_admin
    @nickname('testRepoNotification')
    def post(self, namespace, repository, uuid):
        """ Queues a test notification for this repository. """
        try:
            target = model.get_repo_notification(namespace, repository, uuid)
        except model.InvalidNotificationException:
            # An unknown uuid is reported to the client as "not found".
            raise NotFound()

        event_name = target.event.name
        target_repo = target.repository

        # Build the payload from the event's sample data rather than a real event.
        handler = NotificationEvent.get_event(event_name)
        payload = {
            'notification_id': target.id,
            'repository_id': target_repo.id,
            'event_data': handler.get_sample_data(repository=target_repo),
        }

        # Queue item name is [namespace, repository, event]; body is the JSON payload.
        queue_name = [namespace, repository, event_name]
        notification_queue.put(queue_name, json.dumps(payload))

        return {}