diff --git a/test/test_morecollections.py b/test/test_morecollections.py index ad458f533..b739b24e7 100644 --- a/test/test_morecollections.py +++ b/test/test_morecollections.py @@ -71,6 +71,17 @@ class IndexedStreamingDiffTrackerTests(unittest.TestCase): self.assertEquals(['a', 'c'], added) + def test_multiple_done(self): + added = [] + + tracker = IndexedStreamingDiffTracker(added.append, 3) + tracker.push_new([('a', 0), ('b', 1), ('c', 2)]) + tracker.push_old([('b', 1)]) + tracker.done() + tracker.done() + + self.assertEquals(['a', 'c'], added) + def test_same_streams(self): added = [] @@ -105,6 +116,20 @@ class IndexedStreamingDiffTrackerTests(unittest.TestCase): self.assertEquals(['a', 'b', 'c'], added) + def test_old_pagination_no_repeat(self): + added = [] + + tracker = IndexedStreamingDiffTracker(added.append, 2) + tracker.push_new([('new1', 3), ('new2', 4)]) + tracker.push_old([('old1', 1), ('old2', 2)]) + + tracker.push_new([]) + tracker.push_old([('new1', 3)]) + + tracker.done() + + self.assertEquals(['new2'], added) + def test_old_pagination(self): added = [] diff --git a/test/test_secscan.py b/test/test_secscan.py index e6a9d3f1e..dce1ac13e 100644 --- a/test/test_secscan.py +++ b/test/test_secscan.py @@ -11,7 +11,7 @@ from initdb import setup_database_for_testing, finished_database_for_testing from util.secscan.api import SecurityScannerAPI from util.secscan.analyzer import LayerAnalyzer from util.secscan.fake import fake_security_scanner -from util.secscan.notifier import process_notification_data +from util.secscan.notifier import SecurityNotificationHandler, ProcessNotificationPageResult from workers.security_notification_worker import SecurityNotificationWorker @@ -20,6 +20,13 @@ SIMPLE_REPO = 'simple' COMPLEX_REPO = 'complex' +def process_notification_data(notification_data): + handler = SecurityNotificationHandler(100) + result = handler.process_notification_page_data(notification_data) + handler.send_notifications() + return result == ProcessNotificationPageResult.FINISHED_PROCESSING + + class TestSecurityScanner(unittest.TestCase): def setUp(self): # Enable direct download in fake storage. @@ -634,7 +641,7 @@ class TestSecurityScanner(unittest.TestCase): security_scanner.set_vulns(security_scanner.layer_id(layer2), [new_vuln_info]) layer_ids = [security_scanner.layer_id(layer1), security_scanner.layer_id(layer2)] - notification_data = security_scanner.add_notification([], layer_ids, {}, new_vuln_info) + notification_data = security_scanner.add_notification([], layer_ids, None, new_vuln_info) # Test with a known notification with pages. data = { @@ -642,7 +649,7 @@ class TestSecurityScanner(unittest.TestCase): } worker = SecurityNotificationWorker(None) - self.assertTrue(worker.perform_notification_work(data, layer_limit=1)) + self.assertTrue(worker.perform_notification_work(data, layer_limit=2)) # Make sure all pages were processed by ensuring we have two notifications. time.sleep(1) @@ -650,108 +657,98 @@ class TestSecurityScanner(unittest.TestCase): self.assertIsNotNone(notification_queue.get()) - def test_notification_worker_offset_pages(self): + def test_notification_worker_offset_pages_not_indexed(self): + # Try without indexes. + self.assert_notification_worker_offset_pages(indexed=False) - def get_layer_id(repo_name, tag): - # Create a repository notification for the repo, if it doesn't exist. - has_notification = model.notification.list_repo_notifications(ADMIN_ACCESS_USER, repo_name, - 'vulnerability_found') - if not list(has_notification): - repo = model.repository.get_repository(ADMIN_ACCESS_USER, repo_name) - model.notification.create_repo_notification(repo, 'vulnerability_found', - 'quay_notification', {}, {'level': 100}) - layer = model.tag.get_tag_image(ADMIN_ACCESS_USER, repo_name, tag, include_storage=True) - return '%s.%s' % (layer.docker_image_id, layer.storage.uuid) + def test_notification_worker_offset_pages_indexed(self): + # Try with indexes. + self.assert_notification_worker_offset_pages(indexed=True) - # Define offsetting sets of layer IDs, to test cross-pagination support. In this test, we - # will only serve 2 layer IDs per page: the first page will serve both of the 'New' layer IDs, - # but since the first 2 'Old' layer IDs are "earlier" than the shared ID of - # `devtable/simple:latest`, they won't get served in the 'New' list until the *second* page. The - # notification handling system should correctly not notify for this layer, even though it is - # marked 'New' on page 1 and marked 'Old' on page 2. In practice, Clair will served these IDs - # sorted in the same manner. - new_layer_ids = [get_layer_id('simple', 'latest'), get_layer_id('complex', 'prod')] - old_layer_ids = ['someid1', 'someid2', get_layer_id('simple', 'latest')] - apis_called = [] + def assert_notification_worker_offset_pages(self, indexed=False): + layer1 = model.tag.get_tag_image(ADMIN_ACCESS_USER, SIMPLE_REPO, 'latest', include_storage=True) + layer2 = model.tag.get_tag_image(ADMIN_ACCESS_USER, COMPLEX_REPO, 'prod', include_storage=True) - @urlmatch(netloc=r'(.*\.)?mockclairservice', path=r'/v1/layers/(.+)') - def get_matching_layer_vulnerable(url, request): - apis_called.append('VULN') - return json.dumps({ - "Layer": { - "Name": 'somelayerid', - "Namespace": "debian:8", - "IndexedByVersion": 1, - "Features": [ - { - "Name": "coreutils", - "Namespace": "debian:8", - "Version": "8.23-4", - "Vulnerabilities": [ - { - "Name": "CVE-TEST", - "Namespace": "debian:8", - "Severity": "Low", - } - ], - } - ] - } - }) + # Add a repo events for the layers. + simple_repo = model.repository.get_repository(ADMIN_ACCESS_USER, SIMPLE_REPO) + complex_repo = model.repository.get_repository(ADMIN_ACCESS_USER, COMPLEX_REPO) - @urlmatch(netloc=r'(.*\.)?mockclairservice', path=r'/v1/notifications/somenotification$', method='DELETE') - def delete_notification(url, request): - apis_called.append('DELETE') - return {'status_code': 201, 'content': ''} + model.notification.create_repo_notification(simple_repo, 'vulnerability_found', + 'quay_notification', {}, {'level': 100}) + model.notification.create_repo_notification(complex_repo, 'vulnerability_found', + 'quay_notification', {}, {'level': 100}) - @urlmatch(netloc=r'(.*\.)?mockclairservice', path=r'/v1/notifications/somenotification$', method='GET') - def get_notification(url, request): - if url.query.find('page=nextpage') >= 0: - apis_called.append('GET-2') - - data = { - 'Notification': self._get_notification_data(new_layer_ids[2:], old_layer_ids[2:]), - } - - return json.dumps(data) - else: - apis_called.append('GET-1') - - notification_data = self._get_notification_data(new_layer_ids[0:2], old_layer_ids[0:2]) - notification_data['NextPage'] = 'nextpage' - - data = { - 'Notification': notification_data, - } - - return json.dumps(data) - - # Ensure that there are no event queue items for any layers. + # Ensure that there are no event queue items for the layer. self.assertIsNone(notification_queue.get()) - # Test with a known notification with pages. - data = { - 'Name': 'somenotification' - } - - with HTTMock(get_notification, delete_notification, get_matching_layer_vulnerable): + with fake_security_scanner() as security_scanner: + # Test with an unknown notification. worker = SecurityNotificationWorker(None) - self.assertTrue(worker.perform_notification_work(data)) + self.assertFalse(worker.perform_notification_work({ + 'Name': 'unknownnotification' + })) - # Verify each of the expected API calls were made. - self.assertEquals(set(['GET-1', 'GET-2', 'DELETE', 'VULN']), set(apis_called)) + # Add some analyzed layers. + analyzer = LayerAnalyzer(app.config, self.api) + analyzer.analyze_recursively(layer1) + analyzer.analyze_recursively(layer2) - # Verify that we have notifications *just* for the New layer. - expected_item = notification_queue.get() - self.assertIsNotNone(expected_item) - item_body = json.loads(expected_item['body']) - self.assertEquals('devtable/complex', item_body['event_data']['repository']) - self.assertEquals(['prod'], item_body['event_data']['tags']) + # Add a notification with pages of data. + new_vuln_info = { + "Name": "CVE-TEST", + "Namespace": "debian:8", + "Description": "Some service", + "Link": "https://security-tracker.debian.org/tracker/CVE-2014-9471", + "Severity": "Critical", + "FixedIn": {'Version': "9.23-5"}, + } + + security_scanner.set_vulns(security_scanner.layer_id(layer1), [new_vuln_info]) + security_scanner.set_vulns(security_scanner.layer_id(layer2), [new_vuln_info]) + + # Define offsetting sets of layer IDs, to test cross-pagination support. In this test, we + # will only serve 2 layer IDs per page: the first page will serve both of the 'New' layer IDs, + # but since the first 2 'Old' layer IDs are "earlier" than the shared ID of + # `devtable/simple:latest`, they won't get served in the 'New' list until the *second* page. + # The notification handling system should correctly not notify for this layer, even though it + # is marked 'New' on page 1 and marked 'Old' on page 2. Clair will served these + # IDs sorted in the same manner. + idx_old_layer_ids = [{'LayerName': 'old1', 'Index': 1}, + {'LayerName': 'old2', 'Index': 2}, + {'LayerName': security_scanner.layer_id(layer1), 'Index': 3}] + + idx_new_layer_ids = [{'LayerName': security_scanner.layer_id(layer1), 'Index': 3}, + {'LayerName': security_scanner.layer_id(layer2), 'Index': 4}] + + old_layer_ids = [t['LayerName'] for t in idx_old_layer_ids] + new_layer_ids = [t['LayerName'] for t in idx_new_layer_ids] + + if not indexed: + idx_old_layer_ids = None + idx_new_layer_ids = None + + notification_data = security_scanner.add_notification(old_layer_ids, new_layer_ids, None, + new_vuln_info, max_per_page=2, + indexed_old_layer_ids=idx_old_layer_ids, + indexed_new_layer_ids=idx_new_layer_ids) + + # Test with a known notification with pages. + data = { + 'Name': notification_data['Name'], + } + + worker = SecurityNotificationWorker(None) + self.assertTrue(worker.perform_notification_work(data, layer_limit=2)) + + # Make sure all pages were processed by ensuring we have only one notification. If the second + # page was not processed, then the `Old` entry for layer1 will not be found, and we'd get two + # notifications. + time.sleep(1) + self.assertIsNotNone(notification_queue.get()) + self.assertIsNone(notification_queue.get()) - # Make sure we have no additional notifications. - self.assertIsNone(notification_queue.get()) if __name__ == '__main__': diff --git a/util/secscan/fake.py b/util/secscan/fake.py index a741868c5..0ed5c12f5 100644 --- a/util/secscan/fake.py +++ b/util/secscan/fake.py @@ -58,15 +58,22 @@ class FakeSecurityScanner(object): """ Returns whether a notification with the given ID is found in the scanner. """ return notification_id in self.notifications - def add_notification(self, old_layer_ids, new_layer_ids, old_vuln, new_vuln): + def add_notification(self, old_layer_ids, new_layer_ids, old_vuln, new_vuln, max_per_page=100, + indexed_old_layer_ids=None, indexed_new_layer_ids=None): """ Adds a new notification over the given sets of layer IDs and vulnerability information, returning the structural data of the notification created. """ notification_id = str(uuid.uuid4()) + if old_vuln is None: + old_vuln = dict(new_vuln) + self.notifications[notification_id] = dict(old_layer_ids=old_layer_ids, new_layer_ids=new_layer_ids, old_vuln=old_vuln, - new_vuln=new_vuln) + new_vuln=new_vuln, + max_per_page=max_per_page, + indexed_old_layer_ids=indexed_old_layer_ids, + indexed_new_layer_ids=indexed_new_layer_ids) return self._get_notification_data(notification_id, 0, 100) @@ -106,6 +113,8 @@ class FakeSecurityScanner(object): """ Returns the structural data for the notification with the given ID, paginated using the given page and limit. """ notification = self.notifications[notification_id] + limit = min(limit, notification['max_per_page']) + notification_data = { "Name": notification_id, "Created": "1456247389", @@ -127,6 +136,11 @@ class FakeSecurityScanner(object): 'LayersIntroducingVulnerability': old_layer_ids, } + if notification.get('indexed_old_layer_ids', None): + indexed_old_layer_ids = notification['indexed_old_layer_ids'][start_index:end_index] + notification_data['Old']['OrderedLayersIntroducingVulnerability'] = indexed_old_layer_ids + + if notification.get('new_vuln'): new_layer_ids = notification['new_layer_ids'] new_layer_ids = new_layer_ids[start_index:end_index] @@ -137,6 +151,11 @@ class FakeSecurityScanner(object): 'LayersIntroducingVulnerability': new_layer_ids, } + if notification.get('indexed_new_layer_ids', None): + indexed_new_layer_ids = notification['indexed_new_layer_ids'][start_index:end_index] + notification_data['New']['OrderedLayersIntroducingVulnerability'] = indexed_new_layer_ids + + if has_additional_page: notification_data['NextPage'] = str(page+1) diff --git a/util/secscan/notifier.py b/util/secscan/notifier.py index a31905cad..336514ce9 100644 --- a/util/secscan/notifier.py +++ b/util/secscan/notifier.py @@ -1,9 +1,8 @@ import logging import sys -from enum import Enum - from collections import defaultdict +from enum import Enum from app import secscan_api from data.model.tag import filter_tags_have_repository_event, get_matching_tags @@ -12,111 +11,169 @@ from data.database import (Image, ImageStorage, ExternalNotificationEvent, Repos from endpoints.notificationhelper import notification_batch from util.secscan import PRIORITY_LEVELS from util.secscan.api import APIRequestFailure -from util.morecollections import AttrDict, StreamingDiffTracker +from util.morecollections import AttrDict, StreamingDiffTracker, IndexedStreamingDiffTracker + logger = logging.getLogger(__name__) - class ProcessNotificationPageResult(Enum): FINISHED_PAGE = 'Finished Page' FINISHED_PROCESSING = 'Finished Processing' FAILED = 'Failed' -def process_notification_page_data(notification_page_data): - """ Processes the given notification page data to spawn vulnerability notifications as necessary. - Returns the status of the processing. +class SecurityNotificationHandler(object): + """ Class to process paginated notifications from the security scanner and issue + Quay vulnerability_found notifications for all necessary tags. Callers should + initialize, call process_notification_page_data for each page until it returns + FINISHED_PROCESSING or FAILED and, if succeeded, then call send_notifications + to send out the notifications queued. """ - if not 'New' in notification_page_data: - # Nothing more to do. + def __init__(self, results_per_stream): + self.tag_map = defaultdict(set) + self.repository_map = {} + self.check_map = {} + + self.stream_tracker = None + self.results_per_stream = results_per_stream + self.reporting_failed = False + self.vulnerability_info = None + + self.event = ExternalNotificationEvent.get(name='vulnerability_found') + + def send_notifications(self): + """ Sends all queued up notifications. """ + if self.vulnerability_info is None: + return + + new_vuln = self.vulnerability_info + new_severity = PRIORITY_LEVELS.get(new_vuln.get('Severity', 'Unknown'), {'index': sys.maxint}) + + # For each of the tags found, issue a notification. + with notification_batch() as spawn_notification: + for repository_id in self.tag_map: + tags = self.tag_map[repository_id] + event_data = { + 'tags': list(tags), + 'vulnerability': { + 'id': new_vuln['Name'], + 'description': new_vuln.get('Description', None), + 'link': new_vuln.get('Link', None), + 'priority': new_severity['title'], + 'has_fix': 'FixedIn' in new_vuln, + }, + } + + # TODO(jzelinskie): remove when more endpoints have been converted to using interfaces + repository = AttrDict({ + 'namespace_name': self.repository_map[repository_id].namespace_user.username, + 'name': self.repository_map[repository_id].name, + }) + + spawn_notification(repository, 'vulnerability_found', event_data) + + def process_notification_page_data(self, notification_page_data): + """ Processes the given notification page data to spawn vulnerability notifications as + necessary. Returns the status of the processing. + """ + if not 'New' in notification_page_data: + return self._done() + + new_data = notification_page_data['New'] + old_data = notification_page_data.get('Old', {}) + + new_vuln = new_data['Vulnerability'] + old_vuln = old_data.get('Vulnerability', {}) + + self.vulnerability_info = new_vuln + + new_layer_ids = new_data.get('LayersIntroducingVulnerability', []) + old_layer_ids = old_data.get('LayersIntroducingVulnerability', []) + + new_severity = PRIORITY_LEVELS.get(new_vuln.get('Severity', 'Unknown'), {'index': sys.maxint}) + old_severity = PRIORITY_LEVELS.get(old_vuln.get('Severity', 'Unknown'), {'index': sys.maxint}) + + # Check if the severity of the vulnerability has increased. If so, then we report this + # vulnerability for *all* layers, rather than a difference, as it is important for everyone. + if new_severity['index'] < old_severity['index']: + # The vulnerability has had its severity increased. Report for *all* layers. + all_layer_ids = set(new_layer_ids) | set(old_layer_ids) + for layer_id in all_layer_ids: + self._report(layer_id) + + if 'NextPage' not in notification_page_data: + return self._done() + else: + return ProcessNotificationPageResult.FINISHED_PAGE + + # Otherwise, only send the notification to new layers. To find only the new layers, we + # need to do a streaming diff vs the old layer IDs stream. + + # Check for ordered data. If found, we use the indexed tracker, which is faster and + # more memory efficient. + is_indexed = False + if 'OrderedLayersIntroducingVulnerability' in new_data: + def tuplize(stream): + return [(entry['LayerName'], entry['Index']) for entry in stream] + + new_layer_ids = tuplize(new_data.get('OrderedLayersIntroducingVulnerability', [])) + old_layer_ids = tuplize(old_data.get('OrderedLayersIntroducingVulnerability', [])) + is_indexed = True + + # If this is the first call, initialize the tracker. + if self.stream_tracker is None: + self.stream_tracker = (IndexedStreamingDiffTracker(self._report, self.results_per_stream) + if is_indexed + else StreamingDiffTracker(self._report, self.results_per_stream)) + + # Call to add the old and new layer ID streams to the tracker. The tracker itself will + # call _report whenever it has determined a new layer has been found. + self.stream_tracker.push_new(new_layer_ids) + self.stream_tracker.push_old(old_layer_ids) + + # If the reporting failed at any point, nothing more we can do. + if self.reporting_failed: + return ProcessNotificationPageResult.FAILED + + # Check to see if there are any additional pages to process. + if 'NextPage' not in notification_page_data: + return self._done() + else: + return ProcessNotificationPageResult.FINISHED_PAGE + + def _done(self): + if self.stream_tracker is not None: + self.stream_tracker.done() + + if self.reporting_failed: + return ProcessNotificationPageResult.FAILED + return ProcessNotificationPageResult.FINISHED_PROCESSING - new_data = notification_page_data['New'] - old_data = notification_page_data.get('Old', {}) - - new_vuln = new_data['Vulnerability'] - old_vuln = old_data.get('Vulnerability', {}) - - new_layer_ids = set(new_data.get('LayersIntroducingVulnerability', [])) - old_layer_ids = set(old_data.get('LayersIntroducingVulnerability', [])) - - new_severity = PRIORITY_LEVELS.get(new_vuln.get('Severity', 'Unknown'), {'index': sys.maxint}) - old_severity = PRIORITY_LEVELS.get(old_vuln.get('Severity', 'Unknown'), {'index': sys.maxint}) - - # By default we only notify the new layers that are affected by the vulnerability. If, however, - # the severity of the vulnerability has increased, we need to notify *all* layers, as we might - # need to send new notifications for older layers. - notify_layers = new_layer_ids - old_layer_ids - if new_severity['index'] < old_severity['index']: - notify_layers = new_layer_ids | old_layer_ids - - if not notify_layers: - # Nothing more to do. - return ProcessNotificationPageResult.FINISHED_PAGE - - # Lookup the external event for when we have vulnerabilities. - event = ExternalNotificationEvent.get(name='vulnerability_found') - - # For each layer, retrieving the matching tags and join with repository to determine which - # require new notifications. - tag_map = defaultdict(set) - repository_map = {} - cve_id = new_vuln['Name'] - - # Find all tags that contain the layer(s) introducing the vulnerability, - # in repositories that have the event setup. - for layer_id in notify_layers: + def _report(self, new_layer_id): # Split the layer ID into its Docker Image ID and storage ID. - (docker_image_id, storage_uuid) = layer_id.split('.', 2) + (docker_image_id, storage_uuid) = new_layer_id.split('.', 2) # Find the matching tags. matching = get_matching_tags(docker_image_id, storage_uuid, RepositoryTag, Repository, Image, ImageStorage) - tags = list(filter_tags_have_repository_event(matching, event)) + tags = list(filter_tags_have_repository_event(matching, self.event)) - check_map = {} + cve_id = self.vulnerability_info['Name'] for tag in tags: - # Verify that the tag's root image has the vulnerability. + # Verify that the tag's *top layer* has the vulnerability. tag_layer_id = '%s.%s' % (tag.image.docker_image_id, tag.image.storage.uuid) - logger.debug('Checking if layer %s is vulnerable to %s', tag_layer_id, cve_id) - - if not tag_layer_id in check_map: + if not tag_layer_id in self.check_map: + logger.debug('Checking if layer %s is vulnerable to %s', tag_layer_id, cve_id) try: - is_vulerable = secscan_api.check_layer_vulnerable(tag_layer_id, cve_id) + self.check_map[tag_layer_id] = secscan_api.check_layer_vulnerable(tag_layer_id, cve_id) except APIRequestFailure: - return ProcessNotificationPageResult.FAILED - - check_map[tag_layer_id] = is_vulerable + self.reporting_failed = True + return logger.debug('Result of layer %s is vulnerable to %s check: %s', tag_layer_id, cve_id, - check_map[tag_layer_id]) - - if check_map[tag_layer_id]: + self.check_map[tag_layer_id]) + if self.check_map[tag_layer_id]: # Add the vulnerable tag to the list. - tag_map[tag.repository_id].add(tag.name) - repository_map[tag.repository_id] = tag.repository - - # For each of the tags found, issue a notification. - with notification_batch() as spawn_notification: - for repository_id in tag_map: - tags = tag_map[repository_id] - event_data = { - 'tags': list(tags), - 'vulnerability': { - 'id': cve_id, - 'description': new_vuln.get('Description', None), - 'link': new_vuln.get('Link', None), - 'priority': new_severity['title'], - 'has_fix': 'FixedIn' in new_vuln, - }, - } - - # TODO(jzelinskie): remove when more endpoints have been converted to using interfaces - repository = AttrDict({ - 'namespace_name': repository_map[repository_id].namespace_user.username, - 'name': repository_map[repository_id].name, - }) - spawn_notification(repository, 'vulnerability_found', event_data) - - return ProcessNotificationPageResult.FINISHED_PAGE - + self.tag_map[tag.repository_id].add(tag.name) + self.repository_map[tag.repository_id] = tag.repository diff --git a/workers/security_notification_worker.py b/workers/security_notification_worker.py index 7717048df..11e15f295 100644 --- a/workers/security_notification_worker.py +++ b/workers/security_notification_worker.py @@ -6,7 +6,7 @@ import features from app import secscan_notification_queue, secscan_api from workers.queueworker import QueueWorker, JobException -from util.secscan.notifier import process_notification_data +from util.secscan.notifier import SecurityNotificationHandler, ProcessNotificationPageResult logger = logging.getLogger(__name__) @@ -28,11 +28,15 @@ class SecurityNotificationWorker(QueueWorker): notification_name = data['Name'] current_page = data.get('page', None) + handler = SecurityNotificationHandler(layer_limit) while True: + # Retrieve the current page of notification data from the security scanner. (response_data, should_retry) = secscan_api.get_notification(notification_name, layer_limit=layer_limit, page=current_page) + + # If no response, something went wrong. if response_data is None: if should_retry: raise JobException() @@ -44,25 +48,34 @@ class SecurityNotificationWorker(QueueWorker): # Return to mark the job as "complete", as we'll never be able to finish it. return False + # Extend processing on the queue item so it doesn't expire while we're working. self.extend_processing(_PROCESSING_SECONDS, json.dumps(data)) - notification_data = response_data['Notification'] - if not process_notification_data(notification_data): - raise JobException() - # Check for a next page of results. If none, we're done. - if 'NextPage' not in notification_data: - # Mark the notification as read and processed. + # Process the notification data. + notification_data = response_data['Notification'] + result = handler.process_notification_page_data(notification_data) + + # Possible states after processing: failed to process, finished processing entirely + # or finished processing the page. + if result == ProcessNotificationPageResult.FAILED: + # Something went wrong. + raise JobException + + if result == ProcessNotificationPageResult.FINISHED_PROCESSING: + # Mark the notification as read. if not secscan_api.mark_notification_read(notification_name): # Return to mark the job as "complete", as we'll never be able to finish it. logger.error('Failed to mark notification %s as read', notification_name) return False + # Send the generated Quay notifications. + handler.send_notifications() return True - # Otherwise, save the next page token into the queue item (so we can pick up from here if - # something goes wrong in the next loop iteration), and continue. - current_page = notification_data['NextPage'] - data['page'] = current_page + if result == ProcessNotificationPageResult.FINISHED_PAGE: + # Continue onto the next page. + current_page = notification_data['NextPage'] + continue if __name__ == '__main__':