diff --git a/buildman/manager/ephemeral.py b/buildman/manager/ephemeral.py index a1b7809c2..0d1c482e2 100644 --- a/buildman/manager/ephemeral.py +++ b/buildman/manager/ephemeral.py @@ -22,7 +22,7 @@ from util.morecollections import AttrDict logger = logging.getLogger(__name__) -ETCD_DISABLE_TIMEOUT = 0 +ETCD_MAX_WATCH_TIMEOUT = 30 EC2_API_TIMEOUT = 20 RETRY_IMMEDIATELY_TIMEOUT = 0 @@ -85,10 +85,7 @@ class EphemeralBuilderManager(BaseManager): '*' if recursive else '', existing_index, etcd_result) except ReadTimeoutError: - logger.debug('Read-timeout on etcd watch: %s', etcd_key) - - except (ProtocolError, etcd.EtcdException): - logger.exception('Exception on etcd watch: %s', etcd_key) + logger.debug('Read-timeout on etcd watch %s, rescheduling', etcd_key) except etcd.EtcdEventIndexCleared: # This happens if etcd2 has moved forward too fast for us to start watching @@ -101,6 +98,18 @@ class EphemeralBuilderManager(BaseManager): if restarter is not None: async(restarter()) + except etcd.EtcdException as eex: + # TODO(jschorr): This is a quick and dirty hack and should be replaced + # with a proper exception check. + if str(eex.message).find('Read timed out') >= 0: + logger.debug('Read-timeout on etcd watch %s, rescheduling', etcd_key) + else: + logger.exception('Exception on etcd watch: %s', etcd_key) + + except ProtocolError: + logger.exception('Exception on etcd watch: %s', etcd_key) + + if watch_task_key not in self._watch_tasks or self._watch_tasks[watch_task_key].done(): self._watch_etcd(etcd_key, change_callback, start_index=new_index, restarter=restarter) @@ -112,7 +121,7 @@ class EphemeralBuilderManager(BaseManager): '*' if recursive else '', start_index) watch_future = self._etcd_client.watch(etcd_key, recursive=recursive, index=start_index, - timeout=ETCD_DISABLE_TIMEOUT) + timeout=ETCD_MAX_WATCH_TIMEOUT) watch_future.add_done_callback(callback_wrapper) self._watch_tasks[watch_task_key] = async(watch_future) diff --git a/test/test_buildman.py b/test/test_buildman.py index 541c807af..b58eddb45 100644 --- a/test/test_buildman.py +++ b/test/test_buildman.py @@ -104,7 +104,7 @@ class TestEphemeral(unittest.TestCase): @coroutine def _setup_job_for_managers(self): # Test that we are watching the realm location before anything else happens - self.etcd_client_mock.watch.assert_any_call('realm/', recursive=True, timeout=0, index=None) + self.etcd_client_mock.watch.assert_any_call('realm/', recursive=True, timeout=30, index=None) self.etcd_client_mock.read = Mock(side_effect=KeyError) test_component = Mock(spec=BuildComponent) @@ -182,7 +182,7 @@ class TestEphemeral(unittest.TestCase): @async_test def test_expiring_worker(self): # Test that we are watching before anything else happens - self.etcd_client_mock.watch.assert_any_call('building/', recursive=True, timeout=0, index=None) + self.etcd_client_mock.watch.assert_any_call('building/', recursive=True, timeout=30, index=None) # Send a signal to the callback that a worker has expired expired_result = Mock(spec=etcd.EtcdResult) @@ -201,7 +201,7 @@ class TestEphemeral(unittest.TestCase): test_component = yield From(self._setup_job_for_managers()) # Test that we are watching before anything else happens - self.etcd_client_mock.watch.assert_any_call('building/', recursive=True, timeout=0, index=None) + self.etcd_client_mock.watch.assert_any_call('building/', recursive=True, timeout=30, index=None) # Send a signal to the callback that a worker has expired expired_result = Mock(spec=etcd.EtcdResult)