from datetime import datetime, timedelta from data.database import QueueItem, db MINIMUM_EXTENSION = timedelta(seconds=20) class WorkQueue(object): def __init__(self, queue_name, transaction_factory, canonical_name_match_list=None, reporter=None): self._queue_name = queue_name self._reporter = reporter self._transaction_factory = transaction_factory if canonical_name_match_list is None: self._canonical_name_match_list = [] else: self._canonical_name_match_list = canonical_name_match_list @staticmethod def _canonical_name(name_list): return '/'.join(name_list) + '/' def _running_jobs(self, now, name_match_query): return (QueueItem .select(QueueItem.queue_name) .where(QueueItem.available == False, QueueItem.processing_expires > now, QueueItem.queue_name ** name_match_query)) def _name_match_query(self): return '%s%%' % self._canonical_name([self._queue_name] + self._canonical_name_match_list) def _report_queue_metrics(self): if self._reporter is None: return now = datetime.now() name_match_query = self._name_match_query() total_jobs = (QueueItem .select(QueueItem.queue_name) .where(QueueItem.queue_name ** name_match_query, QueueItem.available_after <= now, ((QueueItem.available == True) | (QueueItem.processing_expires > now) | (QueueItem.retries_remaining > 0))) .distinct() .count()) running = self._running_jobs(now, name_match_query).distinct().count() self._reporter(running, total_jobs) def update_metrics(self): with self._transaction_factory(db): self._report_queue_metrics() def put(self, canonical_name_list, message, available_after=0, retries_remaining=5): """ Put an item, if it shouldn't be processed for some number of seconds, specify that amount as available_after. """ params = { 'queue_name': self._canonical_name([self._queue_name] + canonical_name_list), 'body': message, 'retries_remaining': retries_remaining, } if available_after: available_date = datetime.now() + timedelta(seconds=available_after) params['available_after'] = available_date with self._transaction_factory(db): QueueItem.create(**params) self._report_queue_metrics() def get(self, processing_time=300): """ Get an available item and mark it as unavailable for the default of five minutes. """ now = datetime.now() name_match_query = self._name_match_query() with self._transaction_factory(db): running = self._running_jobs(now, name_match_query) avail = QueueItem.select().where(QueueItem.queue_name ** name_match_query, QueueItem.available_after <= now, ((QueueItem.available == True) | (QueueItem.processing_expires <= now)), QueueItem.retries_remaining > 0, ~(QueueItem.queue_name << running)) found = list(avail.limit(1).order_by(QueueItem.id)) item = None if found: item = found[0] item.available = False item.processing_expires = now + timedelta(seconds=processing_time) item.retries_remaining -= 1 item.save() self._report_queue_metrics() return item def complete(self, completed_item): with self._transaction_factory(db): completed_item.delete_instance() self._report_queue_metrics() def incomplete(self, incomplete_item, retry_after=300, restore_retry=False): with self._transaction_factory(db): retry_date = datetime.now() + timedelta(seconds=retry_after) incomplete_item.available_after = retry_date incomplete_item.available = True if restore_retry: incomplete_item.retries_remaining += 1 incomplete_item.save() self._report_queue_metrics() @staticmethod def extend_processing(queue_item, seconds_from_now): new_expiration = datetime.now() + timedelta(seconds=seconds_from_now) # Only actually write the new expiration to the db if it moves the expiration some minimum if new_expiration - queue_item.processing_expires > MINIMUM_EXTENSION: queue_item.processing_expires = new_expiration queue_item.save()