Merge pull request #1641 from coreos-inc/fix-buildman-more
Another fix for the record keeping in buildman
This commit is contained in:
commit
53e22b4afb
2 changed files with 89 additions and 22 deletions
|
@ -131,7 +131,6 @@ class EphemeralBuilderManager(BaseManager):
|
|||
except ProtocolError:
|
||||
logger.exception('Exception on etcd watch: %s', etcd_key)
|
||||
|
||||
|
||||
if watch_task_key not in self._watch_tasks or self._watch_tasks[watch_task_key].done():
|
||||
self._watch_etcd(etcd_key, change_callback, start_index=new_index, restarter=restarter)
|
||||
|
||||
|
@ -148,22 +147,28 @@ class EphemeralBuilderManager(BaseManager):
|
|||
self._watch_tasks[watch_task_key] = async(watch_future)
|
||||
|
||||
@coroutine
|
||||
def _handle_job_expiration(self, etcd_result):
|
||||
""" Handler invoked whenever a job expires in etcd. """
|
||||
def _handle_job_expiration_or_delete(self, etcd_result):
|
||||
""" Handler invoked whenever a job expires or is deleted in etcd. """
|
||||
if etcd_result is None:
|
||||
return
|
||||
|
||||
if etcd_result.action != EtcdAction.EXPIRE:
|
||||
return
|
||||
|
||||
# Handle the expiration
|
||||
logger.debug('Builder expired, clean up the old build node')
|
||||
# Handle the expiration/deletion
|
||||
job_metadata = json.loads(etcd_result._prev_node.value)
|
||||
build_job = BuildJob(AttrDict(job_metadata['job_queue_item']))
|
||||
build_info = self._build_uuid_to_info.get(build_job.build_uuid)
|
||||
logger.debug('Job %s %s', etcd_result.action, build_job.build_uuid)
|
||||
|
||||
# Pop the build info.
|
||||
build_info = self._build_uuid_to_info.get(build_job.build_uuid, None)
|
||||
if build_info is None:
|
||||
logger.error('Could not find build info for job %s under etcd expire with metadata: %s',
|
||||
build_job.build_uuid, job_metadata)
|
||||
logger.debug('No build info for %s job %s (%s); was probably already deleted by this manager',
|
||||
etcd_result.action, build_job.build_uuid, job_metadata)
|
||||
return
|
||||
|
||||
# If the etcd action was not an expiration, then it was already deleted and the execution
|
||||
# shutdown.
|
||||
if etcd_result.action != EtcdAction.EXPIRE:
|
||||
# Build information will no longer be needed; pop it off.
|
||||
self._build_uuid_to_info.pop(build_job.build_uuid, None)
|
||||
return
|
||||
|
||||
execution_id = build_info.execution_id
|
||||
|
@ -196,8 +201,8 @@ class EphemeralBuilderManager(BaseManager):
|
|||
build_job = BuildJob(AttrDict(realm_spec['job_queue_item']))
|
||||
build_uuid = build_job.build_uuid
|
||||
|
||||
logger.debug('Realm key expired for build %s', build_uuid)
|
||||
build_info = self._build_uuid_to_info.pop(build_uuid, None)
|
||||
logger.debug('Realm key %s for build %s', etcd_result.action, build_uuid)
|
||||
build_info = self._build_uuid_to_info.get(build_uuid, None)
|
||||
if build_info is not None:
|
||||
# Pop the component off. If we find one, then the build has not connected to this manager,
|
||||
# so we can safely unregister its component.
|
||||
|
@ -297,12 +302,18 @@ class EphemeralBuilderManager(BaseManager):
|
|||
logger.debug('Connecting to etcd on %s:%s', etcd_host, etcd_port)
|
||||
|
||||
worker_threads = self._manager_config.get('ETCD_WORKER_THREADS', 5)
|
||||
(self._etcd_client, self._async_thread_executor) = self._etcd_client_creator(worker_threads,
|
||||
host=etcd_host, port=etcd_port, cert=etcd_auth, ca_cert=etcd_ca_cert,
|
||||
protocol=etcd_protocol, read_timeout=5)
|
||||
(self._etcd_client, self._async_thread_executor) = self._etcd_client_creator(
|
||||
worker_threads,
|
||||
host=etcd_host,
|
||||
port=etcd_port,
|
||||
cert=etcd_auth,
|
||||
ca_cert=etcd_ca_cert,
|
||||
protocol=etcd_protocol,
|
||||
read_timeout=5,
|
||||
)
|
||||
|
||||
self._etcd_job_prefix = self._manager_config.get('ETCD_BUILDER_PREFIX', 'building/')
|
||||
self._watch_etcd(self._etcd_job_prefix, self._handle_job_expiration)
|
||||
self._watch_etcd(self._etcd_job_prefix, self._handle_job_expiration_or_delete)
|
||||
|
||||
self._etcd_realm_prefix = self._manager_config.get('ETCD_REALM_PREFIX', 'realm/')
|
||||
self._watch_etcd(self._etcd_realm_prefix, self._handle_realm_change,
|
||||
|
@ -511,7 +522,7 @@ class EphemeralBuilderManager(BaseManager):
|
|||
logger.info('Starting termination of executor for job %s', build_uuid)
|
||||
build_info = self._build_uuid_to_info.pop(build_uuid, None)
|
||||
if build_info is None:
|
||||
logger.error('Could not find build information for build %s', build_uuid)
|
||||
logger.debug('Build information not found for build %s; skipping termination', build_uuid)
|
||||
return
|
||||
|
||||
# Remove the build's component.
|
||||
|
|
|
@ -197,6 +197,9 @@ class TestEphemeralLifecycle(EphemeralBuilderTestCase):
|
|||
# Ensure that we have at least one component node.
|
||||
self.assertEquals(1, self.manager.num_workers())
|
||||
|
||||
# Ensure that the build info exists.
|
||||
self.assertIsNotNone(self.manager._build_uuid_to_info.get(BUILD_UUID))
|
||||
|
||||
raise Return(test_component)
|
||||
|
||||
@async_test
|
||||
|
@ -210,6 +213,8 @@ class TestEphemeralLifecycle(EphemeralBuilderTestCase):
|
|||
self.etcd_client_mock.delete.assert_called_once_with(os.path.join('realm/', REALM_ID))
|
||||
self.etcd_client_mock.delete.reset_mock()
|
||||
|
||||
self.assertIsNotNone(self.manager._build_uuid_to_info.get(BUILD_UUID))
|
||||
|
||||
# Finish the job
|
||||
yield From(self.manager.job_completed(self.mock_job, BuildJobResult.COMPLETE, test_component))
|
||||
|
||||
|
@ -217,6 +222,10 @@ class TestEphemeralLifecycle(EphemeralBuilderTestCase):
|
|||
self.assertEqual(self.test_executor.stop_builder.call_count, 1)
|
||||
self.etcd_client_mock.delete.assert_called_once_with(self.mock_job_key)
|
||||
|
||||
# Ensure the build information is cleaned up.
|
||||
self.assertIsNone(self.manager._build_uuid_to_info.get(BUILD_UUID))
|
||||
self.assertEquals(0, self.manager.num_workers())
|
||||
|
||||
@async_test
|
||||
def test_another_manager_takes_job(self):
|
||||
# Prepare a job to be taken by another manager
|
||||
|
@ -238,6 +247,29 @@ class TestEphemeralLifecycle(EphemeralBuilderTestCase):
|
|||
|
||||
self.unregister_component_callback.assert_called_once_with(test_component)
|
||||
|
||||
# Ensure that the executor does not kill the job.
|
||||
self.assertEqual(self.test_executor.stop_builder.call_count, 0)
|
||||
|
||||
# Ensure that we still have the build info, but not the component.
|
||||
self.assertEquals(0, self.manager.num_workers())
|
||||
self.assertIsNotNone(self.manager._build_uuid_to_info.get(BUILD_UUID))
|
||||
|
||||
# Delete the job once it has "completed".
|
||||
expired_result = Mock(spec=etcd.EtcdResult)
|
||||
expired_result.action = EtcdAction.DELETE
|
||||
expired_result.key = self.mock_job_key
|
||||
expired_result._prev_node = Mock(spec=etcd.EtcdResult)
|
||||
expired_result._prev_node.value = json.dumps({
|
||||
'had_heartbeat': False,
|
||||
'job_queue_item': self.mock_job.job_item,
|
||||
})
|
||||
|
||||
yield From(self.manager._handle_job_expiration_or_delete(expired_result))
|
||||
|
||||
# Ensure the job was removed from the info, but stop was not called.
|
||||
self.assertIsNone(self.manager._build_uuid_to_info.get(BUILD_UUID))
|
||||
self.assertEqual(self.test_executor.stop_builder.call_count, 0)
|
||||
|
||||
@async_test
|
||||
def test_expiring_worker_not_started(self):
|
||||
# Test that we are watching before anything else happens
|
||||
|
@ -255,7 +287,7 @@ class TestEphemeralLifecycle(EphemeralBuilderTestCase):
|
|||
})
|
||||
|
||||
# Since the realm was never registered, expiration should do nothing.
|
||||
yield From(self.manager._handle_job_expiration(expired_result))
|
||||
yield From(self.manager._handle_job_expiration_or_delete(expired_result))
|
||||
self.assertEqual(self.test_executor.stop_builder.call_count, 0)
|
||||
|
||||
@async_test
|
||||
|
@ -276,11 +308,35 @@ class TestEphemeralLifecycle(EphemeralBuilderTestCase):
|
|||
'job_queue_item': self.mock_job.job_item,
|
||||
})
|
||||
|
||||
yield From(self.manager._handle_job_expiration(expired_result))
|
||||
yield From(self.manager._handle_job_expiration_or_delete(expired_result))
|
||||
|
||||
self.test_executor.stop_builder.assert_called_once_with('123')
|
||||
self.assertEqual(self.test_executor.stop_builder.call_count, 1)
|
||||
|
||||
@async_test
def test_buildjob_deleted(self):
    # Register a job with the manager. The component handed back by the setup
    # helper is not needed for the checks below.
    yield From(self._setup_job_for_managers())

    # The manager must already be watching the job prefix before any event arrives.
    self.etcd_client_mock.watch.assert_any_call(
        'building/',
        recursive=True,
        timeout=ETCD_MAX_WATCH_TIMEOUT,
        index=None,
    )

    # Simulate etcd reporting that the job key was deleted (a DELETE action, not
    # an expiration). The previous node carries the job metadata as JSON.
    deleted_result = Mock(spec=etcd.EtcdResult)
    deleted_result.action = EtcdAction.DELETE
    deleted_result.key = self.mock_job_key

    previous_node = Mock(spec=etcd.EtcdResult)
    previous_node.value = json.dumps({
        'had_heartbeat': False,
        'job_queue_item': self.mock_job.job_item,
    })
    deleted_result._prev_node = previous_node

    yield From(self.manager._handle_job_expiration_or_delete(deleted_result))

    # After a delete: the builder is not stopped, the job-complete callback is
    # not invoked, and the manager no longer holds build info for the job.
    self.assertEqual(self.test_executor.stop_builder.call_count, 0)
    self.assertEqual(self.job_complete_callback.call_count, 0)
    self.assertIsNone(self.manager._build_uuid_to_info.get(BUILD_UUID))
|
||||
|
||||
@async_test
|
||||
def test_builder_never_starts(self):
|
||||
test_component = yield From(self._setup_job_for_managers())
|
||||
|
@ -299,7 +355,7 @@ class TestEphemeralLifecycle(EphemeralBuilderTestCase):
|
|||
'job_queue_item': self.mock_job.job_item,
|
||||
})
|
||||
|
||||
yield From(self.manager._handle_job_expiration(expired_result))
|
||||
yield From(self.manager._handle_job_expiration_or_delete(expired_result))
|
||||
|
||||
self.test_executor.stop_builder.assert_called_once_with('123')
|
||||
self.assertEqual(self.test_executor.stop_builder.call_count, 1)
|
||||
|
@ -313,7 +369,7 @@ class TestEphemeralLifecycle(EphemeralBuilderTestCase):
|
|||
set_result.action = 'set'
|
||||
set_result.key = self.mock_job_key
|
||||
|
||||
self.manager._handle_job_expiration(set_result)
|
||||
self.manager._handle_job_expiration_or_delete(set_result)
|
||||
self.assertEquals(self.test_executor.stop_builder.call_count, 0)
|
||||
|
||||
@async_test
|
||||
|
|
Reference in a new issue