import unittest import etcd import os.path import time import json from trollius import coroutine, get_event_loop, From, Future, sleep from mock import Mock from threading import Event from urllib3.exceptions import ReadTimeoutError from buildman.manager.executor import BuilderExecutor from buildman.manager.ephemeral import (EphemeralBuilderManager, ETCD_BUILDER_PREFIX, ETCD_EXPIRE_RESULT) from buildman.server import BuildJobResult from buildman.component.buildcomponent import BuildComponent BUILD_UUID = 'deadbeef-dead-beef-dead-deadbeefdead' def async_test(f): def wrapper(*args, **kwargs): coro = coroutine(f) future = coro(*args, **kwargs) loop = get_event_loop() loop.run_until_complete(future) return wrapper class TestEphemeral(unittest.TestCase): def __init__(self, *args, **kwargs): self.etcd_client_mock = None self.etcd_wait_event = Event() self.test_executor = None super(TestEphemeral, self).__init__(*args, **kwargs) def _create_mock_etcd_client(self, *args, **kwargs): def hang_until_event(*args, **kwargs): time.sleep(.01) # 10ms to simulate network latency self.etcd_wait_event.wait() self.etcd_client_mock = Mock(spec=etcd.Client, name='etcd.Client') self.etcd_client_mock.watch = Mock(side_effect=hang_until_event) return self.etcd_client_mock def _create_mock_executor(self, *args, **kwargs): def create_completed_future(result=None): def inner(*args, **kwargs): new_future = Future() new_future.set_result(result) return new_future return inner self.test_executor = Mock(spec=BuilderExecutor) self.test_executor.start_builder = Mock(side_effect=create_completed_future('123')) self.test_executor.stop_builder = Mock(side_effect=create_completed_future()) return self.test_executor def _create_build_job(self): mock_job = Mock() mock_job.job_details = { 'build_uuid': BUILD_UUID, } return mock_job def setUp(self): EphemeralBuilderManager._executors['test'] = self._create_mock_executor self.old_etcd_client_klass = EphemeralBuilderManager._etcd_client_klass EphemeralBuilderManager._etcd_client_klass = self._create_mock_etcd_client self.etcd_wait_event.clear() self.register_component_callback = Mock() self.uniregister_component_callback = Mock() self.job_heartbeat_callback = Mock() self.job_complete_callback = Mock() self.manager = EphemeralBuilderManager( self.register_component_callback, self.uniregister_component_callback, self.job_heartbeat_callback, self.job_complete_callback, '127.0.0.1', 30, ) self.manager.initialize({'EXECUTOR': 'test'}) self.mock_job = self._create_build_job() self.mock_job_key = os.path.join(ETCD_BUILDER_PREFIX, BUILD_UUID) def tearDown(self): self.etcd_wait_event.set() self.manager.shutdown() del EphemeralBuilderManager._executors['test'] EphemeralBuilderManager._etcd_client_klass = self.old_etcd_client_klass @async_test def test_schedule_and_complete(self): self.etcd_client_mock.read = Mock(side_effect=KeyError) test_component = BuildComponent(None) self.register_component_callback.return_value = test_component # Ask for a builder to be scheduled loop = get_event_loop() is_scheduled = yield From(self.manager.schedule(self.mock_job, loop)) self.assertTrue(is_scheduled) self.etcd_client_mock.read.assert_called_once_with(ETCD_BUILDER_PREFIX, recursive=True) self.assertEqual(self.test_executor.start_builder.call_count, 1) self.assertEqual(self.etcd_client_mock.write.call_args_list[0][0][0], self.mock_job_key) self.assertEqual(self.etcd_client_mock.write.call_args_list[1][0][0], self.mock_job_key) self.assertEqual(self.register_component_callback.call_count, 1) yield From(self.manager.job_completed(self.mock_job, BuildJobResult.COMPLETE, test_component)) self.assertEqual(self.test_executor.stop_builder.call_count, 1) self.etcd_client_mock.delete.assert_called_once_with(self.mock_job_key) @async_test def test_expiring_worker(self): # Test that we are watching before anything else happens self.etcd_client_mock.watch.assert_called_once_with(ETCD_BUILDER_PREFIX, recursive=True) # Send a signal to the callback that a worker has expired expired_result = Mock(spec=etcd.EtcdResult) expired_result.action = ETCD_EXPIRE_RESULT expired_result.key = self.mock_job_key expired_result._prev_node = Mock(spec=etcd.EtcdResult) expired_result._prev_node.value = json.dumps({'builder_id': '1234'}) expired_future = Future() expired_future.set_result(expired_result) self.manager._handle_key_expiration(expired_future) yield From(sleep(.01)) self.test_executor.stop_builder.assert_called_once_with('1234') self.assertEqual(self.test_executor.stop_builder.call_count, 1) self.etcd_client_mock.delete.assert_called_once_with(self.mock_job_key) @async_test def test_change_worker(self): # Send a signal to the callback that a worker key has been changed set_result = Mock(sepc=etcd.EtcdResult) set_result.action = 'set' set_result.key = self.mock_job_key set_future = Future() set_future.set_result(set_result) self.manager._handle_key_expiration(set_future) yield From(sleep(.01)) self.assertEquals(self.test_executor.stop_builder.call_count, 0) @async_test def test_heartbeat_response(self): expiration_timestamp = time.time() + 60 builder_result = Mock(spec=etcd.EtcdResult) builder_result.value = json.dumps({ 'builder_id': '123', 'expiration': expiration_timestamp, 'max_expiration': expiration_timestamp, }) self.etcd_client_mock.read = Mock(return_value=builder_result) yield From(self.manager.job_heartbeat(self.mock_job)) # Wait for threads to complete yield From(sleep(.01)) self.job_heartbeat_callback.assert_called_once_with(self.mock_job) self.assertEqual(self.etcd_client_mock.write.call_count, 1) self.assertEqual(self.etcd_client_mock.write.call_args_list[0][0][0], self.mock_job_key) @async_test def test_etcd_read_timeout(self): # Send a signal to the callback that a worker key has been changed read_timeout_future = Future() read_timeout_future.set_exception(ReadTimeoutError(None, None, None)) self.manager._handle_key_expiration(read_timeout_future) yield From(sleep(.01)) self.assertEquals(self.test_executor.stop_builder.call_count, 0)