Merge pull request #1100 from coreos-inc/queueitemnonexistant
Catch other cases where the queue item has been removed
This commit is contained in:
commit
d6fd9a07c2
1 changed files with 22 additions and 14 deletions
|
@ -213,23 +213,31 @@ class WorkQueue(object):
|
||||||
def incomplete(self, incomplete_item, retry_after=300, restore_retry=False):
|
def incomplete(self, incomplete_item, retry_after=300, restore_retry=False):
|
||||||
with self._transaction_factory(db):
|
with self._transaction_factory(db):
|
||||||
retry_date = datetime.utcnow() + timedelta(seconds=retry_after)
|
retry_date = datetime.utcnow() + timedelta(seconds=retry_after)
|
||||||
incomplete_item_obj = self._item_by_id_for_update(incomplete_item.id)
|
|
||||||
incomplete_item_obj.available_after = retry_date
|
|
||||||
incomplete_item_obj.available = True
|
|
||||||
|
|
||||||
if restore_retry:
|
try:
|
||||||
incomplete_item_obj.retries_remaining += 1
|
incomplete_item_obj = self._item_by_id_for_update(incomplete_item.id)
|
||||||
|
incomplete_item_obj.available_after = retry_date
|
||||||
|
incomplete_item_obj.available = True
|
||||||
|
|
||||||
incomplete_item_obj.save()
|
if restore_retry:
|
||||||
self._currently_processing = False
|
incomplete_item_obj.retries_remaining += 1
|
||||||
return incomplete_item_obj.retries_remaining > 0
|
|
||||||
|
incomplete_item_obj.save()
|
||||||
|
self._currently_processing = False
|
||||||
|
return incomplete_item_obj.retries_remaining > 0
|
||||||
|
except QueueItem.DoesNotExist:
|
||||||
|
return False
|
||||||
|
|
||||||
def extend_processing(self, item, seconds_from_now, minimum_extension=MINIMUM_EXTENSION):
|
def extend_processing(self, item, seconds_from_now, minimum_extension=MINIMUM_EXTENSION):
|
||||||
with self._transaction_factory(db):
|
with self._transaction_factory(db):
|
||||||
queue_item = self._item_by_id_for_update(item.id)
|
try:
|
||||||
new_expiration = datetime.utcnow() + timedelta(seconds=seconds_from_now)
|
queue_item = self._item_by_id_for_update(item.id)
|
||||||
|
new_expiration = datetime.utcnow() + timedelta(seconds=seconds_from_now)
|
||||||
|
|
||||||
|
# Only actually write the new expiration to the db if it moves the expiration some minimum
|
||||||
|
if new_expiration - queue_item.processing_expires > minimum_extension:
|
||||||
|
queue_item.processing_expires = new_expiration
|
||||||
|
queue_item.save()
|
||||||
|
except QueueItem.DoesNotExist:
|
||||||
|
return
|
||||||
|
|
||||||
# Only actually write the new expiration to the db if it moves the expiration some minimum
|
|
||||||
if new_expiration - queue_item.processing_expires > minimum_extension:
|
|
||||||
queue_item.processing_expires = new_expiration
|
|
||||||
queue_item.save()
|
|
||||||
|
|
Reference in a new issue