From 08a3b70b5659f1c15c7e1a4de600baaa3a1f2cbe Mon Sep 17 00:00:00 2001 From: Joseph Schorr Date: Mon, 29 Aug 2016 13:08:38 -0400 Subject: [PATCH] Extend processing before processing security notifications Makes sure queue items don't expire during processing Fixes #1776 --- workers/security_notification_worker.py | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/workers/security_notification_worker.py b/workers/security_notification_worker.py index 554bb03a1..0a14bfadb 100644 --- a/workers/security_notification_worker.py +++ b/workers/security_notification_worker.py @@ -10,7 +10,8 @@ from util.secscan.notifier import process_notification_data logger = logging.getLogger(__name__) -_EXTENDED_SECONDS = 600 +_READING_SECONDS = 120 # 2 minutes +_PROCESSING_SECONDS = 60 * 60 # 1 hour _LAYER_LIMIT = 100 # The number of layers to request on each page. class SecurityNotificationWorker(QueueWorker): @@ -27,6 +28,7 @@ class SecurityNotificationWorker(QueueWorker): current_page = data.get('page', None) while True: + self.extend_processing(_READING_SECONDS, json.dumps(data)) (response_data, should_retry) = secscan_api.get_notification(notification_name, layer_limit=_LAYER_LIMIT, page=current_page) @@ -38,6 +40,7 @@ class SecurityNotificationWorker(QueueWorker): logger.error('Failed to handle security notification %s', notification_name) return False + self.extend_processing(_PROCESSING_SECONDS, json.dumps(data)) notification_data = response_data['Notification'] if not process_notification_data(notification_data): raise JobException() @@ -56,7 +59,6 @@ class SecurityNotificationWorker(QueueWorker): # something goes wrong in the next loop iteration), and continue. current_page = notification_data['NextPage'] data['page'] = current_page - self.extend_processing(_EXTENDED_SECONDS, json.dumps(data)) if __name__ == '__main__':