From f498e92d5822b7ecb7ef426c77f04e08139371f4 Mon Sep 17 00:00:00 2001
From: Joseph Schorr
Date: Thu, 25 Feb 2016 15:58:42 -0500
Subject: [PATCH] Implement against new Clair paginated notification system

---
 config.py                               |   2 +-
 data/queue.py                           |  12 +-
 endpoints/secscan.py                    |  25 +--
 test/test_api_usage.py                  |   4 +-
 test/test_secscan.py                    | 204 +++++++++++++++++++++++-
 util/secscan/__init__.py                |  14 +-
 util/secscan/api.py                     |  92 +++++++++--
 util/secscan/notifier.py                | 103 ++++++++++++
 workers/queueworker.py                  |   5 +-
 workers/security_notification_worker.py |  87 +++------
 10 files changed, 447 insertions(+), 101 deletions(-)
 create mode 100644 util/secscan/notifier.py

diff --git a/config.py b/config.py
index b3d230138..3e0b391f2 100644
--- a/config.py
+++ b/config.py
@@ -129,7 +129,7 @@ class DefaultConfig(object):
   NOTIFICATION_QUEUE_NAME = 'notification'
   DOCKERFILE_BUILD_QUEUE_NAME = 'dockerfilebuild'
   REPLICATION_QUEUE_NAME = 'imagestoragereplication'
-  SECSCAN_NOTIFICATION_QUEUE_NAME = 'secscan_notification'
+  SECSCAN_NOTIFICATION_QUEUE_NAME = 'security_notification'
 
   # Super user config. Note: This MUST BE an empty list for the default config.
   SUPER_USERS = []
diff --git a/data/queue.py b/data/queue.py
index 7503e5764..5b4408a4c 100644
--- a/data/queue.py
+++ b/data/queue.py
@@ -228,16 +228,26 @@ class WorkQueue(object):
     except QueueItem.DoesNotExist:
       return False
 
-  def extend_processing(self, item, seconds_from_now, minimum_extension=MINIMUM_EXTENSION):
+  def extend_processing(self, item, seconds_from_now, minimum_extension=MINIMUM_EXTENSION,
+                        updated_data=None):
     with self._transaction_factory(db):
       try:
         queue_item = self._item_by_id_for_update(item.id)
         new_expiration = datetime.utcnow() + timedelta(seconds=seconds_from_now)
+        has_change = False
 
         # Only actually write the new expiration to the db if it moves the expiration some minimum
         if new_expiration - queue_item.processing_expires > minimum_extension:
           queue_item.processing_expires = new_expiration
+          has_change = True
+
+        if updated_data is not None:
+          queue_item.body = updated_data
+          has_change = True
+
+        if has_change:
           queue_item.save()
+
       except QueueItem.DoesNotExist:
         return
diff --git a/endpoints/secscan.py b/endpoints/secscan.py
index 4326ea621..cd842f88e 100644
--- a/endpoints/secscan.py
+++ b/endpoints/secscan.py
@@ -3,23 +3,30 @@ import json
 
 import features
 
-from app import secscan_notification_queue
-from flask import request, make_response, Blueprint
+from app import secscan_notification_queue, secscan_api
+from flask import request, make_response, Blueprint, abort
 from endpoints.common import route_show_if
 
 logger = logging.getLogger(__name__)
 
 secscan = Blueprint('secscan', __name__)
 
 @route_show_if(features.SECURITY_SCANNER)
-@secscan.route('/notification', methods=['POST'])
+@secscan.route('/notify', methods=['POST'])
 def secscan_notification():
   data = request.get_json()
-  logger.debug('Got notification from Clair: %s', data)
+  logger.debug('Got notification from Security Scanner: %s', data)
+  if 'Notification' not in data:
+    abort(400)
 
-  content = data['Content']
-  layer_ids = content.get('NewIntroducingLayersIDs', content.get('IntroducingLayersIDs', []))
-  if not layer_ids:
-    return make_response('Okay')
+  notification = data['Notification']
+
+  # Queue the notification to be processed.
+  item_id = secscan_notification_queue.put(['named', notification['Name']],
+                                           json.dumps(notification))
+
+  # Mark the notification as read.
+  if not secscan_api.mark_notification_read(notification['Name']):
+    secscan_notification_queue.cancel(item_id)
+    abort(400)
 
-  secscan_notification_queue.put(['notification', data['Name']], json.dumps(data))
   return make_response('Okay')
diff --git a/test/test_api_usage.py b/test/test_api_usage.py
index c46a2f8ee..cd700fc1c 100644
--- a/test/test_api_usage.py
+++ b/test/test_api_usage.py
@@ -3455,10 +3455,10 @@ def get_layer_success_mock(url, request):
     }
   ]
 
-  if not request.url.endswith('?vulnerabilities'):
+  if not request.url.find('vulnerabilities') > 0:
     vulnerabilities = []
 
-  if not request.url.endswith('?features'):
+  if not request.url.find('features') > 0:
     features = []
 
   return py_json.dumps({
diff --git a/test/test_secscan.py b/test/test_secscan.py
index 9afc9ee21..e4783e7a9 100644
--- a/test/test_secscan.py
+++ b/test/test_secscan.py
@@ -1,11 +1,13 @@
 import unittest
 import json
+import os
 
 from httmock import urlmatch, all_requests, HTTMock
 from app import app, config_provider, storage, notification_queue
 from initdb import setup_database_for_testing, finished_database_for_testing
 from util.secscan.api import SecurityScannerAPI, AnalyzeLayerException
 from util.secscan.analyzer import LayerAnalyzer
+from util.secscan.notifier import process_notification_data
 from data import model
 
@@ -69,10 +71,10 @@ def get_layer_success_mock(url, request):
     }
   ]
 
-  if not request.url.endswith('?vulnerabilities'):
+  if not request.url.find('vulnerabilities') > 0:
     vulnerabilities = []
 
-  if not request.url.endswith('?features'):
+  if not request.url.find('features') > 0:
     features = []
 
   return json.dumps({
@@ -97,7 +99,8 @@ class TestSecurityScanner(unittest.TestCase):
     storage.put_content(['local_us'], 'supports_direct_download', 'true')
 
     # Setup the database with fake storage.
-    setup_database_for_testing(self, with_storage=True, force_rebuild=True)
+    force_rebuild = os.environ.get('SKIP_REBUILD') != 'true'
+    setup_database_for_testing(self, with_storage=True, force_rebuild=force_rebuild)
     self.app = app.test_client()
     self.ctx = app.test_request_context()
     self.ctx.__enter__()
@@ -238,5 +241,200 @@ class TestSecurityScanner(unittest.TestCase):
     self.assertTrue(body['event_data']['vulnerability']['has_fix'])
 
 
+  def _get_notification_data(self, new_layer_ids, old_layer_ids, new_severity='Low'):
+    return {
+      "Name": "ec45ec87-bfc8-4129-a1c3-d2b82622175a",
+      "Created": "1456247389",
+      "Notified": "1456246708",
+      "Limit": 2,
+      "New": {
+        "Vulnerability": {
+          "Name": "CVE-TEST",
+          "Namespace": "debian:8",
+          "Description": "New CVE",
+          "Severity": new_severity,
+          "FixedIn": [
+            {
+              "Name": "grep",
+              "Namespace": "debian:8",
+              "Version": "2.25"
+            }
+          ]
+        },
+        "LayersIntroducingVulnerability": new_layer_ids,
+      },
+      "Old": {
+        "Vulnerability": {
+          "Name": "CVE-TEST",
+          "Namespace": "debian:8",
+          "Description": "New CVE",
+          "Severity": "Low",
+          "FixedIn": []
+        },
+        "LayersIntroducingVulnerability": old_layer_ids,
+      }
+    }
+
+
+  def test_notification_new_layers_not_vulnerable(self):
+    layer = model.tag.get_tag_image(ADMIN_ACCESS_USER, SIMPLE_REPO, 'latest')
+    layer_id = '%s.%s' % (layer.docker_image_id, layer.storage.uuid)
+
+    # Add a repo event for the layer.
+    repo = model.repository.get_repository(ADMIN_ACCESS_USER, SIMPLE_REPO)
+    model.notification.create_repo_notification(repo, 'vulnerability_found', 'quay_notification', {}, {'level': 100})
+
+    @urlmatch(netloc=r'(.*\.)?mockclairservice', path=r'/v1/layers/(.+)')
+    def get_matching_layer_not_vulnerable(url, request):
+      return json.dumps({
+        "Layer": {
+          "Name": layer_id,
+          "Namespace": "debian:8",
+          "IndexedByVersion": 1,
+          "Features": [
+            {
+              "Name": "coreutils",
+              "Namespace": "debian:8",
+              "Version": "8.23-4",
+              "Vulnerabilities": [],  # Report not vulnerable.
+            }
+          ]
+        }
+      })
+
+    # Ensure that there are no event queue items for the layer.
+    self.assertIsNone(notification_queue.get())
+
+    # Fire off the notification processing.
+    with HTTMock(get_matching_layer_not_vulnerable, response_content):
+      notification_data = self._get_notification_data([layer_id], [])
+      self.assertTrue(process_notification_data(notification_data))
+
+    # Ensure that there are no event queue items for the layer.
+    self.assertIsNone(notification_queue.get())
+
+
+  def test_notification_new_layers(self):
+    layer = model.tag.get_tag_image(ADMIN_ACCESS_USER, SIMPLE_REPO, 'latest')
+    layer_id = '%s.%s' % (layer.docker_image_id, layer.storage.uuid)
+
+    # Add a repo event for the layer.
+    repo = model.repository.get_repository(ADMIN_ACCESS_USER, SIMPLE_REPO)
+    model.notification.create_repo_notification(repo, 'vulnerability_found', 'quay_notification', {}, {'level': 100})
+
+    @urlmatch(netloc=r'(.*\.)?mockclairservice', path=r'/v1/layers/(.+)')
+    def get_matching_layer_vulnerable(url, request):
+      return json.dumps({
+        "Layer": {
+          "Name": layer_id,
+          "Namespace": "debian:8",
+          "IndexedByVersion": 1,
+          "Features": [
+            {
+              "Name": "coreutils",
+              "Namespace": "debian:8",
+              "Version": "8.23-4",
+              "Vulnerabilities": [
+                {
+                  "Name": "CVE-TEST",
+                  "Namespace": "debian:8",
+                  "Severity": "Low",
+                }
+              ],
+            }
+          ]
+        }
+      })
+
+    # Ensure that there are no event queue items for the layer.
+    self.assertIsNone(notification_queue.get())
+
+    # Fire off the notification processing.
+    with HTTMock(get_matching_layer_vulnerable, response_content):
+      notification_data = self._get_notification_data([layer_id], [])
+      self.assertTrue(process_notification_data(notification_data))
+
+    # Ensure an event was written for the tag.
+    queue_item = notification_queue.get()
+    self.assertIsNotNone(queue_item)
+
+    body = json.loads(queue_item.body)
+    self.assertEquals(['prod', 'latest'], body['event_data']['tags'])
+    self.assertEquals('CVE-TEST', body['event_data']['vulnerability']['id'])
+    self.assertEquals('Low', body['event_data']['vulnerability']['priority'])
+    self.assertTrue(body['event_data']['vulnerability']['has_fix'])
+
+
+  def test_notification_no_new_layers(self):
+    layer = model.tag.get_tag_image(ADMIN_ACCESS_USER, SIMPLE_REPO, 'latest')
+    layer_id = '%s.%s' % (layer.docker_image_id, layer.storage.uuid)
+
+    # Add a repo event for the layer.
+    repo = model.repository.get_repository(ADMIN_ACCESS_USER, SIMPLE_REPO)
+    model.notification.create_repo_notification(repo, 'vulnerability_found', 'quay_notification', {}, {'level': 100})
+
+    # Ensure that there are no event queue items for the layer.
+    self.assertIsNone(notification_queue.get())
+
+    # Fire off the notification processing.
+    with HTTMock(response_content):
+      notification_data = self._get_notification_data([layer_id], [layer_id])
+      self.assertTrue(process_notification_data(notification_data))
+
+    # Ensure that there are no event queue items for the layer.
+    self.assertIsNone(notification_queue.get())
+
+
+  def test_notification_no_new_layers_increased_severity(self):
+    layer = model.tag.get_tag_image(ADMIN_ACCESS_USER, SIMPLE_REPO, 'latest')
+    layer_id = '%s.%s' % (layer.docker_image_id, layer.storage.uuid)
+
+    # Add a repo event for the layer.
+    repo = model.repository.get_repository(ADMIN_ACCESS_USER, SIMPLE_REPO)
+    model.notification.create_repo_notification(repo, 'vulnerability_found', 'quay_notification', {}, {'level': 100})
+
+    @urlmatch(netloc=r'(.*\.)?mockclairservice', path=r'/v1/layers/(.+)')
+    def get_matching_layer_vulnerable(url, request):
+      return json.dumps({
+        "Layer": {
+          "Name": layer_id,
+          "Namespace": "debian:8",
+          "IndexedByVersion": 1,
+          "Features": [
+            {
+              "Name": "coreutils",
+              "Namespace": "debian:8",
+              "Version": "8.23-4",
+              "Vulnerabilities": [
+                {
+                  "Name": "CVE-TEST",
+                  "Namespace": "debian:8",
+                  "Severity": "Low",
+                }
+              ],
+            }
+          ]
+        }
+      })
+
+    # Ensure that there are no event queue items for the layer.
+    self.assertIsNone(notification_queue.get())
+
+    # Fire off the notification processing.
+    with HTTMock(get_matching_layer_vulnerable, response_content):
+      notification_data = self._get_notification_data([layer_id], [layer_id], new_severity='High')
+      self.assertTrue(process_notification_data(notification_data))
+
+    # Ensure an event was written for the tag.
+    queue_item = notification_queue.get()
+    self.assertIsNotNone(queue_item)
+
+    body = json.loads(queue_item.body)
+    self.assertEquals(['prod', 'latest'], body['event_data']['tags'])
+    self.assertEquals('CVE-TEST', body['event_data']['vulnerability']['id'])
+    self.assertEquals('High', body['event_data']['vulnerability']['priority'])
+    self.assertTrue(body['event_data']['vulnerability']['has_fix'])
+
+
 if __name__ == '__main__':
   unittest.main()
\ No newline at end of file
diff --git a/util/secscan/__init__.py b/util/secscan/__init__.py
index d09cadd73..1588e2a16 100644
--- a/util/secscan/__init__.py
+++ b/util/secscan/__init__.py
@@ -3,7 +3,7 @@ PRIORITY_LEVELS = {
   'Unknown': {
     'title': 'Unknown',
-    'index': '6',
+    'index': 6,
     'level': 'info',
 
     'description': 'Unknown is either a security problem that has not been assigned ' +
@@ -13,7 +13,7 @@ PRIORITY_LEVELS = {
   'Negligible': {
     'title': 'Negligible',
-    'index': '5',
+    'index': 5,
     'level': 'info',
 
     'description': 'Negligible is technically a security problem, but is only theoretical ' +
@@ -24,7 +24,7 @@ PRIORITY_LEVELS = {
   'Low': {
     'title': 'Low',
-    'index': '4',
+    'index': 4,
     'level': 'warning',
 
     'description': 'Low is a security problem, but is hard to exploit due to environment, ' +
@@ -36,7 +36,7 @@ PRIORITY_LEVELS = {
   'Medium': {
     'title': 'Medium',
     'value': 'Medium',
-    'index': '3',
+    'index': 3,
     'level': 'warning',
 
     'description': 'Medium is a real security problem, and is exploitable for many people. ' +
@@ -48,7 +48,7 @@ PRIORITY_LEVELS = {
   'High': {
     'title': 'High',
     'value': 'High',
-    'index': '2',
+    'index': 2,
     'level': 'warning',
 
     'description': 'High is a real problem, exploitable for many people in a default installation. ' +
@@ -60,7 +60,7 @@ PRIORITY_LEVELS = {
   'Critical': {
     'title': 'Critical',
     'value': 'Critical',
-    'index': '1',
+    'index': 1,
     'level': 'error',
 
     'description': 'Critical is a world-burning problem, exploitable for nearly all people in ' +
@@ -72,7 +72,7 @@ PRIORITY_LEVELS = {
   'Defcon1': {
     'title': 'Defcon 1',
     'value': 'Defcon1',
-    'index': '0',
+    'index': 0,
     'level': 'error',
 
     'description': 'Defcon1 is a Critical problem which has been manually highlighted ' +
diff --git a/util/secscan/api.py b/util/secscan/api.py
index fd4d369fe..4fc652b06 100644
--- a/util/secscan/api.py
+++ b/util/secscan/api.py
@@ -19,8 +19,8 @@ class APIRequestFailure(Exception):
 
 _API_METHOD_INSERT = 'layers'
 _API_METHOD_GET_LAYER = 'layers/%s'
-_API_METHOD_GET_WITH_VULNERABILITIES_FLAG = '?vulnerabilities'
-_API_METHOD_GET_WITH_FEATURES_FLAG = '?features'
+_API_METHOD_MARK_NOTIFICATION_READ = 'notifications/%s'
+_API_METHOD_GET_NOTIFICATION = 'notifications/%s'
 
 
 class SecurityScannerAPI(object):
@@ -113,7 +113,7 @@ class SecurityScannerAPI(object):
 
     logger.info('Analyzing layer %s', request['Layer']['Name'])
     try:
-      response = self._call(_API_METHOD_INSERT, request)
+      response = self._call('POST', _API_METHOD_INSERT, request)
       json_response = response.json()
     except requests.exceptions.Timeout:
       logger.exception('Timeout when trying to post layer data response for %s', layer.id)
@@ -146,35 +146,94 @@ class SecurityScannerAPI(object):
     return api_version, False
 
+  def check_layer_vulnerable(self, layer_id, cve_name):
+    """ Checks to see if the layer with the given ID is vulnerable to the specified CVE. """
+    layer_data = self._get_layer_data(layer_id, include_vulnerabilities=True)
+    if layer_data is None or 'Layer' not in layer_data or 'Features' not in layer_data['Layer']:
+      return False
+
+    for feature in layer_data['Layer']['Features']:
+      for vuln in feature.get('Vulnerabilities', []):
+        if vuln['Name'] == cve_name:
+          return True
+
+    return False
+
+
+  def get_notification(self, notification_name, layer_limit=10, page=None):
+    """ Gets the data for a specific notification, with optional page token.
+        Returns a tuple of the data (None on failure) and whether to retry.
+    """
+    try:
+      params = {
+        'limit': layer_limit
+      }
+
+      if page is not None:
+        params['page'] = page
+
+      response = self._call('GET', _API_METHOD_GET_NOTIFICATION % notification_name, params=params)
+      json_response = response.json()
+    except requests.exceptions.Timeout:
+      logger.exception('Timeout when trying to get notification for %s', notification_name)
+      return None, True
+    except requests.exceptions.ConnectionError:
+      logger.exception('Connection error when trying to get notification for %s', notification_name)
+      return None, True
+    except (requests.exceptions.RequestException, ValueError):
+      logger.exception('Failed to get notification for %s', notification_name)
+      return None, False
+
+    if response.status_code != 200:
+      return None, response.status_code != 404 and response.status_code != 400
+
+    return json_response, False
+
+
+  def mark_notification_read(self, notification_name):
+    """ Marks a security scanner notification as read. """
+    try:
+      response = self._call('DELETE', _API_METHOD_MARK_NOTIFICATION_READ % notification_name)
+      return response.status_code == 200
+    except requests.exceptions.RequestException:
+      logger.exception('Failed to mark notification as read: %s', notification_name)
+      return False
+
+
   def get_layer_data(self, layer, include_features=False, include_vulnerabilities=False):
     """ Returns the layer data for the specified layer. On error, returns None. """
     layer_id = '%s.%s' % (layer.docker_image_id, layer.storage.uuid)
+    return self._get_layer_data(layer_id, include_features, include_vulnerabilities)
+
+
+  def _get_layer_data(self, layer_id, include_features=False, include_vulnerabilities=False):
     try:
-      flag = ''
+      params = {}
       if include_features:
-        flag = _API_METHOD_GET_WITH_FEATURES_FLAG
+        params = {'features': True}
 
       if include_vulnerabilities:
-        flag = _API_METHOD_GET_WITH_VULNERABILITIES_FLAG
+        params = {'vulnerabilities': True}
 
-      response = self._call(_API_METHOD_GET_LAYER + flag, None, layer_id)
+      response = self._call('GET', _API_METHOD_GET_LAYER % layer_id, params=params)
       logger.debug('Got response %s for vulnerabilities for layer %s',
                    response.status_code, layer_id)
+      json_response = response.json()
     except requests.exceptions.Timeout:
       raise APIRequestFailure('API call timed out')
     except requests.exceptions.ConnectionError:
       raise APIRequestFailure('Could not connect to security service')
     except (requests.exceptions.RequestException, ValueError):
-      logger.exception('Failed to get layer data response for %s', layer.id)
+      logger.exception('Failed to get layer data response for %s', layer_id)
       raise APIRequestFailure()
 
     if response.status_code == 404:
       return None
 
-    return response.json()
+    return json_response
 
-  def _call(self, relative_url, body=None, *args, **kwargs):
+  def _call(self, method, relative_url, params=None, body=None):
     """ Issues an HTTP call to the sec API at the given relative URL.
 
         This function disconnects from the database while awaiting a response
         from the API server.
@@ -184,18 +243,21 @@ class SecurityScannerAPI(object):
       raise Exception('Cannot call unconfigured security system')
 
     api_url = urljoin(security_config['ENDPOINT'], '/' + security_config['API_VERSION']) + '/'
-    url = urljoin(api_url, relative_url % args)
+    url = urljoin(api_url, relative_url)
 
     client = self.config['HTTPCLIENT']
     timeout = security_config.get('API_TIMEOUT_SECONDS', 1)
-    logger.debug('Looking up sec information: %s', url)
 
     with CloseForLongOperation(self.config):
-      if body is not None:
+      if method == 'POST':
         logger.debug('POSTing security URL %s', url)
-        return client.post(url, json=body, params=kwargs, timeout=timeout, cert=self._keys,
+        return client.post(url, json=body, params=params, timeout=timeout, cert=self._keys,
                            verify=self._certificate)
+      elif method == 'DELETE':
+        logger.debug('DELETEing security URL %s', url)
+        return client.delete(url, params=params, timeout=timeout, cert=self._keys,
+                             verify=self._certificate)
       else:
         logger.debug('GETing security URL %s', url)
-        return client.get(url, params=kwargs, timeout=timeout, cert=self._keys,
+        return client.get(url, params=params, timeout=timeout, cert=self._keys,
                           verify=self._certificate)
diff --git a/util/secscan/notifier.py b/util/secscan/notifier.py
new file mode 100644
index 000000000..a6df8ed9a
--- /dev/null
+++ b/util/secscan/notifier.py
@@ -0,0 +1,103 @@
+import logging
+import sys
+
+from collections import defaultdict
+
+from app import secscan_api
+from data.model.tag import filter_tags_have_repository_event, get_matching_tags
+from data.database import (Image, ImageStorage, ExternalNotificationEvent, Repository,
+                           RepositoryTag)
+from endpoints.notificationhelper import spawn_notification
+from util.secscan import PRIORITY_LEVELS
+from util.secscan.api import APIRequestFailure
+
+logger = logging.getLogger(__name__)
+
+
+def process_notification_data(notification_data):
+  """ Processes the given notification data to spawn vulnerability notifications as necessary.
+      Returns whether the processing succeeded.
+  """
+  new_data = notification_data['New']
+  old_data = notification_data.get('Old', {})
+
+  new_vuln = new_data['Vulnerability']
+  old_vuln = old_data.get('Vulnerability', {})
+
+  new_layer_ids = set(new_data.get('LayersIntroducingVulnerability', []))
+  old_layer_ids = set(old_data.get('LayersIntroducingVulnerability', []))
+
+  new_severity = PRIORITY_LEVELS.get(new_vuln.get('Severity', 'Unknown'), {'index': sys.maxint})
+  old_severity = PRIORITY_LEVELS.get(old_vuln.get('Severity', 'Unknown'), {'index': sys.maxint})
+
+  # By default we only notify the new layers that are affected by the vulnerability. If, however,
+  # the severity of the vulnerability has increased, we need to notify *all* layers, as we might
+  # need to send new notifications for older layers.
+  notify_layers = new_layer_ids - old_layer_ids
+  if new_severity['index'] < old_severity['index']:
+    notify_layers = new_layer_ids | old_layer_ids
+
+  if not notify_layers:
+    # Nothing more to do.
+    return True
+
+  # Lookup the external event for when we have vulnerabilities.
+  event = ExternalNotificationEvent.get(name='vulnerability_found')
+
+  # For each layer, retrieve the matching tags and join with the repository to determine which
+  # require new notifications.
+  tag_map = defaultdict(set)
+  repository_map = {}
+  cve_id = new_vuln['Name']
+
+  # Find all tags that contain the layer(s) introducing the vulnerability,
+  # in repositories that have the event setup.
+  for layer_id in notify_layers:
+    # Split the layer ID into its Docker Image ID and storage ID.
+    (docker_image_id, storage_uuid) = layer_id.split('.', 2)
+
+    # Find the matching tags.
+    matching = get_matching_tags(docker_image_id, storage_uuid, RepositoryTag, Repository,
+                                 Image, ImageStorage)
+    tags = list(filter_tags_have_repository_event(matching, event))
+
+    check_map = {}
+    for tag in tags:
+      # Verify that the tag's root image has the vulnerability.
+      tag_layer_id = '%s.%s' % (tag.image.docker_image_id, tag.image.storage.uuid)
+      logger.debug('Checking if layer %s is vulnerable to %s', tag_layer_id, cve_id)
+
+      if not tag_layer_id in check_map:
+        try:
+          is_vulerable = secscan_api.check_layer_vulnerable(tag_layer_id, cve_id)
+        except APIRequestFailure:
+          return False
+
+        check_map[tag_layer_id] = is_vulerable
+
+      logger.debug('Result of layer %s is vulnerable to %s check: %s', tag_layer_id, cve_id,
+                   check_map[tag_layer_id])
+
+      if check_map[tag_layer_id]:
+        # Add the vulnerable tag to the list.
+        tag_map[tag.repository_id].add(tag.name)
+        repository_map[tag.repository_id] = tag.repository
+
+  # For each of the tags found, issue a notification.
+  for repository_id in tag_map:
+    tags = tag_map[repository_id]
+    event_data = {
+      'tags': list(tags),
+      'vulnerability': {
+        'id': cve_id,
+        'description': new_vuln.get('Description', None),
+        'link': new_vuln.get('Link', None),
+        'priority': new_severity['title'],
+        'has_fix': 'FixedIn' in new_vuln,
+      },
+    }
+
+    spawn_notification(repository_map[repository_id], 'vulnerability_found', event_data)
+
+  return True
+
diff --git a/workers/queueworker.py b/workers/queueworker.py
index 2895e31e7..7cc223a22 100644
--- a/workers/queueworker.py
+++ b/workers/queueworker.py
@@ -56,10 +56,11 @@ class QueueWorker(Worker):
       logger.debug('Disconnecting from database.')
       db.close()
 
-  def extend_processing(self, seconds_from_now):
+  def extend_processing(self, seconds_from_now, updated_data=None):
     with self._current_item_lock:
       if self.current_queue_item is not None:
-        self._queue.extend_processing(self.current_queue_item, seconds_from_now)
+        self._queue.extend_processing(self.current_queue_item, seconds_from_now,
+                                      updated_data=updated_data)
 
   def run_watchdog(self):
     logger.debug('Running watchdog.')
diff --git a/workers/security_notification_worker.py b/workers/security_notification_worker.py
index 603633343..a5e6d0480 100644
--- a/workers/security_notification_worker.py
+++ b/workers/security_notification_worker.py
@@ -1,80 +1,45 @@
-import json
 import logging
 import time
-
-from collections import defaultdict
+import json
 
 import features
 
 from app import secscan_notification_queue, secscan_api
-from data import model
-from data.model.tag import filter_tags_have_repository_event, get_matching_tags
-from data.database import (Image, ImageStorage, ExternalNotificationEvent,
-                           Repository, RepositoryNotification, RepositoryTag)
-from endpoints.notificationhelper import spawn_notification
-from workers.queueworker import QueueWorker
-
+from workers.queueworker import QueueWorker, JobException
+from util.secscan.notifier import process_notification_data
 
 logger = logging.getLogger(__name__)
 
+_EXTENDED_SECONDS = 600
+
 
 class SecurityNotificationWorker(QueueWorker):
   def process_queue_item(self, data):
-    cve_id = data['Name']
-    vulnerability = data['Content']['Vulnerability']
-    priority = vulnerability['Priority']
+    notification_name = data['Name']
+    current_page = data.get('page', None)
 
-    # Lookup the external event for when we have vulnerabilities.
-    event = ExternalNotificationEvent.get(name='vulnerability_found')
+    while True:
+      (response_data, should_retry) = secscan_api.get_notification(notification_name, page=current_page)
+      if response_data is None:
+        if should_retry:
+          raise JobException()
+        else:
+          # Return to mark the job as "complete", as we'll never be able to finish it.
+          logger.error('Failed to handle security notification %s', notification_name)
+          return
 
-    # For each layer, retrieving the matching tags and join with repository to determine which
-    # require new notifications.
-    tag_map = defaultdict(set)
-    repository_map = {}
+      notification_data = response_data['Notification']
+      if not process_notification_data(notification_data):
+        raise JobException()
 
-    # Find all tags that contain the layer(s) introducing the vulnerability,
-    # in repositories that have the event setup.
-    content = data['Content']
-    layer_ids = content.get('NewIntroducingLayersIDs', content.get('IntroducingLayersIDs', []))
-    for layer_id in layer_ids:
-      (docker_image_id, storage_uuid) = layer_id.split('.', 2)
+      # Check for a next page of results. If none, we're done.
+      if 'NextPage' not in notification_data:
+        return
 
-      matching = get_matching_tags(docker_image_id, storage_uuid, RepositoryTag, Repository,
-                                   Image, ImageStorage)
-      tags = list(filter_tags_have_repository_event(matching, event))
-
-      check_map = {}
-      for tag in tags:
-        # Verify that the tag's root image has the vulnerability.
-        tag_layer_id = '%s.%s' % (tag.image.docker_image_id, tag.image.storage.uuid)
-        logger.debug('Checking if layer %s is vulnerable to %s', tag_layer_id, cve_id)
-
-        if not tag_layer_id in check_map:
-          is_vulerable = secscan_api.check_layer_vulnerable(tag_layer_id, cve_id)
-          check_map[tag_layer_id] = is_vulerable
-
-        logger.debug('Result of layer %s is vulnerable to %s check: %s', tag_layer_id, cve_id,
-                     check_map[tag_layer_id])
-
-        if check_map[tag_layer_id]:
-          # Add the vulnerable tag to the list.
-          tag_map[tag.repository_id].add(tag.name)
-          repository_map[tag.repository_id] = tag.repository
-
-    # For each of the tags found, issue a notification.
-    for repository_id in tag_map:
-      tags = tag_map[repository_id]
-      event_data = {
-        'tags': list(tags),
-        'vulnerability': {
-          'id': data['Name'],
-          'description': vulnerability['Description'],
-          'link': vulnerability['Link'],
-          'priority': priority,
-        },
-      }
-
-      spawn_notification(repository_map[repository_id], 'vulnerability_found', event_data)
+      # Otherwise, save the next page token into the queue item (so we can pick up from here if
+      # something goes wrong in the next loop iteration), and continue.
+      current_page = notification_data['NextPage']
+      data['page'] = current_page
+      self.extend_processing(_EXTENDED_SECONDS, json.dumps(data))
 
 
 if __name__ == '__main__':
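
Note on the pagination contract this patch codes against (not part of the patch itself): the endpoint at /secscan/notify queues the notification by name and acknowledges it up front with a DELETE on notifications/<name> (mark_notification_read), while the worker pages through GET notifications/<name> with 'limit' and 'page' query parameters, following the 'NextPage' token and checkpointing it into the queue item via extend_processing so a crash resumes from the last processed page. The sketch below is a minimal standalone illustration of that loop using requests directly; clair_endpoint and process_page are hypothetical stand-ins for the configured security scanner endpoint and for util.secscan.notifier.process_notification_data, and the read-acknowledgement is folded into the end of the loop for brevity.

  import requests

  def drain_notification(clair_endpoint, notification_name, process_page, layer_limit=10):
    # Illustrative sketch only; names above are assumptions, not Quay APIs.
    base = '%s/v1/notifications/%s' % (clair_endpoint, notification_name)
    page = None

    while True:
      params = {'limit': layer_limit}
      if page is not None:
        params['page'] = page

      resp = requests.get(base, params=params, timeout=1)
      if resp.status_code != 200:
        return False

      notification = resp.json()['Notification']
      if not process_page(notification):
        return False  # Leave the notification unread so it can be retried.

      # 'NextPage' is only present while more pages of affected layers remain.
      page = notification.get('NextPage')
      if page is None:
        break

    # Equivalent of SecurityScannerAPI.mark_notification_read.
    return requests.delete(base, timeout=1).status_code == 200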