c75fcfbd5e
Fixes #1272
454 lines
No EOL
16 KiB
Python
454 lines
No EOL
16 KiB
Python
import unittest
|
|
import json
|
|
import os
|
|
from httmock import urlmatch, all_requests, HTTMock
|
|
|
|
from app import app, config_provider, storage, notification_queue
|
|
from initdb import setup_database_for_testing, finished_database_for_testing
|
|
from util.secscan.api import SecurityScannerAPI, AnalyzeLayerException
|
|
from util.secscan.analyzer import LayerAnalyzer
|
|
from util.secscan.notifier import process_notification_data
|
|
from data import model
|
|
|
|
|
|
# Namespace (user) that owns the repository used throughout these tests.
ADMIN_ACCESS_USER = "devtable"

# Name of the repository whose 'latest' tag is analyzed by the tests.
SIMPLE_REPO = "simple"

# NOTE(review): not referenced anywhere in the visible part of this module.
_PORT_NUMBER = 5001
|
|
|
|
@urlmatch(netloc=r'(.*\.)?mockclairservice', path=r'/v1/layers/(.+)')
def get_layer_failure_mock(url, request):
    """Reply to a layer lookup on the mocked scanner host with a 404 error body."""
    error_body = {'Error': {'Message': 'Unknown layer'}}
    return {'status_code': 404, 'content': json.dumps(error_body)}
|
|
|
|
|
|
@urlmatch(netloc=r'(.*\.)?mockclairservice', path=r'/v1/layers$')
def analyze_layer_badrequest_mock(url, request):
    """Reply to a request on the layers endpoint with a 400 'Bad request' error body."""
    error_body = {'Error': {'Message': 'Bad request'}}
    return {'status_code': 400, 'content': json.dumps(error_body)}
|
|
|
|
|
|
@urlmatch(netloc=r'(.*\.)?mockclairservice', path=r'/v1/layers$')
def analyze_layer_internal_mock(url, request):
    """Reply to a request on the layers endpoint with a 500 'Internal server error' body."""
    error_body = {'Error': {'Message': 'Internal server error'}}
    return {'status_code': 500, 'content': json.dumps(error_body)}
|
|
|
|
|
|
@urlmatch(netloc=r'(.*\.)?mockclairservice', path=r'/v1/layers$')
def analyze_layer_failure_mock(url, request):
    """Reply to a request on the layers endpoint with a 422 'Bad layer' error body."""
    error_body = {'Error': {'Message': 'Bad layer'}}
    return {'status_code': 422, 'content': json.dumps(error_body)}
|
|
|
|
|
|
@urlmatch(netloc=r'(.*\.)?mockclairservice', path=r'/v1/layers$')
def analyze_layer_success_mock(url, request):
    """Validate the posted layer description and reply with a 201 and a canned layer.

    The request body must be JSON holding a 'Layer' object with 'Path',
    'Name' and 'Format' keys; the first missing piece yields a 400 reply
    naming it.
    """
    posted = json.loads(request.body)
    if 'Layer' not in posted:
        return {'status_code': 400, 'content': 'Missing body'}

    layer = posted['Layer']

    # Checked in this order so the first missing key decides the error text.
    required_fields = (
        ('Path', 'Missing Path'),
        ('Name', 'Missing Name'),
        ('Format', 'Missing Format'),
    )
    for field_name, error_text in required_fields:
        if field_name not in layer:
            return {'status_code': 400, 'content': error_text}

    analyzed_layer = {
        "Layer": {
            "Name": "523ef1d23f222195488575f52a39c729c76a8c5630c9a194139cb246fb212da6",
            "Path": "/mnt/layers/523ef1d23f222195488575f52a39c729c76a8c5630c9a194139cb246fb212da6/layer.tar",
            "ParentName": "140f9bdfeb9784cf8730e9dab5dd12fbd704151cf555ac8cae650451794e5ac2",
            "Format": "Docker",
            "IndexedByVersion": 1
        }
    }
    return {'status_code': 201, 'content': json.dumps(analyzed_layer)}
|
|
|
|
|
|
@urlmatch(netloc=r'(.*\.)?mockclairservice', path=r'/v1/layers/(.+)')
def get_layer_success_mock(url, request):
    """Reply to a layer lookup with a canned layer document (JSON string).

    The 'Features' list of the reply is emptied unless the request URL
    contains the text 'features' past its first character.
    """
    vulnerabilities = [
        {
            "Name": "CVE-2014-9471",
            "Namespace": "debian:8",
            "Description": "The parse_datetime function in GNU coreutils allows remote attackers to cause a denial of service (crash) or possibly execute arbitrary code via a crafted date string, as demonstrated by the \"--date=TZ=\"123\"345\" @1\" string to the touch or date command.",
            "Link": "https://security-tracker.debian.org/tracker/CVE-2014-9471",
            "Severity": "Low",
            "FixedBy": "9.23-5"
        }
    ]

    coreutils_feature = {
        "Name": "coreutils",
        "Namespace": "debian:8",
        "Version": "8.23-4",
        "Vulnerabilities": vulnerabilities,
    }
    features = [coreutils_feature]

    requested_url = request.url

    # NOTE(review): this only rebinds the local name; the feature entry built
    # above still references the original, populated list, so the reply is
    # unaffected by this branch. Left as-is to keep the mock's behavior.
    if requested_url.find('vulnerabilities') <= 0:
        vulnerabilities = []

    if requested_url.find('features') <= 0:
        features = []

    layer_document = {
        "Layer": {
            "Name": "17675ec01494d651e1ccf81dc9cf63959ebfeed4f978fddb1666b6ead008ed52",
            "Namespace": "debian:8",
            "ParentName": "140f9bdfeb9784cf8730e9dab5dd12fbd704151cf555ac8cae650451794e5ac2",
            "IndexedByVersion": 1,
            "Features": features
        }
    }
    return json.dumps(layer_document)
|
|
|
|
|
|
@all_requests
def response_content(url, request):
    """Catch-all handler: answer any request with a 500 'Unknown endpoint' reply."""
    fallback_reply = {'status_code': 500, 'content': 'Unknown endpoint'}
    return fallback_reply
|
|
|
|
|
|
class TestSecurityScanner(unittest.TestCase):
    """Tests for SecurityScannerAPI, LayerAnalyzer and process_notification_data.

    Requests to the security scanner are answered by the HTTMock handlers
    defined in this module (or built by `_matching_layer_mock`).
    """

    def setUp(self):
        # Enable direct download in fake storage.
        storage.put_content(['local_us'], 'supports_direct_download', 'true')

        # Setup the database with fake storage.
        force_rebuild = os.environ.get('SKIP_REBUILD') != 'true'
        setup_database_for_testing(self, with_storage=True, force_rebuild=force_rebuild)
        self.app = app.test_client()
        self.ctx = app.test_request_context()
        self.ctx.__enter__()

        self.api = SecurityScannerAPI(app.config, config_provider, storage)

    def tearDown(self):
        storage.put_content(['local_us'], 'supports_direct_download', 'false')
        finished_database_for_testing(self)
        # No exception is in flight here, so all three __exit__ arguments
        # (type, value, traceback) must be None.
        self.ctx.__exit__(None, None, None)

    # ------------------------------------------------------------------
    # Helpers
    # ------------------------------------------------------------------

    def _get_latest_layer(self):
        """Return the image behind the 'latest' tag of the test repository."""
        return model.tag.get_tag_image(ADMIN_ACCESS_USER, SIMPLE_REPO, 'latest')

    def _get_latest_layer_and_id(self):
        """Return the 'latest' image and its '<docker_image_id>.<storage uuid>' id."""
        layer = self._get_latest_layer()
        layer_id = '%s.%s' % (layer.docker_image_id, layer.storage.uuid)
        return layer, layer_id

    def _add_vulnerability_notification(self):
        """Register a 'vulnerability_found' repo notification on the test repository."""
        repo = model.repository.get_repository(ADMIN_ACCESS_USER, SIMPLE_REPO)
        model.notification.create_repo_notification(
            repo, 'vulnerability_found', 'quay_notification', {}, {'level': 100})

    @staticmethod
    def _matching_layer_mock(layer_id, vulnerabilities):
        """Build an HTTMock handler describing `layer_id` with one coreutils feature.

        `vulnerabilities` is used verbatim as that feature's 'Vulnerabilities' list.
        """
        @urlmatch(netloc=r'(.*\.)?mockclairservice', path=r'/v1/layers/(.+)')
        def get_matching_layer(url, request):
            return json.dumps({
                "Layer": {
                    "Name": layer_id,
                    "Namespace": "debian:8",
                    "IndexedByVersion": 1,
                    "Features": [
                        {
                            "Name": "coreutils",
                            "Namespace": "debian:8",
                            "Version": "8.23-4",
                            "Vulnerabilities": vulnerabilities,
                        }
                    ]
                }
            })

        return get_matching_layer

    def _assertNotYetAnalyzed(self, layer):
        """Assert that `layer` has not been indexed by any engine version."""
        self.assertFalse(layer.security_indexed)
        self.assertEqual(-1, layer.security_indexed_engine)

    def _assertVulnerabilityEvent(self, queue_item, vulnerability_id, priority):
        """Assert `queue_item` is a fixable-vulnerability event for both repo tags."""
        self.assertIsNotNone(queue_item)

        event_data = json.loads(queue_item.body)['event_data']
        # NOTE(review): tag order is presumably not part of the notification
        # contract, so compare the tag names order-insensitively.
        self.assertEqual(['latest', 'prod'], sorted(event_data['tags']))
        self.assertEqual(vulnerability_id, event_data['vulnerability']['id'])
        self.assertEqual(priority, event_data['vulnerability']['priority'])
        self.assertTrue(event_data['vulnerability']['has_fix'])

    def assertAnalyzed(self, layer, isAnalyzed, engineVersion):
        """Assert the indexing state of `layer` and of each of its parent images."""
        self.assertEqual(isAnalyzed, layer.security_indexed)
        self.assertEqual(engineVersion, layer.security_indexed_engine)

        # Ensure all parent layers are marked as analyzed.
        parents = model.image.get_parent_images(ADMIN_ACCESS_USER, SIMPLE_REPO, layer)
        for parent in parents:
            self.assertEqual(isAnalyzed, parent.security_indexed)
            self.assertEqual(engineVersion, parent.security_indexed_engine)

    def _get_notification_data(self, new_layer_ids, old_layer_ids, new_severity='Low'):
        """Build a scanner notification payload for the 'CVE-TEST' vulnerability.

        `new_layer_ids` / `old_layer_ids` fill the 'LayersIntroducingVulnerability'
        lists of the 'New' and 'Old' sections; `new_severity` is the severity
        reported in the 'New' section (the 'Old' section is always 'Low').
        """
        return {
            "Name": "ec45ec87-bfc8-4129-a1c3-d2b82622175a",
            "Created": "1456247389",
            "Notified": "1456246708",
            "Limit": 2,
            "New": {
                "Vulnerability": {
                    "Name": "CVE-TEST",
                    "Namespace": "debian:8",
                    "Description": "New CVE",
                    "Severity": new_severity,
                    "FixedIn": [
                        {
                            "Name": "grep",
                            "Namespace": "debian:8",
                            "Version": "2.25"
                        }
                    ]
                },
                "LayersIntroducingVulnerability": new_layer_ids,
            },
            "Old": {
                "Vulnerability": {
                    "Name": "CVE-TEST",
                    "Namespace": "debian:8",
                    "Description": "New CVE",
                    "Severity": "Low",
                    "FixedIn": []
                },
                "LayersIntroducingVulnerability": old_layer_ids,
            }
        }

    # ------------------------------------------------------------------
    # SecurityScannerAPI.get_layer_data
    # ------------------------------------------------------------------

    def test_get_layer_success(self):
        """A successful lookup returns the layer document served by the mock."""
        layer = self._get_latest_layer()
        with HTTMock(get_layer_success_mock, response_content):
            result = self.api.get_layer_data(layer, include_vulnerabilities=True)
            self.assertIsNotNone(result)
            self.assertEqual(
                result['Layer']['Name'],
                '17675ec01494d651e1ccf81dc9cf63959ebfeed4f978fddb1666b6ead008ed52')

    def test_get_layer_failure(self):
        """A 404 from the scanner makes get_layer_data return None."""
        layer = self._get_latest_layer()
        with HTTMock(get_layer_failure_mock, response_content):
            result = self.api.get_layer_data(layer, include_vulnerabilities=True)
            self.assertIsNone(result)

    # ------------------------------------------------------------------
    # LayerAnalyzer.analyze_recursively
    # ------------------------------------------------------------------

    def test_analyze_layer_success(self):
        """A 201 from the scanner marks the layer and its parents as indexed by engine 1."""
        layer = self._get_latest_layer()
        self._assertNotYetAnalyzed(layer)

        with HTTMock(analyze_layer_success_mock, get_layer_success_mock, response_content):
            analyzer = LayerAnalyzer(app.config, self.api)
            analyzer.analyze_recursively(layer)

            layer = self._get_latest_layer()
            self.assertAnalyzed(layer, True, 1)

    def test_analyze_layer_failure(self):
        """A 422 from the scanner leaves the layer unindexed but records engine 1."""
        layer = self._get_latest_layer()
        self._assertNotYetAnalyzed(layer)

        with HTTMock(analyze_layer_failure_mock, response_content):
            analyzer = LayerAnalyzer(app.config, self.api)
            analyzer.analyze_recursively(layer)

            layer = self._get_latest_layer()
            self.assertAnalyzed(layer, False, 1)

    def test_analyze_layer_internal_error(self):
        """A 500 from the scanner leaves the layer's indexing state untouched."""
        layer = self._get_latest_layer()
        self._assertNotYetAnalyzed(layer)

        with HTTMock(analyze_layer_internal_mock, response_content):
            analyzer = LayerAnalyzer(app.config, self.api)
            analyzer.analyze_recursively(layer)

            layer = self._get_latest_layer()
            self.assertAnalyzed(layer, False, -1)

    def test_analyze_layer_bad_request(self):
        """A 400 from the scanner surfaces as an AnalyzeLayerException."""
        layer = self._get_latest_layer()
        self._assertNotYetAnalyzed(layer)

        with HTTMock(analyze_layer_badrequest_mock, response_content):
            analyzer = LayerAnalyzer(app.config, self.api)
            with self.assertRaises(AnalyzeLayerException):
                analyzer.analyze_recursively(layer)

    def test_analyze_layer_missing_storage(self):
        """With the layer's storage removed, the layer ends up unindexed at engine 1."""
        layer = self._get_latest_layer()
        self._assertNotYetAnalyzed(layer)

        # Delete the storage for the layer.
        path = model.storage.get_layer_path(layer.storage)
        locations = app.config['DISTRIBUTED_STORAGE_PREFERENCE']
        storage.remove(locations, path)

        with HTTMock(analyze_layer_success_mock, response_content):
            analyzer = LayerAnalyzer(app.config, self.api)
            analyzer.analyze_recursively(layer)

            layer = self._get_latest_layer()
            self.assertEqual(False, layer.security_indexed)
            self.assertEqual(1, layer.security_indexed_engine)

    def test_analyze_layer_success_events(self):
        """A successful analysis queues a vulnerability event for the repository."""
        layer = self._get_latest_layer()
        self._assertNotYetAnalyzed(layer)

        # Ensure there are no existing events.
        self.assertIsNone(notification_queue.get())

        # Add a repo event for the layer.
        self._add_vulnerability_notification()

        with HTTMock(analyze_layer_success_mock, get_layer_success_mock, response_content):
            analyzer = LayerAnalyzer(app.config, self.api)
            analyzer.analyze_recursively(layer)

            layer = self._get_latest_layer()
            self.assertAnalyzed(layer, True, 1)

            # Ensure an event was written for the tag.
            self._assertVulnerabilityEvent(notification_queue.get(), 'CVE-2014-9471', 'Low')

    # ------------------------------------------------------------------
    # process_notification_data
    # ------------------------------------------------------------------

    def test_notification_new_layers_not_vulnerable(self):
        """No event is queued when the scanner reports the new layer as not vulnerable."""
        _, layer_id = self._get_latest_layer_and_id()

        # Add a repo event for the layer.
        self._add_vulnerability_notification()

        # Report not vulnerable.
        get_matching_layer_not_vulnerable = self._matching_layer_mock(layer_id, [])

        # Ensure that there are no event queue items for the layer.
        self.assertIsNone(notification_queue.get())

        # Fire off the notification processing.
        with HTTMock(get_matching_layer_not_vulnerable, response_content):
            notification_data = self._get_notification_data([layer_id], [])
            self.assertTrue(process_notification_data(notification_data))

            # Ensure that there are no event queue items for the layer.
            self.assertIsNone(notification_queue.get())

    def test_notification_new_layers(self):
        """An event is queued when a newly listed layer is reported vulnerable."""
        _, layer_id = self._get_latest_layer_and_id()

        # Add a repo event for the layer.
        self._add_vulnerability_notification()

        get_matching_layer_vulnerable = self._matching_layer_mock(layer_id, [
            {
                "Name": "CVE-TEST",
                "Namespace": "debian:8",
                "Severity": "Low",
            }
        ])

        # Ensure that there are no event queue items for the layer.
        self.assertIsNone(notification_queue.get())

        # Fire off the notification processing.
        with HTTMock(get_matching_layer_vulnerable, response_content):
            notification_data = self._get_notification_data([layer_id], [])
            self.assertTrue(process_notification_data(notification_data))

            # Ensure an event was written for the tag.
            self._assertVulnerabilityEvent(notification_queue.get(), 'CVE-TEST', 'Low')

    def test_notification_no_new_layers(self):
        """No event is queued when the layer is listed as both old and new at equal severity."""
        _, layer_id = self._get_latest_layer_and_id()

        # Add a repo event for the layer.
        self._add_vulnerability_notification()

        # Ensure that there are no event queue items for the layer.
        self.assertIsNone(notification_queue.get())

        # Fire off the notification processing.
        with HTTMock(response_content):
            notification_data = self._get_notification_data([layer_id], [layer_id])
            self.assertTrue(process_notification_data(notification_data))

            # Ensure that there are no event queue items for the layer.
            self.assertIsNone(notification_queue.get())

    def test_notification_no_new_layers_increased_severity(self):
        """An event is queued when an already-listed layer's severity rises to 'High'."""
        _, layer_id = self._get_latest_layer_and_id()

        # Add a repo event for the layer.
        self._add_vulnerability_notification()

        get_matching_layer_vulnerable = self._matching_layer_mock(layer_id, [
            {
                "Name": "CVE-TEST",
                "Namespace": "debian:8",
                "Severity": "Low",
            }
        ])

        # Ensure that there are no event queue items for the layer.
        self.assertIsNone(notification_queue.get())

        # Fire off the notification processing.
        with HTTMock(get_matching_layer_vulnerable, response_content):
            notification_data = self._get_notification_data(
                [layer_id], [layer_id], new_severity='High')
            self.assertTrue(process_notification_data(notification_data))

            # Ensure an event was written for the tag.
            self._assertVulnerabilityEvent(notification_queue.get(), 'CVE-TEST', 'High')
|
|
|
|
|
|
# Run the test suite when this module is executed as a script.
if __name__ == "__main__":
    unittest.main()