diff --git a/data/queue.py b/data/queue.py index 5f292a1d7..e88c56a94 100644 --- a/data/queue.py +++ b/data/queue.py @@ -1,6 +1,6 @@ from datetime import datetime, timedelta -from database import QueueItem +from data.database import QueueItem, db class WorkQueue(object): @@ -34,25 +34,24 @@ class WorkQueue(object): available_or_expired = ((QueueItem.available == True) | (QueueItem.processing_expires <= now)) - # TODO the query and the update should be atomic, but for now we only - # have one worker. - avail = QueueItem.select().where(QueueItem.queue_name == self.queue_name, - QueueItem.available_after <= now, - available_or_expired, - QueueItem.retries_remaining > 0) + with db.transaction(): + avail = QueueItem.select().where(QueueItem.queue_name == self.queue_name, + QueueItem.available_after <= now, + available_or_expired, + QueueItem.retries_remaining > 0) - found = list(avail.limit(1).order_by(QueueItem.available_after)) + found = list(avail.limit(1).order_by(QueueItem.available_after)) - if found: - item = found[0] - item.available = False - item.processing_expires = now + timedelta(seconds=processing_time) - item.retries_remaining -= 1 - item.save() + if found: + item = found[0] + item.available = False + item.processing_expires = now + timedelta(seconds=processing_time) + item.retries_remaining -= 1 + item.save() - return item + return item - return None + return None def complete(self, completed_item): completed_item.delete_instance()