Implement helper classes for tracking streaming diffs, both indexed and non-indexed

These classes will be used to handle the paginated Layer ID diffs sent by Clair.
Joseph Schorr 2016-12-06 16:08:11 -05:00
parent a2ac62f5ce
commit ced0149520
4 changed files with 624 additions and 13 deletions


@@ -0,0 +1,285 @@
import unittest

from util.morecollections import (FastIndexList, StreamingDiffTracker,
                                  IndexedStreamingDiffTracker)


class FastIndexListTests(unittest.TestCase):
  def test_basic_usage(self):
    indexlist = FastIndexList()

    # Add 1
    indexlist.add(1)
    self.assertEquals([1], indexlist.values())
    self.assertEquals(0, indexlist.index(1))

    # Add 2
    indexlist.add(2)
    self.assertEquals([1, 2], indexlist.values())
    self.assertEquals(0, indexlist.index(1))
    self.assertEquals(1, indexlist.index(2))

    # Pop nothing.
    indexlist.pop_until(-1)
    self.assertEquals([1, 2], indexlist.values())
    self.assertEquals(0, indexlist.index(1))
    self.assertEquals(1, indexlist.index(2))

    # Pop 1.
    self.assertEquals([1], indexlist.pop_until(0))
    self.assertEquals([2], indexlist.values())
    self.assertIsNone(indexlist.index(1))
    self.assertEquals(0, indexlist.index(2))

    # Add 3.
    indexlist.add(3)
    self.assertEquals([2, 3], indexlist.values())
    self.assertEquals(0, indexlist.index(2))
    self.assertEquals(1, indexlist.index(3))

    # Pop 2, 3.
    self.assertEquals([2, 3], indexlist.pop_until(1))
    self.assertEquals([], indexlist.values())
    self.assertIsNone(indexlist.index(1))
    self.assertIsNone(indexlist.index(2))
    self.assertIsNone(indexlist.index(3))

  def test_popping(self):
    indexlist = FastIndexList()
    indexlist.add('hello')
    indexlist.add('world')
    indexlist.add('you')
    indexlist.add('rock')

    self.assertEquals(0, indexlist.index('hello'))
    self.assertEquals(1, indexlist.index('world'))
    self.assertEquals(2, indexlist.index('you'))
    self.assertEquals(3, indexlist.index('rock'))

    indexlist.pop_until(1)
    self.assertEquals(0, indexlist.index('you'))
    self.assertEquals(1, indexlist.index('rock'))


class IndexedStreamingDiffTrackerTests(unittest.TestCase):
  def test_basic(self):
    added = []

    tracker = IndexedStreamingDiffTracker(added.append, 3)
    tracker.push_new([('a', 0), ('b', 1), ('c', 2)])
    tracker.push_old([('b', 1)])
    tracker.done()

    self.assertEquals(['a', 'c'], added)

  def test_same_streams(self):
    added = []

    tracker = IndexedStreamingDiffTracker(added.append, 3)
    tracker.push_new([('a', 0), ('b', 1), ('c', 2)])
    tracker.push_old([('a', 0), ('b', 1), ('c', 2)])
    tracker.done()

    self.assertEquals([], added)

  def test_only_new(self):
    added = []

    tracker = IndexedStreamingDiffTracker(added.append, 3)
    tracker.push_new([('a', 0), ('b', 1), ('c', 2)])
    tracker.push_old([])
    tracker.done()

    self.assertEquals(['a', 'b', 'c'], added)

  def test_pagination(self):
    added = []

    tracker = IndexedStreamingDiffTracker(added.append, 2)
    tracker.push_new([('a', 0), ('b', 1)])
    tracker.push_old([])

    tracker.push_new([('c', 2)])
    tracker.push_old([])

    tracker.done()

    self.assertEquals(['a', 'b', 'c'], added)

  def test_old_pagination(self):
    added = []

    tracker = IndexedStreamingDiffTracker(added.append, 2)
    tracker.push_new([('a', 10), ('b', 11)])
    tracker.push_old([('z', 1), ('y', 2)])

    tracker.push_new([('c', 12)])
    tracker.push_old([('a', 10)])

    tracker.done()

    self.assertEquals(['b', 'c'], added)

  def test_very_offset(self):
    added = []

    tracker = IndexedStreamingDiffTracker(added.append, 2)
    tracker.push_new([('a', 10), ('b', 11)])
    tracker.push_old([('z', 1), ('y', 2)])

    tracker.push_new([('c', 12), ('d', 13)])
    tracker.push_old([('x', 3), ('w', 4)])

    tracker.push_new([('e', 14)])
    tracker.push_old([('a', 10), ('d', 13)])

    tracker.done()

    self.assertEquals(['b', 'c', 'e'], added)

  def test_many_old(self):
    added = []

    tracker = IndexedStreamingDiffTracker(added.append, 2)
    tracker.push_new([('z', 26), ('hello', 100)])
    tracker.push_old([('a', 1), ('b', 2)])

    tracker.push_new([])
    tracker.push_old([('c', 1), ('d', 2)])

    tracker.push_new([])
    tracker.push_old([('e', 3), ('f', 4)])

    tracker.push_new([])
    tracker.push_old([('g', 5), ('z', 26)])

    tracker.done()

    self.assertEquals(['hello'], added)

  def test_high_old_bound(self):
    added = []

    tracker = IndexedStreamingDiffTracker(added.append, 2)
    tracker.push_new([('z', 26), ('hello', 100)])
    tracker.push_old([('end1', 999), ('end2', 1000)])

    tracker.push_new([])
    tracker.push_old([])

    tracker.done()

    self.assertEquals(['z', 'hello'], added)


class StreamingDiffTrackerTests(unittest.TestCase):
  def test_basic(self):
    added = []

    tracker = StreamingDiffTracker(added.append, 3)
    tracker.push_new(['a', 'b', 'c'])
    tracker.push_old(['b'])
    tracker.done()

    self.assertEquals(['a', 'c'], added)

  def test_same_streams(self):
    added = []

    tracker = StreamingDiffTracker(added.append, 3)
    tracker.push_new(['a', 'b', 'c'])
    tracker.push_old(['a', 'b', 'c'])
    tracker.done()

    self.assertEquals([], added)

  def test_some_new(self):
    added = []

    tracker = StreamingDiffTracker(added.append, 5)
    tracker.push_new(['a', 'b', 'c', 'd', 'e'])
    tracker.push_old(['a', 'b', 'c'])
    tracker.done()

    self.assertEquals(['d', 'e'], added)

  def test_offset_new(self):
    added = []

    tracker = StreamingDiffTracker(added.append, 5)
    tracker.push_new(['b', 'c', 'd', 'e'])
    tracker.push_old(['a', 'b', 'c'])
    tracker.done()

    self.assertEquals(['d', 'e'], added)

  def test_multiple_calls(self):
    added = []

    tracker = StreamingDiffTracker(added.append, 3)
    tracker.push_new(['a', 'b', 'c'])
    tracker.push_old(['b', 'd', 'e'])

    tracker.push_new(['f', 'g', 'h'])
    tracker.push_old(['g', 'h'])

    tracker.done()

    self.assertEquals(['a', 'c', 'f'], added)

  def test_empty_old(self):
    added = []

    tracker = StreamingDiffTracker(added.append, 3)
    tracker.push_new(['a', 'b', 'c'])
    tracker.push_old([])

    tracker.push_new(['f', 'g', 'h'])
    tracker.push_old([])

    tracker.done()

    self.assertEquals(['a', 'b', 'c', 'f', 'g', 'h'], added)

  def test_more_old(self):
    added = []

    tracker = StreamingDiffTracker(added.append, 2)
    tracker.push_new(['c', 'd'])
    tracker.push_old(['a', 'b'])

    tracker.push_new([])
    tracker.push_old(['c'])

    tracker.done()

    self.assertEquals(['d'], added)

  def test_more_new(self):
    added = []

    tracker = StreamingDiffTracker(added.append, 4)
    tracker.push_new(['a', 'b', 'c', 'd'])
    tracker.push_old(['r'])

    tracker.push_new(['e', 'f', 'r', 'z'])
    tracker.push_old([])

    tracker.done()

    self.assertEquals(['a', 'b', 'c', 'd', 'e', 'f', 'z'], added)

  def test_more_new2(self):
    added = []

    tracker = StreamingDiffTracker(added.append, 4)
    tracker.push_new(['a', 'b', 'c', 'd'])
    tracker.push_old(['r'])

    tracker.push_new(['e', 'f', 'g', 'h'])
    tracker.push_old([])

    tracker.push_new(['i', 'j', 'r', 'z'])
    tracker.push_old([])

    tracker.done()

    self.assertEquals(['a', 'b', 'c', 'd', 'e', 'f', 'g', 'h', 'i', 'j', 'z'], added)


if __name__ == '__main__':
  unittest.main()


@@ -650,5 +650,109 @@ class TestSecurityScanner(unittest.TestCase):
    self.assertIsNotNone(notification_queue.get())

  def test_notification_worker_offset_pages(self):
    def get_layer_id(repo_name, tag):
      # Create a repository notification for the repo, if it doesn't exist.
      has_notification = model.notification.list_repo_notifications(ADMIN_ACCESS_USER, repo_name,
                                                                    'vulnerability_found')
      if not list(has_notification):
        repo = model.repository.get_repository(ADMIN_ACCESS_USER, repo_name)
        model.notification.create_repo_notification(repo, 'vulnerability_found',
                                                    'quay_notification', {}, {'level': 100})

      layer = model.tag.get_tag_image(ADMIN_ACCESS_USER, repo_name, tag, include_storage=True)
      return '%s.%s' % (layer.docker_image_id, layer.storage.uuid)
    # Define offsetting sets of layer IDs, to test cross-pagination support. In this test, we
    # will only serve 2 layer IDs per page: the first page will serve both of the 'New' layer IDs,
    # but since the first 2 'Old' layer IDs are "earlier" than the shared ID of
    # `devtable/simple:latest`, it won't get served in the 'Old' list until the *second* page. The
    # notification handling system should correctly not notify for this layer, even though it is
    # marked 'New' on page 1 and marked 'Old' on page 2. In practice, Clair will serve these IDs
    # sorted in the same manner.
    new_layer_ids = [get_layer_id('simple', 'latest'), get_layer_id('complex', 'prod')]
    old_layer_ids = ['someid1', 'someid2', get_layer_id('simple', 'latest')]

    apis_called = []

    @urlmatch(netloc=r'(.*\.)?mockclairservice', path=r'/v1/layers/(.+)')
    def get_matching_layer_vulnerable(url, request):
      apis_called.append('VULN')
      return json.dumps({
        "Layer": {
          "Name": 'somelayerid',
          "Namespace": "debian:8",
          "IndexedByVersion": 1,
          "Features": [
            {
              "Name": "coreutils",
              "Namespace": "debian:8",
              "Version": "8.23-4",
              "Vulnerabilities": [
                {
                  "Name": "CVE-TEST",
                  "Namespace": "debian:8",
                  "Severity": "Low",
                }
              ],
            }
          ]
        }
      })

    @urlmatch(netloc=r'(.*\.)?mockclairservice', path=r'/v1/notifications/somenotification$', method='DELETE')
    def delete_notification(url, request):
      apis_called.append('DELETE')
      return {'status_code': 201, 'content': ''}

    @urlmatch(netloc=r'(.*\.)?mockclairservice', path=r'/v1/notifications/somenotification$', method='GET')
    def get_notification(url, request):
      if url.query.find('page=nextpage') >= 0:
        apis_called.append('GET-2')
        data = {
          'Notification': self._get_notification_data(new_layer_ids[2:], old_layer_ids[2:]),
        }
        return json.dumps(data)
      else:
        apis_called.append('GET-1')
        notification_data = self._get_notification_data(new_layer_ids[0:2], old_layer_ids[0:2])
        notification_data['NextPage'] = 'nextpage'

        data = {
          'Notification': notification_data,
        }
        return json.dumps(data)

    # Ensure that there are no event queue items for any layers.
    self.assertIsNone(notification_queue.get())

    # Test with a known notification with pages.
    data = {
      'Name': 'somenotification'
    }

    with HTTMock(get_notification, delete_notification, get_matching_layer_vulnerable):
      worker = SecurityNotificationWorker(None)
      self.assertTrue(worker.perform_notification_work(data))

    # Verify each of the expected API calls were made.
    self.assertEquals(set(['GET-1', 'GET-2', 'DELETE', 'VULN']), set(apis_called))

    # Verify that we have notifications *just* for the New layer.
    expected_item = notification_queue.get()
    self.assertIsNotNone(expected_item)

    item_body = json.loads(expected_item['body'])
    self.assertEquals('devtable/complex', item_body['event_data']['repository'])
    self.assertEquals(['prod'], item_body['event_data']['tags'])

    # Make sure we have no additional notifications.
    self.assertIsNone(notification_queue.get())


if __name__ == '__main__':
  unittest.main()


@@ -10,3 +10,217 @@ class AttrDict(dict):
      if isinstance(value, AttrDict):
        copy[key] = cls.deep_copy(value)

    return copy


class FastIndexList(object):
  """ List which keeps track of the indices of its items in a fast manner, and allows for
      quick removal of items.
  """
  def __init__(self):
    self._list = []
    self._index_map = {}
    self._index_offset = 0
    self._counter = 0

  def add(self, item):
    """ Adds an item to the index list. """
    self._list.append(item)
    self._index_map[item] = self._counter
    self._counter = self._counter + 1

  def values(self):
    """ Returns an iterable stream of all the values in the list. """
    return list(self._list)

  def index(self, value):
    """ Returns the index of the given item in the list or None if none. """
    found = self._index_map.get(value, None)
    if found is None:
      return None

    return found - self._index_offset

  def pop_until(self, index_inclusive):
    """ Pops off any items in the list until the given index, inclusive, and returns them. """
    values = self._list[0:index_inclusive+1]
    for value in values:
      self._index_map.pop(value, None)

    self._index_offset = self._index_offset + index_inclusive + 1
    self._list = self._list[index_inclusive+1:]
    return values
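

def _fast_index_list_example():
  # Illustrative usage sketch (not part of this commit): indices reported by
  # FastIndexList are relative to the values still present, so they shift
  # down as earlier entries are popped off the front.
  indexlist = FastIndexList()
  indexlist.add('hello')
  indexlist.add('world')
  assert indexlist.index('world') == 1

  assert indexlist.pop_until(0) == ['hello']  # pops everything up to index 0
  assert indexlist.index('world') == 0        # 'world' shifted down
  assert indexlist.index('hello') is None     # popped values lose their index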


class IndexedStreamingDiffTracker(object):
  """ Helper class which tracks the difference between two streams of strings,
      calling the `added` callback for strings when they are successfully verified
      as being present in the first stream and not present in the second stream.
      Unlike StreamingDiffTracker, this class expects each string value to have an
      associated `index` value, which must be the same for equal values in both
      streams and *must* be in order. This allows us to be a bit more efficient
      in clearing up items that we know won't be present. The `index` is *not*
      assumed to start at 0 or be contiguous, merely increasing.
  """
  def __init__(self, reporter, result_per_stream):
    self._reporter = reporter
    self._reports_per_stream = result_per_stream
    self._new_stream_finished = False
    self._old_stream_finished = False

    self._new_stream = []
    self._old_stream = []

    self._new_stream_map = {}
    self._old_stream_map = {}

  def push_new(self, stream_tuples):
    """ Pushes a list of values for the `New` stream.
    """
    stream_tuples_list = list(stream_tuples)
    assert len(stream_tuples_list) <= self._reports_per_stream

    if len(stream_tuples_list) < self._reports_per_stream:
      self._new_stream_finished = True

    for (item, index) in stream_tuples_list:
      if self._new_stream:
        assert index > self._new_stream[-1].index

      self._new_stream_map[index] = item
      self._new_stream.append(AttrDict(item=item, index=index))

    self._process()

  def push_old(self, stream_tuples):
    """ Pushes a list of values for the `Old` stream.
    """
    if self._new_stream_finished and not self._new_stream:
      # Nothing more to do.
      return

    stream_tuples_list = list(stream_tuples)
    assert len(stream_tuples_list) <= self._reports_per_stream

    if len(stream_tuples_list) < self._reports_per_stream:
      self._old_stream_finished = True
    for (item, index) in stream_tuples_list:
      if self._old_stream:
        assert index > self._old_stream[-1].index

      self._old_stream_map[index] = item
      self._old_stream.append(AttrDict(item=item, index=index))

    self._process()

  def done(self):
    self._old_stream_finished = True
    self._process()

  def _process(self):
    # Process any new items that can be reported.
    old_lower_bound = self._old_stream[0].index if self._old_stream else -1
    for item_info in self._new_stream:
      # If the new item's index <= the old_lower_bound, then we know
      # we can check the old item map for it.
      if item_info.index <= old_lower_bound or self._old_stream_finished:
        if self._old_stream_map.get(item_info.index, None) is None:
          self._reporter(item_info.item)

        # Remove the item from the map.
        self._new_stream_map.pop(item_info.index, None)

    # Rebuild the new stream list (faster than just removing).
    self._new_stream = [item_info for item_info in self._new_stream
                        if self._new_stream_map.get(item_info.index)]

    # Process any old items that can be removed.
    new_lower_bound = self._new_stream[0].index if self._new_stream else -1
    for item_info in list(self._old_stream):
      # Any items with indexes below the new lower bound can be removed,
      # as any comparison from the new stream was done above.
      if item_info.index < new_lower_bound:
        self._old_stream_map.pop(item_info.index, None)

    # Rebuild the old stream list (faster than just removing).
    self._old_stream = [item_info for item_info in self._old_stream
                        if self._old_stream_map.get(item_info.index)]
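

def _indexed_streaming_diff_example():
  # Illustrative usage sketch (not part of this commit), mirroring the
  # `test_old_pagination` test case: two results per page, where a page
  # shorter than the page size signals that its stream is finished.
  added = []
  tracker = IndexedStreamingDiffTracker(added.append, 2)

  tracker.push_new([('a', 10), ('b', 11)])
  tracker.push_old([('z', 1), ('y', 2)])

  tracker.push_new([('c', 12)])    # short page: the `New` stream is done
  tracker.push_old([('a', 10)])    # short page: the `Old` stream is done

  tracker.done()
  assert added == ['b', 'c']       # 'a' appears in both streams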


class StreamingDiffTracker(object):
  """ Helper class which tracks the difference between two streams of strings, calling the
      `added` callback for strings when they are successfully verified as being present in
      the first stream and not present in the second stream. This class requires that the
      streams of strings be consistently ordered *in some way common to both* (but the
      strings themselves do not need to be sorted).
  """
  def __init__(self, reporter, result_per_stream):
    self._reporter = reporter
    self._reports_per_stream = result_per_stream
    self._old_stream_finished = False

    self._old_stream = FastIndexList()
    self._new_stream = FastIndexList()

  def done(self):
    self._old_stream_finished = True
    self.push_new([])

  def push_new(self, stream_values):
    """ Pushes a list of values for the `New` stream.
    """
    # Add all the new values to the list.
    counter = 0
    for value in stream_values:
      self._new_stream.add(value)
      counter = counter + 1

    assert counter <= self._reports_per_stream

    # Process them all to see if anything has changed.
    for value in self._new_stream.values():
      old_index = self._old_stream.index(value)
      if old_index is not None:
        # The item is present, so we cannot report it. However, since we've reached this point,
        # all items *before* this item in the `Old` stream are no longer necessary, so we can
        # throw them out, along with this item.
        self._old_stream.pop_until(old_index)
      else:
        # If the old stream has completely finished, then we can report, knowing no more old
        # information will be present.
        if self._old_stream_finished:
          self._reporter(value)
          self._new_stream.pop_until(self._new_stream.index(value))

  def push_old(self, stream_values):
    """ Pushes a stream of values for the `Old` stream.
    """
    if self._old_stream_finished:
      return

    value_list = list(stream_values)
    assert len(value_list) <= self._reports_per_stream

    for value in value_list:
      # If the value exists in the new stream somewhere, then we know that all items *before*
      # that index in the new stream will not be in the old stream, so we can report them. We can
      # also remove the matching `New` item, as it is clearly in both streams.
      new_index = self._new_stream.index(value)
      if new_index is not None:
        # Report all items up to the current item.
        for item in self._new_stream.pop_until(new_index - 1):
          self._reporter(item)

        # Remove the current item from the new stream.
        self._new_stream.pop_until(0)
      else:
        # This item may be seen later. Add it to the old stream set.
        self._old_stream.add(value)

    # Check to see if the `Old` stream has finished.
    if len(value_list) < self._reports_per_stream:
      self._old_stream_finished = True
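

def _streaming_diff_example():
  # Illustrative usage sketch (not part of this commit): the non-indexed
  # tracker only requires that both streams share a consistent ordering.
  # A page shorter than `result_per_stream` marks the `Old` stream finished.
  added = []
  tracker = StreamingDiffTracker(added.append, 3)

  tracker.push_new(['a', 'b', 'c'])
  tracker.push_old(['b'])          # short page: the `Old` stream is done

  tracker.done()
  assert added == ['a', 'c']       # 'b' appears in both streams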


@@ -1,6 +1,8 @@
 import logging
 import sys

+from enum import Enum
+
 from collections import defaultdict

 from app import secscan_api
@@ -10,21 +12,27 @@ from data.database import (Image, ImageStorage, ExternalNotificationEvent, Repos
 from endpoints.notificationhelper import notification_batch
 from util.secscan import PRIORITY_LEVELS
 from util.secscan.api import APIRequestFailure
-from util.morecollections import AttrDict
+from util.morecollections import AttrDict, StreamingDiffTracker

 logger = logging.getLogger(__name__)

-def process_notification_data(notification_data):
-  """ Processes the given notification data to spawn vulnerability notifications as necessary.
-      Returns whether the processing succeeded.
-  """
-  if not 'New' in notification_data:
-    # Nothing to do.
-    return True
+class ProcessNotificationPageResult(Enum):
+  FINISHED_PAGE = 'Finished Page'
+  FINISHED_PROCESSING = 'Finished Processing'
+  FAILED = 'Failed'

-  new_data = notification_data['New']
-  old_data = notification_data.get('Old', {})
+def process_notification_page_data(notification_page_data):
+  """ Processes the given notification page data to spawn vulnerability notifications as necessary.
+      Returns the status of the processing.
+  """
+  if not 'New' in notification_page_data:
+    # Nothing more to do.
+    return ProcessNotificationPageResult.FINISHED_PROCESSING
+
+  new_data = notification_page_data['New']
+  old_data = notification_page_data.get('Old', {})

   new_vuln = new_data['Vulnerability']
   old_vuln = old_data.get('Vulnerability', {})
@@ -44,7 +52,7 @@ def process_notification_data(notification_data):
   if not notify_layers:
     # Nothing more to do.
-    return True
+    return ProcessNotificationPageResult.FINISHED_PAGE

   # Lookup the external event for when we have vulnerabilities.
   event = ExternalNotificationEvent.get(name='vulnerability_found')
@@ -76,7 +84,7 @@ def process_notification_data(notification_data):
     try:
       is_vulerable = secscan_api.check_layer_vulnerable(tag_layer_id, cve_id)
     except APIRequestFailure:
-      return False
+      return ProcessNotificationPageResult.FAILED

     check_map[tag_layer_id] = is_vulerable
@@ -110,5 +118,5 @@ def process_notification_data(notification_data):
       })

       spawn_notification(repository, 'vulnerability_found', event_data)

-  return True
+  return ProcessNotificationPageResult.FINISHED_PAGE
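
The enum return value lets a caller distinguish "this page is done, fetch the next one" from "the whole notification is done" and from a hard failure. A minimal sketch of the paging loop a caller might run (not part of this commit; `get_notification_page` is a hypothetical helper standing in for the real Clair notification API call, which returns the next page via `NextPage`):

def process_all_notification_pages(notification_name, get_notification_page):
  # Hypothetical driver: fetch and process pages until done or failed.
  page = None
  while True:
    notification_page_data = get_notification_page(notification_name, page)
    result = process_notification_page_data(notification_page_data)
    if result == ProcessNotificationPageResult.FAILED:
      return False

    if result == ProcessNotificationPageResult.FINISHED_PROCESSING:
      return True

    # FINISHED_PAGE: continue with the next page, if any.
    page = notification_page_data.get('NextPage')
    if page is None:
      return True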