Cleanup old executions that never start

Fixes #1727
This commit is contained in:
Joseph Schorr 2016-08-15 16:38:28 -04:00
parent 1c4d3326c2
commit d78361b041
2 changed files with 56 additions and 31 deletions

View file

@ -152,7 +152,7 @@ class EphemeralBuilderManager(BaseManager):
def _handle_job_expiration_or_delete(self, etcd_result):
""" Handler invoked whenever a job expires or is deleted in etcd. """
if etcd_result is None:
return
raise Return()
# Handle the expiration/deletion
job_metadata = json.loads(etcd_result._prev_node.value)
@ -164,14 +164,14 @@ class EphemeralBuilderManager(BaseManager):
if build_info is None:
logger.debug('No build info for %s job %s (%s); was probably already deleted by this manager',
etcd_result.action, build_job.build_uuid, job_metadata)
return
raise Return()
# If the etcd action was not an expiration, then it was already deleted and the execution
# shutdown.
if etcd_result.action != EtcdAction.EXPIRE:
# Build information will no longer be needed; pop it off.
self._build_uuid_to_info.pop(build_job.build_uuid, None)
return
raise Return()
execution_id = build_info.execution_id
@ -187,9 +187,10 @@ class EphemeralBuilderManager(BaseManager):
build_job.build_uuid, execution_id)
yield From(self.kill_builder_executor(build_job.build_uuid))
@coroutine
def _handle_realm_change(self, etcd_result):
if etcd_result is None:
return
raise Return()
if etcd_result.action == EtcdAction.CREATE:
# We must listen on the realm created by ourselves or another worker
@ -200,10 +201,12 @@ class EphemeralBuilderManager(BaseManager):
# We must stop listening for new connections on the specified realm, if we did not get the
# connection
realm_spec = json.loads(etcd_result._prev_node.value)
realm_id = realm_spec['realm']
build_job = BuildJob(AttrDict(realm_spec['job_queue_item']))
build_uuid = build_job.build_uuid
logger.debug('Realm key %s for build %s', etcd_result.action, build_uuid)
logger.debug('Realm key %s for build %s was %s', realm_id, build_uuid, etcd_result.action)
build_info = self._build_uuid_to_info.get(build_uuid, None)
if build_info is not None:
# Pop the component off. If we find one, then the build has not connected to this manager,
@ -214,6 +217,15 @@ class EphemeralBuilderManager(BaseManager):
logger.debug('Unregistering unused component for build %s', build_uuid)
self.unregister_component(build_info.component)
# If the realm has expired, then perform cleanup of the executor.
if etcd_result.action == EtcdAction.EXPIRE:
execution_id = realm_spec.get('execution_id', None)
executor_name = realm_spec.get('executor_name', 'EC2Executor')
logger.info('Realm %s expired for job %s, terminating executor %s with execution id %s',
realm_id, build_uuid, executor_name, execution_id)
yield From(self.terminate_executor(executor_name, execution_id))
else:
logger.warning('Unexpected action (%s) on realm key: %s', etcd_result.action, etcd_result.key)
@ -228,9 +240,7 @@ class EphemeralBuilderManager(BaseManager):
# Create the build information block for the registered realm.
build_job = BuildJob(AttrDict(realm_spec['job_queue_item']))
# TODO(jschorr): Remove the back-compat lookups once we've finished the rollout.
execution_id = realm_spec.get('execution_id', realm_spec.get('builder_id', None))
execution_id = realm_spec.get('execution_id', None)
executor_name = realm_spec.get('executor_name', 'EC2Executor')
build_info = BuildInfo(component=component, build_job=build_job, execution_id=execution_id,
@ -376,14 +386,11 @@ class EphemeralBuilderManager(BaseManager):
token = str(uuid.uuid4())
nonce = str(uuid.uuid4())
setup_time = self.setup_time()
expiration = datetime.utcnow() + timedelta(seconds=setup_time)
machine_max_expiration = self._manager_config.get('MACHINE_MAX_TIME', 7200)
max_expiration = datetime.utcnow() + timedelta(seconds=machine_max_expiration)
payload = {
# TODO: remove expiration (but not max_expiration) after migration; not used.
'expiration': calendar.timegm(expiration.timetuple()),
'max_expiration': calendar.timegm(max_expiration.timetuple()),
'nonce': nonce,
'had_heartbeat': False,
@ -458,9 +465,6 @@ class EphemeralBuilderManager(BaseManager):
'execution_id': execution_id,
'executor_name': started_with_executor.name,
'job_queue_item': build_job.job_item,
# TODO: remove this back-compat field once we finish the rollout.
'builder_id': execution_id,
})
try:
@ -487,7 +491,7 @@ class EphemeralBuilderManager(BaseManager):
if job is None:
logger.error('Could not find job for the build component on realm %s',
build_component.builder_realm)
return
raise Return()
# Clean up the bookkeeping for allowing any manager to take the job.
yield From(self._etcd_client.delete(self._etcd_realm_key(build_component.builder_realm)))
@ -529,22 +533,24 @@ class EphemeralBuilderManager(BaseManager):
build_info = self._build_uuid_to_info.pop(build_uuid, None)
if build_info is None:
logger.debug('Build information not found for build %s; skipping termination', build_uuid)
return
raise Return()
# Remove the build's component.
self._component_to_job.pop(build_info.component, None)
# Stop the build node/executor itself.
executor = self._executor_name_to_executor.get(build_info.executor_name)
yield From(self.terminate_executor(build_info.executor_name, build_info.execution_id))
@coroutine
def terminate_executor(self, executor_name, execution_id):
executor = self._executor_name_to_executor.get(executor_name)
if executor is None:
logger.error('Could not find registered executor %s for build %s',
build_info.executor_name, build_uuid)
return
logger.error('Could not find registered executor %s', executor_name)
raise Return()
# Terminate the executor's execution.
logger.info('Terminating executor for job %s with execution id %s',
build_uuid, build_info.execution_id)
yield From(executor.stop_builder(build_info.execution_id))
logger.info('Terminating executor %s with execution id %s', executor_name, execution_id)
yield From(executor.stop_builder(execution_id))
@coroutine
def job_heartbeat(self, build_job):
@ -555,7 +561,7 @@ class EphemeralBuilderManager(BaseManager):
build_job_metadata_response = yield From(self._etcd_client.read(job_key))
except (KeyError, etcd.EtcdKeyError):
logger.info('Job %s no longer exists in etcd', build_job.build_uuid)
return
raise Return()
build_job_metadata = json.loads(build_job_metadata_response.value)
@ -564,11 +570,7 @@ class EphemeralBuilderManager(BaseManager):
max_expiration_sec = max(0, int(max_expiration_remaining.total_seconds()))
ttl = min(self.heartbeat_period_sec * 2, max_expiration_sec)
new_expiration = datetime.utcnow() + timedelta(seconds=ttl)
payload = {
# TODO: remove expiration (but not max_expiration) after migration; not used.
'expiration': calendar.timegm(new_expiration.timetuple()),
'job_queue_item': build_job.job_item,
'max_expiration': build_job_metadata['max_expiration'],
'had_heartbeat': True,

View file

@ -191,7 +191,7 @@ class TestEphemeralLifecycle(EphemeralBuilderTestCase):
realm_created.key = os.path.join('realm/', REALM_ID)
realm_created.value = json.dumps(realm_data)
self.manager._handle_realm_change(realm_created)
yield From(self.manager._handle_realm_change(realm_created))
self.assertEqual(self.register_component_callback.call_count, 1)
# Ensure that we have at least one component node.
@ -239,11 +239,11 @@ class TestEphemeralLifecycle(EphemeralBuilderTestCase):
realm_deleted._prev_node.value = json.dumps({
'realm': REALM_ID,
'token': 'beef',
'builder_id': '123',
'execution_id': '123',
'job_queue_item': self.mock_job.job_item,
})
self.manager._handle_realm_change(realm_deleted)
yield From(self.manager._handle_realm_change(realm_deleted))
self.unregister_component_callback.assert_called_once_with(test_component)
@ -372,6 +372,29 @@ class TestEphemeralLifecycle(EphemeralBuilderTestCase):
self.manager._handle_job_expiration_or_delete(set_result)
self.assertEquals(self.test_executor.stop_builder.call_count, 0)
@async_test
def test_realm_expired(self):
test_component = yield From(self._setup_job_for_managers())
# Send a signal to the callback that a realm has expired
expired_result = Mock(spec=etcd.EtcdResult)
expired_result.action = EtcdAction.EXPIRE
expired_result.key = self.mock_job_key
expired_result._prev_node = Mock(spec=etcd.EtcdResult)
expired_result._prev_node.value = json.dumps({
'realm': REALM_ID,
'execution_id': 'foobar',
'executor_name': 'MockExecutor',
'job_queue_item': {'body': '{"build_uuid": "fakeid"}'},
})
yield From(self.manager._handle_realm_change(expired_result))
# Ensure that the cleanup code for the executor was called.
self.test_executor.stop_builder.assert_called_once_with('foobar')
self.assertEqual(self.test_executor.stop_builder.call_count, 1)
@async_test
def test_heartbeat_response(self):
yield From(self.assertHeartbeatWithExpiration(100, self.manager.heartbeat_period_sec * 2))