Query for and take a queue item in a transaction.
This commit is contained in:
parent
41c92deb0d
commit
7a071fa731
1 changed files with 15 additions and 16 deletions
|
@ -1,6 +1,6 @@
|
|||
from datetime import datetime, timedelta
|
||||
|
||||
from database import QueueItem
|
||||
from data.database import QueueItem, db
|
||||
|
||||
|
||||
class WorkQueue(object):
|
||||
|
@ -34,25 +34,24 @@ class WorkQueue(object):
|
|||
available_or_expired = ((QueueItem.available == True) |
|
||||
(QueueItem.processing_expires <= now))
|
||||
|
||||
# TODO the query and the update should be atomic, but for now we only
|
||||
# have one worker.
|
||||
avail = QueueItem.select().where(QueueItem.queue_name == self.queue_name,
|
||||
QueueItem.available_after <= now,
|
||||
available_or_expired,
|
||||
QueueItem.retries_remaining > 0)
|
||||
with db.transaction():
|
||||
avail = QueueItem.select().where(QueueItem.queue_name == self.queue_name,
|
||||
QueueItem.available_after <= now,
|
||||
available_or_expired,
|
||||
QueueItem.retries_remaining > 0)
|
||||
|
||||
found = list(avail.limit(1).order_by(QueueItem.available_after))
|
||||
found = list(avail.limit(1).order_by(QueueItem.available_after))
|
||||
|
||||
if found:
|
||||
item = found[0]
|
||||
item.available = False
|
||||
item.processing_expires = now + timedelta(seconds=processing_time)
|
||||
item.retries_remaining -= 1
|
||||
item.save()
|
||||
if found:
|
||||
item = found[0]
|
||||
item.available = False
|
||||
item.processing_expires = now + timedelta(seconds=processing_time)
|
||||
item.retries_remaining -= 1
|
||||
item.save()
|
||||
|
||||
return item
|
||||
return item
|
||||
|
||||
return None
|
||||
return None
|
||||
|
||||
def complete(self, completed_item):
|
||||
completed_item.delete_instance()
|
||||
|
|
Reference in a new issue