Register existing builders to watch their expirations.

This commit is contained in:
Jake Moshenko 2015-01-05 11:21:36 -05:00
parent a9839021af
commit b33ee1a474
2 changed files with 24 additions and 20 deletions

View file

@ -93,18 +93,16 @@ class EphemeralBuilderManager(BaseManager):
# Handle the expiration # Handle the expiration
logger.debug('Builder expired, clean up the old build node') logger.debug('Builder expired, clean up the old build node')
job_metadata = json.loads(etcd_result._prev_node.value) job_metadata = json.loads(etcd_result._prev_node.value)
async(self._clean_up_old_builder(etcd_result.key, job_metadata))
if 'builder_id' in job_metadata:
logger.info('Terminating expired build node.')
async(self._executor.stop_builder(job_metadata['builder_id']))
def _handle_realm_change(self, etcd_result): def _handle_realm_change(self, etcd_result):
if etcd_result.action == EtcdAction.CREATE: if etcd_result.action == EtcdAction.CREATE:
# We must listen on the realm created by ourselves or another worker # We must listen on the realm created by ourselves or another worker
realm_spec = json.loads(etcd_result.value) realm_spec = json.loads(etcd_result.value)
component = self.register_component(realm_spec['realm'], BuildComponent, self._register_realm(realm_spec)
token=realm_spec['token'])
build_job = BuildJob(AttrDict(realm_spec['job_queue_item']))
self._component_to_job[component] = build_job
self._component_to_builder[component] = realm_spec['builder_id']
self._job_uuid_to_component[build_job.job_details['build_uuid']] = component
elif etcd_result.action == EtcdAction.DELETE or etcd_result.action == EtcdAction.EXPIRE: elif etcd_result.action == EtcdAction.DELETE or etcd_result.action == EtcdAction.EXPIRE:
# We must stop listening for new connections on the specified realm, if we did not get the # We must stop listening for new connections on the specified realm, if we did not get the
@ -122,6 +120,22 @@ class EphemeralBuilderManager(BaseManager):
else: else:
logger.warning('Unexpected action (%s) on realm key: %s', etcd_result.action, etcd_result.key) logger.warning('Unexpected action (%s) on realm key: %s', etcd_result.action, etcd_result.key)
def _register_realm(self, realm_spec):
logger.debug('Registering realm with manager: %s', realm_spec['realm'])
component = self.register_component(realm_spec['realm'], BuildComponent,
token=realm_spec['token'])
build_job = BuildJob(AttrDict(realm_spec['job_queue_item']))
self._component_to_job[component] = build_job
self._component_to_builder[component] = realm_spec['builder_id']
self._job_uuid_to_component[build_job.job_details['build_uuid']] = component
@coroutine
def _register_existing_realms(self):
all_realms = yield From(self._etcd_client.read(ETCD_REALM_PREFIX, recursive=True))
for realm in all_realms.children:
if not realm.dir:
self._register_realm(json.loads(realm.value))
def initialize(self, manager_config): def initialize(self, manager_config):
logger.debug('Calling initialize') logger.debug('Calling initialize')
self._manager_config = manager_config self._manager_config = manager_config
@ -142,6 +156,9 @@ class EphemeralBuilderManager(BaseManager):
self._watch_etcd(ETCD_BUILDER_PREFIX, self._handle_builder_expiration) self._watch_etcd(ETCD_BUILDER_PREFIX, self._handle_builder_expiration)
self._watch_etcd(ETCD_REALM_PREFIX, self._handle_realm_change) self._watch_etcd(ETCD_REALM_PREFIX, self._handle_realm_change)
# Load components for all realms currently known to the cluster
async(self._register_existing_realms())
def setup_time(self): def setup_time(self):
setup_time = self._manager_config.get('MACHINE_SETUP_TIME', 300) setup_time = self._manager_config.get('MACHINE_SETUP_TIME', 300)
logger.debug('Returning setup_time: %s', setup_time) logger.debug('Returning setup_time: %s', setup_time)
@ -282,17 +299,6 @@ class EphemeralBuilderManager(BaseManager):
self.job_heartbeat_callback(build_job) self.job_heartbeat_callback(build_job)
@coroutine
def _clean_up_old_builder(self, job_key, job_payload):
""" Terminate an old builders once the expiration date has passed.
"""
logger.debug('Cleaning up the old builder for job: %s', job_key)
if 'builder_id' in job_payload:
logger.info('Terminating expired build node.')
yield From(self._executor.stop_builder(job_payload['builder_id']))
yield From(self._etcd_client.delete(job_key))
@staticmethod @staticmethod
def _etcd_job_key(build_job): def _etcd_job_key(build_job):
""" Create a key which is used to track a job in etcd. """ Create a key which is used to track a job in etcd.

View file

@ -199,8 +199,6 @@ class TestEphemeral(unittest.TestCase):
self.test_executor.stop_builder.assert_called_once_with('1234') self.test_executor.stop_builder.assert_called_once_with('1234')
self.assertEqual(self.test_executor.stop_builder.call_count, 1) self.assertEqual(self.test_executor.stop_builder.call_count, 1)
self.etcd_client_mock.delete.assert_called_once_with(self.mock_job_key)
@async_test @async_test
def test_change_worker(self): def test_change_worker(self):
# Send a signal to the callback that a worker key has been changed # Send a signal to the callback that a worker key has been changed