Add a lock around access to the current queue item, and make sure the item is reported as incomplete whenever the worker becomes unhealthy

This commit is contained in:
Joseph Schorr 2014-07-30 18:30:54 -04:00
parent 7e935f5a8c
commit 4aec422e24

View file

@ -3,7 +3,7 @@ import json
import signal import signal
import sys import sys
from threading import Event from threading import Event, Lock
from apscheduler.scheduler import Scheduler from apscheduler.scheduler import Scheduler
from datetime import datetime, timedelta from datetime import datetime, timedelta
from BaseHTTPServer import HTTPServer, BaseHTTPRequestHandler from BaseHTTPServer import HTTPServer, BaseHTTPRequestHandler
@ -71,6 +71,7 @@ class Worker(object):
self._stop = Event() self._stop = Event()
self._terminated = Event() self._terminated = Event()
self._queue = queue self._queue = queue
self._current_item_lock = Lock()
self.current_queue_item = None self.current_queue_item = None
def process_queue_item(self, job_details): def process_queue_item(self, job_details):
@ -93,8 +94,9 @@ class Worker(object):
return self._terminated.is_set() return self._terminated.is_set()
def extend_processing(self, seconds_from_now): def extend_processing(self, seconds_from_now):
if self.current_queue_item is not None: with self._current_item_lock:
self._queue.extend_processing(self.current_queue_item, seconds_from_now) if self.current_queue_item is not None:
self._queue.extend_processing(self.current_queue_item, seconds_from_now)
def run_watchdog(self): def run_watchdog(self):
logger.debug('Running watchdog.') logger.debug('Running watchdog.')
@ -102,12 +104,15 @@ class Worker(object):
self.watchdog() self.watchdog()
except WorkerUnhealthyException: except WorkerUnhealthyException:
logger.error('The worker has encountered an error and will not take new jobs.') logger.error('The worker has encountered an error and will not take new jobs.')
self.mark_current_incomplete()
self._stop.set() self._stop.set()
def poll_queue(self): def poll_queue(self):
logger.debug('Getting work item from queue.') logger.debug('Getting work item from queue.')
self.current_queue_item = self._queue.get() with self._current_item_lock:
self.current_queue_item = self._queue.get()
while self.current_queue_item: while self.current_queue_item:
logger.debug('Queue gave us some work: %s', self.current_queue_item.body) logger.debug('Queue gave us some work: %s', self.current_queue_item.body)
@ -122,15 +127,17 @@ class Worker(object):
except WorkerUnhealthyException: except WorkerUnhealthyException:
logger.error('The worker has encountered an error and will not take new jobs. Job is being requeued.') logger.error('The worker has encountered an error and will not take new jobs. Job is being requeued.')
self._stop.set() self._stop.set()
self._queue.incomplete(self.current_queue_item, restore_retry=True) self.mark_current_incomplete()
finally: finally:
self.current_queue_item = None with self._current_item_lock:
self.current_queue_item = None
# Close the db handle periodically # Close the db handle periodically
self._close_db_handle() self._close_db_handle()
if not self._stop.is_set(): if not self._stop.is_set():
self.current_queue_item = self._queue.get(processing_time=self._reservation_seconds) with self._current_item_lock:
self.current_queue_item = self._queue.get(processing_time=self._reservation_seconds)
if not self._stop.is_set(): if not self._stop.is_set():
logger.debug('No more work.') logger.debug('No more work.')
@ -173,6 +180,11 @@ class Worker(object):
while start_status_server_port is not None: while start_status_server_port is not None:
sleep(60) sleep(60)
def mark_current_incomplete(self):
    """Hand the current queue item back to the queue as incomplete.

    Under ``self._current_item_lock``, calls
    ``self._queue.incomplete(item, restore_retry=True)`` for the item in
    ``self.current_queue_item`` -- but only when an item is set and its
    ``retries_remaining`` is exactly 0. In every other case nothing is
    reported. ``self.current_queue_item`` itself is left untouched.

    The lock is acquired here, so callers must not already hold
    ``self._current_item_lock`` if it is a plain (non-reentrant)
    ``threading.Lock``.
    """
    # NOTE(review): items that still have retries left are NOT reported as
    # incomplete here -- confirm that this is intended and not an oversight.
    with self._current_item_lock:
        item = self.current_queue_item
        if item is None or item.retries_remaining != 0:
            return
        self._queue.incomplete(item, restore_retry=True)
def terminate(self, signal_num=None, stack_frame=None, graceful=False): def terminate(self, signal_num=None, stack_frame=None, graceful=False):
if self._terminated.is_set(): if self._terminated.is_set():
sys.exit(1) sys.exit(1)
@ -184,8 +196,7 @@ class Worker(object):
if not graceful: if not graceful:
# Give back the retry that we took for this queue item so that if it were down to zero # Give back the retry that we took for this queue item so that if it were down to zero
# retries it will still be picked up by another worker # retries it will still be picked up by another worker
if self.current_queue_item is not None: self.mark_current_incomplete()
self._queue.incomplete(self.current_queue_item, restore_retry=True)
def join(self): def join(self):
self.terminate(graceful=True) self.terminate(graceful=True)