diff --git a/buildman/manager/ephemeral.py b/buildman/manager/ephemeral.py index 2c7bbc1df..820a47ad2 100644 --- a/buildman/manager/ephemeral.py +++ b/buildman/manager/ephemeral.py @@ -131,7 +131,6 @@ class EphemeralBuilderManager(BaseManager): except ProtocolError: logger.exception('Exception on etcd watch: %s', etcd_key) - if watch_task_key not in self._watch_tasks or self._watch_tasks[watch_task_key].done(): self._watch_etcd(etcd_key, change_callback, start_index=new_index, restarter=restarter) @@ -148,22 +147,28 @@ class EphemeralBuilderManager(BaseManager): self._watch_tasks[watch_task_key] = async(watch_future) @coroutine - def _handle_job_expiration(self, etcd_result): - """ Handler invoked whenever a job expires in etcd. """ + def _handle_job_expiration_or_delete(self, etcd_result): + """ Handler invoked whenever a job expires or is deleted in etcd. """ if etcd_result is None: return - if etcd_result.action != EtcdAction.EXPIRE: - return - - # Handle the expiration - logger.debug('Builder expired, clean up the old build node') + # Handle the expiration/deletion job_metadata = json.loads(etcd_result._prev_node.value) build_job = BuildJob(AttrDict(job_metadata['job_queue_item'])) - build_info = self._build_uuid_to_info.get(build_job.build_uuid) + logger.debug('Job %s %s', etcd_result.action, build_job.build_uuid) + + # Pop the build info. + build_info = self._build_uuid_to_info.get(build_job.build_uuid, None) if build_info is None: - logger.error('Could not find build info for job %s under etcd expire with metadata: %s', - build_job.build_uuid, job_metadata) + logger.debug('No build info for %s job %s (%s); was probably already deleted by this manager', + etcd_result.action, build_job.build_uuid, job_metadata) + return + + # If the etcd action was not an expiration, then it was already deleted and the execution + # shutdown. + if etcd_result.action != EtcdAction.EXPIRE: + # Build information will no longer be needed; pop it off. + self._build_uuid_to_info.pop(build_job.build_uuid, None) return execution_id = build_info.execution_id @@ -196,8 +201,8 @@ class EphemeralBuilderManager(BaseManager): build_job = BuildJob(AttrDict(realm_spec['job_queue_item'])) build_uuid = build_job.build_uuid - logger.debug('Realm key expired for build %s', build_uuid) - build_info = self._build_uuid_to_info.pop(build_uuid, None) + logger.debug('Realm key %s for build %s', etcd_result.action, build_uuid) + build_info = self._build_uuid_to_info.get(build_uuid, None) if build_info is not None: # Pop the component off. If we find one, then the build has not connected to this manager, # so we can safely unregister its component. @@ -297,12 +302,18 @@ class EphemeralBuilderManager(BaseManager): logger.debug('Connecting to etcd on %s:%s', etcd_host, etcd_port) worker_threads = self._manager_config.get('ETCD_WORKER_THREADS', 5) - (self._etcd_client, self._async_thread_executor) = self._etcd_client_creator(worker_threads, - host=etcd_host, port=etcd_port, cert=etcd_auth, ca_cert=etcd_ca_cert, - protocol=etcd_protocol, read_timeout=5) + (self._etcd_client, self._async_thread_executor) = self._etcd_client_creator( + worker_threads, + host=etcd_host, + port=etcd_port, + cert=etcd_auth, + ca_cert=etcd_ca_cert, + protocol=etcd_protocol, + read_timeout=5, + ) self._etcd_job_prefix = self._manager_config.get('ETCD_BUILDER_PREFIX', 'building/') - self._watch_etcd(self._etcd_job_prefix, self._handle_job_expiration) + self._watch_etcd(self._etcd_job_prefix, self._handle_job_expiration_or_delete) self._etcd_realm_prefix = self._manager_config.get('ETCD_REALM_PREFIX', 'realm/') self._watch_etcd(self._etcd_realm_prefix, self._handle_realm_change, @@ -511,7 +522,7 @@ class EphemeralBuilderManager(BaseManager): logger.info('Starting termination of executor for job %s', build_uuid) build_info = self._build_uuid_to_info.pop(build_uuid, None) if build_info is None: - logger.error('Could not find build information for build %s', build_uuid) + logger.debug('Build information not found for build %s; skipping termination', build_uuid) return # Remove the build's component. diff --git a/test/test_buildman.py b/test/test_buildman.py index 100ca44ea..eebfd6eeb 100644 --- a/test/test_buildman.py +++ b/test/test_buildman.py @@ -197,6 +197,9 @@ class TestEphemeralLifecycle(EphemeralBuilderTestCase): # Ensure that we have at least one component node. self.assertEquals(1, self.manager.num_workers()) + # Ensure that the build info exists. + self.assertIsNotNone(self.manager._build_uuid_to_info.get(BUILD_UUID)) + raise Return(test_component) @async_test @@ -210,6 +213,8 @@ class TestEphemeralLifecycle(EphemeralBuilderTestCase): self.etcd_client_mock.delete.assert_called_once_with(os.path.join('realm/', REALM_ID)) self.etcd_client_mock.delete.reset_mock() + self.assertIsNotNone(self.manager._build_uuid_to_info.get(BUILD_UUID)) + # Finish the job yield From(self.manager.job_completed(self.mock_job, BuildJobResult.COMPLETE, test_component)) @@ -217,6 +222,10 @@ class TestEphemeralLifecycle(EphemeralBuilderTestCase): self.assertEqual(self.test_executor.stop_builder.call_count, 1) self.etcd_client_mock.delete.assert_called_once_with(self.mock_job_key) + # Ensure the build information is cleaned up. + self.assertIsNone(self.manager._build_uuid_to_info.get(BUILD_UUID)) + self.assertEquals(0, self.manager.num_workers()) + @async_test def test_another_manager_takes_job(self): # Prepare a job to be taken by another manager @@ -238,6 +247,29 @@ class TestEphemeralLifecycle(EphemeralBuilderTestCase): self.unregister_component_callback.assert_called_once_with(test_component) + # Ensure that the executor does not kill the job. + self.assertEqual(self.test_executor.stop_builder.call_count, 0) + + # Ensure that we still have the build info, but not the component. + self.assertEquals(0, self.manager.num_workers()) + self.assertIsNotNone(self.manager._build_uuid_to_info.get(BUILD_UUID)) + + # Delete the job once it has "completed". + expired_result = Mock(spec=etcd.EtcdResult) + expired_result.action = EtcdAction.DELETE + expired_result.key = self.mock_job_key + expired_result._prev_node = Mock(spec=etcd.EtcdResult) + expired_result._prev_node.value = json.dumps({ + 'had_heartbeat': False, + 'job_queue_item': self.mock_job.job_item, + }) + + yield From(self.manager._handle_job_expiration_or_delete(expired_result)) + + # Ensure the job was removed from the info, but stop was not called. + self.assertIsNone(self.manager._build_uuid_to_info.get(BUILD_UUID)) + self.assertEqual(self.test_executor.stop_builder.call_count, 0) + @async_test def test_expiring_worker_not_started(self): # Test that we are watching before anything else happens @@ -255,7 +287,7 @@ class TestEphemeralLifecycle(EphemeralBuilderTestCase): }) # Since the realm was never registered, expiration should do nothing. - yield From(self.manager._handle_job_expiration(expired_result)) + yield From(self.manager._handle_job_expiration_or_delete(expired_result)) self.assertEqual(self.test_executor.stop_builder.call_count, 0) @async_test @@ -276,11 +308,35 @@ class TestEphemeralLifecycle(EphemeralBuilderTestCase): 'job_queue_item': self.mock_job.job_item, }) - yield From(self.manager._handle_job_expiration(expired_result)) + yield From(self.manager._handle_job_expiration_or_delete(expired_result)) self.test_executor.stop_builder.assert_called_once_with('123') self.assertEqual(self.test_executor.stop_builder.call_count, 1) + @async_test + def test_buildjob_deleted(self): + test_component = yield From(self._setup_job_for_managers()) + + # Test that we are watching before anything else happens + self.etcd_client_mock.watch.assert_any_call('building/', recursive=True, + timeout=ETCD_MAX_WATCH_TIMEOUT, index=None) + + # Send a signal to the callback that a worker has expired + expired_result = Mock(spec=etcd.EtcdResult) + expired_result.action = EtcdAction.DELETE + expired_result.key = self.mock_job_key + expired_result._prev_node = Mock(spec=etcd.EtcdResult) + expired_result._prev_node.value = json.dumps({ + 'had_heartbeat': False, + 'job_queue_item': self.mock_job.job_item, + }) + + yield From(self.manager._handle_job_expiration_or_delete(expired_result)) + + self.assertEqual(self.test_executor.stop_builder.call_count, 0) + self.assertEqual(self.job_complete_callback.call_count, 0) + self.assertIsNone(self.manager._build_uuid_to_info.get(BUILD_UUID)) + @async_test def test_builder_never_starts(self): test_component = yield From(self._setup_job_for_managers()) @@ -299,7 +355,7 @@ class TestEphemeralLifecycle(EphemeralBuilderTestCase): 'job_queue_item': self.mock_job.job_item, }) - yield From(self.manager._handle_job_expiration(expired_result)) + yield From(self.manager._handle_job_expiration_or_delete(expired_result)) self.test_executor.stop_builder.assert_called_once_with('123') self.assertEqual(self.test_executor.stop_builder.call_count, 1) @@ -313,7 +369,7 @@ class TestEphemeralLifecycle(EphemeralBuilderTestCase): set_result.action = 'set' set_result.key = self.mock_job_key - self.manager._handle_job_expiration(set_result) + self.manager._handle_job_expiration_or_delete(set_result) self.assertEquals(self.test_executor.stop_builder.call_count, 0) @async_test