Merge pull request #183 from coreos-inc/fixetcdtimeout

Temporary fix for etcd watch timeouts
This commit is contained in:
Jake Moshenko 2015-06-25 23:45:10 -04:00
commit d6cb7334d3
2 changed files with 18 additions and 9 deletions

View file

@ -22,7 +22,7 @@ from util.morecollections import AttrDict
logger = logging.getLogger(__name__)
ETCD_DISABLE_TIMEOUT = 0
ETCD_MAX_WATCH_TIMEOUT = 30
EC2_API_TIMEOUT = 20
RETRY_IMMEDIATELY_TIMEOUT = 0
@ -85,10 +85,7 @@ class EphemeralBuilderManager(BaseManager):
'*' if recursive else '', existing_index, etcd_result)
except ReadTimeoutError:
logger.debug('Read-timeout on etcd watch: %s', etcd_key)
except (ProtocolError, etcd.EtcdException):
logger.exception('Exception on etcd watch: %s', etcd_key)
logger.debug('Read-timeout on etcd watch %s, rescheduling', etcd_key)
except etcd.EtcdEventIndexCleared:
# This happens if etcd2 has moved forward too fast for us to start watching
@ -101,6 +98,18 @@ class EphemeralBuilderManager(BaseManager):
if restarter is not None:
async(restarter())
except etcd.EtcdException as eex:
# TODO(jschorr): This is a quick and dirty hack (it detects a read timeout by
# matching the exception's message text) and should be replaced with a proper
# exception check.
if str(eex.message).find('Read timed out') >= 0:
logger.debug('Read-timeout on etcd watch %s, rescheduling', etcd_key)
else:
logger.exception('Exception on etcd watch: %s', etcd_key)
except ProtocolError:
logger.exception('Exception on etcd watch: %s', etcd_key)
if watch_task_key not in self._watch_tasks or self._watch_tasks[watch_task_key].done():
self._watch_etcd(etcd_key, change_callback, start_index=new_index, restarter=restarter)
@ -112,7 +121,7 @@ class EphemeralBuilderManager(BaseManager):
'*' if recursive else '', start_index)
watch_future = self._etcd_client.watch(etcd_key, recursive=recursive, index=start_index,
timeout=ETCD_DISABLE_TIMEOUT)
timeout=ETCD_MAX_WATCH_TIMEOUT)
watch_future.add_done_callback(callback_wrapper)
self._watch_tasks[watch_task_key] = async(watch_future)

View file

@ -104,7 +104,7 @@ class TestEphemeral(unittest.TestCase):
@coroutine
def _setup_job_for_managers(self):
# Test that we are watching the realm location before anything else happens
self.etcd_client_mock.watch.assert_any_call('realm/', recursive=True, timeout=0, index=None)
self.etcd_client_mock.watch.assert_any_call('realm/', recursive=True, timeout=30, index=None)
self.etcd_client_mock.read = Mock(side_effect=KeyError)
test_component = Mock(spec=BuildComponent)
@ -182,7 +182,7 @@ class TestEphemeral(unittest.TestCase):
@async_test
def test_expiring_worker(self):
# Test that we are watching before anything else happens
self.etcd_client_mock.watch.assert_any_call('building/', recursive=True, timeout=0, index=None)
self.etcd_client_mock.watch.assert_any_call('building/', recursive=True, timeout=30, index=None)
# Send a signal to the callback that a worker has expired
expired_result = Mock(spec=etcd.EtcdResult)
@ -201,7 +201,7 @@ class TestEphemeral(unittest.TestCase):
test_component = yield From(self._setup_job_for_managers())
# Test that we are watching before anything else happens
self.etcd_client_mock.watch.assert_any_call('building/', recursive=True, timeout=0, index=None)
self.etcd_client_mock.watch.assert_any_call('building/', recursive=True, timeout=30, index=None)
# Send a signal to the callback that a worker has expired
expired_result = Mock(spec=etcd.EtcdResult)