This repository has been archived on 2020-03-24. You can view files and clone it, but cannot push or open issues or pull requests.
quay/data/queue.py
Matt Jibson cfb6e884f2 Refactor metric collection
This change adds a generic queue onto which metrics can be pushed. A
separate module removes metrics from the queue and adds them to Cloudwatch.
Since these are now separate ideas, we can easily change the consumer from
Cloudwatch to anything else.

This change maintains near feature parity (the only change is there is now
just one queue instead of two - not a big deal).
2015-08-12 12:15:52 -04:00

205 lines
7.7 KiB
Python

from datetime import datetime, timedelta
from data.database import QueueItem, db, db_for_update
from util.morecollections import AttrDict
# Default threshold used by WorkQueue.extend_processing: a new expiration is only
# written to the database when it is later than the stored one by more than this.
MINIMUM_EXTENSION = timedelta(seconds=20)
class NoopWith(object):
    """Context manager that does nothing on entry or exit.

    Used in place of a real transaction guard when the caller asked for the
    work to run without opening a transaction.
    """

    def __enter__(self):
        """Enter the block; nothing is acquired and ``None`` is bound by ``as``."""
        return None

    def __exit__(self, exc_type, exc_value, exc_traceback):
        """Leave the block without cleanup.

        Returns ``None`` (falsy), so an exception raised inside the ``with``
        body is never suppressed and propagates to the caller.
        """
        return None
class WorkQueue(object):
    """Work queue stored in the ``QueueItem`` table.

    Every item is stored under a canonical name of the form
    ``<queue_name>/<part>/<part>/``. An instance only sees items whose
    canonical name starts with its own queue name followed by the optional
    ``canonical_name_match_list`` parts.
    """

    def __init__(self, queue_name, transaction_factory,
                 canonical_name_match_list=None, metric_queue=None):
        """Create a queue view.

        Args:
          queue_name: first component of every canonical item name.
          transaction_factory: callable invoked as ``transaction_factory(db)``;
            its result is used as a context manager around database work.
          canonical_name_match_list: optional list of further name components
            that restrict which items this instance matches. Defaults to no
            extra restriction.
          metric_queue: optional object exposing ``put(name, value, unit=...)``;
            when ``None``, ``update_metrics`` does nothing.
        """
        self._queue_name = queue_name
        self._metric_queue = metric_queue
        self._transaction_factory = transaction_factory
        # Tracks whether the last get() handed out an item that has not yet
        # been completed or marked incomplete.
        self._currently_processing = False

        if canonical_name_match_list is None:
            self._canonical_name_match_list = []
        else:
            self._canonical_name_match_list = canonical_name_match_list

    @staticmethod
    def _canonical_name(name_list):
        """Join name parts with '/' and append a trailing '/' (['a', 'b'] -> 'a/b/')."""
        return '/'.join(name_list) + '/'

    def _running_jobs(self, now, name_match_query):
        """Query the queue names of matching items that are claimed and not yet expired."""
        # NOTE: `== False` is required here; peewee overloads the comparison to
        # build a SQL expression, so `not`/`is` cannot be used.
        return (QueueItem
                .select(QueueItem.queue_name)
                .where(QueueItem.available == False,
                       QueueItem.processing_expires > now,
                       QueueItem.queue_name ** name_match_query))

    def _available_jobs(self, now, name_match_query):
        """Query matching items that may be handed out at time `now`.

        An item qualifies when its name matches, its `available_after` time has
        passed, it is either flagged available or its processing window has
        expired, and it still has retries remaining.
        """
        return (QueueItem
                .select()
                .where(QueueItem.queue_name ** name_match_query,
                       QueueItem.available_after <= now,
                       ((QueueItem.available == True) |
                        (QueueItem.processing_expires <= now)),
                       QueueItem.retries_remaining > 0))

    def _available_jobs_not_running(self, now, name_match_query, running_query):
        """Restrict `_available_jobs` to items whose queue name is not in `running_query`."""
        return (self
                ._available_jobs(now, name_match_query)
                .where(~(QueueItem.queue_name << running_query)))

    def _name_match_query(self):
        """Return the prefix pattern (canonical name followed by '%') for this instance."""
        return '%s%%' % self._canonical_name([self._queue_name] +
                                             self._canonical_name_match_list)

    def _item_by_id_for_update(self, queue_id):
        """Fetch the item with the given id through `db_for_update`.

        Propagates `QueueItem.DoesNotExist` if no such row exists.
        """
        return db_for_update(QueueItem.select().where(QueueItem.id == queue_id)).get()

    def get_metrics(self, require_transaction=True):
        """Count distinct queue names in each state.

        Args:
          require_transaction: when true the counts are taken inside
            ``transaction_factory(db)``; otherwise a no-op guard is used.

        Returns:
          Tuple ``(running_count, available_not_running_count, available_count)``.
        """
        guard = self._transaction_factory(db) if require_transaction else NoopWith()
        with guard:
            now = datetime.utcnow()
            name_match_query = self._name_match_query()

            running_query = self._running_jobs(now, name_match_query)
            running_count = running_query.distinct().count()

            available_query = self._available_jobs(now, name_match_query)
            available_count = (available_query.select(QueueItem.queue_name)
                               .distinct().count())

            available_not_running_query = self._available_jobs_not_running(
                now, name_match_query, running_query)
            available_not_running_count = (
                available_not_running_query.select(QueueItem.queue_name)
                .distinct().count())

        return (running_count, available_not_running_count, available_count)

    def update_metrics(self):
        """Push the current queue metrics onto the metric queue, if one is configured."""
        if self._metric_queue is None:
            return

        # Only the available-but-not-running count is reported; the other two
        # values returned by get_metrics() are not used here.
        _, available_not_running_count, _ = self.get_metrics()
        self._metric_queue.put('BuildCapacityShortage', available_not_running_count,
                               unit='Count')

        building_percent = 100 if self._currently_processing else 0
        self._metric_queue.put('PercentBuilding', building_percent, unit='Percent')

    def has_retries_remaining(self, item_id):
        """ Returns whether the queue item with the given id has any retries remaining. If the
            queue item does not exist, returns False. """
        with self._transaction_factory(db):
            try:
                return QueueItem.get(id=item_id).retries_remaining > 0
            except QueueItem.DoesNotExist:
                return False

    def put(self, canonical_name_list, message, available_after=0, retries_remaining=5):
        """
        Put an item, if it shouldn't be processed for some number of seconds,
        specify that amount as available_after. Returns the ID of the queue item added.

        The ID is returned as a string. A falsy `available_after` (including
        None) is treated as zero seconds.
        """
        params = {
            'queue_name': self._canonical_name([self._queue_name] + canonical_name_list),
            'body': message,
            'retries_remaining': retries_remaining,
        }

        available_date = datetime.utcnow() + timedelta(seconds=available_after or 0)
        params['available_after'] = available_date

        with self._transaction_factory(db):
            return str(QueueItem.create(**params).id)

    def get(self, processing_time=300):
        """
        Get an available item and mark it as unavailable for the default of five
        minutes. The result of this method must always be composed of simple
        python objects which are JSON serializable for network portability reasons.

        Claiming an item consumes one of its retries. Returns an AttrDict with
        the keys 'id', 'body' and 'retries_remaining', or None when no item
        could be claimed.
        """
        now = datetime.utcnow()

        name_match_query = self._name_match_query()

        running = self._running_jobs(now, name_match_query)
        avail = self._available_jobs_not_running(now, name_match_query, running)

        item = None
        try:
            # Pick the lowest-id candidate outside the transaction.
            db_item_candidate = avail.order_by(QueueItem.id).get()

            with self._transaction_factory(db):
                # Re-fetch the candidate through db_for_update with the
                # availability filter re-applied; if it stopped being available
                # since the first query, .get() raises DoesNotExist and nothing
                # is claimed.
                still_available_query = (db_for_update(self
                                                       ._available_jobs(now, name_match_query)
                                                       .where(QueueItem.id ==
                                                              db_item_candidate.id)))

                db_item = still_available_query.get()
                db_item.available = False
                db_item.processing_expires = now + timedelta(seconds=processing_time)
                db_item.retries_remaining -= 1
                db_item.save()

                item = AttrDict({
                    'id': db_item.id,
                    'body': db_item.body,
                    'retries_remaining': db_item.retries_remaining
                })

                self._currently_processing = True
        except QueueItem.DoesNotExist:
            self._currently_processing = False

        # Return a view of the queue item rather than an active db object
        return item

    def cancel(self, item_id):
        """ Attempts to cancel the queue item with the given ID from the queue. Returns true on
            success and false if the queue item could not be canceled. A queue item can only be
            canceled if it is available and has retries remaining.
        """
        with self._transaction_factory(db):
            # Load the build queue item for update.
            try:
                queue_item = self._item_by_id_for_update(item_id)
            except QueueItem.DoesNotExist:
                return False

            # Check the queue item.
            if not queue_item.available or queue_item.retries_remaining == 0:
                return False

            # Delete the queue item.
            queue_item.delete_instance(recursive=True)
            return True

    def complete(self, completed_item):
        """Delete a finished item from the queue.

        Args:
          completed_item: object with an `id` attribute, as returned by get().

        Propagates `QueueItem.DoesNotExist` if the row no longer exists.
        """
        with self._transaction_factory(db):
            completed_item_obj = self._item_by_id_for_update(completed_item.id)
            completed_item_obj.delete_instance(recursive=True)
            self._currently_processing = False

    def incomplete(self, incomplete_item, retry_after=300, restore_retry=False):
        """Return an unfinished item to the queue so it can be retried later.

        Args:
          incomplete_item: object with an `id` attribute, as returned by get().
          retry_after: seconds from now before the item becomes available again.
          restore_retry: when true, give back the retry that get() consumed.

        Returns:
          True if the item still has retries remaining after the update.

        Propagates `QueueItem.DoesNotExist` if the row no longer exists.
        """
        with self._transaction_factory(db):
            retry_date = datetime.utcnow() + timedelta(seconds=retry_after)
            incomplete_item_obj = self._item_by_id_for_update(incomplete_item.id)
            incomplete_item_obj.available_after = retry_date
            incomplete_item_obj.available = True

            if restore_retry:
                incomplete_item_obj.retries_remaining += 1

            incomplete_item_obj.save()
            self._currently_processing = False
            return incomplete_item_obj.retries_remaining > 0

    def extend_processing(self, item, seconds_from_now, minimum_extension=MINIMUM_EXTENSION):
        """Push back the processing expiration of a claimed item.

        Args:
          item: object with an `id` attribute, as returned by get().
          seconds_from_now: new expiration, expressed as seconds from the current UTC time.
          minimum_extension: timedelta; the write is skipped unless the new
            expiration exceeds the stored one by more than this amount.

        Propagates `QueueItem.DoesNotExist` if the row no longer exists.
        """
        with self._transaction_factory(db):
            queue_item = self._item_by_id_for_update(item.id)
            new_expiration = datetime.utcnow() + timedelta(seconds=seconds_from_now)

            # Only actually write the new expiration to the db if it moves the expiration some
            # minimum.
            # NOTE(review): assumes processing_expires is already set on the row (i.e. the
            # item was claimed via get()) - confirm it cannot be NULL here.
            if new_expiration - queue_item.processing_expires > minimum_extension:
                queue_item.processing_expires = new_expiration
                queue_item.save()