Fix NPE raised if a vulnerability notification doesn't have a level filter
Fixes #1990
This commit is contained in:
parent
7f9e01a1fe
commit
886489c666
2 changed files with 44 additions and 4 deletions
|
@ -111,6 +111,9 @@ def _build_summary(event_data):
|
||||||
|
|
||||||
|
|
||||||
class VulnerabilityFoundEvent(NotificationEvent):
|
class VulnerabilityFoundEvent(NotificationEvent):
|
||||||
|
CONFIG_LEVEL = 'level'
|
||||||
|
VULNERABILITY_KEY = 'vulnerability'
|
||||||
|
|
||||||
@classmethod
|
@classmethod
|
||||||
def event_name(cls):
|
def event_name(cls):
|
||||||
return 'vulnerability_found'
|
return 'vulnerability_found'
|
||||||
|
@ -141,19 +144,25 @@ class VulnerabilityFoundEvent(NotificationEvent):
|
||||||
'id': 'CVE-FAKE-CVE',
|
'id': 'CVE-FAKE-CVE',
|
||||||
'description': 'A futurist vulnerability',
|
'description': 'A futurist vulnerability',
|
||||||
'link': 'https://security-tracker.debian.org/tracker/CVE-FAKE-CVE',
|
'link': 'https://security-tracker.debian.org/tracker/CVE-FAKE-CVE',
|
||||||
'priority': get_priority_for_index(event_config['level'])
|
'priority': get_priority_for_index(event_config[VulnerabilityFoundEvent.CONFIG_LEVEL])
|
||||||
},
|
},
|
||||||
})
|
})
|
||||||
|
|
||||||
def should_perform(self, event_data, notification_data):
|
def should_perform(self, event_data, notification_data):
|
||||||
event_config = json.loads(notification_data.event_config_json)
|
event_config = json.loads(notification_data.event_config_json)
|
||||||
filter_level_index = int(event_config['level'])
|
if VulnerabilityFoundEvent.CONFIG_LEVEL not in event_config:
|
||||||
|
return True
|
||||||
|
|
||||||
event_severity = PRIORITY_LEVELS.get(event_data['vulnerability']['priority'])
|
if VulnerabilityFoundEvent.VULNERABILITY_KEY not in event_data:
|
||||||
|
return False
|
||||||
|
|
||||||
|
vuln_info = event_data.get(VulnerabilityFoundEvent.VULNERABILITY_KEY, {})
|
||||||
|
event_severity = PRIORITY_LEVELS.get(vuln_info.get('priority', 'Unknown'))
|
||||||
if event_severity is None:
|
if event_severity is None:
|
||||||
return False
|
return False
|
||||||
|
|
||||||
actual_level_index = int(event_severity['index'])
|
actual_level_index = int(event_severity['index'])
|
||||||
|
filter_level_index = int(event_config[VulnerabilityFoundEvent.CONFIG_LEVEL])
|
||||||
return actual_level_index <= filter_level_index
|
return actual_level_index <= filter_level_index
|
||||||
|
|
||||||
def get_summary(self, event_data, notification_data):
|
def get_summary(self, event_data, notification_data):
|
||||||
|
|
|
@ -1,6 +1,7 @@
|
||||||
import unittest
|
import unittest
|
||||||
|
|
||||||
from endpoints.notificationevent import BuildSuccessEvent, NotificationEvent
|
from endpoints.notificationevent import (BuildSuccessEvent, NotificationEvent,
|
||||||
|
VulnerabilityFoundEvent)
|
||||||
from util.morecollections import AttrDict
|
from util.morecollections import AttrDict
|
||||||
|
|
||||||
class TestCreate(unittest.TestCase):
|
class TestCreate(unittest.TestCase):
|
||||||
|
@ -10,6 +11,7 @@ class TestCreate(unittest.TestCase):
|
||||||
self.assertIsNotNone(NotificationEvent.get_event('build_success'))
|
self.assertIsNotNone(NotificationEvent.get_event('build_success'))
|
||||||
self.assertIsNotNone(NotificationEvent.get_event('build_failure'))
|
self.assertIsNotNone(NotificationEvent.get_event('build_failure'))
|
||||||
self.assertIsNotNone(NotificationEvent.get_event('build_start'))
|
self.assertIsNotNone(NotificationEvent.get_event('build_start'))
|
||||||
|
self.assertIsNotNone(NotificationEvent.get_event('vulnerability_found'))
|
||||||
|
|
||||||
|
|
||||||
class TestShouldPerform(unittest.TestCase):
|
class TestShouldPerform(unittest.TestCase):
|
||||||
|
@ -147,6 +149,35 @@ class TestShouldPerform(unittest.TestCase):
|
||||||
}, notification_data))
|
}, notification_data))
|
||||||
|
|
||||||
|
|
||||||
|
def test_vulnerability_notification_nolevel(self):
|
||||||
|
notification_data = AttrDict({
|
||||||
|
'event_config_json': '{}',
|
||||||
|
})
|
||||||
|
|
||||||
|
# No level specified.
|
||||||
|
self.assertTrue(VulnerabilityFoundEvent().should_perform({}, notification_data))
|
||||||
|
|
||||||
|
|
||||||
|
def test_vulnerability_notification_nopvulninfo(self):
|
||||||
|
notification_data = AttrDict({
|
||||||
|
'event_config_json': '{"level": 3}',
|
||||||
|
})
|
||||||
|
|
||||||
|
# No vuln info.
|
||||||
|
self.assertFalse(VulnerabilityFoundEvent().should_perform({}, notification_data))
|
||||||
|
|
||||||
|
|
||||||
|
def test_vulnerability_notification_normal(self):
|
||||||
|
notification_data = AttrDict({
|
||||||
|
'event_config_json': '{"level": 3}',
|
||||||
|
})
|
||||||
|
|
||||||
|
info = {"vulnerability": {"priority": "Critical"}}
|
||||||
|
self.assertTrue(VulnerabilityFoundEvent().should_perform(info, notification_data))
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
if __name__ == '__main__':
|
if __name__ == '__main__':
|
||||||
unittest.main()
|
unittest.main()
|
||||||
|
|
||||||
|
|
Reference in a new issue