import unittest import etcd from trollius import coroutine, get_event_loop, From, Future from mock import Mock from functools import partial from buildman.manager.executor import BuilderExecutor from buildman.manager.ephemeral import EphemeralBuilderManager, ETCD_BUILDER_PREFIX from buildman.server import BuildJobResult from buildman.component.buildcomponent import BuildComponent BUILD_UUID = 'deadbeef-dead-beef-dead-deadbeefdead' import logging logging.basicConfig(level=logging.DEBUG) logger = logging.getLogger(__name__) def async_test(f): def wrapper(*args, **kwargs): coro = coroutine(f) future = coro(*args, **kwargs) loop = get_event_loop() loop.run_until_complete(future) return wrapper class TestEphemeral(unittest.TestCase): def __init__(self, *args, **kwargs): self.etcd_client_mock = None self.test_executor = None super(TestEphemeral, self).__init__(*args, **kwargs) def _create_mock_etcd_client(self, *args, **kwargs): self.etcd_client_mock = Mock(spec=etcd.Client, name='etcd.Client') return self.etcd_client_mock def _create_mock_executor(self, *args, **kwargs): def create_completed_future(result=None): def inner(*args, **kwargs): new_future = Future() new_future.set_result(result) return new_future return inner self.test_executor = Mock(spec=BuilderExecutor) self.test_executor.start_builder = Mock(side_effect=create_completed_future('123')) self.test_executor.stop_builder = Mock(side_effect=create_completed_future()) return self.test_executor def _create_build_job(self): mock_job = Mock() mock_job.job_details = { 'build_uuid': BUILD_UUID, } return mock_job def setUp(self): EphemeralBuilderManager._executors['test'] = self._create_mock_executor self.old_etcd_client_klass = EphemeralBuilderManager._etcd_client_klass EphemeralBuilderManager._etcd_client_klass = self._create_mock_etcd_client self.register_component_callback = Mock() self.uniregister_component_callback = Mock() self.job_heartbeat_callback = Mock() self.job_complete_callback = Mock() self.manager = EphemeralBuilderManager( self.register_component_callback, self.uniregister_component_callback, self.job_heartbeat_callback, self.job_complete_callback, '127.0.0.1' ) self.manager.initialize({'EXECUTOR': 'test'}) def tearDown(self): del EphemeralBuilderManager._executors['test'] EphemeralBuilderManager._etcd_client_klass = self.old_etcd_client_klass @async_test def test_schedule_and_complete(self): mock_job = self._create_build_job() self.etcd_client_mock.read = Mock(side_effect=KeyError) test_component = BuildComponent(None) self.register_component_callback.return_value = test_component # Ask for a builder to be scheduled loop = get_event_loop() is_scheduled = yield From(self.manager.schedule(mock_job, loop)) self.assertTrue(is_scheduled) job_key = ETCD_BUILDER_PREFIX + mock_job.job_details['build_uuid'] self.etcd_client_mock.read.assert_called_once_with(ETCD_BUILDER_PREFIX, recursive=True) self.assertEqual(len(self.test_executor.start_builder.call_args_list), 1) self.assertEqual(self.etcd_client_mock.write.call_args_list[0][0][0], job_key) self.assertEqual(self.etcd_client_mock.write.call_args_list[1][0][0], job_key) self.assertEqual(len(self.register_component_callback.call_args_list), 1) yield From(self.manager.job_completed(mock_job, BuildJobResult.COMPLETE, test_component)) self.assertEqual(len(self.test_executor.stop_builder.call_args_list), 1) self.etcd_client_mock.delete.assert_called_once_with(job_key)