test.test_api_usage: fix secscan tests
This commit is contained in:
parent
3286566478
commit
1d6339e644
2 changed files with 27 additions and 25 deletions
|
@ -4326,34 +4326,36 @@ class TestRepositoryImageSecurity(ApiTestCase):
|
||||||
self.assertEquals(1, image_response['data']['Layer']['IndexedByVersion'])
|
self.assertEquals(1, image_response['data']['Layer']['IndexedByVersion'])
|
||||||
|
|
||||||
def test_get_vulnerabilities_read_failover(self):
|
def test_get_vulnerabilities_read_failover(self):
|
||||||
with ConfigForTesting():
|
self.login(ADMIN_ACCESS_USER)
|
||||||
self.login(ADMIN_ACCESS_USER)
|
|
||||||
|
|
||||||
# Get a layer and mark it as indexed.
|
# Get a layer and mark it as indexed.
|
||||||
layer = model.tag.get_tag_image(ADMIN_ACCESS_USER, 'simple', 'latest')
|
layer = model.tag.get_tag_image(ADMIN_ACCESS_USER, 'simple', 'latest')
|
||||||
layer.security_indexed = True
|
layer.security_indexed = True
|
||||||
layer.security_indexed_engine = app.config['SECURITY_SCANNER_ENGINE_VERSION_TARGET']
|
layer.security_indexed_engine = app.config['SECURITY_SCANNER_ENGINE_VERSION_TARGET']
|
||||||
layer.save()
|
layer.save()
|
||||||
|
|
||||||
with fake_security_scanner(hostname='failoverscanner') as security_scanner:
|
with fake_security_scanner(hostname='failoverscanner') as security_scanner:
|
||||||
# Query the wrong security scanner URL without failover.
|
# Query the wrong security scanner URL without failover.
|
||||||
self.getResponse(RepositoryImageSecurity,
|
self.getResponse(RepositoryImageSecurity,
|
||||||
params=dict(repository=ADMIN_ACCESS_USER + '/simple',
|
params=dict(repository=ADMIN_ACCESS_USER + '/simple',
|
||||||
imageid=layer.docker_image_id, vulnerabilities='true'),
|
imageid=layer.docker_image_id, vulnerabilities='true'),
|
||||||
expected_code=520)
|
expected_code=520)
|
||||||
|
|
||||||
# Set the failover URL.
|
# Set the failover URL in the global config.
|
||||||
app.config['SECURITY_SCANNER_READONLY_FAILOVER_ENDPOINTS'] = ['http://failoverscanner']
|
app.config['SECURITY_SCANNER_READONLY_FAILOVER_ENDPOINTS'] = ['http://failoverscanner']
|
||||||
|
|
||||||
# Configure the API to return 200 for this layer.
|
# Configure the API to return 200 for this layer.
|
||||||
layer_id = security_scanner.layer_id(layer)
|
layer_id = security_scanner.layer_id(layer)
|
||||||
security_scanner.set_ok_layer_id(layer_id)
|
security_scanner.set_ok_layer_id(layer_id)
|
||||||
|
|
||||||
# Call the API and succeed on failover.
|
# Call the API and succeed on failover.
|
||||||
self.getResponse(RepositoryImageSecurity,
|
self.getResponse(RepositoryImageSecurity,
|
||||||
params=dict(repository=ADMIN_ACCESS_USER + '/simple',
|
params=dict(repository=ADMIN_ACCESS_USER + '/simple',
|
||||||
imageid=layer.docker_image_id, vulnerabilities='true'),
|
imageid=layer.docker_image_id, vulnerabilities='true'),
|
||||||
expected_code=200)
|
expected_code=200)
|
||||||
|
|
||||||
|
# Remove the failover endpoints from the global config.
|
||||||
|
app.config['SECURITY_SCANNER_READONLY_FAILOVER_ENDPOINTS'] = []
|
||||||
|
|
||||||
|
|
||||||
class TestSuperUserCustomCertificates(ApiTestCase):
|
class TestSuperUserCustomCertificates(ApiTestCase):
|
||||||
|
|
|
@ -5,6 +5,7 @@ import urlparse
|
||||||
|
|
||||||
from contextlib import contextmanager
|
from contextlib import contextmanager
|
||||||
from httmock import urlmatch, HTTMock, all_requests
|
from httmock import urlmatch, HTTMock, all_requests
|
||||||
|
|
||||||
from util.secscan.api import UNKNOWN_PARENT_LAYER_ERROR_MSG, compute_layer_id
|
from util.secscan.api import UNKNOWN_PARENT_LAYER_ERROR_MSG, compute_layer_id
|
||||||
|
|
||||||
@contextmanager
|
@contextmanager
|
||||||
|
@ -170,7 +171,6 @@ class FakeSecurityScanner(object):
|
||||||
|
|
||||||
def get_endpoints(self):
|
def get_endpoints(self):
|
||||||
""" Returns the HTTMock endpoint definitions for the fake security scanner. """
|
""" Returns the HTTMock endpoint definitions for the fake security scanner. """
|
||||||
|
|
||||||
@urlmatch(netloc=r'(.*\.)?' + self.hostname, path=r'/v1/layers/(.+)', method='GET')
|
@urlmatch(netloc=r'(.*\.)?' + self.hostname, path=r'/v1/layers/(.+)', method='GET')
|
||||||
def get_layer_mock(url, request):
|
def get_layer_mock(url, request):
|
||||||
layer_id = url.path[len('/v1/layers/'):]
|
layer_id = url.path[len('/v1/layers/'):]
|
||||||
|
@ -320,7 +320,7 @@ class FakeSecurityScanner(object):
|
||||||
def response_content(url, _):
|
def response_content(url, _):
|
||||||
return {
|
return {
|
||||||
'status_code': 500,
|
'status_code': 500,
|
||||||
'content': '',
|
'content': json.dumps({'Error': {'Message': 'Unknown endpoint %s' % url.path}}),
|
||||||
}
|
}
|
||||||
|
|
||||||
return [get_layer_mock, post_layer_mock, remove_layer_mock, get_notification,
|
return [get_layer_mock, post_layer_mock, remove_layer_mock, get_notification,
|
||||||
|
|
Reference in a new issue