Merge pull request #2209 from coreos-inc/clair-notification-read
Clair notification read and queue fixes
This commit is contained in:
commit
3a7119d499
4 changed files with 55 additions and 8 deletions
|
@ -53,11 +53,9 @@ class WorkQueue(object):
|
|||
|
||||
@classmethod
|
||||
def _running_jobs(cls, now, name_match_query):
|
||||
return (QueueItem
|
||||
.select(QueueItem.queue_name)
|
||||
.where(QueueItem.available == False,
|
||||
QueueItem.processing_expires > now,
|
||||
QueueItem.queue_name ** name_match_query))
|
||||
return (cls
|
||||
._running_jobs_where(QueueItem.select(QueueItem.queue_name), now)
|
||||
.where(QueueItem.queue_name ** name_match_query))
|
||||
|
||||
@classmethod
|
||||
def _available_jobs(cls, now, name_match_query):
|
||||
|
@ -65,6 +63,10 @@ class WorkQueue(object):
|
|||
._available_jobs_where(QueueItem.select(), now)
|
||||
.where(QueueItem.queue_name ** name_match_query))
|
||||
|
||||
@staticmethod
|
||||
def _running_jobs_where(query, now):
|
||||
return query.where(QueueItem.available == False, QueueItem.processing_expires > now)
|
||||
|
||||
@staticmethod
|
||||
def _available_jobs_where(query, now):
|
||||
return query.where(QueueItem.available_after <= now,
|
||||
|
@ -141,6 +143,23 @@ class WorkQueue(object):
|
|||
queue_prefix = '%s/%s/%s%%' % (self._queue_name, namespace, subpath_query)
|
||||
QueueItem.delete().where(QueueItem.queue_name ** queue_prefix).execute()
|
||||
|
||||
def alive(self, canonical_name_list):
|
||||
"""
|
||||
Returns True if a job matching the canonical name list is currently processing
|
||||
or available.
|
||||
"""
|
||||
canonical_name = self._canonical_name([self._queue_name] + canonical_name_list)
|
||||
try:
|
||||
select_query = QueueItem.select().where(QueueItem.queue_name == canonical_name)
|
||||
now = datetime.utcnow()
|
||||
|
||||
overall_query = (self._available_jobs_where(select_query.clone(), now) |
|
||||
self._running_jobs_where(select_query.clone(), now))
|
||||
overall_query.get()
|
||||
return True
|
||||
except QueueItem.DoesNotExist:
|
||||
return False
|
||||
|
||||
def put(self, canonical_name_list, message, available_after=0, retries_remaining=5):
|
||||
"""
|
||||
Put an item, if it shouldn't be processed for some number of seconds,
|
||||
|
|
Reference in a new issue