from datetime import datetime, timedelta from data.database import QueueItem, db, db_for_update from util.morecollections import AttrDict MINIMUM_EXTENSION = timedelta(seconds=20) class WorkQueue(object): def __init__(self, queue_name, transaction_factory, canonical_name_match_list=None, reporter=None): self._queue_name = queue_name self._reporter = reporter self._transaction_factory = transaction_factory self._currently_processing = False if canonical_name_match_list is None: self._canonical_name_match_list = [] else: self._canonical_name_match_list = canonical_name_match_list @staticmethod def _canonical_name(name_list): return '/'.join(name_list) + '/' def _running_jobs(self, now, name_match_query): return (QueueItem .select(QueueItem.queue_name) .where(QueueItem.available == False, QueueItem.processing_expires > now, QueueItem.queue_name ** name_match_query)) def _available_jobs(self, now, name_match_query): return (QueueItem .select() .where(QueueItem.queue_name ** name_match_query, QueueItem.available_after <= now, ((QueueItem.available == True) | (QueueItem.processing_expires <= now)), QueueItem.retries_remaining > 0)) def _available_jobs_not_running(self, now, name_match_query, running_query): return (self ._available_jobs(now, name_match_query) .where(~(QueueItem.queue_name << running_query))) def _name_match_query(self): return '%s%%' % self._canonical_name([self._queue_name] + self._canonical_name_match_list) def _item_by_id_for_update(self, queue_id): return db_for_update(QueueItem.select().where(QueueItem.id == queue_id)).get() def update_metrics(self): if self._reporter is None: return with self._transaction_factory(db): now = datetime.utcnow() name_match_query = self._name_match_query() running_query = self._running_jobs(now, name_match_query) running_count = running_query.distinct().count() available_query = self._available_jobs_not_running(now, name_match_query, running_query) available_count = available_query.select(QueueItem.queue_name).distinct().count() self._reporter(self._currently_processing, running_count, running_count + available_count) def put(self, canonical_name_list, message, available_after=0, retries_remaining=5): """ Put an item, if it shouldn't be processed for some number of seconds, specify that amount as available_after. """ params = { 'queue_name': self._canonical_name([self._queue_name] + canonical_name_list), 'body': message, 'retries_remaining': retries_remaining, } available_date = datetime.utcnow() + timedelta(seconds=available_after or 0) params['available_after'] = available_date with self._transaction_factory(db): return QueueItem.create(**params) def get(self, processing_time=300): """ Get an available item and mark it as unavailable for the default of five minutes. The result of this method must always be composed of simple python objects which are JSON serializable for network portability reasons. """ now = datetime.utcnow() name_match_query = self._name_match_query() running = self._running_jobs(now, name_match_query) avail = self._available_jobs_not_running(now, name_match_query, running) item = None try: db_item_candidate = avail.order_by(QueueItem.id).get() with self._transaction_factory(db): still_available_query = (db_for_update(self ._available_jobs(now, name_match_query) .where(QueueItem.id == db_item_candidate.id))) db_item = still_available_query.get() db_item.available = False db_item.processing_expires = now + timedelta(seconds=processing_time) db_item.retries_remaining -= 1 db_item.save() item = AttrDict({ 'id': db_item.id, 'body': db_item.body, 'retries_remaining': db_item.retries_remaining }) self._currently_processing = True except QueueItem.DoesNotExist: self._currently_processing = False # Return a view of the queue item rather than an active db object return item def complete(self, completed_item): with self._transaction_factory(db): completed_item_obj = self._item_by_id_for_update(completed_item.id) completed_item_obj.delete_instance() self._currently_processing = False def incomplete(self, incomplete_item, retry_after=300, restore_retry=False): with self._transaction_factory(db): retry_date = datetime.utcnow() + timedelta(seconds=retry_after) incomplete_item_obj = self._item_by_id_for_update(incomplete_item.id) incomplete_item_obj.available_after = retry_date incomplete_item_obj.available = True if restore_retry: incomplete_item_obj.retries_remaining += 1 incomplete_item_obj.save() self._currently_processing = False return incomplete_item_obj.retries_remaining > 0 def extend_processing(self, item, seconds_from_now, minimum_extension=MINIMUM_EXTENSION): with self._transaction_factory(db): queue_item = self._item_by_id_for_update(item.id) new_expiration = datetime.utcnow() + timedelta(seconds=seconds_from_now) # Only actually write the new expiration to the db if it moves the expiration some minimum if new_expiration - queue_item.processing_expires > minimum_extension: queue_item.processing_expires = new_expiration queue_item.save()