diff --git a/data/queue.py b/data/queue.py index 79e645ebf..548ee380c 100644 --- a/data/queue.py +++ b/data/queue.py @@ -41,20 +41,20 @@ class WorkQueue(object): return '%s%%' % self._canonical_name([self._queue_name] + self._canonical_name_match_list) def update_metrics(self): + if self._reporter is None: + return + with self._transaction_factory(db): - if self._reporter is None: - return - now = datetime.utcnow() name_match_query = self._name_match_query() running_query = self._running_jobs(now, name_match_query) - running_count =running_query.distinct().count() + running_count = running_query.distinct().count() avialable_query = self._available_jobs(now, name_match_query, running_query) available_count = avialable_query.select(QueueItem.queue_name).distinct().count() - self._reporter(self._currently_processing, running_count, running_count + available_count) + self._reporter(self._currently_processing, running_count, running_count + available_count) def put(self, canonical_name_list, message, available_after=0, retries_remaining=5): """ diff --git a/workers/worker.py b/workers/worker.py index 1442195ce..506a16f97 100644 --- a/workers/worker.py +++ b/workers/worker.py @@ -122,6 +122,8 @@ class Worker(object): with self._current_item_lock: current_queue_item = self.current_queue_item if current_queue_item is None: + # Close the db handle. + self._close_db_handle() break logger.debug('Queue gave us some work: %s', current_queue_item.body) @@ -143,7 +145,7 @@ class Worker(object): self._stop.set() finally: - # Close the db handle periodically + # Close the db handle. self._close_db_handle() if not self._stop.is_set():