diff --git a/endpoints/notificationevent.py b/endpoints/notificationevent.py index 9fb40d8e2..036fb010f 100644 --- a/endpoints/notificationevent.py +++ b/endpoints/notificationevent.py @@ -111,6 +111,9 @@ def _build_summary(event_data): class VulnerabilityFoundEvent(NotificationEvent): + CONFIG_LEVEL = 'level' + VULNERABILITY_KEY = 'vulnerability' + @classmethod def event_name(cls): return 'vulnerability_found' @@ -141,19 +144,25 @@ class VulnerabilityFoundEvent(NotificationEvent): 'id': 'CVE-FAKE-CVE', 'description': 'A futurist vulnerability', 'link': 'https://security-tracker.debian.org/tracker/CVE-FAKE-CVE', - 'priority': get_priority_for_index(event_config['level']) + 'priority': get_priority_for_index(event_config[VulnerabilityFoundEvent.CONFIG_LEVEL]) }, }) def should_perform(self, event_data, notification_data): event_config = json.loads(notification_data.event_config_json) - filter_level_index = int(event_config['level']) + if VulnerabilityFoundEvent.CONFIG_LEVEL not in event_config: + return True - event_severity = PRIORITY_LEVELS.get(event_data['vulnerability']['priority']) + if VulnerabilityFoundEvent.VULNERABILITY_KEY not in event_data: + return False + + vuln_info = event_data.get(VulnerabilityFoundEvent.VULNERABILITY_KEY, {}) + event_severity = PRIORITY_LEVELS.get(vuln_info.get('priority', 'Unknown')) if event_severity is None: return False actual_level_index = int(event_severity['index']) + filter_level_index = int(event_config[VulnerabilityFoundEvent.CONFIG_LEVEL]) return actual_level_index <= filter_level_index def get_summary(self, event_data, notification_data): diff --git a/test/test_notifications.py b/test/test_notifications.py index 336670261..bf490c77b 100644 --- a/test/test_notifications.py +++ b/test/test_notifications.py @@ -1,6 +1,7 @@ import unittest -from endpoints.notificationevent import BuildSuccessEvent, NotificationEvent +from endpoints.notificationevent import (BuildSuccessEvent, NotificationEvent, + VulnerabilityFoundEvent) from util.morecollections import AttrDict class TestCreate(unittest.TestCase): @@ -10,6 +11,7 @@ class TestCreate(unittest.TestCase): self.assertIsNotNone(NotificationEvent.get_event('build_success')) self.assertIsNotNone(NotificationEvent.get_event('build_failure')) self.assertIsNotNone(NotificationEvent.get_event('build_start')) + self.assertIsNotNone(NotificationEvent.get_event('vulnerability_found')) class TestShouldPerform(unittest.TestCase): @@ -147,6 +149,35 @@ class TestShouldPerform(unittest.TestCase): }, notification_data)) + def test_vulnerability_notification_nolevel(self): + notification_data = AttrDict({ + 'event_config_json': '{}', + }) + + # No level specified. + self.assertTrue(VulnerabilityFoundEvent().should_perform({}, notification_data)) + + + def test_vulnerability_notification_nopvulninfo(self): + notification_data = AttrDict({ + 'event_config_json': '{"level": 3}', + }) + + # No vuln info. + self.assertFalse(VulnerabilityFoundEvent().should_perform({}, notification_data)) + + + def test_vulnerability_notification_normal(self): + notification_data = AttrDict({ + 'event_config_json': '{"level": 3}', + }) + + info = {"vulnerability": {"priority": "Critical"}} + self.assertTrue(VulnerabilityFoundEvent().should_perform(info, notification_data)) + + + + if __name__ == '__main__': unittest.main()