This repository has been archived on 2020-03-24. You can view files and clone it, but you cannot make any changes to its state, such as pushing and creating new issues, pull requests or comments.
quay/data/queue.py
Jake Moshenko 21e3001446 Add a bulk insert for queue and notifications.
Use it for Clair spawned notifications.
2016-12-06 14:00:16 -05:00

348 lines
14 KiB
Python

from datetime import datetime, timedelta
from contextlib import contextmanager
from data.database import QueueItem, db, db_for_update, db_random_func
from util.morecollections import AttrDict
# Default for the `minimum_extension` argument of WorkQueue.extend_processing(): a new expiration
# is only written when it moves the stored one by more than this amount.
MINIMUM_EXTENSION = timedelta(seconds=20)
# Default for the `batch_size` argument of WorkQueue.batch_insert(): the maximum number of items
# handed to each insert_many() call.
DEFAULT_BATCH_SIZE = 1000
class NoopWith:
    """ Context manager that does nothing on entry and nothing on exit. """

    def __enter__(self):
        # Nothing to set up; `with NoopWith() as x` binds x to None.
        return None

    def __exit__(self, type, value, traceback):
        # A falsy result means an exception raised inside the `with` body is not suppressed.
        return None
class BuildMetricQueueReporter(object):
    """ Reporter which publishes build-system capacity metrics to a metric queue. """

    def __init__(self, metric_queue):
        self._metric_queue = metric_queue

    def __call__(self, currently_processing, running_count, total_count):
        """ Publishes the capacity shortage (total_count - running_count) and the building
            percentage (100 when currently_processing is truthy, otherwise 0). Each value is sent
            both through put_deprecated() and through the matching gauge's Set().
        """
        sink = self._metric_queue

        shortage = total_count - running_count
        sink.put_deprecated('BuildCapacityShortage', shortage, unit='Count')
        sink.build_capacity_shortage.Set(shortage)

        if currently_processing:
            percent = 100
        else:
            percent = 0
        sink.put_deprecated('PercentBuilding', percent, unit='Percent')
        sink.percent_building.Set(percent)
class WorkQueue(object):
    """ Work queue defines methods for interacting with a queue backed by the database.

        Every item is a QueueItem row whose queue_name is a '/'-terminated path that starts with
        this queue's name (see _canonical_name).
    """

    def __init__(self, queue_name, transaction_factory,
                 canonical_name_match_list=None, reporter=None, metric_queue=None,
                 has_namespace=False):
        """
        queue_name: first path component of every item name handled by this queue.
        transaction_factory: callable invoked as transaction_factory(db); its result is used as a
          context manager around the multi-statement operations.
        canonical_name_match_list: extra path components appended to the queue name when building
          the prefix pattern used to select items. Defaults to no extra components.
        reporter: optional callable invoked by update_metrics() with
          (currently_processing, running_count, total_count).
        metric_queue: optional metrics sink used by update_metrics(), put() and batch_insert().
        has_namespace: when False, delete_namespaced_items() refuses to delete anything.
        """
        self._queue_name = queue_name
        self._reporter = reporter
        self._metric_queue = metric_queue
        self._transaction_factory = transaction_factory
        # Tracks whether this instance last claimed an item; only passed on to the reporter.
        self._currently_processing = False
        self._has_namespaced_items = has_namespace

        if canonical_name_match_list is None:
            self._canonical_name_match_list = []
        else:
            self._canonical_name_match_list = canonical_name_match_list

    @staticmethod
    def _canonical_name(name_list):
        """ Joins the name components with '/' and appends a trailing '/'. """
        return '/'.join(name_list) + '/'

    @classmethod
    def _running_jobs(cls, now, name_match_query):
        """ Query selecting the queue_name of each running item matching the name pattern. """
        return (cls
                ._running_jobs_where(QueueItem.select(QueueItem.queue_name), now)
                .where(QueueItem.queue_name ** name_match_query))

    @classmethod
    def _available_jobs(cls, now, name_match_query):
        """ Query selecting each available item whose queue_name matches the name pattern. """
        return (cls
                ._available_jobs_where(QueueItem.select(), now)
                .where(QueueItem.queue_name ** name_match_query))

    @staticmethod
    def _running_jobs_where(query, now):
        """ Restricts query to running items: marked unavailable and not yet expired. """
        return query.where(QueueItem.available == False, QueueItem.processing_expires > now)

    @staticmethod
    def _available_jobs_where(query, now):
        """ Restricts query to items that may be handed out: past their available_after time,
            either marked available or with an expired processing window, and with retries left.
        """
        return query.where(QueueItem.available_after <= now,
                           ((QueueItem.available == True) |
                            (QueueItem.processing_expires <= now)),
                           QueueItem.retries_remaining > 0)

    @classmethod
    def _available_jobs_not_running(cls, now, name_match_query, running_query):
        """ Available items whose queue_name is not among the names returned by running_query. """
        return (cls
                ._available_jobs(now, name_match_query)
                .where(~(QueueItem.queue_name << running_query)))

    def _name_match_query(self):
        """ Returns the prefix pattern (canonical name followed by '%') for this queue's items. """
        return '%s%%' % self._canonical_name([self._queue_name] + self._canonical_name_match_list)

    @staticmethod
    def _item_by_id_for_update(queue_id):
        """ Loads the item with the given id through db_for_update.
            Raises QueueItem.DoesNotExist if there is no such item.
        """
        return db_for_update(QueueItem.select().where(QueueItem.id == queue_id)).get()

    def get_metrics(self):
        """ Returns a tuple (running_count, available_not_running_count, available_count). Each
            value counts distinct queue names among this queue's matching items.
        """
        now = datetime.utcnow()
        name_match_query = self._name_match_query()

        running_query = self._running_jobs(now, name_match_query)
        running_count = running_query.distinct().count()

        available_query = self._available_jobs(now, name_match_query)
        available_count = available_query.select(QueueItem.queue_name).distinct().count()

        available_not_running_query = self._available_jobs_not_running(now, name_match_query,
                                                                       running_query)
        available_not_running_count = (available_not_running_query
                                       .select(QueueItem.queue_name)
                                       .distinct()
                                       .count())

        return (running_count, available_not_running_count, available_count)

    def update_metrics(self):
        """ Publishes the counts from get_metrics() to the metric queue and/or the reporter.
            Does nothing (and runs no queries) when neither was configured.
        """
        if self._reporter is None and self._metric_queue is None:
            return

        (running_count, available_not_running_count, available_count) = self.get_metrics()

        if self._metric_queue:
            dim = {'queue': self._queue_name}
            self._metric_queue.put_deprecated('Running', running_count, dimensions=dim)
            self._metric_queue.put_deprecated('AvailableNotRunning', available_not_running_count,
                                              dimensions=dim)
            self._metric_queue.put_deprecated('Available', available_count, dimensions=dim)

            self._metric_queue.work_queue_running.Set(running_count,
                                                      labelvalues=[self._queue_name])
            self._metric_queue.work_queue_available.Set(available_count,
                                                        labelvalues=[self._queue_name])

        if self._reporter:
            # The reporter's total is "running" plus "available but not already running".
            self._reporter(self._currently_processing, running_count,
                           running_count + available_not_running_count)

    def has_retries_remaining(self, item_id):
        """ Returns whether the queue item with the given id has any retries remaining. If the
            queue item does not exist, returns False. """
        with self._transaction_factory(db):
            try:
                return QueueItem.get(id=item_id).retries_remaining > 0
            except QueueItem.DoesNotExist:
                return False

    def delete_namespaced_items(self, namespace, subpath=None):
        """ Deletes all items in this queue that exist under the given namespace (and under the
            given subpath of it, when one is supplied).

            Returns False without deleting anything if this queue was not created with
            has_namespace=True; otherwise returns None.
        """
        if not self._has_namespaced_items:
            return False

        subpath_query = '%s/' % subpath if subpath else ''
        # NOTE(review): namespace and subpath are interpolated into a LIKE pattern unescaped, so a
        # '%' or '_' inside them would act as a wildcard -- confirm callers restrict these values.
        queue_prefix = '%s/%s/%s%%' % (self._queue_name, namespace, subpath_query)
        QueueItem.delete().where(QueueItem.queue_name ** queue_prefix).execute()

    def alive(self, canonical_name_list):
        """
        Returns True if a job matching the canonical name list is currently processing
        or available.
        """
        canonical_name = self._canonical_name([self._queue_name] + canonical_name_list)
        try:
            select_query = QueueItem.select().where(QueueItem.queue_name == canonical_name)
            now = datetime.utcnow()

            # Combine the "available" and "running" selections; any single row is enough.
            overall_query = (self._available_jobs_where(select_query.clone(), now) |
                             self._running_jobs_where(select_query.clone(), now))
            overall_query.get()
            return True
        except QueueItem.DoesNotExist:
            return False

    def _queue_body(self, canonical_name_list, message, available_after, retries_remaining):
        """ Builds the field values for a new QueueItem. available_after is a number of seconds
            from now; a falsy value means the item is available immediately.
        """
        return dict(
            queue_name=self._canonical_name([self._queue_name] + canonical_name_list),
            body=message,
            retries_remaining=retries_remaining,
            available_after=datetime.utcnow() + timedelta(seconds=available_after or 0),
        )

    @contextmanager
    def batch_insert(self, batch_size=DEFAULT_BATCH_SIZE):
        """
        Context manager yielding a batch_put function with the same arguments as put(). Items
        passed to batch_put are only collected; they are written when the `with` body finishes
        normally, in chunks of at most batch_size items per insert_many() call. If the body
        raises, nothing is inserted.

        Raises ValueError if batch_size is not at least 1.
        """
        # A batch size below 1 could never make progress through the collected items.
        if batch_size is None or batch_size < 1:
            raise ValueError('batch_size must be a positive integer, got %r' % (batch_size,))

        items_to_insert = []

        def batch_put(canonical_name_list, message, available_after=0, retries_remaining=5):
            """
            Queue an item for insertion; if it shouldn't be processed for some number of seconds,
            specify that amount as available_after. Unlike put(), nothing is returned because the
            item has not been written yet.
            """
            items_to_insert.append(self._queue_body(canonical_name_list, message, available_after,
                                                    retries_remaining))

        yield batch_put

        # Chunk the inserted items into batch_size chunks and insert_many
        for start in range(0, len(items_to_insert), batch_size):
            QueueItem.insert_many(items_to_insert[start:start + batch_size]).execute()

        if self._metric_queue:
            self._metric_queue.put_deprecated('Added', len(items_to_insert),
                                              dimensions={'queue': self._queue_name})

    def put(self, canonical_name_list, message, available_after=0, retries_remaining=5):
        """
        Put an item, if it shouldn't be processed for some number of seconds,
        specify that amount as available_after. Returns the ID of the queue item added.
        """
        item = QueueItem.create(**self._queue_body(canonical_name_list, message, available_after,
                                                   retries_remaining))

        if self._metric_queue:
            self._metric_queue.put_deprecated('Added', 1, dimensions={'queue': self._queue_name})

        return str(item.id)

    def get(self, processing_time=300, ordering_required=False):
        """
        Get an available item and mark it as unavailable for the default of five
        minutes. The result of this method must always be composed of simple
        python objects which are JSON serializable for network portability reasons.

        Returns an AttrDict with 'id', 'body' and 'retries_remaining', or None when no item is
        available or when another instance claimed the chosen item first.
        """
        now = datetime.utcnow()
        name_match_query = self._name_match_query()

        item = None
        try:
            if ordering_required:
                # Take the lowest-id available item whose queue_name is not currently running.
                running = self._running_jobs(now, name_match_query)
                avail = self._available_jobs_not_running(now, name_match_query, running)
                db_item = avail.order_by(QueueItem.id).get()
            else:
                # If we don't require ordering, we grab a random item from any of the first 50
                # available.
                subquery = self._available_jobs(now, name_match_query).limit(50).alias('j1')
                db_item = (QueueItem
                           .select()
                           .join(subquery, on=QueueItem.id == subquery.c.id)
                           .order_by(db_random_func())
                           .get())

            # The previous solution to this used a select for update in a
            # transaction to prevent multiple instances from processing the
            # same queue item. This suffered performance problems. This solution
            # instead has instances attempt to update the potential queue item to be
            # unavailable. However, since the update clause is restricted to items
            # that are still available (and whose processing_expires is unchanged since
            # it was read), only one instance's update will succeed, and
            # it will have a changed row count of 1. Instances that have 0 changed
            # rows know that another instance is already handling that item.
            set_unavailable_query = (QueueItem
                                     .update(available=False,
                                             processing_expires=(now +
                                                                 timedelta(seconds=processing_time)),
                                             retries_remaining=QueueItem.retries_remaining-1)
                                     .where(QueueItem.id == db_item.id))

            changed_query = (self._available_jobs_where(set_unavailable_query, now)
                             .where(QueueItem.processing_expires == db_item.processing_expires))

            changed = changed_query.execute()
            if changed == 1:
                item = AttrDict({
                    'id': db_item.id,
                    'body': db_item.body,
                    # db_item was read before the update, so apply the decrement here as well.
                    'retries_remaining': db_item.retries_remaining - 1,
                })
                self._currently_processing = True
        except QueueItem.DoesNotExist:
            self._currently_processing = False

        # Return a view of the queue item rather than an active db object
        return item

    def cancel(self, item_id):
        """ Attempts to cancel the queue item with the given ID from the queue. Returns true on
            success and false if the queue item could not be canceled.
        """
        with self._transaction_factory(db):
            # Load the build queue item for update.
            try:
                queue_item = db_for_update(QueueItem.select()
                                           .where(QueueItem.id == item_id)).get()
            except QueueItem.DoesNotExist:
                return False

            # Delete the queue item.
            queue_item.delete_instance(recursive=True)
            return True

    def complete(self, completed_item):
        """ Deletes the given item (looked up by completed_item.id) and marks this instance as no
            longer processing. A missing item is not an error.
        """
        with self._transaction_factory(db):
            try:
                completed_item_obj = self._item_by_id_for_update(completed_item.id)
            except QueueItem.DoesNotExist:
                self._currently_processing = False
                return

            completed_item_obj.delete_instance(recursive=True)
            self._currently_processing = False

    def incomplete(self, incomplete_item, retry_after=300, restore_retry=False):
        """ Makes the given item available again after retry_after seconds and marks this instance
            as no longer processing. With restore_retry=True the retry consumed by get() is given
            back.

            Returns whether the item has retries remaining; returns False if it no longer exists.
        """
        with self._transaction_factory(db):
            retry_date = datetime.utcnow() + timedelta(seconds=retry_after)

            try:
                incomplete_item_obj = self._item_by_id_for_update(incomplete_item.id)
                incomplete_item_obj.available_after = retry_date
                incomplete_item_obj.available = True

                if restore_retry:
                    incomplete_item_obj.retries_remaining += 1

                incomplete_item_obj.save()
                self._currently_processing = False
                return incomplete_item_obj.retries_remaining > 0
            except QueueItem.DoesNotExist:
                # As in complete(): a vanished item means there is nothing left being processed.
                self._currently_processing = False
                return False

    def extend_processing(self, item, seconds_from_now, minimum_extension=MINIMUM_EXTENSION,
                          updated_data=None):
        """ Moves the processing expiration of the given item to seconds_from_now seconds from
            now and, when updated_data is not None, replaces the item's body with it. The row is
            saved only if one of the two actually changed. Does nothing if the item no longer
            exists.
        """
        with self._transaction_factory(db):
            try:
                queue_item = self._item_by_id_for_update(item.id)
                new_expiration = datetime.utcnow() + timedelta(seconds=seconds_from_now)
                has_change = False

                # Only actually write the new expiration to the db if it moves the expiration
                # some minimum
                if new_expiration - queue_item.processing_expires > minimum_extension:
                    queue_item.processing_expires = new_expiration
                    has_change = True

                if updated_data is not None and queue_item.body != updated_data:
                    queue_item.body = updated_data
                    has_change = True

                if has_change:
                    queue_item.save()
            except QueueItem.DoesNotExist:
                return
def delete_expired(expiration_threshold, deletion_threshold, batch_size):
    """
    Deletes up to `batch_size` queue items whose processing_expires is at or before the provided
    expiration threshold. If fewer items than the deletion threshold are found, this method does
    nothing and returns 0. Otherwise returns the number of items that were selected for deletion.
    """
    expired_query = (QueueItem
                     .select()
                     .where(QueueItem.processing_expires <= expiration_threshold)
                     .limit(batch_size))
    expired_items = list(expired_query)
    expired_count = len(expired_items)

    # Too few expired items to be worth a delete; leave them for a later call.
    if expired_count < deletion_threshold:
        return 0

    QueueItem.delete().where(QueueItem.id << expired_items).execute()
    return expired_count