From 97d150e28156e791bac5703964ab4dba301153ac Mon Sep 17 00:00:00 2001 From: Joseph Schorr Date: Mon, 5 Dec 2016 19:08:20 -0500 Subject: [PATCH] Have QSS only add security scanner notifications once --- data/queue.py | 29 ++++++++++++++++++++++++----- endpoints/secscan.py | 8 ++++++-- test/test_queue.py | 21 +++++++++++++++++++++ 3 files changed, 51 insertions(+), 7 deletions(-) diff --git a/data/queue.py b/data/queue.py index 421d7d6e7..7afcd705b 100644 --- a/data/queue.py +++ b/data/queue.py @@ -53,11 +53,9 @@ class WorkQueue(object): @classmethod def _running_jobs(cls, now, name_match_query): - return (QueueItem - .select(QueueItem.queue_name) - .where(QueueItem.available == False, - QueueItem.processing_expires > now, - QueueItem.queue_name ** name_match_query)) + return (cls + ._running_jobs_where(QueueItem.select(QueueItem.queue_name), now) + .where(QueueItem.queue_name ** name_match_query)) @classmethod def _available_jobs(cls, now, name_match_query): @@ -65,6 +63,10 @@ class WorkQueue(object): ._available_jobs_where(QueueItem.select(), now) .where(QueueItem.queue_name ** name_match_query)) + @staticmethod + def _running_jobs_where(query, now): + return query.where(QueueItem.available == False, QueueItem.processing_expires > now) + @staticmethod def _available_jobs_where(query, now): return query.where(QueueItem.available_after <= now, @@ -141,6 +143,23 @@ class WorkQueue(object): queue_prefix = '%s/%s/%s%%' % (self._queue_name, namespace, subpath_query) QueueItem.delete().where(QueueItem.queue_name ** queue_prefix).execute() + def alive(self, canonical_name_list): + """ + Returns True if a job matching the canonical name list is currently processing + or available. + """ + canonical_name = self._canonical_name([self._queue_name] + canonical_name_list) + try: + select_query = QueueItem.select().where(QueueItem.queue_name == canonical_name) + now = datetime.utcnow() + + overall_query = (self._available_jobs_where(select_query.clone(), now) | + self._running_jobs_where(select_query.clone(), now)) + overall_query.get() + return True + except QueueItem.DoesNotExist: + return False + def put(self, canonical_name_list, message, available_after=0, retries_remaining=5): """ Put an item, if it shouldn't be processed for some number of seconds, diff --git a/endpoints/secscan.py b/endpoints/secscan.py index 9151026a4..6ce803e1e 100644 --- a/endpoints/secscan.py +++ b/endpoints/secscan.py @@ -3,7 +3,7 @@ import json import features -from app import secscan_notification_queue, secscan_api +from app import secscan_notification_queue from flask import request, make_response, Blueprint, abort from endpoints.common import route_show_if @@ -19,5 +19,9 @@ def secscan_notification(): abort(400) notification = data['Notification'] - secscan_notification_queue.put(['named', notification['Name']], json.dumps(notification)) + name = ['named', notification['Name']] + + if not secscan_notification_queue.alive(name): + secscan_notification_queue.put(name, json.dumps(notification)) + return make_response('Okay') diff --git a/test/test_queue.py b/test/test_queue.py index d31268850..7b290c2df 100644 --- a/test/test_queue.py +++ b/test/test_queue.py @@ -7,6 +7,7 @@ from functools import wraps from app import app from initdb import setup_database_for_testing, finished_database_for_testing from data.queue import WorkQueue +from datetime import timedelta QUEUE_NAME = 'testqueuename' @@ -147,6 +148,26 @@ class TestQueue(QueueTestCase): self.assertEqual(self.reporter.running_count, 1) self.assertEqual(self.reporter.total, 1) + def test_alive(self): + # No queue item = not alive. + self.assertFalse(self.queue.alive(['abc', 'def'])) + + # Add a queue item. + self.queue.put(['abc', 'def'], self.TEST_MESSAGE_1, available_after=-1) + self.assertTrue(self.queue.alive(['abc', 'def'])) + + # Retrieve the queue item. + queue_item = self.queue.get() + self.assertIsNotNone(queue_item) + self.assertTrue(self.queue.alive(['abc', 'def'])) + + # Make sure it is running by trying to retrieve it again. + self.assertIsNone(self.queue.get()) + + # Delete the queue item. + self.queue.complete(queue_item) + self.assertFalse(self.queue.alive(['abc', 'def'])) + def test_specialized_queue(self): self.queue.put(['abc', 'def'], self.TEST_MESSAGE_1, available_after=-1) self.queue.put(['def', 'def'], self.TEST_MESSAGE_2, available_after=-1)