Add another test for security notification filtering
This commit is contained in:
parent
d036ff6d0d
commit
6a4584b87a
2 changed files with 23 additions and 7 deletions
|
@ -122,10 +122,14 @@ class VulnerabilityFoundEvent(NotificationEvent):
|
||||||
|
|
||||||
def should_perform(self, event_data, notification_data):
|
def should_perform(self, event_data, notification_data):
|
||||||
event_config = json.loads(notification_data.event_config_json)
|
event_config = json.loads(notification_data.event_config_json)
|
||||||
expected_level_index = event_config['level']
|
filter_level_index = int(event_config['level'])
|
||||||
priority = PRIORITY_LEVELS[event_data['vulnerability']['priority']]
|
|
||||||
actual_level_index = priority['index']
|
event_severity = PRIORITY_LEVELS.get(event_data['vulnerability']['priority'])
|
||||||
return actual_level_index <= expected_level_index
|
if event_severity is None:
|
||||||
|
return False
|
||||||
|
|
||||||
|
actual_level_index = int(event_severity['index'])
|
||||||
|
return actual_level_index <= filter_level_index
|
||||||
|
|
||||||
def get_summary(self, event_data, notification_data):
|
def get_summary(self, event_data, notification_data):
|
||||||
msg = '%s vulnerability detected in repository %s in tags %s'
|
msg = '%s vulnerability detected in repository %s in tags %s'
|
||||||
|
|
|
@ -4,6 +4,7 @@ import os
|
||||||
from httmock import urlmatch, all_requests, HTTMock
|
from httmock import urlmatch, all_requests, HTTMock
|
||||||
|
|
||||||
from app import app, config_provider, storage, notification_queue
|
from app import app, config_provider, storage, notification_queue
|
||||||
|
from endpoints.notificationevent import VulnerabilityFoundEvent
|
||||||
from initdb import setup_database_for_testing, finished_database_for_testing
|
from initdb import setup_database_for_testing, finished_database_for_testing
|
||||||
from util.secscan.api import SecurityScannerAPI, AnalyzeLayerException
|
from util.secscan.api import SecurityScannerAPI, AnalyzeLayerException
|
||||||
from util.secscan.analyzer import LayerAnalyzer
|
from util.secscan.analyzer import LayerAnalyzer
|
||||||
|
@ -405,7 +406,7 @@ class TestSecurityScanner(unittest.TestCase):
|
||||||
|
|
||||||
# Add a repo event for the layer.
|
# Add a repo event for the layer.
|
||||||
repo = model.repository.get_repository(ADMIN_ACCESS_USER, SIMPLE_REPO)
|
repo = model.repository.get_repository(ADMIN_ACCESS_USER, SIMPLE_REPO)
|
||||||
model.notification.create_repo_notification(repo, 'vulnerability_found', 'quay_notification', {}, {'level': 100})
|
notification = model.notification.create_repo_notification(repo, 'vulnerability_found', 'quay_notification', {}, {'level': 100})
|
||||||
|
|
||||||
@urlmatch(netloc=r'(.*\.)?mockclairservice', path=r'/v1/layers/(.+)')
|
@urlmatch(netloc=r'(.*\.)?mockclairservice', path=r'/v1/layers/(.+)')
|
||||||
def get_matching_layer_vulnerable(url, request):
|
def get_matching_layer_vulnerable(url, request):
|
||||||
|
@ -436,7 +437,7 @@ class TestSecurityScanner(unittest.TestCase):
|
||||||
|
|
||||||
# Fire off the notification processing.
|
# Fire off the notification processing.
|
||||||
with HTTMock(get_matching_layer_vulnerable, response_content):
|
with HTTMock(get_matching_layer_vulnerable, response_content):
|
||||||
notification_data = self._get_notification_data([layer_id], [layer_id], new_severity='High')
|
notification_data = self._get_notification_data([layer_id], [layer_id], new_severity='Critical')
|
||||||
self.assertTrue(process_notification_data(notification_data))
|
self.assertTrue(process_notification_data(notification_data))
|
||||||
|
|
||||||
# Ensure an event was written for the tag.
|
# Ensure an event was written for the tag.
|
||||||
|
@ -446,9 +447,20 @@ class TestSecurityScanner(unittest.TestCase):
|
||||||
body = json.loads(queue_item.body)
|
body = json.loads(queue_item.body)
|
||||||
self.assertEquals(['prod', 'latest'], body['event_data']['tags'])
|
self.assertEquals(['prod', 'latest'], body['event_data']['tags'])
|
||||||
self.assertEquals('CVE-TEST', body['event_data']['vulnerability']['id'])
|
self.assertEquals('CVE-TEST', body['event_data']['vulnerability']['id'])
|
||||||
self.assertEquals('High', body['event_data']['vulnerability']['priority'])
|
self.assertEquals('Critical', body['event_data']['vulnerability']['priority'])
|
||||||
self.assertTrue(body['event_data']['vulnerability']['has_fix'])
|
self.assertTrue(body['event_data']['vulnerability']['has_fix'])
|
||||||
|
|
||||||
|
# Verify that an event would be raised.
|
||||||
|
event_data = body['event_data']
|
||||||
|
self.assertTrue(VulnerabilityFoundEvent().should_perform(event_data, notification))
|
||||||
|
|
||||||
|
# Create another notification with a matching level and verify it will be raised.
|
||||||
|
notification = model.notification.create_repo_notification(repo, 'vulnerability_found', 'quay_notification', {}, {'level': 1})
|
||||||
|
self.assertTrue(VulnerabilityFoundEvent().should_perform(event_data, notification))
|
||||||
|
|
||||||
|
# Create another notification with a higher level and verify it won't be raised.
|
||||||
|
notification = model.notification.create_repo_notification(repo, 'vulnerability_found', 'quay_notification', {}, {'level': 0})
|
||||||
|
self.assertFalse(VulnerabilityFoundEvent().should_perform(event_data, notification))
|
||||||
|
|
||||||
if __name__ == '__main__':
|
if __name__ == '__main__':
|
||||||
unittest.main()
|
unittest.main()
|
Reference in a new issue