Merge pull request #168 from coreos-inc/etcdindex
Fix ephemeral build manager to ask for watches in index order with no gaps
This commit is contained in:
commit
2ade08468d
2 changed files with 31 additions and 18 deletions
|
@ -68,28 +68,41 @@ class EphemeralBuilderManager(BaseManager):
|
||||||
|
|
||||||
super(EphemeralBuilderManager, self).__init__(*args, **kwargs)
|
super(EphemeralBuilderManager, self).__init__(*args, **kwargs)
|
||||||
|
|
||||||
def _watch_etcd(self, etcd_key, change_callback, recursive=True):
|
def _watch_etcd(self, etcd_key, change_callback, start_index=None, recursive=True):
|
||||||
watch_task_key = (etcd_key, recursive)
|
watch_task_key = (etcd_key, recursive)
|
||||||
def callback_wrapper(changed_key_future):
|
def callback_wrapper(changed_key_future):
|
||||||
|
new_index = start_index
|
||||||
|
etcd_result = None
|
||||||
|
|
||||||
|
if not changed_key_future.cancelled():
|
||||||
|
try:
|
||||||
|
etcd_result = changed_key_future.result()
|
||||||
|
existing_index = getattr(etcd_result, 'etcd_index', None)
|
||||||
|
new_index = etcd_result.modifiedIndex + 1
|
||||||
|
|
||||||
|
logger.debug('Got watch of key: %s%s at #%s with result: %s', etcd_key,
|
||||||
|
'*' if recursive else '', existing_index, etcd_result)
|
||||||
|
|
||||||
|
except ReadTimeoutError:
|
||||||
|
logger.debug('Read-timeout on etcd watch: %s', etcd_key)
|
||||||
|
|
||||||
|
except (ProtocolError, etcd.EtcdException):
|
||||||
|
logger.exception('Exception on etcd watch: %s', etcd_key)
|
||||||
|
|
||||||
if watch_task_key not in self._watch_tasks or self._watch_tasks[watch_task_key].done():
|
if watch_task_key not in self._watch_tasks or self._watch_tasks[watch_task_key].done():
|
||||||
self._watch_etcd(etcd_key, change_callback)
|
self._watch_etcd(etcd_key, change_callback, start_index=new_index)
|
||||||
|
|
||||||
if changed_key_future.cancelled():
|
if etcd_result:
|
||||||
# Due to lack of interest, tomorrow has been cancelled
|
change_callback(etcd_result)
|
||||||
return
|
|
||||||
|
|
||||||
try:
|
|
||||||
etcd_result = changed_key_future.result()
|
|
||||||
except (ReadTimeoutError, ProtocolError, etcd.EtcdException):
|
|
||||||
return
|
|
||||||
|
|
||||||
change_callback(etcd_result)
|
|
||||||
|
|
||||||
if not self._shutting_down:
|
if not self._shutting_down:
|
||||||
watch_future = self._etcd_client.watch(etcd_key, recursive=recursive,
|
logger.debug('Scheduling watch of key: %s%s at start index %s', etcd_key,
|
||||||
|
'*' if recursive else '', start_index)
|
||||||
|
|
||||||
|
watch_future = self._etcd_client.watch(etcd_key, recursive=recursive, index=start_index,
|
||||||
timeout=ETCD_DISABLE_TIMEOUT)
|
timeout=ETCD_DISABLE_TIMEOUT)
|
||||||
watch_future.add_done_callback(callback_wrapper)
|
watch_future.add_done_callback(callback_wrapper)
|
||||||
logger.debug('Scheduling watch of key: %s%s', etcd_key, '/*' if recursive else '')
|
|
||||||
self._watch_tasks[watch_task_key] = async(watch_future)
|
self._watch_tasks[watch_task_key] = async(watch_future)
|
||||||
|
|
||||||
@coroutine
|
@coroutine
|
||||||
|
@ -329,7 +342,7 @@ class EphemeralBuilderManager(BaseManager):
|
||||||
job.job_details['build_uuid'], build_component.builder_realm)
|
job.job_details['build_uuid'], build_component.builder_realm)
|
||||||
yield From(build_component.start_build(job))
|
yield From(build_component.start_build(job))
|
||||||
except (KeyError, etcd.EtcdKeyError):
|
except (KeyError, etcd.EtcdKeyError):
|
||||||
logger.exception('Builder is asking for more work, but work already completed')
|
logger.warning('Builder is asking for more work, but work already completed')
|
||||||
|
|
||||||
def build_component_disposed(self, build_component, timed_out):
|
def build_component_disposed(self, build_component, timed_out):
|
||||||
logger.debug('Calling build_component_disposed.')
|
logger.debug('Calling build_component_disposed.')
|
||||||
|
|
|
@ -104,7 +104,7 @@ class TestEphemeral(unittest.TestCase):
|
||||||
@coroutine
|
@coroutine
|
||||||
def _setup_job_for_managers(self):
|
def _setup_job_for_managers(self):
|
||||||
# Test that we are watching the realm location before anything else happens
|
# Test that we are watching the realm location before anything else happens
|
||||||
self.etcd_client_mock.watch.assert_any_call('realm/', recursive=True, timeout=0)
|
self.etcd_client_mock.watch.assert_any_call('realm/', recursive=True, timeout=0, index=None)
|
||||||
|
|
||||||
self.etcd_client_mock.read = Mock(side_effect=KeyError)
|
self.etcd_client_mock.read = Mock(side_effect=KeyError)
|
||||||
test_component = Mock(spec=BuildComponent)
|
test_component = Mock(spec=BuildComponent)
|
||||||
|
@ -182,7 +182,7 @@ class TestEphemeral(unittest.TestCase):
|
||||||
@async_test
|
@async_test
|
||||||
def test_expiring_worker(self):
|
def test_expiring_worker(self):
|
||||||
# Test that we are watching before anything else happens
|
# Test that we are watching before anything else happens
|
||||||
self.etcd_client_mock.watch.assert_any_call('building/', recursive=True, timeout=0)
|
self.etcd_client_mock.watch.assert_any_call('building/', recursive=True, timeout=0, index=None)
|
||||||
|
|
||||||
# Send a signal to the callback that a worker has expired
|
# Send a signal to the callback that a worker has expired
|
||||||
expired_result = Mock(spec=etcd.EtcdResult)
|
expired_result = Mock(spec=etcd.EtcdResult)
|
||||||
|
@ -201,7 +201,7 @@ class TestEphemeral(unittest.TestCase):
|
||||||
test_component = yield From(self._setup_job_for_managers())
|
test_component = yield From(self._setup_job_for_managers())
|
||||||
|
|
||||||
# Test that we are watching before anything else happens
|
# Test that we are watching before anything else happens
|
||||||
self.etcd_client_mock.watch.assert_any_call('building/', recursive=True, timeout=0)
|
self.etcd_client_mock.watch.assert_any_call('building/', recursive=True, timeout=0, index=None)
|
||||||
|
|
||||||
# Send a signal to the callback that a worker has expired
|
# Send a signal to the callback that a worker has expired
|
||||||
expired_result = Mock(spec=etcd.EtcdResult)
|
expired_result = Mock(spec=etcd.EtcdResult)
|
||||||
|
|
Reference in a new issue