Finish porting the workers over to apscheduler 3.0

This commit is contained in:
Jake Moshenko 2014-08-01 18:38:02 -04:00
parent 09917ff062
commit 0aa6e92b02
3 changed files with 16 additions and 36 deletions

5
app.py
View file

@ -17,7 +17,6 @@ from data.users import UserAuthentication
from util.analytics import Analytics from util.analytics import Analytics
from util.exceptionlog import Sentry from util.exceptionlog import Sentry
from util.queuemetrics import QueueMetrics from util.queuemetrics import QueueMetrics
from util.expiration import Expiration
from data.billing import Billing from data.billing import Billing
from data.buildlogs import BuildLogs from data.buildlogs import BuildLogs
from data.queue import WorkQueue from data.queue import WorkQueue
@ -52,7 +51,10 @@ else:
app.config.update(environ_config) app.config.update(environ_config)
logger.debug('Applying license config from: %s', LICENSE_FILENAME) logger.debug('Applying license config from: %s', LICENSE_FILENAME)
try:
app.config.update(load_license(LICENSE_FILENAME)) app.config.update(load_license(LICENSE_FILENAME))
except IOError:
raise RuntimeError('No license file found, please check your configuration')
if app.config.get('LICENSE_EXPIRATION', datetime.min) < datetime.utcnow(): if app.config.get('LICENSE_EXPIRATION', datetime.min) < datetime.utcnow():
raise RuntimeError('License has expired, please contact support@quay.io') raise RuntimeError('License has expired, please contact support@quay.io')
@ -71,7 +73,6 @@ sentry = Sentry(app)
build_logs = BuildLogs(app) build_logs = BuildLogs(app)
queue_metrics = QueueMetrics(app) queue_metrics = QueueMetrics(app)
authentication = UserAuthentication(app) authentication = UserAuthentication(app)
# expiration = Expiration(app)
userevents = UserEventsBuilderModule(app) userevents = UserEventsBuilderModule(app)
tf = app.config['DB_TRANSACTION_FACTORY'] tf = app.config['DB_TRANSACTION_FACTORY']

View file

@ -16,16 +16,17 @@ class ExpirationScheduler(object):
soon = datetime.now() + timedelta(seconds=1) soon = datetime.now() + timedelta(seconds=1)
if utc_create_notifications_date > datetime.utcnow(): if utc_create_notifications_date > datetime.utcnow():
self._scheduler.add_date_job(model.delete_all_notifications_by_kind, soon, self._scheduler.add_job(model.delete_all_notifications_by_kind, 'date', run_date=soon,
['expiring_license']) args=['expiring_license'])
local_notifications_date = self._utc_to_local(utc_create_notifications_date) local_notifications_date = self._utc_to_local(utc_create_notifications_date)
self._scheduler.add_date_job(self._generate_notifications, local_notifications_date) self._scheduler.add_job(self._generate_notifications, 'date',
run_date=local_notifications_date)
else: else:
self._scheduler.add_date_job(self._generate_notifications, soon) self._scheduler.add_job(self._generate_notifications, 'date', run_date=soon)
local_termination_date = self._utc_to_local(utc_terminate_processes_date) local_termination_date = self._utc_to_local(utc_terminate_processes_date)
self._scheduler.add_date_job(self._terminate, local_termination_date) self._scheduler.add_job(self._terminate, 'date', run_date=local_termination_date)
@staticmethod @staticmethod
def _format_date(date): def _format_date(date):
@ -52,25 +53,3 @@ class ExpirationScheduler(object):
def start(self): def start(self):
self._scheduler.start() self._scheduler.start()
class Expiration(object):
def __init__(self, app=None):
self.app = app
if app is not None:
self.state = self.init_app(app)
else:
self.state = None
def init_app(self, app):
expiration = ExpirationScheduler(app.config['LICENSE_EXPIRATION_WARNING'],
app.config['LICENSE_EXPIRATION'])
expiration.start()
# register extension with app
app.extensions = getattr(app, 'extensions', {})
app.extensions['expiration'] = expiration
return expiration
def __getattr__(self, name):
return getattr(self.state, name, None)

View file

@ -4,7 +4,7 @@ import signal
import sys import sys
from threading import Event, Lock from threading import Event, Lock
from apscheduler.schedulers.blocking import BlockingScheduler from apscheduler.schedulers.background import BackgroundScheduler
from datetime import datetime, timedelta from datetime import datetime, timedelta
from BaseHTTPServer import HTTPServer, BaseHTTPRequestHandler from BaseHTTPServer import HTTPServer, BaseHTTPRequestHandler
from threading import Thread from threading import Thread
@ -64,7 +64,7 @@ class WorkerStatusHandler(BaseHTTPRequestHandler):
class Worker(object): class Worker(object):
def __init__(self, queue, poll_period_seconds=30, reservation_seconds=300, def __init__(self, queue, poll_period_seconds=30, reservation_seconds=300,
watchdog_period_seconds=60): watchdog_period_seconds=60):
self._sched = BlockingScheduler() self._sched = BackgroundScheduler()
self._poll_period_seconds = poll_period_seconds self._poll_period_seconds = poll_period_seconds
self._reservation_seconds = reservation_seconds self._reservation_seconds = reservation_seconds
self._watchdog_period_seconds = watchdog_period_seconds self._watchdog_period_seconds = watchdog_period_seconds
@ -166,10 +166,10 @@ class Worker(object):
soon = datetime.now() + timedelta(seconds=.001) soon = datetime.now() + timedelta(seconds=.001)
self._sched.start() self._sched.start()
self._sched.add_interval_job(self.poll_queue, seconds=self._poll_period_seconds, self._sched.add_job(self.poll_queue, 'interval', seconds=self._poll_period_seconds,
start_date=soon) start_date=soon)
self._sched.add_interval_job(self.update_queue_metrics, seconds=60, start_date=soon) self._sched.add_job(self.update_queue_metrics, 'interval', seconds=60, start_date=soon)
self._sched.add_interval_job(self.run_watchdog, seconds=self._watchdog_period_seconds) self._sched.add_job(self.run_watchdog, 'interval', seconds=self._watchdog_period_seconds)
signal.signal(signal.SIGTERM, self.terminate) signal.signal(signal.SIGTERM, self.terminate)
signal.signal(signal.SIGINT, self.terminate) signal.signal(signal.SIGINT, self.terminate)