2013-10-17 22:25:19 +00:00
|
|
|
from datetime import datetime, timedelta
|
|
|
|
|
2013-11-15 21:44:33 +00:00
|
|
|
from data.database import QueueItem, db
|
2014-10-01 18:23:15 +00:00
|
|
|
from util.morecollections import AttrDict
|
2013-10-17 22:25:19 +00:00
|
|
|
|
|
|
|
|
2014-05-06 22:46:19 +00:00
|
|
|
MINIMUM_EXTENSION = timedelta(seconds=20)
|
|
|
|
|
|
|
|
|
2013-10-17 22:25:19 +00:00
|
|
|
class WorkQueue(object):
|
2014-05-22 16:13:41 +00:00
|
|
|
def __init__(self, queue_name, transaction_factory,
|
|
|
|
canonical_name_match_list=None, reporter=None):
|
2014-05-21 23:50:37 +00:00
|
|
|
self._queue_name = queue_name
|
|
|
|
self._reporter = reporter
|
2014-05-22 16:13:41 +00:00
|
|
|
self._transaction_factory = transaction_factory
|
2014-05-23 18:16:26 +00:00
|
|
|
self._currently_processing = False
|
2013-10-17 22:25:19 +00:00
|
|
|
|
2014-04-11 22:34:47 +00:00
|
|
|
if canonical_name_match_list is None:
|
2014-05-21 23:50:37 +00:00
|
|
|
self._canonical_name_match_list = []
|
2014-04-11 22:34:47 +00:00
|
|
|
else:
|
2014-05-21 23:50:37 +00:00
|
|
|
self._canonical_name_match_list = canonical_name_match_list
|
2014-04-11 22:34:47 +00:00
|
|
|
|
|
|
|
@staticmethod
|
|
|
|
def _canonical_name(name_list):
|
|
|
|
return '/'.join(name_list) + '/'
|
|
|
|
|
2014-05-21 23:50:37 +00:00
|
|
|
def _running_jobs(self, now, name_match_query):
|
|
|
|
return (QueueItem
|
2014-10-01 18:23:15 +00:00
|
|
|
.select(QueueItem.queue_name)
|
|
|
|
.where(QueueItem.available == False,
|
|
|
|
QueueItem.processing_expires > now,
|
|
|
|
QueueItem.queue_name ** name_match_query))
|
2014-05-21 23:50:37 +00:00
|
|
|
|
2014-05-22 17:50:06 +00:00
|
|
|
def _available_jobs(self, now, name_match_query, running_query):
|
|
|
|
return (QueueItem
|
2014-10-01 18:23:15 +00:00
|
|
|
.select()
|
|
|
|
.where(QueueItem.queue_name ** name_match_query, QueueItem.available_after <= now,
|
|
|
|
((QueueItem.available == True) | (QueueItem.processing_expires <= now)),
|
|
|
|
QueueItem.retries_remaining > 0, ~(QueueItem.queue_name << running_query)))
|
2014-05-22 17:50:06 +00:00
|
|
|
|
2014-05-21 23:50:37 +00:00
|
|
|
def _name_match_query(self):
|
|
|
|
return '%s%%' % self._canonical_name([self._queue_name] + self._canonical_name_match_list)
|
|
|
|
|
2014-05-23 18:16:26 +00:00
|
|
|
def update_metrics(self):
|
2014-10-24 15:40:02 +00:00
|
|
|
if self._reporter is None:
|
|
|
|
return
|
2014-11-24 21:07:38 +00:00
|
|
|
|
2014-05-23 18:16:26 +00:00
|
|
|
with self._transaction_factory(db):
|
|
|
|
now = datetime.utcnow()
|
|
|
|
name_match_query = self._name_match_query()
|
2014-05-22 17:50:06 +00:00
|
|
|
|
2014-05-23 18:16:26 +00:00
|
|
|
running_query = self._running_jobs(now, name_match_query)
|
2014-10-01 18:23:15 +00:00
|
|
|
running_count = running_query.distinct().count()
|
2014-05-21 23:50:37 +00:00
|
|
|
|
2014-05-23 18:16:26 +00:00
|
|
|
avialable_query = self._available_jobs(now, name_match_query, running_query)
|
|
|
|
available_count = avialable_query.select(QueueItem.queue_name).distinct().count()
|
2014-05-21 23:50:37 +00:00
|
|
|
|
2014-10-24 15:40:02 +00:00
|
|
|
self._reporter(self._currently_processing, running_count, running_count + available_count)
|
2014-05-21 23:50:37 +00:00
|
|
|
|
2014-04-11 22:34:47 +00:00
|
|
|
def put(self, canonical_name_list, message, available_after=0, retries_remaining=5):
|
2013-10-17 22:25:19 +00:00
|
|
|
"""
|
|
|
|
Put an item, if it shouldn't be processed for some number of seconds,
|
|
|
|
specify that amount as available_after.
|
|
|
|
"""
|
|
|
|
|
|
|
|
params = {
|
2014-05-21 23:50:37 +00:00
|
|
|
'queue_name': self._canonical_name([self._queue_name] + canonical_name_list),
|
2013-10-17 22:25:19 +00:00
|
|
|
'body': message,
|
2013-11-15 20:49:26 +00:00
|
|
|
'retries_remaining': retries_remaining,
|
2013-10-17 22:25:19 +00:00
|
|
|
}
|
|
|
|
|
2014-09-22 16:52:57 +00:00
|
|
|
available_date = datetime.utcnow() + timedelta(seconds=available_after or 0)
|
|
|
|
params['available_after'] = available_date
|
2013-10-17 22:25:19 +00:00
|
|
|
|
2014-05-22 16:13:41 +00:00
|
|
|
with self._transaction_factory(db):
|
2014-05-21 23:50:37 +00:00
|
|
|
QueueItem.create(**params)
|
2013-10-17 22:25:19 +00:00
|
|
|
|
2013-10-18 19:28:16 +00:00
|
|
|
def get(self, processing_time=300):
|
2013-10-17 22:25:19 +00:00
|
|
|
"""
|
|
|
|
Get an available item and mark it as unavailable for the default of five
|
|
|
|
minutes.
|
|
|
|
"""
|
2014-05-23 18:16:26 +00:00
|
|
|
now = datetime.utcnow()
|
2014-04-11 22:34:47 +00:00
|
|
|
|
2014-05-21 23:50:37 +00:00
|
|
|
name_match_query = self._name_match_query()
|
2013-10-17 22:25:19 +00:00
|
|
|
|
2014-05-22 16:13:41 +00:00
|
|
|
with self._transaction_factory(db):
|
2014-05-21 23:50:37 +00:00
|
|
|
running = self._running_jobs(now, name_match_query)
|
2014-05-22 17:50:06 +00:00
|
|
|
avail = self._available_jobs(now, name_match_query, running)
|
2013-10-17 22:25:19 +00:00
|
|
|
|
2014-05-21 23:50:37 +00:00
|
|
|
item = None
|
2014-05-22 17:50:06 +00:00
|
|
|
try:
|
2014-10-01 18:23:15 +00:00
|
|
|
db_item = avail.order_by(QueueItem.id).get()
|
|
|
|
db_item.available = False
|
|
|
|
db_item.processing_expires = now + timedelta(seconds=processing_time)
|
|
|
|
db_item.retries_remaining -= 1
|
|
|
|
db_item.save()
|
|
|
|
|
|
|
|
item = AttrDict({
|
|
|
|
'id': db_item.id,
|
|
|
|
'body': db_item.body,
|
|
|
|
})
|
2014-05-23 18:16:26 +00:00
|
|
|
|
|
|
|
self._currently_processing = True
|
2014-05-22 17:50:06 +00:00
|
|
|
except QueueItem.DoesNotExist:
|
2014-05-23 18:16:26 +00:00
|
|
|
self._currently_processing = False
|
2013-10-17 22:25:19 +00:00
|
|
|
|
2014-10-01 18:23:15 +00:00
|
|
|
# Return a view of the queue item rather than an active db object
|
2014-05-21 23:50:37 +00:00
|
|
|
return item
|
2013-10-17 22:25:19 +00:00
|
|
|
|
2014-05-21 23:50:37 +00:00
|
|
|
def complete(self, completed_item):
|
2014-05-22 16:13:41 +00:00
|
|
|
with self._transaction_factory(db):
|
2014-10-01 18:23:15 +00:00
|
|
|
completed_item_obj = QueueItem.get(QueueItem.id == completed_item.id)
|
|
|
|
completed_item_obj.delete_instance()
|
2014-05-23 18:16:26 +00:00
|
|
|
self._currently_processing = False
|
2013-10-18 21:27:09 +00:00
|
|
|
|
2014-05-21 23:50:37 +00:00
|
|
|
def incomplete(self, incomplete_item, retry_after=300, restore_retry=False):
|
2014-05-22 16:13:41 +00:00
|
|
|
with self._transaction_factory(db):
|
2014-05-23 18:16:26 +00:00
|
|
|
retry_date = datetime.utcnow() + timedelta(seconds=retry_after)
|
2014-10-01 18:23:15 +00:00
|
|
|
incomplete_item_obj = QueueItem.get(QueueItem.id == incomplete_item.id)
|
|
|
|
incomplete_item_obj.available_after = retry_date
|
|
|
|
incomplete_item_obj.available = True
|
2014-05-06 22:46:19 +00:00
|
|
|
|
2014-05-21 23:50:37 +00:00
|
|
|
if restore_retry:
|
2014-10-01 18:23:15 +00:00
|
|
|
incomplete_item_obj.retries_remaining += 1
|
2014-05-06 22:46:19 +00:00
|
|
|
|
2014-10-01 18:23:15 +00:00
|
|
|
incomplete_item_obj.save()
|
2014-05-23 18:16:26 +00:00
|
|
|
self._currently_processing = False
|
2013-10-29 19:42:19 +00:00
|
|
|
|
2014-04-11 22:34:47 +00:00
|
|
|
@staticmethod
|
2014-11-21 19:27:06 +00:00
|
|
|
def extend_processing(queue_item, seconds_from_now, retry_count=None,
|
|
|
|
minimum_extension=MINIMUM_EXTENSION):
|
2014-05-23 18:16:26 +00:00
|
|
|
new_expiration = datetime.utcnow() + timedelta(seconds=seconds_from_now)
|
2014-05-06 22:46:19 +00:00
|
|
|
|
|
|
|
# Only actually write the new expiration to the db if it moves the expiration some minimum
|
2014-11-21 19:27:06 +00:00
|
|
|
if new_expiration - queue_item.processing_expires > minimum_extension:
|
|
|
|
if retry_count is not None:
|
|
|
|
queue_item.retries_remaining = retry_count
|
|
|
|
|
2014-05-06 22:46:19 +00:00
|
|
|
queue_item.processing_expires = new_expiration
|
2014-12-01 17:48:59 +00:00
|
|
|
queue_item.save()
|