From b4c39e8ec0ce85981e233efb5097f2f2259c888a Mon Sep 17 00:00:00 2001
From: Joseph Schorr
Date: Tue, 23 Jun 2015 16:46:05 -0400
Subject: [PATCH] Fix ephemeral build manager to ask for watches in index order with no gaps

---
 buildman/manager/ephemeral.py | 43 +++++++++++++++++++++++------------
 test/test_buildman.py         |  6 ++---
 2 files changed, 31 insertions(+), 18 deletions(-)

diff --git a/buildman/manager/ephemeral.py b/buildman/manager/ephemeral.py
index 9082c9687..7ea9ea8eb 100644
--- a/buildman/manager/ephemeral.py
+++ b/buildman/manager/ephemeral.py
@@ -68,28 +68,41 @@ class EphemeralBuilderManager(BaseManager):
 
     super(EphemeralBuilderManager, self).__init__(*args, **kwargs)
 
-  def _watch_etcd(self, etcd_key, change_callback, recursive=True):
+  def _watch_etcd(self, etcd_key, change_callback, start_index=None, recursive=True):
     watch_task_key = (etcd_key, recursive)
     def callback_wrapper(changed_key_future):
+      new_index = start_index
+      etcd_result = None
+
+      if not changed_key_future.cancelled():
+        try:
+          etcd_result = changed_key_future.result()
+          existing_index = getattr(etcd_result, 'etcd_index', None)
+          new_index = etcd_result.modifiedIndex + 1
+
+          logger.debug('Got watch of key: %s%s at #%s with result: %s', etcd_key,
+                       '*' if recursive else '', existing_index, etcd_result)
+
+        except ReadTimeoutError:
+          logger.debug('Read-timeout on etcd watch: %s', etcd_key)
+
+        except (ProtocolError, etcd.EtcdException):
+          logger.exception('Exception on etcd watch: %s', etcd_key)
+
       if watch_task_key not in self._watch_tasks or self._watch_tasks[watch_task_key].done():
-        self._watch_etcd(etcd_key, change_callback)
+        self._watch_etcd(etcd_key, change_callback, start_index=new_index)
 
-      if changed_key_future.cancelled():
-        # Due to lack of interest, tomorrow has been cancelled
-        return
-
-      try:
-        etcd_result = changed_key_future.result()
-      except (ReadTimeoutError, ProtocolError, etcd.EtcdException):
-        return
-
-      change_callback(etcd_result)
+      if etcd_result:
+        change_callback(etcd_result)
 
     if not self._shutting_down:
-      watch_future = self._etcd_client.watch(etcd_key, recursive=recursive,
+      logger.debug('Scheduling watch of key: %s%s at start index %s', etcd_key,
+                   '*' if recursive else '', start_index)
+
+      watch_future = self._etcd_client.watch(etcd_key, recursive=recursive, index=start_index,
                                              timeout=ETCD_DISABLE_TIMEOUT)
       watch_future.add_done_callback(callback_wrapper)
-      logger.debug('Scheduling watch of key: %s%s', etcd_key, '/*' if recursive else '')
+
       self._watch_tasks[watch_task_key] = async(watch_future)
 
   @coroutine
@@ -329,7 +342,7 @@ class EphemeralBuilderManager(BaseManager):
                    job.job_details['build_uuid'], build_component.builder_realm)
       yield From(build_component.start_build(job))
     except (KeyError, etcd.EtcdKeyError):
-      logger.exception('Builder is asking for more work, but work already completed')
+      logger.warning('Builder is asking for more work, but work already completed')
 
   def build_component_disposed(self, build_component, timed_out):
     logger.debug('Calling build_component_disposed.')
diff --git a/test/test_buildman.py b/test/test_buildman.py
index 9e7a0f825..541c807af 100644
--- a/test/test_buildman.py
+++ b/test/test_buildman.py
@@ -104,7 +104,7 @@ class TestEphemeral(unittest.TestCase):
   @coroutine
   def _setup_job_for_managers(self):
     # Test that we are watching the realm location before anything else happens
-    self.etcd_client_mock.watch.assert_any_call('realm/', recursive=True, timeout=0)
+    self.etcd_client_mock.watch.assert_any_call('realm/', recursive=True, timeout=0, index=None)
 
     self.etcd_client_mock.read = Mock(side_effect=KeyError)
     test_component = Mock(spec=BuildComponent)
@@ -182,7 +182,7 @@ class TestEphemeral(unittest.TestCase):
   @async_test
   def test_expiring_worker(self):
     # Test that we are watching before anything else happens
-    self.etcd_client_mock.watch.assert_any_call('building/', recursive=True, timeout=0)
+    self.etcd_client_mock.watch.assert_any_call('building/', recursive=True, timeout=0, index=None)
 
     # Send a signal to the callback that a worker has expired
     expired_result = Mock(spec=etcd.EtcdResult)
@@ -201,7 +201,7 @@ class TestEphemeral(unittest.TestCase):
     test_component = yield From(self._setup_job_for_managers())
 
     # Test that we are watching before anything else happens
-    self.etcd_client_mock.watch.assert_any_call('building/', recursive=True, timeout=0)
+    self.etcd_client_mock.watch.assert_any_call('building/', recursive=True, timeout=0, index=None)
 
     # Send a signal to the callback that a worker has expired
     expired_result = Mock(spec=etcd.EtcdResult)