Merge pull request #1260 from coreos-inc/new_clair_pagination
Implement against new Clair paginated notification system
This commit is contained in:
		
						commit
						1b7d741e30
					
				
					 10 changed files with 447 additions and 101 deletions
				
			
		|  | @ -129,7 +129,7 @@ class DefaultConfig(object): | |||
|   NOTIFICATION_QUEUE_NAME = 'notification' | ||||
|   DOCKERFILE_BUILD_QUEUE_NAME = 'dockerfilebuild' | ||||
|   REPLICATION_QUEUE_NAME = 'imagestoragereplication' | ||||
|   SECSCAN_NOTIFICATION_QUEUE_NAME = 'secscan_notification' | ||||
|   SECSCAN_NOTIFICATION_QUEUE_NAME = 'security_notification' | ||||
| 
 | ||||
|   # Super user config. Note: This MUST BE an empty list for the default config. | ||||
|   SUPER_USERS = [] | ||||
|  |  | |||
|  | @ -228,16 +228,26 @@ class WorkQueue(object): | |||
|       except QueueItem.DoesNotExist: | ||||
|         return False | ||||
| 
 | ||||
|   def extend_processing(self, item, seconds_from_now, minimum_extension=MINIMUM_EXTENSION): | ||||
|   def extend_processing(self, item, seconds_from_now, minimum_extension=MINIMUM_EXTENSION, | ||||
|                         updated_data=None): | ||||
|     with self._transaction_factory(db): | ||||
|       try: | ||||
|         queue_item = self._item_by_id_for_update(item.id) | ||||
|         new_expiration = datetime.utcnow() + timedelta(seconds=seconds_from_now) | ||||
|         has_change = False | ||||
| 
 | ||||
|         # Only actually write the new expiration to the db if it moves the expiration some minimum | ||||
|         if new_expiration - queue_item.processing_expires > minimum_extension: | ||||
|           queue_item.processing_expires = new_expiration | ||||
|           has_change = True | ||||
| 
 | ||||
|         if updated_data is not None: | ||||
|           queue_item.body = updated_data | ||||
|           has_change = True | ||||
| 
 | ||||
|         if has_change: | ||||
|           queue_item.save() | ||||
| 
 | ||||
|       except QueueItem.DoesNotExist: | ||||
|         return | ||||
| 
 | ||||
|  |  | |||
|  | @ -3,23 +3,30 @@ import json | |||
| 
 | ||||
| import features | ||||
| 
 | ||||
| from app import secscan_notification_queue | ||||
| from flask import request, make_response, Blueprint | ||||
| from app import secscan_notification_queue, secscan_api | ||||
| from flask import request, make_response, Blueprint, abort | ||||
| from endpoints.common import route_show_if | ||||
| 
 | ||||
| logger = logging.getLogger(__name__) | ||||
| secscan = Blueprint('secscan', __name__) | ||||
| 
 | ||||
| @route_show_if(features.SECURITY_SCANNER) | ||||
| @secscan.route('/notification', methods=['POST']) | ||||
| @secscan.route('/notify', methods=['POST']) | ||||
| def secscan_notification(): | ||||
|   data = request.get_json() | ||||
|   logger.debug('Got notification from Clair: %s', data) | ||||
|   logger.debug('Got notification from Security Scanner: %s', data) | ||||
|   if 'Notification' not in data: | ||||
|     abort(400) | ||||
| 
 | ||||
|   content = data['Content'] | ||||
|   layer_ids = content.get('NewIntroducingLayersIDs', content.get('IntroducingLayersIDs', [])) | ||||
|   if not layer_ids: | ||||
|     return make_response('Okay') | ||||
|   notification = data['Notification'] | ||||
| 
 | ||||
|   # Queue the notification to be processed. | ||||
|   item_id = secscan_notification_queue.put(['named', notification['Name']], | ||||
|                                            json.dumps(notification)) | ||||
| 
 | ||||
|   # Mark the notification as read. | ||||
|   if not secscan_api.mark_notification_read(notification['Name']): | ||||
|     secscan_notification_queue.cancel(item_id) | ||||
|     abort(400) | ||||
| 
 | ||||
|   secscan_notification_queue.put(['notification', data['Name']], json.dumps(data)) | ||||
|   return make_response('Okay') | ||||
|  |  | |||
|  | @ -3455,10 +3455,10 @@ def get_layer_success_mock(url, request): | |||
|     } | ||||
|   ] | ||||
| 
 | ||||
|   if not request.url.endswith('?vulnerabilities'): | ||||
|   if not request.url.index('vulnerabilities') > 0: | ||||
|     vulnerabilities = [] | ||||
| 
 | ||||
|     if not request.url.endswith('?features'): | ||||
|     if not request.url.index('features') > 0: | ||||
|       features = [] | ||||
| 
 | ||||
|   return py_json.dumps({ | ||||
|  |  | |||
|  | @ -1,11 +1,13 @@ | |||
| import unittest | ||||
| import json | ||||
| import os | ||||
| from httmock import urlmatch, all_requests, HTTMock | ||||
| 
 | ||||
| from app import app, config_provider, storage, notification_queue | ||||
| from initdb import setup_database_for_testing, finished_database_for_testing | ||||
| from util.secscan.api import SecurityScannerAPI, AnalyzeLayerException | ||||
| from util.secscan.analyzer import LayerAnalyzer | ||||
| from util.secscan.notifier import process_notification_data | ||||
| from data import model | ||||
| 
 | ||||
| 
 | ||||
|  | @ -69,10 +71,10 @@ def get_layer_success_mock(url, request): | |||
|     } | ||||
|   ] | ||||
| 
 | ||||
|   if not request.url.endswith('?vulnerabilities'): | ||||
|   if not request.url.find('vulnerabilities') > 0: | ||||
|     vulnerabilities = [] | ||||
| 
 | ||||
|     if not request.url.endswith('?features'): | ||||
|     if not request.url.find('features') > 0: | ||||
|       features = [] | ||||
| 
 | ||||
|   return json.dumps({ | ||||
|  | @ -97,7 +99,8 @@ class TestSecurityScanner(unittest.TestCase): | |||
|     storage.put_content(['local_us'], 'supports_direct_download', 'true') | ||||
| 
 | ||||
|     # Setup the database with fake storage. | ||||
|     setup_database_for_testing(self, with_storage=True, force_rebuild=True) | ||||
|     force_rebuild = os.environ.get('SKIP_REBUILD') != 'true' | ||||
|     setup_database_for_testing(self, with_storage=True, force_rebuild=force_rebuild) | ||||
|     self.app = app.test_client() | ||||
|     self.ctx = app.test_request_context() | ||||
|     self.ctx.__enter__() | ||||
|  | @ -238,5 +241,200 @@ class TestSecurityScanner(unittest.TestCase): | |||
|     self.assertTrue(body['event_data']['vulnerability']['has_fix']) | ||||
| 
 | ||||
| 
 | ||||
|   def _get_notification_data(self, new_layer_ids, old_layer_ids, new_severity='Low'): | ||||
|     return { | ||||
|       "Name": "ec45ec87-bfc8-4129-a1c3-d2b82622175a", | ||||
|       "Created": "1456247389", | ||||
|       "Notified": "1456246708", | ||||
|       "Limit": 2, | ||||
|       "New": { | ||||
|         "Vulnerability": { | ||||
|           "Name": "CVE-TEST", | ||||
|           "Namespace": "debian:8", | ||||
|           "Description": "New CVE", | ||||
|           "Severity": new_severity, | ||||
|           "FixedIn": [ | ||||
|             { | ||||
|               "Name": "grep", | ||||
|               "Namespace": "debian:8", | ||||
|               "Version": "2.25" | ||||
|             } | ||||
|           ] | ||||
|         }, | ||||
|         "LayersIntroducingVulnerability": new_layer_ids, | ||||
|       }, | ||||
|       "Old": { | ||||
|         "Vulnerability": { | ||||
|           "Name": "CVE-TEST", | ||||
|           "Namespace": "debian:8", | ||||
|           "Description": "New CVE", | ||||
|           "Severity": "Low", | ||||
|           "FixedIn": [] | ||||
|         }, | ||||
|         "LayersIntroducingVulnerability": old_layer_ids, | ||||
|       } | ||||
|     } | ||||
| 
 | ||||
| 
 | ||||
|   def test_notification_new_layers_not_vulnerable(self): | ||||
|     layer = model.tag.get_tag_image(ADMIN_ACCESS_USER, SIMPLE_REPO, 'latest') | ||||
|     layer_id = '%s.%s' % (layer.docker_image_id, layer.storage.uuid) | ||||
| 
 | ||||
|     # Add a repo event for the layer. | ||||
|     repo = model.repository.get_repository(ADMIN_ACCESS_USER, SIMPLE_REPO) | ||||
|     model.notification.create_repo_notification(repo, 'vulnerability_found', 'quay_notification', {}, {'level': 100}) | ||||
| 
 | ||||
|     @urlmatch(netloc=r'(.*\.)?mockclairservice', path=r'/v1/layers/(.+)') | ||||
|     def get_matching_layer_not_vulnerable(url, request): | ||||
|       return json.dumps({ | ||||
|         "Layer": { | ||||
|           "Name": layer_id, | ||||
|           "Namespace": "debian:8", | ||||
|           "IndexedByVersion": 1, | ||||
|           "Features": [ | ||||
|             { | ||||
|               "Name": "coreutils", | ||||
|               "Namespace": "debian:8", | ||||
|               "Version": "8.23-4", | ||||
|               "Vulnerabilities": [], # Report not vulnerable. | ||||
|             } | ||||
|           ] | ||||
|         } | ||||
|       }) | ||||
| 
 | ||||
|     # Ensure that there are no event queue items for the layer. | ||||
|     self.assertIsNone(notification_queue.get()) | ||||
| 
 | ||||
|     # Fire off the notification processing. | ||||
|     with HTTMock(get_matching_layer_not_vulnerable, response_content): | ||||
|       notification_data = self._get_notification_data([layer_id], []) | ||||
|       self.assertTrue(process_notification_data(notification_data)) | ||||
| 
 | ||||
|     # Ensure that there are no event queue items for the layer. | ||||
|     self.assertIsNone(notification_queue.get()) | ||||
| 
 | ||||
| 
 | ||||
|   def test_notification_new_layers(self): | ||||
|     layer = model.tag.get_tag_image(ADMIN_ACCESS_USER, SIMPLE_REPO, 'latest') | ||||
|     layer_id = '%s.%s' % (layer.docker_image_id, layer.storage.uuid) | ||||
| 
 | ||||
|     # Add a repo event for the layer. | ||||
|     repo = model.repository.get_repository(ADMIN_ACCESS_USER, SIMPLE_REPO) | ||||
|     model.notification.create_repo_notification(repo, 'vulnerability_found', 'quay_notification', {}, {'level': 100}) | ||||
| 
 | ||||
|     @urlmatch(netloc=r'(.*\.)?mockclairservice', path=r'/v1/layers/(.+)') | ||||
|     def get_matching_layer_vulnerable(url, request): | ||||
|       return json.dumps({ | ||||
|         "Layer": { | ||||
|           "Name": layer_id, | ||||
|           "Namespace": "debian:8", | ||||
|           "IndexedByVersion": 1, | ||||
|           "Features": [ | ||||
|             { | ||||
|               "Name": "coreutils", | ||||
|               "Namespace": "debian:8", | ||||
|               "Version": "8.23-4", | ||||
|               "Vulnerabilities": [ | ||||
|                 { | ||||
|                   "Name": "CVE-TEST", | ||||
|                   "Namespace": "debian:8", | ||||
|                   "Severity": "Low", | ||||
|                 } | ||||
|               ], | ||||
|             } | ||||
|           ] | ||||
|         } | ||||
|       }) | ||||
| 
 | ||||
|     # Ensure that there are no event queue items for the layer. | ||||
|     self.assertIsNone(notification_queue.get()) | ||||
| 
 | ||||
|     # Fire off the notification processing. | ||||
|     with HTTMock(get_matching_layer_vulnerable, response_content): | ||||
|       notification_data = self._get_notification_data([layer_id], []) | ||||
|       self.assertTrue(process_notification_data(notification_data)) | ||||
| 
 | ||||
|     # Ensure an event was written for the tag. | ||||
|     queue_item = notification_queue.get() | ||||
|     self.assertIsNotNone(queue_item) | ||||
| 
 | ||||
|     body = json.loads(queue_item.body) | ||||
|     self.assertEquals(['prod', 'latest'], body['event_data']['tags']) | ||||
|     self.assertEquals('CVE-TEST', body['event_data']['vulnerability']['id']) | ||||
|     self.assertEquals('Low', body['event_data']['vulnerability']['priority']) | ||||
|     self.assertTrue(body['event_data']['vulnerability']['has_fix']) | ||||
| 
 | ||||
| 
 | ||||
|   def test_notification_no_new_layers(self): | ||||
|     layer = model.tag.get_tag_image(ADMIN_ACCESS_USER, SIMPLE_REPO, 'latest') | ||||
|     layer_id = '%s.%s' % (layer.docker_image_id, layer.storage.uuid) | ||||
| 
 | ||||
|     # Add a repo event for the layer. | ||||
|     repo = model.repository.get_repository(ADMIN_ACCESS_USER, SIMPLE_REPO) | ||||
|     model.notification.create_repo_notification(repo, 'vulnerability_found', 'quay_notification', {}, {'level': 100}) | ||||
| 
 | ||||
|     # Ensure that there are no event queue items for the layer. | ||||
|     self.assertIsNone(notification_queue.get()) | ||||
| 
 | ||||
|     # Fire off the notification processing. | ||||
|     with HTTMock(response_content): | ||||
|       notification_data = self._get_notification_data([layer_id], [layer_id]) | ||||
|       self.assertTrue(process_notification_data(notification_data)) | ||||
| 
 | ||||
|     # Ensure that there are no event queue items for the layer. | ||||
|     self.assertIsNone(notification_queue.get()) | ||||
| 
 | ||||
| 
 | ||||
|   def test_notification_no_new_layers_increased_severity(self): | ||||
|     layer = model.tag.get_tag_image(ADMIN_ACCESS_USER, SIMPLE_REPO, 'latest') | ||||
|     layer_id = '%s.%s' % (layer.docker_image_id, layer.storage.uuid) | ||||
| 
 | ||||
|     # Add a repo event for the layer. | ||||
|     repo = model.repository.get_repository(ADMIN_ACCESS_USER, SIMPLE_REPO) | ||||
|     model.notification.create_repo_notification(repo, 'vulnerability_found', 'quay_notification', {}, {'level': 100}) | ||||
| 
 | ||||
|     @urlmatch(netloc=r'(.*\.)?mockclairservice', path=r'/v1/layers/(.+)') | ||||
|     def get_matching_layer_vulnerable(url, request): | ||||
|       return json.dumps({ | ||||
|         "Layer": { | ||||
|           "Name": layer_id, | ||||
|           "Namespace": "debian:8", | ||||
|           "IndexedByVersion": 1, | ||||
|           "Features": [ | ||||
|             { | ||||
|               "Name": "coreutils", | ||||
|               "Namespace": "debian:8", | ||||
|               "Version": "8.23-4", | ||||
|               "Vulnerabilities": [ | ||||
|                 { | ||||
|                   "Name": "CVE-TEST", | ||||
|                   "Namespace": "debian:8", | ||||
|                   "Severity": "Low", | ||||
|                 } | ||||
|               ], | ||||
|             } | ||||
|           ] | ||||
|         } | ||||
|       }) | ||||
| 
 | ||||
|     # Ensure that there are no event queue items for the layer. | ||||
|     self.assertIsNone(notification_queue.get()) | ||||
| 
 | ||||
|     # Fire off the notification processing. | ||||
|     with HTTMock(get_matching_layer_vulnerable, response_content): | ||||
|       notification_data = self._get_notification_data([layer_id], [layer_id], new_severity='High') | ||||
|       self.assertTrue(process_notification_data(notification_data)) | ||||
| 
 | ||||
|     # Ensure an event was written for the tag. | ||||
|     queue_item = notification_queue.get() | ||||
|     self.assertIsNotNone(queue_item) | ||||
| 
 | ||||
|     body = json.loads(queue_item.body) | ||||
|     self.assertEquals(['prod', 'latest'], body['event_data']['tags']) | ||||
|     self.assertEquals('CVE-TEST', body['event_data']['vulnerability']['id']) | ||||
|     self.assertEquals('High', body['event_data']['vulnerability']['priority']) | ||||
|     self.assertTrue(body['event_data']['vulnerability']['has_fix']) | ||||
| 
 | ||||
| 
 | ||||
| if __name__ == '__main__': | ||||
|   unittest.main() | ||||
|  | @ -3,7 +3,7 @@ | |||
| PRIORITY_LEVELS = { | ||||
|  'Unknown': { | ||||
|    'title': 'Unknown', | ||||
|    'index': '6', | ||||
|    'index': 6, | ||||
|    'level': 'info', | ||||
| 
 | ||||
|    'description': 'Unknown is either a security problem that has not been assigned ' + | ||||
|  | @ -13,7 +13,7 @@ PRIORITY_LEVELS = { | |||
| 
 | ||||
|  'Negligible': { | ||||
|    'title': 'Negligible', | ||||
|    'index': '5', | ||||
|    'index': 5, | ||||
|    'level': 'info', | ||||
| 
 | ||||
|    'description': 'Negligible is technically a security problem, but is only theoretical ' + | ||||
|  | @ -24,7 +24,7 @@ PRIORITY_LEVELS = { | |||
| 
 | ||||
|  'Low': { | ||||
|    'title': 'Low', | ||||
|    'index': '4', | ||||
|    'index': 4, | ||||
|    'level': 'warning', | ||||
| 
 | ||||
|    'description': 'Low is a security problem, but is hard to exploit due to environment, ' + | ||||
|  | @ -36,7 +36,7 @@ PRIORITY_LEVELS = { | |||
|  'Medium': { | ||||
|    'title': 'Medium', | ||||
|    'value': 'Medium', | ||||
|    'index': '3', | ||||
|    'index': 3, | ||||
|    'level': 'warning', | ||||
| 
 | ||||
|    'description': 'Medium is a real security problem, and is exploitable for many people. ' + | ||||
|  | @ -48,7 +48,7 @@ PRIORITY_LEVELS = { | |||
|  'High': { | ||||
|    'title': 'High', | ||||
|    'value': 'High', | ||||
|    'index': '2', | ||||
|    'index': 2, | ||||
|    'level': 'warning', | ||||
| 
 | ||||
|    'description': 'High is a real problem, exploitable for many people in a default installation. ' + | ||||
|  | @ -60,7 +60,7 @@ PRIORITY_LEVELS = { | |||
|  'Critical': { | ||||
|    'title': 'Critical', | ||||
|    'value': 'Critical', | ||||
|    'index': '1', | ||||
|    'index': 1, | ||||
|    'level': 'error', | ||||
| 
 | ||||
|    'description': 'Critical is a world-burning problem, exploitable for nearly all people in ' + | ||||
|  | @ -72,7 +72,7 @@ PRIORITY_LEVELS = { | |||
|  'Defcon1': { | ||||
|    'title': 'Defcon 1', | ||||
|    'value': 'Defcon1', | ||||
|    'index': '0', | ||||
|    'index': 0, | ||||
|    'level': 'error', | ||||
| 
 | ||||
|    'description': 'Defcon1 is a Critical problem which has been manually highlighted ' + | ||||
|  |  | |||
|  | @ -19,8 +19,8 @@ class APIRequestFailure(Exception): | |||
| 
 | ||||
| _API_METHOD_INSERT = 'layers' | ||||
| _API_METHOD_GET_LAYER = 'layers/%s' | ||||
| _API_METHOD_GET_WITH_VULNERABILITIES_FLAG = '?vulnerabilities' | ||||
| _API_METHOD_GET_WITH_FEATURES_FLAG = '?features' | ||||
| _API_METHOD_MARK_NOTIFICATION_READ = 'notifications/%s' | ||||
| _API_METHOD_GET_NOTIFICATION = 'notifications/%s' | ||||
| 
 | ||||
| 
 | ||||
| class SecurityScannerAPI(object): | ||||
|  | @ -113,7 +113,7 @@ class SecurityScannerAPI(object): | |||
| 
 | ||||
|     logger.info('Analyzing layer %s', request['Layer']['Name']) | ||||
|     try: | ||||
|       response = self._call(_API_METHOD_INSERT, request) | ||||
|       response = self._call('POST', _API_METHOD_INSERT, request) | ||||
|       json_response = response.json() | ||||
|     except requests.exceptions.Timeout: | ||||
|       logger.exception('Timeout when trying to post layer data response for %s', layer.id) | ||||
|  | @ -146,35 +146,94 @@ class SecurityScannerAPI(object): | |||
|     return api_version, False | ||||
| 
 | ||||
| 
 | ||||
|   def check_layer_vulnerable(self, layer_id, cve_name): | ||||
|     """ Checks to see if the layer with the given ID is vulnerable to the specified CVE. """ | ||||
|     layer_data = self._get_layer_data(layer_id, include_vulnerabilities=True) | ||||
|     if layer_data is None or 'Layer' not in layer_data or 'Features' not in layer_data['Layer']: | ||||
|       return False | ||||
| 
 | ||||
|     for feature in layer_data['Layer']['Features']: | ||||
|       for vuln in feature.get('Vulnerabilities', []): | ||||
|         if vuln['Name'] == cve_name: | ||||
|           return True | ||||
| 
 | ||||
|     return False | ||||
| 
 | ||||
| 
 | ||||
|   def get_notification(self, notification_name, layer_limit=10, page=None): | ||||
|     """ Gets the data for a specific notification, with optional page token. | ||||
|         Returns a tuple of the data (None on failure) and whether to retry. | ||||
|     """ | ||||
|     try: | ||||
|       params = { | ||||
|         'limit': layer_limit | ||||
|       } | ||||
| 
 | ||||
|       if page is not None: | ||||
|         params['page'] = page | ||||
| 
 | ||||
|       response = self._call('GET', _API_METHOD_GET_NOTIFICATION % notification_name, params=params) | ||||
|       json_response = response.json() | ||||
|     except requests.exceptions.Timeout: | ||||
|       logger.exception('Timeout when trying to get notification for %s', notification_name) | ||||
|       return None, True | ||||
|     except requests.exceptions.ConnectionError: | ||||
|       logger.exception('Connection error when trying to get notification for %s', notification_name) | ||||
|       return None, True | ||||
|     except (requests.exceptions.RequestException, ValueError): | ||||
|       logger.exception('Failed to get notification for %s', notification_name) | ||||
|       return None, False | ||||
| 
 | ||||
|     if response.status_code != 200: | ||||
|       return None, response.status_code != 404 and response.status_code != 400 | ||||
| 
 | ||||
|     return json_response, False | ||||
| 
 | ||||
| 
 | ||||
|   def mark_notification_read(self, notification_name): | ||||
|     """ Marks a security scanner notification as read. """ | ||||
|     try: | ||||
|       response = self._call('DELETE', _API_METHOD_MARK_NOTIFICATION_READ % notification_name) | ||||
|       return response.status_code == 200 | ||||
|     except requests.exceptions.RequestException: | ||||
|       logger.exception('Failed to mark notification as read: %s', notification_name) | ||||
|       return False | ||||
| 
 | ||||
| 
 | ||||
|   def get_layer_data(self, layer, include_features=False, include_vulnerabilities=False): | ||||
|     """ Returns the layer data for the specified layer. On error, returns None. """ | ||||
|     layer_id = '%s.%s' % (layer.docker_image_id, layer.storage.uuid) | ||||
|     return self._get_layer_data(layer_id, include_features, include_vulnerabilities) | ||||
| 
 | ||||
| 
 | ||||
|   def _get_layer_data(self, layer_id, include_features=False, include_vulnerabilities=False): | ||||
|     try: | ||||
|       flag = '' | ||||
|       params = {} | ||||
|       if include_features: | ||||
|         flag = _API_METHOD_GET_WITH_FEATURES_FLAG | ||||
|         params = {'features': True} | ||||
| 
 | ||||
|       if include_vulnerabilities: | ||||
|         flag = _API_METHOD_GET_WITH_VULNERABILITIES_FLAG | ||||
|         params = {'vulnerabilities': True} | ||||
| 
 | ||||
|       response = self._call(_API_METHOD_GET_LAYER + flag, None, layer_id) | ||||
|       response = self._call('GET', _API_METHOD_GET_LAYER % layer_id, params=params) | ||||
|       logger.debug('Got response %s for vulnerabilities for layer %s', | ||||
|                    response.status_code, layer_id) | ||||
|       json_response = response.json() | ||||
|     except requests.exceptions.Timeout: | ||||
|       raise APIRequestFailure('API call timed out') | ||||
|     except requests.exceptions.ConnectionError: | ||||
|       raise APIRequestFailure('Could not connect to security service') | ||||
|     except (requests.exceptions.RequestException, ValueError): | ||||
|       logger.exception('Failed to get layer data response for %s', layer.id) | ||||
|       logger.exception('Failed to get layer data response for %s', layer_id) | ||||
|       raise APIRequestFailure() | ||||
| 
 | ||||
|     if response.status_code == 404: | ||||
|       return None | ||||
| 
 | ||||
|     return response.json() | ||||
|     return json_response | ||||
| 
 | ||||
| 
 | ||||
|   def _call(self, relative_url, body=None, *args, **kwargs): | ||||
|   def _call(self, method, relative_url, params=None, body=None): | ||||
|     """ Issues an HTTP call to the sec API at the given relative URL. | ||||
|         This function disconnects from the database while awaiting a response | ||||
|         from the API server. | ||||
|  | @ -184,18 +243,21 @@ class SecurityScannerAPI(object): | |||
|       raise Exception('Cannot call unconfigured security system') | ||||
| 
 | ||||
|     api_url = urljoin(security_config['ENDPOINT'], '/' + security_config['API_VERSION']) + '/' | ||||
|     url = urljoin(api_url, relative_url % args) | ||||
|     url = urljoin(api_url, relative_url) | ||||
| 
 | ||||
|     client = self.config['HTTPCLIENT'] | ||||
|     timeout = security_config.get('API_TIMEOUT_SECONDS', 1) | ||||
|     logger.debug('Looking up sec information: %s', url) | ||||
| 
 | ||||
|     with CloseForLongOperation(self.config): | ||||
|       if body is not None: | ||||
|       if method == 'POST': | ||||
|         logger.debug('POSTing security URL %s', url) | ||||
|         return client.post(url, json=body, params=kwargs, timeout=timeout, cert=self._keys, | ||||
|         return client.post(url, json=body, params=params, timeout=timeout, cert=self._keys, | ||||
|                            verify=self._certificate) | ||||
|       elif method == 'DELETE': | ||||
|         logger.debug('DELETEing security URL %s', url) | ||||
|         return client.delete(url, params=params, timeout=timeout, cert=self._keys, | ||||
|                              verify=self._certificate) | ||||
|       else: | ||||
|         logger.debug('GETing security URL %s', url) | ||||
|         return client.get(url, params=kwargs, timeout=timeout, cert=self._keys, | ||||
|         return client.get(url, params=params, timeout=timeout, cert=self._keys, | ||||
|                           verify=self._certificate) | ||||
|  |  | |||
							
								
								
									
										103
									
								
								util/secscan/notifier.py
									
										
									
									
									
										Normal file
									
								
							
							
						
						
									
										103
									
								
								util/secscan/notifier.py
									
										
									
									
									
										Normal file
									
								
							|  | @ -0,0 +1,103 @@ | |||
| import logging | ||||
| import sys | ||||
| 
 | ||||
| from collections import defaultdict | ||||
| 
 | ||||
| from app import secscan_api | ||||
| from data.model.tag import filter_tags_have_repository_event, get_matching_tags | ||||
| from data.database import (Image, ImageStorage, ExternalNotificationEvent, Repository, | ||||
|                            RepositoryTag) | ||||
| from endpoints.notificationhelper import spawn_notification | ||||
| from util.secscan import PRIORITY_LEVELS | ||||
| from util.secscan.api import APIRequestFailure | ||||
| 
 | ||||
| logger = logging.getLogger(__name__) | ||||
| 
 | ||||
| 
 | ||||
| def process_notification_data(notification_data): | ||||
|   """ Processes the given notification data to spawn vulnerability notifications as necessary. | ||||
|       Returns whether the processing succeeded. | ||||
|   """ | ||||
|   new_data = notification_data['New'] | ||||
|   old_data = notification_data.get('Old', {}) | ||||
| 
 | ||||
|   new_vuln = new_data['Vulnerability'] | ||||
|   old_vuln = old_data.get('Vulnerability', {}) | ||||
| 
 | ||||
|   new_layer_ids = set(new_data.get('LayersIntroducingVulnerability', [])) | ||||
|   old_layer_ids = set(old_data.get('LayersIntroducingVulnerability', [])) | ||||
| 
 | ||||
|   new_severity = PRIORITY_LEVELS.get(new_vuln.get('Severity', 'Unknown'), {'index': sys.maxint}) | ||||
|   old_severity = PRIORITY_LEVELS.get(old_vuln.get('Severity', 'Unknown'), {'index': sys.maxint}) | ||||
| 
 | ||||
|   # By default we only notify the new layers that are affected by the vulnerability. If, however, | ||||
|   # the severity of the vulnerability has increased, we need to notify *all* layers, as we might | ||||
|   # need to send new notifications for older layers. | ||||
|   notify_layers = new_layer_ids - old_layer_ids | ||||
|   if new_severity['index'] < old_severity['index']: | ||||
|     notify_layers = new_layer_ids | old_layer_ids | ||||
| 
 | ||||
|   if not notify_layers: | ||||
|     # Nothing more to do. | ||||
|     return True | ||||
| 
 | ||||
|   # Lookup the external event for when we have vulnerabilities. | ||||
|   event = ExternalNotificationEvent.get(name='vulnerability_found') | ||||
| 
 | ||||
|   # For each layer, retrieving the matching tags and join with repository to determine which | ||||
|   # require new notifications. | ||||
|   tag_map = defaultdict(set) | ||||
|   repository_map = {} | ||||
|   cve_id = new_vuln['Name'] | ||||
| 
 | ||||
|   # Find all tags that contain the layer(s) introducing the vulnerability, | ||||
|   # in repositories that have the event setup. | ||||
|   for layer_id in notify_layers: | ||||
|     # Split the layer ID into its Docker Image ID and storage ID. | ||||
|     (docker_image_id, storage_uuid) = layer_id.split('.', 2) | ||||
| 
 | ||||
|     # Find the matching tags. | ||||
|     matching = get_matching_tags(docker_image_id, storage_uuid, RepositoryTag, Repository, | ||||
|                                  Image, ImageStorage) | ||||
|     tags = list(filter_tags_have_repository_event(matching, event)) | ||||
| 
 | ||||
|     check_map = {} | ||||
|     for tag in tags: | ||||
|       # Verify that the tag's root image has the vulnerability. | ||||
|       tag_layer_id = '%s.%s' % (tag.image.docker_image_id, tag.image.storage.uuid) | ||||
|       logger.debug('Checking if layer %s is vulnerable to %s', tag_layer_id, cve_id) | ||||
| 
 | ||||
|       if not tag_layer_id in check_map: | ||||
|         try: | ||||
|           is_vulerable = secscan_api.check_layer_vulnerable(tag_layer_id, cve_id) | ||||
|         except APIRequestFailure: | ||||
|           return False | ||||
| 
 | ||||
|         check_map[tag_layer_id] = is_vulerable | ||||
| 
 | ||||
|       logger.debug('Result of layer %s is vulnerable to %s check: %s', tag_layer_id, cve_id, | ||||
|                    check_map[tag_layer_id]) | ||||
| 
 | ||||
|       if check_map[tag_layer_id]: | ||||
|         # Add the vulnerable tag to the list. | ||||
|         tag_map[tag.repository_id].add(tag.name) | ||||
|         repository_map[tag.repository_id] = tag.repository | ||||
| 
 | ||||
|   # For each of the tags found, issue a notification. | ||||
|   for repository_id in tag_map: | ||||
|     tags = tag_map[repository_id] | ||||
|     event_data = { | ||||
|       'tags': list(tags), | ||||
|       'vulnerability': { | ||||
|         'id': cve_id, | ||||
|         'description': new_vuln.get('Description', None), | ||||
|         'link': new_vuln.get('Link', None), | ||||
|         'priority': new_severity['title'], | ||||
|         'has_fix': 'FixedIn' in new_vuln, | ||||
|       }, | ||||
|     } | ||||
| 
 | ||||
|     spawn_notification(repository_map[repository_id], 'vulnerability_found', event_data) | ||||
| 
 | ||||
|   return True | ||||
| 
 | ||||
|  | @ -56,10 +56,11 @@ class QueueWorker(Worker): | |||
|       logger.debug('Disconnecting from database.') | ||||
|       db.close() | ||||
| 
 | ||||
|   def extend_processing(self, seconds_from_now): | ||||
|   def extend_processing(self, seconds_from_now, updated_data=None): | ||||
|     with self._current_item_lock: | ||||
|       if self.current_queue_item is not None: | ||||
|         self._queue.extend_processing(self.current_queue_item, seconds_from_now) | ||||
|         self._queue.extend_processing(self.current_queue_item, seconds_from_now, | ||||
|                                       updated_data=updated_data) | ||||
| 
 | ||||
|   def run_watchdog(self): | ||||
|     logger.debug('Running watchdog.') | ||||
|  |  | |||
|  | @ -1,80 +1,45 @@ | |||
| import json | ||||
| import logging | ||||
| import time | ||||
| 
 | ||||
| from collections import defaultdict | ||||
| import json | ||||
| 
 | ||||
| import features | ||||
| 
 | ||||
| from app import secscan_notification_queue, secscan_api | ||||
| from data import model | ||||
| from data.model.tag import filter_tags_have_repository_event, get_matching_tags | ||||
| from data.database import (Image, ImageStorage, ExternalNotificationEvent, | ||||
|                            Repository, RepositoryNotification, RepositoryTag) | ||||
| from endpoints.notificationhelper import spawn_notification | ||||
| from workers.queueworker import QueueWorker | ||||
| 
 | ||||
| from workers.queueworker import QueueWorker, JobException | ||||
| from util.secscan.notifier import process_notification_data | ||||
| 
 | ||||
| logger = logging.getLogger(__name__) | ||||
| 
 | ||||
| _EXTENDED_SECONDS = 600 | ||||
| 
 | ||||
| class SecurityNotificationWorker(QueueWorker): | ||||
|   def process_queue_item(self, data): | ||||
|     cve_id = data['Name'] | ||||
|     vulnerability = data['Content']['Vulnerability'] | ||||
|     priority = vulnerability['Priority'] | ||||
|     notification_name = data['Name'] | ||||
|     current_page = data.get('page', None) | ||||
| 
 | ||||
|     # Lookup the external event for when we have vulnerabilities. | ||||
|     event = ExternalNotificationEvent.get(name='vulnerability_found') | ||||
|     while True: | ||||
|       (response_data, should_retry) = secscan_api.get_notification(notification_name) | ||||
|       if response_data is None: | ||||
|         if should_retry: | ||||
|           raise JobException() | ||||
|         else: | ||||
|           # Return to mark the job as "complete", as we'll never be able to finish it. | ||||
|           logger.error('Failed to handle security notification %s', notification_name) | ||||
|           return | ||||
| 
 | ||||
|     # For each layer, retrieving the matching tags and join with repository to determine which | ||||
|     # require new notifications. | ||||
|     tag_map = defaultdict(set) | ||||
|     repository_map = {} | ||||
|       notification_data = response_data['Notification'] | ||||
|       if not process_notification_data(notification_data): | ||||
|         raise JobException() | ||||
| 
 | ||||
|     # Find all tags that contain the layer(s) introducing the vulnerability, | ||||
|     # in repositories that have the event setup. | ||||
|     content = data['Content'] | ||||
|     layer_ids = content.get('NewIntroducingLayersIDs', content.get('IntroducingLayersIDs', [])) | ||||
|     for layer_id in layer_ids: | ||||
|       (docker_image_id, storage_uuid) = layer_id.split('.', 2) | ||||
|       # Check for a next page of results. If none, we're done. | ||||
|       if 'NextPage' not in notification_data: | ||||
|         return | ||||
| 
 | ||||
|       matching = get_matching_tags(docker_image_id, storage_uuid, RepositoryTag, Repository, | ||||
|                                    Image, ImageStorage) | ||||
|       tags = list(filter_tags_have_repository_event(matching, event)) | ||||
| 
 | ||||
|       check_map = {} | ||||
|       for tag in tags: | ||||
|         # Verify that the tag's root image has the vulnerability. | ||||
|         tag_layer_id = '%s.%s' % (tag.image.docker_image_id, tag.image.storage.uuid) | ||||
|         logger.debug('Checking if layer %s is vulnerable to %s', tag_layer_id, cve_id) | ||||
| 
 | ||||
|         if not tag_layer_id in check_map: | ||||
|           is_vulerable = secscan_api.check_layer_vulnerable(tag_layer_id, cve_id) | ||||
|           check_map[tag_layer_id] = is_vulerable | ||||
| 
 | ||||
|         logger.debug('Result of layer %s is vulnerable to %s check: %s', tag_layer_id, cve_id, | ||||
|                      check_map[tag_layer_id]) | ||||
| 
 | ||||
|         if check_map[tag_layer_id]: | ||||
|           # Add the vulnerable tag to the list. | ||||
|           tag_map[tag.repository_id].add(tag.name) | ||||
|           repository_map[tag.repository_id] = tag.repository | ||||
| 
 | ||||
|     # For each of the tags found, issue a notification. | ||||
|     for repository_id in tag_map: | ||||
|       tags = tag_map[repository_id] | ||||
|       event_data = { | ||||
|         'tags': list(tags), | ||||
|         'vulnerability': { | ||||
|           'id': data['Name'], | ||||
|           'description': vulnerability['Description'], | ||||
|           'link': vulnerability['Link'], | ||||
|           'priority': priority, | ||||
|         }, | ||||
|       } | ||||
| 
 | ||||
|       spawn_notification(repository_map[repository_id], 'vulnerability_found', event_data) | ||||
|       # Otherwise, save the next page token into the queue item (so we can pick up from here if | ||||
|       # something goes wrong in the next loop iteration), and continue. | ||||
|       current_page = notification_data['NextPage'] | ||||
|       data['page'] = current_page | ||||
|       self.extend_processing(_EXTENDED_SECONDS, json.dumps(data)) | ||||
| 
 | ||||
| 
 | ||||
| if __name__ == '__main__': | ||||
|  |  | |||
		Reference in a new issue