From 21e30014462985cda8667a0c1df93e47a1220f90 Mon Sep 17 00:00:00 2001
From: Jake Moshenko
Date: Mon, 5 Dec 2016 23:58:20 -0500
Subject: [PATCH] Add a bulk insert for queue and notifications. Use it for
 Clair spawned notifications.

---
 data/queue.py                   | 41 ++++++++++++++++++++++++++++-----
 endpoints/notificationhelper.py | 39 +++++++++++++++++++++++--------
 test/test_queue.py              | 27 ++++++++++++++++++++--
 util/secscan/notifier.py        | 39 ++++++++++++++++---------------
 4 files changed, 110 insertions(+), 36 deletions(-)

diff --git a/data/queue.py b/data/queue.py
index 87cb6b9fa..8613f7fe7 100644
--- a/data/queue.py
+++ b/data/queue.py
@@ -1,10 +1,12 @@
 from datetime import datetime, timedelta
+from contextlib import contextmanager
 
 from data.database import QueueItem, db, db_for_update, db_random_func
 from util.morecollections import AttrDict
 
 
 MINIMUM_EXTENSION = timedelta(seconds=20)
+DEFAULT_BATCH_SIZE = 1000
 
 
 class NoopWith:
@@ -160,18 +162,45 @@ class WorkQueue(object):
     except QueueItem.DoesNotExist:
       return False
 
-  def put(self, canonical_name_list, message, available_after=0, retries_remaining=5):
-    """
-    Put an item, if it shouldn't be processed for some number of seconds,
-    specify that amount as available_after. Returns the ID of the queue item added.
-    """
-    item = QueueItem.create(
+  def _queue_body(self, canonical_name_list, message, available_after, retries_remaining):
+    return dict(
       queue_name=self._canonical_name([self._queue_name] + canonical_name_list),
       body=message,
       retries_remaining=retries_remaining,
       available_after=datetime.utcnow() + timedelta(seconds=available_after or 0),
     )
 
+  @contextmanager
+  def batch_insert(self, batch_size=DEFAULT_BATCH_SIZE):
+    items_to_insert = []
+    def batch_put(canonical_name_list, message, available_after=0, retries_remaining=5):
+      """
+      Put an item, if it shouldn't be processed for some number of seconds,
+      specify that amount as available_after. The insert is deferred and performed
+      in bulk when the batch_insert context manager exits.
+      """
+      items_to_insert.append(self._queue_body(canonical_name_list, message, available_after,
+                                              retries_remaining))
+
+    yield batch_put
+
+    # Chunk the inserted items into batch_size chunks and insert_many
+    remaining = list(items_to_insert)
+    while remaining:
+      QueueItem.insert_many(remaining[0:batch_size]).execute()
+      remaining = remaining[batch_size:]
+
+    if self._metric_queue:
+      self._metric_queue.put_deprecated('Added', len(items_to_insert),
+                                        dimensions={'queue': self._queue_name})
+
+  def put(self, canonical_name_list, message, available_after=0, retries_remaining=5):
+    """
+    Put an item, if it shouldn't be processed for some number of seconds,
+    specify that amount as available_after. Returns the ID of the queue item added.
+    """
+    item = QueueItem.create(**self._queue_body(canonical_name_list, message, available_after,
+                                               retries_remaining))
+
     if self._metric_queue:
       self._metric_queue.put_deprecated('Added', 1, dimensions={'queue': self._queue_name})
diff --git a/endpoints/notificationhelper.py b/endpoints/notificationhelper.py
index 3ec7c16eb..66d7ac0dc 100644
--- a/endpoints/notificationhelper.py
+++ b/endpoints/notificationhelper.py
@@ -1,10 +1,15 @@
 import json
 
+from contextlib import contextmanager
+
 from app import app, notification_queue
 from data import model
 from auth.auth_context import get_authenticated_user, get_validated_oauth_token
 
+DEFAULT_BATCH_SIZE = 1000
+
+
 def build_event_data(repo, extra_data={}, subpage=None):
   repo_string = '%s/%s' % (repo.namespace_name, repo.name)
   homepage = '%s://%s/repository/%s' % (app.config['PREFERRED_URL_SCHEME'],
@@ -50,14 +55,30 @@ def build_notification_data(notification, event_data, performer_data=None):
   }
 
 
+@contextmanager
+def notification_batch(batch_size=DEFAULT_BATCH_SIZE):
+  """
+  Context manager implementation which returns a target callable with the same signature
+  as spawn_notification. When the context block exits the notifications generated by
+  the callable will be bulk inserted into the queue with the specified batch size.
+  """
+  with notification_queue.batch_insert(batch_size) as queue_put:
+    def spawn_notification_batch(repo, event_name, extra_data={}, subpage=None, pathargs=[],
+                                 performer_data=None):
+      event_data = build_event_data(repo, extra_data=extra_data, subpage=subpage)
+
+      notifications = model.notification.list_repo_notifications(repo.namespace_name,
+                                                                 repo.name,
+                                                                 event_name=event_name)
+      path = [repo.namespace_name, repo.name, event_name] + pathargs
+      for notification in list(notifications):
+        notification_data = build_notification_data(notification, event_data, performer_data)
+        queue_put(path, json.dumps(notification_data))
+
+    yield spawn_notification_batch
+
+
 def spawn_notification(repo, event_name, extra_data={}, subpage=None, pathargs=[],
                        performer_data=None):
-  event_data = build_event_data(repo, extra_data=extra_data, subpage=subpage)
-
-  notifications = model.notification.list_repo_notifications(repo.namespace_name,
-                                                             repo.name,
-                                                             event_name=event_name)
-  for notification in list(notifications):
-    notification_data = build_notification_data(notification, event_data, performer_data)
-    path = [repo.namespace_name, repo.name, event_name] + pathargs
-    notification_queue.put(path, json.dumps(notification_data))
+  with notification_batch(1) as batch_spawn:
+    batch_spawn(repo, event_name, extra_data, subpage, pathargs, performer_data)
diff --git a/test/test_queue.py b/test/test_queue.py
index 7b290c2df..066e17234 100644
--- a/test/test_queue.py
+++ b/test/test_queue.py
@@ -67,8 +67,9 @@ class TestQueue(QueueTestCase):
     self.assertEqual(self.reporter.running_count, None)
     self.assertEqual(self.reporter.total, None)
 
-    self.queue.put(['abc', 'def'], self.TEST_MESSAGE_1, available_after=-1)
-    self.queue.put(['abc', 'def'], self.TEST_MESSAGE_2, available_after=-1)
+    id_1 = int(self.queue.put(['abc', 'def'], self.TEST_MESSAGE_1, available_after=-1))
+    id_2 = int(self.queue.put(['abc', 'def'], self.TEST_MESSAGE_2, available_after=-1))
+    self.assertEqual(id_1 + 1, id_2)
     self.assertEqual(self.reporter.currently_processing, False)
     self.assertEqual(self.reporter.running_count, 0)
     self.assertEqual(self.reporter.total, 1)
@@ -202,6 +203,28 @@ class TestQueue(QueueTestCase):
       msg = str(json_body['data'])
       self.assertIn(msg, seen)
 
+  def test_bulk_insert(self):
+    self.assertEqual(self.reporter.currently_processing, None)
+    self.assertEqual(self.reporter.running_count, None)
+    self.assertEqual(self.reporter.total, None)
+
+    with self.queue.batch_insert() as queue_put:
+      queue_put(['abc', 'def'], self.TEST_MESSAGE_1, available_after=-1)
+      queue_put(['abc', 'def'], self.TEST_MESSAGE_2, available_after=-1)
+
+    self.queue.update_metrics()
+    self.assertEqual(self.reporter.currently_processing, False)
+    self.assertEqual(self.reporter.running_count, 0)
+    self.assertEqual(self.reporter.total, 1)
+
+    with self.queue.batch_insert() as queue_put:
+      queue_put(['abd', 'def'], self.TEST_MESSAGE_1, available_after=-1)
+      queue_put(['abd', 'ghi'], self.TEST_MESSAGE_2, available_after=-1)
+
+    self.queue.update_metrics()
+    self.assertEqual(self.reporter.currently_processing, False)
+    self.assertEqual(self.reporter.running_count, 0)
+    self.assertEqual(self.reporter.total, 3)
 
 if __name__ == '__main__':
   unittest.main()
diff --git a/util/secscan/notifier.py b/util/secscan/notifier.py
index 908e5668a..e1aa68731 100644
--- a/util/secscan/notifier.py
+++ b/util/secscan/notifier.py
@@ -7,7 +7,7 @@ from app import secscan_api
 from data.model.tag import filter_tags_have_repository_event, get_matching_tags
 from data.database import (Image, ImageStorage, ExternalNotificationEvent, Repository,
                            RepositoryTag)
-from endpoints.notificationhelper import spawn_notification
+from endpoints.notificationhelper import notification_batch
 from util.secscan import PRIORITY_LEVELS
 from util.secscan.api import APIRequestFailure
 from util.morecollections import AttrDict
@@ -89,25 +89,26 @@ def process_notification_data(notification_data):
       repository_map[tag.repository_id] = tag.repository
 
   # For each of the tags found, issue a notification.
-  for repository_id in tag_map:
-    tags = tag_map[repository_id]
-    event_data = {
-      'tags': list(tags),
-      'vulnerability': {
-        'id': cve_id,
-        'description': new_vuln.get('Description', None),
-        'link': new_vuln.get('Link', None),
-        'priority': new_severity['title'],
-        'has_fix': 'FixedIn' in new_vuln,
-      },
-    }
+  with notification_batch() as spawn_notification:
+    for repository_id in tag_map:
+      tags = tag_map[repository_id]
+      event_data = {
+        'tags': list(tags),
+        'vulnerability': {
+          'id': cve_id,
+          'description': new_vuln.get('Description', None),
+          'link': new_vuln.get('Link', None),
+          'priority': new_severity['title'],
+          'has_fix': 'FixedIn' in new_vuln,
+        },
+      }
 
-    # TODO(jzelinskie): remove when more endpoints have been converted to using interfaces
-    repository = AttrDict({
-      'namespace_name': repository_map[repository_id].namespace_user.username,
-      'name': repository_map[repository_id].name,
-    })
-    spawn_notification(repository, 'vulnerability_found', event_data)
+      # TODO(jzelinskie): remove when more endpoints have been converted to using interfaces
+      repository = AttrDict({
+        'namespace_name': repository_map[repository_id].namespace_user.username,
+        'name': repository_map[repository_id].name,
+      })
+      spawn_notification(repository, 'vulnerability_found', event_data)
 
   return True