from datetime import datetime, timedelta from data.database import QueueItem, db from util.morecollections import AttrDict MINIMUM_EXTENSION = timedelta(seconds=20) class WorkQueue(object): def __init__(self, queue_name, transaction_factory, canonical_name_match_list=None, reporter=None): self._queue_name = queue_name self._reporter = reporter self._transaction_factory = transaction_factory self._currently_processing = False if canonical_name_match_list is None: self._canonical_name_match_list = [] else: self._canonical_name_match_list = canonical_name_match_list @staticmethod def _canonical_name(name_list): return '/'.join(name_list) + '/' def _running_jobs(self, now, name_match_query): return (QueueItem .select(QueueItem.queue_name) .where(QueueItem.available == False, QueueItem.processing_expires > now, QueueItem.queue_name ** name_match_query)) def _available_jobs(self, now, name_match_query, running_query): return (QueueItem .select() .where(QueueItem.queue_name ** name_match_query, QueueItem.available_after <= now, ((QueueItem.available == True) | (QueueItem.processing_expires <= now)), QueueItem.retries_remaining > 0, ~(QueueItem.queue_name << running_query))) def _name_match_query(self): return '%s%%' % self._canonical_name([self._queue_name] + self._canonical_name_match_list) def update_metrics(self): if self._reporter is None: return with self._transaction_factory(db): now = datetime.utcnow() name_match_query = self._name_match_query() running_query = self._running_jobs(now, name_match_query) running_count = running_query.distinct().count() avialable_query = self._available_jobs(now, name_match_query, running_query) available_count = avialable_query.select(QueueItem.queue_name).distinct().count() self._reporter(self._currently_processing, running_count, running_count + available_count) def put(self, canonical_name_list, message, available_after=0, retries_remaining=5): """ Put an item, if it shouldn't be processed for some number of seconds, specify that amount as available_after. """ params = { 'queue_name': self._canonical_name([self._queue_name] + canonical_name_list), 'body': message, 'retries_remaining': retries_remaining, } available_date = datetime.utcnow() + timedelta(seconds=available_after or 0) params['available_after'] = available_date with self._transaction_factory(db): QueueItem.create(**params) def get(self, processing_time=300): """ Get an available item and mark it as unavailable for the default of five minutes. """ now = datetime.utcnow() name_match_query = self._name_match_query() with self._transaction_factory(db): running = self._running_jobs(now, name_match_query) avail = self._available_jobs(now, name_match_query, running) item = None try: db_item = avail.order_by(QueueItem.id).get() db_item.available = False db_item.processing_expires = now + timedelta(seconds=processing_time) db_item.retries_remaining -= 1 db_item.save() item = AttrDict({ 'id': db_item.id, 'body': db_item.body, }) self._currently_processing = True except QueueItem.DoesNotExist: self._currently_processing = False # Return a view of the queue item rather than an active db object return item def complete(self, completed_item): with self._transaction_factory(db): completed_item_obj = QueueItem.get(QueueItem.id == completed_item.id) completed_item_obj.delete_instance() self._currently_processing = False def incomplete(self, incomplete_item, retry_after=300, restore_retry=False): with self._transaction_factory(db): retry_date = datetime.utcnow() + timedelta(seconds=retry_after) incomplete_item_obj = QueueItem.get(QueueItem.id == incomplete_item.id) incomplete_item_obj.available_after = retry_date incomplete_item_obj.available = True if restore_retry: incomplete_item_obj.retries_remaining += 1 incomplete_item_obj.save() self._currently_processing = False def extend_processing(self, queue_item, seconds_from_now): new_expiration = datetime.utcnow() + timedelta(seconds=seconds_from_now) # Only actually write the new expiration to the db if it moves the expiration some minimum queue_item_obj = QueueItem.get(QueueItem.id == queue_item.id) if new_expiration - queue_item_obj.processing_expires > MINIMUM_EXTENSION: with self._transaction_factory(db): queue_item_obj.processing_expires = new_expiration queue_item_obj.save()