from datetime import datetime, timedelta from data.database import QueueItem, db from app import app transaction_factory = app.config['DB_TRANSACTION_FACTORY'] MINIMUM_EXTENSION = timedelta(seconds=20) class WorkQueue(object): def __init__(self, queue_name, canonical_name_match_list=None): self.queue_name = queue_name if canonical_name_match_list is None: self.canonical_name_match_list = [] else: self.canonical_name_match_list = canonical_name_match_list @staticmethod def _canonical_name(name_list): return '/'.join(name_list) + '/' def put(self, canonical_name_list, message, available_after=0, retries_remaining=5): """ Put an item, if it shouldn't be processed for some number of seconds, specify that amount as available_after. """ params = { 'queue_name': self._canonical_name([self.queue_name] + canonical_name_list), 'body': message, 'retries_remaining': retries_remaining, } if available_after: available_date = datetime.now() + timedelta(seconds=available_after) params['available_after'] = available_date QueueItem.create(**params) def get(self, processing_time=300): """ Get an available item and mark it as unavailable for the default of five minutes. """ now = datetime.now() name_match_query = '%s%%' % self._canonical_name([self.queue_name] + self.canonical_name_match_list) with transaction_factory(db): running = (QueueItem .select(QueueItem.queue_name) .where(QueueItem.available == False, QueueItem.processing_expires > now, QueueItem.queue_name ** name_match_query)) avail = QueueItem.select().where(QueueItem.queue_name ** name_match_query, QueueItem.available_after <= now, ((QueueItem.available == True) | (QueueItem.processing_expires <= now)), QueueItem.retries_remaining > 0, ~(QueueItem.queue_name << running)) found = list(avail.limit(1).order_by(QueueItem.id)) if found: item = found[0] item.available = False item.processing_expires = now + timedelta(seconds=processing_time) item.retries_remaining -= 1 item.save() return item return None @staticmethod def complete(completed_item): completed_item.delete_instance() @staticmethod def incomplete(incomplete_item, retry_after=300, restore_retry=False): retry_date = datetime.now() + timedelta(seconds=retry_after) incomplete_item.available_after = retry_date incomplete_item.available = True if restore_retry: incomplete_item.retries_remaining += 1 incomplete_item.save() @staticmethod def extend_processing(queue_item, seconds_from_now): new_expiration = datetime.now() + timedelta(seconds=seconds_from_now) # Only actually write the new expiration to the db if it moves the expiration some minimum if new_expiration - queue_item.processing_expires > MINIMUM_EXTENSION: queue_item.processing_expires = new_expiration queue_item.save() image_diff_queue = WorkQueue(app.config['DIFFS_QUEUE_NAME']) dockerfile_build_queue = WorkQueue(app.config['DOCKERFILE_BUILD_QUEUE_NAME']) webhook_queue = WorkQueue(app.config['WEBHOOK_QUEUE_NAME'])