Abstract out the concept of a restart function

This commit is contained in:
Joseph Schorr 2015-06-25 21:36:58 -04:00
parent 52fa9aad5b
commit 9f5f71398c

View file

@ -68,7 +68,8 @@ class EphemeralBuilderManager(BaseManager):
super(EphemeralBuilderManager, self).__init__(*args, **kwargs)
def _watch_etcd(self, etcd_key, change_callback, start_index=None, recursive=True):
def _watch_etcd(self, etcd_key, change_callback, start_index=None, recursive=True,
restarter=None):
watch_task_key = (etcd_key, recursive)
def callback_wrapper(changed_key_future):
new_index = start_index
@ -97,10 +98,11 @@ class EphemeralBuilderManager(BaseManager):
# TODO: Remove this hack once Etcd is fixed.
logger.exception('Etcd moved forward too quickly. Restarting watch cycle.')
new_index = None
async(self._register_existing_realms())
if restarter is not None:
async(restarter())
if watch_task_key not in self._watch_tasks or self._watch_tasks[watch_task_key].done():
self._watch_etcd(etcd_key, change_callback, start_index=new_index)
self._watch_etcd(etcd_key, change_callback, start_index=new_index, restarter=restarter)
if etcd_result:
change_callback(etcd_result)
@ -239,7 +241,8 @@ class EphemeralBuilderManager(BaseManager):
self._watch_etcd(self._etcd_builder_prefix, self._handle_builder_expiration)
self._etcd_realm_prefix = self._manager_config.get('ETCD_REALM_PREFIX', 'realm/')
self._watch_etcd(self._etcd_realm_prefix, self._handle_realm_change)
self._watch_etcd(self._etcd_realm_prefix, self._handle_realm_change,
restarter=self._register_existing_realms)
self._etcd_lock_prefix = self._manager_config.get('ETCD_LOCK_PREFIX', 'locks/')