2016-02-24 21:01:27 +00:00
|
|
|
import json
|
2016-07-15 23:00:18 +00:00
|
|
|
import time
|
|
|
|
import unittest
|
2016-02-24 21:01:27 +00:00
|
|
|
|
2016-07-08 17:57:01 +00:00
|
|
|
from app import app, storage, notification_queue
|
2016-12-14 22:11:45 +00:00
|
|
|
from data import model
|
|
|
|
from data.database import Image, IMAGE_NOT_SCANNED_ENGINE_VERSION
|
2016-03-17 16:59:27 +00:00
|
|
|
from endpoints.notificationevent import VulnerabilityFoundEvent
|
2016-12-14 22:11:45 +00:00
|
|
|
from endpoints.v2 import v2_bp
|
2016-02-24 21:01:27 +00:00
|
|
|
from initdb import setup_database_for_testing, finished_database_for_testing
|
2016-12-15 21:27:24 +00:00
|
|
|
from util.secscan.api import SecurityScannerAPI
|
2016-02-24 21:01:27 +00:00
|
|
|
from util.secscan.analyzer import LayerAnalyzer
|
2016-12-14 22:11:45 +00:00
|
|
|
from util.secscan.fake import fake_security_scanner
|
2016-02-25 20:58:42 +00:00
|
|
|
from util.secscan.notifier import process_notification_data
|
2016-03-19 00:28:06 +00:00
|
|
|
from workers.security_notification_worker import SecurityNotificationWorker
|
2016-02-24 21:01:27 +00:00
|
|
|
|
|
|
|
|
|
|
|
# Namespace passed as the first argument to the model lookups in these tests.
ADMIN_ACCESS_USER = 'devtable'
|
|
|
|
# Repository whose 'latest' tag is the layer that most tests analyze.
SIMPLE_REPO = 'simple'
|
2016-03-19 00:28:06 +00:00
|
|
|
# Second repository; its 'prod' tag is used by the notification worker test.
COMPLEX_REPO = 'complex'
|
2016-02-24 21:01:27 +00:00
|
|
|
|
|
|
|
|
|
|
|
class TestSecurityScanner(unittest.TestCase):
|
|
|
|
def setUp(self):
    """ Prepares fake storage, the test database, a request context and the scanner API. """
    # Switch on the fake-storage flags for this test: direct download support first, then
    # the flag that makes fake storage report every file as existing.
    for flag_name in ('supports_direct_download', 'all_files_exist'):
        storage.put_content(['local_us'], flag_name, 'true')

    # Setup the database with fake storage.
    setup_database_for_testing(self)

    # Create the test client and enter a request context; tearDown exits it again.
    self.app = app.test_client()
    request_context = app.test_request_context()
    self.ctx = request_context
    request_context.__enter__()

    self.api = SecurityScannerAPI(app, app.config, storage)
|
2016-02-24 21:01:27 +00:00
|
|
|
|
|
|
|
def tearDown(self):
    """ Removes the fake storage flags, finishes the test database and exits the context. """
    try:
        storage.remove(['local_us'], 'supports_direct_download')
        storage.remove(['local_us'], 'all_files_exist')
        finished_database_for_testing(self)
    finally:
        # Always leave the request context entered in setUp, even when the cleanup above
        # raised. On a normal exit the context manager protocol expects None for all three
        # arguments; a non-None exc_type would claim that an exception is in flight.
        self.ctx.__exit__(None, None, None)
|
|
|
|
|
2016-12-14 22:11:45 +00:00
|
|
|
def assertAnalyzed(self, layer, security_scanner, isAnalyzed, engineVersion):
    """ Asserts the indexing state of `layer` and, when analyzed, that of its parents.

        layer: object exposing `security_indexed` and `security_indexed_engine`.
        security_scanner: fake scanner; only consulted when `isAnalyzed` is truthy.
        isAnalyzed: expected value of `layer.security_indexed`.
        engineVersion: expected value of `layer.security_indexed_engine`.
    """
    # assertEqual is used instead of the deprecated assertEquals alias, which no longer
    # exists on Python 3.12+.
    self.assertEqual(isAnalyzed, layer.security_indexed)
    self.assertEqual(engineVersion, layer.security_indexed_engine)

    if not isAnalyzed:
        return

    self.assertTrue(security_scanner.has_layer(security_scanner.layer_id(layer)))

    # Ensure all parent layers are marked as analyzed.
    # NOTE: parents are always looked up under ADMIN_ACCESS_USER/SIMPLE_REPO.
    parents = model.image.get_parent_images(ADMIN_ACCESS_USER, SIMPLE_REPO, layer)
    for parent in parents:
        self.assertTrue(parent.security_indexed)
        self.assertEqual(engineVersion, parent.security_indexed_engine)
        self.assertTrue(security_scanner.has_layer(security_scanner.layer_id(parent)))
|
2016-02-24 21:01:27 +00:00
|
|
|
|
|
|
|
|
2016-12-14 22:11:45 +00:00
|
|
|
def test_get_layer(self):
    """ Test for basic retrieval of layers from the security scanner. """
    layer = model.tag.get_tag_image(ADMIN_ACCESS_USER, SIMPLE_REPO, 'latest',
                                    include_storage=True)

    with fake_security_scanner() as security_scanner:
        # Ensure the layer doesn't exist yet.
        self.assertFalse(security_scanner.has_layer(security_scanner.layer_id(layer)))
        self.assertIsNone(self.api.get_layer_data(layer))

        # Add the layer.
        security_scanner.add_layer(security_scanner.layer_id(layer))

        # Retrieve the results.
        result = self.api.get_layer_data(layer, include_vulnerabilities=True)
        self.assertIsNotNone(result)
        self.assertEqual(result['Layer']['Name'], security_scanner.layer_id(layer))
|
2016-02-24 21:01:27 +00:00
|
|
|
|
|
|
|
|
2016-05-05 01:47:03 +00:00
|
|
|
def test_analyze_layer_nodirectdownload_success(self):
    """ Tests analyzing a layer when direct download is disabled. """
    # Disable direct download in fake storage.
    storage.put_content(['local_us'], 'supports_direct_download', 'false')

    try:
        app.register_blueprint(v2_bp, url_prefix='/v2')
    except Exception:
        # Best effort: registration is expected to fail when the blueprint is already
        # registered. Exception (not a bare except) lets KeyboardInterrupt and SystemExit
        # propagate.
        pass

    layer = model.tag.get_tag_image(ADMIN_ACCESS_USER, SIMPLE_REPO, 'latest',
                                    include_storage=True)
    self.assertFalse(layer.security_indexed)
    self.assertEqual(-1, layer.security_indexed_engine)

    # Ensure that the download is a registry+JWT download.
    uri, auth_header = self.api._get_image_url_and_auth(layer)
    self.assertIsNotNone(uri)
    self.assertIsNotNone(auth_header)

    # Ensure the download doesn't work without the header.
    rv = self.app.head(uri)
    self.assertEqual(rv.status_code, 401)

    # Ensure the download works with the header. Note we use a HEAD here, as GET causes DB
    # access which messes with the test runner's rollback.
    rv = self.app.head(uri, headers=[('authorization', auth_header)])
    self.assertEqual(rv.status_code, 200)

    # Ensure the code works when called via analyze.
    with fake_security_scanner() as security_scanner:
        analyzer = LayerAnalyzer(app.config, self.api)
        analyzer.analyze_recursively(layer)

        layer = model.tag.get_tag_image(ADMIN_ACCESS_USER, SIMPLE_REPO, 'latest')
        self.assertAnalyzed(layer, security_scanner, True, 1)
|
2016-05-05 01:47:03 +00:00
|
|
|
|
|
|
|
|
2016-02-24 21:01:27 +00:00
|
|
|
def test_analyze_layer_success(self):
    """ Tests that analyzing a layer successfully marks it as analyzed. """
    layer = model.tag.get_tag_image(ADMIN_ACCESS_USER, SIMPLE_REPO, 'latest',
                                    include_storage=True)
    self.assertFalse(layer.security_indexed)
    self.assertEqual(-1, layer.security_indexed_engine)

    with fake_security_scanner() as security_scanner:
        analyzer = LayerAnalyzer(app.config, self.api)
        analyzer.analyze_recursively(layer)

        # Reload the layer so the assertions see the state written by the analyzer.
        layer = model.tag.get_tag_image(ADMIN_ACCESS_USER, SIMPLE_REPO, 'latest')
        self.assertAnalyzed(layer, security_scanner, True, 1)
|
2016-02-24 21:01:27 +00:00
|
|
|
|
|
|
|
|
|
|
|
def test_analyze_layer_failure(self):
    """ Tests that failing to analyze a layer (because it 422s) marks it as analyzed but failed. """
    layer = model.tag.get_tag_image(ADMIN_ACCESS_USER, SIMPLE_REPO, 'latest',
                                    include_storage=True)
    self.assertFalse(layer.security_indexed)
    self.assertEqual(-1, layer.security_indexed_engine)

    with fake_security_scanner() as security_scanner:
        # Make the fake scanner fail the analysis of this layer.
        security_scanner.set_fail_layer_id(security_scanner.layer_id(layer))

        analyzer = LayerAnalyzer(app.config, self.api)
        analyzer.analyze_recursively(layer)

        # Not indexed, but the engine version was still recorded.
        layer = model.tag.get_tag_image(ADMIN_ACCESS_USER, SIMPLE_REPO, 'latest')
        self.assertAnalyzed(layer, security_scanner, False, 1)
|
2016-02-24 21:01:27 +00:00
|
|
|
|
|
|
|
|
|
|
|
def test_analyze_layer_internal_error(self):
    """ Tests that failing to analyze a layer (because it 500s) marks it as not analyzed. """
    layer = model.tag.get_tag_image(ADMIN_ACCESS_USER, SIMPLE_REPO, 'latest',
                                    include_storage=True)
    self.assertFalse(layer.security_indexed)
    self.assertEqual(-1, layer.security_indexed_engine)

    with fake_security_scanner() as security_scanner:
        # Make the fake scanner report an internal error for this layer.
        security_scanner.set_internal_error_layer_id(security_scanner.layer_id(layer))

        analyzer = LayerAnalyzer(app.config, self.api)
        analyzer.analyze_recursively(layer)

        # The layer must be left untouched: not indexed and still at engine version -1.
        layer = model.tag.get_tag_image(ADMIN_ACCESS_USER, SIMPLE_REPO, 'latest')
        self.assertAnalyzed(layer, security_scanner, False, -1)
|
2016-02-24 21:01:27 +00:00
|
|
|
|
|
|
|
|
2016-12-15 21:27:24 +00:00
|
|
|
def test_analyze_layer_error(self):
    """ Tests that failing to analyze a layer (because it 400s) marks it as analyzed but failed. """
    layer = model.tag.get_tag_image(ADMIN_ACCESS_USER, SIMPLE_REPO, 'latest',
                                    include_storage=True)
    self.assertFalse(layer.security_indexed)
    self.assertEqual(-1, layer.security_indexed_engine)

    with fake_security_scanner() as security_scanner:
        # Make it so trying to analyze the parent will fail with an error.
        security_scanner.set_error_layer_id(security_scanner.layer_id(layer.parent))

        # Try to analyze the layer and its parents, but with one request causing an error.
        analyzer = LayerAnalyzer(app.config, self.api)
        analyzer.analyze_recursively(layer)

        # Make sure it is marked as analyzed, but in a failed state.
        layer = model.tag.get_tag_image(ADMIN_ACCESS_USER, SIMPLE_REPO, 'latest')
        self.assertAnalyzed(layer, security_scanner, False, 1)
|
|
|
|
|
|
|
|
|
|
|
|
def test_analyze_layer_missing_parent_handled(self):
    """ Tests that a missing parent causes an automatic reanalysis, which succeeds. """
    layer = model.tag.get_tag_image(ADMIN_ACCESS_USER, SIMPLE_REPO, 'latest',
                                    include_storage=True)
    self.assertFalse(layer.security_indexed)
    self.assertEqual(-1, layer.security_indexed_engine)

    with fake_security_scanner() as security_scanner:
        # Analyze the layer and its parents.
        analyzer = LayerAnalyzer(app.config, self.api)
        analyzer.analyze_recursively(layer)

        # Make sure it was analyzed.
        layer = model.tag.get_tag_image(ADMIN_ACCESS_USER, SIMPLE_REPO, 'latest')
        self.assertAnalyzed(layer, security_scanner, True, 1)

        # Mark the layer as not yet scanned.
        layer.security_indexed_engine = IMAGE_NOT_SCANNED_ENGINE_VERSION
        layer.security_indexed = False
        layer.save()

        # Remove the layer's parent entirely from the security scanner.
        security_scanner.remove_layer(security_scanner.layer_id(layer.parent))

        # Analyze again, which should properly re-analyze the missing parent and this layer.
        analyzer.analyze_recursively(layer)

        layer = model.tag.get_tag_image(ADMIN_ACCESS_USER, SIMPLE_REPO, 'latest')
        self.assertAnalyzed(layer, security_scanner, True, 1)
|
|
|
|
|
|
|
|
|
|
|
|
def test_analyze_layer_invalid_parent(self):
    """ Tests that trying to reanalyze a parent that is invalid causes the layer to be marked
        as analyzed, but failed.
    """
    layer = model.tag.get_tag_image(ADMIN_ACCESS_USER, SIMPLE_REPO, 'latest',
                                    include_storage=True)
    self.assertFalse(layer.security_indexed)
    self.assertEqual(-1, layer.security_indexed_engine)

    with fake_security_scanner() as security_scanner:
        # Analyze the layer and its parents.
        analyzer = LayerAnalyzer(app.config, self.api)
        analyzer.analyze_recursively(layer)

        # Make sure it was analyzed.
        layer = model.tag.get_tag_image(ADMIN_ACCESS_USER, SIMPLE_REPO, 'latest')
        self.assertAnalyzed(layer, security_scanner, True, 1)

        # Mark the layer as not yet scanned.
        layer.security_indexed_engine = IMAGE_NOT_SCANNED_ENGINE_VERSION
        layer.security_indexed = False
        layer.save()

        # Remove the layer's parent entirely from the security scanner.
        security_scanner.remove_layer(security_scanner.layer_id(layer.parent))

        # Make it so trying to analyze the parent will fail.
        security_scanner.set_error_layer_id(security_scanner.layer_id(layer.parent))

        # Try to analyze again, which should try to reindex the parent and fail.
        analyzer.analyze_recursively(layer)

        layer = model.tag.get_tag_image(ADMIN_ACCESS_USER, SIMPLE_REPO, 'latest')
        self.assertAnalyzed(layer, security_scanner, False, 1)
|
|
|
|
|
|
|
|
|
|
|
|
def test_analyze_layer_unsupported_parent(self):
    """ Tests that attempting to analyze a layer whose parent is unanalyzable, results in the
        layer being marked as analyzed, but failed.
    """
    layer = model.tag.get_tag_image(ADMIN_ACCESS_USER, SIMPLE_REPO, 'latest',
                                    include_storage=True)
    self.assertFalse(layer.security_indexed)
    self.assertEqual(-1, layer.security_indexed_engine)

    with fake_security_scanner() as security_scanner:
        # Make it so trying to analyze the parent will fail.
        security_scanner.set_fail_layer_id(security_scanner.layer_id(layer.parent))

        # Attempt to analyze the layer and its parents. This should mark the layer itself
        # as unanalyzable.
        analyzer = LayerAnalyzer(app.config, self.api)
        analyzer.analyze_recursively(layer)

        layer = model.tag.get_tag_image(ADMIN_ACCESS_USER, SIMPLE_REPO, 'latest')
        self.assertAnalyzed(layer, security_scanner, False, 1)
|
2016-02-24 21:01:27 +00:00
|
|
|
|
|
|
|
|
|
|
|
def test_analyze_layer_missing_storage(self):
    """ Tests trying to analyze a layer with missing storage. """
    layer = model.tag.get_tag_image(ADMIN_ACCESS_USER, SIMPLE_REPO, 'latest',
                                    include_storage=True)
    self.assertFalse(layer.security_indexed)
    self.assertEqual(-1, layer.security_indexed_engine)

    # Delete the storage for the layer.
    path = model.storage.get_layer_path(layer.storage)
    locations = app.config['DISTRIBUTED_STORAGE_PREFERENCE']
    storage.remove(locations, path)

    # Also drop the 'all_files_exist' flag written by setUp, which told fake storage to
    # report every file as existing.
    storage.remove(locations, 'all_files_exist')

    with fake_security_scanner() as security_scanner:
        analyzer = LayerAnalyzer(app.config, self.api)
        analyzer.analyze_recursively(layer)

        layer = model.tag.get_tag_image(ADMIN_ACCESS_USER, SIMPLE_REPO, 'latest')
        self.assertAnalyzed(layer, security_scanner, False, 1)
|
2016-02-24 21:01:27 +00:00
|
|
|
|
2016-12-14 22:11:45 +00:00
|
|
|
|
2016-12-15 21:27:24 +00:00
|
|
|
def assert_analyze_layer_notify(self, security_indexed_engine, security_indexed,
                                expect_notification):
    """ Analyzes the SIMPLE_REPO 'latest' layer from a given prior state and checks whether a
        vulnerability notification was queued.

        security_indexed_engine: engine version stored on the layer before analyzing.
        security_indexed: indexed flag stored on the layer before analyzing.
        expect_notification: whether a queue item is expected after the analysis.
    """
    layer = model.tag.get_tag_image(ADMIN_ACCESS_USER, SIMPLE_REPO, 'latest',
                                    include_storage=True)
    self.assertFalse(layer.security_indexed)
    self.assertEqual(-1, layer.security_indexed_engine)

    # Ensure there are no existing events.
    self.assertIsNone(notification_queue.get())

    # Add a repo event for the layer.
    repo = model.repository.get_repository(ADMIN_ACCESS_USER, SIMPLE_REPO)
    model.notification.create_repo_notification(repo, 'vulnerability_found',
                                                'quay_notification', {}, {'level': 100})

    # Update the layer's state before analyzing.
    layer.security_indexed_engine = security_indexed_engine
    layer.security_indexed = security_indexed
    layer.save()

    with fake_security_scanner() as security_scanner:
        security_scanner.set_vulns(security_scanner.layer_id(layer), [
            {
                "Name": "CVE-2014-9471",
                "Namespace": "debian:8",
                "Description": "Some service",
                "Link": "https://security-tracker.debian.org/tracker/CVE-2014-9471",
                "Severity": "Low",
                "FixedBy": "9.23-5"
            }
        ])

        analyzer = LayerAnalyzer(app.config, self.api)
        analyzer.analyze_recursively(layer)

        layer = model.tag.get_tag_image(ADMIN_ACCESS_USER, SIMPLE_REPO, 'latest')
        self.assertAnalyzed(layer, security_scanner, True, 1)

    # Ensure an event was written for the tag (if necessary).
    time.sleep(1)
    queue_item = notification_queue.get()

    if expect_notification:
        self.assertIsNotNone(queue_item)

        body = json.loads(queue_item.body)
        self.assertEqual(set(['latest', 'prod']), set(body['event_data']['tags']))
        self.assertEqual('CVE-2014-9471', body['event_data']['vulnerability']['id'])
        self.assertEqual('Low', body['event_data']['vulnerability']['priority'])
        self.assertTrue(body['event_data']['vulnerability']['has_fix'])
    else:
        self.assertIsNone(queue_item)

    # Ensure its security indexed engine was updated.
    updated_layer = model.tag.get_tag_image(ADMIN_ACCESS_USER, SIMPLE_REPO, 'latest')
    self.assertEqual(updated_layer.id, layer.id)
    # assertGreater reports the actual engine version when the check fails.
    self.assertGreater(updated_layer.security_indexed_engine, 0)
|
|
|
|
|
2016-12-14 22:11:45 +00:00
|
|
|
|
2016-12-14 16:10:53 +00:00
|
|
|
def test_analyze_layer_success_events(self):
    """ A layer that was never indexed before must produce a notification. """
    self.assert_analyze_layer_notify(security_indexed_engine=IMAGE_NOT_SCANNED_ENGINE_VERSION,
                                     security_indexed=False,
                                     expect_notification=True)
|
2016-12-14 03:51:29 +00:00
|
|
|
|
2016-12-14 22:11:45 +00:00
|
|
|
|
2016-12-14 03:51:29 +00:00
|
|
|
def test_analyze_layer_success_no_notification(self):
|
2016-12-14 16:10:53 +00:00
|
|
|
# Previously successfully indexed => No notification
|
|
|
|
self.assert_analyze_layer_notify(0, True, False)
|
2016-12-14 03:51:29 +00:00
|
|
|
|
2016-12-14 22:11:45 +00:00
|
|
|
|
2016-12-14 16:10:53 +00:00
|
|
|
def test_analyze_layer_failed_then_success_notification(self):
|
|
|
|
# Previously failed to index => Notification
|
|
|
|
self.assert_analyze_layer_notify(0, False, True)
|
2016-02-24 21:01:27 +00:00
|
|
|
|
2016-04-22 17:05:34 +00:00
|
|
|
|
2016-02-25 20:58:42 +00:00
|
|
|
def test_notification_new_layers_not_vulnerable(self):
    """ A notification that lists the layer only as an old layer must not queue an event. """
    tagged_layer = model.tag.get_tag_image(ADMIN_ACCESS_USER, SIMPLE_REPO, 'latest',
                                           include_storage=True)
    layer_id = '%s.%s' % (tagged_layer.docker_image_id, tagged_layer.storage.uuid)

    # Register a repo event for the layer.
    repository = model.repository.get_repository(ADMIN_ACCESS_USER, SIMPLE_REPO)
    model.notification.create_repo_notification(
        repository, 'vulnerability_found', 'quay_notification', {}, {'level': 100})

    # The event queue must start out empty.
    self.assertIsNone(notification_queue.get())

    # Fire off the notification processing.
    with fake_security_scanner() as security_scanner:
        LayerAnalyzer(app.config, self.api).analyze_recursively(tagged_layer)

        analyzed_layer = model.tag.get_tag_image(ADMIN_ACCESS_USER, SIMPLE_REPO, 'latest')
        self.assertAnalyzed(analyzed_layer, security_scanner, True, 1)

        # Add a notification for the layer and process it.
        notification_data = security_scanner.add_notification([layer_id], [], {}, {})
        processed = process_notification_data(notification_data)
        self.assertTrue(processed)

        # Still nothing queued for the layer.
        self.assertIsNone(notification_queue.get())
|
2016-02-25 20:58:42 +00:00
|
|
|
|
|
|
|
|
2016-04-22 17:05:34 +00:00
|
|
|
def test_notification_delete(self):
    """ A notification carrying no new layers and no new vulnerability must not queue an event. """
    tagged_layer = model.tag.get_tag_image(ADMIN_ACCESS_USER, SIMPLE_REPO, 'latest',
                                           include_storage=True)
    layer_id = '%s.%s' % (tagged_layer.docker_image_id, tagged_layer.storage.uuid)

    # Register a repo event for the layer.
    repository = model.repository.get_repository(ADMIN_ACCESS_USER, SIMPLE_REPO)
    model.notification.create_repo_notification(
        repository, 'vulnerability_found', 'quay_notification', {}, {'level': 100})

    # The event queue must start out empty.
    self.assertIsNone(notification_queue.get())

    # Fire off the notification processing.
    with fake_security_scanner() as security_scanner:
        LayerAnalyzer(app.config, self.api).analyze_recursively(tagged_layer)

        analyzed_layer = model.tag.get_tag_image(ADMIN_ACCESS_USER, SIMPLE_REPO, 'latest')
        self.assertAnalyzed(analyzed_layer, security_scanner, True, 1)

        # Add a notification for the layer (None for the new layers and the new
        # vulnerability) and process it.
        notification_data = security_scanner.add_notification([layer_id], None, {}, None)
        processed = process_notification_data(notification_data)
        self.assertTrue(processed)

        # Still nothing queued for the layer.
        self.assertIsNone(notification_queue.get())
|
2016-04-22 17:05:34 +00:00
|
|
|
|
|
|
|
|
2016-02-25 20:58:42 +00:00
|
|
|
def test_notification_new_layers(self):
    """ Tests that a notification listing the layer as newly vulnerable queues an event. """
    layer = model.tag.get_tag_image(ADMIN_ACCESS_USER, SIMPLE_REPO, 'latest',
                                    include_storage=True)
    layer_id = '%s.%s' % (layer.docker_image_id, layer.storage.uuid)

    # Add a repo event for the layer.
    repo = model.repository.get_repository(ADMIN_ACCESS_USER, SIMPLE_REPO)
    model.notification.create_repo_notification(repo, 'vulnerability_found',
                                                'quay_notification', {}, {'level': 100})

    # Ensure that there are no event queue items for the layer.
    self.assertIsNone(notification_queue.get())

    # Fire off the notification processing.
    with fake_security_scanner() as security_scanner:
        analyzer = LayerAnalyzer(app.config, self.api)
        analyzer.analyze_recursively(layer)

        layer = model.tag.get_tag_image(ADMIN_ACCESS_USER, SIMPLE_REPO, 'latest')
        self.assertAnalyzed(layer, security_scanner, True, 1)

        vuln_info = {
            "Name": "CVE-TEST",
            "Namespace": "debian:8",
            "Description": "Some service",
            "Link": "https://security-tracker.debian.org/tracker/CVE-2014-9471",
            "Severity": "Low",
            "FixedIn": {'Version': "9.23-5"},
        }
        security_scanner.set_vulns(layer_id, [vuln_info])

        # Add a notification for the layer.
        notification_data = security_scanner.add_notification([], [layer_id], vuln_info,
                                                              vuln_info)

        # Process the notification.
        self.assertTrue(process_notification_data(notification_data))

        # Ensure an event was written for the tag.
        time.sleep(1)
        queue_item = notification_queue.get()
        self.assertIsNotNone(queue_item)

        item_body = json.loads(queue_item.body)
        self.assertEqual(sorted(['prod', 'latest']), sorted(item_body['event_data']['tags']))
        self.assertEqual('CVE-TEST', item_body['event_data']['vulnerability']['id'])
        self.assertEqual('Low', item_body['event_data']['vulnerability']['priority'])
        self.assertTrue(item_body['event_data']['vulnerability']['has_fix'])
|
2016-02-25 20:58:42 +00:00
|
|
|
|
|
|
|
|
|
|
|
def test_notification_no_new_layers(self):
    """ A notification that references no layers at all must not queue an event. """
    tagged_layer = model.tag.get_tag_image(ADMIN_ACCESS_USER, SIMPLE_REPO, 'latest',
                                           include_storage=True)

    # Register a repo event for the layer.
    repository = model.repository.get_repository(ADMIN_ACCESS_USER, SIMPLE_REPO)
    model.notification.create_repo_notification(
        repository, 'vulnerability_found', 'quay_notification', {}, {'level': 100})

    # The event queue must start out empty.
    self.assertIsNone(notification_queue.get())

    # Fire off the notification processing.
    with fake_security_scanner() as security_scanner:
        LayerAnalyzer(app.config, self.api).analyze_recursively(tagged_layer)

        analyzed_layer = model.tag.get_tag_image(ADMIN_ACCESS_USER, SIMPLE_REPO, 'latest')
        self.assertAnalyzed(analyzed_layer, security_scanner, True, 1)

        # Add a notification with empty layer lists and process it.
        notification_data = security_scanner.add_notification([], [], {}, {})
        processed = process_notification_data(notification_data)
        self.assertTrue(processed)

        # Still nothing queued for the layer.
        self.assertIsNone(notification_queue.get())
|
2016-02-25 20:58:42 +00:00
|
|
|
|
|
|
|
|
|
|
|
def test_notification_no_new_layers_increased_severity(self):
    """ Tests that raising the severity of a vulnerability on an already-listed layer queues
        an event, and checks should_perform against notifications with different levels.
    """
    layer = model.tag.get_tag_image(ADMIN_ACCESS_USER, SIMPLE_REPO, 'latest',
                                    include_storage=True)
    layer_id = '%s.%s' % (layer.docker_image_id, layer.storage.uuid)

    # Add a repo event for the layer.
    repo = model.repository.get_repository(ADMIN_ACCESS_USER, SIMPLE_REPO)
    notification = model.notification.create_repo_notification(repo, 'vulnerability_found',
                                                               'quay_notification', {},
                                                               {'level': 100})

    # Ensure that there are no event queue items for the layer.
    self.assertIsNone(notification_queue.get())

    # Fire off the notification processing.
    with fake_security_scanner() as security_scanner:
        analyzer = LayerAnalyzer(app.config, self.api)
        analyzer.analyze_recursively(layer)

        layer = model.tag.get_tag_image(ADMIN_ACCESS_USER, SIMPLE_REPO, 'latest')
        self.assertAnalyzed(layer, security_scanner, True, 1)

        old_vuln_info = {
            "Name": "CVE-TEST",
            "Namespace": "debian:8",
            "Description": "Some service",
            "Link": "https://security-tracker.debian.org/tracker/CVE-2014-9471",
            "Severity": "Low",
        }

        new_vuln_info = {
            "Name": "CVE-TEST",
            "Namespace": "debian:8",
            "Description": "Some service",
            "Link": "https://security-tracker.debian.org/tracker/CVE-2014-9471",
            "Severity": "Critical",
            "FixedIn": {'Version': "9.23-5"},
        }

        security_scanner.set_vulns(layer_id, [new_vuln_info])

        # Add a notification for the layer.
        notification_data = security_scanner.add_notification([layer_id], [layer_id],
                                                              old_vuln_info, new_vuln_info)

        # Process the notification.
        self.assertTrue(process_notification_data(notification_data))

        # Ensure an event was written for the tag.
        time.sleep(1)
        queue_item = notification_queue.get()
        self.assertIsNotNone(queue_item)

        item_body = json.loads(queue_item.body)
        self.assertEqual(sorted(['prod', 'latest']), sorted(item_body['event_data']['tags']))
        self.assertEqual('CVE-TEST', item_body['event_data']['vulnerability']['id'])
        self.assertEqual('Critical', item_body['event_data']['vulnerability']['priority'])
        self.assertTrue(item_body['event_data']['vulnerability']['has_fix'])

        # Verify that an event would be raised.
        event_data = item_body['event_data']
        self.assertTrue(VulnerabilityFoundEvent().should_perform(event_data, notification))

        # Create another notification with a matching level and verify it will be raised.
        notification = model.notification.create_repo_notification(repo, 'vulnerability_found',
                                                                   'quay_notification', {},
                                                                   {'level': 1})
        self.assertTrue(VulnerabilityFoundEvent().should_perform(event_data, notification))

        # Create another notification with a higher level and verify it won't be raised.
        notification = model.notification.create_repo_notification(repo, 'vulnerability_found',
                                                                   'quay_notification', {},
                                                                   {'level': 0})
        self.assertFalse(VulnerabilityFoundEvent().should_perform(event_data, notification))
|
2016-02-25 20:58:42 +00:00
|
|
|
|
2016-03-19 00:28:06 +00:00
|
|
|
|
2016-12-14 05:07:48 +00:00
|
|
|
def test_select_images_to_scan(self):
    """ Tests that only images indexed below the requested engine version are selected. """
    # Set every image's security indexed engine to the target version from the config.
    expected_version = app.config['SECURITY_SCANNER_ENGINE_VERSION_TARGET']
    Image.update(security_indexed_engine=expected_version).execute()

    # Ensure no images are available for scanning.
    # assertEqual/assertGreater report the actual count on failure, unlike
    # assertTrue(len(...) == 0).
    self.assertIsNone(model.image.get_min_id_for_sec_scan(expected_version))
    self.assertEqual(0, len(model.image.get_images_eligible_for_scan(expected_version)))

    # Check for a higher version.
    self.assertIsNotNone(model.image.get_min_id_for_sec_scan(expected_version + 1))
    self.assertGreater(len(model.image.get_images_eligible_for_scan(expected_version + 1)), 0)
|
|
|
|
|
|
|
|
|
2016-03-19 00:28:06 +00:00
|
|
|
def test_notification_worker(self):
    """ Runs the notification worker on an unknown notification and on a paginated one. """
    simple_layer = model.tag.get_tag_image(ADMIN_ACCESS_USER, SIMPLE_REPO, 'latest',
                                           include_storage=True)
    complex_layer = model.tag.get_tag_image(ADMIN_ACCESS_USER, COMPLEX_REPO, 'prod',
                                            include_storage=True)

    # Register a repo event on each of the two repositories.
    simple_repo = model.repository.get_repository(ADMIN_ACCESS_USER, SIMPLE_REPO)
    complex_repo = model.repository.get_repository(ADMIN_ACCESS_USER, COMPLEX_REPO)
    for repository in (simple_repo, complex_repo):
        model.notification.create_repo_notification(
            repository, 'vulnerability_found', 'quay_notification', {}, {'level': 100})

    # The event queue must start out empty.
    self.assertIsNone(notification_queue.get())

    with fake_security_scanner() as security_scanner:
        # An unknown notification name must be reported as a failure.
        unknown_worker = SecurityNotificationWorker(None)
        unknown_payload = {'Name': 'unknownnotification'}
        self.assertFalse(unknown_worker.perform_notification_work(unknown_payload))

        # Analyze both layers so the scanner knows about them.
        layer_analyzer = LayerAnalyzer(app.config, self.api)
        for tagged_layer in (simple_layer, complex_layer):
            layer_analyzer.analyze_recursively(tagged_layer)

        # Attach the same critical vulnerability to both layers.
        critical_vuln = {
            "Name": "CVE-TEST",
            "Namespace": "debian:8",
            "Description": "Some service",
            "Link": "https://security-tracker.debian.org/tracker/CVE-2014-9471",
            "Severity": "Critical",
            "FixedIn": {'Version': "9.23-5"},
        }
        for tagged_layer in (simple_layer, complex_layer):
            security_scanner.set_vulns(security_scanner.layer_id(tagged_layer), [critical_vuln])

        # Add a notification with pages of data, listing both layers as newly vulnerable.
        layer_ids = [security_scanner.layer_id(tagged_layer)
                     for tagged_layer in (simple_layer, complex_layer)]
        notification_data = security_scanner.add_notification([], layer_ids, {}, critical_vuln)

        # A known notification, processed with a layer limit of one so it needs several pages.
        known_payload = {'Name': notification_data['Name']}
        paging_worker = SecurityNotificationWorker(None)
        self.assertTrue(paging_worker.perform_notification_work(known_payload, layer_limit=1))

        # Make sure all pages were processed by ensuring we have two notifications.
        time.sleep(1)
        for _ in range(2):
            self.assertIsNotNone(notification_queue.get())
|
2016-03-19 00:28:06 +00:00
|
|
|
|
|
|
|
|
2016-02-24 21:01:27 +00:00
|
|
|
if __name__ == '__main__':
    # Run the test suite when this file is executed directly.
    unittest.main()
|