63 lines
1.7 KiB
Python
63 lines
1.7 KiB
Python
from datetime import datetime, timedelta
|
|
|
|
from database import QueueItem
|
|
|
|
|
|
class WorkQueue(object):
|
|
def __init__(self, queue_name):
|
|
self.queue_name = queue_name
|
|
|
|
def put(self, message, available_after=0):
|
|
"""
|
|
Put an item, if it shouldn't be processed for some number of seconds,
|
|
specify that amount as available_after.
|
|
"""
|
|
|
|
params = {
|
|
'queue_name': self.queue_name,
|
|
'body': message,
|
|
}
|
|
|
|
if available_after:
|
|
available_date = datetime.now() + timedelta(seconds=available_after)
|
|
params['available_after'] = available_date
|
|
|
|
QueueItem.create(**params)
|
|
|
|
def get(self, processing_time=300):
|
|
"""
|
|
Get an available item and mark it as unavailable for the default of five
|
|
minutes.
|
|
"""
|
|
now = datetime.now()
|
|
available_or_expired = ((QueueItem.available == True) |
|
|
(QueueItem.processing_expires <= now))
|
|
|
|
# TODO the query and the update should be atomic, but for now we only
|
|
# have one worker.
|
|
avail = QueueItem.select().where(QueueItem.queue_name == self.queue_name,
|
|
QueueItem.available_after <= now,
|
|
available_or_expired)
|
|
|
|
found = list(avail.limit(1).order_by(QueueItem.available_after))
|
|
|
|
if found:
|
|
item = found[0]
|
|
item.available = False
|
|
item.processing_expires = now + timedelta(seconds=processing_time)
|
|
item.save()
|
|
|
|
return item
|
|
|
|
return None
|
|
|
|
def complete(self, completed_item):
|
|
completed_item.delete_instance()
|
|
|
|
def incomplete(self, incomplete_item):
|
|
incomplete_item.available = True
|
|
incomplete_item.save()
|
|
|
|
|
|
image_diff_queue = WorkQueue('imagediff')
|
|
dockerfile_build_queue = WorkQueue('dockerfilebuild')
|