diff --git a/endpoints/notificationevent.py b/endpoints/notificationevent.py index 925247809..f1bf0a09d 100644 --- a/endpoints/notificationevent.py +++ b/endpoints/notificationevent.py @@ -122,10 +122,14 @@ class VulnerabilityFoundEvent(NotificationEvent): def should_perform(self, event_data, notification_data): event_config = json.loads(notification_data.event_config_json) - expected_level_index = event_config['level'] - priority = PRIORITY_LEVELS[event_data['vulnerability']['priority']] - actual_level_index = priority['index'] - return actual_level_index <= expected_level_index + filter_level_index = int(event_config['level']) + + event_severity = PRIORITY_LEVELS.get(event_data['vulnerability']['priority']) + if event_severity is None: + return False + + actual_level_index = int(event_severity['index']) + return actual_level_index <= filter_level_index def get_summary(self, event_data, notification_data): msg = '%s vulnerability detected in repository %s in tags %s' diff --git a/test/test_secscan.py b/test/test_secscan.py index 4d98fae86..78c3c9ca3 100644 --- a/test/test_secscan.py +++ b/test/test_secscan.py @@ -4,6 +4,7 @@ import os from httmock import urlmatch, all_requests, HTTMock from app import app, config_provider, storage, notification_queue +from endpoints.notificationevent import VulnerabilityFoundEvent from initdb import setup_database_for_testing, finished_database_for_testing from util.secscan.api import SecurityScannerAPI, AnalyzeLayerException from util.secscan.analyzer import LayerAnalyzer @@ -405,7 +406,7 @@ class TestSecurityScanner(unittest.TestCase): # Add a repo event for the layer. repo = model.repository.get_repository(ADMIN_ACCESS_USER, SIMPLE_REPO) - model.notification.create_repo_notification(repo, 'vulnerability_found', 'quay_notification', {}, {'level': 100}) + notification = model.notification.create_repo_notification(repo, 'vulnerability_found', 'quay_notification', {}, {'level': 100}) @urlmatch(netloc=r'(.*\.)?mockclairservice', path=r'/v1/layers/(.+)') def get_matching_layer_vulnerable(url, request): @@ -436,7 +437,7 @@ class TestSecurityScanner(unittest.TestCase): # Fire off the notification processing. with HTTMock(get_matching_layer_vulnerable, response_content): - notification_data = self._get_notification_data([layer_id], [layer_id], new_severity='High') + notification_data = self._get_notification_data([layer_id], [layer_id], new_severity='Critical') self.assertTrue(process_notification_data(notification_data)) # Ensure an event was written for the tag. @@ -446,9 +447,20 @@ class TestSecurityScanner(unittest.TestCase): body = json.loads(queue_item.body) self.assertEquals(['prod', 'latest'], body['event_data']['tags']) self.assertEquals('CVE-TEST', body['event_data']['vulnerability']['id']) - self.assertEquals('High', body['event_data']['vulnerability']['priority']) + self.assertEquals('Critical', body['event_data']['vulnerability']['priority']) self.assertTrue(body['event_data']['vulnerability']['has_fix']) + # Verify that an event would be raised. + event_data = body['event_data'] + self.assertTrue(VulnerabilityFoundEvent().should_perform(event_data, notification)) + + # Create another notification with a matching level and verify it will be raised. + notification = model.notification.create_repo_notification(repo, 'vulnerability_found', 'quay_notification', {}, {'level': 1}) + self.assertTrue(VulnerabilityFoundEvent().should_perform(event_data, notification)) + + # Create another notification with a higher level and verify it won't be raised. + notification = model.notification.create_repo_notification(repo, 'vulnerability_found', 'quay_notification', {}, {'level': 0}) + self.assertFalse(VulnerabilityFoundEvent().should_perform(event_data, notification)) if __name__ == '__main__': unittest.main() \ No newline at end of file