import unittest import etcd import time import json import uuid import os from trollius import coroutine, get_event_loop, From, Future, Return from mock import Mock, ANY from buildman.manager.executor import BuilderExecutor, ExecutorException from buildman.manager.ephemeral import (EphemeralBuilderManager, EtcdAction, ETCD_MAX_WATCH_TIMEOUT) from buildman.component.buildcomponent import BuildComponent from buildman.server import BuildJobResult from util.metrics.metricqueue import duration_collector_async from app import metric_queue BUILD_UUID = 'deadbeef-dead-beef-dead-deadbeefdead' REALM_ID = '1234-realm' def async_test(f): def wrapper(*args, **kwargs): coro = coroutine(f) future = coro(*args, **kwargs) loop = get_event_loop() loop.run_until_complete(future) return wrapper class TestExecutor(BuilderExecutor): job_started = None job_stopped = None @coroutine @duration_collector_async(metric_queue.builder_time_to_start, labelvalues=["testlabel"]) def start_builder(self, realm, token, build_uuid): self.job_started = str(uuid.uuid4()) raise Return(self.job_started) @coroutine def stop_builder(self, execution_id): self.job_stopped = execution_id class BadExecutor(BuilderExecutor): @coroutine @duration_collector_async(metric_queue.builder_time_to_start, labelvalues=["testlabel"]) def start_builder(self, realm, token, build_uuid): raise ExecutorException('raised on purpose!') class EphemeralBuilderTestCase(unittest.TestCase): def __init__(self, *args, **kwargs): self.etcd_client_mock = None super(EphemeralBuilderTestCase, self).__init__(*args, **kwargs) def _create_mock_etcd_client(self, *args, **kwargs): def create_future(*args, **kwargs): return Future() self.etcd_client_mock = Mock(spec=etcd.Client, name='etcd.Client') self.etcd_client_mock.read = Mock(side_effect=KeyError) self.etcd_client_mock.delete = Mock(side_effect=self._create_completed_future()) self.etcd_client_mock.watch = Mock(side_effect=create_future) self.etcd_client_mock.write = Mock(side_effect=self._create_completed_future('some_exec_id')) return (self.etcd_client_mock, None) def _create_completed_future(self, result=None): def inner(*args, **kwargs): new_future = Future() new_future.set_result(result) return new_future return inner def setUp(self): self._existing_executors = dict(EphemeralBuilderManager.EXECUTORS) def tearDown(self): EphemeralBuilderManager.EXECUTORS = self._existing_executors @coroutine def _register_component(self, realm_spec, build_component, token): raise Return('hello') def _create_build_job(self, namespace='namespace', retries=3): mock_job = Mock() mock_job.job_details = { 'build_uuid': BUILD_UUID, } mock_job.job_item = { 'body': json.dumps(mock_job.job_details), 'id': 1, } mock_job.namespace = namespace mock_job.retries_remaining = retries mock_job.build_uuid = BUILD_UUID return mock_job class TestEphemeralLifecycle(EphemeralBuilderTestCase): """ Tests the various lifecycles of the ephemeral builder and its interaction with etcd. """ def __init__(self, *args, **kwargs): super(TestEphemeralLifecycle, self).__init__(*args, **kwargs) self.etcd_client_mock = None self.test_executor = None def _create_completed_future(self, result=None): def inner(*args, **kwargs): new_future = Future() new_future.set_result(result) return new_future return inner def _create_mock_executor(self, *args, **kwargs): self.test_executor = Mock(spec=BuilderExecutor) self.test_executor.start_builder = Mock(side_effect=self._create_completed_future('123')) self.test_executor.stop_builder = Mock(side_effect=self._create_completed_future()) self.test_executor.name = 'MockExecutor' self.test_executor.minimum_retry_threshold = 0 return self.test_executor def setUp(self): super(TestEphemeralLifecycle, self).setUp() EphemeralBuilderManager.EXECUTORS['test'] = self._create_mock_executor self.register_component_callback = Mock() self.unregister_component_callback = Mock() self.job_heartbeat_callback = Mock() self.job_complete_callback = Mock() self.manager = EphemeralBuilderManager( self.register_component_callback, self.unregister_component_callback, self.job_heartbeat_callback, self.job_complete_callback, '127.0.0.1', 30, etcd_creator=self._create_mock_etcd_client, ) self.manager.initialize({'EXECUTOR': 'test'}) # Test that we are watching the realm and jobs key once initialized. self.etcd_client_mock.watch.assert_any_call('realm/', recursive=True, index=None, timeout=ETCD_MAX_WATCH_TIMEOUT) self.etcd_client_mock.watch.assert_any_call('building/', recursive=True, index=None, timeout=ETCD_MAX_WATCH_TIMEOUT) self.mock_job = self._create_build_job() self.mock_job_key = os.path.join('building/', BUILD_UUID) def tearDown(self): super(TestEphemeralLifecycle, self).tearDown() self.manager.shutdown() @coroutine def _setup_job_for_managers(self): self.etcd_client_mock.read = Mock(side_effect=KeyError) test_component = Mock(spec=BuildComponent) test_component.builder_realm = REALM_ID test_component.start_build = Mock(side_effect=self._create_completed_future()) self.register_component_callback.return_value = test_component # Ask for a builder to be scheduled self.etcd_client_mock.write.reset() is_scheduled = yield From(self.manager.schedule(self.mock_job)) self.assertTrue(is_scheduled) self.assertEqual(self.test_executor.start_builder.call_count, 1) # Ensure the job and realm were added to etcd. self.assertEqual(self.etcd_client_mock.write.call_args_list[0][0][0], self.mock_job_key) self.assertTrue(self.etcd_client_mock.write.call_args_list[1][0][0].find('realm/') == 0) realm_data = json.loads(self.etcd_client_mock.write.call_args_list[1][0][1]) realm_data['realm'] = REALM_ID # Right now the job is not registered with any managers because etcd has not accepted the job self.assertEqual(self.register_component_callback.call_count, 0) # Fire off a realm changed with the same data. realm_created = Mock(spec=etcd.EtcdResult) realm_created.action = EtcdAction.CREATE realm_created.key = os.path.join('realm/', REALM_ID) realm_created.value = json.dumps(realm_data) yield From(self.manager._handle_realm_change(realm_created)) self.assertEqual(self.register_component_callback.call_count, 1) # Ensure that we have at least one component node. self.assertEquals(1, self.manager.num_workers()) # Ensure that the build info exists. self.assertIsNotNone(self.manager._build_uuid_to_info.get(BUILD_UUID)) raise Return(test_component) @async_test def test_schedule_and_complete(self): # Test that a job is properly registered with all of the managers test_component = yield From(self._setup_job_for_managers()) # Take the job ourselves yield From(self.manager.build_component_ready(test_component)) self.etcd_client_mock.read.assert_called_with(os.path.join('realm/', REALM_ID)) self.etcd_client_mock.delete.assert_called_once_with(os.path.join('realm/', REALM_ID)) self.etcd_client_mock.delete.reset_mock() self.assertIsNotNone(self.manager._build_uuid_to_info.get(BUILD_UUID)) # Finish the job yield From(self.manager.job_completed(self.mock_job, BuildJobResult.COMPLETE, test_component)) # Ensure that the executor kills the job. self.assertEqual(self.test_executor.stop_builder.call_count, 1) self.etcd_client_mock.delete.assert_called_once_with(self.mock_job_key) # Ensure the build information is cleaned up. self.assertIsNone(self.manager._build_uuid_to_info.get(BUILD_UUID)) self.assertEquals(0, self.manager.num_workers()) @async_test def test_another_manager_takes_job(self): # Prepare a job to be taken by another manager test_component = yield From(self._setup_job_for_managers()) realm_deleted = Mock(spec=etcd.EtcdResult) realm_deleted.action = EtcdAction.DELETE realm_deleted.key = os.path.join('realm/', REALM_ID) realm_deleted._prev_node = Mock(spec=etcd.EtcdResult) realm_deleted._prev_node.value = json.dumps({ 'realm': REALM_ID, 'token': 'beef', 'execution_id': '123', 'job_queue_item': self.mock_job.job_item, }) yield From(self.manager._handle_realm_change(realm_deleted)) self.unregister_component_callback.assert_called_once_with(test_component) # Ensure that the executor does not kill the job. self.assertEqual(self.test_executor.stop_builder.call_count, 0) # Ensure that we still have the build info, but not the component. self.assertEquals(0, self.manager.num_workers()) self.assertIsNotNone(self.manager._build_uuid_to_info.get(BUILD_UUID)) # Delete the job once it has "completed". expired_result = Mock(spec=etcd.EtcdResult) expired_result.action = EtcdAction.DELETE expired_result.key = self.mock_job_key expired_result._prev_node = Mock(spec=etcd.EtcdResult) expired_result._prev_node.value = json.dumps({ 'had_heartbeat': False, 'job_queue_item': self.mock_job.job_item, }) yield From(self.manager._handle_job_change(expired_result)) # Ensure the job was removed from the info, but stop was not called. self.assertIsNone(self.manager._build_uuid_to_info.get(BUILD_UUID)) self.assertEqual(self.test_executor.stop_builder.call_count, 0) @async_test def test_job_started_by_other_manager(self): # Test that we are watching before anything else happens self.etcd_client_mock.watch.assert_any_call('building/', recursive=True, timeout=ETCD_MAX_WATCH_TIMEOUT, index=None) # Send a signal to the callback that the job has been created. expired_result = Mock(spec=etcd.EtcdResult) expired_result.action = EtcdAction.CREATE expired_result.key = self.mock_job_key expired_result._prev_node = Mock(spec=etcd.EtcdResult) expired_result._prev_node.value = json.dumps({ 'had_heartbeat': False, 'job_queue_item': self.mock_job.job_item, }) # Ensure the create does nothing. yield From(self.manager._handle_job_change(expired_result)) self.assertEqual(self.test_executor.stop_builder.call_count, 0) @async_test def test_expiring_worker_not_started(self): # Test that we are watching before anything else happens self.etcd_client_mock.watch.assert_any_call('building/', recursive=True, timeout=ETCD_MAX_WATCH_TIMEOUT, index=None) # Send a signal to the callback that a worker has expired expired_result = Mock(spec=etcd.EtcdResult) expired_result.action = EtcdAction.EXPIRE expired_result.key = self.mock_job_key expired_result._prev_node = Mock(spec=etcd.EtcdResult) expired_result._prev_node.value = json.dumps({ 'had_heartbeat': True, 'job_queue_item': self.mock_job.job_item, }) # Since the realm was never registered, expiration should do nothing. yield From(self.manager._handle_job_change(expired_result)) self.assertEqual(self.test_executor.stop_builder.call_count, 0) @async_test def test_expiring_worker_started(self): test_component = yield From(self._setup_job_for_managers()) # Test that we are watching before anything else happens self.etcd_client_mock.watch.assert_any_call('building/', recursive=True, timeout=ETCD_MAX_WATCH_TIMEOUT, index=None) # Send a signal to the callback that a worker has expired expired_result = Mock(spec=etcd.EtcdResult) expired_result.action = EtcdAction.EXPIRE expired_result.key = self.mock_job_key expired_result._prev_node = Mock(spec=etcd.EtcdResult) expired_result._prev_node.value = json.dumps({ 'had_heartbeat': True, 'job_queue_item': self.mock_job.job_item, }) yield From(self.manager._handle_job_change(expired_result)) self.test_executor.stop_builder.assert_called_once_with('123') self.assertEqual(self.test_executor.stop_builder.call_count, 1) @async_test def test_buildjob_deleted(self): test_component = yield From(self._setup_job_for_managers()) # Test that we are watching before anything else happens self.etcd_client_mock.watch.assert_any_call('building/', recursive=True, timeout=ETCD_MAX_WATCH_TIMEOUT, index=None) # Send a signal to the callback that a worker has expired expired_result = Mock(spec=etcd.EtcdResult) expired_result.action = EtcdAction.DELETE expired_result.key = self.mock_job_key expired_result._prev_node = Mock(spec=etcd.EtcdResult) expired_result._prev_node.value = json.dumps({ 'had_heartbeat': False, 'job_queue_item': self.mock_job.job_item, }) yield From(self.manager._handle_job_change(expired_result)) self.assertEqual(self.test_executor.stop_builder.call_count, 0) self.assertEqual(self.job_complete_callback.call_count, 0) self.assertIsNone(self.manager._build_uuid_to_info.get(BUILD_UUID)) @async_test def test_builder_never_starts(self): test_component = yield From(self._setup_job_for_managers()) # Test that we are watching before anything else happens self.etcd_client_mock.watch.assert_any_call('building/', recursive=True, timeout=ETCD_MAX_WATCH_TIMEOUT, index=None) # Send a signal to the callback that a worker has expired expired_result = Mock(spec=etcd.EtcdResult) expired_result.action = EtcdAction.EXPIRE expired_result.key = self.mock_job_key expired_result._prev_node = Mock(spec=etcd.EtcdResult) expired_result._prev_node.value = json.dumps({ 'had_heartbeat': False, 'job_queue_item': self.mock_job.job_item, }) yield From(self.manager._handle_job_change(expired_result)) self.test_executor.stop_builder.assert_called_once_with('123') self.assertEqual(self.test_executor.stop_builder.call_count, 1) self.job_complete_callback.assert_called_once_with(ANY, BuildJobResult.INCOMPLETE, 'MockExecutor') @async_test def test_change_worker(self): # Send a signal to the callback that a worker key has been changed set_result = Mock(sepc=etcd.EtcdResult) set_result.action = 'set' set_result.key = self.mock_job_key self.manager._handle_job_change(set_result) self.assertEquals(self.test_executor.stop_builder.call_count, 0) @async_test def test_realm_expired(self): test_component = yield From(self._setup_job_for_managers()) # Send a signal to the callback that a realm has expired expired_result = Mock(spec=etcd.EtcdResult) expired_result.action = EtcdAction.EXPIRE expired_result.key = self.mock_job_key expired_result._prev_node = Mock(spec=etcd.EtcdResult) expired_result._prev_node.value = json.dumps({ 'realm': REALM_ID, 'execution_id': 'foobar', 'executor_name': 'MockExecutor', 'job_queue_item': {'body': '{"build_uuid": "fakeid"}'}, }) yield From(self.manager._handle_realm_change(expired_result)) # Ensure that the cleanup code for the executor was called. self.test_executor.stop_builder.assert_called_once_with('foobar') self.assertEqual(self.test_executor.stop_builder.call_count, 1) @async_test def test_heartbeat_response(self): yield From(self.assertHeartbeatWithExpiration(100, self.manager.heartbeat_period_sec * 2)) @async_test def test_heartbeat_future_expiration(self): yield From(self.assertHeartbeatWithExpiration(10, 10, ranged=True)) @async_test def test_heartbeat_expired(self): yield From(self.assertHeartbeatWithExpiration(-60, 0)) @coroutine def assertHeartbeatWithExpiration(self, expires_in_sec, expected_ttl, ranged=False): expiration_timestamp = time.time() + expires_in_sec builder_result = Mock(spec=etcd.EtcdResult) builder_result.value = json.dumps({ 'expiration': expiration_timestamp, 'max_expiration': expiration_timestamp, }) self.etcd_client_mock.read = Mock(side_effect=self._create_completed_future(builder_result)) yield From(self.manager.job_heartbeat(self.mock_job)) self.job_heartbeat_callback.assert_called_once_with(self.mock_job) self.assertEqual(self.etcd_client_mock.write.call_count, 1) self.assertEqual(self.etcd_client_mock.write.call_args_list[0][0][0], self.mock_job_key) job_key_data = json.loads(self.etcd_client_mock.write.call_args_list[0][0][1]) self.assertTrue(job_key_data['had_heartbeat']) self.assertEquals(self.mock_job.job_item, job_key_data['job_queue_item']) if not ranged: self.assertEquals(expected_ttl, self.etcd_client_mock.write.call_args_list[0][1]['ttl']) else: self.assertTrue(self.etcd_client_mock.write.call_args_list[0][1]['ttl'] <= expected_ttl) class TestEphemeral(EphemeralBuilderTestCase): """ Simple unit tests for the ephemeral builder around config management, starting and stopping jobs. """ def setUp(self): super(TestEphemeral, self).setUp() unregister_component_callback = Mock() job_heartbeat_callback = Mock() job_complete_callback = Mock() self.manager = EphemeralBuilderManager( self._register_component, unregister_component_callback, job_heartbeat_callback, job_complete_callback, '127.0.0.1', 30, etcd_creator=self._create_mock_etcd_client, ) def tearDown(self): super(TestEphemeral, self).tearDown() self.manager.shutdown() def test_verify_executor_oldconfig(self): EphemeralBuilderManager.EXECUTORS['test'] = TestExecutor self.manager.initialize({ 'EXECUTOR': 'test', 'EXECUTOR_CONFIG': dict(MINIMUM_RETRY_THRESHOLD=42) }) # Ensure that we have a single test executor. self.assertEquals(1, len(self.manager.registered_executors)) self.assertEquals(42, self.manager.registered_executors[0].minimum_retry_threshold) self.assertEquals('TestExecutor', self.manager.registered_executors[0].name) def test_verify_executor_newconfig(self): EphemeralBuilderManager.EXECUTORS['test'] = TestExecutor self.manager.initialize({ 'EXECUTORS': [{ 'EXECUTOR': 'test', 'MINIMUM_RETRY_THRESHOLD': 42 }] }) # Ensure that we have a single test executor. self.assertEquals(1, len(self.manager.registered_executors)) self.assertEquals(42, self.manager.registered_executors[0].minimum_retry_threshold) def test_multiple_executors_samename(self): EphemeralBuilderManager.EXECUTORS['test'] = TestExecutor EphemeralBuilderManager.EXECUTORS['anotherexecutor'] = TestExecutor with self.assertRaises(Exception): self.manager.initialize({ 'EXECUTORS': [ { 'NAME': 'primary', 'EXECUTOR': 'test', 'MINIMUM_RETRY_THRESHOLD': 42 }, { 'NAME': 'primary', 'EXECUTOR': 'anotherexecutor', 'MINIMUM_RETRY_THRESHOLD': 24 }, ] }) def test_verify_multiple_executors(self): EphemeralBuilderManager.EXECUTORS['test'] = TestExecutor EphemeralBuilderManager.EXECUTORS['anotherexecutor'] = TestExecutor self.manager.initialize({ 'EXECUTORS': [ { 'NAME': 'primary', 'EXECUTOR': 'test', 'MINIMUM_RETRY_THRESHOLD': 42 }, { 'NAME': 'secondary', 'EXECUTOR': 'anotherexecutor', 'MINIMUM_RETRY_THRESHOLD': 24 }, ] }) # Ensure that we have a two test executors. self.assertEquals(2, len(self.manager.registered_executors)) self.assertEquals(42, self.manager.registered_executors[0].minimum_retry_threshold) self.assertEquals(24, self.manager.registered_executors[1].minimum_retry_threshold) def test_skip_invalid_executor(self): self.manager.initialize({ 'EXECUTORS': [ { 'EXECUTOR': 'unknown', 'MINIMUM_RETRY_THRESHOLD': 42 }, ] }) self.assertEquals(0, len(self.manager.registered_executors)) @async_test def test_schedule_job_namespace_filter(self): EphemeralBuilderManager.EXECUTORS['test'] = TestExecutor self.manager.initialize({ 'EXECUTORS': [{ 'EXECUTOR': 'test', 'NAMESPACE_WHITELIST': ['something'], }] }) # Try with a build job in an invalid namespace. build_job = self._create_build_job(namespace='somethingelse') result = yield From(self.manager.schedule(build_job)) self.assertFalse(result[0]) # Try with a valid namespace. build_job = self._create_build_job(namespace='something') result = yield From(self.manager.schedule(build_job)) self.assertTrue(result[0]) @async_test def test_schedule_job_retries_filter(self): EphemeralBuilderManager.EXECUTORS['test'] = TestExecutor self.manager.initialize({ 'EXECUTORS': [{ 'EXECUTOR': 'test', 'MINIMUM_RETRY_THRESHOLD': 2, }] }) # Try with a build job that has too few retries. build_job = self._create_build_job(retries=1) result = yield From(self.manager.schedule(build_job)) self.assertFalse(result[0]) # Try with a valid job. build_job = self._create_build_job(retries=2) result = yield From(self.manager.schedule(build_job)) self.assertTrue(result[0]) @async_test def test_schedule_job_executor_fallback(self): EphemeralBuilderManager.EXECUTORS['primary'] = TestExecutor EphemeralBuilderManager.EXECUTORS['secondary'] = TestExecutor self.manager.initialize({ 'EXECUTORS': [ { 'NAME': 'primary', 'EXECUTOR': 'primary', 'NAMESPACE_WHITELIST': ['something'], 'MINIMUM_RETRY_THRESHOLD': 3, }, { 'NAME': 'secondary', 'EXECUTOR': 'secondary', 'MINIMUM_RETRY_THRESHOLD': 2, }, ] }) # Try a job not matching the primary's namespace filter. Should schedule on secondary. build_job = self._create_build_job(namespace='somethingelse') result = yield From(self.manager.schedule(build_job)) self.assertTrue(result[0]) self.assertIsNone(self.manager.registered_executors[0].job_started) self.assertIsNotNone(self.manager.registered_executors[1].job_started) self.manager.registered_executors[0].job_started = None self.manager.registered_executors[1].job_started = None # Try a job not matching the primary's retry minimum. Should schedule on secondary. build_job = self._create_build_job(namespace='something', retries=2) result = yield From(self.manager.schedule(build_job)) self.assertTrue(result[0]) self.assertIsNone(self.manager.registered_executors[0].job_started) self.assertIsNotNone(self.manager.registered_executors[1].job_started) self.manager.registered_executors[0].job_started = None self.manager.registered_executors[1].job_started = None # Try a job matching the primary. Should schedule on the primary. build_job = self._create_build_job(namespace='something', retries=3) result = yield From(self.manager.schedule(build_job)) self.assertTrue(result[0]) self.assertIsNotNone(self.manager.registered_executors[0].job_started) self.assertIsNone(self.manager.registered_executors[1].job_started) self.manager.registered_executors[0].job_started = None self.manager.registered_executors[1].job_started = None # Try a job not matching either's restrictions. build_job = self._create_build_job(namespace='somethingelse', retries=1) result = yield From(self.manager.schedule(build_job)) self.assertFalse(result[0]) self.assertIsNone(self.manager.registered_executors[0].job_started) self.assertIsNone(self.manager.registered_executors[1].job_started) self.manager.registered_executors[0].job_started = None self.manager.registered_executors[1].job_started = None @async_test def test_schedule_job_single_executor(self): EphemeralBuilderManager.EXECUTORS['test'] = TestExecutor self.manager.initialize({ 'EXECUTOR': 'test', 'EXECUTOR_CONFIG': {}, }) build_job = self._create_build_job(namespace='something', retries=3) result = yield From(self.manager.schedule(build_job)) self.assertTrue(result[0]) self.assertIsNotNone(self.manager.registered_executors[0].job_started) self.manager.registered_executors[0].job_started = None build_job = self._create_build_job(namespace='something', retries=0) result = yield From(self.manager.schedule(build_job)) self.assertTrue(result[0]) self.assertIsNotNone(self.manager.registered_executors[0].job_started) self.manager.registered_executors[0].job_started = None @async_test def test_executor_exception(self): EphemeralBuilderManager.EXECUTORS['bad'] = BadExecutor self.manager.initialize({ 'EXECUTOR': 'bad', 'EXECUTOR_CONFIG': {}, }) build_job = self._create_build_job(namespace='something', retries=3) result = yield From(self.manager.schedule(build_job)) self.assertFalse(result[0]) @async_test def test_schedule_and_stop(self): EphemeralBuilderManager.EXECUTORS['test'] = TestExecutor self.manager.initialize({ 'EXECUTOR': 'test', 'EXECUTOR_CONFIG': {}, }) # Start the build job. build_job = self._create_build_job(namespace='something', retries=3) result = yield From(self.manager.schedule(build_job)) self.assertTrue(result[0]) executor = self.manager.registered_executors[0] self.assertIsNotNone(executor.job_started) # Register the realm so the build information is added. yield From(self.manager._register_realm({ 'realm': str(uuid.uuid4()), 'token': str(uuid.uuid4()), 'execution_id': executor.job_started, 'executor_name': 'TestExecutor', 'build_uuid': build_job.build_uuid, 'job_queue_item': build_job.job_item, })) # Stop the build job. yield From(self.manager.kill_builder_executor(build_job.build_uuid)) self.assertEquals(executor.job_stopped, executor.job_started) if __name__ == '__main__': unittest.main()