from datetime import datetime, timedelta

from database import QueueItem


class WorkQueue(object):
    """A named work queue whose entries are stored as QueueItem records."""

    def __init__(self, queue_name):
        # Every item created or fetched through this instance is scoped to
        # this queue name.
        self.queue_name = queue_name

    def put(self, message, available_after=0):
        """Add ``message`` to this queue.

        A truthy ``available_after`` is read as a number of seconds; the item
        is then created with an ``available_after`` timestamp that far past
        the current local time.  When it is falsy no timestamp is passed, so
        whatever default QueueItem defines (not visible here) applies.
        """
        fields = {'queue_name': self.queue_name, 'body': message}

        if available_after:
            fields['available_after'] = (
                datetime.now() + timedelta(seconds=available_after))

        QueueItem.create(**fields)

    def get(self, processing_time=300):
        """Claim the next ready item, or return None if nothing is ready.

        An item is ready when it belongs to this queue, its
        ``available_after`` is not in the future, and it is either flagged
        available or its previous ``processing_expires`` deadline has passed.
        The item with the earliest ``available_after`` is picked, flagged
        unavailable, given a new deadline ``processing_time`` seconds from
        now (five minutes by default), saved, and returned.
        """
        now = datetime.now()

        # `== True` is combined with `|` into a query expression here, so it
        # must stay a comparison rather than become `is True` or a bare
        # truth test.
        claimable = ((QueueItem.available == True) |  # noqa: E712
                     (QueueItem.processing_expires <= now))

        # TODO the query and the update should be atomic, but for now we only
        # have one worker.
        ready = QueueItem.select().where(
            QueueItem.queue_name == self.queue_name,
            QueueItem.available_after <= now,
            claimable,
        )
        matches = list(ready.limit(1).order_by(QueueItem.available_after))

        if not matches:
            return None

        claimed = matches[0]
        claimed.available = False
        claimed.processing_expires = now + timedelta(seconds=processing_time)
        claimed.save()
        return claimed

    def complete(self, completed_item):
        """Remove ``completed_item`` by calling its ``delete_instance()``."""
        completed_item.delete_instance()


image_diff_queue = WorkQueue('imagediff')