15041ac5ed
The FakeSecurityScanner mocks out all calls that Quay is expected to make to the security scanner API, and returns faked data that can be adjusted by the calling test case
563 lines
23 KiB
Python
563 lines
23 KiB
Python
import json
|
|
import time
|
|
import unittest
|
|
|
|
from app import app, storage, notification_queue
|
|
from data import model
|
|
from data.database import Image, IMAGE_NOT_SCANNED_ENGINE_VERSION
|
|
from endpoints.notificationevent import VulnerabilityFoundEvent
|
|
from endpoints.v2 import v2_bp
|
|
from initdb import setup_database_for_testing, finished_database_for_testing
|
|
from util.secscan.api import SecurityScannerAPI, AnalyzeLayerException
|
|
from util.secscan.analyzer import LayerAnalyzer
|
|
from util.secscan.fake import fake_security_scanner
|
|
from util.secscan.notifier import process_notification_data
|
|
from workers.security_notification_worker import SecurityNotificationWorker
|
|
|
|
|
|
# Passed as the first argument to the tag/repository lookups in the tests below;
# presumably the namespace that owns the test repositories -- TODO confirm.
ADMIN_ACCESS_USER = 'devtable'
|
|
# Name of the repository whose 'latest' tag most tests in this file analyze.
SIMPLE_REPO = 'simple'
|
|
# Name of a second repository; test_notification_worker analyzes its 'prod' tag.
COMPLEX_REPO = 'complex'
|
|
|
|
|
|
class TestSecurityScanner(unittest.TestCase):
|
|
def setUp(self):
    """Prepare fake storage, the test database and a scanner API per test.

    Sets the two fake-storage switches, initialises the test database,
    creates a test client, enters a request context (exited in tearDown)
    and builds the SecurityScannerAPI used by the tests.
    """
    # Enable direct download in fake storage, and have it report that all
    # files exist for the duration of the test.
    for switch_name in ('supports_direct_download', 'all_files_exist'):
        storage.put_content(['local_us'], switch_name, 'true')

    # Setup the database with fake storage.
    setup_database_for_testing(self)

    self.app = app.test_client()

    # Entered by hand because the context has to stay open across the test.
    self.ctx = app.test_request_context()
    self.ctx.__enter__()

    self.api = SecurityScannerAPI(app, app.config, storage)
|
|
|
|
def tearDown(self):
    """Undo everything setUp did, even if one of the cleanup steps fails.

    The storage switches are removed, the test database is finished and the
    request context entered in setUp is exited. The steps are chained with
    try/finally so that a failure in an earlier step cannot leave the
    database or the request context open for the following tests.
    """
    try:
        storage.remove(['local_us'], 'supports_direct_download')
        storage.remove(['local_us'], 'all_files_exist')
    finally:
        try:
            finished_database_for_testing(self)
        finally:
            # NOTE(review): the arguments are kept exactly as the original
            # passed them (True as the exception type, no value/traceback).
            self.ctx.__exit__(True, None, None)
|
|
|
|
def assertAnalyzed(self, layer, security_scanner, isAnalyzed, engineVersion):
    """Assert the indexing state of ``layer`` and, if analyzed, of its parents.

    Args:
        layer: image whose ``security_indexed`` and
            ``security_indexed_engine`` attributes are checked.
        security_scanner: fake scanner yielded by ``fake_security_scanner()``.
        isAnalyzed: expected value of ``layer.security_indexed``.
        engineVersion: expected value of ``layer.security_indexed_engine``.

    Parent images are always looked up in SIMPLE_REPO.
    """
    self.assertEqual(isAnalyzed, layer.security_indexed)
    self.assertEqual(engineVersion, layer.security_indexed_engine)

    if not isAnalyzed:
        # Nothing further to verify for a layer that was not analyzed.
        return

    self.assertTrue(security_scanner.has_layer(security_scanner.layer_id(layer)))

    # Ensure all parent layers are marked as analyzed.
    parents = model.image.get_parent_images(ADMIN_ACCESS_USER, SIMPLE_REPO, layer)
    for parent in parents:
        self.assertTrue(parent.security_indexed)
        self.assertEqual(engineVersion, parent.security_indexed_engine)
        self.assertTrue(security_scanner.has_layer(security_scanner.layer_id(parent)))
|
|
|
|
|
|
def test_get_layer(self):
    """Test basic retrieval of layer data from the security scanner.

    Before the layer is added to the fake scanner the API must return None;
    afterwards it must return data named after the scanner's layer id.
    """
    layer = model.tag.get_tag_image(ADMIN_ACCESS_USER, SIMPLE_REPO, 'latest',
                                    include_storage=True)

    with fake_security_scanner() as security_scanner:
        scanner_layer_id = security_scanner.layer_id(layer)

        # Ensure the layer doesn't exist yet.
        self.assertFalse(security_scanner.has_layer(scanner_layer_id))
        self.assertIsNone(self.api.get_layer_data(layer))

        # Add the layer.
        security_scanner.add_layer(scanner_layer_id)

        # Retrieve the results.
        result = self.api.get_layer_data(layer, include_vulnerabilities=True)
        self.assertIsNotNone(result)
        self.assertEqual(result['Layer']['Name'], scanner_layer_id)
|
|
|
|
|
|
def test_analyze_layer_nodirectdownload_success(self):
    """Test analyzing a layer when direct download is disabled.

    Checks that the layer URL handed out by the API is rejected (401)
    without the accompanying authorization header and accepted (200) with
    it, then runs the analysis and expects the layer to be marked analyzed.
    """
    # Disable direct download in fake storage.
    storage.put_content(['local_us'], 'supports_direct_download', 'false')

    try:
        app.register_blueprint(v2_bp, url_prefix='/v2')
    except Exception:
        # Already registered. Only Exception is caught so that
        # KeyboardInterrupt/SystemExit are never swallowed here.
        pass

    layer = model.tag.get_tag_image(ADMIN_ACCESS_USER, SIMPLE_REPO, 'latest',
                                    include_storage=True)
    self.assertFalse(layer.security_indexed)
    self.assertEqual(-1, layer.security_indexed_engine)

    # Ensure that the download is a registry+JWT download.
    uri, auth_header = self.api._get_image_url_and_auth(layer)
    self.assertIsNotNone(uri)
    self.assertIsNotNone(auth_header)

    # Ensure the download doesn't work without the header.
    rv = self.app.head(uri)
    self.assertEqual(rv.status_code, 401)

    # Ensure the download works with the header. Note we use a HEAD here, as
    # GET causes DB access which messes with the test runner's rollback.
    rv = self.app.head(uri, headers=[('authorization', auth_header)])
    self.assertEqual(rv.status_code, 200)

    # Ensure the code works when called via analyze.
    with fake_security_scanner() as security_scanner:
        analyzer = LayerAnalyzer(app.config, self.api)
        analyzer.analyze_recursively(layer)

        layer = model.tag.get_tag_image(ADMIN_ACCESS_USER, SIMPLE_REPO, 'latest')
        self.assertAnalyzed(layer, security_scanner, True, 1)
|
|
|
|
|
|
def test_analyze_layer_success(self):
    """Test that analyzing a layer successfully marks it as analyzed."""
    layer = model.tag.get_tag_image(ADMIN_ACCESS_USER, SIMPLE_REPO, 'latest',
                                    include_storage=True)

    # The layer starts out unindexed, with engine version -1.
    self.assertFalse(layer.security_indexed)
    self.assertEqual(-1, layer.security_indexed_engine)

    with fake_security_scanner() as security_scanner:
        analyzer = LayerAnalyzer(app.config, self.api)
        analyzer.analyze_recursively(layer)

        # Reload the image so the assertions see the saved state.
        layer = model.tag.get_tag_image(ADMIN_ACCESS_USER, SIMPLE_REPO, 'latest')
        self.assertAnalyzed(layer, security_scanner, True, 1)
|
|
|
|
|
|
def test_analyze_layer_failure(self):
    """Test that failing to analyze a layer marks it as not analyzed.

    The fake scanner is told to fail this layer's id; afterwards the layer
    must be unindexed while its engine version has been set to 1.
    """
    layer = model.tag.get_tag_image(ADMIN_ACCESS_USER, SIMPLE_REPO, 'latest',
                                    include_storage=True)
    self.assertFalse(layer.security_indexed)
    self.assertEqual(-1, layer.security_indexed_engine)

    with fake_security_scanner() as security_scanner:
        security_scanner.set_fail_layer_id(security_scanner.layer_id(layer))

        analyzer = LayerAnalyzer(app.config, self.api)
        analyzer.analyze_recursively(layer)

        layer = model.tag.get_tag_image(ADMIN_ACCESS_USER, SIMPLE_REPO, 'latest')
        self.assertAnalyzed(layer, security_scanner, False, 1)
|
|
|
|
|
|
def test_analyze_layer_internal_error(self):
    """Test analysis when the fake scanner reports an internal error.

    Unlike a plain analysis failure, the layer is expected to keep its
    initial state: not indexed and engine version -1.
    """
    layer = model.tag.get_tag_image(ADMIN_ACCESS_USER, SIMPLE_REPO, 'latest',
                                    include_storage=True)
    self.assertFalse(layer.security_indexed)
    self.assertEqual(-1, layer.security_indexed_engine)

    with fake_security_scanner() as security_scanner:
        security_scanner.set_internal_error_layer_id(security_scanner.layer_id(layer))

        analyzer = LayerAnalyzer(app.config, self.api)
        analyzer.analyze_recursively(layer)

        layer = model.tag.get_tag_image(ADMIN_ACCESS_USER, SIMPLE_REPO, 'latest')
        self.assertAnalyzed(layer, security_scanner, False, -1)
|
|
|
|
|
|
def test_analyze_layer_missing_parent(self):
    """Test re-analysis of a layer whose parent is gone from the scanner.

    After a successful first analysis the layer is reset to "not scanned"
    and its parent is removed from the fake scanner; analyzing again must
    raise AnalyzeLayerException.
    """
    layer = model.tag.get_tag_image(ADMIN_ACCESS_USER, SIMPLE_REPO, 'latest',
                                    include_storage=True)
    self.assertFalse(layer.security_indexed)
    self.assertEqual(-1, layer.security_indexed_engine)

    with fake_security_scanner() as security_scanner:
        # Analyze the layer and its parents.
        analyzer = LayerAnalyzer(app.config, self.api)
        analyzer.analyze_recursively(layer)

        # Make sure it was analyzed.
        layer = model.tag.get_tag_image(ADMIN_ACCESS_USER, SIMPLE_REPO, 'latest')
        self.assertAnalyzed(layer, security_scanner, True, 1)

        # Mark the layer as not analyzed and delete its parent layer from the
        # security scanner.
        layer.security_indexed_engine = IMAGE_NOT_SCANNED_ENGINE_VERSION
        layer.security_indexed = False
        layer.save()

        security_scanner.remove_layer(security_scanner.layer_id(layer.parent))

        # Try to analyze again; this should fail because the parent is missing.
        # (assertRaisesRegexp is kept as-is: it is the only spelling that
        # exists on Python 2.7.)
        with self.assertRaisesRegexp(AnalyzeLayerException, 'Bad request to security scanner'):
            analyzer.analyze_recursively(layer)
|
|
|
|
|
|
def test_analyze_layer_missing_storage(self):
    """Test analyzing a layer whose storage blob has been removed.

    The layer must end up not indexed, with its engine version set to 1.
    """
    layer = model.tag.get_tag_image(ADMIN_ACCESS_USER, SIMPLE_REPO, 'latest',
                                    include_storage=True)
    self.assertFalse(layer.security_indexed)
    self.assertEqual(-1, layer.security_indexed_engine)

    # Delete the storage for the layer. The 'all_files_exist' switch set in
    # setUp is removed as well, so fake storage stops claiming that every
    # file is present.
    path = model.storage.get_layer_path(layer.storage)
    locations = app.config['DISTRIBUTED_STORAGE_PREFERENCE']
    storage.remove(locations, path)
    storage.remove(locations, 'all_files_exist')

    with fake_security_scanner():
        analyzer = LayerAnalyzer(app.config, self.api)
        analyzer.analyze_recursively(layer)

        layer = model.tag.get_tag_image(ADMIN_ACCESS_USER, SIMPLE_REPO, 'latest')
        self.assertEqual(False, layer.security_indexed)
        self.assertEqual(1, layer.security_indexed_engine)
|
|
|
|
|
|
def assert_analyze_layer_notify(self, security_indexed_engine, security_indexed,
                                expect_notification):
    """Analyze a vulnerable layer and check whether a notification is queued.

    Args:
        security_indexed_engine: engine version stored on the layer before
            the analysis runs.
        security_indexed: indexed flag stored on the layer before the
            analysis runs.
        expect_notification: whether an item is expected on
            ``notification_queue`` after the analysis.
    """
    layer = model.tag.get_tag_image(ADMIN_ACCESS_USER, SIMPLE_REPO, 'latest',
                                    include_storage=True)
    self.assertFalse(layer.security_indexed)
    self.assertEqual(-1, layer.security_indexed_engine)

    # Ensure there are no existing events.
    self.assertIsNone(notification_queue.get())

    # Add a repo event for the layer.
    repo = model.repository.get_repository(ADMIN_ACCESS_USER, SIMPLE_REPO)
    model.notification.create_repo_notification(repo, 'vulnerability_found',
                                                'quay_notification', {}, {'level': 100})

    # Update the layer's state before analyzing.
    layer.security_indexed_engine = security_indexed_engine
    layer.security_indexed = security_indexed
    layer.save()

    with fake_security_scanner() as security_scanner:
        security_scanner.set_vulns(security_scanner.layer_id(layer), [
            {
                "Name": "CVE-2014-9471",
                "Namespace": "debian:8",
                "Description": "Some service",
                "Link": "https://security-tracker.debian.org/tracker/CVE-2014-9471",
                "Severity": "Low",
                "FixedBy": "9.23-5"
            }
        ])

        analyzer = LayerAnalyzer(app.config, self.api)
        analyzer.analyze_recursively(layer)

        layer = model.tag.get_tag_image(ADMIN_ACCESS_USER, SIMPLE_REPO, 'latest')
        self.assertAnalyzed(layer, security_scanner, True, 1)

    # Ensure an event was written for the tag (if necessary).
    # NOTE(review): the one-second pause presumably lets the queued item
    # become available before it is fetched -- confirm against the queue.
    time.sleep(1)
    queue_item = notification_queue.get()

    if expect_notification:
        self.assertIsNotNone(queue_item)

        body = json.loads(queue_item.body)
        vulnerability = body['event_data']['vulnerability']
        self.assertEqual({'latest', 'prod'}, set(body['event_data']['tags']))
        self.assertEqual('CVE-2014-9471', vulnerability['id'])
        self.assertEqual('Low', vulnerability['priority'])
        self.assertTrue(vulnerability['has_fix'])
    else:
        self.assertIsNone(queue_item)

    # Ensure its security indexed engine was updated.
    updated_layer = model.tag.get_tag_image(ADMIN_ACCESS_USER, SIMPLE_REPO, 'latest')
    self.assertEqual(updated_layer.id, layer.id)
    self.assertGreater(updated_layer.security_indexed_engine, 0)
|
|
|
|
|
|
def test_analyze_layer_success_events(self):
    """A layer that was never indexed before must produce a notification."""
    self.assert_analyze_layer_notify(
        security_indexed_engine=IMAGE_NOT_SCANNED_ENGINE_VERSION,
        security_indexed=False,
        expect_notification=True,
    )
|
|
|
|
|
|
def test_analyze_layer_success_no_notification(self):
    """A layer that was already indexed successfully must not notify again."""
    self.assert_analyze_layer_notify(
        security_indexed_engine=0,
        security_indexed=True,
        expect_notification=False,
    )
|
|
|
|
|
|
def test_analyze_layer_failed_then_success_notification(self):
    """A layer whose earlier indexing attempt failed must produce a notification."""
    self.assert_analyze_layer_notify(
        security_indexed_engine=0,
        security_indexed=False,
        expect_notification=True,
    )
|
|
|
|
|
|
def test_notification_new_layers_not_vulnerable(self):
    """Process a scanner notification that carries no vulnerability data.

    The notification lists the layer in its first layer list with empty
    vulnerability dicts; processing must succeed without queueing an event.
    """
    tagged_image = model.tag.get_tag_image(ADMIN_ACCESS_USER, SIMPLE_REPO, 'latest',
                                           include_storage=True)
    scanner_layer_id = '%s.%s' % (tagged_image.docker_image_id, tagged_image.storage.uuid)

    # Add a repo event for the layer.
    repository = model.repository.get_repository(ADMIN_ACCESS_USER, SIMPLE_REPO)
    model.notification.create_repo_notification(repository, 'vulnerability_found',
                                                'quay_notification', {}, {'level': 100})

    # Ensure that there are no event queue items for the layer.
    self.assertIsNone(notification_queue.get())

    # Fire off the notification processing.
    with fake_security_scanner() as security_scanner:
        layer_analyzer = LayerAnalyzer(app.config, self.api)
        layer_analyzer.analyze_recursively(tagged_image)

        reloaded_image = model.tag.get_tag_image(ADMIN_ACCESS_USER, SIMPLE_REPO, 'latest')
        self.assertAnalyzed(reloaded_image, security_scanner, True, 1)

        # Add a notification for the layer.
        notification_data = security_scanner.add_notification([scanner_layer_id], [], {}, {})

        # Process the notification.
        self.assertTrue(process_notification_data(notification_data))

        # Ensure that there are no event queue items for the layer.
        self.assertIsNone(notification_queue.get())
|
|
|
|
|
|
def test_notification_delete(self):
    """Process a scanner notification whose new-side fields are None.

    The notification is created with the layer in its first layer list and
    None for the second list and the last vulnerability argument;
    processing must succeed without queueing an event.
    """
    tagged_image = model.tag.get_tag_image(ADMIN_ACCESS_USER, SIMPLE_REPO, 'latest',
                                           include_storage=True)
    scanner_layer_id = '%s.%s' % (tagged_image.docker_image_id, tagged_image.storage.uuid)

    # Add a repo event for the layer.
    repository = model.repository.get_repository(ADMIN_ACCESS_USER, SIMPLE_REPO)
    model.notification.create_repo_notification(repository, 'vulnerability_found',
                                                'quay_notification', {}, {'level': 100})

    # Ensure that there are no event queue items for the layer.
    self.assertIsNone(notification_queue.get())

    # Fire off the notification processing.
    with fake_security_scanner() as security_scanner:
        layer_analyzer = LayerAnalyzer(app.config, self.api)
        layer_analyzer.analyze_recursively(tagged_image)

        reloaded_image = model.tag.get_tag_image(ADMIN_ACCESS_USER, SIMPLE_REPO, 'latest')
        self.assertAnalyzed(reloaded_image, security_scanner, True, 1)

        # Add a notification for the layer.
        notification_data = security_scanner.add_notification([scanner_layer_id], None, {}, None)

        # Process the notification.
        self.assertTrue(process_notification_data(notification_data))

        # Ensure that there are no event queue items for the layer.
        self.assertIsNone(notification_queue.get())
|
|
|
|
|
|
def test_notification_new_layers(self):
    """Process a notification that reports the layer as newly vulnerable.

    After processing, one event must be queued for the 'prod' and 'latest'
    tags, describing CVE-TEST with priority Low and an available fix.
    """
    layer = model.tag.get_tag_image(ADMIN_ACCESS_USER, SIMPLE_REPO, 'latest',
                                    include_storage=True)
    layer_id = '%s.%s' % (layer.docker_image_id, layer.storage.uuid)

    # Add a repo event for the layer.
    repo = model.repository.get_repository(ADMIN_ACCESS_USER, SIMPLE_REPO)
    model.notification.create_repo_notification(repo, 'vulnerability_found',
                                                'quay_notification', {}, {'level': 100})

    # Ensure that there are no event queue items for the layer.
    self.assertIsNone(notification_queue.get())

    # Fire off the notification processing.
    with fake_security_scanner() as security_scanner:
        analyzer = LayerAnalyzer(app.config, self.api)
        analyzer.analyze_recursively(layer)

        layer = model.tag.get_tag_image(ADMIN_ACCESS_USER, SIMPLE_REPO, 'latest')
        self.assertAnalyzed(layer, security_scanner, True, 1)

        vuln_info = {
            "Name": "CVE-TEST",
            "Namespace": "debian:8",
            "Description": "Some service",
            "Link": "https://security-tracker.debian.org/tracker/CVE-2014-9471",
            "Severity": "Low",
            "FixedIn": {'Version': "9.23-5"},
        }
        security_scanner.set_vulns(layer_id, [vuln_info])

        # Add a notification for the layer.
        notification_data = security_scanner.add_notification([], [layer_id],
                                                              vuln_info, vuln_info)

        # Process the notification.
        self.assertTrue(process_notification_data(notification_data))

        # Ensure an event was written for the tag.
        time.sleep(1)
        queue_item = notification_queue.get()
        self.assertIsNotNone(queue_item)

        item_body = json.loads(queue_item.body)
        vulnerability = item_body['event_data']['vulnerability']
        self.assertEqual(sorted(['prod', 'latest']), sorted(item_body['event_data']['tags']))
        self.assertEqual('CVE-TEST', vulnerability['id'])
        self.assertEqual('Low', vulnerability['priority'])
        self.assertTrue(vulnerability['has_fix'])
|
|
|
|
|
|
def test_notification_no_new_layers(self):
    """Process a scanner notification that references no layers at all.

    Both layer lists and both vulnerability dicts are empty; processing
    must succeed without queueing an event.
    """
    tagged_image = model.tag.get_tag_image(ADMIN_ACCESS_USER, SIMPLE_REPO, 'latest',
                                           include_storage=True)

    # Add a repo event for the layer.
    repository = model.repository.get_repository(ADMIN_ACCESS_USER, SIMPLE_REPO)
    model.notification.create_repo_notification(repository, 'vulnerability_found',
                                                'quay_notification', {}, {'level': 100})

    # Ensure that there are no event queue items for the layer.
    self.assertIsNone(notification_queue.get())

    # Fire off the notification processing.
    with fake_security_scanner() as security_scanner:
        layer_analyzer = LayerAnalyzer(app.config, self.api)
        layer_analyzer.analyze_recursively(tagged_image)

        reloaded_image = model.tag.get_tag_image(ADMIN_ACCESS_USER, SIMPLE_REPO, 'latest')
        self.assertAnalyzed(reloaded_image, security_scanner, True, 1)

        # Add a notification for the layer.
        notification_data = security_scanner.add_notification([], [], {}, {})

        # Process the notification.
        self.assertTrue(process_notification_data(notification_data))

        # Ensure that there are no event queue items for the layer.
        self.assertIsNone(notification_queue.get())
|
|
|
|
|
|
def test_notification_no_new_layers_increased_severity(self):
    """Process a notification where a known vulnerability becomes Critical.

    The same layer appears in both layer lists; the old vulnerability info
    has severity Low and the new one Critical with a fix. One event must be
    queued, and VulnerabilityFoundEvent.should_perform is then checked
    against repo notifications created with levels 100, 1 and 0.
    """
    layer = model.tag.get_tag_image(ADMIN_ACCESS_USER, SIMPLE_REPO, 'latest',
                                    include_storage=True)
    layer_id = '%s.%s' % (layer.docker_image_id, layer.storage.uuid)

    # Add a repo event for the layer.
    repo = model.repository.get_repository(ADMIN_ACCESS_USER, SIMPLE_REPO)
    notification = model.notification.create_repo_notification(repo, 'vulnerability_found',
                                                               'quay_notification', {},
                                                               {'level': 100})

    # Ensure that there are no event queue items for the layer.
    self.assertIsNone(notification_queue.get())

    # Fire off the notification processing.
    with fake_security_scanner() as security_scanner:
        analyzer = LayerAnalyzer(app.config, self.api)
        analyzer.analyze_recursively(layer)

        layer = model.tag.get_tag_image(ADMIN_ACCESS_USER, SIMPLE_REPO, 'latest')
        self.assertAnalyzed(layer, security_scanner, True, 1)

        old_vuln_info = {
            "Name": "CVE-TEST",
            "Namespace": "debian:8",
            "Description": "Some service",
            "Link": "https://security-tracker.debian.org/tracker/CVE-2014-9471",
            "Severity": "Low",
        }

        new_vuln_info = {
            "Name": "CVE-TEST",
            "Namespace": "debian:8",
            "Description": "Some service",
            "Link": "https://security-tracker.debian.org/tracker/CVE-2014-9471",
            "Severity": "Critical",
            "FixedIn": {'Version': "9.23-5"},
        }

        security_scanner.set_vulns(layer_id, [new_vuln_info])

        # Add a notification for the layer.
        notification_data = security_scanner.add_notification([layer_id], [layer_id],
                                                              old_vuln_info, new_vuln_info)

        # Process the notification.
        self.assertTrue(process_notification_data(notification_data))

        # Ensure an event was written for the tag.
        time.sleep(1)
        queue_item = notification_queue.get()
        self.assertIsNotNone(queue_item)

        item_body = json.loads(queue_item.body)
        vulnerability = item_body['event_data']['vulnerability']
        self.assertEqual(sorted(['prod', 'latest']), sorted(item_body['event_data']['tags']))
        self.assertEqual('CVE-TEST', vulnerability['id'])
        self.assertEqual('Critical', vulnerability['priority'])
        self.assertTrue(vulnerability['has_fix'])

        # Verify that an event would be raised (notification level 100).
        event_data = item_body['event_data']
        self.assertTrue(VulnerabilityFoundEvent().should_perform(event_data, notification))

        # Create another notification with level 1 and verify it will be raised.
        notification = model.notification.create_repo_notification(repo, 'vulnerability_found',
                                                                   'quay_notification', {},
                                                                   {'level': 1})
        self.assertTrue(VulnerabilityFoundEvent().should_perform(event_data, notification))

        # Create another notification with level 0 and verify it won't be raised.
        notification = model.notification.create_repo_notification(repo, 'vulnerability_found',
                                                                   'quay_notification', {},
                                                                   {'level': 0})
        self.assertFalse(VulnerabilityFoundEvent().should_perform(event_data, notification))
|
|
|
|
|
|
def test_select_images_to_scan(self):
    """Test which images are offered for scanning at a given engine version.

    With every image stamped at the configured target version nothing is
    eligible for that version, while the next higher version makes images
    eligible again.
    """
    # Set the security index engine version of all images to the one in the config.
    expected_version = app.config['SECURITY_SCANNER_ENGINE_VERSION_TARGET']
    Image.update(security_indexed_engine=expected_version).execute()

    # Ensure no images are available for scanning.
    self.assertIsNone(model.image.get_min_id_for_sec_scan(expected_version))
    self.assertEqual(0, len(model.image.get_images_eligible_for_scan(expected_version)))

    # Check for a higher version.
    self.assertIsNotNone(model.image.get_min_id_for_sec_scan(expected_version + 1))
    self.assertGreater(len(model.image.get_images_eligible_for_scan(expected_version + 1)), 0)
|
|
|
|
|
|
def test_notification_worker(self):
    """Run SecurityNotificationWorker over unknown and paged notifications.

    An unknown notification name must be reported as not handled. A known
    notification covering two layers, processed with ``layer_limit=1``,
    must be handled and leave two items on the notification queue.
    """
    layer1 = model.tag.get_tag_image(ADMIN_ACCESS_USER, SIMPLE_REPO, 'latest',
                                     include_storage=True)
    layer2 = model.tag.get_tag_image(ADMIN_ACCESS_USER, COMPLEX_REPO, 'prod',
                                     include_storage=True)

    # Add repo events for the layers.
    simple_repo = model.repository.get_repository(ADMIN_ACCESS_USER, SIMPLE_REPO)
    complex_repo = model.repository.get_repository(ADMIN_ACCESS_USER, COMPLEX_REPO)

    for target_repo in (simple_repo, complex_repo):
        model.notification.create_repo_notification(target_repo, 'vulnerability_found',
                                                    'quay_notification', {}, {'level': 100})

    # Ensure that there are no event queue items for the layer.
    self.assertIsNone(notification_queue.get())

    with fake_security_scanner() as security_scanner:
        # Test with an unknown notification.
        worker = SecurityNotificationWorker(None)
        unknown_notification = {'Name': 'unknownnotification'}
        self.assertFalse(worker.perform_notification_work(unknown_notification))

        # Add some analyzed layers.
        analyzer = LayerAnalyzer(app.config, self.api)
        for image in (layer1, layer2):
            analyzer.analyze_recursively(image)

        # Add a notification with pages of data.
        new_vuln_info = {
            "Name": "CVE-TEST",
            "Namespace": "debian:8",
            "Description": "Some service",
            "Link": "https://security-tracker.debian.org/tracker/CVE-2014-9471",
            "Severity": "Critical",
            "FixedIn": {'Version': "9.23-5"},
        }

        for image in (layer1, layer2):
            security_scanner.set_vulns(security_scanner.layer_id(image), [new_vuln_info])

        layer_ids = [security_scanner.layer_id(image) for image in (layer1, layer2)]
        notification_data = security_scanner.add_notification([], layer_ids, {}, new_vuln_info)

        # Test with a known notification with pages.
        data = {'Name': notification_data['Name']}

        worker = SecurityNotificationWorker(None)
        self.assertTrue(worker.perform_notification_work(data, layer_limit=1))

        # Make sure all pages were processed by ensuring we have two notifications.
        time.sleep(1)
        for _ in range(2):
            self.assertIsNotNone(notification_queue.get())
|
|
|
|
|
|
if __name__ == '__main__':
    # Run the test suite when this module is executed directly.
    unittest.main()
|