Have QSS only add security scanner notifications once
This commit is contained in:
parent
3bbd8ca898
commit
97d150e281
3 changed files with 51 additions and 7 deletions
|
@ -53,11 +53,9 @@ class WorkQueue(object):
|
||||||
|
|
||||||
@classmethod
|
@classmethod
|
||||||
def _running_jobs(cls, now, name_match_query):
|
def _running_jobs(cls, now, name_match_query):
|
||||||
return (QueueItem
|
return (cls
|
||||||
.select(QueueItem.queue_name)
|
._running_jobs_where(QueueItem.select(QueueItem.queue_name), now)
|
||||||
.where(QueueItem.available == False,
|
.where(QueueItem.queue_name ** name_match_query))
|
||||||
QueueItem.processing_expires > now,
|
|
||||||
QueueItem.queue_name ** name_match_query))
|
|
||||||
|
|
||||||
@classmethod
|
@classmethod
|
||||||
def _available_jobs(cls, now, name_match_query):
|
def _available_jobs(cls, now, name_match_query):
|
||||||
|
@ -65,6 +63,10 @@ class WorkQueue(object):
|
||||||
._available_jobs_where(QueueItem.select(), now)
|
._available_jobs_where(QueueItem.select(), now)
|
||||||
.where(QueueItem.queue_name ** name_match_query))
|
.where(QueueItem.queue_name ** name_match_query))
|
||||||
|
|
||||||
|
@staticmethod
|
||||||
|
def _running_jobs_where(query, now):
|
||||||
|
return query.where(QueueItem.available == False, QueueItem.processing_expires > now)
|
||||||
|
|
||||||
@staticmethod
|
@staticmethod
|
||||||
def _available_jobs_where(query, now):
|
def _available_jobs_where(query, now):
|
||||||
return query.where(QueueItem.available_after <= now,
|
return query.where(QueueItem.available_after <= now,
|
||||||
|
@ -141,6 +143,23 @@ class WorkQueue(object):
|
||||||
queue_prefix = '%s/%s/%s%%' % (self._queue_name, namespace, subpath_query)
|
queue_prefix = '%s/%s/%s%%' % (self._queue_name, namespace, subpath_query)
|
||||||
QueueItem.delete().where(QueueItem.queue_name ** queue_prefix).execute()
|
QueueItem.delete().where(QueueItem.queue_name ** queue_prefix).execute()
|
||||||
|
|
||||||
|
def alive(self, canonical_name_list):
|
||||||
|
"""
|
||||||
|
Returns True if a job matching the canonical name list is currently processing
|
||||||
|
or available.
|
||||||
|
"""
|
||||||
|
canonical_name = self._canonical_name([self._queue_name] + canonical_name_list)
|
||||||
|
try:
|
||||||
|
select_query = QueueItem.select().where(QueueItem.queue_name == canonical_name)
|
||||||
|
now = datetime.utcnow()
|
||||||
|
|
||||||
|
overall_query = (self._available_jobs_where(select_query.clone(), now) |
|
||||||
|
self._running_jobs_where(select_query.clone(), now))
|
||||||
|
overall_query.get()
|
||||||
|
return True
|
||||||
|
except QueueItem.DoesNotExist:
|
||||||
|
return False
|
||||||
|
|
||||||
def put(self, canonical_name_list, message, available_after=0, retries_remaining=5):
|
def put(self, canonical_name_list, message, available_after=0, retries_remaining=5):
|
||||||
"""
|
"""
|
||||||
Put an item, if it shouldn't be processed for some number of seconds,
|
Put an item, if it shouldn't be processed for some number of seconds,
|
||||||
|
|
|
@ -3,7 +3,7 @@ import json
|
||||||
|
|
||||||
import features
|
import features
|
||||||
|
|
||||||
from app import secscan_notification_queue, secscan_api
|
from app import secscan_notification_queue
|
||||||
from flask import request, make_response, Blueprint, abort
|
from flask import request, make_response, Blueprint, abort
|
||||||
from endpoints.common import route_show_if
|
from endpoints.common import route_show_if
|
||||||
|
|
||||||
|
@ -19,5 +19,9 @@ def secscan_notification():
|
||||||
abort(400)
|
abort(400)
|
||||||
|
|
||||||
notification = data['Notification']
|
notification = data['Notification']
|
||||||
secscan_notification_queue.put(['named', notification['Name']], json.dumps(notification))
|
name = ['named', notification['Name']]
|
||||||
|
|
||||||
|
if not secscan_notification_queue.alive(name):
|
||||||
|
secscan_notification_queue.put(name, json.dumps(notification))
|
||||||
|
|
||||||
return make_response('Okay')
|
return make_response('Okay')
|
||||||
|
|
|
@ -7,6 +7,7 @@ from functools import wraps
|
||||||
from app import app
|
from app import app
|
||||||
from initdb import setup_database_for_testing, finished_database_for_testing
|
from initdb import setup_database_for_testing, finished_database_for_testing
|
||||||
from data.queue import WorkQueue
|
from data.queue import WorkQueue
|
||||||
|
from datetime import timedelta
|
||||||
|
|
||||||
|
|
||||||
QUEUE_NAME = 'testqueuename'
|
QUEUE_NAME = 'testqueuename'
|
||||||
|
@ -147,6 +148,26 @@ class TestQueue(QueueTestCase):
|
||||||
self.assertEqual(self.reporter.running_count, 1)
|
self.assertEqual(self.reporter.running_count, 1)
|
||||||
self.assertEqual(self.reporter.total, 1)
|
self.assertEqual(self.reporter.total, 1)
|
||||||
|
|
||||||
|
def test_alive(self):
|
||||||
|
# No queue item = not alive.
|
||||||
|
self.assertFalse(self.queue.alive(['abc', 'def']))
|
||||||
|
|
||||||
|
# Add a queue item.
|
||||||
|
self.queue.put(['abc', 'def'], self.TEST_MESSAGE_1, available_after=-1)
|
||||||
|
self.assertTrue(self.queue.alive(['abc', 'def']))
|
||||||
|
|
||||||
|
# Retrieve the queue item.
|
||||||
|
queue_item = self.queue.get()
|
||||||
|
self.assertIsNotNone(queue_item)
|
||||||
|
self.assertTrue(self.queue.alive(['abc', 'def']))
|
||||||
|
|
||||||
|
# Make sure it is running by trying to retrieve it again.
|
||||||
|
self.assertIsNone(self.queue.get())
|
||||||
|
|
||||||
|
# Delete the queue item.
|
||||||
|
self.queue.complete(queue_item)
|
||||||
|
self.assertFalse(self.queue.alive(['abc', 'def']))
|
||||||
|
|
||||||
def test_specialized_queue(self):
|
def test_specialized_queue(self):
|
||||||
self.queue.put(['abc', 'def'], self.TEST_MESSAGE_1, available_after=-1)
|
self.queue.put(['abc', 'def'], self.TEST_MESSAGE_1, available_after=-1)
|
||||||
self.queue.put(['def', 'def'], self.TEST_MESSAGE_2, available_after=-1)
|
self.queue.put(['def', 'def'], self.TEST_MESSAGE_2, available_after=-1)
|
||||||
|
|
Reference in a new issue