diff --git a/buildman/manager/ephemeral.py b/buildman/manager/ephemeral.py index ed2da908e..80a96d336 100644 --- a/buildman/manager/ephemeral.py +++ b/buildman/manager/ephemeral.py @@ -2,9 +2,11 @@ import logging import etcd import uuid import calendar +import os.path from datetime import datetime, timedelta -from trollius import From, coroutine, Return +from trollius import From, coroutine, Return, async +from concurrent.futures import ThreadPoolExecutor from buildman.manager.basemanager import BaseManager from buildman.manager.executor import PopenExecutor, EC2Executor @@ -16,25 +18,11 @@ logger = logging.getLogger(__name__) ETCD_BUILDER_PREFIX = 'building/' - - -def clear_etcd(client): - """ Debugging method used to clear out the section of etcd we are using to track jobs in flight. - """ - try: - building = client.read(ETCD_BUILDER_PREFIX, recursive=True) - for child in building.leaves: - if not child.dir: - logger.warning('Deleting key: %s', child.key) - client.delete(child.key) - except KeyError: - pass +ETCD_EXPIRE_RESULT = 'expire' class EphemeralBuilderManager(BaseManager): """ Build manager implementation for the Enterprise Registry. """ - shutting_down = False - _executors = { 'popen': PopenExecutor, 'ec2': EC2Executor, @@ -43,7 +31,10 @@ class EphemeralBuilderManager(BaseManager): _etcd_client_klass = etcd.Client def __init__(self, *args, **kwargs): + self._shutting_down = False + self._manager_config = None + self._async_thread_executor = None self._etcd_client = None self._component_to_job = {} @@ -51,8 +42,35 @@ class EphemeralBuilderManager(BaseManager): self._executor = None + self._worker_watch_task = None + super(EphemeralBuilderManager, self).__init__(*args, **kwargs) + def _watch_builders(self): + """ Watch the builders key for expirations. 
+ """ + if not self._shutting_down: + workers_future = self._etcd_client.watch(ETCD_BUILDER_PREFIX, recursive=True) + workers_future.add_done_callback(self._handle_key_expiration) + logger.debug('Scheduling watch task.') + self._worker_watch_task = async(workers_future) + + def _handle_key_expiration(self, changed_key_future): + """ Handle when a builder expires + """ + if self._worker_watch_task is None or self._worker_watch_task.done(): + self._watch_builders() + + if changed_key_future.cancelled(): + # Due to lack of interest, tomorrow has been cancelled + return + + etcd_result = changed_key_future.result() + if etcd_result.action == ETCD_EXPIRE_RESULT: + # Handle the expiration + logger.debug('Builder expired, clean up the old build node') + async(self._clean_up_old_builder(etcd_result.key, etcd_result._prev_node.value)) + def initialize(self, manager_config): logger.debug('Calling initialize') self._manager_config = manager_config @@ -65,7 +83,11 @@ class EphemeralBuilderManager(BaseManager): etcd_port = self._manager_config.get('ETCD_PORT', 2379) logger.debug('Connecting to etcd on %s:%s', etcd_host, etcd_port) - self._etcd_client = AsyncWrapper(self._etcd_client_klass(host=etcd_host, port=etcd_port)) + self._async_thread_executor = ThreadPoolExecutor(self._manager_config.get('ETCD_WORKERS', 5)) + self._etcd_client = AsyncWrapper(self._etcd_client_klass(host=etcd_host, port=etcd_port), + executor=self._async_thread_executor) + + self._watch_builders() def setup_time(self): setup_time = self._manager_config.get('MACHINE_SETUP_TIME', 300) @@ -73,8 +95,17 @@ class EphemeralBuilderManager(BaseManager): return setup_time def shutdown(self): - logger.debug('Calling shutdown.') - raise NotImplementedError + logger.debug('Shutting down worker.') + self._shutting_down = True + + if self._worker_watch_task is not None: + logger.debug('Canceling watch task.') + self._worker_watch_task.cancel() + self._worker_watch_task = None + + if self._async_thread_executor is not 
None: + logger.debug('Shutting down thread pool executor.') + self._async_thread_executor.shutdown() @coroutine def schedule(self, build_job, loop): @@ -161,9 +192,8 @@ class EphemeralBuilderManager(BaseManager): yield From(self._etcd_client.delete(job_key)) - @staticmethod def _etcd_job_key(build_job): """ Create a key which is used to track a job in etcd. """ - return '{0}{1}'.format(ETCD_BUILDER_PREFIX, build_job.job_details['build_uuid']) + return os.path.join(ETCD_BUILDER_PREFIX, build_job.job_details['build_uuid']) diff --git a/test/test_buildman.py b/test/test_buildman.py index 0886b671a..d5a7423e6 100644 --- a/test/test_buildman.py +++ b/test/test_buildman.py @@ -1,12 +1,15 @@ import unittest import etcd +import os.path +import time -from trollius import coroutine, get_event_loop, From, Future +from trollius import coroutine, get_event_loop, From, Future, sleep from mock import Mock -from functools import partial +from threading import Event from buildman.manager.executor import BuilderExecutor -from buildman.manager.ephemeral import EphemeralBuilderManager, ETCD_BUILDER_PREFIX +from buildman.manager.ephemeral import (EphemeralBuilderManager, ETCD_BUILDER_PREFIX, + ETCD_EXPIRE_RESULT) from buildman.server import BuildJobResult from buildman.component.buildcomponent import BuildComponent @@ -14,10 +17,6 @@ from buildman.component.buildcomponent import BuildComponent BUILD_UUID = 'deadbeef-dead-beef-dead-deadbeefdead' -import logging -logging.basicConfig(level=logging.DEBUG) -logger = logging.getLogger(__name__) - def async_test(f): def wrapper(*args, **kwargs): coro = coroutine(f) @@ -29,11 +28,17 @@ def async_test(f): class TestEphemeral(unittest.TestCase): def __init__(self, *args, **kwargs): self.etcd_client_mock = None + self.etcd_wait_event = Event() self.test_executor = None super(TestEphemeral, self).__init__(*args, **kwargs) def _create_mock_etcd_client(self, *args, **kwargs): + def hang_until_event(*args, **kwargs): + time.sleep(.01) # 10ms to 
simulate network latency + self.etcd_wait_event.wait() + self.etcd_client_mock = Mock(spec=etcd.Client, name='etcd.Client') + self.etcd_client_mock.watch = Mock(side_effect=hang_until_event) return self.etcd_client_mock def _create_mock_executor(self, *args, **kwargs): @@ -61,6 +66,7 @@ class TestEphemeral(unittest.TestCase): self.old_etcd_client_klass = EphemeralBuilderManager._etcd_client_klass EphemeralBuilderManager._etcd_client_klass = self._create_mock_etcd_client + self.etcd_wait_event.clear() self.register_component_callback = Mock() self.uniregister_component_callback = Mock() @@ -77,7 +83,13 @@ class TestEphemeral(unittest.TestCase): self.manager.initialize({'EXECUTOR': 'test'}) + self.mock_job_key = os.path.join(ETCD_BUILDER_PREFIX, BUILD_UUID) + def tearDown(self): + self.etcd_wait_event.set() + + self.manager.shutdown() + del EphemeralBuilderManager._executors['test'] EphemeralBuilderManager._etcd_client_klass = self.old_etcd_client_klass @@ -95,15 +107,53 @@ class TestEphemeral(unittest.TestCase): self.assertTrue(is_scheduled) - job_key = ETCD_BUILDER_PREFIX + mock_job.job_details['build_uuid'] self.etcd_client_mock.read.assert_called_once_with(ETCD_BUILDER_PREFIX, recursive=True) - self.assertEqual(len(self.test_executor.start_builder.call_args_list), 1) - self.assertEqual(self.etcd_client_mock.write.call_args_list[0][0][0], job_key) - self.assertEqual(self.etcd_client_mock.write.call_args_list[1][0][0], job_key) + self.assertEqual(self.test_executor.start_builder.call_count, 1) + self.assertEqual(self.etcd_client_mock.write.call_args_list[0][0][0], self.mock_job_key) + self.assertEqual(self.etcd_client_mock.write.call_args_list[1][0][0], self.mock_job_key) - self.assertEqual(len(self.register_component_callback.call_args_list), 1) + self.assertEqual(self.register_component_callback.call_count, 1) yield From(self.manager.job_completed(mock_job, BuildJobResult.COMPLETE, test_component)) - 
self.assertEqual(len(self.test_executor.stop_builder.call_args_list), 1) - self.etcd_client_mock.delete.assert_called_once_with(job_key) + self.assertEqual(self.test_executor.stop_builder.call_count, 1) + self.etcd_client_mock.delete.assert_called_once_with(self.mock_job_key) + + @async_test + def test_expiring_worker(self): + # Test that we are watching before anything else happens + self.etcd_client_mock.watch.assert_called_once_with(ETCD_BUILDER_PREFIX, recursive=True) + + # Send a signal to the callback that a worker has expired + expired_result = Mock(spec=etcd.EtcdResult) + expired_result.action = ETCD_EXPIRE_RESULT + expired_result.key = self.mock_job_key + expired_result._prev_node = Mock(spec=etcd.EtcdResult) + expired_result._prev_node.value = {'builder_id': '1234'} + expired_future = Future() + expired_future.set_result(expired_result) + + self.manager._handle_key_expiration(expired_future) + + yield From(sleep(.01)) + + self.test_executor.stop_builder.assert_called_once_with('1234') + self.assertEqual(self.test_executor.stop_builder.call_count, 1) + + self.etcd_client_mock.delete.assert_called_once_with(self.mock_job_key) + + @async_test + def test_change_worker(self): + # Send a signal to the callback that a worker key has been changed + set_result = Mock(spec=etcd.EtcdResult) + set_result.action = 'set' + set_result.key = self.mock_job_key + set_future = Future() + set_future.set_result(set_result) + + self.manager._handle_key_expiration(set_future) + + yield From(sleep(.01)) + + self.assertEquals(self.test_executor.stop_builder.call_count, 0) +