Fix the queue query for old jobs which won't run.

This commit is contained in:
Jake Moshenko 2014-05-22 13:50:06 -04:00
parent f6726bd0a4
commit f4c488f9b6
2 changed files with 20 additions and 22 deletions

View file

@ -29,6 +29,13 @@ class WorkQueue(object):
QueueItem.processing_expires > now, QueueItem.processing_expires > now,
QueueItem.queue_name ** name_match_query)) QueueItem.queue_name ** name_match_query))
def _available_jobs(self, now, name_match_query, running_query):
return (QueueItem
.select()
.where(QueueItem.queue_name ** name_match_query, QueueItem.available_after <= now,
((QueueItem.available == True) | (QueueItem.processing_expires <= now)),
QueueItem.retries_remaining > 0, ~(QueueItem.queue_name << running_query)))
def _name_match_query(self): def _name_match_query(self):
return '%s%%' % self._canonical_name([self._queue_name] + self._canonical_name_match_list) return '%s%%' % self._canonical_name([self._queue_name] + self._canonical_name_match_list)
@ -39,18 +46,14 @@ class WorkQueue(object):
now = datetime.now() now = datetime.now()
name_match_query = self._name_match_query() name_match_query = self._name_match_query()
total_jobs = (QueueItem running_query = self._running_jobs(now, name_match_query)
.select(QueueItem.queue_name) running_count =running_query.distinct().count()
.where(QueueItem.queue_name ** name_match_query,
QueueItem.available_after <= now,
((QueueItem.available == True) | (QueueItem.processing_expires > now) |
(QueueItem.retries_remaining > 0)))
.distinct()
.count())
running = self._running_jobs(now, name_match_query).distinct().count() avialable_query = self._available_jobs(now, name_match_query, running_query)
available_count = avialable_query.select(QueueItem.queue_name).distinct().count()
self._reporter(running, total_jobs)
self._reporter(running_count, running_count + available_count)
def update_metrics(self): def update_metrics(self):
with self._transaction_factory(db): with self._transaction_factory(db):
@ -87,24 +90,17 @@ class WorkQueue(object):
with self._transaction_factory(db): with self._transaction_factory(db):
running = self._running_jobs(now, name_match_query) running = self._running_jobs(now, name_match_query)
avail = self._available_jobs(now, name_match_query, running)
avail = QueueItem.select().where(QueueItem.queue_name ** name_match_query,
QueueItem.available_after <= now,
((QueueItem.available == True) |
(QueueItem.processing_expires <= now)),
QueueItem.retries_remaining > 0,
~(QueueItem.queue_name << running))
found = list(avail.limit(1).order_by(QueueItem.id))
item = None item = None
try:
if found: item = avail.order_by(QueueItem.id).get()
item = found[0]
item.available = False item.available = False
item.processing_expires = now + timedelta(seconds=processing_time) item.processing_expires = now + timedelta(seconds=processing_time)
item.retries_remaining -= 1 item.retries_remaining -= 1
item.save() item.save()
except QueueItem.DoesNotExist:
pass
self._report_queue_metrics() self._report_queue_metrics()

View file

@ -17,6 +17,8 @@ class CloudWatchReporter(object):
self._name = name self._name = name
def report(self, running_count, total_count): def report(self, running_count, total_count):
logger.debug('Worker indicated %s running count and %s total count', running_count,
total_count)
need_capacity_count = total_count - running_count need_capacity_count = total_count - running_count
self._connection.put_metric_data(self._namespace, self._name, need_capacity_count, self._connection.put_metric_data(self._namespace, self._name, need_capacity_count,
unit='Count') unit='Count')