Fix whitespace
This commit is contained in:
parent
5b3212ea0e
commit
001691e579
1 changed files with 1 additions and 26 deletions
|
@ -64,7 +64,6 @@ class TestSecurityScanner(unittest.TestCase):
|
||||||
self.assertEquals(engineVersion, parent.security_indexed_engine)
|
self.assertEquals(engineVersion, parent.security_indexed_engine)
|
||||||
self.assertTrue(security_scanner.has_layer(security_scanner.layer_id(parent)))
|
self.assertTrue(security_scanner.has_layer(security_scanner.layer_id(parent)))
|
||||||
|
|
||||||
|
|
||||||
def test_get_layer(self):
|
def test_get_layer(self):
|
||||||
""" Test for basic retrieval of layers from the security scanner. """
|
""" Test for basic retrieval of layers from the security scanner. """
|
||||||
layer = model.tag.get_tag_image(ADMIN_ACCESS_USER, SIMPLE_REPO, 'latest', include_storage=True)
|
layer = model.tag.get_tag_image(ADMIN_ACCESS_USER, SIMPLE_REPO, 'latest', include_storage=True)
|
||||||
|
@ -82,7 +81,6 @@ class TestSecurityScanner(unittest.TestCase):
|
||||||
self.assertIsNotNone(result)
|
self.assertIsNotNone(result)
|
||||||
self.assertEquals(result['Layer']['Name'], security_scanner.layer_id(layer))
|
self.assertEquals(result['Layer']['Name'], security_scanner.layer_id(layer))
|
||||||
|
|
||||||
|
|
||||||
def test_analyze_layer_nodirectdownload_success(self):
|
def test_analyze_layer_nodirectdownload_success(self):
|
||||||
""" Tests analyzing a layer when direct download is disabled. """
|
""" Tests analyzing a layer when direct download is disabled. """
|
||||||
|
|
||||||
|
@ -121,7 +119,6 @@ class TestSecurityScanner(unittest.TestCase):
|
||||||
layer = model.tag.get_tag_image(ADMIN_ACCESS_USER, SIMPLE_REPO, 'latest')
|
layer = model.tag.get_tag_image(ADMIN_ACCESS_USER, SIMPLE_REPO, 'latest')
|
||||||
self.assertAnalyzed(layer, security_scanner, True, 1)
|
self.assertAnalyzed(layer, security_scanner, True, 1)
|
||||||
|
|
||||||
|
|
||||||
def test_analyze_layer_success(self):
|
def test_analyze_layer_success(self):
|
||||||
""" Tests that analyzing a layer successfully marks it as analyzed. """
|
""" Tests that analyzing a layer successfully marks it as analyzed. """
|
||||||
|
|
||||||
|
@ -136,7 +133,6 @@ class TestSecurityScanner(unittest.TestCase):
|
||||||
layer = model.tag.get_tag_image(ADMIN_ACCESS_USER, SIMPLE_REPO, 'latest')
|
layer = model.tag.get_tag_image(ADMIN_ACCESS_USER, SIMPLE_REPO, 'latest')
|
||||||
self.assertAnalyzed(layer, security_scanner, True, 1)
|
self.assertAnalyzed(layer, security_scanner, True, 1)
|
||||||
|
|
||||||
|
|
||||||
def test_analyze_layer_failure(self):
|
def test_analyze_layer_failure(self):
|
||||||
""" Tests that failing to analyze a layer (because it 422s) marks it as analyzed but failed. """
|
""" Tests that failing to analyze a layer (because it 422s) marks it as analyzed but failed. """
|
||||||
|
|
||||||
|
@ -153,7 +149,6 @@ class TestSecurityScanner(unittest.TestCase):
|
||||||
layer = model.tag.get_tag_image(ADMIN_ACCESS_USER, SIMPLE_REPO, 'latest')
|
layer = model.tag.get_tag_image(ADMIN_ACCESS_USER, SIMPLE_REPO, 'latest')
|
||||||
self.assertAnalyzed(layer, security_scanner, False, 1)
|
self.assertAnalyzed(layer, security_scanner, False, 1)
|
||||||
|
|
||||||
|
|
||||||
def test_analyze_layer_internal_error(self):
|
def test_analyze_layer_internal_error(self):
|
||||||
""" Tests that failing to analyze a layer (because it 500s) marks it as not analyzed. """
|
""" Tests that failing to analyze a layer (because it 500s) marks it as not analyzed. """
|
||||||
|
|
||||||
|
@ -170,7 +165,6 @@ class TestSecurityScanner(unittest.TestCase):
|
||||||
layer = model.tag.get_tag_image(ADMIN_ACCESS_USER, SIMPLE_REPO, 'latest')
|
layer = model.tag.get_tag_image(ADMIN_ACCESS_USER, SIMPLE_REPO, 'latest')
|
||||||
self.assertAnalyzed(layer, security_scanner, False, -1)
|
self.assertAnalyzed(layer, security_scanner, False, -1)
|
||||||
|
|
||||||
|
|
||||||
def test_analyze_layer_error(self):
|
def test_analyze_layer_error(self):
|
||||||
""" Tests that failing to analyze a layer (because it 400s) marks it as analyzed but failed. """
|
""" Tests that failing to analyze a layer (because it 400s) marks it as analyzed but failed. """
|
||||||
|
|
||||||
|
@ -190,7 +184,6 @@ class TestSecurityScanner(unittest.TestCase):
|
||||||
layer = model.tag.get_tag_image(ADMIN_ACCESS_USER, SIMPLE_REPO, 'latest')
|
layer = model.tag.get_tag_image(ADMIN_ACCESS_USER, SIMPLE_REPO, 'latest')
|
||||||
self.assertAnalyzed(layer, security_scanner, False, 1)
|
self.assertAnalyzed(layer, security_scanner, False, 1)
|
||||||
|
|
||||||
|
|
||||||
def test_analyze_layer_missing_parent_handled(self):
|
def test_analyze_layer_missing_parent_handled(self):
|
||||||
""" Tests that a missing parent causes an automatic reanalysis, which succeeds. """
|
""" Tests that a missing parent causes an automatic reanalysis, which succeeds. """
|
||||||
|
|
||||||
|
@ -221,7 +214,6 @@ class TestSecurityScanner(unittest.TestCase):
|
||||||
layer = model.tag.get_tag_image(ADMIN_ACCESS_USER, SIMPLE_REPO, 'latest')
|
layer = model.tag.get_tag_image(ADMIN_ACCESS_USER, SIMPLE_REPO, 'latest')
|
||||||
self.assertAnalyzed(layer, security_scanner, True, 1)
|
self.assertAnalyzed(layer, security_scanner, True, 1)
|
||||||
|
|
||||||
|
|
||||||
def test_analyze_layer_invalid_parent(self):
|
def test_analyze_layer_invalid_parent(self):
|
||||||
""" Tests that trying to reanalyze a parent that is invalid causes the layer to be marked
|
""" Tests that trying to reanalyze a parent that is invalid causes the layer to be marked
|
||||||
as analyzed, but failed.
|
as analyzed, but failed.
|
||||||
|
@ -257,7 +249,6 @@ class TestSecurityScanner(unittest.TestCase):
|
||||||
layer = model.tag.get_tag_image(ADMIN_ACCESS_USER, SIMPLE_REPO, 'latest')
|
layer = model.tag.get_tag_image(ADMIN_ACCESS_USER, SIMPLE_REPO, 'latest')
|
||||||
self.assertAnalyzed(layer, security_scanner, False, 1)
|
self.assertAnalyzed(layer, security_scanner, False, 1)
|
||||||
|
|
||||||
|
|
||||||
def test_analyze_layer_unsupported_parent(self):
|
def test_analyze_layer_unsupported_parent(self):
|
||||||
""" Tests that attempting to analyze a layer whose parent is unanalyzable, results in the layer
|
""" Tests that attempting to analyze a layer whose parent is unanalyzable, results in the layer
|
||||||
being marked as analyzed, but failed.
|
being marked as analyzed, but failed.
|
||||||
|
@ -278,7 +269,6 @@ class TestSecurityScanner(unittest.TestCase):
|
||||||
layer = model.tag.get_tag_image(ADMIN_ACCESS_USER, SIMPLE_REPO, 'latest')
|
layer = model.tag.get_tag_image(ADMIN_ACCESS_USER, SIMPLE_REPO, 'latest')
|
||||||
self.assertAnalyzed(layer, security_scanner, False, 1)
|
self.assertAnalyzed(layer, security_scanner, False, 1)
|
||||||
|
|
||||||
|
|
||||||
def test_analyze_layer_missing_storage(self):
|
def test_analyze_layer_missing_storage(self):
|
||||||
""" Tests trying to analyze a layer with missing storage. """
|
""" Tests trying to analyze a layer with missing storage. """
|
||||||
|
|
||||||
|
@ -299,7 +289,6 @@ class TestSecurityScanner(unittest.TestCase):
|
||||||
layer = model.tag.get_tag_image(ADMIN_ACCESS_USER, SIMPLE_REPO, 'latest')
|
layer = model.tag.get_tag_image(ADMIN_ACCESS_USER, SIMPLE_REPO, 'latest')
|
||||||
self.assertAnalyzed(layer, security_scanner, False, 1)
|
self.assertAnalyzed(layer, security_scanner, False, 1)
|
||||||
|
|
||||||
|
|
||||||
def assert_analyze_layer_notify(self, security_indexed_engine, security_indexed,
|
def assert_analyze_layer_notify(self, security_indexed_engine, security_indexed,
|
||||||
expect_notification):
|
expect_notification):
|
||||||
layer = model.tag.get_tag_image(ADMIN_ACCESS_USER, SIMPLE_REPO, 'latest', include_storage=True)
|
layer = model.tag.get_tag_image(ADMIN_ACCESS_USER, SIMPLE_REPO, 'latest', include_storage=True)
|
||||||
|
@ -357,22 +346,18 @@ class TestSecurityScanner(unittest.TestCase):
|
||||||
self.assertEquals(updated_layer.id, layer.id)
|
self.assertEquals(updated_layer.id, layer.id)
|
||||||
self.assertTrue(updated_layer.security_indexed_engine > 0)
|
self.assertTrue(updated_layer.security_indexed_engine > 0)
|
||||||
|
|
||||||
|
|
||||||
def test_analyze_layer_success_events(self):
|
def test_analyze_layer_success_events(self):
|
||||||
# Not previously indexed at all => Notification
|
# Not previously indexed at all => Notification
|
||||||
self.assert_analyze_layer_notify(IMAGE_NOT_SCANNED_ENGINE_VERSION, False, True)
|
self.assert_analyze_layer_notify(IMAGE_NOT_SCANNED_ENGINE_VERSION, False, True)
|
||||||
|
|
||||||
|
|
||||||
def test_analyze_layer_success_no_notification(self):
|
def test_analyze_layer_success_no_notification(self):
|
||||||
# Previously successfully indexed => No notification
|
# Previously successfully indexed => No notification
|
||||||
self.assert_analyze_layer_notify(0, True, False)
|
self.assert_analyze_layer_notify(0, True, False)
|
||||||
|
|
||||||
|
|
||||||
def test_analyze_layer_failed_then_success_notification(self):
|
def test_analyze_layer_failed_then_success_notification(self):
|
||||||
# Previously failed to index => Notification
|
# Previously failed to index => Notification
|
||||||
self.assert_analyze_layer_notify(0, False, True)
|
self.assert_analyze_layer_notify(0, False, True)
|
||||||
|
|
||||||
|
|
||||||
def test_notification_new_layers_not_vulnerable(self):
|
def test_notification_new_layers_not_vulnerable(self):
|
||||||
layer = model.tag.get_tag_image(ADMIN_ACCESS_USER, SIMPLE_REPO, 'latest', include_storage=True)
|
layer = model.tag.get_tag_image(ADMIN_ACCESS_USER, SIMPLE_REPO, 'latest', include_storage=True)
|
||||||
layer_id = '%s.%s' % (layer.docker_image_id, layer.storage.uuid)
|
layer_id = '%s.%s' % (layer.docker_image_id, layer.storage.uuid)
|
||||||
|
@ -402,7 +387,6 @@ class TestSecurityScanner(unittest.TestCase):
|
||||||
# Ensure that there are no event queue items for the layer.
|
# Ensure that there are no event queue items for the layer.
|
||||||
self.assertIsNone(notification_queue.get())
|
self.assertIsNone(notification_queue.get())
|
||||||
|
|
||||||
|
|
||||||
def test_notification_delete(self):
|
def test_notification_delete(self):
|
||||||
layer = model.tag.get_tag_image(ADMIN_ACCESS_USER, SIMPLE_REPO, 'latest', include_storage=True)
|
layer = model.tag.get_tag_image(ADMIN_ACCESS_USER, SIMPLE_REPO, 'latest', include_storage=True)
|
||||||
layer_id = '%s.%s' % (layer.docker_image_id, layer.storage.uuid)
|
layer_id = '%s.%s' % (layer.docker_image_id, layer.storage.uuid)
|
||||||
|
@ -432,7 +416,6 @@ class TestSecurityScanner(unittest.TestCase):
|
||||||
# Ensure that there are no event queue items for the layer.
|
# Ensure that there are no event queue items for the layer.
|
||||||
self.assertIsNone(notification_queue.get())
|
self.assertIsNone(notification_queue.get())
|
||||||
|
|
||||||
|
|
||||||
def test_notification_new_layers(self):
|
def test_notification_new_layers(self):
|
||||||
layer = model.tag.get_tag_image(ADMIN_ACCESS_USER, SIMPLE_REPO, 'latest', include_storage=True)
|
layer = model.tag.get_tag_image(ADMIN_ACCESS_USER, SIMPLE_REPO, 'latest', include_storage=True)
|
||||||
layer_id = '%s.%s' % (layer.docker_image_id, layer.storage.uuid)
|
layer_id = '%s.%s' % (layer.docker_image_id, layer.storage.uuid)
|
||||||
|
@ -459,7 +442,7 @@ class TestSecurityScanner(unittest.TestCase):
|
||||||
"Description": "Some service",
|
"Description": "Some service",
|
||||||
"Link": "https://security-tracker.debian.org/tracker/CVE-2014-9471",
|
"Link": "https://security-tracker.debian.org/tracker/CVE-2014-9471",
|
||||||
"Severity": "Low",
|
"Severity": "Low",
|
||||||
"FixedIn": {'Version': "9.23-5"},
|
"FixedIn": {"Version": "9.23-5"},
|
||||||
}
|
}
|
||||||
security_scanner.set_vulns(layer_id, [vuln_info])
|
security_scanner.set_vulns(layer_id, [vuln_info])
|
||||||
|
|
||||||
|
@ -480,7 +463,6 @@ class TestSecurityScanner(unittest.TestCase):
|
||||||
self.assertEquals('Low', item_body['event_data']['vulnerability']['priority'])
|
self.assertEquals('Low', item_body['event_data']['vulnerability']['priority'])
|
||||||
self.assertTrue(item_body['event_data']['vulnerability']['has_fix'])
|
self.assertTrue(item_body['event_data']['vulnerability']['has_fix'])
|
||||||
|
|
||||||
|
|
||||||
def test_notification_no_new_layers(self):
|
def test_notification_no_new_layers(self):
|
||||||
layer = model.tag.get_tag_image(ADMIN_ACCESS_USER, SIMPLE_REPO, 'latest', include_storage=True)
|
layer = model.tag.get_tag_image(ADMIN_ACCESS_USER, SIMPLE_REPO, 'latest', include_storage=True)
|
||||||
|
|
||||||
|
@ -509,7 +491,6 @@ class TestSecurityScanner(unittest.TestCase):
|
||||||
# Ensure that there are no event queue items for the layer.
|
# Ensure that there are no event queue items for the layer.
|
||||||
self.assertIsNone(notification_queue.get())
|
self.assertIsNone(notification_queue.get())
|
||||||
|
|
||||||
|
|
||||||
def test_notification_no_new_layers_increased_severity(self):
|
def test_notification_no_new_layers_increased_severity(self):
|
||||||
layer = model.tag.get_tag_image(ADMIN_ACCESS_USER, SIMPLE_REPO, 'latest', include_storage=True)
|
layer = model.tag.get_tag_image(ADMIN_ACCESS_USER, SIMPLE_REPO, 'latest', include_storage=True)
|
||||||
layer_id = '%s.%s' % (layer.docker_image_id, layer.storage.uuid)
|
layer_id = '%s.%s' % (layer.docker_image_id, layer.storage.uuid)
|
||||||
|
@ -584,7 +565,6 @@ class TestSecurityScanner(unittest.TestCase):
|
||||||
{'level': 0})
|
{'level': 0})
|
||||||
self.assertFalse(VulnerabilityFoundEvent().should_perform(event_data, notification))
|
self.assertFalse(VulnerabilityFoundEvent().should_perform(event_data, notification))
|
||||||
|
|
||||||
|
|
||||||
def test_select_images_to_scan(self):
|
def test_select_images_to_scan(self):
|
||||||
# Set all images to have a security index of a version to that of the config.
|
# Set all images to have a security index of a version to that of the config.
|
||||||
expected_version = app.config['SECURITY_SCANNER_ENGINE_VERSION_TARGET']
|
expected_version = app.config['SECURITY_SCANNER_ENGINE_VERSION_TARGET']
|
||||||
|
@ -598,7 +578,6 @@ class TestSecurityScanner(unittest.TestCase):
|
||||||
self.assertIsNotNone(model.image.get_min_id_for_sec_scan(expected_version + 1))
|
self.assertIsNotNone(model.image.get_min_id_for_sec_scan(expected_version + 1))
|
||||||
self.assertTrue(len(model.image.get_images_eligible_for_scan(expected_version + 1)) > 0)
|
self.assertTrue(len(model.image.get_images_eligible_for_scan(expected_version + 1)) > 0)
|
||||||
|
|
||||||
|
|
||||||
def test_notification_worker(self):
|
def test_notification_worker(self):
|
||||||
layer1 = model.tag.get_tag_image(ADMIN_ACCESS_USER, SIMPLE_REPO, 'latest', include_storage=True)
|
layer1 = model.tag.get_tag_image(ADMIN_ACCESS_USER, SIMPLE_REPO, 'latest', include_storage=True)
|
||||||
layer2 = model.tag.get_tag_image(ADMIN_ACCESS_USER, COMPLEX_REPO, 'prod', include_storage=True)
|
layer2 = model.tag.get_tag_image(ADMIN_ACCESS_USER, COMPLEX_REPO, 'prod', include_storage=True)
|
||||||
|
@ -656,17 +635,14 @@ class TestSecurityScanner(unittest.TestCase):
|
||||||
self.assertIsNotNone(notification_queue.get())
|
self.assertIsNotNone(notification_queue.get())
|
||||||
self.assertIsNotNone(notification_queue.get())
|
self.assertIsNotNone(notification_queue.get())
|
||||||
|
|
||||||
|
|
||||||
def test_notification_worker_offset_pages_not_indexed(self):
|
def test_notification_worker_offset_pages_not_indexed(self):
|
||||||
# Try without indexes.
|
# Try without indexes.
|
||||||
self.assert_notification_worker_offset_pages(indexed=False)
|
self.assert_notification_worker_offset_pages(indexed=False)
|
||||||
|
|
||||||
|
|
||||||
def test_notification_worker_offset_pages_indexed(self):
|
def test_notification_worker_offset_pages_indexed(self):
|
||||||
# Try with indexes.
|
# Try with indexes.
|
||||||
self.assert_notification_worker_offset_pages(indexed=True)
|
self.assert_notification_worker_offset_pages(indexed=True)
|
||||||
|
|
||||||
|
|
||||||
def assert_notification_worker_offset_pages(self, indexed=False):
|
def assert_notification_worker_offset_pages(self, indexed=False):
|
||||||
layer1 = model.tag.get_tag_image(ADMIN_ACCESS_USER, SIMPLE_REPO, 'latest', include_storage=True)
|
layer1 = model.tag.get_tag_image(ADMIN_ACCESS_USER, SIMPLE_REPO, 'latest', include_storage=True)
|
||||||
layer2 = model.tag.get_tag_image(ADMIN_ACCESS_USER, COMPLEX_REPO, 'prod', include_storage=True)
|
layer2 = model.tag.get_tag_image(ADMIN_ACCESS_USER, COMPLEX_REPO, 'prod', include_storage=True)
|
||||||
|
@ -750,6 +726,5 @@ class TestSecurityScanner(unittest.TestCase):
|
||||||
self.assertIsNone(notification_queue.get())
|
self.assertIsNone(notification_queue.get())
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
if __name__ == '__main__':
|
if __name__ == '__main__':
|
||||||
unittest.main()
|
unittest.main()
|
||||||
|
|
Reference in a new issue