76 lines
2.4 KiB
Python
76 lines
2.4 KiB
Python
import calendar
|
|
import sys
|
|
|
|
from email.utils import formatdate
|
|
from apscheduler.scheduler import Scheduler
|
|
from datetime import datetime, timedelta
|
|
|
|
from data import model
|
|
|
|
|
|
class ExpirationScheduler(object):
    """Schedule the 'expiring_license' notification and termination jobs.

    All jobs are registered on a ``Scheduler`` instance in ``__init__``;
    ``start()`` starts that scheduler.
    """

    def __init__(self, utc_create_notifications_date, utc_terminate_processes_date):
        """Register the notification and termination jobs.

        Args:
            utc_create_notifications_date: datetime at which the
                'expiring_license' notifications should be generated. It is
                compared against the naive ``datetime.utcnow()``, so it must
                itself be a naive datetime (presumably expressed in UTC).
            utc_terminate_processes_date: datetime at which ``_terminate`` is
                scheduled to run. It is also the value reported to users as
                ``expires_at``.
        """
        self._scheduler = Scheduler()
        self._termination_date = utc_terminate_processes_date

        # Run time used for jobs that should fire right after start-up. It is
        # built from local time, like the converted dates passed below.
        soon = datetime.now() + timedelta(seconds=1)

        if utc_create_notifications_date > datetime.utcnow():
            # The warning date is still ahead: drop existing notifications of
            # this kind shortly, and generate them once the date is reached.
            self._scheduler.add_date_job(model.delete_all_notifications_by_kind, soon,
                                         ['expiring_license'])

            local_notifications_date = self._utc_to_local(utc_create_notifications_date)
            self._scheduler.add_date_job(self._generate_notifications,
                                         local_notifications_date)
        else:
            # The warning date has already passed: generate notifications
            # almost immediately.
            self._scheduler.add_date_job(self._generate_notifications, soon)

        local_termination_date = self._utc_to_local(utc_terminate_processes_date)
        self._scheduler.add_date_job(self._terminate, local_termination_date)

    @staticmethod
    def _format_date(date):
        """Return *date* formatted as an RFC822 date string, or None for None."""
        if date is None:
            return None
        return formatdate(calendar.timegm(date.utctimetuple()))

    @staticmethod
    def _utc_to_local(utc_dt):
        """Convert a UTC datetime into a naive local-time datetime.

        Naive inputs are interpreted as UTC. Timezone-aware inputs are first
        normalised to UTC via ``utctimetuple()``, so a non-zero UTC offset is
        honoured instead of being silently ignored.
        """
        # Go through an integer timestamp to avoid losing precision, then put
        # the microseconds back since the time tuple does not carry them.
        timestamp = calendar.timegm(utc_dt.utctimetuple())
        local_dt = datetime.fromtimestamp(timestamp)
        return local_dt.replace(microsecond=utc_dt.microsecond)

    def _generate_notifications(self):
        """Create a unique 'expiring_license' notification for each active user."""
        for user in model.get_active_users():
            model.create_unique_notification(
                'expiring_license', user,
                {'expires_at': self._format_date(self._termination_date)})

    @staticmethod
    def _terminate():
        """Exit with status 1 once the termination date has been reached."""
        # NOTE(review): sys.exit() only raises SystemExit in the calling
        # thread. If the scheduler runs this job outside the main thread the
        # process may keep running -- confirm against the scheduler in use.
        sys.exit(1)

    def start(self):
        """Start the underlying scheduler."""
        self._scheduler.start()
|
|
|
|
|
|
class Expiration(object):
    """Application extension that creates and starts an ExpirationScheduler.

    Attributes that are not found on this object are looked up on the
    scheduler stored in ``state``; names that cannot be resolved evaluate to
    None rather than raising AttributeError.
    """

    def __init__(self, app=None):
        """Store *app* and, when it is given, initialise the scheduler for it.

        Args:
            app: optional application object. When None, ``state`` is set to
                None and ``init_app`` can be called later.
        """
        self.app = app
        if app is not None:
            self.state = self.init_app(app)
        else:
            self.state = None

    def init_app(self, app):
        """Create, start and register an ExpirationScheduler for *app*.

        Reads ``LICENSE_EXPIRATION_WARNING`` and ``LICENSE_EXPIRATION`` from
        ``app.config`` and stores the scheduler under
        ``app.extensions['expiration']``.

        Returns:
            The started ExpirationScheduler.
        """
        expiration = ExpirationScheduler(app.config['LICENSE_EXPIRATION_WARNING'],
                                         app.config['LICENSE_EXPIRATION'])
        expiration.start()

        # register extension with app
        app.extensions = getattr(app, 'extensions', {})
        app.extensions['expiration'] = expiration
        return expiration

    def __getattr__(self, name):
        """Delegate unknown attribute lookups to ``state``.

        Returns None when ``state`` is unset or does not have *name*.
        """
        # Read ``state`` from the instance dict directly: going through
        # ``self.state`` would re-enter __getattr__ and recurse without bound
        # whenever ``state`` has not been assigned yet (e.g. an instance
        # created via __new__, or a lookup made before __init__ finishes).
        state = self.__dict__.get('state')
        return getattr(state, name, None)
|