Add support for adjusting the etcd TTL on job_heartbeat. Switch the heartbeat method to a coroutine.
parent 2b6c2a2a50
commit 34bf92673b
6 changed files with 62 additions and 13 deletions
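The change below turns each job's etcd key into a lifeline: every heartbeat rewrites the key with a TTL of twice the heartbeat period, so the key survives a single missed beat but expires if the worker dies, letting the expiration watcher reap the orphaned builder. As a minimal standalone sketch of that pattern (assuming a trollius event loop and an AsyncWrapper-style etcd client whose write() returns a future; heartbeat_loop and its parameters are illustrative names, not code from this commit):

import calendar
from datetime import datetime, timedelta

import trollius
from trollius import From, coroutine

HEARTBEAT_PERIOD_SEC = 30

@coroutine
def heartbeat_loop(etcd_client, job_key, builder_id):
  # Illustrative sketch only: rewriting the key refreshes its TTL, so the
  # key expires only if heartbeats stop arriving.
  while True:
    ttl = HEARTBEAT_PERIOD_SEC * 2
    expiration = datetime.utcnow() + timedelta(seconds=ttl)
    payload = {
      'expiration': calendar.timegm(expiration.timetuple()),
      'builder_id': builder_id,
    }
    yield From(etcd_client.write(job_key, payload, ttl=ttl))
    yield From(trollius.sleep(HEARTBEAT_PERIOD_SEC))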
@@ -338,7 +338,7 @@ class BuildComponent(BaseComponent):
       # Mark the build item.
       current_job = self._current_job
       if current_job is not None:
-        self.parent_manager.job_heartbeat(current_job)
+        yield trollius.From(self.parent_manager.job_heartbeat(current_job))
 
       # Check the heartbeat from the worker.
       logger.debug('Checking heartbeat on realm %s', self.builder_realm)
@@ -3,13 +3,15 @@ from trollius import coroutine
 class BaseManager(object):
   """ Base for all worker managers. """
   def __init__(self, register_component, unregister_component, job_heartbeat_callback,
-               job_complete_callback, public_ip_address):
+               job_complete_callback, public_ip_address, heartbeat_period_sec):
     self.register_component = register_component
     self.unregister_component = unregister_component
     self.job_heartbeat_callback = job_heartbeat_callback
     self.job_complete_callback = job_complete_callback
     self.public_ip_address = public_ip_address
+    self.heartbeat_period_sec = heartbeat_period_sec
 
+  @coroutine
   def job_heartbeat(self, build_job):
     """ Method invoked to tell the manager that a job is still running. This method will be called
         every few minutes. """
@@ -83,7 +83,8 @@ class EphemeralBuilderManager(BaseManager):
     etcd_port = self._manager_config.get('ETCD_PORT', 2379)
     logger.debug('Connecting to etcd on %s:%s', etcd_host, etcd_port)
 
-    self._async_thread_executor = ThreadPoolExecutor(self._manager_config.get('ETCD_WORKERS', 5))
+    worker_threads = self._manager_config.get('ETCD_WORKER_THREADS', 5)
+    self._async_thread_executor = ThreadPoolExecutor(worker_threads)
     self._etcd_client = AsyncWrapper(self._etcd_client_klass(host=etcd_host, port=etcd_port),
                                      executor=self._async_thread_executor)
 
@@ -131,14 +132,15 @@ class EphemeralBuilderManager(BaseManager):
     # First try to take a lock for this job, meaning we will be responsible for its lifeline
     realm = str(uuid.uuid4())
     token = str(uuid.uuid4())
-    expiration = datetime.utcnow() + timedelta(seconds=self.setup_time())
+    ttl = self.setup_time()
+    expiration = datetime.utcnow() + timedelta(seconds=ttl)
 
     payload = {
       'expiration': calendar.timegm(expiration.timetuple()),
     }
 
     try:
-      yield From(self._etcd_client.write(job_key, payload, prevExist=False))
+      yield From(self._etcd_client.write(job_key, payload, prevExist=False, ttl=ttl))
       component = self.register_component(realm, BuildComponent, token=token)
       self._component_to_job[component] = build_job
     except KeyError:
@@ -168,11 +170,14 @@ class EphemeralBuilderManager(BaseManager):
   def build_component_disposed(self, build_component, timed_out):
     logger.debug('Calling build_component_disposed.')
 
     # TODO make it so that I don't have to unregister the component if it timed out
     self.unregister_component(build_component)
 
+  @coroutine
   def job_completed(self, build_job, job_status, build_component):
     logger.debug('Calling job_completed with status: %s', job_status)
 
-    # Kill he ephmeral builder
+    # Kill the ephemeral builder
     self._executor.stop_builder(self._component_to_builder.pop(build_component))
 
     # Release the lock in etcd
@@ -181,6 +186,24 @@ class EphemeralBuilderManager(BaseManager):
 
     self.job_complete_callback(build_job, job_status)
 
+  @coroutine
+  def job_heartbeat(self, build_job):
+    # Extend the deadline in etcd
+    job_key = self._etcd_job_key(build_job)
+    build_job_response = yield From(self._etcd_client.read(job_key))
+
+    ttl = self.heartbeat_period_sec * 2
+    new_expiration = datetime.utcnow() + timedelta(seconds=ttl)
+
+    payload = {
+      'expiration': calendar.timegm(new_expiration.timetuple()),
+      'builder_id': build_job_response.value['builder_id'],
+    }
+
+    yield From(self._etcd_client.write(job_key, payload, ttl=ttl))
+
+    self.job_heartbeat_callback(build_job)
+
   @coroutine
   def _clean_up_old_builder(self, job_key, job_payload):
     """ Terminate old builders once the expiration date has passed.
@@ -197,3 +220,8 @@ class EphemeralBuilderManager(BaseManager):
     """ Create a key which is used to track a job in etcd.
     """
     return os.path.join(ETCD_BUILDER_PREFIX, build_job.job_details['build_uuid'])
+
+  def num_workers(self):
+    """ Return the number of workers we're managing locally.
+    """
+    return len(self._component_to_builder)
@@ -142,6 +142,7 @@ class EC2Executor(BuilderExecutor):
     if builder_id not in [si.id for si in stopped_instances]:
       raise ExecutorException('Unable to stop instance: %s' % builder_id)
 
+
 class PopenExecutor(BuilderExecutor):
   """ Implementation of BuilderExecutor which uses Popen to fork a quay-builder process.
   """
@@ -24,6 +24,8 @@ MINIMUM_JOB_EXTENSION = timedelta(minutes=2)
 WEBSOCKET_PORT = 8787
 CONTROLLER_PORT = 8686
 
+HEARTBEAT_PERIOD_SEC = 30
+
 class BuildJobResult(object):
   """ Build job result enum """
   INCOMPLETE = 'incomplete'
@@ -52,6 +54,7 @@ class BuilderServer(object):
       self._job_heartbeat,
       self._job_complete,
       manager_public_ip,
+      HEARTBEAT_PERIOD_SEC,
     )
     self._lifecycle_manager_config = lifecycle_manager_config
 
@@ -140,7 +143,8 @@ class BuilderServer(object):
   @trollius.coroutine
   def _work_checker(self):
     while self._current_status == 'running':
-      logger.debug('Checking for more work for %d active workers', self._lifecycle_manager.num_workers())
+      logger.debug('Checking for more work for %d active workers',
+                   self._lifecycle_manager.num_workers())
       job_item = self._queue.get(processing_time=self._lifecycle_manager.setup_time())
       if job_item is None:
         logger.debug('No additional work found. Going to sleep for %s seconds', WORK_CHECK_TIMEOUT)
@@ -78,11 +78,13 @@ class TestEphemeral(unittest.TestCase):
       self.uniregister_component_callback,
       self.job_heartbeat_callback,
       self.job_complete_callback,
-      '127.0.0.1'
+      '127.0.0.1',
+      30,
     )
 
     self.manager.initialize({'EXECUTOR': 'test'})
 
+    self.mock_job = self._create_build_job()
     self.mock_job_key = os.path.join(ETCD_BUILDER_PREFIX, BUILD_UUID)
 
   def tearDown(self):
@@ -95,15 +97,13 @@ class TestEphemeral(unittest.TestCase):
 
   @async_test
   def test_schedule_and_complete(self):
-    mock_job = self._create_build_job()
-
     self.etcd_client_mock.read = Mock(side_effect=KeyError)
     test_component = BuildComponent(None)
     self.register_component_callback.return_value = test_component
 
     # Ask for a builder to be scheduled
     loop = get_event_loop()
-    is_scheduled = yield From(self.manager.schedule(mock_job, loop))
+    is_scheduled = yield From(self.manager.schedule(self.mock_job, loop))
 
     self.assertTrue(is_scheduled)
 
@@ -114,7 +114,7 @@ class TestEphemeral(unittest.TestCase):
 
     self.assertEqual(self.register_component_callback.call_count, 1)
 
-    yield From(self.manager.job_completed(mock_job, BuildJobResult.COMPLETE, test_component))
+    yield From(self.manager.job_completed(self.mock_job, BuildJobResult.COMPLETE, test_component))
 
     self.assertEqual(self.test_executor.stop_builder.call_count, 1)
     self.etcd_client_mock.delete.assert_called_once_with(self.mock_job_key)
@@ -125,7 +125,7 @@ class TestEphemeral(unittest.TestCase):
     self.etcd_client_mock.watch.assert_called_once_with(ETCD_BUILDER_PREFIX, recursive=True)
 
     # Send a signal to the callback that a worker has expired
-    expired_result = Mock(sepc=etcd.EtcdResult)
+    expired_result = Mock(spec=etcd.EtcdResult)
     expired_result.action = ETCD_EXPIRE_RESULT
     expired_result.key = self.mock_job_key
     expired_result._prev_node = Mock(spec=etcd.EtcdResult)
@@ -157,3 +157,17 @@ class TestEphemeral(unittest.TestCase):
 
     self.assertEquals(self.test_executor.stop_builder.call_count, 0)
 
+  @async_test
+  def test_heartbeat_response(self):
+    builder_result = Mock(spec=etcd.EtcdResult)
+    builder_result.value = {'builder_id': '123', 'expiration': '123'}
+    self.etcd_client_mock.read = Mock(return_value=builder_result)
+
+    yield From(self.manager.job_heartbeat(self.mock_job))
+
+    # Wait for threads to complete
+    yield From(sleep(.01))
+
+    self.job_heartbeat_callback.assert_called_once_with(self.mock_job)
+    self.assertEqual(self.etcd_client_mock.write.call_count, 1)
+    self.assertEqual(self.etcd_client_mock.write.call_args_list[0][0][0], self.mock_job_key)