parent
460ccf2dfd
commit
3f8d51ebd7
2 changed files with 41 additions and 0 deletions
|
@ -293,6 +293,24 @@ class TestSecurityScanner(unittest.TestCase):
|
|||
}
|
||||
|
||||
|
||||
def _get_delete_notification_data(self, old_layer_ids):
|
||||
return {
|
||||
"Name": "ec45ec87-bfc8-4129-a1c3-d2b82622175a",
|
||||
"Created": "1456247389",
|
||||
"Notified": "1456246708",
|
||||
"Limit": 2,
|
||||
"Old": {
|
||||
"Vulnerability": {
|
||||
"Name": "CVE-TEST",
|
||||
"Namespace": "debian:8",
|
||||
"Description": "New CVE",
|
||||
"Severity": "Low",
|
||||
"FixedIn": []
|
||||
},
|
||||
"LayersIntroducingVulnerability": old_layer_ids,
|
||||
}
|
||||
}
|
||||
|
||||
def test_notification_new_layers_not_vulnerable(self):
|
||||
layer = model.tag.get_tag_image(ADMIN_ACCESS_USER, SIMPLE_REPO, 'latest')
|
||||
layer_id = '%s.%s' % (layer.docker_image_id, layer.storage.uuid)
|
||||
|
@ -331,6 +349,25 @@ class TestSecurityScanner(unittest.TestCase):
|
|||
self.assertIsNone(notification_queue.get())
|
||||
|
||||
|
||||
def test_notification_delete(self):
|
||||
layer = model.tag.get_tag_image(ADMIN_ACCESS_USER, SIMPLE_REPO, 'latest')
|
||||
layer_id = '%s.%s' % (layer.docker_image_id, layer.storage.uuid)
|
||||
|
||||
# Add a repo event for the layer.
|
||||
repo = model.repository.get_repository(ADMIN_ACCESS_USER, SIMPLE_REPO)
|
||||
model.notification.create_repo_notification(repo, 'vulnerability_found', 'quay_notification', {}, {'level': 100})
|
||||
|
||||
# Ensure that there are no event queue items for the layer.
|
||||
self.assertIsNone(notification_queue.get())
|
||||
|
||||
# Fire off the notification processing.
|
||||
notification_data = self._get_delete_notification_data([layer_id])
|
||||
self.assertTrue(process_notification_data(notification_data))
|
||||
|
||||
# Ensure that there are no event queue items for the layer.
|
||||
self.assertIsNone(notification_queue.get())
|
||||
|
||||
|
||||
def test_notification_new_layers(self):
|
||||
layer = model.tag.get_tag_image(ADMIN_ACCESS_USER, SIMPLE_REPO, 'latest')
|
||||
layer_id = '%s.%s' % (layer.docker_image_id, layer.storage.uuid)
|
||||
|
|
|
@ -18,6 +18,10 @@ def process_notification_data(notification_data):
|
|||
""" Processes the given notification data to spawn vulnerability notifications as necessary.
|
||||
Returns whether the processing succeeded.
|
||||
"""
|
||||
if not 'New' in notification_data:
|
||||
# Nothing to do.
|
||||
return True
|
||||
|
||||
new_data = notification_data['New']
|
||||
old_data = notification_data.get('Old', {})
|
||||
|
||||
|
|
Reference in a new issue