import json
import logging
from threading import Event

from apscheduler.scheduler import Scheduler


logger = logging.getLogger(__name__)


class Worker(object):
    """Base class for workers that periodically drain a work queue.

    Subclasses override ``process_queue_item``. ``start`` schedules
    ``poll_queue`` at a fixed interval and blocks the calling thread until
    ``join`` is called.
    """

    def __init__(self, queue, poll_period_seconds=30, reservation_seconds=300):
        """Set up the worker.

        Args:
            queue: Work queue object. This class calls ``get``, ``complete``
                and ``incomplete`` on it.
            poll_period_seconds: Interval, in seconds, between scheduled
                ``poll_queue`` runs.
            reservation_seconds: Value passed to the queue as
                ``processing_time`` every time an item is fetched.
        """
        self._sched = Scheduler()
        self._poll_period_seconds = poll_period_seconds
        self._reservation_seconds = reservation_seconds
        self._stop = Event()
        self._queue = queue

    def process_queue_item(self, job_details):
        """ Return True if complete, False if it should be retried.

        Args:
            job_details: The JSON-decoded body of the queue item.

        Raises:
            NotImplementedError: Always, in this base class.
        """
        raise NotImplementedError(
            'Workers must implement process_queue_item.')

    def _next_item(self):
        """Fetch the next queue item using the configured reservation time."""
        return self._queue.get(processing_time=self._reservation_seconds)

    def poll_queue(self):
        """Process queue items until the queue returns a falsy item.

        Each item's ``body`` is decoded as JSON and handed to
        ``process_queue_item``. A truthy result marks the item complete on
        the queue; a falsy result logs a warning and marks it incomplete.

        Exceptions raised while decoding or processing an item are not caught
        here: they propagate to the caller and the item is neither completed
        nor marked incomplete.
        """
        logger.debug('Getting work item from queue.')

        # Every fetch, including the first, goes through _next_item so that
        # all items are requested with the same processing_time.
        item = self._next_item()
        while item:
            logger.debug('Queue gave us some work: %s', item.body)

            job_details = json.loads(item.body)

            if self.process_queue_item(job_details):
                self._queue.complete(item)
            else:
                logger.warning('An error occurred processing request: %s',
                               item.body)
                self._queue.incomplete(item)

            item = self._next_item()

        logger.debug('No more work.')

    def start(self):
        """Start the scheduler and block until ``join`` is called.

        ``poll_queue`` is registered as an interval job that fires every
        ``poll_period_seconds`` seconds.
        """
        logger.debug("Scheduling worker.")

        self._sched.start()
        self._sched.add_interval_job(self.poll_queue,
                                     seconds=self._poll_period_seconds)

        # Re-check the stop event once a second so this thread does not sit
        # in one unbounded wait.
        while not self._stop.wait(1):
            pass

    def join(self):
        """Signal ``start`` to return.

        Despite the name, this does not wait for anything; it only sets the
        stop event that ``start`` is polling.
        """
        self._stop.set()