2015-11-10 00:17:15 +00:00
|
|
|
import logging
|
|
|
|
import time
|
2016-02-25 20:58:42 +00:00
|
|
|
import json
|
2015-11-10 00:17:15 +00:00
|
|
|
|
|
|
|
import features
|
|
|
|
|
2015-11-10 20:01:33 +00:00
|
|
|
from app import secscan_notification_queue, secscan_api
|
2016-02-25 20:58:42 +00:00
|
|
|
from workers.queueworker import QueueWorker, JobException
|
|
|
|
from util.secscan.notifier import process_notification_data
|
2015-11-10 00:17:15 +00:00
|
|
|
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
|
2016-02-25 20:58:42 +00:00
|
|
|
_EXTENDED_SECONDS = 600
|
2016-03-19 00:28:06 +00:00
|
|
|
_LAYER_LIMIT = 100 # The number of layers to request on each page.
|
2015-11-10 00:17:15 +00:00
|
|
|
|
|
|
|
class SecurityNotificationWorker(QueueWorker):
|
2015-11-12 22:47:19 +00:00
|
|
|
def process_queue_item(self, data):
|
2016-03-28 20:41:37 +00:00
|
|
|
self.perform_notification_work(data)
|
|
|
|
|
|
|
|
def perform_notification_work(self, data):
|
|
|
|
""" Performs the work for handling a security notification as referenced by the given data
|
|
|
|
object. Returns True on successful handling, False on non-retryable failure and raises
|
|
|
|
a JobException on retryable failure.
|
|
|
|
"""
|
|
|
|
|
2016-02-25 20:58:42 +00:00
|
|
|
notification_name = data['Name']
|
|
|
|
current_page = data.get('page', None)
|
2015-11-10 00:17:15 +00:00
|
|
|
|
2016-02-25 20:58:42 +00:00
|
|
|
while True:
|
2016-03-19 00:28:06 +00:00
|
|
|
(response_data, should_retry) = secscan_api.get_notification(notification_name,
|
|
|
|
layer_limit=_LAYER_LIMIT,
|
|
|
|
page=current_page)
|
2016-02-25 20:58:42 +00:00
|
|
|
if response_data is None:
|
|
|
|
if should_retry:
|
|
|
|
raise JobException()
|
|
|
|
else:
|
|
|
|
# Return to mark the job as "complete", as we'll never be able to finish it.
|
|
|
|
logger.error('Failed to handle security notification %s', notification_name)
|
2016-03-19 00:28:06 +00:00
|
|
|
return False
|
2016-02-25 20:58:42 +00:00
|
|
|
|
|
|
|
notification_data = response_data['Notification']
|
|
|
|
if not process_notification_data(notification_data):
|
|
|
|
raise JobException()
|
|
|
|
|
|
|
|
# Check for a next page of results. If none, we're done.
|
|
|
|
if 'NextPage' not in notification_data:
|
2016-03-19 00:28:06 +00:00
|
|
|
# Mark the notification as read and processed.
|
|
|
|
if not secscan_api.mark_notification_read(notification_name):
|
|
|
|
# Return to mark the job as "complete", as we'll never be able to finish it.
|
|
|
|
logger.error('Failed to mark notification %s as read', notification_name)
|
|
|
|
return False
|
|
|
|
|
|
|
|
return True
|
2016-02-25 20:58:42 +00:00
|
|
|
|
|
|
|
# Otherwise, save the next page token into the queue item (so we can pick up from here if
|
|
|
|
# something goes wrong in the next loop iteration), and continue.
|
|
|
|
current_page = notification_data['NextPage']
|
|
|
|
data['page'] = current_page
|
|
|
|
self.extend_processing(_EXTENDED_SECONDS, json.dumps(data))
|
2015-11-10 00:17:15 +00:00
|
|
|
|
|
|
|
|
|
|
|
if __name__ == '__main__':
|
2016-03-01 20:35:00 +00:00
|
|
|
if not features.SECURITY_SCANNER or not features.SECURITY_NOTIFICATIONS:
|
2015-11-10 00:17:15 +00:00
|
|
|
logger.debug('Security scanner disabled; skipping SecurityNotificationWorker')
|
|
|
|
while True:
|
|
|
|
time.sleep(100000)
|
|
|
|
|
|
|
|
worker = SecurityNotificationWorker(secscan_notification_queue, poll_period_seconds=30,
|
|
|
|
reservation_seconds=30, retry_after_seconds=30)
|
|
|
|
worker.start()
|