import logging
import time
import json

import features

from app import secscan_notification_queue, secscan_api
from workers.queueworker import QueueWorker, JobException
from util.secscan.notifier import process_notification_data

logger = logging.getLogger(__name__)

# Processing extension (in seconds) requested before fetching a page: 2 minutes.
_READING_SECONDS = 120
# Processing extension (in seconds) requested before handling a fetched page: 1 hour.
_PROCESSING_SECONDS = 60 * 60
# How many layers are requested per page of a notification.
_LAYER_LIMIT = 100


class SecurityNotificationWorker(QueueWorker):
    """Queue worker that walks a security notification page by page and processes it."""

    def process_queue_item(self, data):
        """Handle one queue item by delegating to perform_notification_work.

        The boolean result of perform_notification_work is not propagated; a
        JobException raised by it passes through to the caller.
        """
        self.perform_notification_work(data)

    def perform_notification_work(self, data):
        """Process every page of the security notification named by ``data['Name']``.

        Paging resumes from ``data['page']`` when that key is present. After each page
        is handled, the next page token is written back into ``data['page']``; the
        updated ``data`` is serialized and handed to ``extend_processing`` at the top of
        the following iteration, so the queue item records where to pick up again.

        Returns:
            True once the final page has been processed and the notification has been
            marked as read; False on a non-retryable failure (the notification could
            not be fetched, or could not be marked as read).

        Raises:
            JobException: on a retryable failure, i.e. the fetch reported that it should
                be retried, or process_notification_data returned a falsy result.
        """
        notification_name = data['Name']
        current_page = data.get('page')

        while True:
            # Ask for more time before talking to the scanner, persisting the current state.
            self.extend_processing(_READING_SECONDS, json.dumps(data))
            response_data, should_retry = secscan_api.get_notification(
                notification_name,
                layer_limit=_LAYER_LIMIT,
                page=current_page,
            )

            if response_data is None:
                if should_retry:
                    raise JobException()

                # Not retryable: returning (rather than raising) lets the job count as
                # complete, since it can never succeed.
                logger.error('Failed to handle security notification %s', notification_name)
                return False

            # Handling a page may be slow, so request the longer window first.
            self.extend_processing(_PROCESSING_SECONDS, json.dumps(data))
            notification_data = response_data['Notification']
            if not process_notification_data(notification_data):
                raise JobException()

            if 'NextPage' in notification_data:
                # More pages remain: remember the token in the queue item data so the
                # next iteration (or a retry of this job) continues from there.
                current_page = notification_data['NextPage']
                data['page'] = current_page
                continue

            # That was the last page; acknowledge the notification as read and processed.
            if secscan_api.mark_notification_read(notification_name):
                return True

            # Marking failed: return so the job counts as complete instead of being retried.
            logger.error('Failed to mark notification %s as read', notification_name)
            return False


if __name__ == '__main__':
    if not (features.SECURITY_SCANNER and features.SECURITY_NOTIFICATIONS):
        logger.debug('Security scanner disabled; skipping SecurityNotificationWorker')
        # Idle forever rather than exiting.
        # NOTE(review): presumably this keeps a process supervisor from restarting the
        # worker in a loop -- confirm against the deployment setup.
        while True:
            time.sleep(100000)

    worker = SecurityNotificationWorker(
        secscan_notification_queue,
        poll_period_seconds=30,
        reservation_seconds=30,
        retry_after_seconds=30,
    )
    worker.start()