Hopefully fix the deadlock in the queue.

This commit is contained in:
Jake Moshenko 2015-02-03 14:50:01 -05:00
parent d709e0f64a
commit ce7033489b

View file

@ -31,12 +31,17 @@ class WorkQueue(object):
QueueItem.processing_expires > now, QueueItem.processing_expires > now,
QueueItem.queue_name ** name_match_query)) QueueItem.queue_name ** name_match_query))
def _available_jobs(self, now, name_match_query, running_query): def _available_jobs(self, now, name_match_query):
return (QueueItem return (QueueItem
.select() .select()
.where(QueueItem.queue_name ** name_match_query, QueueItem.available_after <= now, .where(QueueItem.queue_name ** name_match_query, QueueItem.available_after <= now,
((QueueItem.available == True) | (QueueItem.processing_expires <= now)), ((QueueItem.available == True) | (QueueItem.processing_expires <= now)),
QueueItem.retries_remaining > 0, ~(QueueItem.queue_name << running_query))) QueueItem.retries_remaining > 0))
def _available_jobs_not_running(self, now, name_match_query, running_query):
return (self
._available_jobs(now, name_match_query)
.where(~(QueueItem.queue_name << running_query)))
def _name_match_query(self): def _name_match_query(self):
return '%s%%' % self._canonical_name([self._queue_name] + self._canonical_name_match_list) return '%s%%' % self._canonical_name([self._queue_name] + self._canonical_name_match_list)
@ -55,7 +60,7 @@ class WorkQueue(object):
running_query = self._running_jobs(now, name_match_query) running_query = self._running_jobs(now, name_match_query)
running_count = running_query.distinct().count() running_count = running_query.distinct().count()
avialable_query = self._available_jobs(now, name_match_query, running_query) avialable_query = self._available_jobs_not_running(now, name_match_query, running_query)
available_count = avialable_query.select(QueueItem.queue_name).distinct().count() available_count = avialable_query.select(QueueItem.queue_name).distinct().count()
self._reporter(self._currently_processing, running_count, running_count + available_count) self._reporter(self._currently_processing, running_count, running_count + available_count)
@ -88,13 +93,19 @@ class WorkQueue(object):
name_match_query = self._name_match_query() name_match_query = self._name_match_query()
with self._transaction_factory(db): running = self._running_jobs(now, name_match_query)
running = self._running_jobs(now, name_match_query) avail = self._available_jobs_not_running(now, name_match_query, running)
avail = self._available_jobs(now, name_match_query, running)
item = None item = None
try: try:
db_item = db_for_update(avail.order_by(QueueItem.id)).get() db_item_candidate = avail.order_by(QueueItem.id).get()
with self._transaction_factory(db):
still_available_query = (db_for_update(self
._available_jobs(now, name_match_query)
.where(QueueItem.id == db_item_candidate.id)))
db_item = still_available_query.get()
db_item.available = False db_item.available = False
db_item.processing_expires = now + timedelta(seconds=processing_time) db_item.processing_expires = now + timedelta(seconds=processing_time)
db_item.retries_remaining -= 1 db_item.retries_remaining -= 1
@ -106,11 +117,11 @@ class WorkQueue(object):
}) })
self._currently_processing = True self._currently_processing = True
except QueueItem.DoesNotExist: except QueueItem.DoesNotExist:
self._currently_processing = False self._currently_processing = False
# Return a view of the queue item rather than an active db object # Return a view of the queue item rather than an active db object
return item return item
def complete(self, completed_item): def complete(self, completed_item):
with self._transaction_factory(db): with self._transaction_factory(db):