from datetime import datetime, timedelta

from data.database import QueueItem, db
from app import app


# Callable taken from the app config; used below as ``transaction_factory(db)``
# to obtain a context manager wrapping the claim logic in ``WorkQueue.get``.
transaction_factory = app.config['DB_TRANSACTION_FACTORY']


class WorkQueue(object):
    """A named work queue whose entries are stored as ``QueueItem`` records."""

    def __init__(self, queue_name):
        """Remember the name used to tag and filter this queue's items."""
        self.queue_name = queue_name

    def put(self, message, available_after=0, retries_remaining=5):
        """Add ``message`` to the queue.

        ``available_after`` is a delay in seconds; when it is truthy the new
        item is stamped with ``now + available_after`` so it is not handed out
        before then. When it is falsy no ``available_after`` value is passed
        and the model decides what is stored.
        ``retries_remaining`` is stored on the item as given.
        """
        fields = dict(
            queue_name=self.queue_name,
            body=message,
            retries_remaining=retries_remaining,
        )

        if available_after:
            delay = timedelta(seconds=available_after)
            fields['available_after'] = datetime.now() + delay

        QueueItem.create(**fields)

    def get(self, processing_time=300):
        """Claim one eligible item for ``processing_time`` seconds.

        An item is eligible when it belongs to this queue, its
        ``available_after`` is not in the future, it has retries left, and it
        is either flagged available or its previous processing window has
        expired. The eligible item with the earliest ``available_after`` is
        chosen.

        The claimed item is flagged unavailable, given a new
        ``processing_expires`` and has ``retries_remaining`` decremented
        before being saved and returned. Returns ``None`` when nothing is
        eligible.
        """
        now = datetime.now()

        # ``== True`` is deliberate: it builds a query expression rather than
        # evaluating a Python boolean.
        is_available = (QueueItem.available == True)  # noqa: E712
        lease_expired = (QueueItem.processing_expires <= now)
        available_or_expired = (is_available | lease_expired)

        # NOTE(review): the select-then-save below is only safe against
        # concurrent workers if transaction_factory provides sufficient
        # isolation -- confirm against the configured factory.
        with transaction_factory(db):
            candidates = QueueItem.select().where(
                QueueItem.queue_name == self.queue_name,
                QueueItem.available_after <= now,
                available_or_expired,
                QueueItem.retries_remaining > 0,
            )
            oldest_first = candidates.limit(1).order_by(
                QueueItem.available_after)
            matches = list(oldest_first)

            if not matches:
                return None

            claimed = matches[0]
            claimed.available = False
            claimed.processing_expires = now + timedelta(
                seconds=processing_time)
            claimed.retries_remaining -= 1
            claimed.save()
            return claimed

    def complete(self, completed_item):
        """Remove a finished item by calling its ``delete_instance()``."""
        completed_item.delete_instance()

    def incomplete(self, incomplete_item, retry_after=300):
        """Hand an unfinished item back, eligible again in ``retry_after`` seconds.

        The item is flagged available and its ``available_after`` is pushed to
        ``now + retry_after``. ``retries_remaining`` is not touched here.
        """
        incomplete_item.available_after = (
            datetime.now() + timedelta(seconds=retry_after))
        incomplete_item.available = True
        incomplete_item.save()


# Module-level queue instances, one per queue name.
image_diff_queue = WorkQueue('imagediff')
dockerfile_build_queue = WorkQueue('dockerfilebuild3')
webhook_queue = WorkQueue('webhook')