test: add qss read failover case

This commit is contained in:
Jimmy Zelinskie 2017-02-03 16:04:55 -05:00
parent dd033e4feb
commit c2c6bc1e90
2 changed files with 47 additions and 1 deletions

View file

@ -4309,6 +4309,36 @@ class TestRepositoryImageSecurity(ApiTestCase):
self.assertEquals('scanned', image_response['status']) self.assertEquals('scanned', image_response['status'])
self.assertEquals(1, image_response['data']['Layer']['IndexedByVersion']) self.assertEquals(1, image_response['data']['Layer']['IndexedByVersion'])
def test_get_vulnerabilities_read_failover(self):
with ConfigForTesting():
self.login(ADMIN_ACCESS_USER)
# Get a layer and mark it as indexed.
layer = model.tag.get_tag_image(ADMIN_ACCESS_USER, 'simple', 'latest')
layer.security_indexed = True
layer.security_indexed_engine = app.config['SECURITY_SCANNER_ENGINE_VERSION_TARGET']
layer.save()
with fake_security_scanner(hostname='failoverscanner') as security_scanner:
# Query the wrong security scanner URL without failover.
self.getResponse(RepositoryImageSecurity,
params=dict(repository=ADMIN_ACCESS_USER + '/simple',
imageid=layer.docker_image_id, vulnerabilities='true'),
expected_code=520)
# Set the failover URL.
app.config['SECURITY_SCANNER_READONLY_FAILOVER_ENDPOINTS'] = ['http://failoverscanner']
# Configure the API to return 200 for this layer.
layer_id = security_scanner.layer_id(layer)
security_scanner.set_ok_layer_id(layer_id)
# Call the API and succeed on failover.
self.getResponse(RepositoryImageSecurity,
params=dict(repository=ADMIN_ACCESS_USER + '/simple',
imageid=layer.docker_image_id, vulnerabilities='true'),
expected_code=200)
class TestSuperUserCustomCertificates(ApiTestCase): class TestSuperUserCustomCertificates(ApiTestCase):
def test_custom_certificates(self): def test_custom_certificates(self):

View file

@ -28,10 +28,17 @@ class FakeSecurityScanner(object):
self.notifications = {} self.notifications = {}
self.layer_vulns = {} self.layer_vulns = {}
self.ok_layer_id = None
self.fail_layer_id = None self.fail_layer_id = None
self.internal_error_layer_id = None self.internal_error_layer_id = None
self.error_layer_id = None self.error_layer_id = None
def set_ok_layer_id(self, ok_layer_id):
""" Sets a layer ID that, if encountered when the analyze call is made, causes a 200
to be immediately returned.
"""
self.ok_layer_id = ok_layer_id
def set_fail_layer_id(self, fail_layer_id): def set_fail_layer_id(self, fail_layer_id):
""" Sets a layer ID that, if encountered when the analyze call is made, causes a 422 """ Sets a layer ID that, if encountered when the analyze call is made, causes a 422
to be raised. to be raised.
@ -167,6 +174,12 @@ class FakeSecurityScanner(object):
@urlmatch(netloc=r'(.*\.)?' + self.hostname, path=r'/v1/layers/(.+)', method='GET') @urlmatch(netloc=r'(.*\.)?' + self.hostname, path=r'/v1/layers/(.+)', method='GET')
def get_layer_mock(url, request): def get_layer_mock(url, request):
layer_id = url.path[len('/v1/layers/'):] layer_id = url.path[len('/v1/layers/'):]
if layer_id == self.ok_layer_id:
return {
'status_code': 200,
'content': json.dumps({'Layer': {}}),
}
if layer_id == self.internal_error_layer_id: if layer_id == self.internal_error_layer_id:
return { return {
'status_code': 500, 'status_code': 500,
@ -305,7 +318,10 @@ class FakeSecurityScanner(object):
@all_requests @all_requests
def response_content(url, _): def response_content(url, _):
raise Exception('Unknown endpoint: ' + str(url)) return {
'status_code': 500,
'content': '',
}
return [get_layer_mock, post_layer_mock, remove_layer_mock, get_notification, return [get_layer_mock, post_layer_mock, remove_layer_mock, get_notification,
delete_notification, response_content] delete_notification, response_content]