diff --git a/test/test_secscan.py b/test/test_secscan.py index db466689a..95e74d53f 100644 --- a/test/test_secscan.py +++ b/test/test_secscan.py @@ -160,7 +160,7 @@ class TestSecurityScanner(unittest.TestCase): security_scanner.set_internal_error_layer_id(security_scanner.layer_id(layer)) analyzer = LayerAnalyzer(app.config, self.api) - with self.assertRaises(APIRequestFailure) as ctx: + with self.assertRaises(APIRequestFailure): analyzer.analyze_recursively(layer) layer = model.tag.get_tag_image(ADMIN_ACCESS_USER, SIMPLE_REPO, 'latest') @@ -185,6 +185,27 @@ class TestSecurityScanner(unittest.TestCase): layer = model.tag.get_tag_image(ADMIN_ACCESS_USER, SIMPLE_REPO, 'latest') self.assertAnalyzed(layer, security_scanner, False, 1) + def test_analyze_layer_unexpected_status(self): + """ Tests that a response from a scanner with an unexpected status code fails correctly. """ + + layer = model.tag.get_tag_image(ADMIN_ACCESS_USER, SIMPLE_REPO, 'latest', include_storage=True) + self.assertFalse(layer.security_indexed) + self.assertEquals(-1, layer.security_indexed_engine) + + with fake_security_scanner() as security_scanner: + # Make is so trying to analyze the parent will fail with an error. + security_scanner.set_unexpected_status_layer_id(security_scanner.layer_id(layer.parent)) + + # Try to the layer and its parents, but with one request causing an error. + analyzer = LayerAnalyzer(app.config, self.api) + with self.assertRaises(APIRequestFailure): + analyzer.analyze_recursively(layer) + + # Make sure it isn't analyzed. + layer = model.tag.get_tag_image(ADMIN_ACCESS_USER, SIMPLE_REPO, 'latest') + self.assertAnalyzed(layer, security_scanner, False, -1) + + def test_analyze_layer_missing_parent_handled(self): """ Tests that a missing parent causes an automatic reanalysis, which succeeds. """ diff --git a/util/secscan/fake.py b/util/secscan/fake.py index 849d0c2ca..69fc30cc2 100644 --- a/util/secscan/fake.py +++ b/util/secscan/fake.py @@ -33,6 +33,7 @@ class FakeSecurityScanner(object): self.fail_layer_id = None self.internal_error_layer_id = None self.error_layer_id = None + self.unexpected_status_layer_id = None def set_ok_layer_id(self, ok_layer_id): """ Sets a layer ID that, if encountered when the analyze call is made, causes a 200 @@ -58,6 +59,12 @@ class FakeSecurityScanner(object): """ self.error_layer_id = error_layer_id + def set_unexpected_status_layer_id(self, layer_id): + """ Sets a layer ID that, if encountered when the analyze call is made, causes an HTTP 600 + to be raised. This is useful in testing the robustness of the to unknown status codes. + """ + self.unexpected_status_layer_id = layer_id + def has_layer(self, layer_id): """ Returns true if the layer with the given ID has been analyzed. """ return layer_id in self.layers @@ -252,6 +259,13 @@ class FakeSecurityScanner(object): 'content': json.dumps({'Error': {'Message': 'Some sort of error'}}), } + if layer['Name'] == self.unexpected_status_layer_id: + return { + 'status_code': 600, + 'content': json.dumps({'Error': {'Message': 'Some sort of error'}}), + } + + parent_id = layer.get('ParentName', None) parent_layer = None