This repository has been archived on 2020-03-24. You can view files and clone it, but cannot push or open issues or pull requests.
quay/workers/worker.py

101 lines
2.8 KiB
Python
Raw Normal View History

import logging
import logging.config
import signal
import socket
import sys
from datetime import datetime, timedelta
from functools import wraps
from threading import Event

from apscheduler.schedulers.background import BackgroundScheduler
from raven import Client

from app import app
from data.database import UseThenDisconnect
# Module-level logger used by Worker for its status and error messages.
logger = logging.getLogger(__name__)
class Worker(object):
    """ Base class for workers which perform some work periodically.

    Operations registered through add_operation() are scheduled on a
    background scheduler when start() is called. start() then blocks until
    shutdown is requested through terminate(), join(), SIGTERM or SIGINT.
    """

    def __init__(self):
        self._sched = BackgroundScheduler()
        # (wrapped_func, interval_seconds) pairs registered via add_operation().
        self._operations = []
        # Set once shutdown has been requested.
        self._stop = Event()
        # Set once start() has finished shutting the scheduler down.
        self._terminated = Event()
        # Only populated when the app is configured to report to Sentry.
        self._raven_client = None

        if app.config.get('EXCEPTION_LOG_TYPE', 'FakeSentry') == 'Sentry':
            hostname = socket.gethostname()
            worker_name = '%s:worker-%s' % (hostname, self.__class__.__name__)
            self._raven_client = Client(app.config.get('SENTRY_DSN', ''), name=worker_name)

    def is_healthy(self):
        """ Returns True as long as shutdown has not been requested. """
        return not self._stop.is_set()

    def is_terminated(self):
        """ Returns True once start() has completed its shutdown sequence. """
        return self._terminated.is_set()

    def ungracefully_terminated(self):
        """ Method called when the worker has been terminated in an ungraceful fashion. """
        pass

    def add_operation(self, operation_func, operation_sec):
        """ Registers operation_func to be run every operation_sec seconds by start().

        The function is wrapped so that it runs inside
        UseThenDisconnect(app.config) and so that any exception it raises is
        logged (and sent to Sentry when a client is configured) instead of
        propagating to the scheduler. When an exception is swallowed the
        wrapper returns None.
        """
        @wraps(operation_func)
        def _operation_func():
            try:
                with UseThenDisconnect(app.config):
                    return operation_func()
            except Exception:
                logger.exception('Operation raised exception')
                if self._raven_client:
                    logger.debug('Logging exception to Sentry')
                    self._raven_client.captureException()

        self._operations.append((_operation_func, operation_sec))

    def _setup_and_wait_for_shutdown(self):
        """ Installs the SIGTERM/SIGINT handlers and blocks until a stop is requested. """
        signal.signal(signal.SIGTERM, self.terminate)
        signal.signal(signal.SIGINT, self.terminate)

        # Poll in one-second slices instead of waiting without a timeout;
        # presumably this keeps the main thread responsive to the signal
        # handlers installed above.
        while not self._stop.wait(1):
            pass

    def start(self):
        """ Runs the worker, blocking until shutdown has been requested.

        If the app's SETUP_COMPLETE flag is not set, nothing is scheduled: the
        call simply waits for a stop request and returns. Otherwise every
        registered operation is scheduled, and after the stop request the
        scheduler is shut down and the worker is marked as terminated.
        """
        # Relies on the module-level `import logging.config`: a plain
        # `import logging` does not load the `config` submodule.
        logging.config.fileConfig('conf/logging_debug.conf', disable_existing_loggers=False)

        if not app.config.get('SETUP_COMPLETE', False):
            logger.info('Product setup is not yet complete; skipping worker startup')
            self._setup_and_wait_for_shutdown()
            return

        logger.debug('Scheduling worker.')

        # First run is scheduled almost immediately rather than one full
        # interval after startup.
        soon = datetime.now() + timedelta(seconds=.001)

        self._sched.start()
        for operation_func, operation_sec in self._operations:
            self._sched.add_job(operation_func, 'interval', seconds=operation_sec,
                                start_date=soon, max_instances=1)

        self._setup_and_wait_for_shutdown()

        logger.debug('Waiting for running tasks to complete.')
        self._sched.shutdown()
        logger.debug('Finished.')

        self._terminated.set()

    def terminate(self, signal_num=None, stack_frame=None, graceful=False):
        """ Requests that the worker stop; also installed as the signal handler.

        signal_num and stack_frame are accepted so the method matches the
        handler signature used by signal.signal(); they are not otherwise used.
        If the worker has already fully terminated, the process exits with
        status 1. Unless graceful is True, ungracefully_terminated() is
        invoked after the stop flag is set.
        """
        if self._terminated.is_set():
            sys.exit(1)

        logger.debug('Shutting down worker.')
        self._stop.set()

        if not graceful:
            self.ungracefully_terminated()

    def join(self):
        """ Requests a graceful shutdown of the worker. """
        self.terminate(graceful=True)