Fix watch of the jobs key in the build manager
This commit is contained in:
parent
32154344ce
commit
742e153133
2 changed files with 64 additions and 36 deletions
|
@ -150,43 +150,51 @@ class EphemeralBuilderManager(BaseManager):
|
|||
self._watch_tasks[watch_task_key] = async(watch_future)
|
||||
|
||||
@coroutine
|
||||
def _handle_job_expiration_or_delete(self, etcd_result):
|
||||
def _handle_job_change(self, etcd_result):
|
||||
""" Handler invoked whenever a job expires or is deleted in etcd. """
|
||||
if etcd_result is None:
|
||||
raise Return()
|
||||
|
||||
# Handle the expiration/deletion
|
||||
job_metadata = json.loads(etcd_result._prev_node.value)
|
||||
build_job = BuildJob(AttrDict(job_metadata['job_queue_item']))
|
||||
logger.debug('Job %s %s', etcd_result.action, build_job.build_uuid)
|
||||
|
||||
# Pop the build info.
|
||||
build_info = self._build_uuid_to_info.get(build_job.build_uuid, None)
|
||||
if build_info is None:
|
||||
logger.debug('No build info for %s job %s (%s); was probably already deleted by this manager',
|
||||
etcd_result.action, build_job.build_uuid, job_metadata)
|
||||
if etcd_result.action == EtcdAction.CREATE:
|
||||
raise Return()
|
||||
|
||||
# If the etcd action was not an expiration, then it was already deleted and the execution
|
||||
# shutdown.
|
||||
if etcd_result.action != EtcdAction.EXPIRE:
|
||||
# Build information will no longer be needed; pop it off.
|
||||
self._build_uuid_to_info.pop(build_job.build_uuid, None)
|
||||
raise Return()
|
||||
elif etcd_result.action in (EtcdAction.DELETE, EtcdAction.EXPIRE):
|
||||
# Handle the expiration/deletion
|
||||
job_metadata = json.loads(etcd_result._prev_node.value)
|
||||
build_job = BuildJob(AttrDict(job_metadata['job_queue_item']))
|
||||
logger.debug('Got "%s" of job %s', etcd_result.action, build_job.build_uuid)
|
||||
|
||||
execution_id = build_info.execution_id
|
||||
# Pop the build info.
|
||||
build_info = self._build_uuid_to_info.get(build_job.build_uuid, None)
|
||||
if build_info is None:
|
||||
logger.debug('No build info for "%s" job %s (%s); probably already deleted by this manager',
|
||||
etcd_result.action, build_job.build_uuid, job_metadata)
|
||||
raise Return()
|
||||
|
||||
# If we have not yet received a heartbeat, then the node failed to boot in some way. We mark
|
||||
# the job as incomplete here.
|
||||
if not job_metadata.get('had_heartbeat', True):
|
||||
logger.warning('Build executor failed to successfully boot with execution id %s',
|
||||
execution_id)
|
||||
self.job_complete_callback(build_job, BuildJobResult.INCOMPLETE)
|
||||
# If the etcd action was not an expiration, then it was already deleted by some manager and
|
||||
# the execution was therefore already shutdown.
|
||||
if etcd_result.action != EtcdAction.EXPIRE:
|
||||
# Build information will no longer be needed; pop it off.
|
||||
self._build_uuid_to_info.pop(build_job.build_uuid, None)
|
||||
raise Return()
|
||||
|
||||
execution_id = build_info.execution_id
|
||||
|
||||
# If we have not yet received a heartbeat, then the node failed to boot in some way. We mark
|
||||
# the job as incomplete here.
|
||||
if not job_metadata.get('had_heartbeat', True):
|
||||
logger.warning('Build executor failed to successfully boot with execution id %s',
|
||||
execution_id)
|
||||
self.job_complete_callback(build_job, BuildJobResult.INCOMPLETE)
|
||||
|
||||
# Finally, we terminate the build execution for the job.
|
||||
logger.info('Terminating expired build executor for job %s with execution id %s',
|
||||
build_job.build_uuid, execution_id)
|
||||
yield From(self.kill_builder_executor(build_job.build_uuid))
|
||||
|
||||
else:
|
||||
logger.warning('Unexpected action (%s) on job key: %s', etcd_result.action, etcd_result.key)
|
||||
|
||||
# Finally, we terminate the build execution for the job.
|
||||
logger.info('Terminating expired build executor for job %s with execution id %s',
|
||||
build_job.build_uuid, execution_id)
|
||||
yield From(self.kill_builder_executor(build_job.build_uuid))
|
||||
|
||||
@coroutine
|
||||
def _handle_realm_change(self, etcd_result):
|
||||
|
@ -198,7 +206,7 @@ class EphemeralBuilderManager(BaseManager):
|
|||
realm_spec = json.loads(etcd_result.value)
|
||||
self._register_realm(realm_spec)
|
||||
|
||||
elif etcd_result.action == EtcdAction.DELETE or etcd_result.action == EtcdAction.EXPIRE:
|
||||
elif etcd_result.action in (EtcdAction.DELETE, EtcdAction.EXPIRE):
|
||||
# We must stop listening for new connections on the specified realm, if we did not get the
|
||||
# connection
|
||||
realm_spec = json.loads(etcd_result._prev_node.value)
|
||||
|
@ -326,7 +334,7 @@ class EphemeralBuilderManager(BaseManager):
|
|||
)
|
||||
|
||||
self._etcd_job_prefix = self._manager_config.get('ETCD_BUILDER_PREFIX', 'building/')
|
||||
self._watch_etcd(self._etcd_job_prefix, self._handle_job_expiration_or_delete)
|
||||
self._watch_etcd(self._etcd_job_prefix, self._handle_job_change)
|
||||
|
||||
self._etcd_realm_prefix = self._manager_config.get('ETCD_REALM_PREFIX', 'realm/')
|
||||
self._watch_etcd(self._etcd_realm_prefix, self._handle_realm_change,
|
||||
|
|
Reference in a new issue