diff --git a/buildman/manager/ephemeral.py b/buildman/manager/ephemeral.py index 48f83e83a..a2e178d17 100644 --- a/buildman/manager/ephemeral.py +++ b/buildman/manager/ephemeral.py @@ -41,7 +41,7 @@ class EtcdAction(object): BuildInfo = namedtuple('BuildInfo', ['component', 'build_job', 'execution_id', 'executor_name']) -def createAsyncEtcdClient(worker_threads=1, **kwargs): +def _create_async_etcd_client(worker_threads=1, **kwargs): client = etcd.Client(**kwargs) async_executor = ThreadPoolExecutor(worker_threads) return AsyncWrapper(client, executor=async_executor), async_executor @@ -57,7 +57,7 @@ class EphemeralBuilderManager(BaseManager): } def __init__(self, *args, **kwargs): - self._etcd_client_creator = kwargs.pop('etcd_creator', createAsyncEtcdClient) + self._etcd_client_creator = kwargs.pop('etcd_creator', _create_async_etcd_client) super(EphemeralBuilderManager, self).__init__(*args, **kwargs) @@ -153,30 +153,32 @@ class EphemeralBuilderManager(BaseManager): if etcd_result is None: return - if etcd_result.action == EtcdAction.EXPIRE: - # Handle the expiration - logger.debug('Builder expired, clean up the old build node') - job_metadata = json.loads(etcd_result._prev_node.value) - build_job = BuildJob(AttrDict(job_metadata['job_queue_item'])) - build_info = self._build_uuid_to_info.get(build_job.build_uuid) - if build_info is None: - logger.error('Could not find build info for job %s under etcd expire with metadata: %s', - build_job.build_uuid, job_metadata) - return + if etcd_result.action != EtcdAction.EXPIRE: + return - execution_id = build_info.execution_id + # Handle the expiration + logger.debug('Builder expired, clean up the old build node') + job_metadata = json.loads(etcd_result._prev_node.value) + build_job = BuildJob(AttrDict(job_metadata['job_queue_item'])) + build_info = self._build_uuid_to_info.get(build_job.build_uuid) + if build_info is None: + logger.error('Could not find build info for job %s under etcd expire with metadata: %s', + build_job.build_uuid, job_metadata) + return - # If we have not yet received a heartbeat, then the node failed to boot in some way. We mark - # the job as incomplete here. - if not job_metadata.get('had_heartbeat', True): - logger.warning('Build executor failed to successfully boot with execution id %s', - execution_id) - self.job_complete_callback(build_job, BuildJobResult.INCOMPLETE) + execution_id = build_info.execution_id - # Finally, we terminate the build execution for the job. - logger.info('Terminating expired build executor for job %s with execution id %s', - build_job.build_uuid, execution_id) - yield From(self.kill_builder_executor(build_job.build_uuid)) + # If we have not yet received a heartbeat, then the node failed to boot in some way. We mark + # the job as incomplete here. + if not job_metadata.get('had_heartbeat', True): + logger.warning('Build executor failed to successfully boot with execution id %s', + execution_id) + self.job_complete_callback(build_job, BuildJobResult.INCOMPLETE) + + # Finally, we terminate the build execution for the job. + logger.info('Terminating expired build executor for job %s with execution id %s', + build_job.build_uuid, execution_id) + yield From(self.kill_builder_executor(build_job.build_uuid)) def _handle_realm_change(self, etcd_result): if etcd_result is None: @@ -215,8 +217,10 @@ class EphemeralBuilderManager(BaseManager): # Create the build information block for the registered realm. build_job = BuildJob(AttrDict(realm_spec['job_queue_item'])) - execution_id = realm_spec['execution_id'] - executor_name = realm_spec['executor_name'] + + # TODO(jschorr): Remove the back-compat lookups once we've finished the rollout. + execution_id = realm_spec.get('execution_id', realm_spec.get('builder_id', None)) + executor_name = realm_spec.get('executor_name', 'EC2Executor') build_info = BuildInfo(component=component, build_job=build_job, execution_id=execution_id, executor_name=executor_name) @@ -433,6 +437,9 @@ class EphemeralBuilderManager(BaseManager): 'execution_id': execution_id, 'executor_name': started_with_executor.name, 'job_queue_item': build_job.job_item, + + # TODO: remove this back-compat field once we finish the rollout. + 'builder_id': execution_id, }) try: