diff --git a/buildman/manager/ephemeral.py b/buildman/manager/ephemeral.py index 902da86f0..9a17b38fe 100644 --- a/buildman/manager/ephemeral.py +++ b/buildman/manager/ephemeral.py @@ -15,6 +15,7 @@ from buildman.manager.executor import PopenExecutor, EC2Executor from buildman.component.buildcomponent import BuildComponent from buildman.jobutil.buildjob import BuildJob from buildman.asyncutil import AsyncWrapper +from buildman.server import BuildJobResult from util.morecollections import AttrDict @@ -91,6 +92,7 @@ class EphemeralBuilderManager(BaseManager): logger.debug('Scheduling watch of key: %s%s', etcd_key, '/*' if recursive else '') self._watch_tasks[watch_task_key] = async(watch_future) + @coroutine def _handle_builder_expiration(self, etcd_result): if etcd_result is None: return @@ -101,8 +103,23 @@ class EphemeralBuilderManager(BaseManager): job_metadata = json.loads(etcd_result._prev_node.value) if 'builder_id' in job_metadata: - logger.info('Terminating expired build node.') - async(self._executor.stop_builder(job_metadata['builder_id'])) + builder_id = job_metadata['builder_id'] + + try: + lock_key = self._etcd_lock_key(builder_id) + yield From(self._etcd_client.write(lock_key, '', prevExist=False, ttl=self.setup_time())) + except (KeyError, etcd.EtcdKeyError): + logger.debug('Somebody else is cleaning up the build node: %s', builder_id) + return + + if not job_metadata.get('had_heartbeat', True): + logger.warning('Build node failed to successfully boot: %s', builder_id) + build_job = BuildJob(AttrDict(job_metadata['job_queue_item'])) + self.job_complete_callback(build_job, BuildJobResult.INCOMPLETE) + + logger.info('Terminating expired build node: %s', builder_id) + yield From(self._executor.stop_builder(builder_id)) + def _handle_realm_change(self, etcd_result): if etcd_result is None: @@ -182,6 +199,8 @@ class EphemeralBuilderManager(BaseManager): self._etcd_realm_prefix = self._manager_config.get('ETCD_REALM_PREFIX', 'realm/') self._watch_etcd(self._etcd_realm_prefix, self._handle_realm_change) + self._etcd_lock_prefix = self._manager_config.get('ETCD_LOCK_PREFIX', 'locks/') + # Load components for all realms currently known to the cluster async(self._register_existing_realms()) @@ -241,6 +260,8 @@ class EphemeralBuilderManager(BaseManager): 'expiration': calendar.timegm(expiration.timetuple()), 'max_expiration': calendar.timegm(max_expiration.timetuple()), 'nonce': nonce, + 'had_heartbeat': False, + 'job_queue_item': build_job.job_item, } lock_payload = json.dumps(payload) @@ -342,7 +363,9 @@ class EphemeralBuilderManager(BaseManager): payload = { 'expiration': calendar.timegm(new_expiration.timetuple()), 'builder_id': build_job_metadata['builder_id'], + 'job_queue_item': build_job.job_item, 'max_expiration': build_job_metadata['max_expiration'], + 'had_heartbeat': True, } yield From(self._etcd_client.write(job_key, json.dumps(payload), ttl=ttl)) @@ -354,6 +377,11 @@ class EphemeralBuilderManager(BaseManager): """ return os.path.join(self._etcd_builder_prefix, build_job.job_details['build_uuid']) + def _etcd_lock_key(self, unique_lock_id): + """ Create a key which is used to create a temporary lock in etcd. + """ + return os.path.join(self._etcd_lock_prefix, unique_lock_id) + def _etcd_realm_key(self, realm): """ Create a key which is used to track an incoming connection on a realm. """ diff --git a/test/test_buildman.py b/test/test_buildman.py index 543c5fc5d..9e7a0f825 100644 --- a/test/test_buildman.py +++ b/test/test_buildman.py @@ -5,7 +5,7 @@ import time import json from trollius import coroutine, get_event_loop, From, Future, sleep, Return -from mock import Mock +from mock import Mock, ANY from threading import Event from urllib3.exceptions import ReadTimeoutError @@ -191,13 +191,36 @@ class TestEphemeral(unittest.TestCase): expired_result._prev_node = Mock(spec=etcd.EtcdResult) expired_result._prev_node.value = json.dumps({'builder_id': '1234'}) - self.manager._handle_builder_expiration(expired_result) - - yield From(sleep(.01)) + yield From(self.manager._handle_builder_expiration(expired_result)) self.test_executor.stop_builder.assert_called_once_with('1234') self.assertEqual(self.test_executor.stop_builder.call_count, 1) + @async_test + def test_builder_never_starts(self): + test_component = yield From(self._setup_job_for_managers()) + + # Test that we are watching before anything else happens + self.etcd_client_mock.watch.assert_any_call('building/', recursive=True, timeout=0) + + # Send a signal to the callback that a worker has expired + expired_result = Mock(spec=etcd.EtcdResult) + expired_result.action = EtcdAction.EXPIRE + expired_result.key = self.mock_job_key + expired_result._prev_node = Mock(spec=etcd.EtcdResult) + expired_result._prev_node.value = json.dumps({ + 'builder_id': '1234', + 'had_heartbeat': False, + 'job_queue_item': self.mock_job.job_item, + }) + + yield From(self.manager._handle_builder_expiration(expired_result)) + + self.test_executor.stop_builder.assert_called_once_with('1234') + self.assertEqual(self.test_executor.stop_builder.call_count, 1) + + self.job_complete_callback.assert_called_once_with(ANY, BuildJobResult.INCOMPLETE) + @async_test def test_change_worker(self): # Send a signal to the callback that a worker key has been changed @@ -233,3 +256,4 @@ class TestEphemeral(unittest.TestCase): if __name__ == '__main__': unittest.main() +