From 7c82e0b5b39bece242f216bbc62bbaa5805498a0 Mon Sep 17 00:00:00 2001 From: Jimmy Zelinskie Date: Mon, 21 Sep 2015 13:34:12 -0400 Subject: [PATCH] move UseThenDisconnect into queueworker This makes the tests pass while maintaining the same behavior. --- data/queue.py | 30 ++++++++++++++---------------- workers/queueworker.py | 6 +++++- 2 files changed, 19 insertions(+), 17 deletions(-) diff --git a/data/queue.py b/data/queue.py index e99c98e68..b787d22be 100644 --- a/data/queue.py +++ b/data/queue.py @@ -1,7 +1,6 @@ from datetime import datetime, timedelta -from app import app -from data.database import QueueItem, db, db_for_update, UseThenDisconnect +from data.database import QueueItem, db, db_for_update from util.morecollections import AttrDict @@ -69,24 +68,23 @@ class WorkQueue(object): return db_for_update(QueueItem.select().where(QueueItem.id == queue_id)).get() def get_metrics(self, require_transaction=True): - with UseThenDisconnect(app.config): - guard = self._transaction_factory(db) if require_transaction else NoopWith() - with guard: - now = datetime.utcnow() - name_match_query = self._name_match_query() + guard = self._transaction_factory(db) if require_transaction else NoopWith() + with guard: + now = datetime.utcnow() + name_match_query = self._name_match_query() - running_query = self._running_jobs(now, name_match_query) - running_count = running_query.distinct().count() + running_query = self._running_jobs(now, name_match_query) + running_count = running_query.distinct().count() - available_query = self._available_jobs(now, name_match_query) - available_count = available_query.select(QueueItem.queue_name).distinct().count() + available_query = self._available_jobs(now, name_match_query) + available_count = available_query.select(QueueItem.queue_name).distinct().count() - available_not_running_query = self._available_jobs_not_running(now, name_match_query, - running_query) - available_not_running_count = (available_not_running_query.select(QueueItem.queue_name) - .distinct().count()) + available_not_running_query = self._available_jobs_not_running(now, name_match_query, + running_query) + available_not_running_count = (available_not_running_query.select(QueueItem.queue_name) + .distinct().count()) - return (running_count, available_not_running_count, available_count) + return (running_count, available_not_running_count, available_count) def update_metrics(self): if self._reporter is None and self._metric_queue is None: diff --git a/workers/queueworker.py b/workers/queueworker.py index 08ba1699d..955f43ed7 100644 --- a/workers/queueworker.py +++ b/workers/queueworker.py @@ -8,8 +8,11 @@ from datetime import datetime, timedelta from threading import Thread from time import sleep +from app import app + from data.model import db from data.queue import WorkQueue +from data.database import UseThenDisconnect from workers.worker import Worker @@ -123,7 +126,8 @@ class QueueWorker(Worker): logger.debug('No more work.') def update_queue_metrics(self): - self._queue.update_metrics() + with UseThenDisconnect(app.config): + self._queue.update_metrics() def mark_current_incomplete(self, restore_retry=False): with self._current_item_lock: