Implement against new Clair paginated notification system
This commit is contained in:
parent
b34314a584
commit
f498e92d58
10 changed files with 447 additions and 101 deletions
|
@ -1,11 +1,13 @@
|
|||
import unittest
|
||||
import json
|
||||
import os
|
||||
from httmock import urlmatch, all_requests, HTTMock
|
||||
|
||||
from app import app, config_provider, storage, notification_queue
|
||||
from initdb import setup_database_for_testing, finished_database_for_testing
|
||||
from util.secscan.api import SecurityScannerAPI, AnalyzeLayerException
|
||||
from util.secscan.analyzer import LayerAnalyzer
|
||||
from util.secscan.notifier import process_notification_data
|
||||
from data import model
|
||||
|
||||
|
||||
|
@ -69,10 +71,10 @@ def get_layer_success_mock(url, request):
|
|||
}
|
||||
]
|
||||
|
||||
if not request.url.endswith('?vulnerabilities'):
|
||||
if not request.url.find('vulnerabilities') > 0:
|
||||
vulnerabilities = []
|
||||
|
||||
if not request.url.endswith('?features'):
|
||||
if not request.url.find('features') > 0:
|
||||
features = []
|
||||
|
||||
return json.dumps({
|
||||
|
@ -97,7 +99,8 @@ class TestSecurityScanner(unittest.TestCase):
|
|||
storage.put_content(['local_us'], 'supports_direct_download', 'true')
|
||||
|
||||
# Setup the database with fake storage.
|
||||
setup_database_for_testing(self, with_storage=True, force_rebuild=True)
|
||||
force_rebuild = os.environ.get('SKIP_REBUILD') != 'true'
|
||||
setup_database_for_testing(self, with_storage=True, force_rebuild=force_rebuild)
|
||||
self.app = app.test_client()
|
||||
self.ctx = app.test_request_context()
|
||||
self.ctx.__enter__()
|
||||
|
@ -238,5 +241,200 @@ class TestSecurityScanner(unittest.TestCase):
|
|||
self.assertTrue(body['event_data']['vulnerability']['has_fix'])
|
||||
|
||||
|
||||
def _get_notification_data(self, new_layer_ids, old_layer_ids, new_severity='Low'):
|
||||
return {
|
||||
"Name": "ec45ec87-bfc8-4129-a1c3-d2b82622175a",
|
||||
"Created": "1456247389",
|
||||
"Notified": "1456246708",
|
||||
"Limit": 2,
|
||||
"New": {
|
||||
"Vulnerability": {
|
||||
"Name": "CVE-TEST",
|
||||
"Namespace": "debian:8",
|
||||
"Description": "New CVE",
|
||||
"Severity": new_severity,
|
||||
"FixedIn": [
|
||||
{
|
||||
"Name": "grep",
|
||||
"Namespace": "debian:8",
|
||||
"Version": "2.25"
|
||||
}
|
||||
]
|
||||
},
|
||||
"LayersIntroducingVulnerability": new_layer_ids,
|
||||
},
|
||||
"Old": {
|
||||
"Vulnerability": {
|
||||
"Name": "CVE-TEST",
|
||||
"Namespace": "debian:8",
|
||||
"Description": "New CVE",
|
||||
"Severity": "Low",
|
||||
"FixedIn": []
|
||||
},
|
||||
"LayersIntroducingVulnerability": old_layer_ids,
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
def test_notification_new_layers_not_vulnerable(self):
|
||||
layer = model.tag.get_tag_image(ADMIN_ACCESS_USER, SIMPLE_REPO, 'latest')
|
||||
layer_id = '%s.%s' % (layer.docker_image_id, layer.storage.uuid)
|
||||
|
||||
# Add a repo event for the layer.
|
||||
repo = model.repository.get_repository(ADMIN_ACCESS_USER, SIMPLE_REPO)
|
||||
model.notification.create_repo_notification(repo, 'vulnerability_found', 'quay_notification', {}, {'level': 100})
|
||||
|
||||
@urlmatch(netloc=r'(.*\.)?mockclairservice', path=r'/v1/layers/(.+)')
|
||||
def get_matching_layer_not_vulnerable(url, request):
|
||||
return json.dumps({
|
||||
"Layer": {
|
||||
"Name": layer_id,
|
||||
"Namespace": "debian:8",
|
||||
"IndexedByVersion": 1,
|
||||
"Features": [
|
||||
{
|
||||
"Name": "coreutils",
|
||||
"Namespace": "debian:8",
|
||||
"Version": "8.23-4",
|
||||
"Vulnerabilities": [], # Report not vulnerable.
|
||||
}
|
||||
]
|
||||
}
|
||||
})
|
||||
|
||||
# Ensure that there are no event queue items for the layer.
|
||||
self.assertIsNone(notification_queue.get())
|
||||
|
||||
# Fire off the notification processing.
|
||||
with HTTMock(get_matching_layer_not_vulnerable, response_content):
|
||||
notification_data = self._get_notification_data([layer_id], [])
|
||||
self.assertTrue(process_notification_data(notification_data))
|
||||
|
||||
# Ensure that there are no event queue items for the layer.
|
||||
self.assertIsNone(notification_queue.get())
|
||||
|
||||
|
||||
def test_notification_new_layers(self):
|
||||
layer = model.tag.get_tag_image(ADMIN_ACCESS_USER, SIMPLE_REPO, 'latest')
|
||||
layer_id = '%s.%s' % (layer.docker_image_id, layer.storage.uuid)
|
||||
|
||||
# Add a repo event for the layer.
|
||||
repo = model.repository.get_repository(ADMIN_ACCESS_USER, SIMPLE_REPO)
|
||||
model.notification.create_repo_notification(repo, 'vulnerability_found', 'quay_notification', {}, {'level': 100})
|
||||
|
||||
@urlmatch(netloc=r'(.*\.)?mockclairservice', path=r'/v1/layers/(.+)')
|
||||
def get_matching_layer_vulnerable(url, request):
|
||||
return json.dumps({
|
||||
"Layer": {
|
||||
"Name": layer_id,
|
||||
"Namespace": "debian:8",
|
||||
"IndexedByVersion": 1,
|
||||
"Features": [
|
||||
{
|
||||
"Name": "coreutils",
|
||||
"Namespace": "debian:8",
|
||||
"Version": "8.23-4",
|
||||
"Vulnerabilities": [
|
||||
{
|
||||
"Name": "CVE-TEST",
|
||||
"Namespace": "debian:8",
|
||||
"Severity": "Low",
|
||||
}
|
||||
],
|
||||
}
|
||||
]
|
||||
}
|
||||
})
|
||||
|
||||
# Ensure that there are no event queue items for the layer.
|
||||
self.assertIsNone(notification_queue.get())
|
||||
|
||||
# Fire off the notification processing.
|
||||
with HTTMock(get_matching_layer_vulnerable, response_content):
|
||||
notification_data = self._get_notification_data([layer_id], [])
|
||||
self.assertTrue(process_notification_data(notification_data))
|
||||
|
||||
# Ensure an event was written for the tag.
|
||||
queue_item = notification_queue.get()
|
||||
self.assertIsNotNone(queue_item)
|
||||
|
||||
body = json.loads(queue_item.body)
|
||||
self.assertEquals(['prod', 'latest'], body['event_data']['tags'])
|
||||
self.assertEquals('CVE-TEST', body['event_data']['vulnerability']['id'])
|
||||
self.assertEquals('Low', body['event_data']['vulnerability']['priority'])
|
||||
self.assertTrue(body['event_data']['vulnerability']['has_fix'])
|
||||
|
||||
|
||||
def test_notification_no_new_layers(self):
|
||||
layer = model.tag.get_tag_image(ADMIN_ACCESS_USER, SIMPLE_REPO, 'latest')
|
||||
layer_id = '%s.%s' % (layer.docker_image_id, layer.storage.uuid)
|
||||
|
||||
# Add a repo event for the layer.
|
||||
repo = model.repository.get_repository(ADMIN_ACCESS_USER, SIMPLE_REPO)
|
||||
model.notification.create_repo_notification(repo, 'vulnerability_found', 'quay_notification', {}, {'level': 100})
|
||||
|
||||
# Ensure that there are no event queue items for the layer.
|
||||
self.assertIsNone(notification_queue.get())
|
||||
|
||||
# Fire off the notification processing.
|
||||
with HTTMock(response_content):
|
||||
notification_data = self._get_notification_data([layer_id], [layer_id])
|
||||
self.assertTrue(process_notification_data(notification_data))
|
||||
|
||||
# Ensure that there are no event queue items for the layer.
|
||||
self.assertIsNone(notification_queue.get())
|
||||
|
||||
|
||||
def test_notification_no_new_layers_increased_severity(self):
|
||||
layer = model.tag.get_tag_image(ADMIN_ACCESS_USER, SIMPLE_REPO, 'latest')
|
||||
layer_id = '%s.%s' % (layer.docker_image_id, layer.storage.uuid)
|
||||
|
||||
# Add a repo event for the layer.
|
||||
repo = model.repository.get_repository(ADMIN_ACCESS_USER, SIMPLE_REPO)
|
||||
model.notification.create_repo_notification(repo, 'vulnerability_found', 'quay_notification', {}, {'level': 100})
|
||||
|
||||
@urlmatch(netloc=r'(.*\.)?mockclairservice', path=r'/v1/layers/(.+)')
|
||||
def get_matching_layer_vulnerable(url, request):
|
||||
return json.dumps({
|
||||
"Layer": {
|
||||
"Name": layer_id,
|
||||
"Namespace": "debian:8",
|
||||
"IndexedByVersion": 1,
|
||||
"Features": [
|
||||
{
|
||||
"Name": "coreutils",
|
||||
"Namespace": "debian:8",
|
||||
"Version": "8.23-4",
|
||||
"Vulnerabilities": [
|
||||
{
|
||||
"Name": "CVE-TEST",
|
||||
"Namespace": "debian:8",
|
||||
"Severity": "Low",
|
||||
}
|
||||
],
|
||||
}
|
||||
]
|
||||
}
|
||||
})
|
||||
|
||||
# Ensure that there are no event queue items for the layer.
|
||||
self.assertIsNone(notification_queue.get())
|
||||
|
||||
# Fire off the notification processing.
|
||||
with HTTMock(get_matching_layer_vulnerable, response_content):
|
||||
notification_data = self._get_notification_data([layer_id], [layer_id], new_severity='High')
|
||||
self.assertTrue(process_notification_data(notification_data))
|
||||
|
||||
# Ensure an event was written for the tag.
|
||||
queue_item = notification_queue.get()
|
||||
self.assertIsNotNone(queue_item)
|
||||
|
||||
body = json.loads(queue_item.body)
|
||||
self.assertEquals(['prod', 'latest'], body['event_data']['tags'])
|
||||
self.assertEquals('CVE-TEST', body['event_data']['vulnerability']['id'])
|
||||
self.assertEquals('High', body['event_data']['vulnerability']['priority'])
|
||||
self.assertTrue(body['event_data']['vulnerability']['has_fix'])
|
||||
|
||||
|
||||
if __name__ == '__main__':
|
||||
unittest.main()
|
Reference in a new issue