This repository has been archived on 2020-03-24. You can view files and clone it, but cannot push or open issues or pull requests.
quay/workers/security_notification_worker.py

77 lines
2.9 KiB
Python
Raw Normal View History

2015-11-10 00:17:15 +00:00
import logging
import time
import json
2015-11-10 00:17:15 +00:00
import features
from app import secscan_notification_queue, secscan_api
from workers.queueworker import QueueWorker, JobException
from util.secscan.notifier import process_notification_data
2015-11-10 00:17:15 +00:00
2016-10-28 21:11:54 +00:00
2015-11-10 00:17:15 +00:00
# Module-level logger, named after this module via __name__.
logger = logging.getLogger(__name__)
2016-10-28 21:11:54 +00:00
# Page size: the number of layers to request on each page of a notification.
_LAYER_LIMIT = 1000

# Duration, in seconds, handed to extend_processing() for each fetched page: 1 hour.
_PROCESSING_SECONDS = 60 * 60
2015-11-10 00:17:15 +00:00
2016-10-28 21:11:54 +00:00
2015-11-10 00:17:15 +00:00
class SecurityNotificationWorker(QueueWorker):
    """ Queue worker that handles security notifications, fetching them page by page. """

    def process_queue_item(self, data):
        """ Queue entry point: handles the notification referenced by `data` using the default
            layer limit. The result of perform_notification_work is not propagated.
        """
        self.perform_notification_work(data)

    def perform_notification_work(self, data, layer_limit=_LAYER_LIMIT):
        """ Handles the security notification referenced by the given data object.

            `data` must contain the notification's 'Name'; an optional 'page' entry is used as
            the page to start from. `data` is mutated: after each page, the next page token is
            stored under 'page'.

            Returns True on successful handling, False on non-retryable failure and raises
            a JobException on retryable failure.
        """
        name = data['Name']
        page_token = data.get('page')

        while True:
            response, retryable = secscan_api.get_notification(name,
                                                               layer_limit=layer_limit,
                                                               page=page_token)

            if response is None:
                if retryable:
                    raise JobException()

                # Non-retryable: remove the job from the API and report the item as complete,
                # since it can never be finished.
                logger.error('Failed to handle security notification %s', name)
                secscan_api.mark_notification_read(name)
                return False

            # Extend the processing window, persisting the current state of `data` (including
            # any page token saved by the previous iteration) alongside it.
            self.extend_processing(_PROCESSING_SECONDS, json.dumps(data))

            notification = response['Notification']
            if not process_notification_data(notification):
                raise JobException()

            if 'NextPage' in notification:
                # More results: remember the next page token in the queue item, so work can
                # resume from here if the next iteration fails, then keep going.
                page_token = notification['NextPage']
                data['page'] = page_token
                continue

            # Last page: mark the notification as read and processed.
            if secscan_api.mark_notification_read(name):
                return True

            # Report the item as complete anyway, as it can never be finished.
            logger.error('Failed to mark notification %s as read', name)
            return False
2015-11-10 00:17:15 +00:00
if __name__ == '__main__':
    # The worker only runs when both the security scanner and its notifications are enabled.
    notifications_enabled = features.SECURITY_SCANNER and features.SECURITY_NOTIFICATIONS

    if not notifications_enabled:
        logger.debug('Security scanner disabled; skipping SecurityNotificationWorker')
        # Sleep forever instead of exiting; the worker below is never started on this path.
        while True:
            time.sleep(100000)
    else:
        worker = SecurityNotificationWorker(
            secscan_notification_queue,
            poll_period_seconds=30,
            reservation_seconds=30,
            retry_after_seconds=30,
        )
        worker.start()