import json
import logging
import signal
from datetime import datetime, timedelta
from threading import Event

from apscheduler.scheduler import Scheduler

logger = logging.getLogger(__name__)


class JobException(Exception):
    """Raised when a job fails because something in the job is malformed.

    When a worker raises this exception the job is terminated and the retry
    is not returned to the queue.
    """
    pass


class WorkerUnhealthyException(Exception):
    """Raised when the worker is no longer healthy and must stop taking work.

    When this is raised while processing a queue item, the item is returned
    to the queue along with another retry.
    """
    pass


class Worker(object):
    """Base class for workers that periodically drain a work queue.

    Subclasses implement ``process_queue_item`` and may override
    ``watchdog``.

    Args:
        queue: Work queue. This class calls ``get``, ``complete``,
            ``incomplete`` and ``extend_processing`` on it.
        poll_period_seconds: Interval between ``poll_queue`` runs.
        reservation_seconds: Passed to ``queue.get`` as ``processing_time``
            for every item taken from the queue.
        watchdog_period_seconds: Interval between ``watchdog`` runs.
    """

    def __init__(self, queue, poll_period_seconds=30, reservation_seconds=300,
                 watchdog_period_seconds=60):
        self._sched = Scheduler()
        self._poll_period_seconds = poll_period_seconds
        self._reservation_seconds = reservation_seconds
        self._watchdog_period_seconds = watchdog_period_seconds
        self._stop = Event()
        self._queue = queue
        # The item being processed right now, or None when idle. ``join``
        # reads this to hand an in-flight item back to the queue.
        self.current_queue_item = None

    def process_queue_item(self, job_details):
        """Process a single job; subclasses must override this.

        ``poll_queue`` ignores the return value. A normal return marks the
        item complete. Raise ``JobException`` to mark it incomplete, or
        ``WorkerUnhealthyException`` to mark it incomplete with its retry
        restored and stop the worker.

        Args:
            job_details: The JSON-decoded body of the queue item.
        """
        raise NotImplementedError('Workers must implement process_queue_item.')

    def watchdog(self):
        """Function that gets run once every watchdog_period_seconds."""
        pass

    def extend_processing(self, seconds_from_now):
        """Ask the queue to extend processing of the current item, if any.

        Args:
            seconds_from_now: Forwarded unchanged to
                ``queue.extend_processing``.
        """
        if self.current_queue_item is not None:
            self._queue.extend_processing(self.current_queue_item, seconds_from_now)

    def _get_next_item(self):
        """Take the next item from the queue using the configured reservation."""
        return self._queue.get(processing_time=self._reservation_seconds)

    @staticmethod
    def _decode_job(body):
        """Decode a queue item body as JSON.

        Raises:
            JobException: If the body cannot be decoded, so that a malformed
                item follows the normal job-failure path.
        """
        try:
            return json.loads(body)
        except (TypeError, ValueError):
            raise JobException('Queue item body is not valid JSON.')

    def poll_queue(self):
        """Process queue items until the queue is empty or the worker stops.

        Exceptions other than ``JobException`` and
        ``WorkerUnhealthyException`` propagate to the caller after
        ``current_queue_item`` has been cleared.
        """
        logger.debug('Getting work item from queue.')

        # Every fetch, including the first, uses the configured reservation.
        self.current_queue_item = self._get_next_item()
        while self.current_queue_item:
            item = self.current_queue_item
            logger.debug('Queue gave us some work: %s', item.body)

            try:
                job_details = self._decode_job(item.body)
                self.process_queue_item(job_details)
                self._queue.complete(item)
            except JobException:
                logger.warning('An error occurred processing request: %s', item.body)
                self._queue.incomplete(item)
            except WorkerUnhealthyException:
                logger.error('The worker has encountered an error and will not take new jobs.')
                self._stop.set()
                self._queue.incomplete(item, restore_retry=True)
            finally:
                # Clear before fetching again so ``join`` never returns an
                # item that has already been settled.
                self.current_queue_item = None

            if not self._stop.is_set():
                self.current_queue_item = self._get_next_item()

        if not self._stop.is_set():
            logger.debug('No more work.')

    def start(self):
        """Schedule polling and the watchdog, then block until stopped.

        Installs ``join`` as the SIGTERM and SIGINT handler, waits for the
        stop event, and then shuts the scheduler down.
        """
        logger.debug("Scheduling worker.")

        # Start the first poll almost immediately instead of after a full
        # poll period.
        soon = datetime.now() + timedelta(seconds=.001)

        self._sched.start()
        self._sched.add_interval_job(self.poll_queue, seconds=self._poll_period_seconds,
                                     start_date=soon)
        self._sched.add_interval_job(self.watchdog, seconds=self._watchdog_period_seconds)

        signal.signal(signal.SIGTERM, self.join)
        signal.signal(signal.SIGINT, self.join)

        # Wait in one-second slices rather than blocking indefinitely.
        # NOTE(review): presumably so signal handlers get a chance to run on
        # the main thread -- confirm.
        while not self._stop.wait(1):
            pass

        logger.debug('Waiting for running tasks to complete.')
        self._sched.shutdown()
        logger.debug('Finished.')

    def join(self, signal_num=None, stack_frame=None):
        """Request a graceful shutdown; also usable as a signal handler.

        Args:
            signal_num: Signal number when invoked as a handler (unused).
            stack_frame: Stack frame when invoked as a handler (unused).
        """
        logger.debug('Shutting down worker gracefully.')
        self._stop.set()

        # Give back the retry that we took for this queue item so that if it
        # were down to zero retries it will still be picked up by another
        # worker.
        if self.current_queue_item is not None:
            self._queue.incomplete(self.current_queue_item, restore_retry=True)