diff --git a/buildman/manager/ephemeral.py b/buildman/manager/ephemeral.py index 1e1e06af7..4b097a990 100644 --- a/buildman/manager/ephemeral.py +++ b/buildman/manager/ephemeral.py @@ -152,7 +152,7 @@ class EphemeralBuilderManager(BaseManager): def _handle_job_expiration_or_delete(self, etcd_result): """ Handler invoked whenever a job expires or is deleted in etcd. """ if etcd_result is None: - return + raise Return() # Handle the expiration/deletion job_metadata = json.loads(etcd_result._prev_node.value) @@ -164,14 +164,14 @@ class EphemeralBuilderManager(BaseManager): if build_info is None: logger.debug('No build info for %s job %s (%s); was probably already deleted by this manager', etcd_result.action, build_job.build_uuid, job_metadata) - return + raise Return() # If the etcd action was not an expiration, then it was already deleted and the execution # shutdown. if etcd_result.action != EtcdAction.EXPIRE: # Build information will no longer be needed; pop it off. self._build_uuid_to_info.pop(build_job.build_uuid, None) - return + raise Return() execution_id = build_info.execution_id @@ -187,9 +187,10 @@ class EphemeralBuilderManager(BaseManager): build_job.build_uuid, execution_id) yield From(self.kill_builder_executor(build_job.build_uuid)) + @coroutine def _handle_realm_change(self, etcd_result): if etcd_result is None: - return + raise Return() if etcd_result.action == EtcdAction.CREATE: # We must listen on the realm created by ourselves or another worker @@ -200,10 +201,12 @@ class EphemeralBuilderManager(BaseManager): # We must stop listening for new connections on the specified realm, if we did not get the # connection realm_spec = json.loads(etcd_result._prev_node.value) + realm_id = realm_spec['realm'] + build_job = BuildJob(AttrDict(realm_spec['job_queue_item'])) build_uuid = build_job.build_uuid - logger.debug('Realm key %s for build %s', etcd_result.action, build_uuid) + logger.debug('Realm key %s for build %s was %s', realm_id, build_uuid, etcd_result.action) build_info = self._build_uuid_to_info.get(build_uuid, None) if build_info is not None: # Pop the component off. If we find one, then the build has not connected to this manager, @@ -214,6 +217,15 @@ class EphemeralBuilderManager(BaseManager): logger.debug('Unregistering unused component for build %s', build_uuid) self.unregister_component(build_info.component) + # If the realm has expired, then perform cleanup of the executor. + if etcd_result.action == EtcdAction.EXPIRE: + execution_id = realm_spec.get('execution_id', None) + executor_name = realm_spec.get('executor_name', 'EC2Executor') + + logger.info('Realm %s expired for job %s, terminating executor %s with execution id %s', + realm_id, build_uuid, executor_name, execution_id) + yield From(self.terminate_executor(executor_name, execution_id)) + else: logger.warning('Unexpected action (%s) on realm key: %s', etcd_result.action, etcd_result.key) @@ -228,9 +240,7 @@ class EphemeralBuilderManager(BaseManager): # Create the build information block for the registered realm. build_job = BuildJob(AttrDict(realm_spec['job_queue_item'])) - - # TODO(jschorr): Remove the back-compat lookups once we've finished the rollout. - execution_id = realm_spec.get('execution_id', realm_spec.get('builder_id', None)) + execution_id = realm_spec.get('execution_id', None) executor_name = realm_spec.get('executor_name', 'EC2Executor') build_info = BuildInfo(component=component, build_job=build_job, execution_id=execution_id, @@ -376,14 +386,11 @@ class EphemeralBuilderManager(BaseManager): token = str(uuid.uuid4()) nonce = str(uuid.uuid4()) setup_time = self.setup_time() - expiration = datetime.utcnow() + timedelta(seconds=setup_time) machine_max_expiration = self._manager_config.get('MACHINE_MAX_TIME', 7200) max_expiration = datetime.utcnow() + timedelta(seconds=machine_max_expiration) payload = { - # TODO: remove expiration (but not max_expiration) after migration; not used. - 'expiration': calendar.timegm(expiration.timetuple()), 'max_expiration': calendar.timegm(max_expiration.timetuple()), 'nonce': nonce, 'had_heartbeat': False, @@ -458,9 +465,6 @@ class EphemeralBuilderManager(BaseManager): 'execution_id': execution_id, 'executor_name': started_with_executor.name, 'job_queue_item': build_job.job_item, - - # TODO: remove this back-compat field once we finish the rollout. - 'builder_id': execution_id, }) try: @@ -487,7 +491,7 @@ class EphemeralBuilderManager(BaseManager): if job is None: logger.error('Could not find job for the build component on realm %s', build_component.builder_realm) - return + raise Return() # Clean up the bookkeeping for allowing any manager to take the job. yield From(self._etcd_client.delete(self._etcd_realm_key(build_component.builder_realm))) @@ -529,22 +533,24 @@ class EphemeralBuilderManager(BaseManager): build_info = self._build_uuid_to_info.pop(build_uuid, None) if build_info is None: logger.debug('Build information not found for build %s; skipping termination', build_uuid) - return + raise Return() # Remove the build's component. self._component_to_job.pop(build_info.component, None) # Stop the build node/executor itself. - executor = self._executor_name_to_executor.get(build_info.executor_name) + yield From(self.terminate_executor(build_info.executor_name, build_info.execution_id)) + + @coroutine + def terminate_executor(self, executor_name, execution_id): + executor = self._executor_name_to_executor.get(executor_name) if executor is None: - logger.error('Could not find registered executor %s for build %s', - build_info.executor_name, build_uuid) - return + logger.error('Could not find registered executor %s', executor_name) + raise Return() # Terminate the executor's execution. - logger.info('Terminating executor for job %s with execution id %s', - build_uuid, build_info.execution_id) - yield From(executor.stop_builder(build_info.execution_id)) + logger.info('Terminating executor %s with execution id %s', executor_name, execution_id) + yield From(executor.stop_builder(execution_id)) @coroutine def job_heartbeat(self, build_job): @@ -555,7 +561,7 @@ class EphemeralBuilderManager(BaseManager): build_job_metadata_response = yield From(self._etcd_client.read(job_key)) except (KeyError, etcd.EtcdKeyError): logger.info('Job %s no longer exists in etcd', build_job.build_uuid) - return + raise Return() build_job_metadata = json.loads(build_job_metadata_response.value) @@ -564,11 +570,7 @@ class EphemeralBuilderManager(BaseManager): max_expiration_sec = max(0, int(max_expiration_remaining.total_seconds())) ttl = min(self.heartbeat_period_sec * 2, max_expiration_sec) - new_expiration = datetime.utcnow() + timedelta(seconds=ttl) - payload = { - # TODO: remove expiration (but not max_expiration) after migration; not used. - 'expiration': calendar.timegm(new_expiration.timetuple()), 'job_queue_item': build_job.job_item, 'max_expiration': build_job_metadata['max_expiration'], 'had_heartbeat': True, diff --git a/test/test_buildman.py b/test/test_buildman.py index e5849424c..7015ee58d 100644 --- a/test/test_buildman.py +++ b/test/test_buildman.py @@ -191,7 +191,7 @@ class TestEphemeralLifecycle(EphemeralBuilderTestCase): realm_created.key = os.path.join('realm/', REALM_ID) realm_created.value = json.dumps(realm_data) - self.manager._handle_realm_change(realm_created) + yield From(self.manager._handle_realm_change(realm_created)) self.assertEqual(self.register_component_callback.call_count, 1) # Ensure that we have at least one component node. @@ -239,11 +239,11 @@ class TestEphemeralLifecycle(EphemeralBuilderTestCase): realm_deleted._prev_node.value = json.dumps({ 'realm': REALM_ID, 'token': 'beef', - 'builder_id': '123', + 'execution_id': '123', 'job_queue_item': self.mock_job.job_item, }) - self.manager._handle_realm_change(realm_deleted) + yield From(self.manager._handle_realm_change(realm_deleted)) self.unregister_component_callback.assert_called_once_with(test_component) @@ -372,6 +372,29 @@ class TestEphemeralLifecycle(EphemeralBuilderTestCase): self.manager._handle_job_expiration_or_delete(set_result) self.assertEquals(self.test_executor.stop_builder.call_count, 0) + @async_test + def test_realm_expired(self): + test_component = yield From(self._setup_job_for_managers()) + + # Send a signal to the callback that a realm has expired + expired_result = Mock(spec=etcd.EtcdResult) + expired_result.action = EtcdAction.EXPIRE + expired_result.key = self.mock_job_key + expired_result._prev_node = Mock(spec=etcd.EtcdResult) + expired_result._prev_node.value = json.dumps({ + 'realm': REALM_ID, + 'execution_id': 'foobar', + 'executor_name': 'MockExecutor', + 'job_queue_item': {'body': '{"build_uuid": "fakeid"}'}, + }) + + yield From(self.manager._handle_realm_change(expired_result)) + + # Ensure that the cleanup code for the executor was called. + self.test_executor.stop_builder.assert_called_once_with('foobar') + self.assertEqual(self.test_executor.stop_builder.call_count, 1) + + @async_test def test_heartbeat_response(self): yield From(self.assertHeartbeatWithExpiration(100, self.manager.heartbeat_period_sec * 2))