import logging import time import json import features from app import secscan_notification_queue, secscan_api from workers.queueworker import QueueWorker, JobException from util.secscan.notifier import process_notification_data logger = logging.getLogger(__name__) _EXTENDED_SECONDS = 600 _LAYER_LIMIT = 100 # The number of layers to request on each page. class SecurityNotificationWorker(QueueWorker): def process_queue_item(self, data): notification_name = data['Name'] current_page = data.get('page', None) while True: (response_data, should_retry) = secscan_api.get_notification(notification_name, layer_limit=_LAYER_LIMIT, page=current_page) if response_data is None: if should_retry: raise JobException() else: # Return to mark the job as "complete", as we'll never be able to finish it. logger.error('Failed to handle security notification %s', notification_name) return False notification_data = response_data['Notification'] if not process_notification_data(notification_data): raise JobException() # Check for a next page of results. If none, we're done. if 'NextPage' not in notification_data: # Mark the notification as read and processed. if not secscan_api.mark_notification_read(notification_name): # Return to mark the job as "complete", as we'll never be able to finish it. logger.error('Failed to mark notification %s as read', notification_name) return False return True # Otherwise, save the next page token into the queue item (so we can pick up from here if # something goes wrong in the next loop iteration), and continue. current_page = notification_data['NextPage'] data['page'] = current_page self.extend_processing(_EXTENDED_SECONDS, json.dumps(data)) if __name__ == '__main__': if not features.SECURITY_SCANNER or not features.SECURITY_NOTIFICATIONS: logger.debug('Security scanner disabled; skipping SecurityNotificationWorker') while True: time.sleep(100000) worker = SecurityNotificationWorker(secscan_notification_queue, poll_period_seconds=30, reservation_seconds=30, retry_after_seconds=30) worker.start()