From ced01495208c179642663ed1be0e521d12db4398 Mon Sep 17 00:00:00 2001
From: Joseph Schorr
Date: Tue, 6 Dec 2016 16:08:11 -0500
Subject: [PATCH] Implement helper classes for tracking streaming diffs, both
 indexed and non-indexed

These classes will be used to handle the Layer ID paginated diffs from Clair.
---
 test/test_morecollections.py | 285 +++++++++++++++++++++++++++++++++++
 test/test_secscan.py         | 104 +++++++++++++
 util/morecollections.py      | 214 ++++++++++++++++++++++++++
 util/secscan/notifier.py     |  34 +++--
 4 files changed, 624 insertions(+), 13 deletions(-)
 create mode 100644 test/test_morecollections.py

diff --git a/test/test_morecollections.py b/test/test_morecollections.py
new file mode 100644
index 000000000..ad458f533
--- /dev/null
+++ b/test/test_morecollections.py
@@ -0,0 +1,285 @@
+import unittest
+
+from util.morecollections import (FastIndexList, StreamingDiffTracker,
+                                  IndexedStreamingDiffTracker)
+
+class FastIndexListTests(unittest.TestCase):
+  def test_basic_usage(self):
+    indexlist = FastIndexList()
+
+    # Add 1.
+    indexlist.add(1)
+    self.assertEquals([1], indexlist.values())
+    self.assertEquals(0, indexlist.index(1))
+
+    # Add 2.
+    indexlist.add(2)
+    self.assertEquals([1, 2], indexlist.values())
+    self.assertEquals(0, indexlist.index(1))
+    self.assertEquals(1, indexlist.index(2))
+
+    # Pop nothing.
+    indexlist.pop_until(-1)
+    self.assertEquals([1, 2], indexlist.values())
+    self.assertEquals(0, indexlist.index(1))
+    self.assertEquals(1, indexlist.index(2))
+
+    # Pop 1.
+    self.assertEquals([1], indexlist.pop_until(0))
+    self.assertEquals([2], indexlist.values())
+    self.assertIsNone(indexlist.index(1))
+    self.assertEquals(0, indexlist.index(2))
+
+    # Add 3.
+    indexlist.add(3)
+    self.assertEquals([2, 3], indexlist.values())
+    self.assertEquals(0, indexlist.index(2))
+    self.assertEquals(1, indexlist.index(3))
+
+    # Pop 2, 3.
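+    # (pop_until is inclusive: popping through index 1 removes both remaining values.)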
+    self.assertEquals([2, 3], indexlist.pop_until(1))
+    self.assertEquals([], indexlist.values())
+    self.assertIsNone(indexlist.index(1))
+    self.assertIsNone(indexlist.index(2))
+    self.assertIsNone(indexlist.index(3))
+
+  def test_popping(self):
+    indexlist = FastIndexList()
+    indexlist.add('hello')
+    indexlist.add('world')
+    indexlist.add('you')
+    indexlist.add('rock')
+
+    self.assertEquals(0, indexlist.index('hello'))
+    self.assertEquals(1, indexlist.index('world'))
+    self.assertEquals(2, indexlist.index('you'))
+    self.assertEquals(3, indexlist.index('rock'))
+
+    indexlist.pop_until(1)
+    self.assertEquals(0, indexlist.index('you'))
+    self.assertEquals(1, indexlist.index('rock'))
+
+
+class IndexedStreamingDiffTrackerTests(unittest.TestCase):
+  def test_basic(self):
+    added = []
+
+    tracker = IndexedStreamingDiffTracker(added.append, 3)
+    tracker.push_new([('a', 0), ('b', 1), ('c', 2)])
+    tracker.push_old([('b', 1)])
+    tracker.done()
+
+    self.assertEquals(['a', 'c'], added)
+
+  def test_same_streams(self):
+    added = []
+
+    tracker = IndexedStreamingDiffTracker(added.append, 3)
+    tracker.push_new([('a', 0), ('b', 1), ('c', 2)])
+    tracker.push_old([('a', 0), ('b', 1), ('c', 2)])
+    tracker.done()
+
+    self.assertEquals([], added)
+
+  def test_only_new(self):
+    added = []
+
+    tracker = IndexedStreamingDiffTracker(added.append, 3)
+    tracker.push_new([('a', 0), ('b', 1), ('c', 2)])
+    tracker.push_old([])
+    tracker.done()
+
+    self.assertEquals(['a', 'b', 'c'], added)
+
+  def test_pagination(self):
+    added = []
+
+    tracker = IndexedStreamingDiffTracker(added.append, 2)
+    tracker.push_new([('a', 0), ('b', 1)])
+    tracker.push_old([])
+
+    tracker.push_new([('c', 2)])
+    tracker.push_old([])
+
+    tracker.done()
+
+    self.assertEquals(['a', 'b', 'c'], added)
+
+  def test_old_pagination(self):
+    added = []
+
+    tracker = IndexedStreamingDiffTracker(added.append, 2)
+    tracker.push_new([('a', 10), ('b', 11)])
+    tracker.push_old([('z', 1), ('y', 2)])
+
+    tracker.push_new([('c', 12)])
+    tracker.push_old([('a', 10)])
+
+    tracker.done()
+
+    self.assertEquals(['b', 'c'], added)
+
+  def test_very_offset(self):
+    added = []
+
+    tracker = IndexedStreamingDiffTracker(added.append, 2)
+    tracker.push_new([('a', 10), ('b', 11)])
+    tracker.push_old([('z', 1), ('y', 2)])
+
+    tracker.push_new([('c', 12), ('d', 13)])
+    tracker.push_old([('x', 3), ('w', 4)])
+
+    tracker.push_new([('e', 14)])
+    tracker.push_old([('a', 10), ('d', 13)])
+
+    tracker.done()
+
+    self.assertEquals(['b', 'c', 'e'], added)
+
+  def test_many_old(self):
+    added = []
+
+    tracker = IndexedStreamingDiffTracker(added.append, 2)
+    tracker.push_new([('z', 26), ('hello', 100)])
+    tracker.push_old([('a', 1), ('b', 2)])
+
+    tracker.push_new([])
+    tracker.push_old([('c', 1), ('d', 2)])
+
+    tracker.push_new([])
+    tracker.push_old([('e', 3), ('f', 4)])
+
+    tracker.push_new([])
+    tracker.push_old([('g', 5), ('z', 26)])
+
+    tracker.done()
+
+    self.assertEquals(['hello'], added)
+
+  def test_high_old_bound(self):
+    added = []
+
+    tracker = IndexedStreamingDiffTracker(added.append, 2)
+    tracker.push_new([('z', 26), ('hello', 100)])
+    tracker.push_old([('end1', 999), ('end2', 1000)])
+
+    tracker.push_new([])
+    tracker.push_old([])
+
+    tracker.done()
+
+    self.assertEquals(['z', 'hello'], added)
+
+
+class StreamingDiffTrackerTests(unittest.TestCase):
+  def test_basic(self):
+    added = []
+
+    tracker = StreamingDiffTracker(added.append, 3)
+    tracker.push_new(['a', 'b', 'c'])
+    tracker.push_old(['b'])
+    tracker.done()
+
+    self.assertEquals(['a', 'c'], added)
+
+  def test_same_streams(self):
+    added = []
+
+    tracker = StreamingDiffTracker(added.append, 3)
+    tracker.push_new(['a', 'b', 'c'])
+    tracker.push_old(['a', 'b', 'c'])
+    tracker.done()
+
+    self.assertEquals([], added)
+
+  def test_some_new(self):
+    added = []
+
+    tracker = StreamingDiffTracker(added.append, 5)
+    tracker.push_new(['a', 'b', 'c', 'd', 'e'])
+    tracker.push_old(['a', 'b', 'c'])
+    tracker.done()
+
+    self.assertEquals(['d', 'e'], added)
+
+  def test_offset_new(self):
+    added = []
+
+    tracker = StreamingDiffTracker(added.append, 5)
+    tracker.push_new(['b', 'c', 'd', 'e'])
+    tracker.push_old(['a', 'b', 'c'])
+    tracker.done()
+
+    self.assertEquals(['d', 'e'], added)
+
+  def test_multiple_calls(self):
+    added = []
+
+    tracker = StreamingDiffTracker(added.append, 3)
+    tracker.push_new(['a', 'b', 'c'])
+    tracker.push_old(['b', 'd', 'e'])
+
+    tracker.push_new(['f', 'g', 'h'])
+    tracker.push_old(['g', 'h'])
+    tracker.done()
+
+    self.assertEquals(['a', 'c', 'f'], added)
+
+  def test_empty_old(self):
+    added = []
+
+    tracker = StreamingDiffTracker(added.append, 3)
+    tracker.push_new(['a', 'b', 'c'])
+    tracker.push_old([])
+
+    tracker.push_new(['f', 'g', 'h'])
+    tracker.push_old([])
+    tracker.done()
+
+    self.assertEquals(['a', 'b', 'c', 'f', 'g', 'h'], added)
+
+  def test_more_old(self):
+    added = []
+
+    tracker = StreamingDiffTracker(added.append, 2)
+    tracker.push_new(['c', 'd'])
+    tracker.push_old(['a', 'b'])
+
+    tracker.push_new([])
+    tracker.push_old(['c'])
+    tracker.done()
+
+    self.assertEquals(['d'], added)
+
+  def test_more_new(self):
+    added = []
+
+    tracker = StreamingDiffTracker(added.append, 4)
+    tracker.push_new(['a', 'b', 'c', 'd'])
+    tracker.push_old(['r'])
+
+    tracker.push_new(['e', 'f', 'r', 'z'])
+    tracker.push_old([])
+    tracker.done()
+
+    self.assertEquals(['a', 'b', 'c', 'd', 'e', 'f', 'z'], added)
+
+  def test_more_new2(self):
+    added = []
+
+    tracker = StreamingDiffTracker(added.append, 4)
+    tracker.push_new(['a', 'b', 'c', 'd'])
+    tracker.push_old(['r'])
+
+    tracker.push_new(['e', 'f', 'g', 'h'])
+    tracker.push_old([])
+
+    tracker.push_new(['i', 'j', 'r', 'z'])
+    tracker.push_old([])
+    tracker.done()
+
+    self.assertEquals(['a', 'b', 'c', 'd', 'e', 'f', 'g', 'h', 'i', 'j', 'z'], added)
+
+
+if __name__ == '__main__':
+  unittest.main()
\ No newline at end of file
diff --git a/test/test_secscan.py b/test/test_secscan.py
index bb6fee94b..e6a9d3f1e 100644
--- a/test/test_secscan.py
+++ b/test/test_secscan.py
@@ -650,5 +650,109 @@ class TestSecurityScanner(unittest.TestCase):
 
     self.assertIsNotNone(notification_queue.get())
 
+  def test_notification_worker_offset_pages(self):
+
+    def get_layer_id(repo_name, tag):
+      # Create a repository notification for the repo, if it doesn't exist.
+      has_notification = model.notification.list_repo_notifications(ADMIN_ACCESS_USER, repo_name,
+                                                                    'vulnerability_found')
+      if not list(has_notification):
+        repo = model.repository.get_repository(ADMIN_ACCESS_USER, repo_name)
+        model.notification.create_repo_notification(repo, 'vulnerability_found',
+                                                    'quay_notification', {}, {'level': 100})
+
+      layer = model.tag.get_tag_image(ADMIN_ACCESS_USER, repo_name, tag, include_storage=True)
+      return '%s.%s' % (layer.docker_image_id, layer.storage.uuid)
+
+    # Define offsetting sets of layer IDs, to test cross-pagination support. In this test, we
+    # will only serve 2 layer IDs per page: the first page will serve both of the 'New' layer IDs,
+    # but since the first 2 'Old' layer IDs sort "earlier" than the shared ID of
+    # `devtable/simple:latest`, the shared ID won't get served in the 'Old' list until the
+    # *second* page. The notification handling system should correctly decline to notify for this
+    # layer, even though it is marked 'New' on page 1 and marked 'Old' on page 2. In practice,
+    # Clair will serve these IDs sorted in the same manner.
+    new_layer_ids = [get_layer_id('simple', 'latest'), get_layer_id('complex', 'prod')]
+    old_layer_ids = ['someid1', 'someid2', get_layer_id('simple', 'latest')]
+
+    apis_called = []
+
+    @urlmatch(netloc=r'(.*\.)?mockclairservice', path=r'/v1/layers/(.+)')
+    def get_matching_layer_vulnerable(url, request):
+      apis_called.append('VULN')
+      return json.dumps({
+        "Layer": {
+          "Name": 'somelayerid',
+          "Namespace": "debian:8",
+          "IndexedByVersion": 1,
+          "Features": [
+            {
+              "Name": "coreutils",
+              "Namespace": "debian:8",
+              "Version": "8.23-4",
+              "Vulnerabilities": [
+                {
+                  "Name": "CVE-TEST",
+                  "Namespace": "debian:8",
+                  "Severity": "Low",
+                }
+              ],
+            }
+          ]
+        }
+      })
+
+    @urlmatch(netloc=r'(.*\.)?mockclairservice', path=r'/v1/notifications/somenotification$', method='DELETE')
+    def delete_notification(url, request):
+      apis_called.append('DELETE')
+      return {'status_code': 201, 'content': ''}
+
+    @urlmatch(netloc=r'(.*\.)?mockclairservice', path=r'/v1/notifications/somenotification$', method='GET')
+    def get_notification(url, request):
+      if url.query.find('page=nextpage') >= 0:
+        apis_called.append('GET-2')
+
+        data = {
+          'Notification': self._get_notification_data(new_layer_ids[2:], old_layer_ids[2:]),
+        }
+
+        return json.dumps(data)
+      else:
+        apis_called.append('GET-1')
+
+        notification_data = self._get_notification_data(new_layer_ids[0:2], old_layer_ids[0:2])
+        notification_data['NextPage'] = 'nextpage'
+
+        data = {
+          'Notification': notification_data,
+        }
+
+        return json.dumps(data)
+
+    # Ensure that there are no event queue items for any layers.
+    self.assertIsNone(notification_queue.get())
+
+    # Test with a known notification with pages.
+    data = {
+      'Name': 'somenotification'
+    }
+
+    with HTTMock(get_notification, delete_notification, get_matching_layer_vulnerable):
+      worker = SecurityNotificationWorker(None)
+      self.assertTrue(worker.perform_notification_work(data))
+
+    # Verify that each of the expected API calls was made.
+    self.assertEquals(set(['GET-1', 'GET-2', 'DELETE', 'VULN']), set(apis_called))
+
+    # Verify that we have notifications *just* for the New layer.
+    expected_item = notification_queue.get()
+    self.assertIsNotNone(expected_item)
+    item_body = json.loads(expected_item['body'])
+    self.assertEquals('devtable/complex', item_body['event_data']['repository'])
+    self.assertEquals(['prod'], item_body['event_data']['tags'])
+
+    # Make sure we have no additional notifications.
+    self.assertIsNone(notification_queue.get())
+
+
 if __name__ == '__main__':
   unittest.main()
diff --git a/util/morecollections.py b/util/morecollections.py
index 6d05c4d25..c9f5ff0cb 100644
--- a/util/morecollections.py
+++ b/util/morecollections.py
@@ -10,3 +10,217 @@ class AttrDict(dict):
       if isinstance(value, AttrDict):
         copy[key] = cls.deep_copy(value)
     return copy
+
+
+class FastIndexList(object):
+  """ List which keeps track of the indices of its items in a fast manner, and allows for
+      quick removal of items.
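+
+      Note that reported indices shift as items are popped: `pop_until(k)` advances an
+      internal offset by k + 1, so `index()` lookups stay O(1) while the remaining
+      items' indices slide down accordingly.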
+ """ + def __init__(self): + self._list = [] + self._index_map = {} + self._index_offset = 0 + self._counter = 0 + + def add(self, item): + """ Adds an item to the index list. """ + self._list.append(item) + self._index_map[item] = self._counter + self._counter = self._counter + 1 + + def values(self): + """ Returns an iterable stream of all the values in the list. """ + return list(self._list) + + def index(self, value): + """ Returns the index of the given item in the list or None if none. """ + found = self._index_map.get(value, None) + if found is None: + return None + + return found - self._index_offset + + def pop_until(self, index_inclusive): + """ Pops off any items in the list until the given index, inclusive, and returns them. """ + values = self._list[0:index_inclusive+1] + for value in values: + self._index_map.pop(value, None) + + self._index_offset = self._index_offset + index_inclusive + 1 + self._list = self._list[index_inclusive+1:] + return values + + +class IndexedStreamingDiffTracker(object): + """ Helper class which tracks the difference between two streams of strings, + calling the `added` callback for strings when they are successfully verified + as being present in the first stream and not present in the second stream. + Unlike StreamingDiffTracker, this class expects each string value to have an + associated `index` value, which must be the same for equal values in both + streams and *must* be in order. This allows us to be a bit more efficient + in clearing up items that we know won't be present. The `index` is *not* + assumed to start at 0 or be contiguous, merely increasing. + """ + def __init__(self, reporter, result_per_stream): + self._reporter = reporter + self._reports_per_stream = result_per_stream + self._new_stream_finished = False + self._old_stream_finished = False + + self._new_stream = [] + self._old_stream = [] + + self._new_stream_map = {} + self._old_stream_map = {} + + def push_new(self, stream_tuples): + """ Pushes a list of values for the `New` stream. + """ + stream_tuples_list = list(stream_tuples) + assert len(stream_tuples_list) <= self._reports_per_stream + + if len(stream_tuples_list) < self._reports_per_stream: + self._new_stream_finished = True + + for (item, index) in stream_tuples_list: + if self._new_stream: + assert index > self._new_stream[-1].index + + self._new_stream_map[index] = item + self._new_stream.append(AttrDict(item=item, index=index)) + + self._process() + + def push_old(self, stream_tuples): + """ Pushes a list of values for the `Old` stream. + """ + if self._new_stream_finished and not self._new_stream: + # Nothing more to do. + return + + stream_tuples_list = list(stream_tuples) + assert len(stream_tuples_list) <= self._reports_per_stream + + if len(stream_tuples_list) < self._reports_per_stream: + self._old_stream_finished = True + + for (item, index) in stream_tuples: + if self._old_stream: + assert index > self._old_stream[-1].index + + self._old_stream_map[index] = item + self._old_stream.append(AttrDict(item=item, index=index)) + + self._process() + + def done(self): + self._old_stream_finished = True + self._process() + + def _process(self): + # Process any new items that can be reported. + old_lower_bound = self._old_stream[0].index if self._old_stream else -1 + for item_info in self._new_stream: + # If the new item's index <= the old_lower_bound, then we know + # we can check the old item map for it. 
+      if item_info.index <= old_lower_bound or self._old_stream_finished:
+        if self._old_stream_map.get(item_info.index, None) is None:
+          self._reporter(item_info.item)
+
+        # Remove the item from the map.
+        self._new_stream_map.pop(item_info.index, None)
+
+    # Rebuild the new stream list (faster than just removing).
+    self._new_stream = [item_info for item_info in self._new_stream
+                        if self._new_stream_map.get(item_info.index)]
+
+    # Process any old items that can be removed.
+    new_lower_bound = self._new_stream[0].index if self._new_stream else -1
+    for item_info in list(self._old_stream):
+      # Any items with indexes below the new lower bound can be removed,
+      # as any comparison against the new stream was already done above.
+      if item_info.index < new_lower_bound:
+        self._old_stream_map.pop(item_info.index, None)
+
+    # Rebuild the old stream list (faster than just removing).
+    self._old_stream = [item_info for item_info in self._old_stream
+                        if self._old_stream_map.get(item_info.index)]
+
+
+class StreamingDiffTracker(object):
+  """ Helper class which tracks the difference between two streams of strings, calling the
+      `added` callback for strings when they are successfully verified as being present in
+      the first stream and not present in the second stream. This class requires that the
+      streams of strings be consistently ordered *in some way common to both* (but the
+      strings themselves do not need to be sorted).
+  """
+  def __init__(self, reporter, result_per_stream):
+    self._reporter = reporter
+    self._reports_per_stream = result_per_stream
+    self._old_stream_finished = False
+
+    self._old_stream = FastIndexList()
+    self._new_stream = FastIndexList()
+
+  def done(self):
+    self._old_stream_finished = True
+    self.push_new([])
+
+  def push_new(self, stream_values):
+    """ Pushes a list of values for the `New` stream. """
+
+    # Add all the new values to the list.
+    counter = 0
+    for value in stream_values:
+      self._new_stream.add(value)
+      counter = counter + 1
+
+    assert counter <= self._reports_per_stream
+
+    # Process them all to see if anything has changed.
+    for value in self._new_stream.values():
+      old_index = self._old_stream.index(value)
+      if old_index is not None:
+        # The item is present, so we cannot report it. However, since we've reached this point,
+        # all items *before* this item in the `Old` stream are no longer necessary, so we can
+        # throw them out, along with this item.
+        self._old_stream.pop_until(old_index)
+      else:
+        # If the old stream has completely finished, then we can report, knowing no more old
+        # information will be present.
+        if self._old_stream_finished:
+          self._reporter(value)
+          self._new_stream.pop_until(self._new_stream.index(value))
+
+  def push_old(self, stream_values):
+    """ Pushes a list of values for the `Old` stream. """
+
+    if self._old_stream_finished:
+      return
+
+    value_list = list(stream_values)
+    assert len(value_list) <= self._reports_per_stream
+
+    for value in value_list:
+      # If the value exists in the new stream somewhere, then we know that all items *before*
+      # that index in the new stream will not be in the old stream, so we can report them. We
+      # can also remove the matching `New` item, as it is clearly in both streams.
+      new_index = self._new_stream.index(value)
+      if new_index is not None:
+        # Report all items up to the current item.
+        for item in self._new_stream.pop_until(new_index - 1):
+          self._reporter(item)
+
+        # Remove the current item from the new stream.
+        self._new_stream.pop_until(0)
+      else:
+        # This item may be seen later. Add it to the old stream set.
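+        # (Values that never match a later 'New' value simply remain buffered and
+        # are ignored.)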
+        self._old_stream.add(value)
+
+    # Check to see if the `Old` stream has finished.
+    if len(value_list) < self._reports_per_stream:
+      self._old_stream_finished = True
+
diff --git a/util/secscan/notifier.py b/util/secscan/notifier.py
index e1aa68731..a31905cad 100644
--- a/util/secscan/notifier.py
+++ b/util/secscan/notifier.py
@@ -1,6 +1,8 @@
 import logging
 import sys
 
+from enum import Enum
+
 from collections import defaultdict
 
 from app import secscan_api
@@ -10,21 +12,27 @@ from data.database import (Image, ImageStorage, ExternalNotificationEvent, Repos
 from endpoints.notificationhelper import notification_batch
 from util.secscan import PRIORITY_LEVELS
 from util.secscan.api import APIRequestFailure
-from util.morecollections import AttrDict
+from util.morecollections import AttrDict, StreamingDiffTracker
 
 logger = logging.getLogger(__name__)
 
 
-def process_notification_data(notification_data):
-  """ Processes the given notification data to spawn vulnerability notifications as necessary.
-      Returns whether the processing succeeded.
-  """
-  if not 'New' in notification_data:
-    # Nothing to do.
-    return True
+class ProcessNotificationPageResult(Enum):
+  FINISHED_PAGE = 'Finished Page'
+  FINISHED_PROCESSING = 'Finished Processing'
+  FAILED = 'Failed'
 
-  new_data = notification_data['New']
-  old_data = notification_data.get('Old', {})
+
+def process_notification_page_data(notification_page_data):
+  """ Processes the given notification page data to spawn vulnerability notifications as
+      necessary. Returns the status of the processing.
+  """
+  if 'New' not in notification_page_data:
+    # Nothing more to do.
+    return ProcessNotificationPageResult.FINISHED_PROCESSING
+
+  new_data = notification_page_data['New']
+  old_data = notification_page_data.get('Old', {})
 
   new_vuln = new_data['Vulnerability']
   old_vuln = old_data.get('Vulnerability', {})
@@ -44,7 +52,7 @@ def process_notification_data(notification_data):
 
   if not notify_layers:
     # Nothing more to do.
-    return True
+    return ProcessNotificationPageResult.FINISHED_PAGE
 
   # Lookup the external event for when we have vulnerabilities.
   event = ExternalNotificationEvent.get(name='vulnerability_found')
@@ -76,7 +84,7 @@ def process_notification_data(notification_data):
     try:
       is_vulerable = secscan_api.check_layer_vulnerable(tag_layer_id, cve_id)
     except APIRequestFailure:
-      return False
+      return ProcessNotificationPageResult.FAILED
 
     check_map[tag_layer_id] = is_vulerable
 
@@ -110,5 +118,5 @@ def process_notification_data(notification_data):
       })
       spawn_notification(repository, 'vulnerability_found', event_data)
 
-  return True
+  return ProcessNotificationPageResult.FINISHED_PAGE
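
Usage sketch: the helper below shows how paginated (layer_id, index) streams, such as
the Layer ID diffs Clair serves per notification page, could be fed into
IndexedStreamingDiffTracker. The diff_notification_pages helper and the shape of the
pages dicts are hypothetical illustrations mirroring the tests above; only
IndexedStreamingDiffTracker itself comes from this change.

    from util.morecollections import IndexedStreamingDiffTracker

    def diff_notification_pages(pages, results_per_stream):
      # Collects the layer IDs present in the 'New' stream but absent from the
      # 'Old' stream, across all pages. Each (hypothetical) page is a dict with
      # 'New' and 'Old' lists of (layer_id, index) tuples, at most
      # results_per_stream entries each, with indices increasing across pages.
      added = []
      tracker = IndexedStreamingDiffTracker(added.append, results_per_stream)
      for page in pages:
        tracker.push_new(page.get('New', []))
        tracker.push_old(page.get('Old', []))

      tracker.done()
      return added

    # Mirrors test_old_pagination above: 'a' is shared, so only 'b' and 'c' are new.
    pages = [
      {'New': [('a', 10), ('b', 11)], 'Old': [('z', 1), ('y', 2)]},
      {'New': [('c', 12)], 'Old': [('a', 10)]},
    ]
    assert diff_notification_pages(pages, 2) == ['b', 'c']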