72 lines
2.1 KiB
Python
72 lines
2.1 KiB
Python
import logging
|
|
import json
|
|
import signal
|
|
|
|
from threading import Event
|
|
from apscheduler.scheduler import Scheduler
|
|
from datetime import datetime, timedelta
|
|
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
|
|
class Worker(object):
|
|
def __init__(self, queue, poll_period_seconds=30, reservation_seconds=300,
|
|
watchdog_period_seconds=60):
|
|
self._sched = Scheduler()
|
|
self._poll_period_seconds = poll_period_seconds
|
|
self._reservation_seconds = reservation_seconds
|
|
self._watchdog_period_seconds = watchdog_period_seconds
|
|
self._stop = Event()
|
|
self._queue = queue
|
|
|
|
def process_queue_item(self, job_details):
|
|
""" Return True if complete, False if it should be retried. """
|
|
raise NotImplementedError('Workers must implement run.')
|
|
|
|
def watchdog(self):
|
|
""" Function that gets run once every watchdog_period_seconds. """
|
|
pass
|
|
|
|
def poll_queue(self):
|
|
logger.debug('Getting work item from queue.')
|
|
|
|
item = self._queue.get()
|
|
while item:
|
|
logger.debug('Queue gave us some work: %s' % item.body)
|
|
|
|
job_details = json.loads(item.body)
|
|
|
|
if self.process_queue_item(job_details):
|
|
self._queue.complete(item)
|
|
else:
|
|
logger.warning('An error occurred processing request: %s' % item.body)
|
|
self._queue.incomplete(item)
|
|
|
|
item = self._queue.get(processing_time=self._reservation_seconds)
|
|
|
|
logger.debug('No more work.')
|
|
|
|
def start(self):
|
|
logger.debug("Scheduling worker.")
|
|
|
|
soon = datetime.now() + timedelta(seconds=.001)
|
|
|
|
self._sched.start()
|
|
self._sched.add_interval_job(self.poll_queue, seconds=self._poll_period_seconds,
|
|
start_date=soon)
|
|
self._sched.add_interval_job(self.watchdog, seconds=self._watchdog_period_seconds)
|
|
|
|
signal.signal(signal.SIGTERM, self.join)
|
|
signal.signal(signal.SIGINT, self.join)
|
|
|
|
while not self._stop.wait(1):
|
|
pass
|
|
|
|
logger.debug('Waiting for running tasks to complete.')
|
|
self._sched.shutdown()
|
|
logger.debug('Finished.')
|
|
|
|
def join(self, signal_num=None, stack_frame=None):
|
|
logger.debug('Shutting down worker gracefully.')
|
|
self._stop.set()
|