Make sure completion marking is also under the lock

This commit is contained in:
Joseph Schorr 2014-07-30 18:45:40 -04:00
parent 4aec422e24
commit bab3a0949c

View file

@ -113,25 +113,32 @@ class Worker(object):
with self._current_item_lock: with self._current_item_lock:
self.current_queue_item = self._queue.get() self.current_queue_item = self._queue.get()
while self.current_queue_item: while True:
logger.debug('Queue gave us some work: %s', self.current_queue_item.body) # Retrieve the current item in the queue over which to operate. We do so under
# a lock to make sure we are always retrieving an item when in a healthy state.
current_queue_item = None
with self._current_item_lock:
current_queue_item = self.current_queue_item
if current_queue_item is None:
break
job_details = json.loads(self.current_queue_item.body) logger.debug('Queue gave us some work: %s', current_queue_item.body)
job_details = json.loads(current_queue_item.body)
try: try:
self.process_queue_item(job_details) self.process_queue_item(job_details)
self._queue.complete(self.current_queue_item) self.mark_current_complete()
except JobException: except JobException:
logger.warning('An error occurred processing request: %s', self.current_queue_item.body) logger.warning('An error occurred processing request: %s', current_queue_item.body)
self._queue.incomplete(self.current_queue_item) self.mark_current_incomplete()
except WorkerUnhealthyException: except WorkerUnhealthyException:
logger.error('The worker has encountered an error and will not take new jobs. Job is being requeued.') logger.error('The worker has encountered an error and will not take new jobs. Job is being requeued.')
self._stop.set() self._stop.set()
self.mark_current_incomplete() self.mark_current_incomplete()
finally:
with self._current_item_lock:
self.current_queue_item = None
finally:
# Close the db handle periodically # Close the db handle periodically
self._close_db_handle() self._close_db_handle()
@ -182,8 +189,15 @@ class Worker(object):
def mark_current_incomplete(self): def mark_current_incomplete(self):
with self._current_item_lock: with self._current_item_lock:
if self.current_queue_item is not None and self.current_queue_item.retries_remaining == 0: if self.current_queue_item is not None:
self._queue.incomplete(self.current_queue_item, restore_retry=True) self._queue.incomplete(self.current_queue_item, restore_retry=True)
self.current_queue_item = None
def mark_current_complete(self):
with self._current_item_lock:
if self.current_queue_item is not None:
self._queue.complete(self.current_queue_item)
self.current_queue_item = None
def terminate(self, signal_num=None, stack_frame=None, graceful=False): def terminate(self, signal_num=None, stack_frame=None, graceful=False):
if self._terminated.is_set(): if self._terminated.is_set():