diff --git a/workers/worker.py b/workers/worker.py index 1af1d8ab4..acfff9638 100644 --- a/workers/worker.py +++ b/workers/worker.py @@ -113,25 +113,32 @@ class Worker(object): with self._current_item_lock: self.current_queue_item = self._queue.get() - while self.current_queue_item: - logger.debug('Queue gave us some work: %s', self.current_queue_item.body) + while True: + # Retrieve the current item in the queue over which to operate. We do so under + # a lock to make sure we are always retrieving an item when in a healthy state. + current_queue_item = None + with self._current_item_lock: + current_queue_item = self.current_queue_item + if current_queue_item is None: + break - job_details = json.loads(self.current_queue_item.body) + logger.debug('Queue gave us some work: %s', current_queue_item.body) + job_details = json.loads(current_queue_item.body) try: self.process_queue_item(job_details) - self._queue.complete(self.current_queue_item) + self.mark_current_complete() + except JobException: - logger.warning('An error occurred processing request: %s', self.current_queue_item.body) - self._queue.incomplete(self.current_queue_item) + logger.warning('An error occurred processing request: %s', current_queue_item.body) + self.mark_current_incomplete() + except WorkerUnhealthyException: logger.error('The worker has encountered an error and will not take new jobs. Job is being requeued.') self._stop.set() self.mark_current_incomplete() - finally: - with self._current_item_lock: - self.current_queue_item = None + finally: # Close the db handle periodically self._close_db_handle() @@ -182,8 +189,15 @@ class Worker(object): def mark_current_incomplete(self): with self._current_item_lock: - if self.current_queue_item is not None and self.current_queue_item.retries_remaining == 0: + if self.current_queue_item is not None: self._queue.incomplete(self.current_queue_item, restore_retry=True) + self.current_queue_item = None + + def mark_current_complete(self): + with self._current_item_lock: + if self.current_queue_item is not None: + self._queue.complete(self.current_queue_item) + self.current_queue_item = None def terminate(self, signal_num=None, stack_frame=None, graceful=False): if self._terminated.is_set():