This repository has been archived on 2020-03-24. You can view files and clone it, but cannot push or open issues or pull requests.
quay/endpoints/api/webhook.py

77 lines
2.5 KiB
Python

import json
from flask import request
from endpoints.api import (RepositoryParamResource, nickname, resource, require_repo_admin,
log_action, validate_json_request, api, NotFound)
from data import model
def webhook_view(webhook):
return {
'public_id': webhook.public_id,
'parameters': json.loads(webhook.parameters),
}
@resource('/v1/repository/<path:repository>/webhook/')
class WebhookList(RepositoryParamResource):
""" Resource for dealing with listing and creating webhooks. """
schemas = {
'WebhookCreateRequest': {
'id': 'WebhookCreateRequest',
'type': 'object',
'description': 'Arbitrary json.',
},
}
@require_repo_admin
@nickname('createWebhook')
@validate_json_request('WebhookCreateRequest')
def post(self, namespace, repository):
""" Create a new webhook for the specified repository. """
repo = model.get_repository(namespace, repository)
webhook = model.create_webhook(repo, request.get_json())
resp = webhook_view(webhook)
repo_string = '%s/%s' % (namespace, repository)
headers = {
'Location': api.url_for(Webhook, repository=repo_string, public_id=webhook.public_id),
}
log_action('add_repo_webhook', namespace,
{'repo': repository, 'webhook_id': webhook.public_id},
repo=repo)
return resp, 201, headers
@require_repo_admin
@nickname('listWebhooks')
def get(self, namespace, repository):
""" List the webhooks for the specified repository. """
webhooks = model.list_webhooks(namespace, repository)
return {
'webhooks': [webhook_view(webhook) for webhook in webhooks]
}
@resource('/v1/repository/<path:repository>/webhook/<public_id>')
class Webhook(RepositoryParamResource):
""" Resource for dealing with specific webhooks. """
@require_repo_admin
@nickname('getWebhook')
def get(self, namespace, repository, public_id):
""" Get information for the specified webhook. """
try:
webhook = model.get_webhook(namespace, repository, public_id)
except model.InvalidWebhookException:
raise NotFound()
return webhook_view(webhook)
@require_repo_admin
@nickname('deleteWebhook')
def delete(self, namespace, repository, public_id):
""" Delete the specified webhook. """
model.delete_webhook(namespace, repository, public_id)
log_action('delete_repo_webhook', namespace,
{'repo': repository, 'webhook_id': public_id},
repo=model.get_repository(namespace, repository))
return 'No Content', 204