From 7c490b46c8c5cb298c75f249724c9c8b179da58a Mon Sep 17 00:00:00 2001 From: Jake Moshenko Date: Mon, 5 Dec 2016 18:05:18 -0500 Subject: [PATCH 1/3] Only save dirty fields on Queue queries. --- data/database.py | 5 +++++ data/queue.py | 2 +- 2 files changed, 6 insertions(+), 1 deletion(-) diff --git a/data/database.py b/data/database.py index afc1573b4..c5c7af0d8 100644 --- a/data/database.py +++ b/data/database.py @@ -758,6 +758,11 @@ class QueueItem(BaseModel): processing_expires = DateTimeField(null=True, index=True) retries_remaining = IntegerField(default=5, index=True) + class Meta: + database = db + read_slaves = (read_slave,) + only_save_dirty = True + class RepositoryBuild(BaseModel): uuid = CharField(default=uuid_generator, index=True) diff --git a/data/queue.py b/data/queue.py index 421d7d6e7..0f9865d29 100644 --- a/data/queue.py +++ b/data/queue.py @@ -270,7 +270,7 @@ class WorkQueue(object): queue_item.processing_expires = new_expiration has_change = True - if updated_data is not None: + if updated_data is not None and queue_item.body != updated_data: queue_item.body = updated_data has_change = True From 709edd7eb6718b4b045679924f56f6acd0348f97 Mon Sep 17 00:00:00 2001 From: Jake Moshenko Date: Mon, 5 Dec 2016 18:10:04 -0500 Subject: [PATCH 2/3] Reduce the update period on queue worker metrics. --- config.py | 1 + workers/queueworker.py | 3 ++- 2 files changed, 3 insertions(+), 1 deletion(-) diff --git a/config.py b/config.py index 3b9eebfc8..f6c15a2d6 100644 --- a/config.py +++ b/config.py @@ -105,6 +105,7 @@ class DefaultConfig(object): # Build Queue Metrics QUEUE_METRICS_TYPE = 'Null' + QUEUE_WORKER_METRICS_REFRESH_SECONDS = 300 # Exception logging EXCEPTION_LOG_TYPE = 'FakeSentry' diff --git a/workers/queueworker.py b/workers/queueworker.py index cdd96d9e7..8b76a5a90 100644 --- a/workers/queueworker.py +++ b/workers/queueworker.py @@ -40,7 +40,8 @@ class QueueWorker(Worker): # Add the various operations. self.add_operation(self.poll_queue, self._poll_period_seconds) - self.add_operation(self.update_queue_metrics, 60) + self.add_operation(self.update_queue_metrics, + app.config['QUEUE_WORKER_METRICS_REFRESH_SECONDS']) self.add_operation(self.run_watchdog, self._watchdog_period_seconds) def process_queue_item(self, job_details): From c263772703936ee9bc8f864eb927f0325c3599b1 Mon Sep 17 00:00:00 2001 From: Jake Moshenko Date: Mon, 5 Dec 2016 18:11:21 -0500 Subject: [PATCH 3/3] Do not extend processing immediately after taking queue item. --- workers/security_notification_worker.py | 2 -- 1 file changed, 2 deletions(-) diff --git a/workers/security_notification_worker.py b/workers/security_notification_worker.py index a33bc9762..91f482aec 100644 --- a/workers/security_notification_worker.py +++ b/workers/security_notification_worker.py @@ -12,7 +12,6 @@ from util.secscan.notifier import process_notification_data logger = logging.getLogger(__name__) -_READING_SECONDS = 120 # 2 minutes _PROCESSING_SECONDS = 60 * 60 # 1 hour _LAYER_LIMIT = 1000 # The number of layers to request on each page. @@ -31,7 +30,6 @@ class SecurityNotificationWorker(QueueWorker): current_page = data.get('page', None) while True: - self.extend_processing(_READING_SECONDS, json.dumps(data)) (response_data, should_retry) = secscan_api.get_notification(notification_name, layer_limit=_LAYER_LIMIT, page=current_page)