Merge pull request #2249 from coreos-inc/notifier-fixes

Security notification pagination fix
Commit eb2cafacd4 by josephschorr, 2017-01-17 11:33:25 -05:00 (committed by GitHub)
6 changed files with 810 additions and 113 deletions

util/morecollections.py

@@ -10,3 +10,217 @@ class AttrDict(dict):
      if isinstance(value, AttrDict):
        copy[key] = cls.deep_copy(value)
    return copy
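# Editor's note (example sketch, not part of the commit): AttrDict is a dict whose
# entries can also be read as attributes, which is how the trackers below can use
# `item_info.item` and `item_info.index` on AttrDict(item=..., index=...) values.
info = AttrDict(item='someimage.someuuid', index=3)
assert info.item == 'someimage.someuuid'
assert info['index'] == 3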
+class FastIndexList(object):
+  """ List which keeps track of the indices of its items in a fast manner, and allows for
+      quick removal of items.
+  """
+  def __init__(self):
+    self._list = []
+    self._index_map = {}
+    self._index_offset = 0
+    self._counter = 0

+  def add(self, item):
+    """ Adds an item to the index list. """
+    self._list.append(item)
+    self._index_map[item] = self._counter
+    self._counter = self._counter + 1

+  def values(self):
+    """ Returns an iterable stream of all the values in the list. """
+    return list(self._list)

+  def index(self, value):
+    """ Returns the index of the given item in the list or None if none. """
+    found = self._index_map.get(value, None)
+    if found is None:
+      return None

+    return found - self._index_offset

+  def pop_until(self, index_inclusive):
+    """ Pops off any items in the list until the given index, inclusive, and returns them. """
+    values = self._list[0:index_inclusive+1]
+    for value in values:
+      self._index_map.pop(value, None)

+    self._index_offset = self._index_offset + index_inclusive + 1
+    self._list = self._list[index_inclusive+1:]
+    return values
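# Editor's example (sketch, not part of the commit): indices reported by index()
# stay consistent with the remaining items even after a prefix has been popped off.
indexed = FastIndexList()
for name in ['a', 'b', 'c', 'd']:
  indexed.add(name)

assert indexed.index('c') == 2
assert indexed.pop_until(1) == ['a', 'b']  # pops 'a' and 'b' (indices 0 and 1)
assert indexed.index('c') == 0             # indices shift down by the popped count
assert indexed.values() == ['c', 'd']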
+class IndexedStreamingDiffTracker(object):
+  """ Helper class which tracks the difference between two streams of strings,
+      calling the `added` callback for strings when they are successfully verified
+      as being present in the first stream and not present in the second stream.
+      Unlike StreamingDiffTracker, this class expects each string value to have an
+      associated `index` value, which must be the same for equal values in both
+      streams and *must* be in order. This allows us to be a bit more efficient
+      in clearing up items that we know won't be present. The `index` is *not*
+      assumed to start at 0 or be contiguous, merely increasing.
+  """
+  def __init__(self, reporter, result_per_stream):
+    self._reporter = reporter
+    self._reports_per_stream = result_per_stream
+    self._new_stream_finished = False
+    self._old_stream_finished = False

+    self._new_stream = []
+    self._old_stream = []

+    self._new_stream_map = {}
+    self._old_stream_map = {}

+  def push_new(self, stream_tuples):
+    """ Pushes a list of (value, index) tuples for the `New` stream.
+    """
+    stream_tuples_list = list(stream_tuples)
+    assert len(stream_tuples_list) <= self._reports_per_stream

+    if len(stream_tuples_list) < self._reports_per_stream:
+      self._new_stream_finished = True

+    for (item, index) in stream_tuples_list:
+      if self._new_stream:
+        assert index > self._new_stream[-1].index

+      self._new_stream_map[index] = item
+      self._new_stream.append(AttrDict(item=item, index=index))

+    self._process()

+  def push_old(self, stream_tuples):
+    """ Pushes a list of (value, index) tuples for the `Old` stream.
+    """
+    if self._new_stream_finished and not self._new_stream:
+      # Nothing more to do.
+      return

+    stream_tuples_list = list(stream_tuples)
+    assert len(stream_tuples_list) <= self._reports_per_stream

+    if len(stream_tuples_list) < self._reports_per_stream:
+      self._old_stream_finished = True

+    # Iterate the materialized list: `stream_tuples` itself may be a generator that
+    # the list() call above has already consumed.
+    for (item, index) in stream_tuples_list:
+      if self._old_stream:
+        assert index > self._old_stream[-1].index

+      self._old_stream_map[index] = item
+      self._old_stream.append(AttrDict(item=item, index=index))

+    self._process()

+  def done(self):
+    self._old_stream_finished = True
+    self._process()

+  def _process(self):
+    # Process any new items that can be reported.
+    old_lower_bound = self._old_stream[0].index if self._old_stream else -1
+    for item_info in self._new_stream:
+      # If the new item's index <= the old_lower_bound, then we know
+      # we can check the old item map for it.
+      if item_info.index <= old_lower_bound or self._old_stream_finished:
+        if self._old_stream_map.get(item_info.index, None) is None:
+          self._reporter(item_info.item)

+        # Remove the item from the map.
+        self._new_stream_map.pop(item_info.index, None)

+    # Rebuild the new stream list (faster than just removing).
+    self._new_stream = [item_info for item_info in self._new_stream
+                        if item_info.index in self._new_stream_map]

+    # Process any old items that can be removed.
+    new_lower_bound = self._new_stream[0].index if self._new_stream else -1
+    for item_info in list(self._old_stream):
+      # Any items with indexes below the new lower bound can be removed,
+      # as any comparison from the new stream was done above.
+      if item_info.index < new_lower_bound:
+        self._old_stream_map.pop(item_info.index, None)

+    # Rebuild the old stream list (faster than just removing).
+    self._old_stream = [item_info for item_info in self._old_stream
+                        if item_info.index in self._old_stream_map]
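# Editor's example (sketch, not part of the commit): with pages of at most 2 results
# per stream, report values present in the new stream but absent from the old one.
# A page shorter than the page size marks its stream as finished.
added = []
tracker = IndexedStreamingDiffTracker(added.append, 2)
tracker.push_new([('layer-1', 0), ('layer-3', 2)])
tracker.push_old([('layer-1', 0), ('layer-2', 1)])
tracker.push_new([('layer-4', 3)])  # short page: the new stream is finished
tracker.push_old([])                # short page: the old stream is finished
tracker.done()
assert added == ['layer-3', 'layer-4']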
+class StreamingDiffTracker(object):
+  """ Helper class which tracks the difference between two streams of strings, calling the
+      `added` callback for strings when they are successfully verified as being present in
+      the first stream and not present in the second stream. This class requires that the
+      streams of strings be consistently ordered *in some way common to both* (but the
+      strings themselves do not need to be sorted).
+  """
+  def __init__(self, reporter, result_per_stream):
+    self._reporter = reporter
+    self._reports_per_stream = result_per_stream
+    self._old_stream_finished = False

+    self._old_stream = FastIndexList()
+    self._new_stream = FastIndexList()

+  def done(self):
+    self._old_stream_finished = True
+    self.push_new([])

+  def push_new(self, stream_values):
+    """ Pushes a list of values for the `New` stream.
+    """
+    # Add all the new values to the list.
+    counter = 0
+    for value in stream_values:
+      self._new_stream.add(value)
+      counter = counter + 1

+    assert counter <= self._reports_per_stream

+    # Process them all to see if anything has changed.
+    for value in self._new_stream.values():
+      old_index = self._old_stream.index(value)
+      if old_index is not None:
+        # The item is present, so we cannot report it. However, since we've reached this point,
+        # all items *before* this item in the `Old` stream are no longer necessary, so we can
+        # throw them out, along with this item.
+        self._old_stream.pop_until(old_index)
+      else:
+        # If the old stream has completely finished, then we can report, knowing no more old
+        # information will be present.
+        if self._old_stream_finished:
+          self._reporter(value)
+          self._new_stream.pop_until(self._new_stream.index(value))

+  def push_old(self, stream_values):
+    """ Pushes a stream of values for the `Old` stream.
+    """
+    if self._old_stream_finished:
+      return

+    value_list = list(stream_values)
+    assert len(value_list) <= self._reports_per_stream

+    for value in value_list:
+      # If the value exists in the new stream somewhere, then we know that all items *before*
+      # that index in the new stream will not be in the old stream, so we can report them. We can
+      # also remove the matching `New` item, as it is clearly in both streams.
+      new_index = self._new_stream.index(value)
+      if new_index is not None:
+        # Report all items up to the current item.
+        for item in self._new_stream.pop_until(new_index - 1):
+          self._reporter(item)

+        # Remove the current item from the new stream.
+        self._new_stream.pop_until(0)
+      else:
+        # This item may be seen later. Add it to the old stream set.
+        self._old_stream.add(value)

+    # Check to see if the `Old` stream has finished.
+    if len(value_list) < self._reports_per_stream:
+      self._old_stream_finished = True
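# Editor's example (sketch, not part of the commit): the same diff as above, but with
# plain values that are merely ordered consistently across the two streams.
added = []
tracker = StreamingDiffTracker(added.append, 2)
tracker.push_new(['layer-1', 'layer-3'])
tracker.push_old(['layer-1', 'layer-2'])
tracker.push_new(['layer-4'])
tracker.push_old([])  # short page: the old stream is finished
tracker.done()
assert added == ['layer-3', 'layer-4']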

util/secscan/fake.py

@@ -58,15 +58,22 @@ class FakeSecurityScanner(object):
""" Returns whether a notification with the given ID is found in the scanner. """
return notification_id in self.notifications
def add_notification(self, old_layer_ids, new_layer_ids, old_vuln, new_vuln):
def add_notification(self, old_layer_ids, new_layer_ids, old_vuln, new_vuln, max_per_page=100,
indexed_old_layer_ids=None, indexed_new_layer_ids=None):
""" Adds a new notification over the given sets of layer IDs and vulnerability information,
returning the structural data of the notification created.
"""
notification_id = str(uuid.uuid4())
if old_vuln is None:
old_vuln = dict(new_vuln)
self.notifications[notification_id] = dict(old_layer_ids=old_layer_ids,
new_layer_ids=new_layer_ids,
old_vuln=old_vuln,
new_vuln=new_vuln)
new_vuln=new_vuln,
max_per_page=max_per_page,
indexed_old_layer_ids=indexed_old_layer_ids,
indexed_new_layer_ids=indexed_new_layer_ids)
return self._get_notification_data(notification_id, 0, 100)
@@ -106,6 +113,8 @@ class FakeSecurityScanner(object):
""" Returns the structural data for the notification with the given ID, paginated using
the given page and limit. """
notification = self.notifications[notification_id]
limit = min(limit, notification['max_per_page'])
notification_data = {
"Name": notification_id,
"Created": "1456247389",
@@ -127,6 +136,11 @@ class FakeSecurityScanner(object):
        'LayersIntroducingVulnerability': old_layer_ids,
      }

+      if notification.get('indexed_old_layer_ids', None):
+        indexed_old_layer_ids = notification['indexed_old_layer_ids'][start_index:end_index]
+        notification_data['Old']['OrderedLayersIntroducingVulnerability'] = indexed_old_layer_ids

    if notification.get('new_vuln'):
      new_layer_ids = notification['new_layer_ids']
      new_layer_ids = new_layer_ids[start_index:end_index]
@@ -137,6 +151,11 @@ class FakeSecurityScanner(object):
        'LayersIntroducingVulnerability': new_layer_ids,
      }

+      if notification.get('indexed_new_layer_ids', None):
+        indexed_new_layer_ids = notification['indexed_new_layer_ids'][start_index:end_index]
+        notification_data['New']['OrderedLayersIntroducingVulnerability'] = indexed_new_layer_ids

    if has_additional_page:
      notification_data['NextPage'] = str(page+1)
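# Editor's example (sketch, not part of the commit): a client pages through a
# notification until no 'NextPage' key is present. Since the fake scanner caps the
# requested limit at the notification's max_per_page, callers cannot assume their
# page size was honored and must follow 'NextPage'.
def fetch_all_pages(scanner, notification_id, limit=100):
  pages = []
  page = 0
  while True:
    data = scanner._get_notification_data(notification_id, page, limit)
    pages.append(data)
    if 'NextPage' not in data:
      return pages
    page = int(data['NextPage'])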

util/secscan/notifier.py

@@ -2,6 +2,7 @@ import logging
import sys

from collections import defaultdict
+from enum import Enum

from app import secscan_api
from data.model.tag import filter_tags_have_repository_event, get_matching_tags
@@ -10,105 +11,169 @@ from data.database import (Image, ImageStorage, ExternalNotificationEvent, Repos
from endpoints.notificationhelper import notification_batch
from util.secscan import PRIORITY_LEVELS
from util.secscan.api import APIRequestFailure
-from util.morecollections import AttrDict
+from util.morecollections import AttrDict, StreamingDiffTracker, IndexedStreamingDiffTracker

logger = logging.getLogger(__name__)
+class ProcessNotificationPageResult(Enum):
+  FINISHED_PAGE = 'Finished Page'
+  FINISHED_PROCESSING = 'Finished Processing'
+  FAILED = 'Failed'
-def process_notification_data(notification_data):
-  """ Processes the given notification data to spawn vulnerability notifications as necessary.
-      Returns whether the processing succeeded.
+class SecurityNotificationHandler(object):
+  """ Class to process paginated notifications from the security scanner and issue
+      Quay vulnerability_found notifications for all necessary tags. Callers should
+      initialize, call process_notification_page_data for each page until it returns
+      FINISHED_PROCESSING or FAILED and, if succeeded, then call send_notifications
+      to send out the notifications queued.
   """
-  if not 'New' in notification_data:
-    # Nothing to do.
-    return True
+  def __init__(self, results_per_stream):
+    self.tag_map = defaultdict(set)
+    self.repository_map = {}
+    self.check_map = {}

-  new_data = notification_data['New']
-  old_data = notification_data.get('Old', {})
+    self.stream_tracker = None
+    self.results_per_stream = results_per_stream
+    self.reporting_failed = False
+    self.vulnerability_info = None

-  new_vuln = new_data['Vulnerability']
-  old_vuln = old_data.get('Vulnerability', {})
+    self.event = ExternalNotificationEvent.get(name='vulnerability_found')

-  new_layer_ids = set(new_data.get('LayersIntroducingVulnerability', []))
-  old_layer_ids = set(old_data.get('LayersIntroducingVulnerability', []))
+  def send_notifications(self):
+    """ Sends all queued up notifications. """
+    if self.vulnerability_info is None:
+      return

-  new_severity = PRIORITY_LEVELS.get(new_vuln.get('Severity', 'Unknown'), {'index': sys.maxint})
-  old_severity = PRIORITY_LEVELS.get(old_vuln.get('Severity', 'Unknown'), {'index': sys.maxint})
+    new_vuln = self.vulnerability_info
+    new_severity = PRIORITY_LEVELS.get(new_vuln.get('Severity', 'Unknown'), {'index': sys.maxint})

-  # By default we only notify the new layers that are affected by the vulnerability. If, however,
-  # the severity of the vulnerability has increased, we need to notify *all* layers, as we might
-  # need to send new notifications for older layers.
-  notify_layers = new_layer_ids - old_layer_ids
-  if new_severity['index'] < old_severity['index']:
-    notify_layers = new_layer_ids | old_layer_ids
+    # For each of the tags found, issue a notification.
+    with notification_batch() as spawn_notification:
+      for repository_id in self.tag_map:
+        tags = self.tag_map[repository_id]
+        event_data = {
+          'tags': list(tags),
+          'vulnerability': {
+            'id': new_vuln['Name'],
+            'description': new_vuln.get('Description', None),
+            'link': new_vuln.get('Link', None),
+            'priority': new_severity['title'],
+            'has_fix': 'FixedIn' in new_vuln,
+          },
+        }

-  if not notify_layers:
-    # Nothing more to do.
-    return True
+        # TODO(jzelinskie): remove when more endpoints have been converted to using interfaces
+        repository = AttrDict({
+          'namespace_name': self.repository_map[repository_id].namespace_user.username,
+          'name': self.repository_map[repository_id].name,
+        })

-  # Lookup the external event for when we have vulnerabilities.
-  event = ExternalNotificationEvent.get(name='vulnerability_found')
+        spawn_notification(repository, 'vulnerability_found', event_data)

-  # For each layer, retrieving the matching tags and join with repository to determine which
-  # require new notifications.
-  tag_map = defaultdict(set)
-  repository_map = {}
-  cve_id = new_vuln['Name']
+  def process_notification_page_data(self, notification_page_data):
+    """ Processes the given notification page data to spawn vulnerability notifications as
+        necessary. Returns the status of the processing.
+    """
+    if not 'New' in notification_page_data:
+      return self._done()

-  # Find all tags that contain the layer(s) introducing the vulnerability,
-  # in repositories that have the event setup.
-  for layer_id in notify_layers:
+    new_data = notification_page_data['New']
+    old_data = notification_page_data.get('Old', {})

+    new_vuln = new_data['Vulnerability']
+    old_vuln = old_data.get('Vulnerability', {})
+    self.vulnerability_info = new_vuln

+    new_layer_ids = new_data.get('LayersIntroducingVulnerability', [])
+    old_layer_ids = old_data.get('LayersIntroducingVulnerability', [])

+    new_severity = PRIORITY_LEVELS.get(new_vuln.get('Severity', 'Unknown'), {'index': sys.maxint})
+    old_severity = PRIORITY_LEVELS.get(old_vuln.get('Severity', 'Unknown'), {'index': sys.maxint})

+    # Check if the severity of the vulnerability has increased. If so, then we report this
+    # vulnerability for *all* layers, rather than a difference, as it is important for everyone.
+    if new_severity['index'] < old_severity['index']:
+      # The vulnerability has had its severity increased. Report for *all* layers.
+      all_layer_ids = set(new_layer_ids) | set(old_layer_ids)
+      for layer_id in all_layer_ids:
+        self._report(layer_id)

+      if 'NextPage' not in notification_page_data:
+        return self._done()
+      else:
+        return ProcessNotificationPageResult.FINISHED_PAGE

+    # Otherwise, only send the notification to new layers. To find only the new layers, we
+    # need to do a streaming diff vs the old layer IDs stream.

+    # Check for ordered data. If found, we use the indexed tracker, which is faster and
+    # more memory efficient.
+    is_indexed = False
+    if 'OrderedLayersIntroducingVulnerability' in new_data:
+      def tuplize(stream):
+        return [(entry['LayerName'], entry['Index']) for entry in stream]

+      new_layer_ids = tuplize(new_data.get('OrderedLayersIntroducingVulnerability', []))
+      old_layer_ids = tuplize(old_data.get('OrderedLayersIntroducingVulnerability', []))
+      is_indexed = True

+    # If this is the first call, initialize the tracker.
+    if self.stream_tracker is None:
+      self.stream_tracker = (IndexedStreamingDiffTracker(self._report, self.results_per_stream)
+                             if is_indexed
+                             else StreamingDiffTracker(self._report, self.results_per_stream))

+    # Call to add the old and new layer ID streams to the tracker. The tracker itself will
+    # call _report whenever it has determined a new layer has been found.
+    self.stream_tracker.push_new(new_layer_ids)
+    self.stream_tracker.push_old(old_layer_ids)

+    # If the reporting failed at any point, nothing more we can do.
+    if self.reporting_failed:
+      return ProcessNotificationPageResult.FAILED

+    # Check to see if there are any additional pages to process.
+    if 'NextPage' not in notification_page_data:
+      return self._done()
+    else:
+      return ProcessNotificationPageResult.FINISHED_PAGE
+  def _done(self):
+    if self.stream_tracker is not None:
+      self.stream_tracker.done()

+    if self.reporting_failed:
+      return ProcessNotificationPageResult.FAILED

+    return ProcessNotificationPageResult.FINISHED_PROCESSING
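# Editor's example (sketch, not part of the commit): shape of the ordered page data
# and the (value, index) tuples that tuplize produces from it.
page_entries = [{'LayerName': 'someimage.someuuid', 'Index': 17},
                {'LayerName': 'otherimage.otheruuid', 'Index': 19}]
assert ([(entry['LayerName'], entry['Index']) for entry in page_entries] ==
        [('someimage.someuuid', 17), ('otherimage.otheruuid', 19)])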
+  def _report(self, new_layer_id):
    # Split the layer ID into its Docker Image ID and storage ID.
-    (docker_image_id, storage_uuid) = layer_id.split('.', 2)
+    (docker_image_id, storage_uuid) = new_layer_id.split('.', 2)

    # Find the matching tags.
    matching = get_matching_tags(docker_image_id, storage_uuid, RepositoryTag, Repository,
                                 Image, ImageStorage)
-    tags = list(filter_tags_have_repository_event(matching, event))
+    tags = list(filter_tags_have_repository_event(matching, self.event))

-    check_map = {}
+    cve_id = self.vulnerability_info['Name']
    for tag in tags:
-      # Verify that the tag's root image has the vulnerability.
+      # Verify that the tag's *top layer* has the vulnerability.
      tag_layer_id = '%s.%s' % (tag.image.docker_image_id, tag.image.storage.uuid)
-      logger.debug('Checking if layer %s is vulnerable to %s', tag_layer_id, cve_id)

-      if not tag_layer_id in check_map:
+      if not tag_layer_id in self.check_map:
+        logger.debug('Checking if layer %s is vulnerable to %s', tag_layer_id, cve_id)
        try:
-          is_vulerable = secscan_api.check_layer_vulnerable(tag_layer_id, cve_id)
+          self.check_map[tag_layer_id] = secscan_api.check_layer_vulnerable(tag_layer_id, cve_id)
        except APIRequestFailure:
-          return False
-        check_map[tag_layer_id] = is_vulerable
+          self.reporting_failed = True
+          return

      logger.debug('Result of layer %s is vulnerable to %s check: %s', tag_layer_id, cve_id,
-                   check_map[tag_layer_id])
-      if check_map[tag_layer_id]:
+                   self.check_map[tag_layer_id])
+      if self.check_map[tag_layer_id]:
        # Add the vulnerable tag to the list.
-        tag_map[tag.repository_id].add(tag.name)
-        repository_map[tag.repository_id] = tag.repository

-  # For each of the tags found, issue a notification.
-  with notification_batch() as spawn_notification:
-    for repository_id in tag_map:
-      tags = tag_map[repository_id]
-      event_data = {
-        'tags': list(tags),
-        'vulnerability': {
-          'id': cve_id,
-          'description': new_vuln.get('Description', None),
-          'link': new_vuln.get('Link', None),
-          'priority': new_severity['title'],
-          'has_fix': 'FixedIn' in new_vuln,
-        },
-      }

-      # TODO(jzelinskie): remove when more endpoints have been converted to using interfaces
-      repository = AttrDict({
-        'namespace_name': repository_map[repository_id].namespace_user.username,
-        'name': repository_map[repository_id].name,
-      })

-      spawn_notification(repository, 'vulnerability_found', event_data)

-  return True
+        self.tag_map[tag.repository_id].add(tag.name)
+        self.repository_map[tag.repository_id] = tag.repository
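# Editor's example (sketch, not part of the commit): how a caller might drive the
# handler. `get_notification_page` stands in for the real security scanner API call
# and is hypothetical; the page size of 100 is likewise illustrative.
def process_notification(notification_name, get_notification_page):
  handler = SecurityNotificationHandler(results_per_stream=100)
  page_data = get_notification_page(notification_name, page=None)
  while True:
    result = handler.process_notification_page_data(page_data)
    if result == ProcessNotificationPageResult.FAILED:
      return False
    if result == ProcessNotificationPageResult.FINISHED_PROCESSING:
      handler.send_notifications()
      return True
    # FINISHED_PAGE: fetch the next page and continue.
    page_data = get_notification_page(notification_name, page=page_data['NextPage'])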