@coroutine
def _handle_job_change(self, etcd_result):
  """ Handler registered as the watch callback for the etcd job prefix.

      Behaviour per etcd action:
        - no result, or a create: nothing is done.
        - delete: the locally tracked build info for the job is dropped.
        - expire: the job is reported as incomplete when its metadata records that no heartbeat
          was received, and the build executor for the job is then terminated.
        - anything else: a warning is logged.

      For delete and expire, nothing further happens when this manager holds no build info for
      the job.
  """
  # Nothing to do when no result was handed to us.
  if etcd_result is None:
    raise Return()

  action = etcd_result.action

  # Creations need no handling.
  if action == EtcdAction.CREATE:
    raise Return()

  # Only deletions and expirations are processed; everything else is just reported.
  if action not in (EtcdAction.DELETE, EtcdAction.EXPIRE):
    logger.warning('Unexpected action (%s) on job key: %s', action, etcd_result.key)
    raise Return()

  # For these actions the job payload is read from the previous node's value.
  job_metadata = json.loads(etcd_result._prev_node.value)
  build_job = BuildJob(AttrDict(job_metadata['job_queue_item']))
  build_uuid = build_job.build_uuid
  logger.debug('Got "%s" of job %s', action, build_uuid)

  # Look up (without removing) what this manager knows about the build.
  build_info = self._build_uuid_to_info.get(build_uuid, None)
  if build_info is None:
    logger.debug('No build info for "%s" job %s (%s); probably already deleted by this manager',
                 action, build_uuid, job_metadata)
    raise Return()

  # Anything other than an expiration here is a deletion: only the local bookkeeping is
  # dropped and no executor is terminated from this handler.
  if action != EtcdAction.EXPIRE:
    self._build_uuid_to_info.pop(build_uuid, None)
    raise Return()

  execution_id = build_info.execution_id

  # Metadata that explicitly records a missing heartbeat is treated as a failed boot and the
  # job is reported as incomplete; metadata without the flag is treated as having had one.
  received_heartbeat = job_metadata.get('had_heartbeat', True)
  if not received_heartbeat:
    logger.warning('Build executor failed to successfully boot with execution id %s',
                   execution_id)
    self.job_complete_callback(build_job, BuildJobResult.INCOMPLETE)

  # Last step for an expired job: terminate its build executor.
  logger.info('Terminating expired build executor for job %s with execution id %s',
              build_uuid, execution_id)
  yield From(self.kill_builder_executor(build_uuid))
@async_test
def test_job_started_by_other_manager(self):
  # The manager must already be watching the job prefix before any event arrives.
  self.etcd_client_mock.watch.assert_any_call('building/', recursive=True,
                                              timeout=ETCD_MAX_WATCH_TIMEOUT, index=None)

  # Previous-node payload attached to the fake event.
  previous_node = Mock(spec=etcd.EtcdResult)
  previous_node.value = json.dumps({
    'had_heartbeat': False,
    'job_queue_item': self.mock_job.job_item,
  })

  # Fake etcd event reporting that the job key has been created.
  created_result = Mock(spec=etcd.EtcdResult)
  created_result.action = EtcdAction.CREATE
  created_result.key = self.mock_job_key
  created_result._prev_node = previous_node

  # Handling the create must not stop any builder.
  yield From(self.manager._handle_job_change(created_result))
  self.assertEqual(self.test_executor.stop_builder.call_count, 0)
- yield From(self.manager._handle_job_expiration_or_delete(expired_result)) + yield From(self.manager._handle_job_change(expired_result)) self.assertEqual(self.test_executor.stop_builder.call_count, 0) @async_test @@ -308,7 +328,7 @@ class TestEphemeralLifecycle(EphemeralBuilderTestCase): 'job_queue_item': self.mock_job.job_item, }) - yield From(self.manager._handle_job_expiration_or_delete(expired_result)) + yield From(self.manager._handle_job_change(expired_result)) self.test_executor.stop_builder.assert_called_once_with('123') self.assertEqual(self.test_executor.stop_builder.call_count, 1) @@ -331,7 +351,7 @@ class TestEphemeralLifecycle(EphemeralBuilderTestCase): 'job_queue_item': self.mock_job.job_item, }) - yield From(self.manager._handle_job_expiration_or_delete(expired_result)) + yield From(self.manager._handle_job_change(expired_result)) self.assertEqual(self.test_executor.stop_builder.call_count, 0) self.assertEqual(self.job_complete_callback.call_count, 0) @@ -355,7 +375,7 @@ class TestEphemeralLifecycle(EphemeralBuilderTestCase): 'job_queue_item': self.mock_job.job_item, }) - yield From(self.manager._handle_job_expiration_or_delete(expired_result)) + yield From(self.manager._handle_job_change(expired_result)) self.test_executor.stop_builder.assert_called_once_with('123') self.assertEqual(self.test_executor.stop_builder.call_count, 1) @@ -369,7 +389,7 @@ class TestEphemeralLifecycle(EphemeralBuilderTestCase): set_result.action = 'set' set_result.key = self.mock_job_key - self.manager._handle_job_expiration_or_delete(set_result) + self.manager._handle_job_change(set_result) self.assertEquals(self.test_executor.stop_builder.call_count, 0) @async_test