From f4c488f9b675a1ba9babf7ec43978b7068c63945 Mon Sep 17 00:00:00 2001
From: Jake Moshenko
Date: Thu, 22 May 2014 13:50:06 -0400
Subject: [PATCH] Fix the queue query for old jobs which won't run.

---
 data/queue.py        | 40 ++++++++++++++++++----------------------
 util/queuemetrics.py |  2 ++
 2 files changed, 20 insertions(+), 22 deletions(-)

diff --git a/data/queue.py b/data/queue.py
index 77868ad0a..be0c8301b 100644
--- a/data/queue.py
+++ b/data/queue.py
@@ -29,6 +29,13 @@ class WorkQueue(object):
                    QueueItem.processing_expires > now,
                    QueueItem.queue_name ** name_match_query))
 
+  def _available_jobs(self, now, name_match_query, running_query):
+    return (QueueItem
+            .select()
+            .where(QueueItem.queue_name ** name_match_query, QueueItem.available_after <= now,
+                   ((QueueItem.available == True) | (QueueItem.processing_expires <= now)),
+                   QueueItem.retries_remaining > 0, ~(QueueItem.queue_name << running_query)))
+
   def _name_match_query(self):
     return '%s%%' % self._canonical_name([self._queue_name] + self._canonical_name_match_list)
 
@@ -39,18 +46,14 @@ class WorkQueue(object):
     now = datetime.now()
     name_match_query = self._name_match_query()
 
-    total_jobs = (QueueItem
-                  .select(QueueItem.queue_name)
-                  .where(QueueItem.queue_name ** name_match_query,
-                         QueueItem.available_after <= now,
-                         ((QueueItem.available == True) | (QueueItem.processing_expires > now) |
-                          (QueueItem.retries_remaining > 0)))
-                  .distinct()
-                  .count())
+    running_query = self._running_jobs(now, name_match_query)
+    running_count =running_query.distinct().count()
 
-    running = self._running_jobs(now, name_match_query).distinct().count()
+    avialable_query = self._available_jobs(now, name_match_query, running_query)
+    available_count = avialable_query.select(QueueItem.queue_name).distinct().count()
 
-    self._reporter(running, total_jobs)
+
+    self._reporter(running_count, running_count + available_count)
 
   def update_metrics(self):
     with self._transaction_factory(db):
@@ -87,24 +90,17 @@ class WorkQueue(object):
 
     with self._transaction_factory(db):
       running = self._running_jobs(now, name_match_query)
-
-      avail = QueueItem.select().where(QueueItem.queue_name ** name_match_query,
-                                       QueueItem.available_after <= now,
-                                       ((QueueItem.available == True) |
-                                        (QueueItem.processing_expires <= now)),
-                                       QueueItem.retries_remaining > 0,
-                                       ~(QueueItem.queue_name << running))
-
-      found = list(avail.limit(1).order_by(QueueItem.id))
+      avail = self._available_jobs(now, name_match_query, running)
 
       item = None
-
-      if found:
-        item = found[0]
+      try:
+        item = avail.order_by(QueueItem.id).get()
         item.available = False
         item.processing_expires = now + timedelta(seconds=processing_time)
         item.retries_remaining -= 1
         item.save()
+      except QueueItem.DoesNotExist:
+        pass
 
       self._report_queue_metrics()
 
diff --git a/util/queuemetrics.py b/util/queuemetrics.py
index bc4784b4e..10b5caf3b 100644
--- a/util/queuemetrics.py
+++ b/util/queuemetrics.py
@@ -17,6 +17,8 @@ class CloudWatchReporter(object):
     self._name = name
 
   def report(self, running_count, total_count):
+    logger.debug('Worker indicated %s running count and %s total count', running_count,
+                 total_count)
     need_capacity_count = total_count - running_count
     self._connection.put_metric_data(self._namespace, self._name, need_capacity_count,
                                      unit='Count')