"""Tests for the security scanner integration.

Covers the scanner API client, the recursive layer analyzer, notification
processing and the security notification worker. All HTTP traffic to the
scanner is intercepted with HTTMock handlers matching the
``mockclairservice`` host.
"""

import unittest
import json

from httmock import urlmatch, all_requests, HTTMock

from app import app, storage, notification_queue
from endpoints.notificationevent import VulnerabilityFoundEvent
from initdb import setup_database_for_testing, finished_database_for_testing
from util.secscan.api import SecurityScannerAPI, AnalyzeLayerException
from util.secscan.analyzer import LayerAnalyzer
from util.secscan.notifier import process_notification_data
from data import model
from workers.security_notification_worker import SecurityNotificationWorker
from endpoints.v2 import v2_bp


ADMIN_ACCESS_USER = 'devtable'
SIMPLE_REPO = 'simple'
COMPLEX_REPO = 'complex'

_PORT_NUMBER = 5001


@urlmatch(netloc=r'(.*\.)?mockclairservice', path=r'/v1/layers/(.+)')
def get_layer_failure_mock(url, request):
    """Answer a layer lookup with a 404 "Unknown layer" error."""
    return {'status_code': 404, 'content': json.dumps({'Error': {'Message': 'Unknown layer'}})}


@urlmatch(netloc=r'(.*\.)?mockclairservice', path=r'/v1/layers$')
def analyze_layer_badrequest_mock(url, request):
    """Answer a layer analysis request with a 400 "Bad request" error."""
    return {'status_code': 400, 'content': json.dumps({'Error': {'Message': 'Bad request'}})}


@urlmatch(netloc=r'(.*\.)?mockclairservice', path=r'/v1/layers$')
def analyze_layer_internal_mock(url, request):
    """Answer a layer analysis request with a 500 "Internal server error"."""
    return {'status_code': 500,
            'content': json.dumps({'Error': {'Message': 'Internal server error'}})}


@urlmatch(netloc=r'(.*\.)?mockclairservice', path=r'/v1/layers$')
def analyze_layer_failure_mock(url, request):
    """Answer a layer analysis request with a 422 "Bad layer" error."""
    return {'status_code': 422, 'content': json.dumps({'Error': {'Message': 'Bad layer'}})}


@urlmatch(netloc=r'(.*\.)?mockclairservice', path=r'/v1/layers$')
def analyze_layer_success_mock(url, request):
    """Validate the analysis request body and answer with a 201 layer payload.

    Returns a 400 response naming the first missing piece when the JSON body
    lacks ``Layer`` or the layer lacks ``Path``, ``Name`` or ``Format``.
    """
    body_data = json.loads(request.body)
    if 'Layer' not in body_data:
        return {'status_code': 400, 'content': 'Missing body'}

    layer = body_data['Layer']
    # Checked in this order so the first missing key determines the error text.
    for required_key in ('Path', 'Name', 'Format'):
        if required_key not in layer:
            return {'status_code': 400, 'content': 'Missing ' + required_key}

    return {'status_code': 201, 'content': json.dumps({
        "Layer": {
            "Name": "523ef1d23f222195488575f52a39c729c76a8c5630c9a194139cb246fb212da6",
            "Path": "/mnt/layers/523ef1d23f222195488575f52a39c729c76a8c5630c9a194139cb246fb212da6/layer.tar",
            "ParentName": "140f9bdfeb9784cf8730e9dab5dd12fbd704151cf555ac8cae650451794e5ac2",
            "Format": "Docker",
            "IndexedByVersion": 1
        }
    })}


@urlmatch(netloc=r'(.*\.)?mockclairservice', path=r'/v1/layers/(.+)')
def get_layer_success_mock(url, request):
    """Answer a layer lookup with a fixed layer carrying one vulnerable feature.

    The feature list is only populated when the request URL mentions
    ``features`` or ``vulnerabilities``; otherwise ``Features`` is empty.
    """
    vulnerabilities = [
        {
            "Name": "CVE-2014-9471",
            "Namespace": "debian:8",
            "Description": (
                "The parse_datetime function in GNU coreutils allows remote attackers "
                "to cause a denial of service (crash) or possibly execute arbitrary code "
                "via a crafted date string, as demonstrated by the "
                "\"--date=TZ=\"123\"345\" @1\" string to the touch or date command."
            ),
            "Link": "https://security-tracker.debian.org/tracker/CVE-2014-9471",
            "Severity": "Low",
            "FixedBy": "9.23-5"
        }
    ]

    features = [
        {
            "Name": "coreutils",
            "Namespace": "debian:8",
            "Version": "8.23-4",
            "Vulnerabilities": vulnerabilities,
        }
    ]

    # NOTE(review): a features-only request still receives the vulnerabilities
    # embedded in each feature; confirm whether the mock should strip them.
    wants_details = 'vulnerabilities' in request.url or 'features' in request.url
    if not wants_details:
        features = []

    return json.dumps({
        "Layer": {
            "Name": "17675ec01494d651e1ccf81dc9cf63959ebfeed4f978fddb1666b6ead008ed52",
            "Namespace": "debian:8",
            "ParentName": "140f9bdfeb9784cf8730e9dab5dd12fbd704151cf555ac8cae650451794e5ac2",
            "IndexedByVersion": 1,
            "Features": features
        }
    })


@all_requests
def response_content(url, request):
    """Catch-all handler: any request not matched earlier gets a 500."""
    return {'status_code': 500, 'content': 'Unknown endpoint'}


class TestSecurityScanner(unittest.TestCase):
    """End-to-end tests of layer analysis and vulnerability notifications."""

    def setUp(self):
        """Configure fake storage, the test database and the scanner API client."""
        # Enable direct download in fake storage.
        storage.put_content(['local_us'], 'supports_direct_download', 'true')

        # Have fake storage say all files exist for the duration of the test.
        storage.put_content(['local_us'], 'all_files_exist', 'true')

        # Setup the database with fake storage.
        setup_database_for_testing(self)
        self.app = app.test_client()
        self.ctx = app.test_request_context()
        self.ctx.__enter__()

        self.api = SecurityScannerAPI(app, app.config, storage)

    def tearDown(self):
        """Undo the storage flags, the database setup and the request context."""
        storage.remove(['local_us'], 'supports_direct_download')
        storage.remove(['local_us'], 'all_files_exist')

        finished_database_for_testing(self)
        self.ctx.__exit__(True, None, None)

    def assertAnalyzed(self, layer, isAnalyzed, engineVersion):
        """Assert the layer and all of its parents carry the given index state.

        Args:
            layer: image row to check.
            isAnalyzed: expected value of ``security_indexed``.
            engineVersion: expected value of ``security_indexed_engine``.
        """
        self.assertEqual(isAnalyzed, layer.security_indexed)
        self.assertEqual(engineVersion, layer.security_indexed_engine)

        # Ensure all parent layers are marked as analyzed.
        parents = model.image.get_parent_images(ADMIN_ACCESS_USER, SIMPLE_REPO, layer)
        for parent in parents:
            self.assertEqual(isAnalyzed, parent.security_indexed)
            self.assertEqual(engineVersion, parent.security_indexed_engine)

    def test_get_layer_success(self):
        """Layer data is returned when the scanner knows the layer."""
        layer = model.tag.get_tag_image(ADMIN_ACCESS_USER, SIMPLE_REPO, 'latest',
                                        include_storage=True)
        with HTTMock(get_layer_success_mock, response_content):
            result = self.api.get_layer_data(layer, include_vulnerabilities=True)
            self.assertIsNotNone(result)
            self.assertEqual(
                result['Layer']['Name'],
                '17675ec01494d651e1ccf81dc9cf63959ebfeed4f978fddb1666b6ead008ed52')

    def test_get_layer_failure(self):
        """``None`` is returned when the scanner answers 404 for the layer."""
        layer = model.tag.get_tag_image(ADMIN_ACCESS_USER, SIMPLE_REPO, 'latest',
                                        include_storage=True)
        with HTTMock(get_layer_failure_mock, response_content):
            result = self.api.get_layer_data(layer, include_vulnerabilities=True)
            self.assertIsNone(result)

    def test_analyze_layer_nodirectdownload_success(self):
        """Analysis succeeds through the registry+JWT download path."""
        # Disable direct download in fake storage.
        storage.put_content(['local_us'], 'supports_direct_download', 'false')

        try:
            app.register_blueprint(v2_bp, url_prefix='/v2')
        except Exception:
            # Already registered. Best-effort by design, but do not swallow
            # KeyboardInterrupt/SystemExit the way a bare ``except`` would.
            pass

        layer = model.tag.get_tag_image(ADMIN_ACCESS_USER, SIMPLE_REPO, 'latest',
                                        include_storage=True)
        self.assertFalse(layer.security_indexed)
        self.assertEqual(-1, layer.security_indexed_engine)

        # Ensure that the download is a registry+JWT download.
        uri, auth_header = self.api._get_image_url_and_auth(layer)
        self.assertIsNotNone(uri)
        self.assertIsNotNone(auth_header)

        # Ensure the download doesn't work without the header.
        rv = self.app.head(uri)
        self.assertEqual(rv.status_code, 401)

        # Ensure the download works with the header. Note we use a HEAD here, as GET causes DB
        # access which messes with the test runner's rollback.
        rv = self.app.head(uri, headers=[('authorization', auth_header)])
        self.assertEqual(rv.status_code, 200)

        # Ensure the code works when called via analyze.
        with HTTMock(analyze_layer_success_mock, get_layer_success_mock, response_content):
            analyzer = LayerAnalyzer(app.config, self.api)
            analyzer.analyze_recursively(layer)

            layer = model.tag.get_tag_image(ADMIN_ACCESS_USER, SIMPLE_REPO, 'latest')
            self.assertAnalyzed(layer, True, 1)

    def test_analyze_layer_success(self):
        """A successful analysis marks the layer and its parents as indexed."""
        layer = model.tag.get_tag_image(ADMIN_ACCESS_USER, SIMPLE_REPO, 'latest',
                                        include_storage=True)
        self.assertFalse(layer.security_indexed)
        self.assertEqual(-1, layer.security_indexed_engine)

        with HTTMock(analyze_layer_success_mock, get_layer_success_mock, response_content):
            analyzer = LayerAnalyzer(app.config, self.api)
            analyzer.analyze_recursively(layer)

            layer = model.tag.get_tag_image(ADMIN_ACCESS_USER, SIMPLE_REPO, 'latest')
            self.assertAnalyzed(layer, True, 1)

    def test_analyze_layer_failure(self):
        """A 422 from the scanner leaves the layer unindexed at engine version 1."""
        layer = model.tag.get_tag_image(ADMIN_ACCESS_USER, SIMPLE_REPO, 'latest',
                                        include_storage=True)
        self.assertFalse(layer.security_indexed)
        self.assertEqual(-1, layer.security_indexed_engine)

        with HTTMock(analyze_layer_failure_mock, response_content):
            analyzer = LayerAnalyzer(app.config, self.api)
            analyzer.analyze_recursively(layer)

            layer = model.tag.get_tag_image(ADMIN_ACCESS_USER, SIMPLE_REPO, 'latest')
            self.assertAnalyzed(layer, False, 1)

    def test_analyze_layer_internal_error(self):
        """A 500 from the scanner leaves the layer unindexed at engine version -1."""
        layer = model.tag.get_tag_image(ADMIN_ACCESS_USER, SIMPLE_REPO, 'latest',
                                        include_storage=True)
        self.assertFalse(layer.security_indexed)
        self.assertEqual(-1, layer.security_indexed_engine)

        with HTTMock(analyze_layer_internal_mock, response_content):
            analyzer = LayerAnalyzer(app.config, self.api)
            analyzer.analyze_recursively(layer)

            layer = model.tag.get_tag_image(ADMIN_ACCESS_USER, SIMPLE_REPO, 'latest')
            self.assertAnalyzed(layer, False, -1)

    def test_analyze_layer_bad_request(self):
        """A 400 from the scanner surfaces as ``AnalyzeLayerException``."""
        layer = model.tag.get_tag_image(ADMIN_ACCESS_USER, SIMPLE_REPO, 'latest',
                                        include_storage=True)
        self.assertFalse(layer.security_indexed)
        self.assertEqual(-1, layer.security_indexed_engine)

        with HTTMock(analyze_layer_badrequest_mock, response_content):
            analyzer = LayerAnalyzer(app.config, self.api)
            with self.assertRaises(AnalyzeLayerException):
                analyzer.analyze_recursively(layer)

    def test_analyze_layer_missing_storage(self):
        """A layer whose storage is gone ends up unindexed at engine version 1."""
        layer = model.tag.get_tag_image(ADMIN_ACCESS_USER, SIMPLE_REPO, 'latest',
                                        include_storage=True)
        self.assertFalse(layer.security_indexed)
        self.assertEqual(-1, layer.security_indexed_engine)

        # Delete the storage for the layer.
        path = model.storage.get_layer_path(layer.storage)
        locations = app.config['DISTRIBUTED_STORAGE_PREFERENCE']
        storage.remove(locations, path)
        storage.remove(locations, 'all_files_exist')

        with HTTMock(analyze_layer_success_mock, response_content):
            analyzer = LayerAnalyzer(app.config, self.api)
            analyzer.analyze_recursively(layer)

        layer = model.tag.get_tag_image(ADMIN_ACCESS_USER, SIMPLE_REPO, 'latest')
        self.assertEqual(False, layer.security_indexed)
        self.assertEqual(1, layer.security_indexed_engine)

    def test_analyze_layer_success_events(self):
        """A successful analysis queues a vulnerability event for the repo."""
        layer = model.tag.get_tag_image(ADMIN_ACCESS_USER, SIMPLE_REPO, 'latest',
                                        include_storage=True)
        self.assertFalse(layer.security_indexed)
        self.assertEqual(-1, layer.security_indexed_engine)

        # Ensure there are no existing events.
        self.assertIsNone(notification_queue.get())

        # Add a repo event for the layer.
        repo = model.repository.get_repository(ADMIN_ACCESS_USER, SIMPLE_REPO)
        model.notification.create_repo_notification(repo, 'vulnerability_found',
                                                    'quay_notification', {}, {'level': 100})

        with HTTMock(analyze_layer_success_mock, get_layer_success_mock, response_content):
            analyzer = LayerAnalyzer(app.config, self.api)
            analyzer.analyze_recursively(layer)

            layer = model.tag.get_tag_image(ADMIN_ACCESS_USER, SIMPLE_REPO, 'latest')
            self.assertAnalyzed(layer, True, 1)

        # Ensure an event was written for the tag.
        queue_item = notification_queue.get()
        self.assertIsNotNone(queue_item)

        body = json.loads(queue_item.body)
        self.assertEqual(['latest', 'prod'], body['event_data']['tags'])
        self.assertEqual('CVE-2014-9471', body['event_data']['vulnerability']['id'])
        self.assertEqual('Low', body['event_data']['vulnerability']['priority'])
        self.assertTrue(body['event_data']['vulnerability']['has_fix'])

    def _get_notification_data(self, new_layer_ids, old_layer_ids, new_severity='Low'):
        """Build a notification payload with both ``New`` and ``Old`` sections.

        Args:
            new_layer_ids: layer ids listed under ``New``.
            old_layer_ids: layer ids listed under ``Old``.
            new_severity: severity reported for the ``New`` vulnerability.
        """
        return {
            "Name": "ec45ec87-bfc8-4129-a1c3-d2b82622175a",
            "Created": "1456247389",
            "Notified": "1456246708",
            "Limit": 2,
            "New": {
                "Vulnerability": {
                    "Name": "CVE-TEST",
                    "Namespace": "debian:8",
                    "Description": "New CVE",
                    "Severity": new_severity,
                    "FixedIn": [
                        {
                            "Name": "grep",
                            "Namespace": "debian:8",
                            "Version": "2.25"
                        }
                    ]
                },
                "LayersIntroducingVulnerability": new_layer_ids,
            },
            "Old": {
                "Vulnerability": {
                    "Name": "CVE-TEST",
                    "Namespace": "debian:8",
                    "Description": "New CVE",
                    "Severity": "Low",
                    "FixedIn": []
                },
                "LayersIntroducingVulnerability": old_layer_ids,
            }
        }

    def _get_delete_notification_data(self, old_layer_ids):
        """Build a notification payload that has only an ``Old`` section."""
        return {
            "Name": "ec45ec87-bfc8-4129-a1c3-d2b82622175a",
            "Created": "1456247389",
            "Notified": "1456246708",
            "Limit": 2,
            "Old": {
                "Vulnerability": {
                    "Name": "CVE-TEST",
                    "Namespace": "debian:8",
                    "Description": "New CVE",
                    "Severity": "Low",
                    "FixedIn": []
                },
                "LayersIntroducingVulnerability": old_layer_ids,
            }
        }

    def test_notification_new_layers_not_vulnerable(self):
        """No event is queued when the scanner reports the layer as not vulnerable."""
        layer = model.tag.get_tag_image(ADMIN_ACCESS_USER, SIMPLE_REPO, 'latest',
                                        include_storage=True)
        layer_id = '%s.%s' % (layer.docker_image_id, layer.storage.uuid)

        # Add a repo event for the layer.
        repo = model.repository.get_repository(ADMIN_ACCESS_USER, SIMPLE_REPO)
        model.notification.create_repo_notification(repo, 'vulnerability_found',
                                                    'quay_notification', {}, {'level': 100})

        @urlmatch(netloc=r'(.*\.)?mockclairservice', path=r'/v1/layers/(.+)')
        def get_matching_layer_not_vulnerable(url, request):
            return json.dumps({
                "Layer": {
                    "Name": layer_id,
                    "Namespace": "debian:8",
                    "IndexedByVersion": 1,
                    "Features": [
                        {
                            "Name": "coreutils",
                            "Namespace": "debian:8",
                            "Version": "8.23-4",
                            "Vulnerabilities": [],  # Report not vulnerable.
                        }
                    ]
                }
            })

        # Ensure that there are no event queue items for the layer.
        self.assertIsNone(notification_queue.get())

        # Fire off the notification processing.
        with HTTMock(get_matching_layer_not_vulnerable, response_content):
            notification_data = self._get_notification_data([layer_id], [])
            self.assertTrue(process_notification_data(notification_data))

        # Ensure that there are no event queue items for the layer.
        self.assertIsNone(notification_queue.get())

    def test_notification_delete(self):
        """A notification with only an ``Old`` section queues no event."""
        layer = model.tag.get_tag_image(ADMIN_ACCESS_USER, SIMPLE_REPO, 'latest',
                                        include_storage=True)
        layer_id = '%s.%s' % (layer.docker_image_id, layer.storage.uuid)

        # Add a repo event for the layer.
        repo = model.repository.get_repository(ADMIN_ACCESS_USER, SIMPLE_REPO)
        model.notification.create_repo_notification(repo, 'vulnerability_found',
                                                    'quay_notification', {}, {'level': 100})

        # Ensure that there are no event queue items for the layer.
        self.assertIsNone(notification_queue.get())

        # Fire off the notification processing.
        notification_data = self._get_delete_notification_data([layer_id])
        self.assertTrue(process_notification_data(notification_data))

        # Ensure that there are no event queue items for the layer.
        self.assertIsNone(notification_queue.get())

    def test_notification_new_layers(self):
        """A newly vulnerable layer queues an event for its tags."""
        layer = model.tag.get_tag_image(ADMIN_ACCESS_USER, SIMPLE_REPO, 'latest',
                                        include_storage=True)
        layer_id = '%s.%s' % (layer.docker_image_id, layer.storage.uuid)

        # Add a repo event for the layer.
        repo = model.repository.get_repository(ADMIN_ACCESS_USER, SIMPLE_REPO)
        model.notification.create_repo_notification(repo, 'vulnerability_found',
                                                    'quay_notification', {}, {'level': 100})

        @urlmatch(netloc=r'(.*\.)?mockclairservice', path=r'/v1/layers/(.+)')
        def get_matching_layer_vulnerable(url, request):
            return json.dumps({
                "Layer": {
                    "Name": layer_id,
                    "Namespace": "debian:8",
                    "IndexedByVersion": 1,
                    "Features": [
                        {
                            "Name": "coreutils",
                            "Namespace": "debian:8",
                            "Version": "8.23-4",
                            "Vulnerabilities": [
                                {
                                    "Name": "CVE-TEST",
                                    "Namespace": "debian:8",
                                    "Severity": "Low",
                                }
                            ],
                        }
                    ]
                }
            })

        # Ensure that there are no event queue items for the layer.
        self.assertIsNone(notification_queue.get())

        # Fire off the notification processing.
        with HTTMock(get_matching_layer_vulnerable, response_content):
            notification_data = self._get_notification_data([layer_id], [])
            self.assertTrue(process_notification_data(notification_data))

        # Ensure an event was written for the tag.
        queue_item = notification_queue.get()
        self.assertIsNotNone(queue_item)

        body = json.loads(queue_item.body)
        self.assertEqual(['prod', 'latest'], body['event_data']['tags'])
        self.assertEqual('CVE-TEST', body['event_data']['vulnerability']['id'])
        self.assertEqual('Low', body['event_data']['vulnerability']['priority'])
        self.assertTrue(body['event_data']['vulnerability']['has_fix'])

    def test_notification_no_new_layers(self):
        """No event is queued when the layer is listed as both old and new."""
        layer = model.tag.get_tag_image(ADMIN_ACCESS_USER, SIMPLE_REPO, 'latest',
                                        include_storage=True)
        layer_id = '%s.%s' % (layer.docker_image_id, layer.storage.uuid)

        # Add a repo event for the layer.
        repo = model.repository.get_repository(ADMIN_ACCESS_USER, SIMPLE_REPO)
        model.notification.create_repo_notification(repo, 'vulnerability_found',
                                                    'quay_notification', {}, {'level': 100})

        # Ensure that there are no event queue items for the layer.
        self.assertIsNone(notification_queue.get())

        # Fire off the notification processing.
        with HTTMock(response_content):
            notification_data = self._get_notification_data([layer_id], [layer_id])
            self.assertTrue(process_notification_data(notification_data))

        # Ensure that there are no event queue items for the layer.
        self.assertIsNone(notification_queue.get())

    def test_notification_no_new_layers_increased_severity(self):
        """A severity increase on an already-known layer still queues an event."""
        layer = model.tag.get_tag_image(ADMIN_ACCESS_USER, SIMPLE_REPO, 'latest',
                                        include_storage=True)
        layer_id = '%s.%s' % (layer.docker_image_id, layer.storage.uuid)

        # Add a repo event for the layer.
        repo = model.repository.get_repository(ADMIN_ACCESS_USER, SIMPLE_REPO)
        notification = model.notification.create_repo_notification(
            repo, 'vulnerability_found', 'quay_notification', {}, {'level': 100})

        @urlmatch(netloc=r'(.*\.)?mockclairservice', path=r'/v1/layers/(.+)')
        def get_matching_layer_vulnerable(url, request):
            return json.dumps({
                "Layer": {
                    "Name": layer_id,
                    "Namespace": "debian:8",
                    "IndexedByVersion": 1,
                    "Features": [
                        {
                            "Name": "coreutils",
                            "Namespace": "debian:8",
                            "Version": "8.23-4",
                            "Vulnerabilities": [
                                {
                                    "Name": "CVE-TEST",
                                    "Namespace": "debian:8",
                                    "Severity": "Low",
                                }
                            ],
                        }
                    ]
                }
            })

        # Ensure that there are no event queue items for the layer.
        self.assertIsNone(notification_queue.get())

        # Fire off the notification processing.
        with HTTMock(get_matching_layer_vulnerable, response_content):
            notification_data = self._get_notification_data([layer_id], [layer_id],
                                                            new_severity='Critical')
            self.assertTrue(process_notification_data(notification_data))

        # Ensure an event was written for the tag.
        queue_item = notification_queue.get()
        self.assertIsNotNone(queue_item)

        body = json.loads(queue_item.body)
        self.assertEqual(['prod', 'latest'], body['event_data']['tags'])
        self.assertEqual('CVE-TEST', body['event_data']['vulnerability']['id'])
        self.assertEqual('Critical', body['event_data']['vulnerability']['priority'])
        self.assertTrue(body['event_data']['vulnerability']['has_fix'])

        # Verify that an event would be raised.
        event_data = body['event_data']
        self.assertTrue(VulnerabilityFoundEvent().should_perform(event_data, notification))

        # Create another notification with a matching level and verify it will be raised.
        notification = model.notification.create_repo_notification(
            repo, 'vulnerability_found', 'quay_notification', {}, {'level': 1})
        self.assertTrue(VulnerabilityFoundEvent().should_perform(event_data, notification))

        # Create another notification with a higher level and verify it won't be raised.
        notification = model.notification.create_repo_notification(
            repo, 'vulnerability_found', 'quay_notification', {}, {'level': 0})
        self.assertFalse(VulnerabilityFoundEvent().should_perform(event_data, notification))

    def test_notification_worker(self):
        """The worker pages through a notification and then deletes it."""
        # Records the order in which the mocked endpoints were hit.
        pages_called = []

        @urlmatch(netloc=r'(.*\.)?mockclairservice', path=r'/v1/notifications/somenotification$',
                  method='DELETE')
        def delete_notification(url, request):
            pages_called.append('DELETE')
            return {'status_code': 201, 'content': ''}

        @urlmatch(netloc=r'(.*\.)?mockclairservice', path=r'/v1/notifications/somenotification$',
                  method='GET')
        def get_notification(url, request):
            if 'page=nextpage' in url.query:
                # Second (final) page: no ``NextPage`` key in the payload.
                pages_called.append('GET-2')

                layer = model.tag.get_tag_image(ADMIN_ACCESS_USER, COMPLEX_REPO, 'prod',
                                                include_storage=True)
                layer_id = '%s.%s' % (layer.docker_image_id, layer.storage.uuid)

                data = {
                    'Notification': self._get_notification_data([layer_id], [layer_id]),
                }

                return json.dumps(data)

            # First page: points the caller at ``nextpage``.
            pages_called.append('GET-1')

            layer = model.tag.get_tag_image(ADMIN_ACCESS_USER, SIMPLE_REPO, 'latest',
                                            include_storage=True)
            layer_id = '%s.%s' % (layer.docker_image_id, layer.storage.uuid)

            notification_data = self._get_notification_data([layer_id], [layer_id])
            notification_data['NextPage'] = 'nextpage'

            data = {
                'Notification': notification_data,
            }

            return json.dumps(data)

        @urlmatch(netloc=r'(.*\.)?mockclairservice', path=r'/v1/notifications/(.*)')
        def unknown_notification(url, request):
            return {'status_code': 404, 'content': 'Unknown notification'}

        # Test with an unknown notification.
        with HTTMock(get_notification, unknown_notification):
            worker = SecurityNotificationWorker(None)
            self.assertFalse(worker.perform_notification_work({
                'Name': 'unknownnotification'
            }))

        # Test with a known notification with pages.
        data = {
            'Name': 'somenotification'
        }

        with HTTMock(get_notification, delete_notification, unknown_notification):
            worker = SecurityNotificationWorker(None)
            self.assertTrue(worker.perform_notification_work(data))

        self.assertEqual(['GET-1', 'GET-2', 'DELETE'], pages_called)


if __name__ == '__main__':
    unittest.main()