import unittest import etcd import os.path import time import json from trollius import coroutine, get_event_loop, From, Future, sleep, Return from mock import Mock from threading import Event from urllib3.exceptions import ReadTimeoutError from buildman.manager.executor import BuilderExecutor from buildman.manager.ephemeral import EphemeralBuilderManager, EtcdAction from buildman.server import BuildJobResult from buildman.component.buildcomponent import BuildComponent BUILD_UUID = 'deadbeef-dead-beef-dead-deadbeefdead' REALM_ID = '1234-realm' def async_test(f): def wrapper(*args, **kwargs): coro = coroutine(f) future = coro(*args, **kwargs) loop = get_event_loop() loop.run_until_complete(future) return wrapper class TestEphemeral(unittest.TestCase): def __init__(self, *args, **kwargs): self.etcd_client_mock = None self.etcd_wait_event = Event() self.test_executor = None super(TestEphemeral, self).__init__(*args, **kwargs) def _create_mock_etcd_client(self, *args, **kwargs): def hang_until_event(*args, **kwargs): time.sleep(.01) # 10ms to simulate network latency self.etcd_wait_event.wait() self.etcd_client_mock = Mock(spec=etcd.Client, name='etcd.Client') self.etcd_client_mock.watch = Mock(side_effect=hang_until_event) return self.etcd_client_mock def _create_completed_future(self, result=None): def inner(*args, **kwargs): new_future = Future() new_future.set_result(result) return new_future return inner def _create_mock_executor(self, *args, **kwargs): self.test_executor = Mock(spec=BuilderExecutor) self.test_executor.start_builder = Mock(side_effect=self._create_completed_future('123')) self.test_executor.stop_builder = Mock(side_effect=self._create_completed_future()) return self.test_executor def _create_build_job(self): mock_job = Mock() mock_job.job_details = { 'build_uuid': BUILD_UUID, } mock_job.job_item = { 'body': json.dumps(mock_job.job_details), 'id': 1, } return mock_job def setUp(self): EphemeralBuilderManager._executors['test'] = self._create_mock_executor self.old_etcd_client_klass = EphemeralBuilderManager._etcd_client_klass EphemeralBuilderManager._etcd_client_klass = self._create_mock_etcd_client self.etcd_wait_event.clear() self.register_component_callback = Mock() self.unregister_component_callback = Mock() self.job_heartbeat_callback = Mock() self.job_complete_callback = Mock() self.manager = EphemeralBuilderManager( self.register_component_callback, self.unregister_component_callback, self.job_heartbeat_callback, self.job_complete_callback, '127.0.0.1', 30, ) self.manager.initialize({'EXECUTOR': 'test'}) self.mock_job = self._create_build_job() self.mock_job_key = os.path.join('building/', BUILD_UUID) def tearDown(self): self.etcd_wait_event.set() self.manager.shutdown() del EphemeralBuilderManager._executors['test'] EphemeralBuilderManager._etcd_client_klass = self.old_etcd_client_klass @coroutine def _setup_job_for_managers(self): # Test that we are watching the realm location before anything else happens self.etcd_client_mock.watch.assert_any_call('realm/', recursive=True, timeout=0) self.etcd_client_mock.read = Mock(side_effect=KeyError) test_component = Mock(spec=BuildComponent) test_component.builder_realm = REALM_ID test_component.start_build = Mock(side_effect=self._create_completed_future()) self.register_component_callback.return_value = test_component # Ask for a builder to be scheduled is_scheduled = yield From(self.manager.schedule(self.mock_job)) self.assertTrue(is_scheduled) self.etcd_client_mock.read.assert_called_once_with('building/', recursive=True) self.assertEqual(self.test_executor.start_builder.call_count, 1) self.assertEqual(self.etcd_client_mock.write.call_args_list[0][0][0], self.mock_job_key) self.assertEqual(self.etcd_client_mock.write.call_args_list[1][0][0], self.mock_job_key) # Right now the job is not registered with any managers because etcd has not accepted the job self.assertEqual(self.register_component_callback.call_count, 0) realm_created = Mock(spec=etcd.EtcdResult) realm_created.action = EtcdAction.CREATE realm_created.key = os.path.join('realm/', REALM_ID) realm_created.value = json.dumps({ 'realm': REALM_ID, 'token': 'beef', 'builder_id': '123', 'job_queue_item': self.mock_job.job_item, }) self.manager._handle_realm_change(realm_created) self.assertEqual(self.register_component_callback.call_count, 1) raise Return(test_component) @async_test def test_schedule_and_complete(self): # Test that a job is properly registered with all of the managers test_component = yield From(self._setup_job_for_managers()) # Take the job ourselves yield From(self.manager.build_component_ready(test_component)) self.etcd_client_mock.delete.assert_called_once_with(os.path.join('realm/', REALM_ID)) self.etcd_client_mock.delete.reset_mock() # Finish the job yield From(self.manager.job_completed(self.mock_job, BuildJobResult.COMPLETE, test_component)) self.assertEqual(self.test_executor.stop_builder.call_count, 1) self.etcd_client_mock.delete.assert_called_once_with(self.mock_job_key) @async_test def test_another_manager_takes_job(self): # Prepare a job to be taken by another manager test_component = yield From(self._setup_job_for_managers()) realm_deleted = Mock(spec=etcd.EtcdResult) realm_deleted.action = EtcdAction.DELETE realm_deleted.key = os.path.join('realm/', REALM_ID) realm_deleted._prev_node = Mock(spec=etcd.EtcdResult) realm_deleted._prev_node.value = json.dumps({ 'realm': REALM_ID, 'token': 'beef', 'builder_id': '123', 'job_queue_item': self.mock_job.job_item, }) self.manager._handle_realm_change(realm_deleted) self.unregister_component_callback.assert_called_once_with(test_component) @async_test def test_expiring_worker(self): # Test that we are watching before anything else happens self.etcd_client_mock.watch.assert_any_call('building/', recursive=True, timeout=0) # Send a signal to the callback that a worker has expired expired_result = Mock(spec=etcd.EtcdResult) expired_result.action = EtcdAction.EXPIRE expired_result.key = self.mock_job_key expired_result._prev_node = Mock(spec=etcd.EtcdResult) expired_result._prev_node.value = json.dumps({'builder_id': '1234'}) self.manager._handle_builder_expiration(expired_result) yield From(sleep(.01)) self.test_executor.stop_builder.assert_called_once_with('1234') self.assertEqual(self.test_executor.stop_builder.call_count, 1) @async_test def test_change_worker(self): # Send a signal to the callback that a worker key has been changed set_result = Mock(sepc=etcd.EtcdResult) set_result.action = 'set' set_result.key = self.mock_job_key self.manager._handle_builder_expiration(set_result) yield From(sleep(.01)) self.assertEquals(self.test_executor.stop_builder.call_count, 0) @async_test def test_heartbeat_response(self): expiration_timestamp = time.time() + 60 builder_result = Mock(spec=etcd.EtcdResult) builder_result.value = json.dumps({ 'builder_id': '123', 'expiration': expiration_timestamp, 'max_expiration': expiration_timestamp, }) self.etcd_client_mock.read = Mock(return_value=builder_result) yield From(self.manager.job_heartbeat(self.mock_job)) # Wait for threads to complete yield From(sleep(.01)) self.job_heartbeat_callback.assert_called_once_with(self.mock_job) self.assertEqual(self.etcd_client_mock.write.call_count, 1) self.assertEqual(self.etcd_client_mock.write.call_args_list[0][0][0], self.mock_job_key) if __name__ == '__main__': unittest.main()