move UseThenDisconnect into queueworker

This makes the tests pass while maintaining the same behavior.
This commit is contained in:
Jimmy Zelinskie 2015-09-21 13:34:12 -04:00
parent 98d6262a7f
commit 7c82e0b5b3
2 changed files with 19 additions and 17 deletions

View file

@ -1,7 +1,6 @@
from datetime import datetime, timedelta from datetime import datetime, timedelta
from app import app from data.database import QueueItem, db, db_for_update
from data.database import QueueItem, db, db_for_update, UseThenDisconnect
from util.morecollections import AttrDict from util.morecollections import AttrDict
@ -69,24 +68,23 @@ class WorkQueue(object):
return db_for_update(QueueItem.select().where(QueueItem.id == queue_id)).get() return db_for_update(QueueItem.select().where(QueueItem.id == queue_id)).get()
def get_metrics(self, require_transaction=True): def get_metrics(self, require_transaction=True):
with UseThenDisconnect(app.config): guard = self._transaction_factory(db) if require_transaction else NoopWith()
guard = self._transaction_factory(db) if require_transaction else NoopWith() with guard:
with guard: now = datetime.utcnow()
now = datetime.utcnow() name_match_query = self._name_match_query()
name_match_query = self._name_match_query()
running_query = self._running_jobs(now, name_match_query) running_query = self._running_jobs(now, name_match_query)
running_count = running_query.distinct().count() running_count = running_query.distinct().count()
available_query = self._available_jobs(now, name_match_query) available_query = self._available_jobs(now, name_match_query)
available_count = available_query.select(QueueItem.queue_name).distinct().count() available_count = available_query.select(QueueItem.queue_name).distinct().count()
available_not_running_query = self._available_jobs_not_running(now, name_match_query, available_not_running_query = self._available_jobs_not_running(now, name_match_query,
running_query) running_query)
available_not_running_count = (available_not_running_query.select(QueueItem.queue_name) available_not_running_count = (available_not_running_query.select(QueueItem.queue_name)
.distinct().count()) .distinct().count())
return (running_count, available_not_running_count, available_count) return (running_count, available_not_running_count, available_count)
def update_metrics(self): def update_metrics(self):
if self._reporter is None and self._metric_queue is None: if self._reporter is None and self._metric_queue is None:

View file

@ -8,8 +8,11 @@ from datetime import datetime, timedelta
from threading import Thread from threading import Thread
from time import sleep from time import sleep
from app import app
from data.model import db from data.model import db
from data.queue import WorkQueue from data.queue import WorkQueue
from data.database import UseThenDisconnect
from workers.worker import Worker from workers.worker import Worker
@ -123,7 +126,8 @@ class QueueWorker(Worker):
logger.debug('No more work.') logger.debug('No more work.')
def update_queue_metrics(self): def update_queue_metrics(self):
self._queue.update_metrics() with UseThenDisconnect(app.config):
self._queue.update_metrics()
def mark_current_incomplete(self, restore_retry=False): def mark_current_incomplete(self, restore_retry=False):
with self._current_item_lock: with self._current_item_lock: