Change security notification code to use the new stream diff reporters

This ensures that even if the security scanner's pagination delivers the Old and New layer IDs on different pages, they are still reconciled correctly across the entire notification.

Fixes https://www.pivotaltracker.com/story/show/136133657
Joseph Schorr 2016-12-19 17:15:59 -05:00
parent ced0149520
commit 5b3212ea0e
5 changed files with 301 additions and 190 deletions
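
The fix centers on the IndexedStreamingDiffTracker wired in below. As a rough sketch of the behavior (the layer names and indexes here are invented; the calls mirror the new test_old_pagination_no_repeat test added in this commit), an ID that arrives as 'New' on one page and as 'Old' on a later page is never reported:

from util.morecollections import IndexedStreamingDiffTracker

added = []
tracker = IndexedStreamingDiffTracker(added.append, 2)  # report callback + results served per stream page

# Page 1: both 'New' IDs arrive, but the 'Old' stream has only its first two IDs so far.
tracker.push_new([('shared', 3), ('new-only', 4)])
tracker.push_old([('old1', 1), ('old2', 2)])

# Page 2: the shared ID finally arrives on the 'Old' stream.
tracker.push_new([])
tracker.push_old([('shared', 3)])

tracker.done()
assert added == ['new-only']  # 'shared' was marked Old on a later page, so it is not reported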


@@ -71,6 +71,17 @@ class IndexedStreamingDiffTrackerTests(unittest.TestCase):
     self.assertEquals(['a', 'c'], added)
 
+  def test_multiple_done(self):
+    added = []
+
+    tracker = IndexedStreamingDiffTracker(added.append, 3)
+    tracker.push_new([('a', 0), ('b', 1), ('c', 2)])
+    tracker.push_old([('b', 1)])
+    tracker.done()
+    tracker.done()
+
+    self.assertEquals(['a', 'c'], added)
+
   def test_same_streams(self):
     added = []
@@ -105,6 +116,20 @@ class IndexedStreamingDiffTrackerTests(unittest.TestCase):
     self.assertEquals(['a', 'b', 'c'], added)
 
+  def test_old_pagination_no_repeat(self):
+    added = []
+
+    tracker = IndexedStreamingDiffTracker(added.append, 2)
+    tracker.push_new([('new1', 3), ('new2', 4)])
+    tracker.push_old([('old1', 1), ('old2', 2)])
+
+    tracker.push_new([])
+    tracker.push_old([('new1', 3)])
+
+    tracker.done()
+
+    self.assertEquals(['new2'], added)
+
   def test_old_pagination(self):
     added = []

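For readers skimming the diff, a naive, non-streaming model of the contract these tests pin down may help. This is not the shipped implementation (the real tracker performs the diff in bounded memory, honoring the results_per_stream window); it only reproduces the observable behavior the assertions above check:

class NaiveIndexedDiff(object):
  """ Set-based model of IndexedStreamingDiffTracker's observable behavior. """
  def __init__(self, reporter, results_per_stream):
    self.reporter = reporter
    self.results_per_stream = results_per_stream  # unused here; the real tracker needs it
    self._new = []
    self._old_indexes = set()
    self._finished = False

  def push_new(self, entries):
    # Entries are (layer_name, index) tuples, served sorted by index.
    self._new.extend(entries)

  def push_old(self, entries):
    self._old_indexes.update(index for _, index in entries)

  def done(self):
    # Calling done() twice must not report anything twice (see test_multiple_done).
    if self._finished:
      return

    self._finished = True
    for name, index in self._new:
      if index not in self._old_indexes:
        self.reporter(name)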

@@ -11,7 +11,7 @@ from initdb import setup_database_for_testing, finished_database_for_testing
 from util.secscan.api import SecurityScannerAPI
 from util.secscan.analyzer import LayerAnalyzer
 from util.secscan.fake import fake_security_scanner
-from util.secscan.notifier import process_notification_data
+from util.secscan.notifier import SecurityNotificationHandler, ProcessNotificationPageResult
 from workers.security_notification_worker import SecurityNotificationWorker
@@ -20,6 +20,13 @@ SIMPLE_REPO = 'simple'
 COMPLEX_REPO = 'complex'
 
+def process_notification_data(notification_data):
+  handler = SecurityNotificationHandler(100)
+  result = handler.process_notification_page_data(notification_data)
+  handler.send_notifications()
+  return result == ProcessNotificationPageResult.FINISHED_PROCESSING
+
+
 class TestSecurityScanner(unittest.TestCase):
   def setUp(self):
     # Enable direct download in fake storage.
@@ -634,7 +641,7 @@ class TestSecurityScanner(unittest.TestCase):
       security_scanner.set_vulns(security_scanner.layer_id(layer2), [new_vuln_info])
 
       layer_ids = [security_scanner.layer_id(layer1), security_scanner.layer_id(layer2)]
-      notification_data = security_scanner.add_notification([], layer_ids, {}, new_vuln_info)
+      notification_data = security_scanner.add_notification([], layer_ids, None, new_vuln_info)
 
       # Test with a known notification with pages.
       data = {
@@ -642,7 +649,7 @@ class TestSecurityScanner(unittest.TestCase):
       }
 
       worker = SecurityNotificationWorker(None)
-      self.assertTrue(worker.perform_notification_work(data, layer_limit=1))
+      self.assertTrue(worker.perform_notification_work(data, layer_limit=2))
 
       # Make sure all pages were processed by ensuring we have two notifications.
       time.sleep(1)
@@ -650,108 +657,98 @@ class TestSecurityScanner(unittest.TestCase):
       self.assertIsNotNone(notification_queue.get())
 
-  def test_notification_worker_offset_pages(self):
-    def get_layer_id(repo_name, tag):
-      # Create a repository notification for the repo, if it doesn't exist.
-      has_notification = model.notification.list_repo_notifications(ADMIN_ACCESS_USER, repo_name,
-                                                                    'vulnerability_found')
-      if not list(has_notification):
-        repo = model.repository.get_repository(ADMIN_ACCESS_USER, repo_name)
-        model.notification.create_repo_notification(repo, 'vulnerability_found',
-                                                    'quay_notification', {}, {'level': 100})
-
-      layer = model.tag.get_tag_image(ADMIN_ACCESS_USER, repo_name, tag, include_storage=True)
-      return '%s.%s' % (layer.docker_image_id, layer.storage.uuid)
-
-    # Define offsetting sets of layer IDs, to test cross-pagination support. In this test, we
-    # will only serve 2 layer IDs per page: the first page will serve both of the 'New' layer IDs,
-    # but since the first 2 'Old' layer IDs are "earlier" than the shared ID of
-    # `devtable/simple:latest`, they won't get served in the 'New' list until the *second* page. The
-    # notification handling system should correctly not notify for this layer, even though it is
-    # marked 'New' on page 1 and marked 'Old' on page 2. In practice, Clair will serve these IDs
-    # sorted in the same manner.
-    new_layer_ids = [get_layer_id('simple', 'latest'), get_layer_id('complex', 'prod')]
-    old_layer_ids = ['someid1', 'someid2', get_layer_id('simple', 'latest')]
-
-    apis_called = []
-
-    @urlmatch(netloc=r'(.*\.)?mockclairservice', path=r'/v1/layers/(.+)')
-    def get_matching_layer_vulnerable(url, request):
-      apis_called.append('VULN')
-      return json.dumps({
-        "Layer": {
-          "Name": 'somelayerid',
-          "Namespace": "debian:8",
-          "IndexedByVersion": 1,
-          "Features": [
-            {
-              "Name": "coreutils",
-              "Namespace": "debian:8",
-              "Version": "8.23-4",
-              "Vulnerabilities": [
-                {
-                  "Name": "CVE-TEST",
-                  "Namespace": "debian:8",
-                  "Severity": "Low",
-                }
-              ],
-            }
-          ]
-        }
-      })
-
-    @urlmatch(netloc=r'(.*\.)?mockclairservice', path=r'/v1/notifications/somenotification$', method='DELETE')
-    def delete_notification(url, request):
-      apis_called.append('DELETE')
-      return {'status_code': 201, 'content': ''}
-
-    @urlmatch(netloc=r'(.*\.)?mockclairservice', path=r'/v1/notifications/somenotification$', method='GET')
-    def get_notification(url, request):
-      if url.query.find('page=nextpage') >= 0:
-        apis_called.append('GET-2')
-        data = {
-          'Notification': self._get_notification_data(new_layer_ids[2:], old_layer_ids[2:]),
-        }
-        return json.dumps(data)
-      else:
-        apis_called.append('GET-1')
-        notification_data = self._get_notification_data(new_layer_ids[0:2], old_layer_ids[0:2])
-        notification_data['NextPage'] = 'nextpage'
-        data = {
-          'Notification': notification_data,
-        }
-        return json.dumps(data)
-
-    # Ensure that there are no event queue items for any layers.
-    self.assertIsNone(notification_queue.get())
-
-    # Test with a known notification with pages.
-    data = {
-      'Name': 'somenotification'
-    }
-
-    with HTTMock(get_notification, delete_notification, get_matching_layer_vulnerable):
-      worker = SecurityNotificationWorker(None)
-      self.assertTrue(worker.perform_notification_work(data))
-
-    # Verify each of the expected API calls were made.
-    self.assertEquals(set(['GET-1', 'GET-2', 'DELETE', 'VULN']), set(apis_called))
-
-    # Verify that we have notifications *just* for the New layer.
-    expected_item = notification_queue.get()
-    self.assertIsNotNone(expected_item)
-    item_body = json.loads(expected_item['body'])
-    self.assertEquals('devtable/complex', item_body['event_data']['repository'])
-    self.assertEquals(['prod'], item_body['event_data']['tags'])
+  def test_notification_worker_offset_pages_not_indexed(self):
+    # Try without indexes.
+    self.assert_notification_worker_offset_pages(indexed=False)
+
+  def test_notification_worker_offset_pages_indexed(self):
+    # Try with indexes.
+    self.assert_notification_worker_offset_pages(indexed=True)
+
+  def assert_notification_worker_offset_pages(self, indexed=False):
+    layer1 = model.tag.get_tag_image(ADMIN_ACCESS_USER, SIMPLE_REPO, 'latest', include_storage=True)
+    layer2 = model.tag.get_tag_image(ADMIN_ACCESS_USER, COMPLEX_REPO, 'prod', include_storage=True)
+
+    # Add repo events for the layers.
+    simple_repo = model.repository.get_repository(ADMIN_ACCESS_USER, SIMPLE_REPO)
+    complex_repo = model.repository.get_repository(ADMIN_ACCESS_USER, COMPLEX_REPO)
+
+    model.notification.create_repo_notification(simple_repo, 'vulnerability_found',
+                                                'quay_notification', {}, {'level': 100})
+    model.notification.create_repo_notification(complex_repo, 'vulnerability_found',
+                                                'quay_notification', {}, {'level': 100})
+
+    # Ensure that there are no event queue items for the layer.
+    self.assertIsNone(notification_queue.get())
+
+    with fake_security_scanner() as security_scanner:
+      # Test with an unknown notification.
+      worker = SecurityNotificationWorker(None)
+      self.assertFalse(worker.perform_notification_work({
+        'Name': 'unknownnotification'
+      }))
+
+      # Add some analyzed layers.
+      analyzer = LayerAnalyzer(app.config, self.api)
+      analyzer.analyze_recursively(layer1)
+      analyzer.analyze_recursively(layer2)
+
+      # Add a notification with pages of data.
+      new_vuln_info = {
+        "Name": "CVE-TEST",
+        "Namespace": "debian:8",
+        "Description": "Some service",
+        "Link": "https://security-tracker.debian.org/tracker/CVE-2014-9471",
+        "Severity": "Critical",
+        "FixedIn": {'Version': "9.23-5"},
+      }
+
+      security_scanner.set_vulns(security_scanner.layer_id(layer1), [new_vuln_info])
+      security_scanner.set_vulns(security_scanner.layer_id(layer2), [new_vuln_info])
+
+      # Define offsetting sets of layer IDs, to test cross-pagination support. In this test, we
+      # will only serve 2 layer IDs per page: the first page will serve both of the 'New' layer IDs,
+      # but since the first 2 'Old' layer IDs are "earlier" than the shared ID of
+      # `devtable/simple:latest`, they won't get served in the 'New' list until the *second* page.
+      # The notification handling system should correctly not notify for this layer, even though it
+      # is marked 'New' on page 1 and marked 'Old' on page 2. In practice, Clair will serve these
+      # IDs sorted in the same manner.
+      idx_old_layer_ids = [{'LayerName': 'old1', 'Index': 1},
+                           {'LayerName': 'old2', 'Index': 2},
+                           {'LayerName': security_scanner.layer_id(layer1), 'Index': 3}]
+
+      idx_new_layer_ids = [{'LayerName': security_scanner.layer_id(layer1), 'Index': 3},
+                           {'LayerName': security_scanner.layer_id(layer2), 'Index': 4}]
+
+      old_layer_ids = [t['LayerName'] for t in idx_old_layer_ids]
+      new_layer_ids = [t['LayerName'] for t in idx_new_layer_ids]
+
+      if not indexed:
+        idx_old_layer_ids = None
+        idx_new_layer_ids = None
+
+      notification_data = security_scanner.add_notification(old_layer_ids, new_layer_ids, None,
+                                                            new_vuln_info, max_per_page=2,
+                                                            indexed_old_layer_ids=idx_old_layer_ids,
+                                                            indexed_new_layer_ids=idx_new_layer_ids)
+
+      # Test with a known notification with pages.
+      data = {
+        'Name': notification_data['Name'],
+      }
+
+      worker = SecurityNotificationWorker(None)
+      self.assertTrue(worker.perform_notification_work(data, layer_limit=2))
+
+      # Make sure all pages were processed by ensuring we have only one notification. If the second
+      # page was not processed, then the `Old` entry for layer1 will not be found, and we'd get two
+      # notifications.
+      time.sleep(1)
+      self.assertIsNotNone(notification_queue.get())
+      self.assertIsNone(notification_queue.get())
+
+      # Make sure we have no additional notifications.
+      self.assertIsNone(notification_queue.get())
 
 if __name__ == '__main__':


@@ -58,15 +58,22 @@ class FakeSecurityScanner(object):
     """ Returns whether a notification with the given ID is found in the scanner. """
     return notification_id in self.notifications
 
-  def add_notification(self, old_layer_ids, new_layer_ids, old_vuln, new_vuln):
+  def add_notification(self, old_layer_ids, new_layer_ids, old_vuln, new_vuln, max_per_page=100,
+                       indexed_old_layer_ids=None, indexed_new_layer_ids=None):
     """ Adds a new notification over the given sets of layer IDs and vulnerability information,
         returning the structural data of the notification created.
     """
     notification_id = str(uuid.uuid4())
+    if old_vuln is None:
+      old_vuln = dict(new_vuln)
+
     self.notifications[notification_id] = dict(old_layer_ids=old_layer_ids,
                                                new_layer_ids=new_layer_ids,
                                                old_vuln=old_vuln,
-                                               new_vuln=new_vuln)
+                                               new_vuln=new_vuln,
+                                               max_per_page=max_per_page,
+                                               indexed_old_layer_ids=indexed_old_layer_ids,
+                                               indexed_new_layer_ids=indexed_new_layer_ids)
 
     return self._get_notification_data(notification_id, 0, 100)
@@ -106,6 +113,8 @@ class FakeSecurityScanner(object):
     """ Returns the structural data for the notification with the given ID, paginated using
         the given page and limit. """
     notification = self.notifications[notification_id]
+    limit = min(limit, notification['max_per_page'])
+
     notification_data = {
       "Name": notification_id,
       "Created": "1456247389",
@@ -127,6 +136,11 @@ class FakeSecurityScanner(object):
         'LayersIntroducingVulnerability': old_layer_ids,
       }
 
+      if notification.get('indexed_old_layer_ids', None):
+        indexed_old_layer_ids = notification['indexed_old_layer_ids'][start_index:end_index]
+        notification_data['Old']['OrderedLayersIntroducingVulnerability'] = indexed_old_layer_ids
+
     if notification.get('new_vuln'):
       new_layer_ids = notification['new_layer_ids']
       new_layer_ids = new_layer_ids[start_index:end_index]
@@ -137,6 +151,11 @@ class FakeSecurityScanner(object):
         'LayersIntroducingVulnerability': new_layer_ids,
       }
 
+      if notification.get('indexed_new_layer_ids', None):
+        indexed_new_layer_ids = notification['indexed_new_layer_ids'][start_index:end_index]
+        notification_data['New']['OrderedLayersIntroducingVulnerability'] = indexed_new_layer_ids
+
     if has_additional_page:
       notification_data['NextPage'] = str(page+1)


@@ -1,9 +1,8 @@
 import logging
 import sys
 
-from enum import Enum
-
 from collections import defaultdict
+from enum import Enum
 
 from app import secscan_api
 from data.model.tag import filter_tags_have_repository_event, get_matching_tags
@@ -12,111 +11,169 @@ from data.database import (Image, ImageStorage, ExternalNotificationEvent, Repos
 from endpoints.notificationhelper import notification_batch
 from util.secscan import PRIORITY_LEVELS
 from util.secscan.api import APIRequestFailure
-from util.morecollections import AttrDict, StreamingDiffTracker
+from util.morecollections import AttrDict, StreamingDiffTracker, IndexedStreamingDiffTracker
 
 logger = logging.getLogger(__name__)
 
 
 class ProcessNotificationPageResult(Enum):
   FINISHED_PAGE = 'Finished Page'
   FINISHED_PROCESSING = 'Finished Processing'
   FAILED = 'Failed'
def process_notification_page_data(notification_page_data): class SecurityNotificationHandler(object):
""" Processes the given notification page data to spawn vulnerability notifications as necessary. """ Class to process paginated notifications from the security scanner and issue
Returns the status of the processing. Quay vulnerability_found notifications for all necessary tags. Callers should
initialize, call process_notification_page_data for each page until it returns
FINISHED_PROCESSING or FAILED and, if succeeded, then call send_notifications
to send out the notifications queued.
""" """
if not 'New' in notification_page_data: def __init__(self, results_per_stream):
# Nothing more to do. self.tag_map = defaultdict(set)
self.repository_map = {}
self.check_map = {}
self.stream_tracker = None
self.results_per_stream = results_per_stream
self.reporting_failed = False
self.vulnerability_info = None
self.event = ExternalNotificationEvent.get(name='vulnerability_found')
def send_notifications(self):
""" Sends all queued up notifications. """
if self.vulnerability_info is None:
return
new_vuln = self.vulnerability_info
new_severity = PRIORITY_LEVELS.get(new_vuln.get('Severity', 'Unknown'), {'index': sys.maxint})
# For each of the tags found, issue a notification.
with notification_batch() as spawn_notification:
for repository_id in self.tag_map:
tags = self.tag_map[repository_id]
event_data = {
'tags': list(tags),
'vulnerability': {
'id': new_vuln['Name'],
'description': new_vuln.get('Description', None),
'link': new_vuln.get('Link', None),
'priority': new_severity['title'],
'has_fix': 'FixedIn' in new_vuln,
},
}
# TODO(jzelinskie): remove when more endpoints have been converted to using interfaces
repository = AttrDict({
'namespace_name': self.repository_map[repository_id].namespace_user.username,
'name': self.repository_map[repository_id].name,
})
spawn_notification(repository, 'vulnerability_found', event_data)
def process_notification_page_data(self, notification_page_data):
""" Processes the given notification page data to spawn vulnerability notifications as
necessary. Returns the status of the processing.
"""
if not 'New' in notification_page_data:
return self._done()
new_data = notification_page_data['New']
old_data = notification_page_data.get('Old', {})
new_vuln = new_data['Vulnerability']
old_vuln = old_data.get('Vulnerability', {})
self.vulnerability_info = new_vuln
new_layer_ids = new_data.get('LayersIntroducingVulnerability', [])
old_layer_ids = old_data.get('LayersIntroducingVulnerability', [])
new_severity = PRIORITY_LEVELS.get(new_vuln.get('Severity', 'Unknown'), {'index': sys.maxint})
old_severity = PRIORITY_LEVELS.get(old_vuln.get('Severity', 'Unknown'), {'index': sys.maxint})
# Check if the severity of the vulnerability has increased. If so, then we report this
# vulnerability for *all* layers, rather than a difference, as it is important for everyone.
if new_severity['index'] < old_severity['index']:
# The vulnerability has had its severity increased. Report for *all* layers.
all_layer_ids = set(new_layer_ids) | set(old_layer_ids)
for layer_id in all_layer_ids:
self._report(layer_id)
if 'NextPage' not in notification_page_data:
return self._done()
else:
return ProcessNotificationPageResult.FINISHED_PAGE
# Otherwise, only send the notification to new layers. To find only the new layers, we
# need to do a streaming diff vs the old layer IDs stream.
# Check for ordered data. If found, we use the indexed tracker, which is faster and
# more memory efficient.
is_indexed = False
if 'OrderedLayersIntroducingVulnerability' in new_data:
def tuplize(stream):
return [(entry['LayerName'], entry['Index']) for entry in stream]
new_layer_ids = tuplize(new_data.get('OrderedLayersIntroducingVulnerability', []))
old_layer_ids = tuplize(old_data.get('OrderedLayersIntroducingVulnerability', []))
is_indexed = True
# If this is the first call, initialize the tracker.
if self.stream_tracker is None:
self.stream_tracker = (IndexedStreamingDiffTracker(self._report, self.results_per_stream)
if is_indexed
else StreamingDiffTracker(self._report, self.results_per_stream))
# Call to add the old and new layer ID streams to the tracker. The tracker itself will
# call _report whenever it has determined a new layer has been found.
self.stream_tracker.push_new(new_layer_ids)
self.stream_tracker.push_old(old_layer_ids)
# If the reporting failed at any point, nothing more we can do.
if self.reporting_failed:
return ProcessNotificationPageResult.FAILED
# Check to see if there are any additional pages to process.
if 'NextPage' not in notification_page_data:
return self._done()
else:
return ProcessNotificationPageResult.FINISHED_PAGE
def _done(self):
if self.stream_tracker is not None:
self.stream_tracker.done()
if self.reporting_failed:
return ProcessNotificationPageResult.FAILED
return ProcessNotificationPageResult.FINISHED_PROCESSING return ProcessNotificationPageResult.FINISHED_PROCESSING
new_data = notification_page_data['New'] def _report(self, new_layer_id):
old_data = notification_page_data.get('Old', {})
new_vuln = new_data['Vulnerability']
old_vuln = old_data.get('Vulnerability', {})
new_layer_ids = set(new_data.get('LayersIntroducingVulnerability', []))
old_layer_ids = set(old_data.get('LayersIntroducingVulnerability', []))
new_severity = PRIORITY_LEVELS.get(new_vuln.get('Severity', 'Unknown'), {'index': sys.maxint})
old_severity = PRIORITY_LEVELS.get(old_vuln.get('Severity', 'Unknown'), {'index': sys.maxint})
# By default we only notify the new layers that are affected by the vulnerability. If, however,
# the severity of the vulnerability has increased, we need to notify *all* layers, as we might
# need to send new notifications for older layers.
notify_layers = new_layer_ids - old_layer_ids
if new_severity['index'] < old_severity['index']:
notify_layers = new_layer_ids | old_layer_ids
if not notify_layers:
# Nothing more to do.
return ProcessNotificationPageResult.FINISHED_PAGE
# Lookup the external event for when we have vulnerabilities.
event = ExternalNotificationEvent.get(name='vulnerability_found')
# For each layer, retrieving the matching tags and join with repository to determine which
# require new notifications.
tag_map = defaultdict(set)
repository_map = {}
cve_id = new_vuln['Name']
# Find all tags that contain the layer(s) introducing the vulnerability,
# in repositories that have the event setup.
for layer_id in notify_layers:
# Split the layer ID into its Docker Image ID and storage ID. # Split the layer ID into its Docker Image ID and storage ID.
(docker_image_id, storage_uuid) = layer_id.split('.', 2) (docker_image_id, storage_uuid) = new_layer_id.split('.', 2)
# Find the matching tags. # Find the matching tags.
matching = get_matching_tags(docker_image_id, storage_uuid, RepositoryTag, Repository, matching = get_matching_tags(docker_image_id, storage_uuid, RepositoryTag, Repository,
Image, ImageStorage) Image, ImageStorage)
tags = list(filter_tags_have_repository_event(matching, event)) tags = list(filter_tags_have_repository_event(matching, self.event))
check_map = {} cve_id = self.vulnerability_info['Name']
for tag in tags: for tag in tags:
# Verify that the tag's root image has the vulnerability. # Verify that the tag's *top layer* has the vulnerability.
tag_layer_id = '%s.%s' % (tag.image.docker_image_id, tag.image.storage.uuid) tag_layer_id = '%s.%s' % (tag.image.docker_image_id, tag.image.storage.uuid)
logger.debug('Checking if layer %s is vulnerable to %s', tag_layer_id, cve_id) if not tag_layer_id in self.check_map:
logger.debug('Checking if layer %s is vulnerable to %s', tag_layer_id, cve_id)
if not tag_layer_id in check_map:
try: try:
is_vulerable = secscan_api.check_layer_vulnerable(tag_layer_id, cve_id) self.check_map[tag_layer_id] = secscan_api.check_layer_vulnerable(tag_layer_id, cve_id)
except APIRequestFailure: except APIRequestFailure:
return ProcessNotificationPageResult.FAILED self.reporting_failed = True
return
check_map[tag_layer_id] = is_vulerable
logger.debug('Result of layer %s is vulnerable to %s check: %s', tag_layer_id, cve_id, logger.debug('Result of layer %s is vulnerable to %s check: %s', tag_layer_id, cve_id,
check_map[tag_layer_id]) self.check_map[tag_layer_id])
if self.check_map[tag_layer_id]:
if check_map[tag_layer_id]:
# Add the vulnerable tag to the list. # Add the vulnerable tag to the list.
tag_map[tag.repository_id].add(tag.name) self.tag_map[tag.repository_id].add(tag.name)
repository_map[tag.repository_id] = tag.repository self.repository_map[tag.repository_id] = tag.repository
# For each of the tags found, issue a notification.
with notification_batch() as spawn_notification:
for repository_id in tag_map:
tags = tag_map[repository_id]
event_data = {
'tags': list(tags),
'vulnerability': {
'id': cve_id,
'description': new_vuln.get('Description', None),
'link': new_vuln.get('Link', None),
'priority': new_severity['title'],
'has_fix': 'FixedIn' in new_vuln,
},
}
# TODO(jzelinskie): remove when more endpoints have been converted to using interfaces
repository = AttrDict({
'namespace_name': repository_map[repository_id].namespace_user.username,
'name': repository_map[repository_id].name,
})
spawn_notification(repository, 'vulnerability_found', event_data)
return ProcessNotificationPageResult.FINISHED_PAGE

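The handler docstring above amounts to the driving loop that the reworked worker below implements against the real work queue. As a reduced sketch (fetch_page here is a hypothetical stand-in for secscan_api.get_notification; the handler and result enum are the ones from this commit):

from util.secscan.notifier import SecurityNotificationHandler, ProcessNotificationPageResult

def process_all_pages(fetch_page, layer_limit=100):
  """ Drives the handler page by page; fetch_page(token) returns one notification page dict. """
  handler = SecurityNotificationHandler(layer_limit)
  page = None
  while True:
    notification = fetch_page(page)
    result = handler.process_notification_page_data(notification)
    if result == ProcessNotificationPageResult.FAILED:
      return False
    if result == ProcessNotificationPageResult.FINISHED_PROCESSING:
      handler.send_notifications()  # send only after every page has been diffed
      return True
    page = notification['NextPage']  # FINISHED_PAGE: fetch the next page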

@@ -6,7 +6,7 @@ import features
 from app import secscan_notification_queue, secscan_api
 from workers.queueworker import QueueWorker, JobException
-from util.secscan.notifier import process_notification_data
+from util.secscan.notifier import SecurityNotificationHandler, ProcessNotificationPageResult
 
 logger = logging.getLogger(__name__)
@@ -28,11 +28,15 @@ class SecurityNotificationWorker(QueueWorker):
     notification_name = data['Name']
     current_page = data.get('page', None)
+    handler = SecurityNotificationHandler(layer_limit)
 
     while True:
+      # Retrieve the current page of notification data from the security scanner.
       (response_data, should_retry) = secscan_api.get_notification(notification_name,
                                                                    layer_limit=layer_limit,
                                                                    page=current_page)
+
+      # If no response, something went wrong.
       if response_data is None:
         if should_retry:
           raise JobException()
@@ -44,25 +48,34 @@ class SecurityNotificationWorker(QueueWorker):
         # Return to mark the job as "complete", as we'll never be able to finish it.
         return False
 
-      self.extend_processing(_PROCESSING_SECONDS, json.dumps(data))
-      notification_data = response_data['Notification']
-      if not process_notification_data(notification_data):
-        raise JobException()
-
-      # Check for a next page of results. If none, we're done.
-      if 'NextPage' not in notification_data:
-        # Mark the notification as read and processed.
-        if not secscan_api.mark_notification_read(notification_name):
-          # Return to mark the job as "complete", as we'll never be able to finish it.
-          logger.error('Failed to mark notification %s as read', notification_name)
-          return False
-
-        return True
-
-      # Otherwise, save the next page token into the queue item (so we can pick up from here if
-      # something goes wrong in the next loop iteration), and continue.
-      current_page = notification_data['NextPage']
-      data['page'] = current_page
+      # Extend processing on the queue item so it doesn't expire while we're working.
+      self.extend_processing(_PROCESSING_SECONDS, json.dumps(data))
+
+      # Process the notification data.
+      notification_data = response_data['Notification']
+      result = handler.process_notification_page_data(notification_data)
+
+      # Possible states after processing: failed to process, finished processing entirely
+      # or finished processing the page.
+      if result == ProcessNotificationPageResult.FAILED:
+        # Something went wrong.
+        raise JobException
+
+      if result == ProcessNotificationPageResult.FINISHED_PROCESSING:
+        # Mark the notification as read.
+        if not secscan_api.mark_notification_read(notification_name):
+          # Return to mark the job as "complete", as we'll never be able to finish it.
+          logger.error('Failed to mark notification %s as read', notification_name)
+          return False
+
+        # Send the generated Quay notifications.
+        handler.send_notifications()
+        return True
+
+      if result == ProcessNotificationPageResult.FINISHED_PAGE:
+        # Continue onto the next page.
+        current_page = notification_data['NextPage']
+        continue
 
 if __name__ == '__main__':