diff --git a/test/test_secscan.py b/test/test_secscan.py index dce1ac13e..7a40d81a9 100644 --- a/test/test_secscan.py +++ b/test/test_secscan.py @@ -64,7 +64,6 @@ class TestSecurityScanner(unittest.TestCase): self.assertEquals(engineVersion, parent.security_indexed_engine) self.assertTrue(security_scanner.has_layer(security_scanner.layer_id(parent))) - def test_get_layer(self): """ Test for basic retrieval of layers from the security scanner. """ layer = model.tag.get_tag_image(ADMIN_ACCESS_USER, SIMPLE_REPO, 'latest', include_storage=True) @@ -82,7 +81,6 @@ class TestSecurityScanner(unittest.TestCase): self.assertIsNotNone(result) self.assertEquals(result['Layer']['Name'], security_scanner.layer_id(layer)) - def test_analyze_layer_nodirectdownload_success(self): """ Tests analyzing a layer when direct download is disabled. """ @@ -121,7 +119,6 @@ class TestSecurityScanner(unittest.TestCase): layer = model.tag.get_tag_image(ADMIN_ACCESS_USER, SIMPLE_REPO, 'latest') self.assertAnalyzed(layer, security_scanner, True, 1) - def test_analyze_layer_success(self): """ Tests that analyzing a layer successfully marks it as analyzed. """ @@ -136,7 +133,6 @@ class TestSecurityScanner(unittest.TestCase): layer = model.tag.get_tag_image(ADMIN_ACCESS_USER, SIMPLE_REPO, 'latest') self.assertAnalyzed(layer, security_scanner, True, 1) - def test_analyze_layer_failure(self): """ Tests that failing to analyze a layer (because it 422s) marks it as analyzed but failed. """ @@ -153,7 +149,6 @@ class TestSecurityScanner(unittest.TestCase): layer = model.tag.get_tag_image(ADMIN_ACCESS_USER, SIMPLE_REPO, 'latest') self.assertAnalyzed(layer, security_scanner, False, 1) - def test_analyze_layer_internal_error(self): """ Tests that failing to analyze a layer (because it 500s) marks it as not analyzed. """ @@ -170,7 +165,6 @@ class TestSecurityScanner(unittest.TestCase): layer = model.tag.get_tag_image(ADMIN_ACCESS_USER, SIMPLE_REPO, 'latest') self.assertAnalyzed(layer, security_scanner, False, -1) - def test_analyze_layer_error(self): """ Tests that failing to analyze a layer (because it 400s) marks it as analyzed but failed. """ @@ -190,7 +184,6 @@ class TestSecurityScanner(unittest.TestCase): layer = model.tag.get_tag_image(ADMIN_ACCESS_USER, SIMPLE_REPO, 'latest') self.assertAnalyzed(layer, security_scanner, False, 1) - def test_analyze_layer_missing_parent_handled(self): """ Tests that a missing parent causes an automatic reanalysis, which succeeds. """ @@ -221,7 +214,6 @@ class TestSecurityScanner(unittest.TestCase): layer = model.tag.get_tag_image(ADMIN_ACCESS_USER, SIMPLE_REPO, 'latest') self.assertAnalyzed(layer, security_scanner, True, 1) - def test_analyze_layer_invalid_parent(self): """ Tests that trying to reanalyze a parent that is invalid causes the layer to be marked as analyzed, but failed. @@ -257,7 +249,6 @@ class TestSecurityScanner(unittest.TestCase): layer = model.tag.get_tag_image(ADMIN_ACCESS_USER, SIMPLE_REPO, 'latest') self.assertAnalyzed(layer, security_scanner, False, 1) - def test_analyze_layer_unsupported_parent(self): """ Tests that attempting to analyze a layer whose parent is unanalyzable, results in the layer being marked as analyzed, but failed. @@ -278,7 +269,6 @@ class TestSecurityScanner(unittest.TestCase): layer = model.tag.get_tag_image(ADMIN_ACCESS_USER, SIMPLE_REPO, 'latest') self.assertAnalyzed(layer, security_scanner, False, 1) - def test_analyze_layer_missing_storage(self): """ Tests trying to analyze a layer with missing storage. """ @@ -299,7 +289,6 @@ class TestSecurityScanner(unittest.TestCase): layer = model.tag.get_tag_image(ADMIN_ACCESS_USER, SIMPLE_REPO, 'latest') self.assertAnalyzed(layer, security_scanner, False, 1) - def assert_analyze_layer_notify(self, security_indexed_engine, security_indexed, expect_notification): layer = model.tag.get_tag_image(ADMIN_ACCESS_USER, SIMPLE_REPO, 'latest', include_storage=True) @@ -357,22 +346,18 @@ class TestSecurityScanner(unittest.TestCase): self.assertEquals(updated_layer.id, layer.id) self.assertTrue(updated_layer.security_indexed_engine > 0) - def test_analyze_layer_success_events(self): # Not previously indexed at all => Notification self.assert_analyze_layer_notify(IMAGE_NOT_SCANNED_ENGINE_VERSION, False, True) - def test_analyze_layer_success_no_notification(self): # Previously successfully indexed => No notification self.assert_analyze_layer_notify(0, True, False) - def test_analyze_layer_failed_then_success_notification(self): # Previously failed to index => Notification self.assert_analyze_layer_notify(0, False, True) - def test_notification_new_layers_not_vulnerable(self): layer = model.tag.get_tag_image(ADMIN_ACCESS_USER, SIMPLE_REPO, 'latest', include_storage=True) layer_id = '%s.%s' % (layer.docker_image_id, layer.storage.uuid) @@ -402,7 +387,6 @@ class TestSecurityScanner(unittest.TestCase): # Ensure that there are no event queue items for the layer. self.assertIsNone(notification_queue.get()) - def test_notification_delete(self): layer = model.tag.get_tag_image(ADMIN_ACCESS_USER, SIMPLE_REPO, 'latest', include_storage=True) layer_id = '%s.%s' % (layer.docker_image_id, layer.storage.uuid) @@ -432,7 +416,6 @@ class TestSecurityScanner(unittest.TestCase): # Ensure that there are no event queue items for the layer. self.assertIsNone(notification_queue.get()) - def test_notification_new_layers(self): layer = model.tag.get_tag_image(ADMIN_ACCESS_USER, SIMPLE_REPO, 'latest', include_storage=True) layer_id = '%s.%s' % (layer.docker_image_id, layer.storage.uuid) @@ -459,7 +442,7 @@ class TestSecurityScanner(unittest.TestCase): "Description": "Some service", "Link": "https://security-tracker.debian.org/tracker/CVE-2014-9471", "Severity": "Low", - "FixedIn": {'Version': "9.23-5"}, + "FixedIn": {"Version": "9.23-5"}, } security_scanner.set_vulns(layer_id, [vuln_info]) @@ -480,7 +463,6 @@ class TestSecurityScanner(unittest.TestCase): self.assertEquals('Low', item_body['event_data']['vulnerability']['priority']) self.assertTrue(item_body['event_data']['vulnerability']['has_fix']) - def test_notification_no_new_layers(self): layer = model.tag.get_tag_image(ADMIN_ACCESS_USER, SIMPLE_REPO, 'latest', include_storage=True) @@ -509,7 +491,6 @@ class TestSecurityScanner(unittest.TestCase): # Ensure that there are no event queue items for the layer. self.assertIsNone(notification_queue.get()) - def test_notification_no_new_layers_increased_severity(self): layer = model.tag.get_tag_image(ADMIN_ACCESS_USER, SIMPLE_REPO, 'latest', include_storage=True) layer_id = '%s.%s' % (layer.docker_image_id, layer.storage.uuid) @@ -584,7 +565,6 @@ class TestSecurityScanner(unittest.TestCase): {'level': 0}) self.assertFalse(VulnerabilityFoundEvent().should_perform(event_data, notification)) - def test_select_images_to_scan(self): # Set all images to have a security index of a version to that of the config. expected_version = app.config['SECURITY_SCANNER_ENGINE_VERSION_TARGET'] @@ -598,7 +578,6 @@ class TestSecurityScanner(unittest.TestCase): self.assertIsNotNone(model.image.get_min_id_for_sec_scan(expected_version + 1)) self.assertTrue(len(model.image.get_images_eligible_for_scan(expected_version + 1)) > 0) - def test_notification_worker(self): layer1 = model.tag.get_tag_image(ADMIN_ACCESS_USER, SIMPLE_REPO, 'latest', include_storage=True) layer2 = model.tag.get_tag_image(ADMIN_ACCESS_USER, COMPLEX_REPO, 'prod', include_storage=True) @@ -656,17 +635,14 @@ class TestSecurityScanner(unittest.TestCase): self.assertIsNotNone(notification_queue.get()) self.assertIsNotNone(notification_queue.get()) - def test_notification_worker_offset_pages_not_indexed(self): # Try without indexes. self.assert_notification_worker_offset_pages(indexed=False) - def test_notification_worker_offset_pages_indexed(self): # Try with indexes. self.assert_notification_worker_offset_pages(indexed=True) - def assert_notification_worker_offset_pages(self, indexed=False): layer1 = model.tag.get_tag_image(ADMIN_ACCESS_USER, SIMPLE_REPO, 'latest', include_storage=True) layer2 = model.tag.get_tag_image(ADMIN_ACCESS_USER, COMPLEX_REPO, 'prod', include_storage=True) @@ -750,6 +726,5 @@ class TestSecurityScanner(unittest.TestCase): self.assertIsNone(notification_queue.get()) - if __name__ == '__main__': unittest.main()