import unittest import etcd import os.path import time import json from trollius import coroutine, get_event_loop, From, Future, sleep, Return from mock import Mock from threading import Event from urllib3.exceptions import ReadTimeoutError from buildman.manager.executor import BuilderExecutor from buildman.manager.ephemeral import (EphemeralBuilderManager, ETCD_BUILDER_PREFIX, ETCD_REALM_PREFIX, EtcdAction) from buildman.server import BuildJobResult from buildman.component.buildcomponent import BuildComponent BUILD_UUID = 'deadbeef-dead-beef-dead-deadbeefdead' REALM_ID = '1234-realm' def async_test(f): def wrapper(*args, **kwargs): coro = coroutine(f) future = coro(*args, **kwargs) loop = get_event_loop() loop.run_until_complete(future) return wrapper class TestEphemeral(unittest.TestCase): def __init__(self, *args, **kwargs): self.etcd_client_mock = None self.etcd_wait_event = Event() self.test_executor = None super(TestEphemeral, self).__init__(*args, **kwargs) def _create_mock_etcd_client(self, *args, **kwargs): def hang_until_event(*args, **kwargs): time.sleep(.01) # 10ms to simulate network latency self.etcd_wait_event.wait() self.etcd_client_mock = Mock(spec=etcd.Client, name='etcd.Client') self.etcd_client_mock.watch = Mock(side_effect=hang_until_event) return self.etcd_client_mock def _create_completed_future(self, result=None): def inner(*args, **kwargs): new_future = Future() new_future.set_result(result) return new_future return inner def _create_mock_executor(self, *args, **kwargs): self.test_executor = Mock(spec=BuilderExecutor) self.test_executor.start_builder = Mock(side_effect=self._create_completed_future('123')) self.test_executor.stop_builder = Mock(side_effect=self._create_completed_future()) return self.test_executor def _create_build_job(self): mock_job = Mock() mock_job.job_details = { 'build_uuid': BUILD_UUID, } mock_job.job_item = { 'body': json.dumps(mock_job.job_details), 'id': 1, } return mock_job def setUp(self): EphemeralBuilderManager._executors['test'] = self._create_mock_executor self.old_etcd_client_klass = EphemeralBuilderManager._etcd_client_klass EphemeralBuilderManager._etcd_client_klass = self._create_mock_etcd_client self.etcd_wait_event.clear() self.register_component_callback = Mock() self.unregister_component_callback = Mock() self.job_heartbeat_callback = Mock() self.job_complete_callback = Mock() self.manager = EphemeralBuilderManager( self.register_component_callback, self.unregister_component_callback, self.job_heartbeat_callback, self.job_complete_callback, '127.0.0.1', 30, ) self.manager.initialize({'EXECUTOR': 'test'}) self.mock_job = self._create_build_job() self.mock_job_key = os.path.join(ETCD_BUILDER_PREFIX, BUILD_UUID) def tearDown(self): self.etcd_wait_event.set() self.manager.shutdown() del EphemeralBuilderManager._executors['test'] EphemeralBuilderManager._etcd_client_klass = self.old_etcd_client_klass @coroutine def _setup_job_for_managers(self): # Test that we are watching the realm location before anything else happens self.etcd_client_mock.watch.assert_any_call(ETCD_REALM_PREFIX, recursive=True, timeout=0) self.etcd_client_mock.read = Mock(side_effect=KeyError) test_component = Mock(spec=BuildComponent) test_component.builder_realm = REALM_ID test_component.start_build = Mock(side_effect=self._create_completed_future()) self.register_component_callback.return_value = test_component # Ask for a builder to be scheduled is_scheduled = yield From(self.manager.schedule(self.mock_job)) self.assertTrue(is_scheduled) self.etcd_client_mock.read.assert_called_once_with(ETCD_BUILDER_PREFIX, recursive=True) self.assertEqual(self.test_executor.start_builder.call_count, 1) self.assertEqual(self.etcd_client_mock.write.call_args_list[0][0][0], self.mock_job_key) self.assertEqual(self.etcd_client_mock.write.call_args_list[1][0][0], self.mock_job_key) # Right now the job is not registered with any managers because etcd has not accepted the job self.assertEqual(self.register_component_callback.call_count, 0) realm_created = Mock(spec=etcd.EtcdResult) realm_created.action = EtcdAction.CREATE realm_created.key = os.path.join(ETCD_REALM_PREFIX, REALM_ID) realm_created.value = json.dumps({ 'realm': REALM_ID, 'token': 'beef', 'builder_id': '123', 'job_queue_item': self.mock_job.job_item, }) self.manager._handle_realm_change(realm_created) self.assertEqual(self.register_component_callback.call_count, 1) raise Return(test_component) @async_test def test_schedule_and_complete(self): # Test that a job is properly registered with all of the managers test_component = yield From(self._setup_job_for_managers()) # Take the job ourselves yield From(self.manager.build_component_ready(test_component)) self.etcd_client_mock.delete.assert_called_once_with(os.path.join(ETCD_REALM_PREFIX, REALM_ID)) self.etcd_client_mock.delete.reset_mock() # Finish the job yield From(self.manager.job_completed(self.mock_job, BuildJobResult.COMPLETE, test_component)) self.assertEqual(self.test_executor.stop_builder.call_count, 1) self.etcd_client_mock.delete.assert_called_once_with(self.mock_job_key) @async_test def test_another_manager_takes_job(self): # Prepare a job to be taken by another manager test_component = yield From(self._setup_job_for_managers()) realm_deleted = Mock(spec=etcd.EtcdResult) realm_deleted.action = EtcdAction.DELETE realm_deleted.key = os.path.join(ETCD_REALM_PREFIX, REALM_ID) realm_deleted._prev_node = Mock(spec=etcd.EtcdResult) realm_deleted._prev_node.value = json.dumps({ 'realm': REALM_ID, 'token': 'beef', 'builder_id': '123', 'job_queue_item': self.mock_job.job_item, }) self.manager._handle_realm_change(realm_deleted) self.unregister_component_callback.assert_called_once_with(test_component) @async_test def test_expiring_worker(self): # Test that we are watching before anything else happens self.etcd_client_mock.watch.assert_any_call(ETCD_BUILDER_PREFIX, recursive=True, timeout=0) # Send a signal to the callback that a worker has expired expired_result = Mock(spec=etcd.EtcdResult) expired_result.action = EtcdAction.EXPIRE expired_result.key = self.mock_job_key expired_result._prev_node = Mock(spec=etcd.EtcdResult) expired_result._prev_node.value = json.dumps({'builder_id': '1234'}) self.manager._handle_builder_expiration(expired_result) yield From(sleep(.01)) self.test_executor.stop_builder.assert_called_once_with('1234') self.assertEqual(self.test_executor.stop_builder.call_count, 1) @async_test def test_change_worker(self): # Send a signal to the callback that a worker key has been changed set_result = Mock(sepc=etcd.EtcdResult) set_result.action = 'set' set_result.key = self.mock_job_key self.manager._handle_builder_expiration(set_result) yield From(sleep(.01)) self.assertEquals(self.test_executor.stop_builder.call_count, 0) @async_test def test_heartbeat_response(self): expiration_timestamp = time.time() + 60 builder_result = Mock(spec=etcd.EtcdResult) builder_result.value = json.dumps({ 'builder_id': '123', 'expiration': expiration_timestamp, 'max_expiration': expiration_timestamp, }) self.etcd_client_mock.read = Mock(return_value=builder_result) yield From(self.manager.job_heartbeat(self.mock_job)) # Wait for threads to complete yield From(sleep(.01)) self.job_heartbeat_callback.assert_called_once_with(self.mock_job) self.assertEqual(self.etcd_client_mock.write.call_count, 1) self.assertEqual(self.etcd_client_mock.write.call_args_list[0][0][0], self.mock_job_key)