This repository has been archived on 2020-03-24. You can view files and clone it, but cannot push or open issues or pull requests.
quay/data/queue.py

138 lines
4.8 KiB
Python

from datetime import datetime, timedelta
from data.database import QueueItem, db
from util.morecollections import AttrDict
MINIMUM_EXTENSION = timedelta(seconds=20)
class WorkQueue(object):
def __init__(self, queue_name, transaction_factory,
canonical_name_match_list=None, reporter=None):
self._queue_name = queue_name
self._reporter = reporter
self._transaction_factory = transaction_factory
self._currently_processing = False
if canonical_name_match_list is None:
self._canonical_name_match_list = []
else:
self._canonical_name_match_list = canonical_name_match_list
@staticmethod
def _canonical_name(name_list):
return '/'.join(name_list) + '/'
def _running_jobs(self, now, name_match_query):
return (QueueItem
.select(QueueItem.queue_name)
.where(QueueItem.available == False,
QueueItem.processing_expires > now,
QueueItem.queue_name ** name_match_query))
def _available_jobs(self, now, name_match_query, running_query):
return (QueueItem
.select()
.where(QueueItem.queue_name ** name_match_query, QueueItem.available_after <= now,
((QueueItem.available == True) | (QueueItem.processing_expires <= now)),
QueueItem.retries_remaining > 0, ~(QueueItem.queue_name << running_query)))
def _name_match_query(self):
return '%s%%' % self._canonical_name([self._queue_name] + self._canonical_name_match_list)
def update_metrics(self):
with self._transaction_factory(db):
if self._reporter is None:
return
now = datetime.utcnow()
name_match_query = self._name_match_query()
running_query = self._running_jobs(now, name_match_query)
running_count = running_query.distinct().count()
avialable_query = self._available_jobs(now, name_match_query, running_query)
available_count = avialable_query.select(QueueItem.queue_name).distinct().count()
self._reporter(self._currently_processing, running_count, running_count + available_count)
def put(self, canonical_name_list, message, available_after=0, retries_remaining=5):
"""
Put an item, if it shouldn't be processed for some number of seconds,
specify that amount as available_after.
"""
params = {
'queue_name': self._canonical_name([self._queue_name] + canonical_name_list),
'body': message,
'retries_remaining': retries_remaining,
}
available_date = datetime.utcnow() + timedelta(seconds=available_after or 0)
params['available_after'] = available_date
with self._transaction_factory(db):
QueueItem.create(**params)
def get(self, processing_time=300):
"""
Get an available item and mark it as unavailable for the default of five
minutes.
"""
now = datetime.utcnow()
name_match_query = self._name_match_query()
with self._transaction_factory(db):
running = self._running_jobs(now, name_match_query)
avail = self._available_jobs(now, name_match_query, running)
item = None
try:
db_item = avail.order_by(QueueItem.id).get()
db_item.available = False
db_item.processing_expires = now + timedelta(seconds=processing_time)
db_item.retries_remaining -= 1
db_item.save()
item = AttrDict({
'id': db_item.id,
'body': db_item.body,
})
self._currently_processing = True
except QueueItem.DoesNotExist:
self._currently_processing = False
# Return a view of the queue item rather than an active db object
return item
def complete(self, completed_item):
with self._transaction_factory(db):
completed_item_obj = QueueItem.get(QueueItem.id == completed_item.id)
completed_item_obj.delete_instance()
self._currently_processing = False
def incomplete(self, incomplete_item, retry_after=300, restore_retry=False):
with self._transaction_factory(db):
retry_date = datetime.utcnow() + timedelta(seconds=retry_after)
incomplete_item_obj = QueueItem.get(QueueItem.id == incomplete_item.id)
incomplete_item_obj.available_after = retry_date
incomplete_item_obj.available = True
if restore_retry:
incomplete_item_obj.retries_remaining += 1
incomplete_item_obj.save()
self._currently_processing = False
def extend_processing(self, queue_item, seconds_from_now):
new_expiration = datetime.utcnow() + timedelta(seconds=seconds_from_now)
# Only actually write the new expiration to the db if it moves the expiration some minimum
queue_item_obj = QueueItem.get(QueueItem.id == queue_item.id)
if new_expiration - queue_item_obj.processing_expires > MINIMUM_EXTENSION:
with self._transaction_factory(db):
queue_item_obj.processing_expires = new_expiration
queue_item_obj.save()