import logging
import logging.config
import signal
import socket
import sys
from datetime import datetime, timedelta
from functools import wraps
from threading import Event
from threading import Thread
from time import sleep

from apscheduler.schedulers.background import BackgroundScheduler
from raven import Client

from app import app
from data.model import db

logger = logging.getLogger(__name__)


class Worker(object):
  """ Base class for workers which perform some work periodically. """

  def __init__(self):
    self._sched = BackgroundScheduler()
    # List of (wrapped operation callable, interval in seconds) tuples.
    self._operations = []
    # Set once termination has been requested.
    self._stop = Event()
    # Set once start() has shut the scheduler down.
    self._terminated = Event()

    # Always define the attribute so the exception handler installed by
    # add_operation() can test it even when Sentry reporting is disabled.
    self._raven_client = None

    if app.config.get('EXCEPTION_LOG_TYPE', 'FakeSentry') == 'Sentry':
      worker_name = '%s:worker-%s' % (socket.gethostname(), self.__class__.__name__)
      self._raven_client = Client(app.config.get('SENTRY_DSN', ''), name=worker_name)

  def is_healthy(self):
    """ Returns True as long as termination has not been requested. """
    return not self._stop.is_set()

  def is_terminated(self):
    """ Returns True once start() has finished shutting the scheduler down. """
    return self._terminated.is_set()

  def ungracefully_terminated(self):
    """ Method called when the worker has been terminated in an ungraceful fashion.
        The base implementation does nothing.
    """
    pass

  def add_operation(self, operation_func, operation_sec):
    """ Registers operation_func to be run every operation_sec seconds once start()
        is called. The function is wrapped so that any Exception it raises is logged
        (and reported to Sentry when a client is configured) rather than propagated;
        in that case the wrapper returns None.
    """
    @wraps(operation_func)
    def _operation_func():
      try:
        return operation_func()
      except Exception:
        logger.exception('Operation raised exception')
        if self._raven_client:
          logger.debug('Logging exception to Sentry')
          self._raven_client.captureException()

    self._operations.append((_operation_func, operation_sec))

  def start(self):
    """ Configures logging, schedules all registered operations and blocks until
        termination is requested, then shuts the scheduler down and marks the worker
        as terminated. Installs SIGTERM and SIGINT handlers that call terminate().
    """
    logging.config.fileConfig('conf/logging.conf', disable_existing_loggers=False)

    logger.debug('Scheduling worker.')

    # Start the first run of every job almost immediately rather than waiting
    # a full interval.
    soon = datetime.now() + timedelta(seconds=.001)

    self._sched.start()
    for operation_func, operation_sec in self._operations:
      self._sched.add_job(operation_func, 'interval', seconds=operation_sec,
                          start_date=soon, max_instances=1)

    signal.signal(signal.SIGTERM, self.terminate)
    signal.signal(signal.SIGINT, self.terminate)

    # Wake up once a second until the stop event is set.
    while not self._stop.wait(1):
      pass

    logger.debug('Waiting for running tasks to complete.')
    self._sched.shutdown()
    logger.debug('Finished.')

    self._terminated.set()

  def terminate(self, signal_num=None, stack_frame=None, graceful=False):
    """ Requests that the worker stop. Usable directly as a signal handler
        (signal_num and stack_frame are accepted but unused). If the worker has
        already terminated, exits the process with status 1. Unless graceful is
        True, ungracefully_terminated() is invoked after the stop is requested.
    """
    if self._terminated.is_set():
      sys.exit(1)

    else:
      logger.debug('Shutting down worker.')
      self._stop.set()

      if not graceful:
        self.ungracefully_terminated()

  def join(self):
    """ Requests a graceful termination of the worker. Note that this only signals
        the stop; it does not itself wait for start() to return.
    """
    self.terminate(graceful=True)