This repository has been archived on 2020-03-24. You can view files and clone it, but cannot push or open issues or pull requests.
quay/endpoints/api/repositorynotification_models_interface.py
2017-07-17 17:56:32 -04:00

79 lines
1.9 KiB
Python

import json
from abc import ABCMeta, abstractmethod
from collections import namedtuple
from six import add_metaclass
class RepositoryNotification(
        namedtuple('RepositoryNotification', [
            'uuid',
            'title',
            'event_name',
            'method_name',
            'config_json',
            'event_config_json',
            'number_of_failures',
        ])):
    """
    RepositoryNotification represents a notification for a repository.

    :type uuid: string
    :type title: string
    :type event_name: string
    :type method_name: string
    :type config_json: string (JSON text, decoded by to_dict)
    :type event_config_json: string (JSON text, decoded by to_dict)
    :type number_of_failures: int
    """

    @staticmethod
    def _decode_json_or_empty(raw):
        """
        Decode `raw` with json.loads, returning {} when it cannot be decoded.

        Only the errors json.loads raises for bad input are handled: ValueError
        for malformed JSON text and TypeError for a non-string value such as
        None. Anything else (e.g. KeyboardInterrupt) propagates to the caller.
        """
        try:
            return json.loads(raw)
        except (TypeError, ValueError):
            return {}

    def to_dict(self):
        """
        Return this notification as a plain dict.

        The `event_name` and `method_name` fields are exposed under the keys
        'event' and 'method'. `config_json` and `event_config_json` are decoded
        from JSON and exposed under 'config' and 'event_config'; a value that
        cannot be decoded is replaced by an empty dict.
        """
        return {
            'uuid': self.uuid,
            'title': self.title,
            'event': self.event_name,
            'method': self.method_name,
            'config': self._decode_json_or_empty(self.config_json),
            'event_config': self._decode_json_or_empty(self.event_config_json),
            'number_of_failures': self.number_of_failures,
        }
@add_metaclass(ABCMeta)
class RepoNotificationInterface(object):
    """
    Interface that represents all data store interactions required by the
    RepositoryNotification API.

    Every method here is abstract, so a subclass must override all of them
    before it can be instantiated. Return values are not specified by this
    interface; the one-line summaries below are inferred from the method names
    and should be confirmed against the concrete implementation.
    """

    @abstractmethod
    def create_repo_notification(
            self, namespace_name, repository_name, event_name, method_name,
            method_config, event_config, title=None):
        """ Presumably creates a notification for the given repository. """

    @abstractmethod
    def list_repo_notifications(
            self, namespace_name, repository_name, event_name=None):
        """ Presumably lists a repository's notifications; `event_name` is optional. """

    @abstractmethod
    def get_repo_notification(self, uuid):
        """ Presumably looks up a single notification by its uuid. """

    @abstractmethod
    def delete_repo_notification(self, namespace_name, repository_name, uuid):
        """ Presumably deletes the notification with the given uuid. """

    @abstractmethod
    def reset_notification_number_of_failures(
            self, namespace_name, repository_name, uuid):
        """ Presumably resets the failure counter of the given notification. """

    @abstractmethod
    def queue_test_notification(self, uuid):
        """ Presumably queues a test delivery of the given notification. """