193 lines
6.6 KiB
Python
193 lines
6.6 KiB
Python
import functools
import json
import os.path
import time
import unittest
from threading import Event

import etcd
from mock import Mock
from trollius import coroutine, get_event_loop, From, Future, sleep
from urllib3.exceptions import ReadTimeoutError

from buildman.component.buildcomponent import BuildComponent
from buildman.manager.ephemeral import (EphemeralBuilderManager, ETCD_BUILDER_PREFIX,
                                        ETCD_EXPIRE_RESULT)
from buildman.manager.executor import BuilderExecutor
from buildman.server import BuildJobResult
|
|
|
|
|
|
# Fixed build UUID placed in the mock job's details and used to derive its etcd key.
BUILD_UUID = 'deadbeef-dead-beef-dead-deadbeefdead'
|
|
|
|
|
|
def async_test(f):
    """Decorate a generator-based test so that it runs to completion synchronously.

    ``f`` is wrapped with ``coroutine``; each call of the returned function creates the
    coroutine's future and drives it on the current event loop with ``run_until_complete``.

    Args:
      f: the test function to wrap; it receives the wrapper's arguments unchanged.

    Returns:
      A regular callable carrying ``f``'s name and docstring, which returns whatever
      ``run_until_complete`` returns.
    """
    # Wrapping once at decoration time is enough; every call still builds a fresh future.
    coro = coroutine(f)

    @functools.wraps(f)
    def wrapper(*args, **kwargs):
        future = coro(*args, **kwargs)
        return get_event_loop().run_until_complete(future)

    return wrapper
|
|
|
|
class TestEphemeral(unittest.TestCase):
    """Tests for EphemeralBuilderManager driven by a mocked etcd client and executor.

    setUp patches ``EphemeralBuilderManager._etcd_client_klass`` and registers a ``'test'``
    entry in ``EphemeralBuilderManager._executors``; both patches are undone through a
    cleanup so they are reverted even when setUp or tearDown fails.
    """

    def __init__(self, *args, **kwargs):
        # Attributes are created up front; the mocks are filled in by the factory methods.
        self.etcd_client_mock = None
        self.etcd_wait_event = Event()
        self.test_executor = None
        super(TestEphemeral, self).__init__(*args, **kwargs)

    def _create_mock_etcd_client(self, *args, **kwargs):
        """Create, remember and return a mock etcd client whose ``watch`` call blocks.

        Installed as ``EphemeralBuilderManager._etcd_client_klass`` by setUp; whatever
        arguments it is called with are accepted and ignored.
        """
        def hang_until_event(*args, **kwargs):
            time.sleep(.01)  # 10ms to simulate network latency
            self.etcd_wait_event.wait()

        self.etcd_client_mock = Mock(spec=etcd.Client, name='etcd.Client')
        self.etcd_client_mock.watch = Mock(side_effect=hang_until_event)
        return self.etcd_client_mock

    def _create_mock_executor(self, *args, **kwargs):
        """Create, remember and return a mock executor with already-resolved futures.

        ``start_builder`` yields a future resolved to ``'123'`` and ``stop_builder`` a
        future resolved to ``None``. Arguments are accepted and ignored.
        """
        def create_completed_future(result=None):
            def inner(*args, **kwargs):
                new_future = Future()
                new_future.set_result(result)
                return new_future
            return inner

        self.test_executor = Mock(spec=BuilderExecutor)
        self.test_executor.start_builder = Mock(side_effect=create_completed_future('123'))
        self.test_executor.stop_builder = Mock(side_effect=create_completed_future())
        return self.test_executor

    def _create_build_job(self):
        """Return a mock job whose ``job_details`` carries only the fixed BUILD_UUID."""
        mock_job = Mock()
        mock_job.job_details = {
            'build_uuid': BUILD_UUID,
        }
        return mock_job

    def _restore_manager_class(self):
        """Undo the class-level patches that setUp applied to EphemeralBuilderManager."""
        EphemeralBuilderManager._executors.pop('test', None)
        EphemeralBuilderManager._etcd_client_klass = self.old_etcd_client_klass

    def setUp(self):
        self.old_etcd_client_klass = EphemeralBuilderManager._etcd_client_klass
        EphemeralBuilderManager._executors['test'] = self._create_mock_executor
        EphemeralBuilderManager._etcd_client_klass = self._create_mock_etcd_client
        # Registered immediately so the patches are reverted even if the rest of setUp fails.
        self.addCleanup(self._restore_manager_class)

        self.etcd_wait_event.clear()
        # Makes sure a blocked mock ``watch`` call is released even if setUp fails below.
        self.addCleanup(self.etcd_wait_event.set)

        self.register_component_callback = Mock()
        self.unregister_component_callback = Mock()
        self.job_heartbeat_callback = Mock()
        self.job_complete_callback = Mock()

        self.manager = EphemeralBuilderManager(
            self.register_component_callback,
            self.unregister_component_callback,
            self.job_heartbeat_callback,
            self.job_complete_callback,
            '127.0.0.1',
            30,
        )

        self.manager.initialize({'EXECUTOR': 'test'})

        self.mock_job = self._create_build_job()
        self.mock_job_key = os.path.join(ETCD_BUILDER_PREFIX, BUILD_UUID)

    def tearDown(self):
        # Release the mock ``watch`` call before shutting down. The class-level patches
        # are reverted afterwards by the cleanup registered in setUp.
        self.etcd_wait_event.set()
        self.manager.shutdown()

    @async_test
    def test_schedule_and_complete(self):
        """Schedule a job, check the builder is started, then complete it and check cleanup."""
        self.etcd_client_mock.read = Mock(side_effect=KeyError)
        test_component = BuildComponent(None)
        self.register_component_callback.return_value = test_component

        # Ask for a builder to be scheduled
        loop = get_event_loop()
        is_scheduled = yield From(self.manager.schedule(self.mock_job, loop))

        self.assertTrue(is_scheduled)

        self.etcd_client_mock.read.assert_called_once_with(ETCD_BUILDER_PREFIX, recursive=True)
        self.assertEqual(self.test_executor.start_builder.call_count, 1)
        # The first two writes must both target the job's key
        write_calls = self.etcd_client_mock.write.call_args_list
        self.assertEqual(write_calls[0][0][0], self.mock_job_key)
        self.assertEqual(write_calls[1][0][0], self.mock_job_key)

        self.assertEqual(self.register_component_callback.call_count, 1)

        yield From(self.manager.job_completed(self.mock_job, BuildJobResult.COMPLETE,
                                              test_component))

        self.assertEqual(self.test_executor.stop_builder.call_count, 1)
        self.etcd_client_mock.delete.assert_called_once_with(self.mock_job_key)

    @async_test
    def test_expiring_worker(self):
        """An expire result for the job key stops that builder and deletes the key."""
        # Test that we are watching before anything else happens
        self.etcd_client_mock.watch.assert_called_once_with(ETCD_BUILDER_PREFIX, recursive=True,
                                                            timeout=0)

        # Send a signal to the callback that a worker has expired
        expired_result = Mock(spec=etcd.EtcdResult)
        expired_result.action = ETCD_EXPIRE_RESULT
        expired_result.key = self.mock_job_key
        expired_result._prev_node = Mock(spec=etcd.EtcdResult)
        expired_result._prev_node.value = json.dumps({'builder_id': '1234'})
        expired_future = Future()
        expired_future.set_result(expired_result)

        self.manager._handle_key_expiration(expired_future)

        yield From(sleep(.01))

        self.test_executor.stop_builder.assert_called_once_with('1234')
        self.assertEqual(self.test_executor.stop_builder.call_count, 1)

        self.etcd_client_mock.delete.assert_called_once_with(self.mock_job_key)

    @async_test
    def test_change_worker(self):
        """A 'set' result for the job key must not stop any builder."""
        # Send a signal to the callback that a worker key has been changed
        set_result = Mock(spec=etcd.EtcdResult)
        set_result.action = 'set'
        set_result.key = self.mock_job_key
        set_future = Future()
        set_future.set_result(set_result)

        self.manager._handle_key_expiration(set_future)

        yield From(sleep(.01))

        self.assertEqual(self.test_executor.stop_builder.call_count, 0)

    @async_test
    def test_heartbeat_response(self):
        """A job heartbeat invokes the heartbeat callback and writes the job key once."""
        expiration_timestamp = time.time() + 60
        builder_result = Mock(spec=etcd.EtcdResult)
        builder_result.value = json.dumps({
            'builder_id': '123',
            'expiration': expiration_timestamp,
            'max_expiration': expiration_timestamp,
        })
        self.etcd_client_mock.read = Mock(return_value=builder_result)

        yield From(self.manager.job_heartbeat(self.mock_job))

        # Wait for threads to complete
        yield From(sleep(.01))

        self.job_heartbeat_callback.assert_called_once_with(self.mock_job)
        self.assertEqual(self.etcd_client_mock.write.call_count, 1)
        self.assertEqual(self.etcd_client_mock.write.call_args_list[0][0][0], self.mock_job_key)

    @async_test
    def test_etcd_read_timeout(self):
        """A watch future that failed with ReadTimeoutError must not stop any builder."""
        # Hand the callback a future whose result is a read timeout exception
        read_timeout_future = Future()
        read_timeout_future.set_exception(ReadTimeoutError(None, None, None))

        self.manager._handle_key_expiration(read_timeout_future)

        yield From(sleep(.01))

        self.assertEqual(self.test_executor.stop_builder.call_count, 0)
|