Refactor the security worker and API calls and add a bunch of tests

This commit is contained in:
Joseph Schorr 2016-02-24 16:01:27 -05:00
parent 0183c519f7
commit c0374d71c9
17 changed files with 811 additions and 456 deletions

View file

@ -49,7 +49,7 @@ from endpoints.api.superuser import (SuperUserLogs, SuperUserList, SuperUserMana
SuperUserSendRecoveryEmail, ChangeLog,
SuperUserOrganizationManagement, SuperUserOrganizationList,
SuperUserAggregateLogs)
from endpoints.api.secscan import RepositoryImagePackages, RepositoryImageVulnerabilities
from endpoints.api.secscan import RepositoryImageSecurity
try:
@ -4170,28 +4170,10 @@ class TestOrganizationInvoiceField(ApiTestCase):
self._run_test('DELETE', 201, 'devtable', None)
class TestRepositoryImageVulnerabilities(ApiTestCase):
class TestRepositoryImageSecurity(ApiTestCase):
def setUp(self):
ApiTestCase.setUp(self)
self._set_url(RepositoryImageVulnerabilities, repository='devtable/simple', imageid='fake')
def test_get_anonymous(self):
self._run_test('GET', 401, None, None)
def test_get_freshuser(self):
self._run_test('GET', 403, 'freshuser', None)
def test_get_reader(self):
self._run_test('GET', 403, 'reader', None)
def test_get_devtable(self):
self._run_test('GET', 404, 'devtable', None)
class TestRepositoryImagePackages(ApiTestCase):
def setUp(self):
ApiTestCase.setUp(self)
self._set_url(RepositoryImagePackages, repository='devtable/simple', imageid='fake')
self._set_url(RepositoryImageSecurity, repository='devtable/simple', imageid='fake')
def test_get_anonymous(self):
self._run_test('GET', 401, None, None)

View file

@ -11,6 +11,7 @@ from urllib import urlencode
from urlparse import urlparse, urlunparse, parse_qs
from playhouse.test_utils import assert_query_count, _QueryLogHandler
from httmock import urlmatch, HTTMock
from endpoints.api import api_bp, api
from endpoints.building import PreparedBuild
@ -52,6 +53,7 @@ from endpoints.api.repository import RepositoryList, RepositoryVisibility, Repos
from endpoints.api.permission import (RepositoryUserPermission, RepositoryTeamPermission,
RepositoryTeamPermissionList, RepositoryUserPermissionList)
from endpoints.api.superuser import SuperUserLogs, SuperUserList, SuperUserManagement
from endpoints.api.secscan import RepositoryImageSecurity
from endpoints.api.suconfig import (SuperUserRegistryStatus, SuperUserConfig, SuperUserConfigFile,
SuperUserCreateInitialSuperUser)
@ -3430,6 +3432,75 @@ class TestSuperUserConfig(ApiTestCase):
self.assertTrue(json['exists'])
@urlmatch(netloc=r'(.*\.)?mockclairservice', path=r'/v1/layers/(.+)')
def get_layer_success_mock(url, request):
vulnerabilities = [
{
"Name": "CVE-2014-9471",
"Namespace": "debian:8",
"Description": "The parse_datetime function in GNU coreutils allows remote attackers to cause a denial of service (crash) or possibly execute arbitrary code via a crafted date string, as demonstrated by the \"--date=TZ=\"123\"345\" @1\" string to the touch or date command.",
"Link": "https://security-tracker.debian.org/tracker/CVE-2014-9471",
"Severity": "Low",
"FixedBy": "9.23-5"
}
]
features = [
{
"Name": "coreutils",
"Namespace": "debian:8",
"Version": "8.23-4",
"Vulnerabilities": vulnerabilities,
}
]
if not request.url.endswith('?vulnerabilities'):
vulnerabilities = []
if not request.url.endswith('?features'):
features = []
return py_json.dumps({
"Layer": {
"Name": "17675ec01494d651e1ccf81dc9cf63959ebfeed4f978fddb1666b6ead008ed52",
"Namespace": "debian:8",
"ParentName": "140f9bdfeb9784cf8730e9dab5dd12fbd704151cf555ac8cae650451794e5ac2",
"IndexedByVersion": 1,
"Features": features
}
})
class TestRepositoryImageSecurity(ApiTestCase):
def test_get_vulnerabilities(self):
self.login(ADMIN_ACCESS_USER)
layer = model.tag.get_tag_image(ADMIN_ACCESS_USER, 'simple', 'latest')
# Grab the security info for the tag. It should be queued.
response = self.getJsonResponse(RepositoryImageSecurity,
params=dict(repository=ADMIN_ACCESS_USER + '/simple',
imageid=layer.docker_image_id,
vulnerabilities='true'))
self.assertEquals('queued', response['status'])
# Mark the layer as indexed.
layer.security_indexed = True
layer.security_indexed_engine = app.config['SECURITY_SCANNER']['ENGINE_VERSION_TARGET']
layer.save()
# Grab the security info again.
with HTTMock(get_layer_success_mock):
response = self.getJsonResponse(RepositoryImageSecurity,
params=dict(repository=ADMIN_ACCESS_USER + '/simple',
imageid=layer.docker_image_id,
vulnerabilities='true'))
self.assertEquals('scanned', response['status'])
self.assertEquals(1, response['data']['Layer']['IndexedByVersion'])
class TestSuperUserManagement(ApiTestCase):
def test_get_user(self):
self.login(ADMIN_ACCESS_USER)

242
test/test_secscan.py Normal file
View file

@ -0,0 +1,242 @@
import unittest
import json
from httmock import urlmatch, all_requests, HTTMock
from app import app, config_provider, storage, notification_queue
from initdb import setup_database_for_testing, finished_database_for_testing
from util.secscan.api import SecurityScannerAPI, AnalyzeLayerException
from util.secscan.analyzer import LayerAnalyzer
from data import model
ADMIN_ACCESS_USER = 'devtable'
SIMPLE_REPO = 'simple'
_PORT_NUMBER = 5001
@urlmatch(netloc=r'(.*\.)?mockclairservice', path=r'/v1/layers/(.+)')
def get_layer_failure_mock(url, request):
return {'status_code': 404, 'content': json.dumps({'Error': {'Message': 'Unknown layer'}})}
@urlmatch(netloc=r'(.*\.)?mockclairservice', path=r'/v1/layers$')
def analyze_layer_badrequest_mock(url, request):
return {'status_code': 400, 'content': json.dumps({'Error': {'Message': 'Bad request'}})}
@urlmatch(netloc=r'(.*\.)?mockclairservice', path=r'/v1/layers$')
def analyze_layer_internal_mock(url, request):
return {'status_code': 500, 'content': json.dumps({'Error': {'Message': 'Internal server error'}})}
@urlmatch(netloc=r'(.*\.)?mockclairservice', path=r'/v1/layers$')
def analyze_layer_failure_mock(url, request):
return {'status_code': 422, 'content': json.dumps({'Error': {'Message': 'Bad layer'}})}
@urlmatch(netloc=r'(.*\.)?mockclairservice', path=r'/v1/layers$')
def analyze_layer_success_mock(url, request):
return {'status_code': 201, 'content': json.dumps({
"Layer": {
"Name": "523ef1d23f222195488575f52a39c729c76a8c5630c9a194139cb246fb212da6",
"Path": "/mnt/layers/523ef1d23f222195488575f52a39c729c76a8c5630c9a194139cb246fb212da6/layer.tar",
"ParentName": "140f9bdfeb9784cf8730e9dab5dd12fbd704151cf555ac8cae650451794e5ac2",
"Format": "Docker",
"IndexedByVersion": 1
}
})}
@urlmatch(netloc=r'(.*\.)?mockclairservice', path=r'/v1/layers/(.+)')
def get_layer_success_mock(url, request):
vulnerabilities = [
{
"Name": "CVE-2014-9471",
"Namespace": "debian:8",
"Description": "The parse_datetime function in GNU coreutils allows remote attackers to cause a denial of service (crash) or possibly execute arbitrary code via a crafted date string, as demonstrated by the \"--date=TZ=\"123\"345\" @1\" string to the touch or date command.",
"Link": "https://security-tracker.debian.org/tracker/CVE-2014-9471",
"Severity": "Low",
"FixedBy": "9.23-5"
}
]
features = [
{
"Name": "coreutils",
"Namespace": "debian:8",
"Version": "8.23-4",
"Vulnerabilities": vulnerabilities,
}
]
if not request.url.endswith('?vulnerabilities'):
vulnerabilities = []
if not request.url.endswith('?features'):
features = []
return json.dumps({
"Layer": {
"Name": "17675ec01494d651e1ccf81dc9cf63959ebfeed4f978fddb1666b6ead008ed52",
"Namespace": "debian:8",
"ParentName": "140f9bdfeb9784cf8730e9dab5dd12fbd704151cf555ac8cae650451794e5ac2",
"IndexedByVersion": 1,
"Features": features
}
})
@all_requests
def response_content(url, request):
return {'status_code': 500, 'content': 'Unknown endpoint'}
class TestSecurityScanner(unittest.TestCase):
def setUp(self):
# Enable direct download in fake storage.
storage.put_content(['local_us'], 'supports_direct_download', 'true')
# Setup the database with fake storage.
setup_database_for_testing(self, with_storage=True, force_rebuild=True)
self.app = app.test_client()
self.ctx = app.test_request_context()
self.ctx.__enter__()
self.api = SecurityScannerAPI(app.config, config_provider, storage)
def tearDown(self):
storage.put_content(['local_us'], 'supports_direct_download', 'false')
finished_database_for_testing(self)
self.ctx.__exit__(True, None, None)
def assertAnalyzed(self, layer, isAnalyzed, engineVersion):
self.assertEquals(isAnalyzed, layer.security_indexed)
self.assertEquals(engineVersion, layer.security_indexed_engine)
# Ensure all parent layers are marked as analyzed.
parents = model.image.get_parent_images(ADMIN_ACCESS_USER, SIMPLE_REPO, layer)
for parent in parents:
self.assertEquals(isAnalyzed, parent.security_indexed)
self.assertEquals(engineVersion, parent.security_indexed_engine)
def test_get_layer_success(self):
layer = model.tag.get_tag_image(ADMIN_ACCESS_USER, SIMPLE_REPO, 'latest')
with HTTMock(get_layer_success_mock, response_content):
result = self.api.get_layer_data(layer, include_vulnerabilities=True)
self.assertIsNotNone(result)
self.assertEquals(result['Layer']['Name'], '17675ec01494d651e1ccf81dc9cf63959ebfeed4f978fddb1666b6ead008ed52')
def test_get_layer_failure(self):
layer = model.tag.get_tag_image(ADMIN_ACCESS_USER, SIMPLE_REPO, 'latest')
with HTTMock(get_layer_failure_mock, response_content):
result = self.api.get_layer_data(layer, include_vulnerabilities=True)
self.assertIsNone(result)
def test_analyze_layer_success(self):
layer = model.tag.get_tag_image(ADMIN_ACCESS_USER, SIMPLE_REPO, 'latest')
self.assertFalse(layer.security_indexed)
self.assertEquals(-1, layer.security_indexed_engine)
with HTTMock(analyze_layer_success_mock, get_layer_success_mock, response_content):
analyzer = LayerAnalyzer(app.config, self.api)
analyzer.analyze_recursively(layer)
layer = model.tag.get_tag_image(ADMIN_ACCESS_USER, SIMPLE_REPO, 'latest')
self.assertAnalyzed(layer, True, 1)
def test_analyze_layer_failure(self):
layer = model.tag.get_tag_image(ADMIN_ACCESS_USER, SIMPLE_REPO, 'latest')
self.assertFalse(layer.security_indexed)
self.assertEquals(-1, layer.security_indexed_engine)
with HTTMock(analyze_layer_failure_mock, response_content):
analyzer = LayerAnalyzer(app.config, self.api)
analyzer.analyze_recursively(layer)
layer = model.tag.get_tag_image(ADMIN_ACCESS_USER, SIMPLE_REPO, 'latest')
self.assertAnalyzed(layer, False, 1)
def test_analyze_layer_internal_error(self):
layer = model.tag.get_tag_image(ADMIN_ACCESS_USER, SIMPLE_REPO, 'latest')
self.assertFalse(layer.security_indexed)
self.assertEquals(-1, layer.security_indexed_engine)
with HTTMock(analyze_layer_internal_mock, response_content):
analyzer = LayerAnalyzer(app.config, self.api)
analyzer.analyze_recursively(layer)
layer = model.tag.get_tag_image(ADMIN_ACCESS_USER, SIMPLE_REPO, 'latest')
self.assertAnalyzed(layer, False, -1)
def test_analyze_layer_bad_request(self):
layer = model.tag.get_tag_image(ADMIN_ACCESS_USER, SIMPLE_REPO, 'latest')
self.assertFalse(layer.security_indexed)
self.assertEquals(-1, layer.security_indexed_engine)
with HTTMock(analyze_layer_badrequest_mock, response_content):
analyzer = LayerAnalyzer(app.config, self.api)
try:
analyzer.analyze_recursively(layer)
except AnalyzeLayerException:
return
self.fail('Expected exception on bad request')
def test_analyze_layer_missing_storage(self):
layer = model.tag.get_tag_image(ADMIN_ACCESS_USER, SIMPLE_REPO, 'latest')
self.assertFalse(layer.security_indexed)
self.assertEquals(-1, layer.security_indexed_engine)
# Delete the storage for the layer.
path = model.storage.get_layer_path(layer.storage)
locations = app.config['DISTRIBUTED_STORAGE_PREFERENCE']
storage.remove(locations, path)
with HTTMock(analyze_layer_success_mock, response_content):
analyzer = LayerAnalyzer(app.config, self.api)
analyzer.analyze_recursively(layer)
layer = model.tag.get_tag_image(ADMIN_ACCESS_USER, SIMPLE_REPO, 'latest')
self.assertEquals(False, layer.security_indexed)
self.assertEquals(1, layer.security_indexed_engine)
def test_analyze_layer_success_events(self):
layer = model.tag.get_tag_image(ADMIN_ACCESS_USER, SIMPLE_REPO, 'latest')
self.assertFalse(layer.security_indexed)
self.assertEquals(-1, layer.security_indexed_engine)
# Ensure there are no existing events.
self.assertIsNone(notification_queue.get())
# Add a repo event for the layer.
repo = model.repository.get_repository(ADMIN_ACCESS_USER, SIMPLE_REPO)
model.notification.create_repo_notification(repo, 'vulnerability_found', 'quay_notification', {}, {'level': 100})
with HTTMock(analyze_layer_success_mock, get_layer_success_mock, response_content):
analyzer = LayerAnalyzer(app.config, self.api)
analyzer.analyze_recursively(layer)
layer = model.tag.get_tag_image(ADMIN_ACCESS_USER, SIMPLE_REPO, 'latest')
self.assertAnalyzed(layer, True, 1)
# Ensure an event was written for the tag.
queue_item = notification_queue.get()
self.assertIsNotNone(queue_item)
body = json.loads(queue_item.body)
self.assertEquals(['latest', 'prod'], body['event_data']['tags'])
self.assertEquals('CVE-2014-9471', body['event_data']['vulnerability']['id'])
self.assertEquals('Low', body['event_data']['vulnerability']['priority'])
self.assertTrue(body['event_data']['vulnerability']['has_fix'])
if __name__ == '__main__':
unittest.main()

View file

@ -26,7 +26,7 @@ class TestConfig(DefaultConfig):
DB_URI = os.environ.get('TEST_DATABASE_URI', 'sqlite:///{0}'.format(TEST_DB_FILE.name))
DB_CONNECTION_ARGS = {
'threadlocals': True,
'autorollback': True
'autorollback': True,
}
@staticmethod
@ -59,7 +59,8 @@ class TestConfig(DefaultConfig):
FEATURE_SECURITY_SCANNER = True
SECURITY_SCANNER = {
'ENDPOINT': 'http://localhost/some/invalid/path',
'ENDPOINT': 'http://mockclairservice/',
'API_VERSION': 'v1',
'ENGINE_VERSION_TARGET': 1,
'API_CALL_TIMEOUT': 1
}