diff --git a/test/test_api_usage.py b/test/test_api_usage.py index 34817907f..ed4f9f97d 100644 --- a/test/test_api_usage.py +++ b/test/test_api_usage.py @@ -14,7 +14,6 @@ from urllib import urlencode from urlparse import urlparse, urlunparse, parse_qs from playhouse.test_utils import assert_query_count, _QueryLogHandler -from httmock import urlmatch, HTTMock from cryptography.hazmat.primitives import serialization from cryptography.hazmat.backends import default_backend from mockldap import MockLdap @@ -28,6 +27,7 @@ from initdb import setup_database_for_testing, finished_database_for_testing from data import database, model from data.database import RepositoryActionCount, Repository as RepositoryTable from test.helpers import assert_action_logged +from util.secscan.fake import fake_security_scanner from endpoints.api.team import (TeamMember, TeamMemberList, TeamMemberInvite, OrganizationTeam, TeamPermissions) @@ -4115,47 +4115,6 @@ class TestSuperUserConfig(ApiTestCase): mockldap.stop() - -@urlmatch(netloc=r'(.*\.)?mockclairservice', path=r'/v1/layers/(.+)') -def get_layer_success_mock(url, request): - vulnerabilities = [ - { - "Name": "CVE-2014-9471", - "Namespace": "debian:8", - "Description": "The parse_datetime function in GNU coreutils allows remote attackers to cause a denial of service (crash) or possibly execute arbitrary code via a crafted date string, as demonstrated by the \"--date=TZ=\"123\"345\" @1\" string to the touch or date command.", - "Link": "https://security-tracker.debian.org/tracker/CVE-2014-9471", - "Severity": "Low", - "FixedBy": "9.23-5" - } - ] - - features = [ - { - "Name": "coreutils", - "Namespace": "debian:8", - "Version": "8.23-4", - "Vulnerabilities": vulnerabilities, - } - ] - - if not request.url.index('vulnerabilities') > 0: - vulnerabilities = [] - - if not request.url.index('features') > 0: - features = [] - - return py_json.dumps({ - "Layer": { - "Name": "17675ec01494d651e1ccf81dc9cf63959ebfeed4f978fddb1666b6ead008ed52", - "Namespace": "debian:8", - "ParentName": "140f9bdfeb9784cf8730e9dab5dd12fbd704151cf555ac8cae650451794e5ac2", - "IndexedByVersion": 1, - "Features": features - } - }) - - - class TestRepositoryImageSecurity(ApiTestCase): def test_get_vulnerabilities(self): self.login(ADMIN_ACCESS_USER) @@ -4175,7 +4134,9 @@ class TestRepositoryImageSecurity(ApiTestCase): layer.save() # Grab the security info again. - with HTTMock(get_layer_success_mock): + with fake_security_scanner() as security_scanner: + security_scanner.add_layer(security_scanner.layer_id(layer)) + response = self.getJsonResponse(RepositoryImageSecurity, params=dict(repository=ADMIN_ACCESS_USER + '/simple', imageid=layer.docker_image_id, diff --git a/test/test_secscan.py b/test/test_secscan.py index 91d2616ca..9a1a3018e 100644 --- a/test/test_secscan.py +++ b/test/test_secscan.py @@ -1,116 +1,24 @@ import json import time import unittest -from httmock import urlmatch, all_requests, HTTMock from app import app, storage, notification_queue +from data import model +from data.database import Image, IMAGE_NOT_SCANNED_ENGINE_VERSION from endpoints.notificationevent import VulnerabilityFoundEvent +from endpoints.v2 import v2_bp from initdb import setup_database_for_testing, finished_database_for_testing from util.secscan.api import SecurityScannerAPI, AnalyzeLayerException from util.secscan.analyzer import LayerAnalyzer +from util.secscan.fake import fake_security_scanner from util.secscan.notifier import process_notification_data -from data import model -from data.database import Image, IMAGE_NOT_SCANNED_ENGINE_VERSION from workers.security_notification_worker import SecurityNotificationWorker -from endpoints.v2 import v2_bp ADMIN_ACCESS_USER = 'devtable' SIMPLE_REPO = 'simple' COMPLEX_REPO = 'complex' -_PORT_NUMBER = 5001 - -@urlmatch(netloc=r'(.*\.)?mockclairservice', path=r'/v1/layers/(.+)') -def get_layer_failure_mock(url, request): - return {'status_code': 404, 'content': json.dumps({'Error': {'Message': 'Unknown layer'}})} - - -@urlmatch(netloc=r'(.*\.)?mockclairservice', path=r'/v1/layers$') -def analyze_layer_badrequest_mock(url, request): - return {'status_code': 400, 'content': json.dumps({'Error': {'Message': 'Bad request'}})} - - -@urlmatch(netloc=r'(.*\.)?mockclairservice', path=r'/v1/layers$') -def analyze_layer_internal_mock(url, request): - return {'status_code': 500, 'content': json.dumps({'Error': {'Message': 'Internal server error'}})} - - -@urlmatch(netloc=r'(.*\.)?mockclairservice', path=r'/v1/layers$') -def analyze_layer_failure_mock(url, request): - return {'status_code': 422, 'content': json.dumps({'Error': {'Message': 'Bad layer'}})} - - -@urlmatch(netloc=r'(.*\.)?mockclairservice', path=r'/v1/layers$') -def analyze_layer_success_mock(url, request): - body_data = json.loads(request.body) - if not 'Layer' in body_data: - return {'status_code': 400, 'content': 'Missing body'} - - layer = body_data['Layer'] - if not 'Path' in layer: - return {'status_code': 400, 'content': 'Missing Path'} - - if not 'Name' in layer: - return {'status_code': 400, 'content': 'Missing Name'} - - if not 'Format' in layer: - return {'status_code': 400, 'content': 'Missing Format'} - - return {'status_code': 201, 'content': json.dumps({ - "Layer": { - "Name": "523ef1d23f222195488575f52a39c729c76a8c5630c9a194139cb246fb212da6", - "Path": "/mnt/layers/523ef1d23f222195488575f52a39c729c76a8c5630c9a194139cb246fb212da6/layer.tar", - "ParentName": "140f9bdfeb9784cf8730e9dab5dd12fbd704151cf555ac8cae650451794e5ac2", - "Format": "Docker", - "IndexedByVersion": 1 - } - })} - - -@urlmatch(netloc=r'(.*\.)?mockclairservice', path=r'/v1/layers/(.+)') -def get_layer_success_mock(url, request): - vulnerabilities = [ - { - "Name": "CVE-2014-9471", - "Namespace": "debian:8", - "Description": "The parse_datetime function in GNU coreutils allows remote attackers to cause a denial of service (crash) or possibly execute arbitrary code via a crafted date string, as demonstrated by the \"--date=TZ=\"123\"345\" @1\" string to the touch or date command.", - "Link": "https://security-tracker.debian.org/tracker/CVE-2014-9471", - "Severity": "Low", - "FixedBy": "9.23-5" - } - ] - - features = [ - { - "Name": "coreutils", - "Namespace": "debian:8", - "Version": "8.23-4", - "Vulnerabilities": vulnerabilities, - } - ] - - if not request.url.find('vulnerabilities') > 0: - vulnerabilities = [] - - if not request.url.find('features') > 0: - features = [] - - return json.dumps({ - "Layer": { - "Name": "17675ec01494d651e1ccf81dc9cf63959ebfeed4f978fddb1666b6ead008ed52", - "Namespace": "debian:8", - "ParentName": "140f9bdfeb9784cf8730e9dab5dd12fbd704151cf555ac8cae650451794e5ac2", - "IndexedByVersion": 1, - "Features": features - } - }) - - -@all_requests -def response_content(url, request): - return {'status_code': 500, 'content': 'Unknown endpoint'} - class TestSecurityScanner(unittest.TestCase): def setUp(self): @@ -135,33 +43,42 @@ class TestSecurityScanner(unittest.TestCase): finished_database_for_testing(self) self.ctx.__exit__(True, None, None) - def assertAnalyzed(self, layer, isAnalyzed, engineVersion): + def assertAnalyzed(self, layer, security_scanner, isAnalyzed, engineVersion): self.assertEquals(isAnalyzed, layer.security_indexed) self.assertEquals(engineVersion, layer.security_indexed_engine) - # Ensure all parent layers are marked as analyzed. - parents = model.image.get_parent_images(ADMIN_ACCESS_USER, SIMPLE_REPO, layer) - for parent in parents: - self.assertEquals(isAnalyzed, parent.security_indexed) - self.assertEquals(engineVersion, parent.security_indexed_engine) + if isAnalyzed: + self.assertTrue(security_scanner.has_layer(security_scanner.layer_id(layer))) + + # Ensure all parent layers are marked as analyzed. + parents = model.image.get_parent_images(ADMIN_ACCESS_USER, SIMPLE_REPO, layer) + for parent in parents: + self.assertTrue(parent.security_indexed) + self.assertEquals(engineVersion, parent.security_indexed_engine) + self.assertTrue(security_scanner.has_layer(security_scanner.layer_id(parent))) - def test_get_layer_success(self): + def test_get_layer(self): + """ Test for basic retrieval of layers from the security scanner. """ layer = model.tag.get_tag_image(ADMIN_ACCESS_USER, SIMPLE_REPO, 'latest', include_storage=True) - with HTTMock(get_layer_success_mock, response_content): + + with fake_security_scanner() as security_scanner: + # Ensure the layer doesn't exist yet. + self.assertFalse(security_scanner.has_layer(security_scanner.layer_id(layer))) + self.assertIsNone(self.api.get_layer_data(layer)) + + # Add the layer. + security_scanner.add_layer(security_scanner.layer_id(layer)) + + # Retrieve the results. result = self.api.get_layer_data(layer, include_vulnerabilities=True) self.assertIsNotNone(result) - self.assertEquals(result['Layer']['Name'], '17675ec01494d651e1ccf81dc9cf63959ebfeed4f978fddb1666b6ead008ed52') - - - def test_get_layer_failure(self): - layer = model.tag.get_tag_image(ADMIN_ACCESS_USER, SIMPLE_REPO, 'latest', include_storage=True) - with HTTMock(get_layer_failure_mock, response_content): - result = self.api.get_layer_data(layer, include_vulnerabilities=True) - self.assertIsNone(result) + self.assertEquals(result['Layer']['Name'], security_scanner.layer_id(layer)) def test_analyze_layer_nodirectdownload_success(self): + """ Tests analyzing a layer when direct download is disabled. """ + # Disable direct download in fake storage. storage.put_content(['local_us'], 'supports_direct_download', 'false') @@ -190,38 +107,44 @@ class TestSecurityScanner(unittest.TestCase): self.assertEquals(rv.status_code, 200) # Ensure the code works when called via analyze. - with HTTMock(analyze_layer_success_mock, get_layer_success_mock, response_content): + with fake_security_scanner() as security_scanner: analyzer = LayerAnalyzer(app.config, self.api) analyzer.analyze_recursively(layer) layer = model.tag.get_tag_image(ADMIN_ACCESS_USER, SIMPLE_REPO, 'latest') - self.assertAnalyzed(layer, True, 1) + self.assertAnalyzed(layer, security_scanner, True, 1) def test_analyze_layer_success(self): + """ Tests that analyzing a layer successfully marks it as analyzed. """ + layer = model.tag.get_tag_image(ADMIN_ACCESS_USER, SIMPLE_REPO, 'latest', include_storage=True) self.assertFalse(layer.security_indexed) self.assertEquals(-1, layer.security_indexed_engine) - with HTTMock(analyze_layer_success_mock, get_layer_success_mock, response_content): + with fake_security_scanner() as security_scanner: analyzer = LayerAnalyzer(app.config, self.api) analyzer.analyze_recursively(layer) layer = model.tag.get_tag_image(ADMIN_ACCESS_USER, SIMPLE_REPO, 'latest') - self.assertAnalyzed(layer, True, 1) + self.assertAnalyzed(layer, security_scanner, True, 1) def test_analyze_layer_failure(self): + """ Tests that failing to analyze a layer marks it as not analyzed. """ + layer = model.tag.get_tag_image(ADMIN_ACCESS_USER, SIMPLE_REPO, 'latest', include_storage=True) self.assertFalse(layer.security_indexed) self.assertEquals(-1, layer.security_indexed_engine) - with HTTMock(analyze_layer_failure_mock, response_content): + with fake_security_scanner() as security_scanner: + security_scanner.set_fail_layer_id(security_scanner.layer_id(layer)) + analyzer = LayerAnalyzer(app.config, self.api) analyzer.analyze_recursively(layer) layer = model.tag.get_tag_image(ADMIN_ACCESS_USER, SIMPLE_REPO, 'latest') - self.assertAnalyzed(layer, False, 1) + self.assertAnalyzed(layer, security_scanner, False, 1) def test_analyze_layer_internal_error(self): @@ -229,27 +152,40 @@ class TestSecurityScanner(unittest.TestCase): self.assertFalse(layer.security_indexed) self.assertEquals(-1, layer.security_indexed_engine) - with HTTMock(analyze_layer_internal_mock, response_content): + with fake_security_scanner() as security_scanner: + security_scanner.set_internal_error_layer_id(security_scanner.layer_id(layer)) + analyzer = LayerAnalyzer(app.config, self.api) analyzer.analyze_recursively(layer) layer = model.tag.get_tag_image(ADMIN_ACCESS_USER, SIMPLE_REPO, 'latest') - self.assertAnalyzed(layer, False, -1) + self.assertAnalyzed(layer, security_scanner, False, -1) - def test_analyze_layer_bad_request(self): + def test_analyze_layer_missing_parent(self): layer = model.tag.get_tag_image(ADMIN_ACCESS_USER, SIMPLE_REPO, 'latest', include_storage=True) self.assertFalse(layer.security_indexed) self.assertEquals(-1, layer.security_indexed_engine) - with HTTMock(analyze_layer_badrequest_mock, response_content): + with fake_security_scanner() as security_scanner: + # Analyze the layer and its parents. analyzer = LayerAnalyzer(app.config, self.api) - try: - analyzer.analyze_recursively(layer) - except AnalyzeLayerException: - return + analyzer.analyze_recursively(layer) - self.fail('Expected exception on bad request') + # Make sure it was analyzed. + layer = model.tag.get_tag_image(ADMIN_ACCESS_USER, SIMPLE_REPO, 'latest') + self.assertAnalyzed(layer, security_scanner, True, 1) + + # Mark the layer as not analyzed and delete its parent layer from the security scanner. + layer.security_indexed_engine = IMAGE_NOT_SCANNED_ENGINE_VERSION + layer.security_indexed = False + layer.save() + + security_scanner.remove_layer(security_scanner.layer_id(layer.parent)) + + # Try to analyze again; this should fail because the parent is missing. + with self.assertRaisesRegexp(AnalyzeLayerException, 'Bad request to security scanner'): + analyzer.analyze_recursively(layer) def test_analyze_layer_missing_storage(self): @@ -263,7 +199,7 @@ class TestSecurityScanner(unittest.TestCase): storage.remove(locations, path) storage.remove(locations, 'all_files_exist') - with HTTMock(analyze_layer_success_mock, response_content): + with fake_security_scanner(): analyzer = LayerAnalyzer(app.config, self.api) analyzer.analyze_recursively(layer) @@ -271,6 +207,7 @@ class TestSecurityScanner(unittest.TestCase): self.assertEquals(False, layer.security_indexed) self.assertEquals(1, layer.security_indexed_engine) + def assert_analyze_layer_notify(self, security_indexed_engine, security_indexed, expect_notification): layer = model.tag.get_tag_image(ADMIN_ACCESS_USER, SIMPLE_REPO, 'latest', include_storage=True) self.assertFalse(layer.security_indexed) @@ -288,12 +225,23 @@ class TestSecurityScanner(unittest.TestCase): layer.security_indexed = security_indexed layer.save() - with HTTMock(analyze_layer_success_mock, get_layer_success_mock, response_content): + with fake_security_scanner() as security_scanner: + security_scanner.set_vulns(security_scanner.layer_id(layer), [ + { + "Name": "CVE-2014-9471", + "Namespace": "debian:8", + "Description": "Some service", + "Link": "https://security-tracker.debian.org/tracker/CVE-2014-9471", + "Severity": "Low", + "FixedBy": "9.23-5" + } + ]) + analyzer = LayerAnalyzer(app.config, self.api) analyzer.analyze_recursively(layer) layer = model.tag.get_tag_image(ADMIN_ACCESS_USER, SIMPLE_REPO, 'latest') - self.assertAnalyzed(layer, True, 1) + self.assertAnalyzed(layer, security_scanner, True, 1) # Ensure an event was written for the tag (if necessary). time.sleep(1) @@ -315,70 +263,21 @@ class TestSecurityScanner(unittest.TestCase): self.assertEquals(updated_layer.id, layer.id) self.assertTrue(updated_layer.security_indexed_engine > 0) + def test_analyze_layer_success_events(self): # Not previously indexed at all => Notification self.assert_analyze_layer_notify(IMAGE_NOT_SCANNED_ENGINE_VERSION, False, True) + def test_analyze_layer_success_no_notification(self): # Previously successfully indexed => No notification self.assert_analyze_layer_notify(0, True, False) + def test_analyze_layer_failed_then_success_notification(self): # Previously failed to index => Notification self.assert_analyze_layer_notify(0, False, True) - def _get_notification_data(self, new_layer_ids, old_layer_ids, new_severity='Low'): - return { - "Name": "ec45ec87-bfc8-4129-a1c3-d2b82622175a", - "Created": "1456247389", - "Notified": "1456246708", - "Limit": 2, - "New": { - "Vulnerability": { - "Name": "CVE-TEST", - "Namespace": "debian:8", - "Description": "New CVE", - "Severity": new_severity, - "FixedIn": [ - { - "Name": "grep", - "Namespace": "debian:8", - "Version": "2.25" - } - ] - }, - "LayersIntroducingVulnerability": new_layer_ids, - }, - "Old": { - "Vulnerability": { - "Name": "CVE-TEST", - "Namespace": "debian:8", - "Description": "New CVE", - "Severity": "Low", - "FixedIn": [] - }, - "LayersIntroducingVulnerability": old_layer_ids, - } - } - - - def _get_delete_notification_data(self, old_layer_ids): - return { - "Name": "ec45ec87-bfc8-4129-a1c3-d2b82622175a", - "Created": "1456247389", - "Notified": "1456246708", - "Limit": 2, - "Old": { - "Vulnerability": { - "Name": "CVE-TEST", - "Namespace": "debian:8", - "Description": "New CVE", - "Severity": "Low", - "FixedIn": [] - }, - "LayersIntroducingVulnerability": old_layer_ids, - } - } def test_notification_new_layers_not_vulnerable(self): layer = model.tag.get_tag_image(ADMIN_ACCESS_USER, SIMPLE_REPO, 'latest', include_storage=True) @@ -388,34 +287,25 @@ class TestSecurityScanner(unittest.TestCase): repo = model.repository.get_repository(ADMIN_ACCESS_USER, SIMPLE_REPO) model.notification.create_repo_notification(repo, 'vulnerability_found', 'quay_notification', {}, {'level': 100}) - @urlmatch(netloc=r'(.*\.)?mockclairservice', path=r'/v1/layers/(.+)') - def get_matching_layer_not_vulnerable(url, request): - return json.dumps({ - "Layer": { - "Name": layer_id, - "Namespace": "debian:8", - "IndexedByVersion": 1, - "Features": [ - { - "Name": "coreutils", - "Namespace": "debian:8", - "Version": "8.23-4", - "Vulnerabilities": [], # Report not vulnerable. - } - ] - } - }) - # Ensure that there are no event queue items for the layer. self.assertIsNone(notification_queue.get()) # Fire off the notification processing. - with HTTMock(get_matching_layer_not_vulnerable, response_content): - notification_data = self._get_notification_data([layer_id], []) + with fake_security_scanner() as security_scanner: + analyzer = LayerAnalyzer(app.config, self.api) + analyzer.analyze_recursively(layer) + + layer = model.tag.get_tag_image(ADMIN_ACCESS_USER, SIMPLE_REPO, 'latest') + self.assertAnalyzed(layer, security_scanner, True, 1) + + # Add a notification for the layer. + notification_data = security_scanner.add_notification([layer_id], [], {}, {}) + + # Process the notification. self.assertTrue(process_notification_data(notification_data)) - # Ensure that there are no event queue items for the layer. - self.assertIsNone(notification_queue.get()) + # Ensure that there are no event queue items for the layer. + self.assertIsNone(notification_queue.get()) def test_notification_delete(self): @@ -430,11 +320,21 @@ class TestSecurityScanner(unittest.TestCase): self.assertIsNone(notification_queue.get()) # Fire off the notification processing. - notification_data = self._get_delete_notification_data([layer_id]) - self.assertTrue(process_notification_data(notification_data)) + with fake_security_scanner() as security_scanner: + analyzer = LayerAnalyzer(app.config, self.api) + analyzer.analyze_recursively(layer) - # Ensure that there are no event queue items for the layer. - self.assertIsNone(notification_queue.get()) + layer = model.tag.get_tag_image(ADMIN_ACCESS_USER, SIMPLE_REPO, 'latest') + self.assertAnalyzed(layer, security_scanner, True, 1) + + # Add a notification for the layer. + notification_data = security_scanner.add_notification([layer_id], None, {}, None) + + # Process the notification. + self.assertTrue(process_notification_data(notification_data)) + + # Ensure that there are no event queue items for the layer. + self.assertIsNone(notification_queue.get()) def test_notification_new_layers(self): @@ -445,53 +345,47 @@ class TestSecurityScanner(unittest.TestCase): repo = model.repository.get_repository(ADMIN_ACCESS_USER, SIMPLE_REPO) model.notification.create_repo_notification(repo, 'vulnerability_found', 'quay_notification', {}, {'level': 100}) - @urlmatch(netloc=r'(.*\.)?mockclairservice', path=r'/v1/layers/(.+)') - def get_matching_layer_vulnerable(url, request): - return json.dumps({ - "Layer": { - "Name": layer_id, - "Namespace": "debian:8", - "IndexedByVersion": 1, - "Features": [ - { - "Name": "coreutils", - "Namespace": "debian:8", - "Version": "8.23-4", - "Vulnerabilities": [ - { - "Name": "CVE-TEST", - "Namespace": "debian:8", - "Severity": "Low", - } - ], - } - ] - } - }) - # Ensure that there are no event queue items for the layer. self.assertIsNone(notification_queue.get()) # Fire off the notification processing. - with HTTMock(get_matching_layer_vulnerable, response_content): - notification_data = self._get_notification_data([layer_id], []) + with fake_security_scanner() as security_scanner: + analyzer = LayerAnalyzer(app.config, self.api) + analyzer.analyze_recursively(layer) + + layer = model.tag.get_tag_image(ADMIN_ACCESS_USER, SIMPLE_REPO, 'latest') + self.assertAnalyzed(layer, security_scanner, True, 1) + + vuln_info = { + "Name": "CVE-TEST", + "Namespace": "debian:8", + "Description": "Some service", + "Link": "https://security-tracker.debian.org/tracker/CVE-2014-9471", + "Severity": "Low", + "FixedIn": {'Version': "9.23-5"}, + } + security_scanner.set_vulns(layer_id, [vuln_info]) + + # Add a notification for the layer. + notification_data = security_scanner.add_notification([], [layer_id], vuln_info, vuln_info) + + # Process the notification. self.assertTrue(process_notification_data(notification_data)) - # Ensure an event was written for the tag. - time.sleep(1) - queue_item = notification_queue.get() - self.assertIsNotNone(queue_item) + # Ensure an event was written for the tag. + time.sleep(1) + queue_item = notification_queue.get() + self.assertIsNotNone(queue_item) - body = json.loads(queue_item.body) - self.assertEquals(sorted(['prod', 'latest']), sorted(body['event_data']['tags'])) - self.assertEquals('CVE-TEST', body['event_data']['vulnerability']['id']) - self.assertEquals('Low', body['event_data']['vulnerability']['priority']) - self.assertTrue(body['event_data']['vulnerability']['has_fix']) + item_body = json.loads(queue_item.body) + self.assertEquals(sorted(['prod', 'latest']), sorted(item_body['event_data']['tags'])) + self.assertEquals('CVE-TEST', item_body['event_data']['vulnerability']['id']) + self.assertEquals('Low', item_body['event_data']['vulnerability']['priority']) + self.assertTrue(item_body['event_data']['vulnerability']['has_fix']) def test_notification_no_new_layers(self): layer = model.tag.get_tag_image(ADMIN_ACCESS_USER, SIMPLE_REPO, 'latest', include_storage=True) - layer_id = '%s.%s' % (layer.docker_image_id, layer.storage.uuid) # Add a repo event for the layer. repo = model.repository.get_repository(ADMIN_ACCESS_USER, SIMPLE_REPO) @@ -501,12 +395,21 @@ class TestSecurityScanner(unittest.TestCase): self.assertIsNone(notification_queue.get()) # Fire off the notification processing. - with HTTMock(response_content): - notification_data = self._get_notification_data([layer_id], [layer_id]) + with fake_security_scanner() as security_scanner: + analyzer = LayerAnalyzer(app.config, self.api) + analyzer.analyze_recursively(layer) + + layer = model.tag.get_tag_image(ADMIN_ACCESS_USER, SIMPLE_REPO, 'latest') + self.assertAnalyzed(layer, security_scanner, True, 1) + + # Add a notification for the layer. + notification_data = security_scanner.add_notification([], [], {}, {}) + + # Process the notification. self.assertTrue(process_notification_data(notification_data)) - # Ensure that there are no event queue items for the layer. - self.assertIsNone(notification_queue.get()) + # Ensure that there are no event queue items for the layer. + self.assertIsNone(notification_queue.get()) def test_notification_no_new_layers_increased_severity(self): @@ -515,62 +418,73 @@ class TestSecurityScanner(unittest.TestCase): # Add a repo event for the layer. repo = model.repository.get_repository(ADMIN_ACCESS_USER, SIMPLE_REPO) - notification = model.notification.create_repo_notification(repo, 'vulnerability_found', 'quay_notification', {}, {'level': 100}) - - @urlmatch(netloc=r'(.*\.)?mockclairservice', path=r'/v1/layers/(.+)') - def get_matching_layer_vulnerable(url, request): - return json.dumps({ - "Layer": { - "Name": layer_id, - "Namespace": "debian:8", - "IndexedByVersion": 1, - "Features": [ - { - "Name": "coreutils", - "Namespace": "debian:8", - "Version": "8.23-4", - "Vulnerabilities": [ - { - "Name": "CVE-TEST", - "Namespace": "debian:8", - "Severity": "Low", - } - ], - } - ] - } - }) + notification = model.notification.create_repo_notification(repo, 'vulnerability_found', + 'quay_notification', {}, + {'level': 100}) # Ensure that there are no event queue items for the layer. self.assertIsNone(notification_queue.get()) # Fire off the notification processing. - with HTTMock(get_matching_layer_vulnerable, response_content): - notification_data = self._get_notification_data([layer_id], [layer_id], new_severity='Critical') + with fake_security_scanner() as security_scanner: + analyzer = LayerAnalyzer(app.config, self.api) + analyzer.analyze_recursively(layer) + + layer = model.tag.get_tag_image(ADMIN_ACCESS_USER, SIMPLE_REPO, 'latest') + self.assertAnalyzed(layer, security_scanner, True, 1) + + old_vuln_info = { + "Name": "CVE-TEST", + "Namespace": "debian:8", + "Description": "Some service", + "Link": "https://security-tracker.debian.org/tracker/CVE-2014-9471", + "Severity": "Low", + } + + new_vuln_info = { + "Name": "CVE-TEST", + "Namespace": "debian:8", + "Description": "Some service", + "Link": "https://security-tracker.debian.org/tracker/CVE-2014-9471", + "Severity": "Critical", + "FixedIn": {'Version': "9.23-5"}, + } + + security_scanner.set_vulns(layer_id, [new_vuln_info]) + + # Add a notification for the layer. + notification_data = security_scanner.add_notification([layer_id], [layer_id], + old_vuln_info, new_vuln_info) + + # Process the notification. self.assertTrue(process_notification_data(notification_data)) - # Ensure an event was written for the tag. - time.sleep(1) - queue_item = notification_queue.get() - self.assertIsNotNone(queue_item) + # Ensure an event was written for the tag. + time.sleep(1) + queue_item = notification_queue.get() + self.assertIsNotNone(queue_item) - body = json.loads(queue_item.body) - self.assertEquals(sorted(['prod', 'latest']), sorted(body['event_data']['tags'])) - self.assertEquals('CVE-TEST', body['event_data']['vulnerability']['id']) - self.assertEquals('Critical', body['event_data']['vulnerability']['priority']) - self.assertTrue(body['event_data']['vulnerability']['has_fix']) + item_body = json.loads(queue_item.body) + self.assertEquals(sorted(['prod', 'latest']), sorted(item_body['event_data']['tags'])) + self.assertEquals('CVE-TEST', item_body['event_data']['vulnerability']['id']) + self.assertEquals('Critical', item_body['event_data']['vulnerability']['priority']) + self.assertTrue(item_body['event_data']['vulnerability']['has_fix']) - # Verify that an event would be raised. - event_data = body['event_data'] - self.assertTrue(VulnerabilityFoundEvent().should_perform(event_data, notification)) + # Verify that an event would be raised. + event_data = item_body['event_data'] + self.assertTrue(VulnerabilityFoundEvent().should_perform(event_data, notification)) - # Create another notification with a matching level and verify it will be raised. - notification = model.notification.create_repo_notification(repo, 'vulnerability_found', 'quay_notification', {}, {'level': 1}) - self.assertTrue(VulnerabilityFoundEvent().should_perform(event_data, notification)) + # Create another notification with a matching level and verify it will be raised. + notification = model.notification.create_repo_notification(repo, 'vulnerability_found', + 'quay_notification', {}, + {'level': 1}) + self.assertTrue(VulnerabilityFoundEvent().should_perform(event_data, notification)) - # Create another notification with a higher level and verify it won't be raised. - notification = model.notification.create_repo_notification(repo, 'vulnerability_found', 'quay_notification', {}, {'level': 0}) - self.assertFalse(VulnerabilityFoundEvent().should_perform(event_data, notification)) + # Create another notification with a higher level and verify it won't be raised. + notification = model.notification.create_repo_notification(repo, 'vulnerability_found', + 'quay_notification', {}, + {'level': 0}) + self.assertFalse(VulnerabilityFoundEvent().should_perform(event_data, notification)) def test_select_images_to_scan(self): @@ -588,60 +502,61 @@ class TestSecurityScanner(unittest.TestCase): def test_notification_worker(self): - pages_called = [] + layer1 = model.tag.get_tag_image(ADMIN_ACCESS_USER, SIMPLE_REPO, 'latest', include_storage=True) + layer2 = model.tag.get_tag_image(ADMIN_ACCESS_USER, COMPLEX_REPO, 'prod', include_storage=True) - @urlmatch(netloc=r'(.*\.)?mockclairservice', path=r'/v1/notifications/somenotification$', method='DELETE') - def delete_notification(url, request): - pages_called.append('DELETE') - return {'status_code': 201, 'content': ''} + # Add a repo events for the layers. + simple_repo = model.repository.get_repository(ADMIN_ACCESS_USER, SIMPLE_REPO) + complex_repo = model.repository.get_repository(ADMIN_ACCESS_USER, COMPLEX_REPO) - @urlmatch(netloc=r'(.*\.)?mockclairservice', path=r'/v1/notifications/somenotification$', method='GET') - def get_notification(url, request): - if url.query.find('page=nextpage') >= 0: - pages_called.append('GET-2') - layer = model.tag.get_tag_image(ADMIN_ACCESS_USER, COMPLEX_REPO, 'prod', include_storage=True) - layer_id = '%s.%s' % (layer.docker_image_id, layer.storage.uuid) + model.notification.create_repo_notification(simple_repo, 'vulnerability_found', + 'quay_notification', {}, {'level': 100}) + model.notification.create_repo_notification(complex_repo, 'vulnerability_found', + 'quay_notification', {}, {'level': 100}) - data = { - 'Notification': self._get_notification_data([layer_id], [layer_id]), - } + # Ensure that there are no event queue items for the layer. + self.assertIsNone(notification_queue.get()) - return json.dumps(data) - else: - pages_called.append('GET-1') - layer = model.tag.get_tag_image(ADMIN_ACCESS_USER, SIMPLE_REPO, 'latest', include_storage=True) - layer_id = '%s.%s' % (layer.docker_image_id, layer.storage.uuid) - - notification_data = self._get_notification_data([layer_id], [layer_id]) - notification_data['NextPage'] = 'nextpage' - - data = { - 'Notification': notification_data, - } - - return json.dumps(data) - - @urlmatch(netloc=r'(.*\.)?mockclairservice', path=r'/v1/notifications/(.*)') - def unknown_notification(url, request): - return {'status_code': 404, 'content': 'Unknown notification'} - - # Test with an unknown notification. - with HTTMock(get_notification, unknown_notification): + with fake_security_scanner() as security_scanner: + # Test with an unknown notification. worker = SecurityNotificationWorker(None) self.assertFalse(worker.perform_notification_work({ 'Name': 'unknownnotification' })) - # Test with a known notification with pages. - data = { - 'Name': 'somenotification' - } + # Add some analyzed layers. + analyzer = LayerAnalyzer(app.config, self.api) + analyzer.analyze_recursively(layer1) + analyzer.analyze_recursively(layer2) + + # Add a notification with pages of data. + new_vuln_info = { + "Name": "CVE-TEST", + "Namespace": "debian:8", + "Description": "Some service", + "Link": "https://security-tracker.debian.org/tracker/CVE-2014-9471", + "Severity": "Critical", + "FixedIn": {'Version': "9.23-5"}, + } + + security_scanner.set_vulns(security_scanner.layer_id(layer1), [new_vuln_info]) + security_scanner.set_vulns(security_scanner.layer_id(layer2), [new_vuln_info]) + + layer_ids = [security_scanner.layer_id(layer1), security_scanner.layer_id(layer2)] + notification_data = security_scanner.add_notification([], layer_ids, {}, new_vuln_info) + + # Test with a known notification with pages. + data = { + 'Name': notification_data['Name'], + } - with HTTMock(get_notification, delete_notification, unknown_notification): worker = SecurityNotificationWorker(None) - self.assertTrue(worker.perform_notification_work(data)) + self.assertTrue(worker.perform_notification_work(data, layer_limit=1)) - self.assertEquals(['GET-1', 'GET-2', 'DELETE'], pages_called) + # Make sure all pages were processed by ensuring we have two notifications. + time.sleep(1) + self.assertIsNotNone(notification_queue.get()) + self.assertIsNotNone(notification_queue.get()) if __name__ == '__main__': diff --git a/test/testconfig.py b/test/testconfig.py index 02c678200..76192605a 100644 --- a/test/testconfig.py +++ b/test/testconfig.py @@ -60,7 +60,7 @@ class TestConfig(DefaultConfig): FEATURE_SECURITY_SCANNER = True FEATURE_SECURITY_NOTIFICATIONS = True - SECURITY_SCANNER_ENDPOINT = 'http://mockclairservice/' + SECURITY_SCANNER_ENDPOINT = 'http://fakesecurityscanner/' SECURITY_SCANNER_API_VERSION = 'v1' SECURITY_SCANNER_ENGINE_VERSION_TARGET = 1 SECURITY_SCANNER_API_TIMEOUT_SECONDS = 1 diff --git a/util/secscan/api.py b/util/secscan/api.py index cbe97745f..2e04067ea 100644 --- a/util/secscan/api.py +++ b/util/secscan/api.py @@ -159,11 +159,10 @@ class SecurityScannerAPI(object): except requests.exceptions.ConnectionError: logger.exception('Connection error when trying to post layer data response for %s', layer.id) return None, True - except (requests.exceptions.RequestException, ValueError): + except (requests.exceptions.RequestException, ValueError) as re: logger.exception('Failed to post layer data response for %s', layer.id) return None, False - # Handle any errors from the security scanner. if response.status_code != 201: message = json_response.get('Error').get('Message', '') diff --git a/util/secscan/fake.py b/util/secscan/fake.py new file mode 100644 index 000000000..6b1efc894 --- /dev/null +++ b/util/secscan/fake.py @@ -0,0 +1,264 @@ +import json +import copy +import uuid +import urlparse + +from contextlib import contextmanager +from httmock import urlmatch, HTTMock, all_requests + +@contextmanager +def fake_security_scanner(hostname='fakesecurityscanner'): + """ Context manager which yields a fake security scanner. All requests made to the given + hostname (default: fakesecurityscanner) will be handled by the fake. + """ + scanner = FakeSecurityScanner(hostname) + with HTTMock(*(scanner.get_endpoints())): + yield scanner + + +class FakeSecurityScanner(object): + """ Implements a fake security scanner (with somewhat real responses) for testing API calls and + responses. + """ + def __init__(self, hostname, index_version=1): + self.hostname = hostname + self.index_version = index_version + self.layers = {} + self.notifications = {} + self.layer_vulns = {} + + self.fail_layer_id = None + self.internal_error_layer_id = None + + def set_fail_layer_id(self, fail_layer_id): + """ Sets a layer ID that, if encountered when the analyze call is made, causes a 422 + to be raised. + """ + self.fail_layer_id = fail_layer_id + + def set_internal_error_layer_id(self, internal_error_layer_id): + """ Sets a layer ID that, if encountered when the analyze call is made, causes a 500 + to be raised. + """ + self.internal_error_layer_id = internal_error_layer_id + + def has_layer(self, layer_id): + """ Returns true if the layer with the given ID has been analyzed. """ + return layer_id in self.layers + + def has_notification(self, notification_id): + """ Returns whether a notification with the given ID is found in the scanner. """ + return notification_id in self.notifications + + def add_notification(self, old_layer_ids, new_layer_ids, old_vuln, new_vuln): + """ Adds a new notification over the given sets of layer IDs and vulnerability information, + returning the structural data of the notification created. + """ + notification_id = str(uuid.uuid4()) + self.notifications[notification_id] = dict(old_layer_ids=old_layer_ids, + new_layer_ids=new_layer_ids, + old_vuln=old_vuln, + new_vuln=new_vuln) + + return self._get_notification_data(notification_id, 0, 100) + + def layer_id(self, layer): + """ Returns the Quay Security Scanner layer ID for the given layer (Image row). """ + return '%s.%s' % (layer.docker_image_id, layer.storage.uuid) + + def add_layer(self, layer_id): + """ Adds a layer to the security scanner, with no features or vulnerabilities. """ + self.layers[layer_id] = { + "Name": layer_id, + "Format": "Docker", + "IndexedByVersion": self.index_version, + } + + def remove_layer(self, layer_id): + """ Removes a layer from the security scanner. """ + self.layers.pop(layer_id, None) + + def set_vulns(self, layer_id, vulns): + """ Sets the vulnerabilities for the layer with the given ID to those given. """ + self.layer_vulns[layer_id] = vulns + + # Since this call may occur before the layer is "anaylzed", we only add the data + # to the layer itself if present. + if self.layers.get(layer_id): + layer = self.layers[layer_id] + layer['Features'] = layer.get('Features', []) + layer['Features'].append({ + "Name": 'somefeature', + "Namespace": 'somenamespace', + "Version": 'someversion', + "Vulnerabilities": self.layer_vulns[layer_id], + }) + + def _get_notification_data(self, notification_id, page, limit): + """ Returns the structural data for the notification with the given ID, paginated using + the given page and limit. """ + notification = self.notifications[notification_id] + notification_data = { + "Name": notification_id, + "Created": "1456247389", + "Notified": "1456246708", + "Limit": limit, + } + + start_index = (page*limit) + end_index = ((page+1)*limit) + has_additional_page = False + + if notification.get('old_vuln'): + old_layer_ids = notification['old_layer_ids'] + old_layer_ids = old_layer_ids[start_index:end_index] + has_additional_page = has_additional_page or bool(len(old_layer_ids[end_index-1:])) + + notification_data['Old'] = { + 'Vulnerability': notification['old_vuln'], + 'LayersIntroducingVulnerability': old_layer_ids, + } + + if notification.get('new_vuln'): + new_layer_ids = notification['new_layer_ids'] + new_layer_ids = new_layer_ids[start_index:end_index] + has_additional_page = has_additional_page or bool(len(new_layer_ids[end_index-1:])) + + notification_data['New'] = { + 'Vulnerability': notification['new_vuln'], + 'LayersIntroducingVulnerability': new_layer_ids, + } + + if has_additional_page: + notification_data['NextPage'] = str(page+1) + + return notification_data + + def get_endpoints(self): + """ Returns the HTTMock endpoint definitions for the fake security scanner. """ + + @urlmatch(netloc=r'(.*\.)?' + self.hostname, path=r'/v1/layers/(.+)', method='GET') + def get_layer_mock(url, request): + layer_id = url.path[len('/v1/layers/'):] + if layer_id == self.internal_error_layer_id: + return { + 'status_code': 500, + 'content': json.dumps({'Error': {'Message': 'Internal server error'}}), + } + + if not layer_id in self.layers: + return { + 'status_code': 404, + 'content': json.dumps({'Error': {'Message': 'Unknown layer'}}), + } + + layer_data = copy.deepcopy(self.layers[layer_id]) + + has_vulns = request.url.find('vulnerabilities') > 0 + has_features = request.url.find('features') > 0 + if not has_vulns and not has_features: + layer_data.pop('Features', None) + + return { + 'status_code': 200, + 'content': json.dumps({'Layer': layer_data}), + } + + @urlmatch(netloc=r'(.*\.)?' + self.hostname, path=r'/v1/layers', method='POST') + def post_layer_mock(_, request): + body_data = json.loads(request.body) + if not 'Layer' in body_data: + return {'status_code': 400, 'content': 'Missing body'} + + layer = body_data['Layer'] + if not 'Path' in layer: + return {'status_code': 400, 'content': 'Missing Path'} + + if not 'Name' in layer: + return {'status_code': 400, 'content': 'Missing Name'} + + if not 'Format' in layer: + return {'status_code': 400, 'content': 'Missing Format'} + + if layer['Name'] == self.internal_error_layer_id: + return { + 'status_code': 500, + 'content': json.dumps({'Error': {'Message': 'Internal server error'}}), + } + + if layer['Name'] == self.fail_layer_id: + return { + 'status_code': 422, + 'content': json.dumps({'Error': {'Message': 'Cannot analyze'}}), + } + + parent_id = layer.get('ParentName', None) + parent_layer = None + + if parent_id is not None: + parent_layer = self.layers.get(parent_id, None) + if parent_layer is None: + return { + 'status_code': 400, + 'content': json.dumps({'Error': {'Message': 'Unknown parent'}}), + } + + self.add_layer(layer['Name']) + if parent_layer is not None: + self.layers[layer['Name']]['ParentName'] = parent_id + + # If vulnerabilities have already been registered with this layer, call set_vulns to make sure + # their data is added to the layer's data. + if self.layer_vulns.get(layer['Name']): + self.set_vulns(layer['Name'], self.layer_vulns[layer['Name']]) + + return { + 'status_code': 201, + 'content': json.dumps({ + "Layer": self.layers[layer['Name']], + }), + } + + + @urlmatch(netloc=r'(.*\.)?' + self.hostname, path=r'/v1/notifications/(.+)$', method='DELETE') + def delete_notification(url, _): + notification_id = url.path[len('/v1/notifications/'):] + if notification_id not in self.notifications: + return { + 'status_code': 404, + 'content': json.dumps({'Error': {'Message': 'Unknown notification'}}), + } + + self.notifications.pop(notification_id) + return { + 'status_code': 204, + 'content': '', + } + + + @urlmatch(netloc=r'(.*\.)?' + self.hostname, path=r'/v1/notifications/(.+)$', method='GET') + def get_notification(url, _): + notification_id = url.path[len('/v1/notifications/'):] + if notification_id not in self.notifications: + return { + 'status_code': 404, + 'content': json.dumps({'Error': {'Message': 'Unknown notification'}}), + } + + query_params = urlparse.parse_qs(url.query) + limit = int(query_params.get('limit', [2])[0]) + page = int(query_params.get('page', [0])[0]) + + notification_data = self._get_notification_data(notification_id, page, limit) + response = {'Notification': notification_data} + return { + 'status_code': 200, + 'content': json.dumps(response), + } + + @all_requests + def response_content(url, _): + raise Exception('Unknown endpoint: ' + str(url)) + + return [get_layer_mock, post_layer_mock, get_notification, delete_notification, + response_content] diff --git a/workers/security_notification_worker.py b/workers/security_notification_worker.py index 6b135ebaf..7717048df 100644 --- a/workers/security_notification_worker.py +++ b/workers/security_notification_worker.py @@ -20,7 +20,7 @@ class SecurityNotificationWorker(QueueWorker): def process_queue_item(self, data): self.perform_notification_work(data) - def perform_notification_work(self, data): + def perform_notification_work(self, data, layer_limit=_LAYER_LIMIT): """ Performs the work for handling a security notification as referenced by the given data object. Returns True on successful handling, False on non-retryable failure and raises a JobException on retryable failure. @@ -31,7 +31,7 @@ class SecurityNotificationWorker(QueueWorker): while True: (response_data, should_retry) = secscan_api.get_notification(notification_name, - layer_limit=_LAYER_LIMIT, + layer_limit=layer_limit, page=current_page) if response_data is None: if should_retry: