6871eb95b1
Following this change, if an image was previously indexed unsuccessfully, then we will send notifications once successfully indexed
648 lines
24 KiB
Python
648 lines
24 KiB
Python
import json
|
|
import time
|
|
import unittest
|
|
from httmock import urlmatch, all_requests, HTTMock
|
|
|
|
from app import app, storage, notification_queue
|
|
from endpoints.notificationevent import VulnerabilityFoundEvent
|
|
from initdb import setup_database_for_testing, finished_database_for_testing
|
|
from util.secscan.api import SecurityScannerAPI, AnalyzeLayerException
|
|
from util.secscan.analyzer import LayerAnalyzer
|
|
from util.secscan.notifier import process_notification_data
|
|
from data import model
|
|
from data.database import Image, IMAGE_NOT_SCANNED_ENGINE_VERSION
|
|
from workers.security_notification_worker import SecurityNotificationWorker
|
|
from endpoints.v2 import v2_bp
|
|
|
|
|
|
# Namespace (user name) passed to the model lookups performed by these tests.
ADMIN_ACCESS_USER = 'devtable'
# Repository names that the tests look up under ADMIN_ACCESS_USER.
SIMPLE_REPO = 'simple'
COMPLEX_REPO = 'complex'

# NOTE(review): not referenced anywhere in the visible part of this module;
# confirm whether it is still needed.
_PORT_NUMBER = 5001
|
|
|
|
@urlmatch(netloc=r'(.*\.)?mockclairservice', path=r'/v1/layers/(.+)')
def get_layer_failure_mock(url, request):
    """Answer a request for a specific layer with a 404 'Unknown layer' error."""
    error_payload = {'Error': {'Message': 'Unknown layer'}}
    return {
        'status_code': 404,
        'content': json.dumps(error_payload),
    }
|
|
|
|
|
|
@urlmatch(netloc=r'(.*\.)?mockclairservice', path=r'/v1/layers$')
def analyze_layer_badrequest_mock(url, request):
    """Answer a request to the layers collection with a 400 'Bad request' error."""
    error_payload = {'Error': {'Message': 'Bad request'}}
    return {
        'status_code': 400,
        'content': json.dumps(error_payload),
    }
|
|
|
|
|
|
@urlmatch(netloc=r'(.*\.)?mockclairservice', path=r'/v1/layers$')
def analyze_layer_internal_mock(url, request):
    """Answer a request to the layers collection with a 500 'Internal server error'."""
    error_payload = {'Error': {'Message': 'Internal server error'}}
    return {
        'status_code': 500,
        'content': json.dumps(error_payload),
    }
|
|
|
|
|
|
@urlmatch(netloc=r'(.*\.)?mockclairservice', path=r'/v1/layers$')
def analyze_layer_failure_mock(url, request):
    """Answer a request to the layers collection with a 422 'Bad layer' error."""
    error_payload = {'Error': {'Message': 'Bad layer'}}
    return {
        'status_code': 422,
        'content': json.dumps(error_payload),
    }
|
|
|
|
|
|
@urlmatch(netloc=r'(.*\.)?mockclairservice', path=r'/v1/layers$')
def analyze_layer_success_mock(url, request):
    """Validate the posted layer document and answer with a 201 and a canned layer.

    The JSON request body must contain a 'Layer' object carrying 'Path', 'Name'
    and 'Format'; the first missing piece produces a 400 with a plain-text
    message naming it.
    """
    payload = json.loads(request.body)
    if 'Layer' not in payload:
        return {'status_code': 400, 'content': 'Missing body'}

    posted_layer = payload['Layer']

    # Checked in this order so the first missing field decides the message.
    required_fields = (
        ('Path', 'Missing Path'),
        ('Name', 'Missing Name'),
        ('Format', 'Missing Format'),
    )
    for field_name, error_message in required_fields:
        if field_name not in posted_layer:
            return {'status_code': 400, 'content': error_message}

    indexed_layer = {
        "Name": "523ef1d23f222195488575f52a39c729c76a8c5630c9a194139cb246fb212da6",
        "Path": "/mnt/layers/523ef1d23f222195488575f52a39c729c76a8c5630c9a194139cb246fb212da6/layer.tar",
        "ParentName": "140f9bdfeb9784cf8730e9dab5dd12fbd704151cf555ac8cae650451794e5ac2",
        "Format": "Docker",
        "IndexedByVersion": 1
    }
    return {'status_code': 201, 'content': json.dumps({"Layer": indexed_layer})}
|
|
|
|
|
|
@urlmatch(netloc=r'(.*\.)?mockclairservice', path=r'/v1/layers/(.+)')
def get_layer_success_mock(url, request):
    """Answer a request for a specific layer with a canned, successfully indexed layer.

    What the response contains depends on the text of the request URL:

    * 'vulnerabilities' present: one feature carrying one vulnerability.
    * only 'features' present: the feature, with an empty vulnerability list.
    * neither present: no features.

    Returns the JSON document as a string.
    """
    # Asking for vulnerabilities implies asking for the features they hang off.
    include_vulnerabilities = 'vulnerabilities' in request.url
    include_features = include_vulnerabilities or 'features' in request.url

    # Decide the vulnerability list *before* building the feature that embeds
    # it; rebinding the name afterwards would not change what was embedded.
    vulnerabilities = []
    if include_vulnerabilities:
        vulnerabilities = [
            {
                "Name": "CVE-2014-9471",
                "Namespace": "debian:8",
                "Description": "The parse_datetime function in GNU coreutils allows remote attackers to cause a denial of service (crash) or possibly execute arbitrary code via a crafted date string, as demonstrated by the \"--date=TZ=\"123\"345\" @1\" string to the touch or date command.",
                "Link": "https://security-tracker.debian.org/tracker/CVE-2014-9471",
                "Severity": "Low",
                "FixedBy": "9.23-5"
            }
        ]

    features = []
    if include_features:
        features = [
            {
                "Name": "coreutils",
                "Namespace": "debian:8",
                "Version": "8.23-4",
                "Vulnerabilities": vulnerabilities,
            }
        ]

    return json.dumps({
        "Layer": {
            "Name": "17675ec01494d651e1ccf81dc9cf63959ebfeed4f978fddb1666b6ead008ed52",
            "Namespace": "debian:8",
            "ParentName": "140f9bdfeb9784cf8730e9dab5dd12fbd704151cf555ac8cae650451794e5ac2",
            "IndexedByVersion": 1,
            "Features": features
        }
    })
|
|
|
|
|
|
@all_requests
def response_content(url, request):
    """Catch-all handler: answer any request with a 500 'Unknown endpoint'."""
    fallback = {'status_code': 500}
    fallback['content'] = 'Unknown endpoint'
    return fallback
|
|
|
|
|
|
class TestSecurityScanner(unittest.TestCase):
    """Tests for the security scanner API client, layer analyzer and notifications.

    HTTP traffic to the scanner is intercepted with the HTTMock handlers defined
    in this module (or built inside individual tests), so no real scanner is
    contacted.
    """

    def setUp(self):
        """Configure fake storage, the test database and a request context."""
        # Enable direct download in fake storage.
        storage.put_content(['local_us'], 'supports_direct_download', 'true')

        # Have fake storage say all files exist for the duration of the test.
        storage.put_content(['local_us'], 'all_files_exist', 'true')

        # Setup the database with fake storage.
        setup_database_for_testing(self)
        self.app = app.test_client()
        self.ctx = app.test_request_context()
        self.ctx.__enter__()

        self.api = SecurityScannerAPI(app, app.config, storage)

    def tearDown(self):
        """Undo the fake-storage flags, the test database and the request context."""
        storage.remove(['local_us'], 'supports_direct_download')
        storage.remove(['local_us'], 'all_files_exist')

        finished_database_for_testing(self)
        # NOTE(review): True is passed in the exc_type position; kept as-is,
        # confirm whether None was intended.
        self.ctx.__exit__(True, None, None)

    # ------------------------------------------------------------------
    # Helpers
    # ------------------------------------------------------------------

    def _get_latest_layer(self, **kwargs):
        """Return the image behind the 'latest' tag of the simple repository.

        Keyword arguments (e.g. ``include_storage=True``) are forwarded to
        ``model.tag.get_tag_image`` unchanged.
        """
        return model.tag.get_tag_image(ADMIN_ACCESS_USER, SIMPLE_REPO, 'latest', **kwargs)

    @staticmethod
    def _layer_id(layer):
        """Return '<docker_image_id>.<storage uuid>' for a layer loaded with storage."""
        return '%s.%s' % (layer.docker_image_id, layer.storage.uuid)

    def _assert_not_indexed(self, layer):
        """Assert the layer is in its initial state: not indexed, engine version -1."""
        self.assertFalse(layer.security_indexed)
        self.assertEqual(-1, layer.security_indexed_engine)

    def _create_vulnerability_notification(self, level=100):
        """Create and return a 'vulnerability_found' notification on the simple repository."""
        repo = model.repository.get_repository(ADMIN_ACCESS_USER, SIMPLE_REPO)
        return model.notification.create_repo_notification(repo, 'vulnerability_found',
                                                           'quay_notification', {},
                                                           {'level': level})

    def _analyze(self, layer, *mocks):
        """Run the layer analyzer recursively on `layer` with the given HTTMock handlers."""
        with HTTMock(*mocks):
            analyzer = LayerAnalyzer(app.config, self.api)
            analyzer.analyze_recursively(layer)

    @staticmethod
    def _matching_layer_mock(layer_id, vulnerabilities):
        """Build a handler answering layer lookups with one 'coreutils' feature.

        The returned layer is named `layer_id` and its single feature carries
        the given list of vulnerabilities (which may be empty).
        """
        @urlmatch(netloc=r'(.*\.)?mockclairservice', path=r'/v1/layers/(.+)')
        def get_matching_layer(url, request):
            return json.dumps({
                "Layer": {
                    "Name": layer_id,
                    "Namespace": "debian:8",
                    "IndexedByVersion": 1,
                    "Features": [
                        {
                            "Name": "coreutils",
                            "Namespace": "debian:8",
                            "Version": "8.23-4",
                            "Vulnerabilities": vulnerabilities,
                        }
                    ]
                }
            })

        return get_matching_layer

    def assertAnalyzed(self, layer, isAnalyzed, engineVersion):
        """Assert that `layer` and each of its parent images have the given index state."""
        self.assertEqual(isAnalyzed, layer.security_indexed)
        self.assertEqual(engineVersion, layer.security_indexed_engine)

        # Ensure all parent layers are marked as analyzed.
        parents = model.image.get_parent_images(ADMIN_ACCESS_USER, SIMPLE_REPO, layer)
        for parent in parents:
            self.assertEqual(isAnalyzed, parent.security_indexed)
            self.assertEqual(engineVersion, parent.security_indexed_engine)

    # ------------------------------------------------------------------
    # get_layer_data
    # ------------------------------------------------------------------

    def test_get_layer_success(self):
        """A successful layer lookup returns the layer document from the scanner."""
        layer = self._get_latest_layer(include_storage=True)
        with HTTMock(get_layer_success_mock, response_content):
            result = self.api.get_layer_data(layer, include_vulnerabilities=True)
            self.assertIsNotNone(result)
            self.assertEqual(result['Layer']['Name'],
                             '17675ec01494d651e1ccf81dc9cf63959ebfeed4f978fddb1666b6ead008ed52')

    def test_get_layer_failure(self):
        """A 404 from the scanner makes get_layer_data return None."""
        layer = self._get_latest_layer(include_storage=True)
        with HTTMock(get_layer_failure_mock, response_content):
            result = self.api.get_layer_data(layer, include_vulnerabilities=True)
            self.assertIsNone(result)

    # ------------------------------------------------------------------
    # analyze
    # ------------------------------------------------------------------

    def test_analyze_layer_nodirectdownload_success(self):
        """Analysis succeeds when storage does not support direct download."""
        # Disable direct download in fake storage.
        storage.put_content(['local_us'], 'supports_direct_download', 'false')

        try:
            app.register_blueprint(v2_bp, url_prefix='/v2')
        except Exception:
            # Already registered. Deliberately best-effort, but no longer
            # swallows KeyboardInterrupt/SystemExit like a bare except did.
            pass

        layer = self._get_latest_layer(include_storage=True)
        self._assert_not_indexed(layer)

        # Ensure that the download is a registry+JWT download.
        uri, auth_header = self.api._get_image_url_and_auth(layer)
        self.assertIsNotNone(uri)
        self.assertIsNotNone(auth_header)

        # Ensure the download doesn't work without the header.
        rv = self.app.head(uri)
        self.assertEqual(rv.status_code, 401)

        # Ensure the download works with the header. Note we use a HEAD here, as GET causes DB
        # access which messes with the test runner's rollback.
        rv = self.app.head(uri, headers=[('authorization', auth_header)])
        self.assertEqual(rv.status_code, 200)

        # Ensure the code works when called via analyze.
        self._analyze(layer, analyze_layer_success_mock, get_layer_success_mock, response_content)

        layer = self._get_latest_layer()
        self.assertAnalyzed(layer, True, 1)

    def test_analyze_layer_success(self):
        """A 201 from the scanner marks the layer and its parents as indexed at version 1."""
        layer = self._get_latest_layer(include_storage=True)
        self._assert_not_indexed(layer)

        self._analyze(layer, analyze_layer_success_mock, get_layer_success_mock, response_content)

        layer = self._get_latest_layer()
        self.assertAnalyzed(layer, True, 1)

    def test_analyze_layer_failure(self):
        """A 422 from the scanner leaves the layer unindexed but records engine version 1."""
        layer = self._get_latest_layer(include_storage=True)
        self._assert_not_indexed(layer)

        self._analyze(layer, analyze_layer_failure_mock, response_content)

        layer = self._get_latest_layer()
        self.assertAnalyzed(layer, False, 1)

    def test_analyze_layer_internal_error(self):
        """A 500 from the scanner leaves the layer's index state untouched."""
        layer = self._get_latest_layer(include_storage=True)
        self._assert_not_indexed(layer)

        self._analyze(layer, analyze_layer_internal_mock, response_content)

        layer = self._get_latest_layer()
        self.assertAnalyzed(layer, False, -1)

    def test_analyze_layer_bad_request(self):
        """A 400 from the scanner surfaces as an AnalyzeLayerException."""
        layer = self._get_latest_layer(include_storage=True)
        self._assert_not_indexed(layer)

        with self.assertRaises(AnalyzeLayerException):
            self._analyze(layer, analyze_layer_badrequest_mock, response_content)

    def test_analyze_layer_missing_storage(self):
        """A layer whose storage is gone ends up unindexed with engine version 1."""
        layer = self._get_latest_layer(include_storage=True)
        self._assert_not_indexed(layer)

        # Delete the storage for the layer.
        path = model.storage.get_layer_path(layer.storage)
        locations = app.config['DISTRIBUTED_STORAGE_PREFERENCE']
        storage.remove(locations, path)
        storage.remove(locations, 'all_files_exist')

        self._analyze(layer, analyze_layer_success_mock, response_content)

        layer = self._get_latest_layer()
        self.assertEqual(False, layer.security_indexed)
        self.assertEqual(1, layer.security_indexed_engine)

    def assert_analyze_layer_notify(self, security_indexed_engine, security_indexed,
                                    expect_notification):
        """Analyze the latest layer from a given prior state and check for a notification.

        Args:
          security_indexed_engine: engine version stored on the layer before analysis.
          security_indexed: indexed flag stored on the layer before analysis.
          expect_notification: whether a queue item is expected after analysis.
        """
        layer = self._get_latest_layer(include_storage=True)
        self._assert_not_indexed(layer)

        # Ensure there are no existing events.
        self.assertIsNone(notification_queue.get())

        # Add a repo event for the layer.
        self._create_vulnerability_notification()

        # Update the layer's state before analyzing.
        layer.security_indexed_engine = security_indexed_engine
        layer.security_indexed = security_indexed
        layer.save()

        self._analyze(layer, analyze_layer_success_mock, get_layer_success_mock, response_content)

        layer = self._get_latest_layer()
        self.assertAnalyzed(layer, True, 1)

        # Ensure an event was written for the tag (if necessary).
        # NOTE(review): the one second pause presumably lets the queued item
        # become available; confirm against the queue implementation.
        time.sleep(1)
        queue_item = notification_queue.get()

        if expect_notification:
            self.assertIsNotNone(queue_item)

            body = json.loads(queue_item.body)
            vulnerability = body['event_data']['vulnerability']
            self.assertEqual(set(['latest', 'prod']), set(body['event_data']['tags']))
            self.assertEqual('CVE-2014-9471', vulnerability['id'])
            self.assertEqual('Low', vulnerability['priority'])
            self.assertTrue(vulnerability['has_fix'])
        else:
            self.assertIsNone(queue_item)

        # Ensure its security indexed engine was updated.
        updated_layer = self._get_latest_layer()
        self.assertEqual(updated_layer.id, layer.id)
        self.assertGreater(updated_layer.security_indexed_engine, 0)

    def test_analyze_layer_success_events(self):
        # Not previously indexed at all => Notification
        self.assert_analyze_layer_notify(IMAGE_NOT_SCANNED_ENGINE_VERSION, False, True)

    def test_analyze_layer_success_no_notification(self):
        # Previously successfully indexed => No notification
        self.assert_analyze_layer_notify(0, True, False)

    def test_analyze_layer_failed_then_success_notification(self):
        # Previously failed to index => Notification
        self.assert_analyze_layer_notify(0, False, True)

    # ------------------------------------------------------------------
    # notification payloads
    # ------------------------------------------------------------------

    def _get_notification_data(self, new_layer_ids, old_layer_ids, new_severity='Low'):
        """Build a scanner notification with both a 'New' and an 'Old' vulnerability section."""
        return {
            "Name": "ec45ec87-bfc8-4129-a1c3-d2b82622175a",
            "Created": "1456247389",
            "Notified": "1456246708",
            "Limit": 2,
            "New": {
                "Vulnerability": {
                    "Name": "CVE-TEST",
                    "Namespace": "debian:8",
                    "Description": "New CVE",
                    "Severity": new_severity,
                    "FixedIn": [
                        {
                            "Name": "grep",
                            "Namespace": "debian:8",
                            "Version": "2.25"
                        }
                    ]
                },
                "LayersIntroducingVulnerability": new_layer_ids,
            },
            "Old": {
                "Vulnerability": {
                    "Name": "CVE-TEST",
                    "Namespace": "debian:8",
                    "Description": "New CVE",
                    "Severity": "Low",
                    "FixedIn": []
                },
                "LayersIntroducingVulnerability": old_layer_ids,
            }
        }

    def _get_delete_notification_data(self, old_layer_ids):
        """Build a scanner notification that only has an 'Old' vulnerability section."""
        return {
            "Name": "ec45ec87-bfc8-4129-a1c3-d2b82622175a",
            "Created": "1456247389",
            "Notified": "1456246708",
            "Limit": 2,
            "Old": {
                "Vulnerability": {
                    "Name": "CVE-TEST",
                    "Namespace": "debian:8",
                    "Description": "New CVE",
                    "Severity": "Low",
                    "FixedIn": []
                },
                "LayersIntroducingVulnerability": old_layer_ids,
            }
        }

    # ------------------------------------------------------------------
    # notification processing
    # ------------------------------------------------------------------

    def test_notification_new_layers_not_vulnerable(self):
        """No event is queued when the scanner reports the new layer as not vulnerable."""
        layer = self._get_latest_layer(include_storage=True)
        layer_id = self._layer_id(layer)

        # Add a repo event for the layer.
        self._create_vulnerability_notification()

        # Report not vulnerable.
        get_matching_layer_not_vulnerable = self._matching_layer_mock(layer_id, [])

        # Ensure that there are no event queue items for the layer.
        self.assertIsNone(notification_queue.get())

        # Fire off the notification processing.
        with HTTMock(get_matching_layer_not_vulnerable, response_content):
            notification_data = self._get_notification_data([layer_id], [])
            self.assertTrue(process_notification_data(notification_data))

        # Ensure that there are no event queue items for the layer.
        self.assertIsNone(notification_queue.get())

    def test_notification_delete(self):
        """A notification with only an 'Old' section queues no event."""
        layer = self._get_latest_layer(include_storage=True)
        layer_id = self._layer_id(layer)

        # Add a repo event for the layer.
        self._create_vulnerability_notification()

        # Ensure that there are no event queue items for the layer.
        self.assertIsNone(notification_queue.get())

        # Fire off the notification processing.
        notification_data = self._get_delete_notification_data([layer_id])
        self.assertTrue(process_notification_data(notification_data))

        # Ensure that there are no event queue items for the layer.
        self.assertIsNone(notification_queue.get())

    def test_notification_new_layers(self):
        """An event is queued when a newly listed layer is reported vulnerable."""
        layer = self._get_latest_layer(include_storage=True)
        layer_id = self._layer_id(layer)

        # Add a repo event for the layer.
        self._create_vulnerability_notification()

        get_matching_layer_vulnerable = self._matching_layer_mock(layer_id, [
            {
                "Name": "CVE-TEST",
                "Namespace": "debian:8",
                "Severity": "Low",
            }
        ])

        # Ensure that there are no event queue items for the layer.
        self.assertIsNone(notification_queue.get())

        # Fire off the notification processing.
        with HTTMock(get_matching_layer_vulnerable, response_content):
            notification_data = self._get_notification_data([layer_id], [])
            self.assertTrue(process_notification_data(notification_data))

        # Ensure an event was written for the tag.
        time.sleep(1)
        queue_item = notification_queue.get()
        self.assertIsNotNone(queue_item)

        body = json.loads(queue_item.body)
        self.assertEqual(sorted(['prod', 'latest']), sorted(body['event_data']['tags']))
        self.assertEqual('CVE-TEST', body['event_data']['vulnerability']['id'])
        self.assertEqual('Low', body['event_data']['vulnerability']['priority'])
        self.assertTrue(body['event_data']['vulnerability']['has_fix'])

    def test_notification_no_new_layers(self):
        """No event is queued when the same layer appears in both 'New' and 'Old'."""
        layer = self._get_latest_layer(include_storage=True)
        layer_id = self._layer_id(layer)

        # Add a repo event for the layer.
        self._create_vulnerability_notification()

        # Ensure that there are no event queue items for the layer.
        self.assertIsNone(notification_queue.get())

        # Fire off the notification processing.
        with HTTMock(response_content):
            notification_data = self._get_notification_data([layer_id], [layer_id])
            self.assertTrue(process_notification_data(notification_data))

        # Ensure that there are no event queue items for the layer.
        self.assertIsNone(notification_queue.get())

    def test_notification_no_new_layers_increased_severity(self):
        """An event is queued when the layer set is unchanged but the severity is raised."""
        layer = self._get_latest_layer(include_storage=True)
        layer_id = self._layer_id(layer)

        # Add a repo event for the layer.
        notification = self._create_vulnerability_notification()

        get_matching_layer_vulnerable = self._matching_layer_mock(layer_id, [
            {
                "Name": "CVE-TEST",
                "Namespace": "debian:8",
                "Severity": "Low",
            }
        ])

        # Ensure that there are no event queue items for the layer.
        self.assertIsNone(notification_queue.get())

        # Fire off the notification processing.
        with HTTMock(get_matching_layer_vulnerable, response_content):
            notification_data = self._get_notification_data([layer_id], [layer_id],
                                                            new_severity='Critical')
            self.assertTrue(process_notification_data(notification_data))

        # Ensure an event was written for the tag.
        time.sleep(1)
        queue_item = notification_queue.get()
        self.assertIsNotNone(queue_item)

        body = json.loads(queue_item.body)
        self.assertEqual(sorted(['prod', 'latest']), sorted(body['event_data']['tags']))
        self.assertEqual('CVE-TEST', body['event_data']['vulnerability']['id'])
        self.assertEqual('Critical', body['event_data']['vulnerability']['priority'])
        self.assertTrue(body['event_data']['vulnerability']['has_fix'])

        # Verify that an event would be raised.
        event_data = body['event_data']
        self.assertTrue(VulnerabilityFoundEvent().should_perform(event_data, notification))

        # Create another notification with a matching level and verify it will be raised.
        notification = self._create_vulnerability_notification(level=1)
        self.assertTrue(VulnerabilityFoundEvent().should_perform(event_data, notification))

        # Create another notification with a higher level and verify it won't be raised.
        notification = self._create_vulnerability_notification(level=0)
        self.assertFalse(VulnerabilityFoundEvent().should_perform(event_data, notification))

    # ------------------------------------------------------------------
    # scan selection and notification worker
    # ------------------------------------------------------------------

    def test_select_images_to_scan(self):
        """Images already at the target engine version are only eligible for a higher version."""
        # Set all images to have a security index of a version to that of the config.
        expected_version = app.config['SECURITY_SCANNER_ENGINE_VERSION_TARGET']
        Image.update(security_indexed_engine=expected_version).execute()

        # Ensure no images are available for scanning.
        self.assertIsNone(model.image.get_min_id_for_sec_scan(expected_version))
        self.assertEqual(0, len(model.image.get_images_eligible_for_scan(expected_version)))

        # Check for a higher version.
        self.assertIsNotNone(model.image.get_min_id_for_sec_scan(expected_version + 1))
        self.assertGreater(len(model.image.get_images_eligible_for_scan(expected_version + 1)), 0)

    def test_notification_worker(self):
        """The worker pages through a known notification, then deletes it."""
        pages_called = []

        @urlmatch(netloc=r'(.*\.)?mockclairservice', path=r'/v1/notifications/somenotification$',
                  method='DELETE')
        def delete_notification(url, request):
            pages_called.append('DELETE')
            return {'status_code': 201, 'content': ''}

        @urlmatch(netloc=r'(.*\.)?mockclairservice', path=r'/v1/notifications/somenotification$',
                  method='GET')
        def get_notification(url, request):
            if 'page=nextpage' in url.query:
                # Second (final) page: no 'NextPage' marker is set.
                pages_called.append('GET-2')
                layer = model.tag.get_tag_image(ADMIN_ACCESS_USER, COMPLEX_REPO, 'prod',
                                                include_storage=True)
                layer_id = self._layer_id(layer)

                data = {
                    'Notification': self._get_notification_data([layer_id], [layer_id]),
                }

                return json.dumps(data)
            else:
                # First page: points the caller at 'nextpage'.
                pages_called.append('GET-1')
                layer = self._get_latest_layer(include_storage=True)
                layer_id = self._layer_id(layer)

                notification_data = self._get_notification_data([layer_id], [layer_id])
                notification_data['NextPage'] = 'nextpage'

                data = {
                    'Notification': notification_data,
                }

                return json.dumps(data)

        @urlmatch(netloc=r'(.*\.)?mockclairservice', path=r'/v1/notifications/(.*)')
        def unknown_notification(url, request):
            return {'status_code': 404, 'content': 'Unknown notification'}

        # Test with an unknown notification.
        with HTTMock(get_notification, unknown_notification):
            worker = SecurityNotificationWorker(None)
            self.assertFalse(worker.perform_notification_work({
                'Name': 'unknownnotification'
            }))

        # Test with a known notification with pages.
        data = {
            'Name': 'somenotification'
        }

        with HTTMock(get_notification, delete_notification, unknown_notification):
            worker = SecurityNotificationWorker(None)
            self.assertTrue(worker.perform_notification_work(data))

        self.assertEqual(['GET-1', 'GET-2', 'DELETE'], pages_called)
|
|
|
# Run the suite with unittest's command-line runner when executed directly.
if __name__ == '__main__':
    unittest.main()
|