From 544fa40a5f7617781163a36bac19eef3bcc0b4ba Mon Sep 17 00:00:00 2001 From: Joseph Schorr Date: Tue, 24 Nov 2015 16:18:45 -0500 Subject: [PATCH] Add a base class for a global worker that locks via Redis --- requirements-nover.txt | 3 ++- requirements.txt | 1 + workers/globalworkerbase.py | 33 +++++++++++++++++++++++++++++++++ 3 files changed, 36 insertions(+), 1 deletion(-) create mode 100644 workers/globalworkerbase.py diff --git a/requirements-nover.txt b/requirements-nover.txt index f20af7f67..1f1450103 100644 --- a/requirements-nover.txt +++ b/requirements-nover.txt @@ -56,4 +56,5 @@ toposort pyjwkest rfc3987 jsonpath-rw -bintrees \ No newline at end of file +bintrees +redlock \ No newline at end of file diff --git a/requirements.txt b/requirements.txt index 82b47ec1d..6cc5ca7a3 100644 --- a/requirements.txt +++ b/requirements.txt @@ -73,6 +73,7 @@ pytz==2015.4 PyYAML==3.11 raven==5.3.1 redis==2.10.3 +redlock=1.1.0 reportlab==2.7 requests==2.7.0 requests-oauthlib==0.5.0 diff --git a/workers/globalworkerbase.py b/workers/globalworkerbase.py new file mode 100644 index 000000000..74cc2a905 --- /dev/null +++ b/workers/globalworkerbase.py @@ -0,0 +1,33 @@ +import logging + +from workers.worker import Worker +from redlock import RedLock, RedLockError + +logger = logging.getLogger(__name__) + +class GlobalWorker(Worker): + def __init__(self, app, sleep_period_seconds=3600, lock_ttl=600): + super(GlobalWorker, self).__init__() + + worker_name = self.__class__.__name__ + + self._redis_info = app.config['USER_EVENTS_REDIS'] + self._lock_name = '%s.global_lock' % (worker_name) + self._lock_ttl = lock_ttl + + self.add_operation(self._prepare_global_work, sleep_period_seconds) + + def _prepare_global_work(self): + try: + logger.debug('Acquiring lock %s', self._lock_name) + with RedLock(self._lock_name, connection_details=[self._redis_info], + ttl=self._lock_ttl): + logger.debug('Acquired lock %s, performing work', self._lock_name) + self.perform_global_work() + + logger.debug('Work complete, releasing lock %s', self._lock_name) + except RedLockError: + logger.debug('Could not acquire lock %s, going to sleep', self._lock_name) + + def perform_global_work(self): + raise NotImplementedError('Workers must implement perform_global_work.')