607 lines
		
	
	
		
			No EOL
		
	
	
		
			23 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
			
		
		
	
	
			607 lines
		
	
	
		
			No EOL
		
	
	
		
			23 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
| import unittest
 | |
| import json
 | |
| import os
 | |
| from httmock import urlmatch, all_requests, HTTMock
 | |
| 
 | |
| from app import app, config_provider, storage, notification_queue
 | |
| from endpoints.notificationevent import VulnerabilityFoundEvent
 | |
| from initdb import setup_database_for_testing, finished_database_for_testing
 | |
| from util.secscan.api import SecurityScannerAPI, AnalyzeLayerException
 | |
| from util.secscan.analyzer import LayerAnalyzer
 | |
| from util.secscan.notifier import process_notification_data
 | |
| from data import model
 | |
| from storage.basestorage import StoragePaths
 | |
| from workers.security_notification_worker import SecurityNotificationWorker
 | |
| from endpoints.v2 import v2_bp
 | |
| 
 | |
| 
 | |
| ADMIN_ACCESS_USER = 'devtable'
 | |
| SIMPLE_REPO = 'simple'
 | |
| COMPLEX_REPO = 'complex'
 | |
| 
 | |
| _PORT_NUMBER = 5001
 | |
| 
 | |
| @urlmatch(netloc=r'(.*\.)?mockclairservice', path=r'/v1/layers/(.+)')
 | |
| def get_layer_failure_mock(url, request):
 | |
|   return {'status_code': 404, 'content': json.dumps({'Error': {'Message': 'Unknown layer'}})}
 | |
| 
 | |
| 
 | |
| @urlmatch(netloc=r'(.*\.)?mockclairservice', path=r'/v1/layers$')
 | |
| def analyze_layer_badrequest_mock(url, request):
 | |
|   return {'status_code': 400, 'content': json.dumps({'Error': {'Message': 'Bad request'}})}
 | |
| 
 | |
| 
 | |
| @urlmatch(netloc=r'(.*\.)?mockclairservice', path=r'/v1/layers$')
 | |
| def analyze_layer_internal_mock(url, request):
 | |
|   return {'status_code': 500, 'content': json.dumps({'Error': {'Message': 'Internal server error'}})}
 | |
| 
 | |
| 
 | |
| @urlmatch(netloc=r'(.*\.)?mockclairservice', path=r'/v1/layers$')
 | |
| def analyze_layer_failure_mock(url, request):
 | |
|   return {'status_code': 422, 'content': json.dumps({'Error': {'Message': 'Bad layer'}})}
 | |
| 
 | |
| 
 | |
| @urlmatch(netloc=r'(.*\.)?mockclairservice', path=r'/v1/layers$')
 | |
| def analyze_layer_success_mock(url, request):
 | |
|   body_data = json.loads(request.body)
 | |
|   if not 'Layer' in body_data:
 | |
|     return {'status_code': 400, 'content': 'Missing body'}
 | |
| 
 | |
|   layer = body_data['Layer']
 | |
|   if not 'Path' in layer:
 | |
|     return {'status_code': 400, 'content': 'Missing Path'}
 | |
| 
 | |
|   if not 'Name' in layer:
 | |
|     return {'status_code': 400, 'content': 'Missing Name'}
 | |
| 
 | |
|   if not 'Format' in layer:
 | |
|     return {'status_code': 400, 'content': 'Missing Format'}
 | |
| 
 | |
|   return {'status_code': 201, 'content': json.dumps({
 | |
|     "Layer": {
 | |
|       "Name": "523ef1d23f222195488575f52a39c729c76a8c5630c9a194139cb246fb212da6",
 | |
|       "Path": "/mnt/layers/523ef1d23f222195488575f52a39c729c76a8c5630c9a194139cb246fb212da6/layer.tar",
 | |
|       "ParentName": "140f9bdfeb9784cf8730e9dab5dd12fbd704151cf555ac8cae650451794e5ac2",
 | |
|       "Format": "Docker",
 | |
|       "IndexedByVersion": 1
 | |
|     }
 | |
|   })}
 | |
| 
 | |
| 
 | |
| @urlmatch(netloc=r'(.*\.)?mockclairservice', path=r'/v1/layers/(.+)')
 | |
| def get_layer_success_mock(url, request):
 | |
|   vulnerabilities = [
 | |
|     {
 | |
|       "Name": "CVE-2014-9471",
 | |
|       "Namespace": "debian:8",
 | |
|       "Description": "The parse_datetime function in GNU coreutils allows remote attackers to cause a denial of service (crash) or possibly execute arbitrary code via a crafted date string, as demonstrated by the \"--date=TZ=\"123\"345\" @1\" string to the touch or date command.",
 | |
|       "Link": "https://security-tracker.debian.org/tracker/CVE-2014-9471",
 | |
|       "Severity": "Low",
 | |
|       "FixedBy": "9.23-5"
 | |
|     }
 | |
|   ]
 | |
| 
 | |
|   features = [
 | |
|     {
 | |
|       "Name": "coreutils",
 | |
|       "Namespace": "debian:8",
 | |
|       "Version": "8.23-4",
 | |
|       "Vulnerabilities": vulnerabilities,
 | |
|     }
 | |
|   ]
 | |
| 
 | |
|   if not request.url.find('vulnerabilities') > 0:
 | |
|     vulnerabilities = []
 | |
| 
 | |
|     if not request.url.find('features') > 0:
 | |
|       features = []
 | |
| 
 | |
|   return json.dumps({
 | |
|     "Layer": {
 | |
|       "Name": "17675ec01494d651e1ccf81dc9cf63959ebfeed4f978fddb1666b6ead008ed52",
 | |
|       "Namespace": "debian:8",
 | |
|       "ParentName": "140f9bdfeb9784cf8730e9dab5dd12fbd704151cf555ac8cae650451794e5ac2",
 | |
|       "IndexedByVersion": 1,
 | |
|       "Features": features
 | |
|     }
 | |
|   })
 | |
| 
 | |
| 
 | |
| @all_requests
 | |
| def response_content(url, request):
 | |
|   return {'status_code': 500, 'content': 'Unknown endpoint'}
 | |
| 
 | |
| 
 | |
| class TestSecurityScanner(unittest.TestCase):
 | |
|   def setUp(self):
 | |
|     # Enable direct download in fake storage.
 | |
|     storage.put_content(['local_us'], 'supports_direct_download', 'true')
 | |
| 
 | |
|     # Have fake storage say all files exist for the duration of the test.
 | |
|     storage.put_content(['local_us'], 'all_files_exist', 'true')
 | |
| 
 | |
|     # Setup the database with fake storage.
 | |
|     setup_database_for_testing(self)
 | |
|     self.app = app.test_client()
 | |
|     self.ctx = app.test_request_context()
 | |
|     self.ctx.__enter__()
 | |
| 
 | |
|     self.api = SecurityScannerAPI(app, app.config, storage)
 | |
| 
 | |
|   def tearDown(self):
 | |
|     storage.remove(['local_us'], 'supports_direct_download')
 | |
|     storage.remove(['local_us'], 'all_files_exist')
 | |
| 
 | |
|     finished_database_for_testing(self)
 | |
|     self.ctx.__exit__(True, None, None)
 | |
| 
 | |
|   def assertAnalyzed(self, layer, isAnalyzed, engineVersion):
 | |
|     self.assertEquals(isAnalyzed, layer.security_indexed)
 | |
|     self.assertEquals(engineVersion, layer.security_indexed_engine)
 | |
| 
 | |
|     # Ensure all parent layers are marked as analyzed.
 | |
|     parents = model.image.get_parent_images(ADMIN_ACCESS_USER, SIMPLE_REPO, layer)
 | |
|     for parent in parents:
 | |
|       self.assertEquals(isAnalyzed, parent.security_indexed)
 | |
|       self.assertEquals(engineVersion, parent.security_indexed_engine)
 | |
| 
 | |
| 
 | |
|   def test_get_layer_success(self):
 | |
|     layer = model.tag.get_tag_image(ADMIN_ACCESS_USER, SIMPLE_REPO, 'latest', include_storage=True)
 | |
|     with HTTMock(get_layer_success_mock, response_content):
 | |
|       result = self.api.get_layer_data(layer, include_vulnerabilities=True)
 | |
|       self.assertIsNotNone(result)
 | |
|       self.assertEquals(result['Layer']['Name'], '17675ec01494d651e1ccf81dc9cf63959ebfeed4f978fddb1666b6ead008ed52')
 | |
| 
 | |
| 
 | |
|   def test_get_layer_failure(self):
 | |
|     layer = model.tag.get_tag_image(ADMIN_ACCESS_USER, SIMPLE_REPO, 'latest', include_storage=True)
 | |
|     with HTTMock(get_layer_failure_mock, response_content):
 | |
|       result = self.api.get_layer_data(layer, include_vulnerabilities=True)
 | |
|       self.assertIsNone(result)
 | |
| 
 | |
| 
 | |
|   def test_analyze_layer_nodirectdownload_success(self):
 | |
|     # Disable direct download in fake storage.
 | |
|     storage.put_content(['local_us'], 'supports_direct_download', 'false')
 | |
| 
 | |
|     try:
 | |
|       app.register_blueprint(v2_bp, url_prefix='/v2')
 | |
|     except:
 | |
|       # Already registered.
 | |
|       pass
 | |
| 
 | |
|     layer = model.tag.get_tag_image(ADMIN_ACCESS_USER, SIMPLE_REPO, 'latest', include_storage=True)
 | |
|     self.assertFalse(layer.security_indexed)
 | |
|     self.assertEquals(-1, layer.security_indexed_engine)
 | |
| 
 | |
|     # Ensure that the download is a registry+JWT download.
 | |
|     uri, auth_header = self.api._get_image_url_and_auth(layer)
 | |
|     self.assertIsNotNone(uri)
 | |
|     self.assertIsNotNone(auth_header)
 | |
| 
 | |
|     # Ensure the download doesn't work without the header.
 | |
|     rv = self.app.head(uri)
 | |
|     self.assertEquals(rv.status_code, 401)
 | |
| 
 | |
|     # Ensure the download works with the header. Note we use a HEAD here, as GET causes DB
 | |
|     # access which messes with the test runner's rollback.
 | |
|     rv = self.app.head(uri, headers=[('authorization', auth_header)])
 | |
|     self.assertEquals(rv.status_code, 200)
 | |
| 
 | |
|     # Ensure the code works when called via analyze.
 | |
|     with HTTMock(analyze_layer_success_mock, get_layer_success_mock, response_content):
 | |
|       analyzer = LayerAnalyzer(app.config, self.api)
 | |
|       analyzer.analyze_recursively(layer)
 | |
| 
 | |
|       layer = model.tag.get_tag_image(ADMIN_ACCESS_USER, SIMPLE_REPO, 'latest')
 | |
|       self.assertAnalyzed(layer, True, 1)
 | |
| 
 | |
| 
 | |
|   def test_analyze_layer_success(self):
 | |
|     layer = model.tag.get_tag_image(ADMIN_ACCESS_USER, SIMPLE_REPO, 'latest', include_storage=True)
 | |
|     self.assertFalse(layer.security_indexed)
 | |
|     self.assertEquals(-1, layer.security_indexed_engine)
 | |
| 
 | |
|     with HTTMock(analyze_layer_success_mock, get_layer_success_mock, response_content):
 | |
|       analyzer = LayerAnalyzer(app.config, self.api)
 | |
|       analyzer.analyze_recursively(layer)
 | |
| 
 | |
|       layer = model.tag.get_tag_image(ADMIN_ACCESS_USER, SIMPLE_REPO, 'latest')
 | |
|       self.assertAnalyzed(layer, True, 1)
 | |
| 
 | |
| 
 | |
|   def test_analyze_layer_failure(self):
 | |
|     layer = model.tag.get_tag_image(ADMIN_ACCESS_USER, SIMPLE_REPO, 'latest', include_storage=True)
 | |
|     self.assertFalse(layer.security_indexed)
 | |
|     self.assertEquals(-1, layer.security_indexed_engine)
 | |
| 
 | |
|     with HTTMock(analyze_layer_failure_mock, response_content):
 | |
|       analyzer = LayerAnalyzer(app.config, self.api)
 | |
|       analyzer.analyze_recursively(layer)
 | |
| 
 | |
|       layer = model.tag.get_tag_image(ADMIN_ACCESS_USER, SIMPLE_REPO, 'latest')
 | |
|       self.assertAnalyzed(layer, False, 1)
 | |
| 
 | |
| 
 | |
|   def test_analyze_layer_internal_error(self):
 | |
|     layer = model.tag.get_tag_image(ADMIN_ACCESS_USER, SIMPLE_REPO, 'latest', include_storage=True)
 | |
|     self.assertFalse(layer.security_indexed)
 | |
|     self.assertEquals(-1, layer.security_indexed_engine)
 | |
| 
 | |
|     with HTTMock(analyze_layer_internal_mock, response_content):
 | |
|       analyzer = LayerAnalyzer(app.config, self.api)
 | |
|       analyzer.analyze_recursively(layer)
 | |
| 
 | |
|       layer = model.tag.get_tag_image(ADMIN_ACCESS_USER, SIMPLE_REPO, 'latest')
 | |
|       self.assertAnalyzed(layer, False, -1)
 | |
| 
 | |
| 
 | |
|   def test_analyze_layer_bad_request(self):
 | |
|     layer = model.tag.get_tag_image(ADMIN_ACCESS_USER, SIMPLE_REPO, 'latest', include_storage=True)
 | |
|     self.assertFalse(layer.security_indexed)
 | |
|     self.assertEquals(-1, layer.security_indexed_engine)
 | |
| 
 | |
|     with HTTMock(analyze_layer_badrequest_mock, response_content):
 | |
|       analyzer = LayerAnalyzer(app.config, self.api)
 | |
|       try:
 | |
|         analyzer.analyze_recursively(layer)
 | |
|       except AnalyzeLayerException:
 | |
|         return
 | |
| 
 | |
|       self.fail('Expected exception on bad request')
 | |
| 
 | |
| 
 | |
|   def test_analyze_layer_missing_storage(self):
 | |
|     layer = model.tag.get_tag_image(ADMIN_ACCESS_USER, SIMPLE_REPO, 'latest', include_storage=True)
 | |
|     self.assertFalse(layer.security_indexed)
 | |
|     self.assertEquals(-1, layer.security_indexed_engine)
 | |
| 
 | |
|     # Delete the storage for the layer.
 | |
|     path = model.storage.get_layer_path(layer.storage)
 | |
|     locations = app.config['DISTRIBUTED_STORAGE_PREFERENCE']
 | |
|     storage.remove(locations, path)
 | |
|     storage.remove(locations, 'all_files_exist')
 | |
| 
 | |
|     with HTTMock(analyze_layer_success_mock, response_content):
 | |
|       analyzer = LayerAnalyzer(app.config, self.api)
 | |
|       analyzer.analyze_recursively(layer)
 | |
| 
 | |
|       layer = model.tag.get_tag_image(ADMIN_ACCESS_USER, SIMPLE_REPO, 'latest')
 | |
|       self.assertEquals(False, layer.security_indexed)
 | |
|       self.assertEquals(1, layer.security_indexed_engine)
 | |
| 
 | |
| 
 | |
|   def test_analyze_layer_success_events(self):
 | |
|     layer = model.tag.get_tag_image(ADMIN_ACCESS_USER, SIMPLE_REPO, 'latest', include_storage=True)
 | |
|     self.assertFalse(layer.security_indexed)
 | |
|     self.assertEquals(-1, layer.security_indexed_engine)
 | |
| 
 | |
|     # Ensure there are no existing events.
 | |
|     self.assertIsNone(notification_queue.get())
 | |
| 
 | |
|     # Add a repo event for the layer.
 | |
|     repo = model.repository.get_repository(ADMIN_ACCESS_USER, SIMPLE_REPO)
 | |
|     model.notification.create_repo_notification(repo, 'vulnerability_found', 'quay_notification', {}, {'level': 100})
 | |
| 
 | |
|     with HTTMock(analyze_layer_success_mock, get_layer_success_mock, response_content):
 | |
|       analyzer = LayerAnalyzer(app.config, self.api)
 | |
|       analyzer.analyze_recursively(layer)
 | |
| 
 | |
|       layer = model.tag.get_tag_image(ADMIN_ACCESS_USER, SIMPLE_REPO, 'latest')
 | |
|       self.assertAnalyzed(layer, True, 1)
 | |
| 
 | |
|     # Ensure an event was written for the tag.
 | |
|     queue_item = notification_queue.get()
 | |
|     self.assertIsNotNone(queue_item)
 | |
| 
 | |
|     body = json.loads(queue_item.body)
 | |
|     self.assertEquals(['latest', 'prod'], body['event_data']['tags'])
 | |
|     self.assertEquals('CVE-2014-9471', body['event_data']['vulnerability']['id'])
 | |
|     self.assertEquals('Low', body['event_data']['vulnerability']['priority'])
 | |
|     self.assertTrue(body['event_data']['vulnerability']['has_fix'])
 | |
| 
 | |
| 
 | |
|   def _get_notification_data(self, new_layer_ids, old_layer_ids, new_severity='Low'):
 | |
|     return {
 | |
|       "Name": "ec45ec87-bfc8-4129-a1c3-d2b82622175a",
 | |
|       "Created": "1456247389",
 | |
|       "Notified": "1456246708",
 | |
|       "Limit": 2,
 | |
|       "New": {
 | |
|         "Vulnerability": {
 | |
|           "Name": "CVE-TEST",
 | |
|           "Namespace": "debian:8",
 | |
|           "Description": "New CVE",
 | |
|           "Severity": new_severity,
 | |
|           "FixedIn": [
 | |
|             {
 | |
|               "Name": "grep",
 | |
|               "Namespace": "debian:8",
 | |
|               "Version": "2.25"
 | |
|             }
 | |
|           ]
 | |
|         },
 | |
|         "LayersIntroducingVulnerability": new_layer_ids,
 | |
|       },
 | |
|       "Old": {
 | |
|         "Vulnerability": {
 | |
|           "Name": "CVE-TEST",
 | |
|           "Namespace": "debian:8",
 | |
|           "Description": "New CVE",
 | |
|           "Severity": "Low",
 | |
|           "FixedIn": []
 | |
|         },
 | |
|         "LayersIntroducingVulnerability": old_layer_ids,
 | |
|       }
 | |
|     }
 | |
| 
 | |
| 
 | |
|   def _get_delete_notification_data(self, old_layer_ids):
 | |
|     return {
 | |
|       "Name": "ec45ec87-bfc8-4129-a1c3-d2b82622175a",
 | |
|       "Created": "1456247389",
 | |
|       "Notified": "1456246708",
 | |
|       "Limit": 2,
 | |
|       "Old": {
 | |
|         "Vulnerability": {
 | |
|           "Name": "CVE-TEST",
 | |
|           "Namespace": "debian:8",
 | |
|           "Description": "New CVE",
 | |
|           "Severity": "Low",
 | |
|           "FixedIn": []
 | |
|         },
 | |
|         "LayersIntroducingVulnerability": old_layer_ids,
 | |
|       }
 | |
|     }
 | |
| 
 | |
|   def test_notification_new_layers_not_vulnerable(self):
 | |
|     layer = model.tag.get_tag_image(ADMIN_ACCESS_USER, SIMPLE_REPO, 'latest', include_storage=True)
 | |
|     layer_id = '%s.%s' % (layer.docker_image_id, layer.storage.uuid)
 | |
| 
 | |
|     # Add a repo event for the layer.
 | |
|     repo = model.repository.get_repository(ADMIN_ACCESS_USER, SIMPLE_REPO)
 | |
|     model.notification.create_repo_notification(repo, 'vulnerability_found', 'quay_notification', {}, {'level': 100})
 | |
| 
 | |
|     @urlmatch(netloc=r'(.*\.)?mockclairservice', path=r'/v1/layers/(.+)')
 | |
|     def get_matching_layer_not_vulnerable(url, request):
 | |
|       return json.dumps({
 | |
|         "Layer": {
 | |
|           "Name": layer_id,
 | |
|           "Namespace": "debian:8",
 | |
|           "IndexedByVersion": 1,
 | |
|           "Features": [
 | |
|             {
 | |
|               "Name": "coreutils",
 | |
|               "Namespace": "debian:8",
 | |
|               "Version": "8.23-4",
 | |
|               "Vulnerabilities": [], # Report not vulnerable.
 | |
|             }
 | |
|           ]
 | |
|         }
 | |
|       })
 | |
| 
 | |
|     # Ensure that there are no event queue items for the layer.
 | |
|     self.assertIsNone(notification_queue.get())
 | |
| 
 | |
|     # Fire off the notification processing.
 | |
|     with HTTMock(get_matching_layer_not_vulnerable, response_content):
 | |
|       notification_data = self._get_notification_data([layer_id], [])
 | |
|       self.assertTrue(process_notification_data(notification_data))
 | |
| 
 | |
|     # Ensure that there are no event queue items for the layer.
 | |
|     self.assertIsNone(notification_queue.get())
 | |
| 
 | |
| 
 | |
|   def test_notification_delete(self):
 | |
|     layer = model.tag.get_tag_image(ADMIN_ACCESS_USER, SIMPLE_REPO, 'latest', include_storage=True)
 | |
|     layer_id = '%s.%s' % (layer.docker_image_id, layer.storage.uuid)
 | |
| 
 | |
|     # Add a repo event for the layer.
 | |
|     repo = model.repository.get_repository(ADMIN_ACCESS_USER, SIMPLE_REPO)
 | |
|     model.notification.create_repo_notification(repo, 'vulnerability_found', 'quay_notification', {}, {'level': 100})
 | |
| 
 | |
|     # Ensure that there are no event queue items for the layer.
 | |
|     self.assertIsNone(notification_queue.get())
 | |
| 
 | |
|     # Fire off the notification processing.
 | |
|     notification_data = self._get_delete_notification_data([layer_id])
 | |
|     self.assertTrue(process_notification_data(notification_data))
 | |
| 
 | |
|     # Ensure that there are no event queue items for the layer.
 | |
|     self.assertIsNone(notification_queue.get())
 | |
| 
 | |
| 
 | |
|   def test_notification_new_layers(self):
 | |
|     layer = model.tag.get_tag_image(ADMIN_ACCESS_USER, SIMPLE_REPO, 'latest', include_storage=True)
 | |
|     layer_id = '%s.%s' % (layer.docker_image_id, layer.storage.uuid)
 | |
| 
 | |
|     # Add a repo event for the layer.
 | |
|     repo = model.repository.get_repository(ADMIN_ACCESS_USER, SIMPLE_REPO)
 | |
|     model.notification.create_repo_notification(repo, 'vulnerability_found', 'quay_notification', {}, {'level': 100})
 | |
| 
 | |
|     @urlmatch(netloc=r'(.*\.)?mockclairservice', path=r'/v1/layers/(.+)')
 | |
|     def get_matching_layer_vulnerable(url, request):
 | |
|       return json.dumps({
 | |
|         "Layer": {
 | |
|           "Name": layer_id,
 | |
|           "Namespace": "debian:8",
 | |
|           "IndexedByVersion": 1,
 | |
|           "Features": [
 | |
|             {
 | |
|               "Name": "coreutils",
 | |
|               "Namespace": "debian:8",
 | |
|               "Version": "8.23-4",
 | |
|               "Vulnerabilities": [
 | |
|                 {
 | |
|                   "Name": "CVE-TEST",
 | |
|                   "Namespace": "debian:8",
 | |
|                   "Severity": "Low",
 | |
|                 }
 | |
|               ],
 | |
|             }
 | |
|           ]
 | |
|         }
 | |
|       })
 | |
| 
 | |
|     # Ensure that there are no event queue items for the layer.
 | |
|     self.assertIsNone(notification_queue.get())
 | |
| 
 | |
|     # Fire off the notification processing.
 | |
|     with HTTMock(get_matching_layer_vulnerable, response_content):
 | |
|       notification_data = self._get_notification_data([layer_id], [])
 | |
|       self.assertTrue(process_notification_data(notification_data))
 | |
| 
 | |
|     # Ensure an event was written for the tag.
 | |
|     queue_item = notification_queue.get()
 | |
|     self.assertIsNotNone(queue_item)
 | |
| 
 | |
|     body = json.loads(queue_item.body)
 | |
|     self.assertEquals(['prod', 'latest'], body['event_data']['tags'])
 | |
|     self.assertEquals('CVE-TEST', body['event_data']['vulnerability']['id'])
 | |
|     self.assertEquals('Low', body['event_data']['vulnerability']['priority'])
 | |
|     self.assertTrue(body['event_data']['vulnerability']['has_fix'])
 | |
| 
 | |
| 
 | |
|   def test_notification_no_new_layers(self):
 | |
|     layer = model.tag.get_tag_image(ADMIN_ACCESS_USER, SIMPLE_REPO, 'latest', include_storage=True)
 | |
|     layer_id = '%s.%s' % (layer.docker_image_id, layer.storage.uuid)
 | |
| 
 | |
|     # Add a repo event for the layer.
 | |
|     repo = model.repository.get_repository(ADMIN_ACCESS_USER, SIMPLE_REPO)
 | |
|     model.notification.create_repo_notification(repo, 'vulnerability_found', 'quay_notification', {}, {'level': 100})
 | |
| 
 | |
|     # Ensure that there are no event queue items for the layer.
 | |
|     self.assertIsNone(notification_queue.get())
 | |
| 
 | |
|     # Fire off the notification processing.
 | |
|     with HTTMock(response_content):
 | |
|       notification_data = self._get_notification_data([layer_id], [layer_id])
 | |
|       self.assertTrue(process_notification_data(notification_data))
 | |
| 
 | |
|     # Ensure that there are no event queue items for the layer.
 | |
|     self.assertIsNone(notification_queue.get())
 | |
| 
 | |
| 
 | |
|   def test_notification_no_new_layers_increased_severity(self):
 | |
|     layer = model.tag.get_tag_image(ADMIN_ACCESS_USER, SIMPLE_REPO, 'latest', include_storage=True)
 | |
|     layer_id = '%s.%s' % (layer.docker_image_id, layer.storage.uuid)
 | |
| 
 | |
|     # Add a repo event for the layer.
 | |
|     repo = model.repository.get_repository(ADMIN_ACCESS_USER, SIMPLE_REPO)
 | |
|     notification = model.notification.create_repo_notification(repo, 'vulnerability_found', 'quay_notification', {}, {'level': 100})
 | |
| 
 | |
|     @urlmatch(netloc=r'(.*\.)?mockclairservice', path=r'/v1/layers/(.+)')
 | |
|     def get_matching_layer_vulnerable(url, request):
 | |
|       return json.dumps({
 | |
|         "Layer": {
 | |
|           "Name": layer_id,
 | |
|           "Namespace": "debian:8",
 | |
|           "IndexedByVersion": 1,
 | |
|           "Features": [
 | |
|             {
 | |
|               "Name": "coreutils",
 | |
|               "Namespace": "debian:8",
 | |
|               "Version": "8.23-4",
 | |
|               "Vulnerabilities": [
 | |
|                 {
 | |
|                   "Name": "CVE-TEST",
 | |
|                   "Namespace": "debian:8",
 | |
|                   "Severity": "Low",
 | |
|                 }
 | |
|               ],
 | |
|             }
 | |
|           ]
 | |
|         }
 | |
|       })
 | |
| 
 | |
|     # Ensure that there are no event queue items for the layer.
 | |
|     self.assertIsNone(notification_queue.get())
 | |
| 
 | |
|     # Fire off the notification processing.
 | |
|     with HTTMock(get_matching_layer_vulnerable, response_content):
 | |
|       notification_data = self._get_notification_data([layer_id], [layer_id], new_severity='Critical')
 | |
|       self.assertTrue(process_notification_data(notification_data))
 | |
| 
 | |
|     # Ensure an event was written for the tag.
 | |
|     queue_item = notification_queue.get()
 | |
|     self.assertIsNotNone(queue_item)
 | |
| 
 | |
|     body = json.loads(queue_item.body)
 | |
|     self.assertEquals(['prod', 'latest'], body['event_data']['tags'])
 | |
|     self.assertEquals('CVE-TEST', body['event_data']['vulnerability']['id'])
 | |
|     self.assertEquals('Critical', body['event_data']['vulnerability']['priority'])
 | |
|     self.assertTrue(body['event_data']['vulnerability']['has_fix'])
 | |
| 
 | |
|     # Verify that an event would be raised.
 | |
|     event_data = body['event_data']
 | |
|     self.assertTrue(VulnerabilityFoundEvent().should_perform(event_data, notification))
 | |
| 
 | |
|     # Create another notification with a matching level and verify it will be raised.
 | |
|     notification = model.notification.create_repo_notification(repo, 'vulnerability_found', 'quay_notification', {}, {'level': 1})
 | |
|     self.assertTrue(VulnerabilityFoundEvent().should_perform(event_data, notification))
 | |
| 
 | |
|     # Create another notification with a higher level and verify it won't be raised.
 | |
|     notification = model.notification.create_repo_notification(repo, 'vulnerability_found', 'quay_notification', {}, {'level': 0})
 | |
|     self.assertFalse(VulnerabilityFoundEvent().should_perform(event_data, notification))
 | |
| 
 | |
| 
 | |
|   def test_notification_worker(self):
 | |
|     pages_called = []
 | |
| 
 | |
|     @urlmatch(netloc=r'(.*\.)?mockclairservice', path=r'/v1/notifications/somenotification$', method='DELETE')
 | |
|     def delete_notification(url, request):
 | |
|       pages_called.append('DELETE')
 | |
|       return {'status_code': 201, 'content': ''}
 | |
| 
 | |
|     @urlmatch(netloc=r'(.*\.)?mockclairservice', path=r'/v1/notifications/somenotification$', method='GET')
 | |
|     def get_notification(url, request):
 | |
|       if url.query.find('page=nextpage') >= 0:
 | |
|         pages_called.append('GET-2')
 | |
|         layer = model.tag.get_tag_image(ADMIN_ACCESS_USER, COMPLEX_REPO, 'prod', include_storage=True)
 | |
|         layer_id = '%s.%s' % (layer.docker_image_id, layer.storage.uuid)
 | |
| 
 | |
|         data = {
 | |
|           'Notification': self._get_notification_data([layer_id], [layer_id]),
 | |
|         }
 | |
| 
 | |
|         return json.dumps(data)
 | |
|       else:
 | |
|         pages_called.append('GET-1')
 | |
|         layer = model.tag.get_tag_image(ADMIN_ACCESS_USER, SIMPLE_REPO, 'latest', include_storage=True)
 | |
|         layer_id = '%s.%s' % (layer.docker_image_id, layer.storage.uuid)
 | |
| 
 | |
|         notification_data = self._get_notification_data([layer_id], [layer_id])
 | |
|         notification_data['NextPage'] = 'nextpage'
 | |
| 
 | |
|         data = {
 | |
|           'Notification': notification_data,
 | |
|         }
 | |
| 
 | |
|         return json.dumps(data)
 | |
| 
 | |
|     @urlmatch(netloc=r'(.*\.)?mockclairservice', path=r'/v1/notifications/(.*)')
 | |
|     def unknown_notification(url, request):
 | |
|       return {'status_code': 404, 'content': 'Unknown notification'}
 | |
| 
 | |
|     # Test with an unknown notification.
 | |
|     with HTTMock(get_notification, unknown_notification):
 | |
|       worker = SecurityNotificationWorker(None)
 | |
|       self.assertFalse(worker.perform_notification_work({
 | |
|         'Name': 'unknownnotification'
 | |
|       }))
 | |
| 
 | |
|     # Test with a known notification with pages.
 | |
|     data = {
 | |
|       'Name': 'somenotification'
 | |
|     }
 | |
| 
 | |
|     with HTTMock(get_notification, delete_notification, unknown_notification):
 | |
|       worker = SecurityNotificationWorker(None)
 | |
|       self.assertTrue(worker.perform_notification_work(data))
 | |
| 
 | |
|     self.assertEquals(['GET-1', 'GET-2', 'DELETE'], pages_called)
 | |
| 
 | |
| 
 | |
| if __name__ == '__main__':
 | |
|   unittest.main() |