import logging
import sys

from collections import defaultdict

from app import secscan_api
from data.model.tag import filter_tags_have_repository_event, get_matching_tags
from data.database import (Image, ImageStorage, ExternalNotificationEvent, Repository,
                           RepositoryTag)
from endpoints.notificationhelper import spawn_notification
from util.secscan import PRIORITY_LEVELS
from util.secscan.api import APIRequestFailure

logger = logging.getLogger(__name__)


def process_notification_data(notification_data):
    """Spawn 'vulnerability_found' notifications for a security scanner notification.

    Args:
        notification_data: dict describing the notification. The 'New' entry
            (required for any work to happen) and the optional 'Old' entry are
            each read for a 'Vulnerability' dict and a
            'LayersIntroducingVulnerability' list of layer IDs.

    Returns:
        True if processing completed (including when there was nothing to do),
        False if a call to the security scanner API failed.

    Raises:
        KeyError: if 'New' has no 'Vulnerability' entry, or that vulnerability
            has no 'Name'.
        ValueError: if a layer ID to notify contains no '.' separator.
    """
    if 'New' not in notification_data:
        # Nothing to do.
        return True

    new_data = notification_data['New']
    old_data = notification_data.get('Old', {})

    new_vuln = new_data['Vulnerability']
    old_vuln = old_data.get('Vulnerability', {})

    new_layer_ids = set(new_data.get('LayersIntroducingVulnerability', []))
    old_layer_ids = set(old_data.get('LayersIntroducingVulnerability', []))

    # Severities missing from PRIORITY_LEVELS sort after every known level.
    # sys.maxsize is used (rather than sys.maxint) because it exists on both
    # Python 2.6+ and Python 3.
    unknown_severity = {'index': sys.maxsize}
    new_severity_name = new_vuln.get('Severity', 'Unknown')
    new_severity = PRIORITY_LEVELS.get(new_severity_name, unknown_severity)
    old_severity = PRIORITY_LEVELS.get(old_vuln.get('Severity', 'Unknown'), unknown_severity)

    # By default we only notify the new layers that are affected by the vulnerability. If,
    # however, the severity of the vulnerability has increased (a smaller index), we need to
    # notify *all* layers, as we might need to send new notifications for older layers.
    notify_layers = new_layer_ids - old_layer_ids
    if new_severity['index'] < old_severity['index']:
        notify_layers = new_layer_ids | old_layer_ids

    if not notify_layers:
        # Nothing more to do.
        return True

    # Lookup the external event for when we have vulnerabilities.
    event = ExternalNotificationEvent.get(name='vulnerability_found')

    # For each layer, retrieve the matching tags and join with repository to determine which
    # require new notifications.
    tag_map = defaultdict(set)
    repository_map = {}
    cve_id = new_vuln['Name']

    # Cache of scanner answers keyed by tag layer ID. It is shared across all layers so that
    # a tag layer reachable from several notified layers is only checked once.
    check_map = {}

    # Find all tags that contain the layer(s) introducing the vulnerability,
    # in repositories that have the event setup.
    for layer_id in notify_layers:
        # Split the layer ID into its Docker Image ID and storage ID. maxsplit=1 guarantees
        # at most two parts, so the unpacking cannot fail on extra '.' characters.
        (docker_image_id, storage_uuid) = layer_id.split('.', 1)

        # Find the matching tags.
        matching = get_matching_tags(docker_image_id, storage_uuid, RepositoryTag, Repository,
                                     Image, ImageStorage)
        tags = list(filter_tags_have_repository_event(matching, event))

        for tag in tags:
            # Verify that the tag's root image has the vulnerability.
            tag_layer_id = '%s.%s' % (tag.image.docker_image_id, tag.image.storage.uuid)
            logger.debug('Checking if layer %s is vulnerable to %s', tag_layer_id, cve_id)

            if tag_layer_id not in check_map:
                try:
                    is_vulnerable = secscan_api.check_layer_vulnerable(tag_layer_id, cve_id)
                except APIRequestFailure:
                    logger.exception('Could not check if layer %s is vulnerable to %s',
                                     tag_layer_id, cve_id)
                    return False

                check_map[tag_layer_id] = is_vulnerable

            logger.debug('Result of layer %s is vulnerable to %s check: %s', tag_layer_id,
                         cve_id, check_map[tag_layer_id])

            if check_map[tag_layer_id]:
                # Add the vulnerable tag to the list.
                tag_map[tag.repository_id].add(tag.name)
                repository_map[tag.repository_id] = tag.repository

    # For each of the tags found, issue a notification.
    for repository_id in tag_map:
        tags = tag_map[repository_id]
        event_data = {
            'tags': list(tags),
            'vulnerability': {
                'id': cve_id,
                'description': new_vuln.get('Description', None),
                'link': new_vuln.get('Link', None),
                # The fallback severity entry has no 'title'; report the raw severity name
                # in that case instead of raising KeyError.
                'priority': new_severity.get('title', new_severity_name),
                'has_fix': 'FixedIn' in new_vuln,
            },
        }

        spawn_notification(repository_map[repository_id], 'vulnerability_found', event_data)

    return True