From aa5587c93cd923fffde8aac1bff198c7da65943a Mon Sep 17 00:00:00 2001 From: Joseph Schorr Date: Fri, 18 Mar 2016 20:28:06 -0400 Subject: [PATCH] Fixes and added tests for the security notification worker Fixes #1301 - Ensures that the worker uses pagination properly - Ensures that the worker handles failure as expected - Moves marking the notification as read to after the worker processes it - Increases the number of layers requested to 100 --- endpoints/secscan.py | 11 +---- test/test_secscan.py | 60 +++++++++++++++++++++++++ util/secscan/api.py | 4 +- workers/security_notification_worker.py | 15 +++++-- 4 files changed, 75 insertions(+), 15 deletions(-) diff --git a/endpoints/secscan.py b/endpoints/secscan.py index cd842f88e..9151026a4 100644 --- a/endpoints/secscan.py +++ b/endpoints/secscan.py @@ -19,14 +19,5 @@ def secscan_notification(): abort(400) notification = data['Notification'] - - # Queue the notification to be processed. - item_id = secscan_notification_queue.put(['named', notification['Name']], - json.dumps(notification)) - - # Mark the notification as read. - if not secscan_api.mark_notification_read(notification['Name']): - secscan_notification_queue.cancel(item_id) - abort(400) - + secscan_notification_queue.put(['named', notification['Name']], json.dumps(notification)) return make_response('Okay') diff --git a/test/test_secscan.py b/test/test_secscan.py index 78c3c9ca3..f5be0797f 100644 --- a/test/test_secscan.py +++ b/test/test_secscan.py @@ -10,10 +10,12 @@ from util.secscan.api import SecurityScannerAPI, AnalyzeLayerException from util.secscan.analyzer import LayerAnalyzer from util.secscan.notifier import process_notification_data from data import model +from workers.security_notification_worker import SecurityNotificationWorker ADMIN_ACCESS_USER = 'devtable' SIMPLE_REPO = 'simple' +COMPLEX_REPO = 'complex' _PORT_NUMBER = 5001 @@ -462,5 +464,63 @@ class TestSecurityScanner(unittest.TestCase): notification = model.notification.create_repo_notification(repo, 'vulnerability_found', 'quay_notification', {}, {'level': 0}) self.assertFalse(VulnerabilityFoundEvent().should_perform(event_data, notification)) + + def test_notification_worker(self): + pages_called = [] + + @urlmatch(netloc=r'(.*\.)?mockclairservice', path=r'/v1/notifications/somenotification$', method='DELETE') + def delete_notification(url, request): + pages_called.append('DELETE') + return {'status_code': 201, 'content': ''} + + @urlmatch(netloc=r'(.*\.)?mockclairservice', path=r'/v1/notifications/somenotification$', method='GET') + def get_notification(url, request): + if url.query.find('page=nextpage') >= 0: + pages_called.append('GET-2') + layer = model.tag.get_tag_image(ADMIN_ACCESS_USER, COMPLEX_REPO, 'prod') + layer_id = '%s.%s' % (layer.docker_image_id, layer.storage.uuid) + + data = { + 'Notification': self._get_notification_data([layer_id], [layer_id]), + } + + return json.dumps(data) + else: + pages_called.append('GET-1') + layer = model.tag.get_tag_image(ADMIN_ACCESS_USER, SIMPLE_REPO, 'latest') + layer_id = '%s.%s' % (layer.docker_image_id, layer.storage.uuid) + + notification_data = self._get_notification_data([layer_id], [layer_id]) + notification_data['NextPage'] = 'nextpage' + + data = { + 'Notification': notification_data, + } + + return json.dumps(data) + + @urlmatch(netloc=r'(.*\.)?mockclairservice', path=r'/v1/notifications/(.*)') + def unknown_notification(url, request): + return {'status_code': 404, 'content': 'Unknown notification'} + + # Test with an unknown notification. + with HTTMock(get_notification, unknown_notification): + worker = SecurityNotificationWorker(None) + self.assertFalse(worker.process_queue_item({ + 'Name': 'unknownnotification' + })) + + # Test with a known notification with pages. + data = { + 'Name': 'somenotification' + } + + with HTTMock(get_notification, delete_notification, unknown_notification): + worker = SecurityNotificationWorker(None) + self.assertTrue(worker.process_queue_item(data)) + + self.assertEquals(['GET-1', 'GET-2', 'DELETE'], pages_called) + + if __name__ == '__main__': unittest.main() \ No newline at end of file diff --git a/util/secscan/api.py b/util/secscan/api.py index 5755134e1..65ff8b37a 100644 --- a/util/secscan/api.py +++ b/util/secscan/api.py @@ -160,7 +160,7 @@ class SecurityScannerAPI(object): return False - def get_notification(self, notification_name, layer_limit=10, page=None): + def get_notification(self, notification_name, layer_limit=100, page=None): """ Gets the data for a specific notification, with optional page token. Returns a tuple of the data (None on failure) and whether to retry. """ @@ -194,7 +194,7 @@ class SecurityScannerAPI(object): """ Marks a security scanner notification as read. """ try: response = self._call('DELETE', _API_METHOD_MARK_NOTIFICATION_READ % notification_name) - return response.status_code == 200 + return response.status_code / 100 == 2 except requests.exceptions.RequestException: logger.exception('Failed to mark notification as read: %s', notification_name) return False diff --git a/workers/security_notification_worker.py b/workers/security_notification_worker.py index a31075d50..03ac5da9a 100644 --- a/workers/security_notification_worker.py +++ b/workers/security_notification_worker.py @@ -11,6 +11,7 @@ from util.secscan.notifier import process_notification_data logger = logging.getLogger(__name__) _EXTENDED_SECONDS = 600 +_LAYER_LIMIT = 100 # The number of layers to request on each page. class SecurityNotificationWorker(QueueWorker): def process_queue_item(self, data): @@ -18,14 +19,16 @@ class SecurityNotificationWorker(QueueWorker): current_page = data.get('page', None) while True: - (response_data, should_retry) = secscan_api.get_notification(notification_name) + (response_data, should_retry) = secscan_api.get_notification(notification_name, + layer_limit=_LAYER_LIMIT, + page=current_page) if response_data is None: if should_retry: raise JobException() else: # Return to mark the job as "complete", as we'll never be able to finish it. logger.error('Failed to handle security notification %s', notification_name) - return + return False notification_data = response_data['Notification'] if not process_notification_data(notification_data): @@ -33,7 +36,13 @@ class SecurityNotificationWorker(QueueWorker): # Check for a next page of results. If none, we're done. if 'NextPage' not in notification_data: - return + # Mark the notification as read and processed. + if not secscan_api.mark_notification_read(notification_name): + # Return to mark the job as "complete", as we'll never be able to finish it. + logger.error('Failed to mark notification %s as read', notification_name) + return False + + return True # Otherwise, save the next page token into the queue item (so we can pick up from here if # something goes wrong in the next loop iteration), and continue.