This repository was archived on 2020-03-24. You can view files and clone it, but you cannot push or open issues or pull requests.
quay/workers/worker.py
Joseph Schorr ac0cca2d90 Switch to a unified worker system
- Handles logging
- Handles reporting to Sentry
- Removes old code around serving a web endpoint (unused now)
2015-07-28 17:26:12 -04:00

90 lines
2.5 KiB
Python

import logging
import signal
import sys
import socket
from threading import Event
from apscheduler.schedulers.background import BackgroundScheduler
from datetime import datetime, timedelta
from threading import Thread
from time import sleep
from raven import Client
from app import app
from data.model import db
from functools import wraps
# Module-level logger, named after this module, used by the Worker class below.
logger = logging.getLogger(__name__)
class Worker(object):
    """ Base class for workers which perform some work periodically.

    Typical usage: register one or more periodic callables with
    add_operation(), then call start(). start() blocks the calling thread
    until terminate() is invoked, either directly, via join(), or by a
    SIGTERM/SIGINT signal.
    """

    def __init__(self):
        self._sched = BackgroundScheduler()
        # List of (wrapped_callable, interval_seconds) tuples scheduled by start().
        self._operations = []
        # Set when a shutdown has been requested.
        self._stop = Event()
        # Set once start() has finished shutting the scheduler down.
        self._terminated = Event()

        # Always define the attribute: the exception handler in add_operation()
        # checks it, and leaving it unset when Sentry is not configured would turn
        # every operation failure into an AttributeError.
        self._raven_client = None

        if app.config.get('EXCEPTION_LOG_TYPE', 'FakeSentry') == 'Sentry':
            worker_name = '%s:worker-%s' % (socket.gethostname(), self.__class__.__name__)
            self._raven_client = Client(app.config.get('SENTRY_DSN', ''), name=worker_name)

    def is_healthy(self):
        """ Returns True as long as no shutdown has been requested. """
        return not self._stop.is_set()

    def is_terminated(self):
        """ Returns True once start() has completed its shutdown sequence. """
        return self._terminated.is_set()

    def ungracefully_terminated(self):
        """ Method called when the worker has been terminated in an ungraceful fashion. """
        pass

    def add_operation(self, operation_func, operation_sec):
        """ Registers operation_func to be run every operation_sec seconds once
            start() is called.

            The callable is wrapped so that any Exception it raises is logged (and
            reported to Sentry when a client is configured) instead of propagating
            to the scheduler; in that case the wrapper returns None.
        """
        @wraps(operation_func)
        def _operation_func():
            try:
                return operation_func()
            except Exception:
                logger.exception('Operation raised exception')
                if self._raven_client:
                    logger.debug('Logging exception to Sentry')
                    self._raven_client.captureException()

        self._operations.append((_operation_func, operation_sec))

    def start(self):
        """ Configures logging, schedules all registered operations and blocks
            until a shutdown is requested via terminate().

            After the stop event is set, the scheduler is shut down and the
            terminated event is set.
        """
        # logging.config is a submodule and is not loaded by a plain `import logging`;
        # import it explicitly rather than relying on another module having done so.
        import logging.config
        logging.config.fileConfig('conf/logging.conf', disable_existing_loggers=False)

        logger.debug('Scheduling worker.')

        # Start the first run (almost) immediately instead of one full interval from now.
        soon = datetime.now() + timedelta(seconds=.001)

        self._sched.start()
        for operation_func, operation_sec in self._operations:
            self._sched.add_job(operation_func, 'interval', seconds=operation_sec,
                                start_date=soon, max_instances=1)

        signal.signal(signal.SIGTERM, self.terminate)
        signal.signal(signal.SIGINT, self.terminate)

        # Wake up once a second until a stop has been requested.
        while not self._stop.wait(1):
            pass

        logger.debug('Waiting for running tasks to complete.')
        self._sched.shutdown()
        logger.debug('Finished.')

        self._terminated.set()

    def terminate(self, signal_num=None, stack_frame=None, graceful=False):
        """ Requests that the worker shut down.

            The (signal_num, stack_frame) parameters allow this method to be used
            directly as a signal handler; they are not otherwise read. If the worker
            has already fully terminated, the process exits with status 1. Otherwise
            the stop event is set and, unless graceful is True,
            ungracefully_terminated() is invoked.
        """
        if self._terminated.is_set():
            sys.exit(1)
        else:
            logger.debug('Shutting down worker.')
            self._stop.set()

            if not graceful:
                self.ungracefully_terminated()

    def join(self):
        """ Requests a graceful shutdown of the worker. """
        self.terminate(graceful=True)