From b33ee1a474010d473d5900dc7d6ff86274d21fce Mon Sep 17 00:00:00 2001 From: Jake Moshenko Date: Mon, 5 Jan 2015 11:21:36 -0500 Subject: [PATCH] Register existing builders to watch their expirations. --- buildman/manager/ephemeral.py | 42 ++++++++++++++++++++--------------- test/test_buildman.py | 2 -- 2 files changed, 24 insertions(+), 20 deletions(-) diff --git a/buildman/manager/ephemeral.py b/buildman/manager/ephemeral.py index 9ab10fd15..701465c3f 100644 --- a/buildman/manager/ephemeral.py +++ b/buildman/manager/ephemeral.py @@ -93,18 +93,16 @@ class EphemeralBuilderManager(BaseManager): # Handle the expiration logger.debug('Builder expired, clean up the old build node') job_metadata = json.loads(etcd_result._prev_node.value) - async(self._clean_up_old_builder(etcd_result.key, job_metadata)) + + if 'builder_id' in job_metadata: + logger.info('Terminating expired build node.') + async(self._executor.stop_builder(job_metadata['builder_id'])) def _handle_realm_change(self, etcd_result): if etcd_result.action == EtcdAction.CREATE: # We must listen on the realm created by ourselves or another worker realm_spec = json.loads(etcd_result.value) - component = self.register_component(realm_spec['realm'], BuildComponent, - token=realm_spec['token']) - build_job = BuildJob(AttrDict(realm_spec['job_queue_item'])) - self._component_to_job[component] = build_job - self._component_to_builder[component] = realm_spec['builder_id'] - self._job_uuid_to_component[build_job.job_details['build_uuid']] = component + self._register_realm(realm_spec) elif etcd_result.action == EtcdAction.DELETE or etcd_result.action == EtcdAction.EXPIRE: # We must stop listening for new connections on the specified realm, if we did not get the @@ -122,6 +120,22 @@ class EphemeralBuilderManager(BaseManager): else: logger.warning('Unexpected action (%s) on realm key: %s', etcd_result.action, etcd_result.key) + def _register_realm(self, realm_spec): + logger.debug('Registering realm with manager: %s', realm_spec['realm']) + component = self.register_component(realm_spec['realm'], BuildComponent, + token=realm_spec['token']) + build_job = BuildJob(AttrDict(realm_spec['job_queue_item'])) + self._component_to_job[component] = build_job + self._component_to_builder[component] = realm_spec['builder_id'] + self._job_uuid_to_component[build_job.job_details['build_uuid']] = component + + @coroutine + def _register_existing_realms(self): + all_realms = yield From(self._etcd_client.read(ETCD_REALM_PREFIX, recursive=True)) + for realm in all_realms.children: + if not realm.dir: + self._register_realm(json.loads(realm.value)) + def initialize(self, manager_config): logger.debug('Calling initialize') self._manager_config = manager_config @@ -142,6 +156,9 @@ class EphemeralBuilderManager(BaseManager): self._watch_etcd(ETCD_BUILDER_PREFIX, self._handle_builder_expiration) self._watch_etcd(ETCD_REALM_PREFIX, self._handle_realm_change) + # Load components for all realms currently known to the cluster + async(self._register_existing_realms()) + def setup_time(self): setup_time = self._manager_config.get('MACHINE_SETUP_TIME', 300) logger.debug('Returning setup_time: %s', setup_time) @@ -282,17 +299,6 @@ class EphemeralBuilderManager(BaseManager): self.job_heartbeat_callback(build_job) - @coroutine - def _clean_up_old_builder(self, job_key, job_payload): - """ Terminate an old builders once the expiration date has passed. - """ - logger.debug('Cleaning up the old builder for job: %s', job_key) - if 'builder_id' in job_payload: - logger.info('Terminating expired build node.') - yield From(self._executor.stop_builder(job_payload['builder_id'])) - - yield From(self._etcd_client.delete(job_key)) - @staticmethod def _etcd_job_key(build_job): """ Create a key which is used to track a job in etcd. diff --git a/test/test_buildman.py b/test/test_buildman.py index a9029c22a..89658f65d 100644 --- a/test/test_buildman.py +++ b/test/test_buildman.py @@ -199,8 +199,6 @@ class TestEphemeral(unittest.TestCase): self.test_executor.stop_builder.assert_called_once_with('1234') self.assertEqual(self.test_executor.stop_builder.call_count, 1) - self.etcd_client_mock.delete.assert_called_once_with(self.mock_job_key) - @async_test def test_change_worker(self): # Send a signal to the callback that a worker key has been changed