Merge pull request #981 from coreos-inc/globalworker
Add a base class for a global worker that locks via Redis
This commit is contained in:
commit
11cd3afba0
3 changed files with 36 additions and 1 deletions
|
@ -56,4 +56,5 @@ toposort
|
|||
pyjwkest
|
||||
rfc3987
|
||||
jsonpath-rw
|
||||
bintrees
|
||||
bintrees
|
||||
redlock
|
|
@ -73,6 +73,7 @@ pytz==2015.4
|
|||
PyYAML==3.11
|
||||
raven==5.3.1
|
||||
redis==2.10.3
|
||||
redlock=1.1.0
|
||||
reportlab==2.7
|
||||
requests==2.7.0
|
||||
requests-oauthlib==0.5.0
|
||||
|
|
33
workers/globalworkerbase.py
Normal file
33
workers/globalworkerbase.py
Normal file
|
@ -0,0 +1,33 @@
|
|||
import logging
|
||||
|
||||
from workers.worker import Worker
|
||||
from redlock import RedLock, RedLockError
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
class GlobalWorker(Worker):
|
||||
def __init__(self, app, sleep_period_seconds=3600, lock_ttl=600):
|
||||
super(GlobalWorker, self).__init__()
|
||||
|
||||
worker_name = self.__class__.__name__
|
||||
|
||||
self._redis_info = app.config['USER_EVENTS_REDIS']
|
||||
self._lock_name = '%s.global_lock' % (worker_name)
|
||||
self._lock_ttl = lock_ttl
|
||||
|
||||
self.add_operation(self._prepare_global_work, sleep_period_seconds)
|
||||
|
||||
def _prepare_global_work(self):
|
||||
try:
|
||||
logger.debug('Acquiring lock %s', self._lock_name)
|
||||
with RedLock(self._lock_name, connection_details=[self._redis_info],
|
||||
ttl=self._lock_ttl):
|
||||
logger.debug('Acquired lock %s, performing work', self._lock_name)
|
||||
self.perform_global_work()
|
||||
|
||||
logger.debug('Work complete, releasing lock %s', self._lock_name)
|
||||
except RedLockError:
|
||||
logger.debug('Could not acquire lock %s, going to sleep', self._lock_name)
|
||||
|
||||
def perform_global_work(self):
|
||||
raise NotImplementedError('Workers must implement perform_global_work.')
|
Reference in a new issue