diff --git a/test/test_secscan.py b/test/test_secscan.py index f5be0797f..8311d7792 100644 --- a/test/test_secscan.py +++ b/test/test_secscan.py @@ -293,6 +293,24 @@ class TestSecurityScanner(unittest.TestCase): } + def _get_delete_notification_data(self, old_layer_ids): + return { + "Name": "ec45ec87-bfc8-4129-a1c3-d2b82622175a", + "Created": "1456247389", + "Notified": "1456246708", + "Limit": 2, + "Old": { + "Vulnerability": { + "Name": "CVE-TEST", + "Namespace": "debian:8", + "Description": "New CVE", + "Severity": "Low", + "FixedIn": [] + }, + "LayersIntroducingVulnerability": old_layer_ids, + } + } + def test_notification_new_layers_not_vulnerable(self): layer = model.tag.get_tag_image(ADMIN_ACCESS_USER, SIMPLE_REPO, 'latest') layer_id = '%s.%s' % (layer.docker_image_id, layer.storage.uuid) @@ -331,6 +349,25 @@ class TestSecurityScanner(unittest.TestCase): self.assertIsNone(notification_queue.get()) + def test_notification_delete(self): + layer = model.tag.get_tag_image(ADMIN_ACCESS_USER, SIMPLE_REPO, 'latest') + layer_id = '%s.%s' % (layer.docker_image_id, layer.storage.uuid) + + # Add a repo event for the layer. + repo = model.repository.get_repository(ADMIN_ACCESS_USER, SIMPLE_REPO) + model.notification.create_repo_notification(repo, 'vulnerability_found', 'quay_notification', {}, {'level': 100}) + + # Ensure that there are no event queue items for the layer. + self.assertIsNone(notification_queue.get()) + + # Fire off the notification processing. + notification_data = self._get_delete_notification_data([layer_id]) + self.assertTrue(process_notification_data(notification_data)) + + # Ensure that there are no event queue items for the layer. + self.assertIsNone(notification_queue.get()) + + def test_notification_new_layers(self): layer = model.tag.get_tag_image(ADMIN_ACCESS_USER, SIMPLE_REPO, 'latest') layer_id = '%s.%s' % (layer.docker_image_id, layer.storage.uuid) diff --git a/util/secscan/notifier.py b/util/secscan/notifier.py index a6df8ed9a..e3e3ce9c4 100644 --- a/util/secscan/notifier.py +++ b/util/secscan/notifier.py @@ -18,6 +18,10 @@ def process_notification_data(notification_data): """ Processes the given notification data to spawn vulnerability notifications as necessary. Returns whether the processing succeeded. """ + if not 'New' in notification_data: + # Nothing to do. + return True + new_data = notification_data['New'] old_data = notification_data.get('Old', {})