Add a base class for a global worker that locks via Redis
This commit is contained in:
parent
c279d904bd
commit
544fa40a5f
3 changed files with 36 additions and 1 deletions
|
@ -56,4 +56,5 @@ toposort
|
||||||
pyjwkest
|
pyjwkest
|
||||||
rfc3987
|
rfc3987
|
||||||
jsonpath-rw
|
jsonpath-rw
|
||||||
bintrees
|
bintrees
|
||||||
|
redlock
|
|
@ -73,6 +73,7 @@ pytz==2015.4
|
||||||
PyYAML==3.11
|
PyYAML==3.11
|
||||||
raven==5.3.1
|
raven==5.3.1
|
||||||
redis==2.10.3
|
redis==2.10.3
|
||||||
|
redlock=1.1.0
|
||||||
reportlab==2.7
|
reportlab==2.7
|
||||||
requests==2.7.0
|
requests==2.7.0
|
||||||
requests-oauthlib==0.5.0
|
requests-oauthlib==0.5.0
|
||||||
|
|
33
workers/globalworkerbase.py
Normal file
33
workers/globalworkerbase.py
Normal file
|
@ -0,0 +1,33 @@
|
||||||
|
import logging
|
||||||
|
|
||||||
|
from workers.worker import Worker
|
||||||
|
from redlock import RedLock, RedLockError
|
||||||
|
|
||||||
|
logger = logging.getLogger(__name__)
|
||||||
|
|
||||||
|
class GlobalWorker(Worker):
|
||||||
|
def __init__(self, app, sleep_period_seconds=3600, lock_ttl=600):
|
||||||
|
super(GlobalWorker, self).__init__()
|
||||||
|
|
||||||
|
worker_name = self.__class__.__name__
|
||||||
|
|
||||||
|
self._redis_info = app.config['USER_EVENTS_REDIS']
|
||||||
|
self._lock_name = '%s.global_lock' % (worker_name)
|
||||||
|
self._lock_ttl = lock_ttl
|
||||||
|
|
||||||
|
self.add_operation(self._prepare_global_work, sleep_period_seconds)
|
||||||
|
|
||||||
|
def _prepare_global_work(self):
|
||||||
|
try:
|
||||||
|
logger.debug('Acquiring lock %s', self._lock_name)
|
||||||
|
with RedLock(self._lock_name, connection_details=[self._redis_info],
|
||||||
|
ttl=self._lock_ttl):
|
||||||
|
logger.debug('Acquired lock %s, performing work', self._lock_name)
|
||||||
|
self.perform_global_work()
|
||||||
|
|
||||||
|
logger.debug('Work complete, releasing lock %s', self._lock_name)
|
||||||
|
except RedLockError:
|
||||||
|
logger.debug('Could not acquire lock %s, going to sleep', self._lock_name)
|
||||||
|
|
||||||
|
def perform_global_work(self):
|
||||||
|
raise NotImplementedError('Workers must implement perform_global_work.')
|
Reference in a new issue