This repository has been archived on 2020-03-24. You can view files and clone it, but cannot push or open issues or pull requests.
quay/data/queue.py
2015-02-03 14:50:01 -05:00

153 lines
5.5 KiB
Python

from datetime import datetime, timedelta
from data.database import QueueItem, db, db_for_update
from util.morecollections import AttrDict
MINIMUM_EXTENSION = timedelta(seconds=20)
class WorkQueue(object):
def __init__(self, queue_name, transaction_factory,
canonical_name_match_list=None, reporter=None):
self._queue_name = queue_name
self._reporter = reporter
self._transaction_factory = transaction_factory
self._currently_processing = False
if canonical_name_match_list is None:
self._canonical_name_match_list = []
else:
self._canonical_name_match_list = canonical_name_match_list
@staticmethod
def _canonical_name(name_list):
return '/'.join(name_list) + '/'
def _running_jobs(self, now, name_match_query):
return (QueueItem
.select(QueueItem.queue_name)
.where(QueueItem.available == False,
QueueItem.processing_expires > now,
QueueItem.queue_name ** name_match_query))
def _available_jobs(self, now, name_match_query):
return (QueueItem
.select()
.where(QueueItem.queue_name ** name_match_query, QueueItem.available_after <= now,
((QueueItem.available == True) | (QueueItem.processing_expires <= now)),
QueueItem.retries_remaining > 0))
def _available_jobs_not_running(self, now, name_match_query, running_query):
return (self
._available_jobs(now, name_match_query)
.where(~(QueueItem.queue_name << running_query)))
def _name_match_query(self):
return '%s%%' % self._canonical_name([self._queue_name] + self._canonical_name_match_list)
def _item_by_id_for_update(self, queue_id):
return db_for_update(QueueItem.select().where(QueueItem.id == queue_id)).get()
def update_metrics(self):
if self._reporter is None:
return
with self._transaction_factory(db):
now = datetime.utcnow()
name_match_query = self._name_match_query()
running_query = self._running_jobs(now, name_match_query)
running_count = running_query.distinct().count()
avialable_query = self._available_jobs_not_running(now, name_match_query, running_query)
available_count = avialable_query.select(QueueItem.queue_name).distinct().count()
self._reporter(self._currently_processing, running_count, running_count + available_count)
def put(self, canonical_name_list, message, available_after=0, retries_remaining=5):
"""
Put an item, if it shouldn't be processed for some number of seconds,
specify that amount as available_after.
"""
params = {
'queue_name': self._canonical_name([self._queue_name] + canonical_name_list),
'body': message,
'retries_remaining': retries_remaining,
}
available_date = datetime.utcnow() + timedelta(seconds=available_after or 0)
params['available_after'] = available_date
with self._transaction_factory(db):
QueueItem.create(**params)
def get(self, processing_time=300):
"""
Get an available item and mark it as unavailable for the default of five
minutes. The result of this method must always be composed of simple
python objects which are JSON serializable for network portability reasons.
"""
now = datetime.utcnow()
name_match_query = self._name_match_query()
running = self._running_jobs(now, name_match_query)
avail = self._available_jobs_not_running(now, name_match_query, running)
item = None
try:
db_item_candidate = avail.order_by(QueueItem.id).get()
with self._transaction_factory(db):
still_available_query = (db_for_update(self
._available_jobs(now, name_match_query)
.where(QueueItem.id == db_item_candidate.id)))
db_item = still_available_query.get()
db_item.available = False
db_item.processing_expires = now + timedelta(seconds=processing_time)
db_item.retries_remaining -= 1
db_item.save()
item = AttrDict({
'id': db_item.id,
'body': db_item.body,
})
self._currently_processing = True
except QueueItem.DoesNotExist:
self._currently_processing = False
# Return a view of the queue item rather than an active db object
return item
def complete(self, completed_item):
with self._transaction_factory(db):
completed_item_obj = self._item_by_id_for_update(completed_item.id)
completed_item_obj.delete_instance()
self._currently_processing = False
def incomplete(self, incomplete_item, retry_after=300, restore_retry=False):
with self._transaction_factory(db):
retry_date = datetime.utcnow() + timedelta(seconds=retry_after)
incomplete_item_obj = self._item_by_id_for_update(incomplete_item.id)
incomplete_item_obj.available_after = retry_date
incomplete_item_obj.available = True
if restore_retry:
incomplete_item_obj.retries_remaining += 1
incomplete_item_obj.save()
self._currently_processing = False
def extend_processing(self, item, seconds_from_now, minimum_extension=MINIMUM_EXTENSION):
with self._transaction_factory(db):
queue_item = self._item_by_id_for_update(item.id)
new_expiration = datetime.utcnow() + timedelta(seconds=seconds_from_now)
# Only actually write the new expiration to the db if it moves the expiration some minimum
if new_expiration - queue_item.processing_expires > minimum_extension:
queue_item.processing_expires = new_expiration
queue_item.save()