This repository has been archived on 2020-03-24. You can view files and clone it, but cannot push or open issues or pull requests.
quay/endpoints/api/repositorynotification.py

108 lines
3.5 KiB
Python
Raw Normal View History

import json
from flask import request
from endpoints.api import (RepositoryParamResource, nickname, resource, require_repo_admin,
log_action, validate_json_request, api, NotFound)
from data import model
def notification_view(notification):
config = {}
try:
config = json.loads(notification.config_json)
except:
config = {}
return {
'uuid': notification.uuid,
'kind': notification.kind,
'method': notification.method,
'config': config
}
@resource('/v1/repository/<repopath:repository>/notification/')
class NotificaitonList(RepositoryParamResource):
""" Resource for dealing with listing and creating notifications on a repository. """
schemas = {
'NotificationCreateRequest': {
'id': 'NotificationCreateRequest',
'type': 'object',
'description': 'Information for creating a notification on a repository',
'required': [
'event',
'method',
'config'
],
'properties': {
'event': {
'type': 'string',
'description': 'The event on which the notification will respond',
},
'method': {
'type': 'string',
'description': 'The method of notification (such as email or web callback)',
},
'config': {
'type': 'object',
'description': 'JSON config information for the specific method of notification'
}
}
},
}
@require_repo_admin
@nickname('createRepoNotification')
@validate_json_request('NotificationCreateRequest')
def post(self, namespace, repository):
""" Create a new notification for the specified repository. """
repo = model.get_repository(namespace, repository)
json = request.get_json()
notification = model.create_repo_notification(repo, json['event'], json['method'],
json['config'])
resp = notification_view(notification)
log_action('add_repo_notification', namespace,
{'repo': repository, 'notification_id': notification.uuid,
'event': json['event'], 'method': json['method']},
repo=repo)
return resp, 201
@require_repo_admin
@nickname('listRepoNotifications')
def get(self, namespace, repository):
""" List the notifications for the specified repository. """
notifications = model.list_repo_notifications(namespace, repository)
return {
'notifications': [notification_view(n) for n in notifications]
}
@resource('/v1/repository/<repopath:repository>/notification/<uuid>')
class Notification(RepositoryParamResource):
""" Resource for dealing with specific notifications. """
@require_repo_admin
@nickname('getRepoNotification')
def get(self, namespace, repository, uuid):
""" Get information for the specified notification. """
try:
notification = model.get_repo_notification(namespace, repository, uuid)
except model.InvalidNotificationException:
raise NotFound()
return notification_view(notification)
@require_repo_admin
@nickname('deleteRepoNotification')
def delete(self, namespace, repository, uuid):
""" Deletes the specified notification. """
notification = model.delete_repo_notification(namespace, repository, uuid)
log_action('delete_repo_notification', namespace,
{'repo': repository, 'notification_id': uuid,
'event': notification.event.name, 'method': notification.method.name},
repo=model.get_repository(namespace, repository))
return 'No Content', 204