From ce7033489bbf33cb293cfd7361c9d7a63c828ee9 Mon Sep 17 00:00:00 2001 From: Jake Moshenko Date: Tue, 3 Feb 2015 14:50:01 -0500 Subject: [PATCH] Hopefully fix the deadlock in the queue. --- data/queue.py | 37 ++++++++++++++++++++++++------------- 1 file changed, 24 insertions(+), 13 deletions(-) diff --git a/data/queue.py b/data/queue.py index 52ccd9770..cdfa4e9f9 100644 --- a/data/queue.py +++ b/data/queue.py @@ -31,12 +31,17 @@ class WorkQueue(object): QueueItem.processing_expires > now, QueueItem.queue_name ** name_match_query)) - def _available_jobs(self, now, name_match_query, running_query): + def _available_jobs(self, now, name_match_query): return (QueueItem .select() .where(QueueItem.queue_name ** name_match_query, QueueItem.available_after <= now, ((QueueItem.available == True) | (QueueItem.processing_expires <= now)), - QueueItem.retries_remaining > 0, ~(QueueItem.queue_name << running_query))) + QueueItem.retries_remaining > 0)) + + def _available_jobs_not_running(self, now, name_match_query, running_query): + return (self + ._available_jobs(now, name_match_query) + .where(~(QueueItem.queue_name << running_query))) def _name_match_query(self): return '%s%%' % self._canonical_name([self._queue_name] + self._canonical_name_match_list) @@ -55,7 +60,7 @@ class WorkQueue(object): running_query = self._running_jobs(now, name_match_query) running_count = running_query.distinct().count() - avialable_query = self._available_jobs(now, name_match_query, running_query) + avialable_query = self._available_jobs_not_running(now, name_match_query, running_query) available_count = avialable_query.select(QueueItem.queue_name).distinct().count() self._reporter(self._currently_processing, running_count, running_count + available_count) @@ -88,13 +93,19 @@ class WorkQueue(object): name_match_query = self._name_match_query() - with self._transaction_factory(db): - running = self._running_jobs(now, name_match_query) - avail = self._available_jobs(now, name_match_query, running) + running = self._running_jobs(now, name_match_query) + avail = self._available_jobs_not_running(now, name_match_query, running) - item = None - try: - db_item = db_for_update(avail.order_by(QueueItem.id)).get() + item = None + try: + db_item_candidate = avail.order_by(QueueItem.id).get() + + with self._transaction_factory(db): + still_available_query = (db_for_update(self + ._available_jobs(now, name_match_query) + .where(QueueItem.id == db_item_candidate.id))) + + db_item = still_available_query.get() db_item.available = False db_item.processing_expires = now + timedelta(seconds=processing_time) db_item.retries_remaining -= 1 @@ -106,11 +117,11 @@ class WorkQueue(object): }) self._currently_processing = True - except QueueItem.DoesNotExist: - self._currently_processing = False + except QueueItem.DoesNotExist: + self._currently_processing = False - # Return a view of the queue item rather than an active db object - return item + # Return a view of the queue item rather than an active db object + return item def complete(self, completed_item): with self._transaction_factory(db):