269 lines
8.2 KiB
Python
269 lines
8.2 KiB
Python
import unittest
|
|
import etcd
|
|
import time
|
|
import json
|
|
import uuid
|
|
|
|
from trollius import coroutine, get_event_loop, From, Future, Return
|
|
from mock import Mock
|
|
from threading import Event
|
|
|
|
from buildman.manager.executor import BuilderExecutor
|
|
from buildman.manager.ephemeral import EphemeralBuilderManager, EXECUTORS
|
|
|
|
|
|
BUILD_UUID = 'deadbeef-dead-beef-dead-deadbeefdead'
|
|
REALM_ID = '1234-realm'
|
|
|
|
|
|
def async_test(f):
|
|
def wrapper(*args, **kwargs):
|
|
coro = coroutine(f)
|
|
future = coro(*args, **kwargs)
|
|
loop = get_event_loop()
|
|
loop.run_until_complete(future)
|
|
return wrapper
|
|
|
|
|
|
class TestExecutor(BuilderExecutor):
|
|
job_started = None
|
|
|
|
@coroutine
|
|
def start_builder(self, realm, token, build_uuid):
|
|
self.job_started = True
|
|
raise Return(str(uuid.uuid4))
|
|
|
|
|
|
class TestEphemeral(unittest.TestCase):
|
|
def __init__(self, *args, **kwargs):
|
|
self.etcd_client_mock = None
|
|
self.etcd_wait_event = Event()
|
|
self.test_executor = None
|
|
super(TestEphemeral, self).__init__(*args, **kwargs)
|
|
|
|
def _create_mock_etcd_client(self, *args, **kwargs):
|
|
def hang_until_event(*args, **kwargs):
|
|
time.sleep(.01) # 10ms to simulate network latency
|
|
self.etcd_wait_event.wait()
|
|
|
|
self.etcd_client_mock = Mock(spec=etcd.Client, name='etcd.Client')
|
|
self.etcd_client_mock.watch = Mock(side_effect=hang_until_event)
|
|
self.etcd_client_mock.read = Mock(side_effect=KeyError)
|
|
self.etcd_client_mock.write = Mock()
|
|
return self.etcd_client_mock
|
|
|
|
def _create_completed_future(self, result=None):
|
|
def inner(*args, **kwargs):
|
|
new_future = Future()
|
|
new_future.set_result(result)
|
|
return new_future
|
|
return inner
|
|
|
|
def _create_build_job(self, namespace='namespace', retries=3):
|
|
mock_job = Mock()
|
|
mock_job.job_details = {
|
|
'build_uuid': BUILD_UUID,
|
|
}
|
|
mock_job.job_item = {
|
|
'body': json.dumps(mock_job.job_details),
|
|
'id': 1,
|
|
}
|
|
|
|
mock_job.namespace = namespace
|
|
mock_job.retries_remaining = retries
|
|
return mock_job
|
|
|
|
def setUp(self):
|
|
self._existing_executors = dict(EXECUTORS)
|
|
|
|
self.old_etcd_client_klass = EphemeralBuilderManager._etcd_client_klass
|
|
EphemeralBuilderManager._etcd_client_klass = self._create_mock_etcd_client
|
|
|
|
self.etcd_wait_event.clear()
|
|
|
|
self.register_component_callback = Mock()
|
|
self.unregister_component_callback = Mock()
|
|
self.job_heartbeat_callback = Mock()
|
|
self.job_complete_callback = Mock()
|
|
|
|
self.manager = EphemeralBuilderManager(
|
|
self.register_component_callback,
|
|
self.unregister_component_callback,
|
|
self.job_heartbeat_callback,
|
|
self.job_complete_callback,
|
|
'127.0.0.1',
|
|
30,
|
|
)
|
|
|
|
def tearDown(self):
|
|
self.etcd_wait_event.set()
|
|
self.manager.shutdown()
|
|
|
|
EXECUTORS = self._existing_executors
|
|
EphemeralBuilderManager._etcd_client_klass = self.old_etcd_client_klass
|
|
|
|
def test_verify_executor_oldconfig(self):
|
|
EXECUTORS['test'] = TestExecutor
|
|
self.manager.initialize({
|
|
'EXECUTOR': 'test',
|
|
'EXECUTOR_CONFIG': dict(MINIMUM_RETRY_THRESHOLD=42)
|
|
})
|
|
|
|
# Ensure that we have a single test executor.
|
|
self.assertEquals(1, len(self.manager.registered_executors))
|
|
self.assertEquals(42, self.manager.registered_executors[0].minimum_retry_threshold)
|
|
|
|
def test_verify_executor_newconfig(self):
|
|
EXECUTORS['test'] = TestExecutor
|
|
self.manager.initialize({
|
|
'EXECUTORS': [{
|
|
'EXECUTOR': 'test',
|
|
'MINIMUM_RETRY_THRESHOLD': 42
|
|
}]
|
|
})
|
|
|
|
# Ensure that we have a single test executor.
|
|
self.assertEquals(1, len(self.manager.registered_executors))
|
|
self.assertEquals(42, self.manager.registered_executors[0].minimum_retry_threshold)
|
|
|
|
def test_verify_multiple_executors(self):
|
|
EXECUTORS['test'] = TestExecutor
|
|
EXECUTORS['anotherexecutor'] = TestExecutor
|
|
|
|
self.manager.initialize({
|
|
'EXECUTORS': [
|
|
{
|
|
'EXECUTOR': 'test',
|
|
'MINIMUM_RETRY_THRESHOLD': 42
|
|
},
|
|
{
|
|
'EXECUTOR': 'anotherexecutor',
|
|
'MINIMUM_RETRY_THRESHOLD': 24
|
|
},
|
|
]
|
|
})
|
|
|
|
# Ensure that we have a two test executors.
|
|
self.assertEquals(2, len(self.manager.registered_executors))
|
|
self.assertEquals(42, self.manager.registered_executors[0].minimum_retry_threshold)
|
|
self.assertEquals(24, self.manager.registered_executors[1].minimum_retry_threshold)
|
|
|
|
def test_skip_invalid_executor(self):
|
|
self.manager.initialize({
|
|
'EXECUTORS': [
|
|
{
|
|
'EXECUTOR': 'unknown',
|
|
'MINIMUM_RETRY_THRESHOLD': 42
|
|
},
|
|
]
|
|
})
|
|
|
|
self.assertEquals(0, len(self.manager.registered_executors))
|
|
|
|
@async_test
|
|
def test_schedule_job_namespace_filter(self):
|
|
EXECUTORS['test'] = TestExecutor
|
|
self.manager.initialize({
|
|
'EXECUTORS': [{
|
|
'EXECUTOR': 'test',
|
|
'NAMESPACE_WHITELIST': ['something'],
|
|
}]
|
|
})
|
|
|
|
# Try with a build job in an invalid namespace.
|
|
build_job = self._create_build_job(namespace='somethingelse')
|
|
result = yield From(self.manager.schedule(build_job))
|
|
self.assertFalse(result[0])
|
|
|
|
# Try with a valid namespace.
|
|
build_job = self._create_build_job(namespace='something')
|
|
result = yield From(self.manager.schedule(build_job))
|
|
self.assertTrue(result[0])
|
|
|
|
@async_test
|
|
def test_schedule_job_retries_filter(self):
|
|
EXECUTORS['test'] = TestExecutor
|
|
self.manager.initialize({
|
|
'EXECUTORS': [{
|
|
'EXECUTOR': 'test',
|
|
'MINIMUM_RETRY_THRESHOLD': 2,
|
|
}]
|
|
})
|
|
|
|
# Try with a build job that has too few retries.
|
|
build_job = self._create_build_job(retries=1)
|
|
result = yield From(self.manager.schedule(build_job))
|
|
self.assertFalse(result[0])
|
|
|
|
# Try with a valid job.
|
|
build_job = self._create_build_job(retries=2)
|
|
result = yield From(self.manager.schedule(build_job))
|
|
self.assertTrue(result[0])
|
|
|
|
|
|
@async_test
|
|
def test_schedule_job_executor_fallback(self):
|
|
EXECUTORS['primary'] = TestExecutor
|
|
EXECUTORS['secondary'] = TestExecutor
|
|
|
|
self.manager.initialize({
|
|
'EXECUTORS': [
|
|
{
|
|
'EXECUTOR': 'primary',
|
|
'NAMESPACE_WHITELIST': ['something'],
|
|
'MINIMUM_RETRY_THRESHOLD': 3,
|
|
},
|
|
{
|
|
'EXECUTOR': 'secondary',
|
|
'MINIMUM_RETRY_THRESHOLD': 2,
|
|
},
|
|
]
|
|
})
|
|
|
|
# Try a job not matching the primary's namespace filter. Should schedule on secondary.
|
|
build_job = self._create_build_job(namespace='somethingelse')
|
|
result = yield From(self.manager.schedule(build_job))
|
|
self.assertTrue(result[0])
|
|
|
|
self.assertIsNone(self.manager.registered_executors[0].job_started)
|
|
self.assertIsNotNone(self.manager.registered_executors[1].job_started)
|
|
|
|
self.manager.registered_executors[0].job_started = None
|
|
self.manager.registered_executors[1].job_started = None
|
|
|
|
# Try a job not matching the primary's retry minimum. Should schedule on secondary.
|
|
build_job = self._create_build_job(namespace='something', retries=2)
|
|
result = yield From(self.manager.schedule(build_job))
|
|
self.assertTrue(result[0])
|
|
|
|
self.assertIsNone(self.manager.registered_executors[0].job_started)
|
|
self.assertIsNotNone(self.manager.registered_executors[1].job_started)
|
|
|
|
self.manager.registered_executors[0].job_started = None
|
|
self.manager.registered_executors[1].job_started = None
|
|
|
|
# Try a job matching the primary. Should schedule on the primary.
|
|
build_job = self._create_build_job(namespace='something', retries=3)
|
|
result = yield From(self.manager.schedule(build_job))
|
|
self.assertTrue(result[0])
|
|
|
|
self.assertIsNotNone(self.manager.registered_executors[0].job_started)
|
|
self.assertIsNone(self.manager.registered_executors[1].job_started)
|
|
|
|
self.manager.registered_executors[0].job_started = None
|
|
self.manager.registered_executors[1].job_started = None
|
|
|
|
# Try a job not matching either's restrictions.
|
|
build_job = self._create_build_job(namespace='somethingelse', retries=1)
|
|
result = yield From(self.manager.schedule(build_job))
|
|
self.assertFalse(result[0])
|
|
|
|
self.assertIsNone(self.manager.registered_executors[0].job_started)
|
|
self.assertIsNone(self.manager.registered_executors[1].job_started)
|
|
|
|
self.manager.registered_executors[0].job_started = None
|
|
self.manager.registered_executors[1].job_started = None
|
|
|
|
if __name__ == '__main__':
|
|
unittest.main()
|
|
|