Phase 4 of the namespace to user migration: actually remove the column from the db and remove the dependence on serialized namespaces in the workers and queues

This commit is contained in:
Jake Moshenko 2014-10-01 14:23:15 -04:00
parent 2c5cc7990f
commit e8b3d1cc4a
17 changed files with 273 additions and 123 deletions

View file

@ -1,6 +1,7 @@
from datetime import datetime, timedelta
from data.database import QueueItem, db
from util.morecollections import AttrDict
MINIMUM_EXTENSION = timedelta(seconds=20)
@ -25,17 +26,17 @@ class WorkQueue(object):
def _running_jobs(self, now, name_match_query):
return (QueueItem
.select(QueueItem.queue_name)
.where(QueueItem.available == False,
QueueItem.processing_expires > now,
QueueItem.queue_name ** name_match_query))
.select(QueueItem.queue_name)
.where(QueueItem.available == False,
QueueItem.processing_expires > now,
QueueItem.queue_name ** name_match_query))
def _available_jobs(self, now, name_match_query, running_query):
return (QueueItem
.select()
.where(QueueItem.queue_name ** name_match_query, QueueItem.available_after <= now,
((QueueItem.available == True) | (QueueItem.processing_expires <= now)),
QueueItem.retries_remaining > 0, ~(QueueItem.queue_name << running_query)))
.select()
.where(QueueItem.queue_name ** name_match_query, QueueItem.available_after <= now,
((QueueItem.available == True) | (QueueItem.processing_expires <= now)),
QueueItem.retries_remaining > 0, ~(QueueItem.queue_name << running_query)))
def _name_match_query(self):
return '%s%%' % self._canonical_name([self._queue_name] + self._canonical_name_match_list)
@ -49,7 +50,7 @@ class WorkQueue(object):
name_match_query = self._name_match_query()
running_query = self._running_jobs(now, name_match_query)
running_count =running_query.distinct().count()
running_count = running_query.distinct().count()
avialable_query = self._available_jobs(now, name_match_query, running_query)
available_count = avialable_query.select(QueueItem.queue_name).distinct().count()
@ -89,41 +90,49 @@ class WorkQueue(object):
item = None
try:
item = avail.order_by(QueueItem.id).get()
item.available = False
item.processing_expires = now + timedelta(seconds=processing_time)
item.retries_remaining -= 1
item.save()
db_item = avail.order_by(QueueItem.id).get()
db_item.available = False
db_item.processing_expires = now + timedelta(seconds=processing_time)
db_item.retries_remaining -= 1
db_item.save()
item = AttrDict({
'id': db_item.id,
'body': db_item.body,
})
self._currently_processing = True
except QueueItem.DoesNotExist:
self._currently_processing = False
pass
# Return a view of the queue item rather than an active db object
return item
def complete(self, completed_item):
with self._transaction_factory(db):
completed_item.delete_instance()
completed_item_obj = QueueItem.get(QueueItem.id == completed_item.id)
completed_item_obj.delete_instance()
self._currently_processing = False
def incomplete(self, incomplete_item, retry_after=300, restore_retry=False):
with self._transaction_factory(db):
retry_date = datetime.utcnow() + timedelta(seconds=retry_after)
incomplete_item.available_after = retry_date
incomplete_item.available = True
incomplete_item_obj = QueueItem.get(QueueItem.id == incomplete_item.id)
incomplete_item_obj.available_after = retry_date
incomplete_item_obj.available = True
if restore_retry:
incomplete_item.retries_remaining += 1
incomplete_item_obj.retries_remaining += 1
incomplete_item.save()
incomplete_item_obj.save()
self._currently_processing = False
@staticmethod
def extend_processing(queue_item, seconds_from_now):
def extend_processing(self, queue_item, seconds_from_now):
new_expiration = datetime.utcnow() + timedelta(seconds=seconds_from_now)
# Only actually write the new expiration to the db if it moves the expiration some minimum
if new_expiration - queue_item.processing_expires > MINIMUM_EXTENSION:
queue_item.processing_expires = new_expiration
queue_item.save()
queue_item_obj = QueueItem.get(QueueItem.id == queue_item.id)
if new_expiration - queue_item_obj.processing_expires > MINIMUM_EXTENSION:
with self._transaction_factory(db):
queue_item_obj.processing_expires = new_expiration
queue_item_obj.save()