Change notificationworker to use a data interface

This commit is contained in:
Joseph Schorr 2017-07-12 15:50:47 +03:00
parent 8ec198228c
commit b6f1782642
10 changed files with 149 additions and 56 deletions

View file

@ -8,6 +8,7 @@ from data.database import Image, IMAGE_NOT_SCANNED_ENGINE_VERSION
from endpoints.notificationevent import VulnerabilityFoundEvent
from endpoints.v2 import v2_bp
from initdb import setup_database_for_testing, finished_database_for_testing
from util.morecollections import AttrDict
from util.secscan.api import SecurityScannerAPI, APIRequestFailure
from util.secscan.analyzer import LayerAnalyzer
from util.secscan.fake import fake_security_scanner
@ -531,6 +532,14 @@ class TestSecurityScanner(unittest.TestCase):
# Ensure that there are no event queue items for the layer.
self.assertIsNone(notification_queue.get())
def notification_tuple(self, notification):
# TODO(jschorr): Replace this with a method once we refactor the notification stuff into its
# own module.
return AttrDict({
'event_config_dict': json.loads(notification.event_config_json),
'method_config_dict': json.loads(notification.config_json),
})
def test_notification_no_new_layers_increased_severity(self):
layer = model.tag.get_tag_image(ADMIN_ACCESS_USER, SIMPLE_REPO, 'latest', include_storage=True)
layer_id = '%s.%s' % (layer.docker_image_id, layer.storage.uuid)
@ -591,18 +600,22 @@ class TestSecurityScanner(unittest.TestCase):
# Verify that an event would be raised.
event_data = item_body['event_data']
notification = self.notification_tuple(notification)
self.assertTrue(VulnerabilityFoundEvent().should_perform(event_data, notification))
# Create another notification with a matching level and verify it will be raised.
notification = model.notification.create_repo_notification(repo, 'vulnerability_found',
'quay_notification', {},
{'level': 1})
notification = self.notification_tuple(notification)
self.assertTrue(VulnerabilityFoundEvent().should_perform(event_data, notification))
# Create another notification with a higher level and verify it won't be raised.
notification = model.notification.create_repo_notification(repo, 'vulnerability_found',
'quay_notification', {},
{'level': 0})
notification = self.notification_tuple(notification)
self.assertFalse(VulnerabilityFoundEvent().should_perform(event_data, notification))
def test_select_images_to_scan(self):