data.queue: lint
This commit is contained in:
parent
8a5aa65d74
commit
26300d3c8e
1 changed files with 21 additions and 13 deletions
|
@ -42,30 +42,36 @@ class WorkQueue(object):
|
|||
def _canonical_name(name_list):
|
||||
return '/'.join(name_list) + '/'
|
||||
|
||||
def _running_jobs(self, now, name_match_query):
|
||||
@classmethod
|
||||
def _running_jobs(cls, now, name_match_query):
|
||||
return (QueueItem
|
||||
.select(QueueItem.queue_name)
|
||||
.where(QueueItem.available == False,
|
||||
QueueItem.processing_expires > now,
|
||||
QueueItem.queue_name ** name_match_query))
|
||||
|
||||
def _available_jobs(self, now, name_match_query):
|
||||
return self._available_jobs_where(QueueItem.select(), now, name_match_query)
|
||||
@classmethod
|
||||
def _available_jobs(cls, now, name_match_query):
|
||||
return cls._available_jobs_where(QueueItem.select(), now, name_match_query)
|
||||
|
||||
def _available_jobs_where(self, query, now, name_match_query):
|
||||
return query.where(QueueItem.queue_name ** name_match_query, QueueItem.available_after <= now,
|
||||
((QueueItem.available == True) | (QueueItem.processing_expires <= now)),
|
||||
QueueItem.retries_remaining > 0)
|
||||
@staticmethod
|
||||
def _available_jobs_where(query, now, name_match_query):
|
||||
return query.where(QueueItem.queue_name ** name_match_query,
|
||||
QueueItem.available_after <= now,
|
||||
((QueueItem.available == True) | (QueueItem.processing_expires <= now)),
|
||||
QueueItem.retries_remaining > 0)
|
||||
|
||||
def _available_jobs_not_running(self, now, name_match_query, running_query):
|
||||
return (self
|
||||
@classmethod
|
||||
def _available_jobs_not_running(cls, now, name_match_query, running_query):
|
||||
return (cls
|
||||
._available_jobs(now, name_match_query)
|
||||
.where(~(QueueItem.queue_name << running_query)))
|
||||
|
||||
def _name_match_query(self):
|
||||
return '%s%%' % self._canonical_name([self._queue_name] + self._canonical_name_match_list)
|
||||
|
||||
def _item_by_id_for_update(self, queue_id):
|
||||
@staticmethod
|
||||
def _item_by_id_for_update(queue_id):
|
||||
return db_for_update(QueueItem.select().where(QueueItem.id == queue_id)).get()
|
||||
|
||||
def get_metrics(self):
|
||||
|
@ -80,8 +86,10 @@ class WorkQueue(object):
|
|||
|
||||
available_not_running_query = self._available_jobs_not_running(now, name_match_query,
|
||||
running_query)
|
||||
available_not_running_count = (available_not_running_query.select(QueueItem.queue_name)
|
||||
.distinct().count())
|
||||
available_not_running_count = (available_not_running_query
|
||||
.select(QueueItem.queue_name)
|
||||
.distinct()
|
||||
.count())
|
||||
|
||||
return (running_count, available_not_running_count, available_count)
|
||||
|
||||
|
@ -193,7 +201,7 @@ class WorkQueue(object):
|
|||
# Load the build queue item for update.
|
||||
try:
|
||||
queue_item = db_for_update(QueueItem.select()
|
||||
.where(QueueItem.id == item_id)).get()
|
||||
.where(QueueItem.id == item_id)).get()
|
||||
except QueueItem.DoesNotExist:
|
||||
return False
|
||||
|
||||
|
|
Reference in a new issue