Handle read timeouts from etcd when watching a key.
This commit is contained in:
parent
29bd428817
commit
709e571b78
2 changed files with 19 additions and 1 deletions
|
@ -7,6 +7,7 @@ import os.path
|
||||||
from datetime import datetime, timedelta
|
from datetime import datetime, timedelta
|
||||||
from trollius import From, coroutine, Return, async
|
from trollius import From, coroutine, Return, async
|
||||||
from concurrent.futures import ThreadPoolExecutor
|
from concurrent.futures import ThreadPoolExecutor
|
||||||
|
from urllib3.exceptions import ReadTimeoutError
|
||||||
|
|
||||||
from buildman.manager.basemanager import BaseManager
|
from buildman.manager.basemanager import BaseManager
|
||||||
from buildman.manager.executor import PopenExecutor, EC2Executor
|
from buildman.manager.executor import PopenExecutor, EC2Executor
|
||||||
|
@ -65,7 +66,11 @@ class EphemeralBuilderManager(BaseManager):
|
||||||
# Due to lack of interest, tomorrow has been cancelled
|
# Due to lack of interest, tomorrow has been cancelled
|
||||||
return
|
return
|
||||||
|
|
||||||
etcd_result = changed_key_future.result()
|
try:
|
||||||
|
etcd_result = changed_key_future.result()
|
||||||
|
except ReadTimeoutError:
|
||||||
|
return
|
||||||
|
|
||||||
if etcd_result.action == ETCD_EXPIRE_RESULT:
|
if etcd_result.action == ETCD_EXPIRE_RESULT:
|
||||||
# Handle the expiration
|
# Handle the expiration
|
||||||
logger.debug('Builder expired, clean up the old build node')
|
logger.debug('Builder expired, clean up the old build node')
|
||||||
|
|
|
@ -6,6 +6,7 @@ import time
|
||||||
from trollius import coroutine, get_event_loop, From, Future, sleep
|
from trollius import coroutine, get_event_loop, From, Future, sleep
|
||||||
from mock import Mock
|
from mock import Mock
|
||||||
from threading import Event
|
from threading import Event
|
||||||
|
from urllib3.exceptions import ReadTimeoutError
|
||||||
|
|
||||||
from buildman.manager.executor import BuilderExecutor
|
from buildman.manager.executor import BuilderExecutor
|
||||||
from buildman.manager.ephemeral import (EphemeralBuilderManager, ETCD_BUILDER_PREFIX,
|
from buildman.manager.ephemeral import (EphemeralBuilderManager, ETCD_BUILDER_PREFIX,
|
||||||
|
@ -176,3 +177,15 @@ class TestEphemeral(unittest.TestCase):
|
||||||
self.job_heartbeat_callback.assert_called_once_with(self.mock_job)
|
self.job_heartbeat_callback.assert_called_once_with(self.mock_job)
|
||||||
self.assertEqual(self.etcd_client_mock.write.call_count, 1)
|
self.assertEqual(self.etcd_client_mock.write.call_count, 1)
|
||||||
self.assertEqual(self.etcd_client_mock.write.call_args_list[0][0][0], self.mock_job_key)
|
self.assertEqual(self.etcd_client_mock.write.call_args_list[0][0][0], self.mock_job_key)
|
||||||
|
|
||||||
|
@async_test
|
||||||
|
def test_etcd_read_timeout(self):
|
||||||
|
# Send a signal to the callback that a worker key has been changed
|
||||||
|
read_timeout_future = Future()
|
||||||
|
read_timeout_future.set_exception(ReadTimeoutError(None, None, None))
|
||||||
|
|
||||||
|
self.manager._handle_key_expiration(read_timeout_future)
|
||||||
|
|
||||||
|
yield From(sleep(.01))
|
||||||
|
|
||||||
|
self.assertEquals(self.test_executor.stop_builder.call_count, 0)
|
||||||
|
|
Reference in a new issue