Merge pull request #2777 from coreos-inc/joseph.schorr/QUAY-618/notificationworker-data-interface
Change notificationworker to use data interface
This commit is contained in:
commit
fdb21aa5dc
13 changed files with 167 additions and 87 deletions
|
@ -5,6 +5,6 @@ echo 'Starting notification worker'
|
|||
QUAYPATH=${QUAYPATH:-"."}
|
||||
cd ${QUAYDIR:-"/"}
|
||||
|
||||
PYTHONPATH=$QUAYPATH venv/bin/python -m workers.notificationworker
|
||||
PYTHONPATH=$QUAYPATH venv/bin/python -m workers.notificationworker.notificationworker
|
||||
|
||||
echo 'Notification worker exited'
|
|
@ -125,10 +125,12 @@ def delete_matching_notifications(target, kind_name, **kwargs):
|
|||
notification.delete_instance()
|
||||
|
||||
|
||||
def increment_notification_failure_count(notification_id):
|
||||
def increment_notification_failure_count(uuid):
|
||||
""" This increments the number of failures by one """
|
||||
RepositoryNotification.update(number_of_failures=RepositoryNotification.number_of_failures + 1).where(
|
||||
RepositoryNotification.id == notification_id).execute()
|
||||
(RepositoryNotification
|
||||
.update(number_of_failures=RepositoryNotification.number_of_failures + 1)
|
||||
.where(RepositoryNotification.uuid == uuid)
|
||||
.execute())
|
||||
|
||||
|
||||
def reset_notification_number_of_failures(namespace_name, repository_name, uuid):
|
||||
|
|
|
@ -132,7 +132,7 @@ class VulnerabilityFoundEvent(NotificationEvent):
|
|||
return 'info'
|
||||
|
||||
def get_sample_data(self, notification):
|
||||
event_config = json.loads(notification.event_config_json)
|
||||
event_config = notification.event_config_dict
|
||||
|
||||
# TODO(jzelinskie): remove when more endpoints have been converted to using
|
||||
# interfaces
|
||||
|
@ -154,7 +154,7 @@ class VulnerabilityFoundEvent(NotificationEvent):
|
|||
})
|
||||
|
||||
def should_perform(self, event_data, notification_data):
|
||||
event_config = json.loads(notification_data.event_config_json)
|
||||
event_config = notification_data.event_config_dict
|
||||
if VulnerabilityFoundEvent.CONFIG_LEVEL not in event_config:
|
||||
return True
|
||||
|
||||
|
@ -197,10 +197,10 @@ class BaseBuildEvent(NotificationEvent):
|
|||
return None
|
||||
|
||||
def should_perform(self, event_data, notification_data):
|
||||
if not notification_data.event_config_json:
|
||||
if not notification_data.event_config_dict:
|
||||
return True
|
||||
|
||||
event_config = json.loads(notification_data.event_config_json)
|
||||
event_config = notification_data.event_config_dict
|
||||
ref_regex = event_config.get('ref-regex') or None
|
||||
if ref_regex is None:
|
||||
return True
|
||||
|
|
|
@ -56,7 +56,7 @@ class NotificationMethod(object):
|
|||
"""
|
||||
Performs the notification method.
|
||||
|
||||
notification_obj: The noticication record itself.
|
||||
notification_obj: The noticication namedtuple.
|
||||
event_handler: The NotificationEvent handler.
|
||||
notification_data: The dict of notification data placed in the queue.
|
||||
"""
|
||||
|
@ -122,7 +122,7 @@ class QuayNotificationMethod(NotificationMethod):
|
|||
return
|
||||
|
||||
# Lookup the target user or team to which we'll send the notification.
|
||||
config_data = json.loads(notification_obj.config_json)
|
||||
config_data = notification_obj.method_config_dict
|
||||
status, err_message, target_users = self.find_targets(repository, config_data)
|
||||
if not status:
|
||||
raise NotificationMethodPerformException(err_message)
|
||||
|
@ -151,7 +151,7 @@ class EmailMethod(NotificationMethod):
|
|||
'notifications for this repository')
|
||||
|
||||
def perform(self, notification_obj, event_handler, notification_data):
|
||||
config_data = json.loads(notification_obj.config_json)
|
||||
config_data = notification_obj.method_config_dict
|
||||
email = config_data.get('email', '')
|
||||
if not email:
|
||||
return
|
||||
|
@ -179,7 +179,7 @@ class WebhookMethod(NotificationMethod):
|
|||
raise CannotValidateNotificationMethodException('Missing webhook URL')
|
||||
|
||||
def perform(self, notification_obj, event_handler, notification_data):
|
||||
config_data = json.loads(notification_obj.config_json)
|
||||
config_data = notification_obj.method_config_dict
|
||||
url = config_data.get('url', '')
|
||||
if not url:
|
||||
return
|
||||
|
@ -216,7 +216,7 @@ class FlowdockMethod(NotificationMethod):
|
|||
raise CannotValidateNotificationMethodException('Missing Flowdock API Token')
|
||||
|
||||
def perform(self, notification_obj, event_handler, notification_data):
|
||||
config_data = json.loads(notification_obj.config_json)
|
||||
config_data = notification_obj.method_config_dict
|
||||
token = config_data.get('flow_api_token', '')
|
||||
if not token:
|
||||
return
|
||||
|
@ -270,8 +270,7 @@ class HipchatMethod(NotificationMethod):
|
|||
raise CannotValidateNotificationMethodException('Missing Hipchat Room ID')
|
||||
|
||||
def perform(self, notification_obj, event_handler, notification_data):
|
||||
config_data = json.loads(notification_obj.config_json)
|
||||
|
||||
config_data = notification_obj.method_config_dict
|
||||
token = config_data.get('notification_token', '')
|
||||
room_id = config_data.get('room_id', '')
|
||||
|
||||
|
@ -385,8 +384,7 @@ class SlackMethod(NotificationMethod):
|
|||
return adjust_tags(message)
|
||||
|
||||
def perform(self, notification_obj, event_handler, notification_data):
|
||||
config_data = json.loads(notification_obj.config_json)
|
||||
|
||||
config_data = notification_obj.method_config_dict
|
||||
url = config_data.get('url', '')
|
||||
if not url:
|
||||
return
|
||||
|
|
|
@ -12,9 +12,9 @@ def test_all_notifications(app):
|
|||
'namespace_user': AttrDict(dict(username='foo')),
|
||||
'name': 'bar',
|
||||
}),
|
||||
'event_config_json': json.dumps({
|
||||
'event_config_dict': {
|
||||
'level': 'low',
|
||||
}),
|
||||
},
|
||||
})
|
||||
|
||||
for subc in NotificationEvent.__subclasses__():
|
||||
|
|
|
@ -18,7 +18,7 @@ class TestCreate(unittest.TestCase):
|
|||
class TestShouldPerform(unittest.TestCase):
|
||||
def test_build_emptyjson(self):
|
||||
notification_data = AttrDict({
|
||||
'event_config_json': None,
|
||||
'event_config_dict': None,
|
||||
})
|
||||
|
||||
# No build data at all.
|
||||
|
@ -26,7 +26,7 @@ class TestShouldPerform(unittest.TestCase):
|
|||
|
||||
def test_build_nofilter(self):
|
||||
notification_data = AttrDict({
|
||||
'event_config_json': '{}',
|
||||
'event_config_dict': {},
|
||||
})
|
||||
|
||||
# No build data at all.
|
||||
|
@ -47,7 +47,7 @@ class TestShouldPerform(unittest.TestCase):
|
|||
|
||||
def test_build_emptyfilter(self):
|
||||
notification_data = AttrDict({
|
||||
'event_config_json': '{"ref-regex": ""}',
|
||||
'event_config_dict': {"ref-regex": ""},
|
||||
})
|
||||
|
||||
# No build data at all.
|
||||
|
@ -68,7 +68,7 @@ class TestShouldPerform(unittest.TestCase):
|
|||
|
||||
def test_build_invalidfilter(self):
|
||||
notification_data = AttrDict({
|
||||
'event_config_json': '{"ref-regex": "]["}',
|
||||
'event_config_dict': {"ref-regex": "]["},
|
||||
})
|
||||
|
||||
# No build data at all.
|
||||
|
@ -89,7 +89,7 @@ class TestShouldPerform(unittest.TestCase):
|
|||
|
||||
def test_build_withfilter(self):
|
||||
notification_data = AttrDict({
|
||||
'event_config_json': '{"ref-regex": "refs/heads/master"}',
|
||||
'event_config_dict': {"ref-regex": "refs/heads/master"},
|
||||
})
|
||||
|
||||
# No build data at all.
|
||||
|
@ -117,7 +117,7 @@ class TestShouldPerform(unittest.TestCase):
|
|||
|
||||
def test_build_withwildcardfilter(self):
|
||||
notification_data = AttrDict({
|
||||
'event_config_json': '{"ref-regex": "refs/heads/.+"}',
|
||||
'event_config_dict': {"ref-regex": "refs/heads/.+"},
|
||||
})
|
||||
|
||||
# No build data at all.
|
||||
|
@ -152,7 +152,7 @@ class TestShouldPerform(unittest.TestCase):
|
|||
|
||||
def test_vulnerability_notification_nolevel(self):
|
||||
notification_data = AttrDict({
|
||||
'event_config_json': '{}',
|
||||
'event_config_dict': {},
|
||||
})
|
||||
|
||||
# No level specified.
|
||||
|
@ -161,7 +161,7 @@ class TestShouldPerform(unittest.TestCase):
|
|||
|
||||
def test_vulnerability_notification_nopvulninfo(self):
|
||||
notification_data = AttrDict({
|
||||
'event_config_json': '{"level": 3}',
|
||||
'event_config_dict': {"level": 3},
|
||||
})
|
||||
|
||||
# No vuln info.
|
||||
|
@ -170,7 +170,7 @@ class TestShouldPerform(unittest.TestCase):
|
|||
|
||||
def test_vulnerability_notification_normal(self):
|
||||
notification_data = AttrDict({
|
||||
'event_config_json': '{"level": 3}',
|
||||
'event_config_dict': {"level": 3},
|
||||
})
|
||||
|
||||
info = {"vulnerability": {"priority": "Critical"}}
|
||||
|
|
|
@ -1,45 +0,0 @@
|
|||
import unittest
|
||||
|
||||
from data import model
|
||||
from workers.notificationworker import NotificationWorker
|
||||
from initdb import setup_database_for_testing, finished_database_for_testing
|
||||
|
||||
class NotificationWorkerTests(unittest.TestCase):
|
||||
def setUp(self):
|
||||
setup_database_for_testing(self)
|
||||
|
||||
def tearDown(self):
|
||||
finished_database_for_testing(self)
|
||||
|
||||
def test_basic_notification(self):
|
||||
# Ensure the public user doesn't have any notifications.
|
||||
target_user = model.user.get_user('public')
|
||||
self.assertEquals(0, len(list(model.notification.list_notifications(target_user))))
|
||||
|
||||
# Add a basic build notification.
|
||||
repo = model.repository.get_repository('devtable', 'simple')
|
||||
method_data = {
|
||||
'target': {
|
||||
'kind': 'user',
|
||||
'name': 'public',
|
||||
}
|
||||
}
|
||||
notification = model.notification.create_repo_notification(repo, 'build_success',
|
||||
'quay_notification', method_data, {})
|
||||
|
||||
notification_uuid = notification.uuid
|
||||
event_data = {}
|
||||
|
||||
# Fire off the queue processing.
|
||||
worker = NotificationWorker(None)
|
||||
worker.process_queue_item({
|
||||
'notification_uuid': notification_uuid,
|
||||
'event_data': event_data,
|
||||
})
|
||||
|
||||
# Ensure the notification was handled.
|
||||
self.assertEquals(1, len(list(model.notification.list_notifications(target_user))))
|
||||
|
||||
|
||||
if __name__ == '__main__':
|
||||
unittest.main()
|
|
@ -8,6 +8,7 @@ from data.database import Image, IMAGE_NOT_SCANNED_ENGINE_VERSION
|
|||
from endpoints.notificationevent import VulnerabilityFoundEvent
|
||||
from endpoints.v2 import v2_bp
|
||||
from initdb import setup_database_for_testing, finished_database_for_testing
|
||||
from util.morecollections import AttrDict
|
||||
from util.secscan.api import SecurityScannerAPI, APIRequestFailure
|
||||
from util.secscan.analyzer import LayerAnalyzer
|
||||
from util.secscan.fake import fake_security_scanner
|
||||
|
@ -531,6 +532,14 @@ class TestSecurityScanner(unittest.TestCase):
|
|||
# Ensure that there are no event queue items for the layer.
|
||||
self.assertIsNone(notification_queue.get())
|
||||
|
||||
def notification_tuple(self, notification):
|
||||
# TODO(jschorr): Replace this with a method once we refactor the notification stuff into its
|
||||
# own module.
|
||||
return AttrDict({
|
||||
'event_config_dict': json.loads(notification.event_config_json),
|
||||
'method_config_dict': json.loads(notification.config_json),
|
||||
})
|
||||
|
||||
def test_notification_no_new_layers_increased_severity(self):
|
||||
layer = model.tag.get_tag_image(ADMIN_ACCESS_USER, SIMPLE_REPO, 'latest', include_storage=True)
|
||||
layer_id = '%s.%s' % (layer.docker_image_id, layer.storage.uuid)
|
||||
|
@ -591,18 +600,22 @@ class TestSecurityScanner(unittest.TestCase):
|
|||
|
||||
# Verify that an event would be raised.
|
||||
event_data = item_body['event_data']
|
||||
notification = self.notification_tuple(notification)
|
||||
self.assertTrue(VulnerabilityFoundEvent().should_perform(event_data, notification))
|
||||
|
||||
# Create another notification with a matching level and verify it will be raised.
|
||||
notification = model.notification.create_repo_notification(repo, 'vulnerability_found',
|
||||
'quay_notification', {},
|
||||
{'level': 1})
|
||||
|
||||
notification = self.notification_tuple(notification)
|
||||
self.assertTrue(VulnerabilityFoundEvent().should_perform(event_data, notification))
|
||||
|
||||
# Create another notification with a higher level and verify it won't be raised.
|
||||
notification = model.notification.create_repo_notification(repo, 'vulnerability_found',
|
||||
'quay_notification', {},
|
||||
{'level': 0})
|
||||
notification = self.notification_tuple(notification)
|
||||
self.assertFalse(VulnerabilityFoundEvent().should_perform(event_data, notification))
|
||||
|
||||
def test_select_images_to_scan(self):
|
||||
|
|
0
workers/notificationworker/__init__.py
Normal file
0
workers/notificationworker/__init__.py
Normal file
50
workers/notificationworker/models_interface.py
Normal file
50
workers/notificationworker/models_interface.py
Normal file
|
@ -0,0 +1,50 @@
|
|||
from abc import ABCMeta, abstractmethod
|
||||
from collections import namedtuple
|
||||
from six import add_metaclass
|
||||
|
||||
|
||||
class Repository(namedtuple('Repository', ['namespace_name', 'name'])):
|
||||
"""
|
||||
Repository represents a repository.
|
||||
"""
|
||||
|
||||
|
||||
class Notification(
|
||||
namedtuple('Notification', [
|
||||
'uuid', 'event_name', 'method_name', 'event_config_dict', 'method_config_dict',
|
||||
'repository'])):
|
||||
"""
|
||||
Notification represents a registered notification of some kind.
|
||||
"""
|
||||
|
||||
|
||||
@add_metaclass(ABCMeta)
|
||||
class NotificationWorkerDataInterface(object):
|
||||
"""
|
||||
Interface that represents all data store interactions required by the notification worker.
|
||||
"""
|
||||
|
||||
@abstractmethod
|
||||
def get_enabled_notification(self, notification_uuid):
|
||||
""" Returns an *enabled* notification with the given UUID, or None if none. """
|
||||
pass
|
||||
|
||||
@abstractmethod
|
||||
def reset_number_of_failures_to_zero(self, notification):
|
||||
""" Resets the number of failures for the given notification back to zero. """
|
||||
pass
|
||||
|
||||
@abstractmethod
|
||||
def increment_notification_failure_count(self, notification):
|
||||
""" Increments the number of failures on the given notification. """
|
||||
pass
|
||||
|
||||
@abstractmethod
|
||||
def create_notification_for_testing(self, target_username):
|
||||
""" Creates a notification for testing. """
|
||||
pass
|
||||
|
||||
@abstractmethod
|
||||
def user_has_local_notifications(self, target_username):
|
||||
""" Returns whether there are any Quay-local notifications for the given user. """
|
||||
pass
|
46
workers/notificationworker/models_pre_oci.py
Normal file
46
workers/notificationworker/models_pre_oci.py
Normal file
|
@ -0,0 +1,46 @@
|
|||
import json
|
||||
|
||||
from data import model
|
||||
from workers.notificationworker.models_interface import (
|
||||
NotificationWorkerDataInterface, Notification, Repository)
|
||||
|
||||
|
||||
class PreOCIModel(NotificationWorkerDataInterface):
|
||||
def get_enabled_notification(self, notification_uuid):
|
||||
try:
|
||||
notification_row = model.notification.get_enabled_notification(notification_uuid)
|
||||
except model.InvalidNotificationException:
|
||||
return None
|
||||
|
||||
return Notification(uuid=notification_uuid, event_name=notification_row.event.name,
|
||||
method_name=notification_row.method.name,
|
||||
event_config_dict=json.loads(notification_row.event_config_json),
|
||||
method_config_dict=json.loads(notification_row.config_json),
|
||||
repository=Repository(notification_row.repository.namespace_user.username,
|
||||
notification_row.repository.name))
|
||||
|
||||
def reset_number_of_failures_to_zero(self, notification):
|
||||
model.notification.reset_notification_number_of_failures(
|
||||
notification.repository.namespace_name, notification.repository.name, notification.uuid)
|
||||
|
||||
def increment_notification_failure_count(self, notification):
|
||||
model.notification.increment_notification_failure_count(notification.uuid)
|
||||
|
||||
def create_notification_for_testing(self, target_username):
|
||||
repo = model.repository.get_repository('devtable', 'simple')
|
||||
method_data = {
|
||||
'target': {
|
||||
'kind': 'user',
|
||||
'name': target_username,
|
||||
}
|
||||
}
|
||||
notification = model.notification.create_repo_notification(repo, 'build_success',
|
||||
'quay_notification', method_data, {})
|
||||
return notification.uuid
|
||||
|
||||
def user_has_local_notifications(self, target_username):
|
||||
user = model.user.get_namespace_user(target_username)
|
||||
return bool(list(model.notification.list_notifications(user)))
|
||||
|
||||
|
||||
pre_oci_model = PreOCIModel()
|
|
@ -1,29 +1,22 @@
|
|||
import logging
|
||||
|
||||
from app import notification_queue
|
||||
from data.model.notification import increment_notification_failure_count, reset_number_of_failures_to_zero
|
||||
|
||||
from endpoints.notificationmethod import NotificationMethod, InvalidNotificationMethodException
|
||||
from endpoints.notificationevent import NotificationEvent, InvalidNotificationEventException
|
||||
from workers.notificationworker.models_pre_oci import pre_oci_model as model
|
||||
from workers.queueworker import QueueWorker, JobException
|
||||
|
||||
from data import model
|
||||
from data.model import InvalidNotificationException
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
class NotificationWorker(QueueWorker):
|
||||
def process_queue_item(self, job_details):
|
||||
notification_uuid = job_details['notification_uuid']
|
||||
|
||||
try:
|
||||
notification = model.notification.get_enabled_notification(notification_uuid)
|
||||
except InvalidNotificationException:
|
||||
notification = model.get_enabled_notification(job_details['notification_uuid'])
|
||||
if notification is None:
|
||||
return
|
||||
|
||||
event_name = notification.event.name
|
||||
method_name = notification.method.name
|
||||
event_name = notification.event_name
|
||||
method_name = notification.method_name
|
||||
|
||||
try:
|
||||
event_handler = NotificationEvent.get_event(event_name)
|
||||
|
@ -38,9 +31,9 @@ class NotificationWorker(QueueWorker):
|
|||
if event_handler.should_perform(job_details['event_data'], notification):
|
||||
try:
|
||||
method_handler.perform(notification, event_handler, job_details)
|
||||
reset_number_of_failures_to_zero(notification.id)
|
||||
model.reset_number_of_failures_to_zero(notification)
|
||||
except (JobException, KeyError) as exc:
|
||||
increment_notification_failure_count(notification.id)
|
||||
model.increment_notification_failure_count(notification)
|
||||
raise exc
|
||||
|
||||
|
23
workers/notificationworker/test/test_notificationworker.py
Normal file
23
workers/notificationworker/test/test_notificationworker.py
Normal file
|
@ -0,0 +1,23 @@
|
|||
from workers.notificationworker.notificationworker import NotificationWorker
|
||||
|
||||
from test.fixtures import *
|
||||
|
||||
from workers.notificationworker.models_pre_oci import pre_oci_model as model
|
||||
|
||||
def test_basic_notification(initialized_db):
|
||||
# Ensure the public user doesn't have any notifications.
|
||||
assert not model.user_has_local_notifications('public')
|
||||
|
||||
# Add a basic build notification.
|
||||
notification_uuid = model.create_notification_for_testing('public')
|
||||
event_data = {}
|
||||
|
||||
# Fire off the queue processing.
|
||||
worker = NotificationWorker(None)
|
||||
worker.process_queue_item({
|
||||
'notification_uuid': notification_uuid,
|
||||
'event_data': event_data,
|
||||
})
|
||||
|
||||
# Ensure the notification was handled.
|
||||
assert model.user_has_local_notifications('public')
|
Reference in a new issue