From 15041ac5ed35535f50632bc02e666f6c29b21b69 Mon Sep 17 00:00:00 2001 From: Joseph Schorr Date: Wed, 14 Dec 2016 17:11:45 -0500 Subject: [PATCH] Add a fake security scanner class for easier testing The FakeSecurityScanner mocks out all calls that Quay is expected to make to the security scanner API, and returns faked data that can be adjusted by the calling test case --- test/test_api_usage.py | 47 +- test/test_secscan.py | 585 ++++++++++-------------- test/testconfig.py | 2 +- util/secscan/api.py | 3 +- util/secscan/fake.py | 264 +++++++++++ workers/security_notification_worker.py | 4 +- 6 files changed, 522 insertions(+), 383 deletions(-) create mode 100644 util/secscan/fake.py diff --git a/test/test_api_usage.py b/test/test_api_usage.py index f89003972..3e36efd1a 100644 --- a/test/test_api_usage.py +++ b/test/test_api_usage.py @@ -14,7 +14,6 @@ from urllib import urlencode from urlparse import urlparse, urlunparse, parse_qs from playhouse.test_utils import assert_query_count, _QueryLogHandler -from httmock import urlmatch, HTTMock from cryptography.hazmat.primitives import serialization from cryptography.hazmat.backends import default_backend from mockldap import MockLdap @@ -28,6 +27,7 @@ from initdb import setup_database_for_testing, finished_database_for_testing from data import database, model from data.database import RepositoryActionCount, Repository as RepositoryTable from test.helpers import assert_action_logged +from util.secscan.fake import fake_security_scanner from endpoints.api.team import (TeamMember, TeamMemberList, TeamMemberInvite, OrganizationTeam, TeamPermissions) @@ -4107,47 +4107,6 @@ class TestSuperUserConfig(ApiTestCase): mockldap.stop() - -@urlmatch(netloc=r'(.*\.)?mockclairservice', path=r'/v1/layers/(.+)') -def get_layer_success_mock(url, request): - vulnerabilities = [ - { - "Name": "CVE-2014-9471", - "Namespace": "debian:8", - "Description": "The parse_datetime function in GNU coreutils allows remote attackers to cause a denial of service (crash) or possibly execute arbitrary code via a crafted date string, as demonstrated by the \"--date=TZ=\"123\"345\" @1\" string to the touch or date command.", - "Link": "https://security-tracker.debian.org/tracker/CVE-2014-9471", - "Severity": "Low", - "FixedBy": "9.23-5" - } - ] - - features = [ - { - "Name": "coreutils", - "Namespace": "debian:8", - "Version": "8.23-4", - "Vulnerabilities": vulnerabilities, - } - ] - - if not request.url.index('vulnerabilities') > 0: - vulnerabilities = [] - - if not request.url.index('features') > 0: - features = [] - - return py_json.dumps({ - "Layer": { - "Name": "17675ec01494d651e1ccf81dc9cf63959ebfeed4f978fddb1666b6ead008ed52", - "Namespace": "debian:8", - "ParentName": "140f9bdfeb9784cf8730e9dab5dd12fbd704151cf555ac8cae650451794e5ac2", - "IndexedByVersion": 1, - "Features": features - } - }) - - - class TestRepositoryImageSecurity(ApiTestCase): def test_get_vulnerabilities(self): self.login(ADMIN_ACCESS_USER) @@ -4167,7 +4126,9 @@ class TestRepositoryImageSecurity(ApiTestCase): layer.save() # Grab the security info again. - with HTTMock(get_layer_success_mock): + with fake_security_scanner() as security_scanner: + security_scanner.add_layer(security_scanner.layer_id(layer)) + response = self.getJsonResponse(RepositoryImageSecurity, params=dict(repository=ADMIN_ACCESS_USER + '/simple', imageid=layer.docker_image_id, diff --git a/test/test_secscan.py b/test/test_secscan.py index 91d2616ca..9a1a3018e 100644 --- a/test/test_secscan.py +++ b/test/test_secscan.py @@ -1,116 +1,24 @@ import json import time import unittest -from httmock import urlmatch, all_requests, HTTMock from app import app, storage, notification_queue +from data import model +from data.database import Image, IMAGE_NOT_SCANNED_ENGINE_VERSION from endpoints.notificationevent import VulnerabilityFoundEvent +from endpoints.v2 import v2_bp from initdb import setup_database_for_testing, finished_database_for_testing from util.secscan.api import SecurityScannerAPI, AnalyzeLayerException from util.secscan.analyzer import LayerAnalyzer +from util.secscan.fake import fake_security_scanner from util.secscan.notifier import process_notification_data -from data import model -from data.database import Image, IMAGE_NOT_SCANNED_ENGINE_VERSION from workers.security_notification_worker import SecurityNotificationWorker -from endpoints.v2 import v2_bp ADMIN_ACCESS_USER = 'devtable' SIMPLE_REPO = 'simple' COMPLEX_REPO = 'complex' -_PORT_NUMBER = 5001 - -@urlmatch(netloc=r'(.*\.)?mockclairservice', path=r'/v1/layers/(.+)') -def get_layer_failure_mock(url, request): - return {'status_code': 404, 'content': json.dumps({'Error': {'Message': 'Unknown layer'}})} - - -@urlmatch(netloc=r'(.*\.)?mockclairservice', path=r'/v1/layers$') -def analyze_layer_badrequest_mock(url, request): - return {'status_code': 400, 'content': json.dumps({'Error': {'Message': 'Bad request'}})} - - -@urlmatch(netloc=r'(.*\.)?mockclairservice', path=r'/v1/layers$') -def analyze_layer_internal_mock(url, request): - return {'status_code': 500, 'content': json.dumps({'Error': {'Message': 'Internal server error'}})} - - -@urlmatch(netloc=r'(.*\.)?mockclairservice', path=r'/v1/layers$') -def analyze_layer_failure_mock(url, request): - return {'status_code': 422, 'content': json.dumps({'Error': {'Message': 'Bad layer'}})} - - -@urlmatch(netloc=r'(.*\.)?mockclairservice', path=r'/v1/layers$') -def analyze_layer_success_mock(url, request): - body_data = json.loads(request.body) - if not 'Layer' in body_data: - return {'status_code': 400, 'content': 'Missing body'} - - layer = body_data['Layer'] - if not 'Path' in layer: - return {'status_code': 400, 'content': 'Missing Path'} - - if not 'Name' in layer: - return {'status_code': 400, 'content': 'Missing Name'} - - if not 'Format' in layer: - return {'status_code': 400, 'content': 'Missing Format'} - - return {'status_code': 201, 'content': json.dumps({ - "Layer": { - "Name": "523ef1d23f222195488575f52a39c729c76a8c5630c9a194139cb246fb212da6", - "Path": "/mnt/layers/523ef1d23f222195488575f52a39c729c76a8c5630c9a194139cb246fb212da6/layer.tar", - "ParentName": "140f9bdfeb9784cf8730e9dab5dd12fbd704151cf555ac8cae650451794e5ac2", - "Format": "Docker", - "IndexedByVersion": 1 - } - })} - - -@urlmatch(netloc=r'(.*\.)?mockclairservice', path=r'/v1/layers/(.+)') -def get_layer_success_mock(url, request): - vulnerabilities = [ - { - "Name": "CVE-2014-9471", - "Namespace": "debian:8", - "Description": "The parse_datetime function in GNU coreutils allows remote attackers to cause a denial of service (crash) or possibly execute arbitrary code via a crafted date string, as demonstrated by the \"--date=TZ=\"123\"345\" @1\" string to the touch or date command.", - "Link": "https://security-tracker.debian.org/tracker/CVE-2014-9471", - "Severity": "Low", - "FixedBy": "9.23-5" - } - ] - - features = [ - { - "Name": "coreutils", - "Namespace": "debian:8", - "Version": "8.23-4", - "Vulnerabilities": vulnerabilities, - } - ] - - if not request.url.find('vulnerabilities') > 0: - vulnerabilities = [] - - if not request.url.find('features') > 0: - features = [] - - return json.dumps({ - "Layer": { - "Name": "17675ec01494d651e1ccf81dc9cf63959ebfeed4f978fddb1666b6ead008ed52", - "Namespace": "debian:8", - "ParentName": "140f9bdfeb9784cf8730e9dab5dd12fbd704151cf555ac8cae650451794e5ac2", - "IndexedByVersion": 1, - "Features": features - } - }) - - -@all_requests -def response_content(url, request): - return {'status_code': 500, 'content': 'Unknown endpoint'} - class TestSecurityScanner(unittest.TestCase): def setUp(self): @@ -135,33 +43,42 @@ class TestSecurityScanner(unittest.TestCase): finished_database_for_testing(self) self.ctx.__exit__(True, None, None) - def assertAnalyzed(self, layer, isAnalyzed, engineVersion): + def assertAnalyzed(self, layer, security_scanner, isAnalyzed, engineVersion): self.assertEquals(isAnalyzed, layer.security_indexed) self.assertEquals(engineVersion, layer.security_indexed_engine) - # Ensure all parent layers are marked as analyzed. - parents = model.image.get_parent_images(ADMIN_ACCESS_USER, SIMPLE_REPO, layer) - for parent in parents: - self.assertEquals(isAnalyzed, parent.security_indexed) - self.assertEquals(engineVersion, parent.security_indexed_engine) + if isAnalyzed: + self.assertTrue(security_scanner.has_layer(security_scanner.layer_id(layer))) + + # Ensure all parent layers are marked as analyzed. + parents = model.image.get_parent_images(ADMIN_ACCESS_USER, SIMPLE_REPO, layer) + for parent in parents: + self.assertTrue(parent.security_indexed) + self.assertEquals(engineVersion, parent.security_indexed_engine) + self.assertTrue(security_scanner.has_layer(security_scanner.layer_id(parent))) - def test_get_layer_success(self): + def test_get_layer(self): + """ Test for basic retrieval of layers from the security scanner. """ layer = model.tag.get_tag_image(ADMIN_ACCESS_USER, SIMPLE_REPO, 'latest', include_storage=True) - with HTTMock(get_layer_success_mock, response_content): + + with fake_security_scanner() as security_scanner: + # Ensure the layer doesn't exist yet. + self.assertFalse(security_scanner.has_layer(security_scanner.layer_id(layer))) + self.assertIsNone(self.api.get_layer_data(layer)) + + # Add the layer. + security_scanner.add_layer(security_scanner.layer_id(layer)) + + # Retrieve the results. result = self.api.get_layer_data(layer, include_vulnerabilities=True) self.assertIsNotNone(result) - self.assertEquals(result['Layer']['Name'], '17675ec01494d651e1ccf81dc9cf63959ebfeed4f978fddb1666b6ead008ed52') - - - def test_get_layer_failure(self): - layer = model.tag.get_tag_image(ADMIN_ACCESS_USER, SIMPLE_REPO, 'latest', include_storage=True) - with HTTMock(get_layer_failure_mock, response_content): - result = self.api.get_layer_data(layer, include_vulnerabilities=True) - self.assertIsNone(result) + self.assertEquals(result['Layer']['Name'], security_scanner.layer_id(layer)) def test_analyze_layer_nodirectdownload_success(self): + """ Tests analyzing a layer when direct download is disabled. """ + # Disable direct download in fake storage. storage.put_content(['local_us'], 'supports_direct_download', 'false') @@ -190,38 +107,44 @@ class TestSecurityScanner(unittest.TestCase): self.assertEquals(rv.status_code, 200) # Ensure the code works when called via analyze. - with HTTMock(analyze_layer_success_mock, get_layer_success_mock, response_content): + with fake_security_scanner() as security_scanner: analyzer = LayerAnalyzer(app.config, self.api) analyzer.analyze_recursively(layer) layer = model.tag.get_tag_image(ADMIN_ACCESS_USER, SIMPLE_REPO, 'latest') - self.assertAnalyzed(layer, True, 1) + self.assertAnalyzed(layer, security_scanner, True, 1) def test_analyze_layer_success(self): + """ Tests that analyzing a layer successfully marks it as analyzed. """ + layer = model.tag.get_tag_image(ADMIN_ACCESS_USER, SIMPLE_REPO, 'latest', include_storage=True) self.assertFalse(layer.security_indexed) self.assertEquals(-1, layer.security_indexed_engine) - with HTTMock(analyze_layer_success_mock, get_layer_success_mock, response_content): + with fake_security_scanner() as security_scanner: analyzer = LayerAnalyzer(app.config, self.api) analyzer.analyze_recursively(layer) layer = model.tag.get_tag_image(ADMIN_ACCESS_USER, SIMPLE_REPO, 'latest') - self.assertAnalyzed(layer, True, 1) + self.assertAnalyzed(layer, security_scanner, True, 1) def test_analyze_layer_failure(self): + """ Tests that failing to analyze a layer marks it as not analyzed. """ + layer = model.tag.get_tag_image(ADMIN_ACCESS_USER, SIMPLE_REPO, 'latest', include_storage=True) self.assertFalse(layer.security_indexed) self.assertEquals(-1, layer.security_indexed_engine) - with HTTMock(analyze_layer_failure_mock, response_content): + with fake_security_scanner() as security_scanner: + security_scanner.set_fail_layer_id(security_scanner.layer_id(layer)) + analyzer = LayerAnalyzer(app.config, self.api) analyzer.analyze_recursively(layer) layer = model.tag.get_tag_image(ADMIN_ACCESS_USER, SIMPLE_REPO, 'latest') - self.assertAnalyzed(layer, False, 1) + self.assertAnalyzed(layer, security_scanner, False, 1) def test_analyze_layer_internal_error(self): @@ -229,27 +152,40 @@ class TestSecurityScanner(unittest.TestCase): self.assertFalse(layer.security_indexed) self.assertEquals(-1, layer.security_indexed_engine) - with HTTMock(analyze_layer_internal_mock, response_content): + with fake_security_scanner() as security_scanner: + security_scanner.set_internal_error_layer_id(security_scanner.layer_id(layer)) + analyzer = LayerAnalyzer(app.config, self.api) analyzer.analyze_recursively(layer) layer = model.tag.get_tag_image(ADMIN_ACCESS_USER, SIMPLE_REPO, 'latest') - self.assertAnalyzed(layer, False, -1) + self.assertAnalyzed(layer, security_scanner, False, -1) - def test_analyze_layer_bad_request(self): + def test_analyze_layer_missing_parent(self): layer = model.tag.get_tag_image(ADMIN_ACCESS_USER, SIMPLE_REPO, 'latest', include_storage=True) self.assertFalse(layer.security_indexed) self.assertEquals(-1, layer.security_indexed_engine) - with HTTMock(analyze_layer_badrequest_mock, response_content): + with fake_security_scanner() as security_scanner: + # Analyze the layer and its parents. analyzer = LayerAnalyzer(app.config, self.api) - try: - analyzer.analyze_recursively(layer) - except AnalyzeLayerException: - return + analyzer.analyze_recursively(layer) - self.fail('Expected exception on bad request') + # Make sure it was analyzed. + layer = model.tag.get_tag_image(ADMIN_ACCESS_USER, SIMPLE_REPO, 'latest') + self.assertAnalyzed(layer, security_scanner, True, 1) + + # Mark the layer as not analyzed and delete its parent layer from the security scanner. + layer.security_indexed_engine = IMAGE_NOT_SCANNED_ENGINE_VERSION + layer.security_indexed = False + layer.save() + + security_scanner.remove_layer(security_scanner.layer_id(layer.parent)) + + # Try to analyze again; this should fail because the parent is missing. + with self.assertRaisesRegexp(AnalyzeLayerException, 'Bad request to security scanner'): + analyzer.analyze_recursively(layer) def test_analyze_layer_missing_storage(self): @@ -263,7 +199,7 @@ class TestSecurityScanner(unittest.TestCase): storage.remove(locations, path) storage.remove(locations, 'all_files_exist') - with HTTMock(analyze_layer_success_mock, response_content): + with fake_security_scanner(): analyzer = LayerAnalyzer(app.config, self.api) analyzer.analyze_recursively(layer) @@ -271,6 +207,7 @@ class TestSecurityScanner(unittest.TestCase): self.assertEquals(False, layer.security_indexed) self.assertEquals(1, layer.security_indexed_engine) + def assert_analyze_layer_notify(self, security_indexed_engine, security_indexed, expect_notification): layer = model.tag.get_tag_image(ADMIN_ACCESS_USER, SIMPLE_REPO, 'latest', include_storage=True) self.assertFalse(layer.security_indexed) @@ -288,12 +225,23 @@ class TestSecurityScanner(unittest.TestCase): layer.security_indexed = security_indexed layer.save() - with HTTMock(analyze_layer_success_mock, get_layer_success_mock, response_content): + with fake_security_scanner() as security_scanner: + security_scanner.set_vulns(security_scanner.layer_id(layer), [ + { + "Name": "CVE-2014-9471", + "Namespace": "debian:8", + "Description": "Some service", + "Link": "https://security-tracker.debian.org/tracker/CVE-2014-9471", + "Severity": "Low", + "FixedBy": "9.23-5" + } + ]) + analyzer = LayerAnalyzer(app.config, self.api) analyzer.analyze_recursively(layer) layer = model.tag.get_tag_image(ADMIN_ACCESS_USER, SIMPLE_REPO, 'latest') - self.assertAnalyzed(layer, True, 1) + self.assertAnalyzed(layer, security_scanner, True, 1) # Ensure an event was written for the tag (if necessary). time.sleep(1) @@ -315,70 +263,21 @@ class TestSecurityScanner(unittest.TestCase): self.assertEquals(updated_layer.id, layer.id) self.assertTrue(updated_layer.security_indexed_engine > 0) + def test_analyze_layer_success_events(self): # Not previously indexed at all => Notification self.assert_analyze_layer_notify(IMAGE_NOT_SCANNED_ENGINE_VERSION, False, True) + def test_analyze_layer_success_no_notification(self): # Previously successfully indexed => No notification self.assert_analyze_layer_notify(0, True, False) + def test_analyze_layer_failed_then_success_notification(self): # Previously failed to index => Notification self.assert_analyze_layer_notify(0, False, True) - def _get_notification_data(self, new_layer_ids, old_layer_ids, new_severity='Low'): - return { - "Name": "ec45ec87-bfc8-4129-a1c3-d2b82622175a", - "Created": "1456247389", - "Notified": "1456246708", - "Limit": 2, - "New": { - "Vulnerability": { - "Name": "CVE-TEST", - "Namespace": "debian:8", - "Description": "New CVE", - "Severity": new_severity, - "FixedIn": [ - { - "Name": "grep", - "Namespace": "debian:8", - "Version": "2.25" - } - ] - }, - "LayersIntroducingVulnerability": new_layer_ids, - }, - "Old": { - "Vulnerability": { - "Name": "CVE-TEST", - "Namespace": "debian:8", - "Description": "New CVE", - "Severity": "Low", - "FixedIn": [] - }, - "LayersIntroducingVulnerability": old_layer_ids, - } - } - - - def _get_delete_notification_data(self, old_layer_ids): - return { - "Name": "ec45ec87-bfc8-4129-a1c3-d2b82622175a", - "Created": "1456247389", - "Notified": "1456246708", - "Limit": 2, - "Old": { - "Vulnerability": { - "Name": "CVE-TEST", - "Namespace": "debian:8", - "Description": "New CVE", - "Severity": "Low", - "FixedIn": [] - }, - "LayersIntroducingVulnerability": old_layer_ids, - } - } def test_notification_new_layers_not_vulnerable(self): layer = model.tag.get_tag_image(ADMIN_ACCESS_USER, SIMPLE_REPO, 'latest', include_storage=True) @@ -388,34 +287,25 @@ class TestSecurityScanner(unittest.TestCase): repo = model.repository.get_repository(ADMIN_ACCESS_USER, SIMPLE_REPO) model.notification.create_repo_notification(repo, 'vulnerability_found', 'quay_notification', {}, {'level': 100}) - @urlmatch(netloc=r'(.*\.)?mockclairservice', path=r'/v1/layers/(.+)') - def get_matching_layer_not_vulnerable(url, request): - return json.dumps({ - "Layer": { - "Name": layer_id, - "Namespace": "debian:8", - "IndexedByVersion": 1, - "Features": [ - { - "Name": "coreutils", - "Namespace": "debian:8", - "Version": "8.23-4", - "Vulnerabilities": [], # Report not vulnerable. - } - ] - } - }) - # Ensure that there are no event queue items for the layer. self.assertIsNone(notification_queue.get()) # Fire off the notification processing. - with HTTMock(get_matching_layer_not_vulnerable, response_content): - notification_data = self._get_notification_data([layer_id], []) + with fake_security_scanner() as security_scanner: + analyzer = LayerAnalyzer(app.config, self.api) + analyzer.analyze_recursively(layer) + + layer = model.tag.get_tag_image(ADMIN_ACCESS_USER, SIMPLE_REPO, 'latest') + self.assertAnalyzed(layer, security_scanner, True, 1) + + # Add a notification for the layer. + notification_data = security_scanner.add_notification([layer_id], [], {}, {}) + + # Process the notification. self.assertTrue(process_notification_data(notification_data)) - # Ensure that there are no event queue items for the layer. - self.assertIsNone(notification_queue.get()) + # Ensure that there are no event queue items for the layer. + self.assertIsNone(notification_queue.get()) def test_notification_delete(self): @@ -430,11 +320,21 @@ class TestSecurityScanner(unittest.TestCase): self.assertIsNone(notification_queue.get()) # Fire off the notification processing. - notification_data = self._get_delete_notification_data([layer_id]) - self.assertTrue(process_notification_data(notification_data)) + with fake_security_scanner() as security_scanner: + analyzer = LayerAnalyzer(app.config, self.api) + analyzer.analyze_recursively(layer) - # Ensure that there are no event queue items for the layer. - self.assertIsNone(notification_queue.get()) + layer = model.tag.get_tag_image(ADMIN_ACCESS_USER, SIMPLE_REPO, 'latest') + self.assertAnalyzed(layer, security_scanner, True, 1) + + # Add a notification for the layer. + notification_data = security_scanner.add_notification([layer_id], None, {}, None) + + # Process the notification. + self.assertTrue(process_notification_data(notification_data)) + + # Ensure that there are no event queue items for the layer. + self.assertIsNone(notification_queue.get()) def test_notification_new_layers(self): @@ -445,53 +345,47 @@ class TestSecurityScanner(unittest.TestCase): repo = model.repository.get_repository(ADMIN_ACCESS_USER, SIMPLE_REPO) model.notification.create_repo_notification(repo, 'vulnerability_found', 'quay_notification', {}, {'level': 100}) - @urlmatch(netloc=r'(.*\.)?mockclairservice', path=r'/v1/layers/(.+)') - def get_matching_layer_vulnerable(url, request): - return json.dumps({ - "Layer": { - "Name": layer_id, - "Namespace": "debian:8", - "IndexedByVersion": 1, - "Features": [ - { - "Name": "coreutils", - "Namespace": "debian:8", - "Version": "8.23-4", - "Vulnerabilities": [ - { - "Name": "CVE-TEST", - "Namespace": "debian:8", - "Severity": "Low", - } - ], - } - ] - } - }) - # Ensure that there are no event queue items for the layer. self.assertIsNone(notification_queue.get()) # Fire off the notification processing. - with HTTMock(get_matching_layer_vulnerable, response_content): - notification_data = self._get_notification_data([layer_id], []) + with fake_security_scanner() as security_scanner: + analyzer = LayerAnalyzer(app.config, self.api) + analyzer.analyze_recursively(layer) + + layer = model.tag.get_tag_image(ADMIN_ACCESS_USER, SIMPLE_REPO, 'latest') + self.assertAnalyzed(layer, security_scanner, True, 1) + + vuln_info = { + "Name": "CVE-TEST", + "Namespace": "debian:8", + "Description": "Some service", + "Link": "https://security-tracker.debian.org/tracker/CVE-2014-9471", + "Severity": "Low", + "FixedIn": {'Version': "9.23-5"}, + } + security_scanner.set_vulns(layer_id, [vuln_info]) + + # Add a notification for the layer. + notification_data = security_scanner.add_notification([], [layer_id], vuln_info, vuln_info) + + # Process the notification. self.assertTrue(process_notification_data(notification_data)) - # Ensure an event was written for the tag. - time.sleep(1) - queue_item = notification_queue.get() - self.assertIsNotNone(queue_item) + # Ensure an event was written for the tag. + time.sleep(1) + queue_item = notification_queue.get() + self.assertIsNotNone(queue_item) - body = json.loads(queue_item.body) - self.assertEquals(sorted(['prod', 'latest']), sorted(body['event_data']['tags'])) - self.assertEquals('CVE-TEST', body['event_data']['vulnerability']['id']) - self.assertEquals('Low', body['event_data']['vulnerability']['priority']) - self.assertTrue(body['event_data']['vulnerability']['has_fix']) + item_body = json.loads(queue_item.body) + self.assertEquals(sorted(['prod', 'latest']), sorted(item_body['event_data']['tags'])) + self.assertEquals('CVE-TEST', item_body['event_data']['vulnerability']['id']) + self.assertEquals('Low', item_body['event_data']['vulnerability']['priority']) + self.assertTrue(item_body['event_data']['vulnerability']['has_fix']) def test_notification_no_new_layers(self): layer = model.tag.get_tag_image(ADMIN_ACCESS_USER, SIMPLE_REPO, 'latest', include_storage=True) - layer_id = '%s.%s' % (layer.docker_image_id, layer.storage.uuid) # Add a repo event for the layer. repo = model.repository.get_repository(ADMIN_ACCESS_USER, SIMPLE_REPO) @@ -501,12 +395,21 @@ class TestSecurityScanner(unittest.TestCase): self.assertIsNone(notification_queue.get()) # Fire off the notification processing. - with HTTMock(response_content): - notification_data = self._get_notification_data([layer_id], [layer_id]) + with fake_security_scanner() as security_scanner: + analyzer = LayerAnalyzer(app.config, self.api) + analyzer.analyze_recursively(layer) + + layer = model.tag.get_tag_image(ADMIN_ACCESS_USER, SIMPLE_REPO, 'latest') + self.assertAnalyzed(layer, security_scanner, True, 1) + + # Add a notification for the layer. + notification_data = security_scanner.add_notification([], [], {}, {}) + + # Process the notification. self.assertTrue(process_notification_data(notification_data)) - # Ensure that there are no event queue items for the layer. - self.assertIsNone(notification_queue.get()) + # Ensure that there are no event queue items for the layer. + self.assertIsNone(notification_queue.get()) def test_notification_no_new_layers_increased_severity(self): @@ -515,62 +418,73 @@ class TestSecurityScanner(unittest.TestCase): # Add a repo event for the layer. repo = model.repository.get_repository(ADMIN_ACCESS_USER, SIMPLE_REPO) - notification = model.notification.create_repo_notification(repo, 'vulnerability_found', 'quay_notification', {}, {'level': 100}) - - @urlmatch(netloc=r'(.*\.)?mockclairservice', path=r'/v1/layers/(.+)') - def get_matching_layer_vulnerable(url, request): - return json.dumps({ - "Layer": { - "Name": layer_id, - "Namespace": "debian:8", - "IndexedByVersion": 1, - "Features": [ - { - "Name": "coreutils", - "Namespace": "debian:8", - "Version": "8.23-4", - "Vulnerabilities": [ - { - "Name": "CVE-TEST", - "Namespace": "debian:8", - "Severity": "Low", - } - ], - } - ] - } - }) + notification = model.notification.create_repo_notification(repo, 'vulnerability_found', + 'quay_notification', {}, + {'level': 100}) # Ensure that there are no event queue items for the layer. self.assertIsNone(notification_queue.get()) # Fire off the notification processing. - with HTTMock(get_matching_layer_vulnerable, response_content): - notification_data = self._get_notification_data([layer_id], [layer_id], new_severity='Critical') + with fake_security_scanner() as security_scanner: + analyzer = LayerAnalyzer(app.config, self.api) + analyzer.analyze_recursively(layer) + + layer = model.tag.get_tag_image(ADMIN_ACCESS_USER, SIMPLE_REPO, 'latest') + self.assertAnalyzed(layer, security_scanner, True, 1) + + old_vuln_info = { + "Name": "CVE-TEST", + "Namespace": "debian:8", + "Description": "Some service", + "Link": "https://security-tracker.debian.org/tracker/CVE-2014-9471", + "Severity": "Low", + } + + new_vuln_info = { + "Name": "CVE-TEST", + "Namespace": "debian:8", + "Description": "Some service", + "Link": "https://security-tracker.debian.org/tracker/CVE-2014-9471", + "Severity": "Critical", + "FixedIn": {'Version': "9.23-5"}, + } + + security_scanner.set_vulns(layer_id, [new_vuln_info]) + + # Add a notification for the layer. + notification_data = security_scanner.add_notification([layer_id], [layer_id], + old_vuln_info, new_vuln_info) + + # Process the notification. self.assertTrue(process_notification_data(notification_data)) - # Ensure an event was written for the tag. - time.sleep(1) - queue_item = notification_queue.get() - self.assertIsNotNone(queue_item) + # Ensure an event was written for the tag. + time.sleep(1) + queue_item = notification_queue.get() + self.assertIsNotNone(queue_item) - body = json.loads(queue_item.body) - self.assertEquals(sorted(['prod', 'latest']), sorted(body['event_data']['tags'])) - self.assertEquals('CVE-TEST', body['event_data']['vulnerability']['id']) - self.assertEquals('Critical', body['event_data']['vulnerability']['priority']) - self.assertTrue(body['event_data']['vulnerability']['has_fix']) + item_body = json.loads(queue_item.body) + self.assertEquals(sorted(['prod', 'latest']), sorted(item_body['event_data']['tags'])) + self.assertEquals('CVE-TEST', item_body['event_data']['vulnerability']['id']) + self.assertEquals('Critical', item_body['event_data']['vulnerability']['priority']) + self.assertTrue(item_body['event_data']['vulnerability']['has_fix']) - # Verify that an event would be raised. - event_data = body['event_data'] - self.assertTrue(VulnerabilityFoundEvent().should_perform(event_data, notification)) + # Verify that an event would be raised. + event_data = item_body['event_data'] + self.assertTrue(VulnerabilityFoundEvent().should_perform(event_data, notification)) - # Create another notification with a matching level and verify it will be raised. - notification = model.notification.create_repo_notification(repo, 'vulnerability_found', 'quay_notification', {}, {'level': 1}) - self.assertTrue(VulnerabilityFoundEvent().should_perform(event_data, notification)) + # Create another notification with a matching level and verify it will be raised. + notification = model.notification.create_repo_notification(repo, 'vulnerability_found', + 'quay_notification', {}, + {'level': 1}) + self.assertTrue(VulnerabilityFoundEvent().should_perform(event_data, notification)) - # Create another notification with a higher level and verify it won't be raised. - notification = model.notification.create_repo_notification(repo, 'vulnerability_found', 'quay_notification', {}, {'level': 0}) - self.assertFalse(VulnerabilityFoundEvent().should_perform(event_data, notification)) + # Create another notification with a higher level and verify it won't be raised. + notification = model.notification.create_repo_notification(repo, 'vulnerability_found', + 'quay_notification', {}, + {'level': 0}) + self.assertFalse(VulnerabilityFoundEvent().should_perform(event_data, notification)) def test_select_images_to_scan(self): @@ -588,60 +502,61 @@ class TestSecurityScanner(unittest.TestCase): def test_notification_worker(self): - pages_called = [] + layer1 = model.tag.get_tag_image(ADMIN_ACCESS_USER, SIMPLE_REPO, 'latest', include_storage=True) + layer2 = model.tag.get_tag_image(ADMIN_ACCESS_USER, COMPLEX_REPO, 'prod', include_storage=True) - @urlmatch(netloc=r'(.*\.)?mockclairservice', path=r'/v1/notifications/somenotification$', method='DELETE') - def delete_notification(url, request): - pages_called.append('DELETE') - return {'status_code': 201, 'content': ''} + # Add a repo events for the layers. + simple_repo = model.repository.get_repository(ADMIN_ACCESS_USER, SIMPLE_REPO) + complex_repo = model.repository.get_repository(ADMIN_ACCESS_USER, COMPLEX_REPO) - @urlmatch(netloc=r'(.*\.)?mockclairservice', path=r'/v1/notifications/somenotification$', method='GET') - def get_notification(url, request): - if url.query.find('page=nextpage') >= 0: - pages_called.append('GET-2') - layer = model.tag.get_tag_image(ADMIN_ACCESS_USER, COMPLEX_REPO, 'prod', include_storage=True) - layer_id = '%s.%s' % (layer.docker_image_id, layer.storage.uuid) + model.notification.create_repo_notification(simple_repo, 'vulnerability_found', + 'quay_notification', {}, {'level': 100}) + model.notification.create_repo_notification(complex_repo, 'vulnerability_found', + 'quay_notification', {}, {'level': 100}) - data = { - 'Notification': self._get_notification_data([layer_id], [layer_id]), - } + # Ensure that there are no event queue items for the layer. + self.assertIsNone(notification_queue.get()) - return json.dumps(data) - else: - pages_called.append('GET-1') - layer = model.tag.get_tag_image(ADMIN_ACCESS_USER, SIMPLE_REPO, 'latest', include_storage=True) - layer_id = '%s.%s' % (layer.docker_image_id, layer.storage.uuid) - - notification_data = self._get_notification_data([layer_id], [layer_id]) - notification_data['NextPage'] = 'nextpage' - - data = { - 'Notification': notification_data, - } - - return json.dumps(data) - - @urlmatch(netloc=r'(.*\.)?mockclairservice', path=r'/v1/notifications/(.*)') - def unknown_notification(url, request): - return {'status_code': 404, 'content': 'Unknown notification'} - - # Test with an unknown notification. - with HTTMock(get_notification, unknown_notification): + with fake_security_scanner() as security_scanner: + # Test with an unknown notification. worker = SecurityNotificationWorker(None) self.assertFalse(worker.perform_notification_work({ 'Name': 'unknownnotification' })) - # Test with a known notification with pages. - data = { - 'Name': 'somenotification' - } + # Add some analyzed layers. + analyzer = LayerAnalyzer(app.config, self.api) + analyzer.analyze_recursively(layer1) + analyzer.analyze_recursively(layer2) + + # Add a notification with pages of data. + new_vuln_info = { + "Name": "CVE-TEST", + "Namespace": "debian:8", + "Description": "Some service", + "Link": "https://security-tracker.debian.org/tracker/CVE-2014-9471", + "Severity": "Critical", + "FixedIn": {'Version': "9.23-5"}, + } + + security_scanner.set_vulns(security_scanner.layer_id(layer1), [new_vuln_info]) + security_scanner.set_vulns(security_scanner.layer_id(layer2), [new_vuln_info]) + + layer_ids = [security_scanner.layer_id(layer1), security_scanner.layer_id(layer2)] + notification_data = security_scanner.add_notification([], layer_ids, {}, new_vuln_info) + + # Test with a known notification with pages. + data = { + 'Name': notification_data['Name'], + } - with HTTMock(get_notification, delete_notification, unknown_notification): worker = SecurityNotificationWorker(None) - self.assertTrue(worker.perform_notification_work(data)) + self.assertTrue(worker.perform_notification_work(data, layer_limit=1)) - self.assertEquals(['GET-1', 'GET-2', 'DELETE'], pages_called) + # Make sure all pages were processed by ensuring we have two notifications. + time.sleep(1) + self.assertIsNotNone(notification_queue.get()) + self.assertIsNotNone(notification_queue.get()) if __name__ == '__main__': diff --git a/test/testconfig.py b/test/testconfig.py index 02c678200..76192605a 100644 --- a/test/testconfig.py +++ b/test/testconfig.py @@ -60,7 +60,7 @@ class TestConfig(DefaultConfig): FEATURE_SECURITY_SCANNER = True FEATURE_SECURITY_NOTIFICATIONS = True - SECURITY_SCANNER_ENDPOINT = 'http://mockclairservice/' + SECURITY_SCANNER_ENDPOINT = 'http://fakesecurityscanner/' SECURITY_SCANNER_API_VERSION = 'v1' SECURITY_SCANNER_ENGINE_VERSION_TARGET = 1 SECURITY_SCANNER_API_TIMEOUT_SECONDS = 1 diff --git a/util/secscan/api.py b/util/secscan/api.py index cbe97745f..2e04067ea 100644 --- a/util/secscan/api.py +++ b/util/secscan/api.py @@ -159,11 +159,10 @@ class SecurityScannerAPI(object): except requests.exceptions.ConnectionError: logger.exception('Connection error when trying to post layer data response for %s', layer.id) return None, True - except (requests.exceptions.RequestException, ValueError): + except (requests.exceptions.RequestException, ValueError) as re: logger.exception('Failed to post layer data response for %s', layer.id) return None, False - # Handle any errors from the security scanner. if response.status_code != 201: message = json_response.get('Error').get('Message', '') diff --git a/util/secscan/fake.py b/util/secscan/fake.py new file mode 100644 index 000000000..6b1efc894 --- /dev/null +++ b/util/secscan/fake.py @@ -0,0 +1,264 @@ +import json +import copy +import uuid +import urlparse + +from contextlib import contextmanager +from httmock import urlmatch, HTTMock, all_requests + +@contextmanager +def fake_security_scanner(hostname='fakesecurityscanner'): + """ Context manager which yields a fake security scanner. All requests made to the given + hostname (default: fakesecurityscanner) will be handled by the fake. + """ + scanner = FakeSecurityScanner(hostname) + with HTTMock(*(scanner.get_endpoints())): + yield scanner + + +class FakeSecurityScanner(object): + """ Implements a fake security scanner (with somewhat real responses) for testing API calls and + responses. + """ + def __init__(self, hostname, index_version=1): + self.hostname = hostname + self.index_version = index_version + self.layers = {} + self.notifications = {} + self.layer_vulns = {} + + self.fail_layer_id = None + self.internal_error_layer_id = None + + def set_fail_layer_id(self, fail_layer_id): + """ Sets a layer ID that, if encountered when the analyze call is made, causes a 422 + to be raised. + """ + self.fail_layer_id = fail_layer_id + + def set_internal_error_layer_id(self, internal_error_layer_id): + """ Sets a layer ID that, if encountered when the analyze call is made, causes a 500 + to be raised. + """ + self.internal_error_layer_id = internal_error_layer_id + + def has_layer(self, layer_id): + """ Returns true if the layer with the given ID has been analyzed. """ + return layer_id in self.layers + + def has_notification(self, notification_id): + """ Returns whether a notification with the given ID is found in the scanner. """ + return notification_id in self.notifications + + def add_notification(self, old_layer_ids, new_layer_ids, old_vuln, new_vuln): + """ Adds a new notification over the given sets of layer IDs and vulnerability information, + returning the structural data of the notification created. + """ + notification_id = str(uuid.uuid4()) + self.notifications[notification_id] = dict(old_layer_ids=old_layer_ids, + new_layer_ids=new_layer_ids, + old_vuln=old_vuln, + new_vuln=new_vuln) + + return self._get_notification_data(notification_id, 0, 100) + + def layer_id(self, layer): + """ Returns the Quay Security Scanner layer ID for the given layer (Image row). """ + return '%s.%s' % (layer.docker_image_id, layer.storage.uuid) + + def add_layer(self, layer_id): + """ Adds a layer to the security scanner, with no features or vulnerabilities. """ + self.layers[layer_id] = { + "Name": layer_id, + "Format": "Docker", + "IndexedByVersion": self.index_version, + } + + def remove_layer(self, layer_id): + """ Removes a layer from the security scanner. """ + self.layers.pop(layer_id, None) + + def set_vulns(self, layer_id, vulns): + """ Sets the vulnerabilities for the layer with the given ID to those given. """ + self.layer_vulns[layer_id] = vulns + + # Since this call may occur before the layer is "anaylzed", we only add the data + # to the layer itself if present. + if self.layers.get(layer_id): + layer = self.layers[layer_id] + layer['Features'] = layer.get('Features', []) + layer['Features'].append({ + "Name": 'somefeature', + "Namespace": 'somenamespace', + "Version": 'someversion', + "Vulnerabilities": self.layer_vulns[layer_id], + }) + + def _get_notification_data(self, notification_id, page, limit): + """ Returns the structural data for the notification with the given ID, paginated using + the given page and limit. """ + notification = self.notifications[notification_id] + notification_data = { + "Name": notification_id, + "Created": "1456247389", + "Notified": "1456246708", + "Limit": limit, + } + + start_index = (page*limit) + end_index = ((page+1)*limit) + has_additional_page = False + + if notification.get('old_vuln'): + old_layer_ids = notification['old_layer_ids'] + old_layer_ids = old_layer_ids[start_index:end_index] + has_additional_page = has_additional_page or bool(len(old_layer_ids[end_index-1:])) + + notification_data['Old'] = { + 'Vulnerability': notification['old_vuln'], + 'LayersIntroducingVulnerability': old_layer_ids, + } + + if notification.get('new_vuln'): + new_layer_ids = notification['new_layer_ids'] + new_layer_ids = new_layer_ids[start_index:end_index] + has_additional_page = has_additional_page or bool(len(new_layer_ids[end_index-1:])) + + notification_data['New'] = { + 'Vulnerability': notification['new_vuln'], + 'LayersIntroducingVulnerability': new_layer_ids, + } + + if has_additional_page: + notification_data['NextPage'] = str(page+1) + + return notification_data + + def get_endpoints(self): + """ Returns the HTTMock endpoint definitions for the fake security scanner. """ + + @urlmatch(netloc=r'(.*\.)?' + self.hostname, path=r'/v1/layers/(.+)', method='GET') + def get_layer_mock(url, request): + layer_id = url.path[len('/v1/layers/'):] + if layer_id == self.internal_error_layer_id: + return { + 'status_code': 500, + 'content': json.dumps({'Error': {'Message': 'Internal server error'}}), + } + + if not layer_id in self.layers: + return { + 'status_code': 404, + 'content': json.dumps({'Error': {'Message': 'Unknown layer'}}), + } + + layer_data = copy.deepcopy(self.layers[layer_id]) + + has_vulns = request.url.find('vulnerabilities') > 0 + has_features = request.url.find('features') > 0 + if not has_vulns and not has_features: + layer_data.pop('Features', None) + + return { + 'status_code': 200, + 'content': json.dumps({'Layer': layer_data}), + } + + @urlmatch(netloc=r'(.*\.)?' + self.hostname, path=r'/v1/layers', method='POST') + def post_layer_mock(_, request): + body_data = json.loads(request.body) + if not 'Layer' in body_data: + return {'status_code': 400, 'content': 'Missing body'} + + layer = body_data['Layer'] + if not 'Path' in layer: + return {'status_code': 400, 'content': 'Missing Path'} + + if not 'Name' in layer: + return {'status_code': 400, 'content': 'Missing Name'} + + if not 'Format' in layer: + return {'status_code': 400, 'content': 'Missing Format'} + + if layer['Name'] == self.internal_error_layer_id: + return { + 'status_code': 500, + 'content': json.dumps({'Error': {'Message': 'Internal server error'}}), + } + + if layer['Name'] == self.fail_layer_id: + return { + 'status_code': 422, + 'content': json.dumps({'Error': {'Message': 'Cannot analyze'}}), + } + + parent_id = layer.get('ParentName', None) + parent_layer = None + + if parent_id is not None: + parent_layer = self.layers.get(parent_id, None) + if parent_layer is None: + return { + 'status_code': 400, + 'content': json.dumps({'Error': {'Message': 'Unknown parent'}}), + } + + self.add_layer(layer['Name']) + if parent_layer is not None: + self.layers[layer['Name']]['ParentName'] = parent_id + + # If vulnerabilities have already been registered with this layer, call set_vulns to make sure + # their data is added to the layer's data. + if self.layer_vulns.get(layer['Name']): + self.set_vulns(layer['Name'], self.layer_vulns[layer['Name']]) + + return { + 'status_code': 201, + 'content': json.dumps({ + "Layer": self.layers[layer['Name']], + }), + } + + + @urlmatch(netloc=r'(.*\.)?' + self.hostname, path=r'/v1/notifications/(.+)$', method='DELETE') + def delete_notification(url, _): + notification_id = url.path[len('/v1/notifications/'):] + if notification_id not in self.notifications: + return { + 'status_code': 404, + 'content': json.dumps({'Error': {'Message': 'Unknown notification'}}), + } + + self.notifications.pop(notification_id) + return { + 'status_code': 204, + 'content': '', + } + + + @urlmatch(netloc=r'(.*\.)?' + self.hostname, path=r'/v1/notifications/(.+)$', method='GET') + def get_notification(url, _): + notification_id = url.path[len('/v1/notifications/'):] + if notification_id not in self.notifications: + return { + 'status_code': 404, + 'content': json.dumps({'Error': {'Message': 'Unknown notification'}}), + } + + query_params = urlparse.parse_qs(url.query) + limit = int(query_params.get('limit', [2])[0]) + page = int(query_params.get('page', [0])[0]) + + notification_data = self._get_notification_data(notification_id, page, limit) + response = {'Notification': notification_data} + return { + 'status_code': 200, + 'content': json.dumps(response), + } + + @all_requests + def response_content(url, _): + raise Exception('Unknown endpoint: ' + str(url)) + + return [get_layer_mock, post_layer_mock, get_notification, delete_notification, + response_content] diff --git a/workers/security_notification_worker.py b/workers/security_notification_worker.py index 6b135ebaf..7717048df 100644 --- a/workers/security_notification_worker.py +++ b/workers/security_notification_worker.py @@ -20,7 +20,7 @@ class SecurityNotificationWorker(QueueWorker): def process_queue_item(self, data): self.perform_notification_work(data) - def perform_notification_work(self, data): + def perform_notification_work(self, data, layer_limit=_LAYER_LIMIT): """ Performs the work for handling a security notification as referenced by the given data object. Returns True on successful handling, False on non-retryable failure and raises a JobException on retryable failure. @@ -31,7 +31,7 @@ class SecurityNotificationWorker(QueueWorker): while True: (response_data, should_retry) = secscan_api.get_notification(notification_name, - layer_limit=_LAYER_LIMIT, + layer_limit=layer_limit, page=current_page) if response_data is None: if should_retry: