f50bb8a1ce
This change fixes the build manager's ephemeral executor so that it tells the overall build server to call set_phase when a build never starts. Before this change, we properly adjusted the queue item, but not the repo build row or the logs, which is why users just saw "Preparing Build Node", with no indication that the node had failed to start. Fixes #1904
751 lines
26 KiB
Python
751 lines
26 KiB
Python
import unittest
|
|
import etcd
|
|
import time
|
|
import json
|
|
import uuid
|
|
import os
|
|
|
|
from trollius import coroutine, get_event_loop, From, Future, Return
|
|
from mock import Mock, ANY
|
|
|
|
from buildman.manager.executor import BuilderExecutor, ExecutorException
|
|
from buildman.manager.ephemeral import (EphemeralBuilderManager, EtcdAction,
|
|
ETCD_MAX_WATCH_TIMEOUT)
|
|
from buildman.component.buildcomponent import BuildComponent
|
|
from buildman.server import BuildJobResult
|
|
from util.metrics.metricqueue import duration_collector_async
|
|
from app import metric_queue
|
|
|
|
# Build UUID assigned to every mock build job created by these tests.
BUILD_UUID = 'deadbeef-dead-beef-dead-deadbeefdead'
|
|
# Realm identifier given to the mock build component and used to build realm keys in etcd.
REALM_ID = '1234-realm'
|
|
|
|
|
|
def async_test(f):
  """ Decorator which turns the test function `f` into a coroutine and runs it to completion on
      the event loop, so that a synchronous test runner can execute it.

      The returned wrapper keeps the name and docstring of `f` so that test reports and
      introspection refer to the real test rather than to `wrapper`.
  """
  # Imported locally so this helper does not depend on the module's import block.
  import functools

  @functools.wraps(f)
  def wrapper(*args, **kwargs):
    coro = coroutine(f)
    future = coro(*args, **kwargs)
    loop = get_event_loop()
    loop.run_until_complete(future)
  return wrapper
|
|
|
|
|
|
class TestExecutor(BuilderExecutor):
  """ Executor stub which remembers the execution IDs it was asked to start and to stop. """
  job_started = None
  job_stopped = None

  @coroutine
  @duration_collector_async(metric_queue.builder_time_to_start, labelvalues=["testlabel"])
  def start_builder(self, realm, token, build_uuid):
    """ Generates a new random execution ID, records it as the started job and returns it. """
    new_execution_id = str(uuid.uuid4())
    self.job_started = new_execution_id
    raise Return(new_execution_id)

  @coroutine
  def stop_builder(self, execution_id):
    """ Records the execution ID which was asked to be stopped. """
    self.job_stopped = execution_id
|
|
|
|
|
|
|
|
class BadExecutor(BuilderExecutor):
  """ Executor stub whose start_builder always fails with an ExecutorException. """

  @coroutine
  @duration_collector_async(metric_queue.builder_time_to_start, labelvalues=["testlabel"])
  def start_builder(self, realm, token, build_uuid):
    """ Unconditionally raises, to exercise the manager's handling of executor failures. """
    failure = ExecutorException('raised on purpose!')
    raise failure
|
|
|
|
|
|
class EphemeralBuilderTestCase(unittest.TestCase):
  """ Base test case holding the fixtures shared by the ephemeral builder manager tests: a mock
      etcd client, mock build jobs and save/restore of the registered executor classes.
  """

  def __init__(self, *args, **kwargs):
    self.etcd_client_mock = None
    super(EphemeralBuilderTestCase, self).__init__(*args, **kwargs)

  def _create_mock_etcd_client(self, *args, **kwargs):
    """ Builds the mock etcd client, stores it on the test case and returns it in a two-tuple
        whose second member is None. Any arguments given are ignored.
    """
    client = Mock(spec=etcd.Client, name='etcd.Client')

    # Reads fail with KeyError, watches never complete, deletes and writes complete immediately.
    client.read = Mock(side_effect=KeyError)
    client.delete = Mock(side_effect=self._create_completed_future())
    client.watch = Mock(side_effect=lambda *watch_args, **watch_kwargs: Future())
    client.write = Mock(side_effect=self._create_completed_future('some_exec_id'))

    self.etcd_client_mock = client
    return (client, None)

  def _create_completed_future(self, result=None):
    """ Returns a callable which, on every call, produces a new Future that is already resolved
        with `result`.
    """
    def make_resolved(*args, **kwargs):
      resolved = Future()
      resolved.set_result(result)
      return resolved
    return make_resolved

  def setUp(self):
    # Snapshot the executor registry, since individual tests add entries to it.
    self._existing_executors = dict(EphemeralBuilderManager.EXECUTORS)

  def tearDown(self):
    # Put back the registry captured in setUp.
    EphemeralBuilderManager.EXECUTORS = self._existing_executors

  @coroutine
  def _register_component(self, realm_spec, build_component, token):
    """ Stand-in register callback which always returns the string 'hello'. """
    raise Return('hello')

  def _create_build_job(self, namespace='namespace', retries=3):
    """ Returns a mock build job for BUILD_UUID with the given namespace and remaining retries. """
    details = {
      'build_uuid': BUILD_UUID,
    }

    build_job = Mock()
    build_job.job_details = details
    build_job.job_item = {
      'body': json.dumps(details),
      'id': 1,
    }
    build_job.namespace = namespace
    build_job.retries_remaining = retries
    build_job.build_uuid = BUILD_UUID
    return build_job
|
|
|
|
|
|
|
|
class TestEphemeralLifecycle(EphemeralBuilderTestCase):
  """ Tests the various lifecycles of the ephemeral builder and its interaction with etcd. """

  def __init__(self, *args, **kwargs):
    super(TestEphemeralLifecycle, self).__init__(*args, **kwargs)
    self.test_executor = None

  def _create_mock_executor(self, *args, **kwargs):
    """ Executor factory registered under the 'test' key: builds a mock executor whose
        start_builder resolves to '123', stores it on the test case and returns it.
    """
    self.test_executor = Mock(spec=BuilderExecutor)
    self.test_executor.start_builder = Mock(side_effect=self._create_completed_future('123'))
    self.test_executor.stop_builder = Mock(side_effect=self._create_completed_future())
    self.test_executor.name = 'MockExecutor'
    self.test_executor.minimum_retry_threshold = 0
    return self.test_executor

  def _create_job_change(self, action, had_heartbeat):
    """ Returns a mock etcd result describing `action` on the test job's key, whose previous node
        carries the job's queue item together with the given `had_heartbeat` flag.
    """
    job_change = Mock(spec=etcd.EtcdResult)
    job_change.action = action
    job_change.key = self.mock_job_key
    job_change._prev_node = Mock(spec=etcd.EtcdResult)
    job_change._prev_node.value = json.dumps({
      'had_heartbeat': had_heartbeat,
      'job_queue_item': self.mock_job.job_item,
    })
    return job_change

  def _assert_watching_jobs(self):
    """ Asserts that the manager has placed a watch on the 'building/' prefix. """
    self.etcd_client_mock.watch.assert_any_call('building/', recursive=True,
                                                timeout=ETCD_MAX_WATCH_TIMEOUT, index=None)

  def setUp(self):
    super(TestEphemeralLifecycle, self).setUp()

    EphemeralBuilderManager.EXECUTORS['test'] = self._create_mock_executor

    self.register_component_callback = Mock()
    self.unregister_component_callback = Mock()
    self.job_heartbeat_callback = Mock()
    self.job_complete_callback = Mock()

    self.manager = EphemeralBuilderManager(
      self.register_component_callback,
      self.unregister_component_callback,
      self.job_heartbeat_callback,
      self.job_complete_callback,
      '127.0.0.1',
      30,
      etcd_creator=self._create_mock_etcd_client,
    )

    self.manager.initialize({'EXECUTOR': 'test'})

    # Test that we are watching the realm and jobs key once initialized.
    self.etcd_client_mock.watch.assert_any_call('realm/', recursive=True, index=None,
                                                timeout=ETCD_MAX_WATCH_TIMEOUT)
    self._assert_watching_jobs()

    self.mock_job = self._create_build_job()
    self.mock_job_key = os.path.join('building/', BUILD_UUID)

  def tearDown(self):
    super(TestEphemeralLifecycle, self).tearDown()
    self.manager.shutdown()

  @coroutine
  def _setup_job_for_managers(self):
    """ Schedules the mock job, replays the realm creation event to the manager and returns the
        mock build component handed out by the register callback.
    """
    self.etcd_client_mock.read = Mock(side_effect=KeyError)
    test_component = Mock(spec=BuildComponent)
    test_component.builder_realm = REALM_ID
    test_component.start_build = Mock(side_effect=self._create_completed_future())
    self.register_component_callback.return_value = test_component

    # Ask for a builder to be scheduled. Clear the recorded writes first so the positional
    # checks on call_args_list below only see the writes made by schedule().
    self.etcd_client_mock.write.reset_mock()

    is_scheduled = yield From(self.manager.schedule(self.mock_job))

    # schedule() yields a sequence whose first member is the success flag; asserting on the
    # sequence itself would always pass.
    self.assertTrue(is_scheduled[0])
    self.assertEqual(self.test_executor.start_builder.call_count, 1)

    # Ensure the job and realm were added to etcd.
    write_calls = self.etcd_client_mock.write.call_args_list
    self.assertEqual(write_calls[0][0][0], self.mock_job_key)
    self.assertTrue(write_calls[1][0][0].startswith('realm/'))

    # Pin the realm to REALM_ID, the realm of the mock component, before replaying the data.
    realm_data = json.loads(write_calls[1][0][1])
    realm_data['realm'] = REALM_ID

    # Right now the job is not registered with any managers because etcd has not accepted the job
    self.assertEqual(self.register_component_callback.call_count, 0)

    # Fire off a realm changed with the same data.
    realm_created = Mock(spec=etcd.EtcdResult)
    realm_created.action = EtcdAction.CREATE
    realm_created.key = os.path.join('realm/', REALM_ID)
    realm_created.value = json.dumps(realm_data)

    yield From(self.manager._handle_realm_change(realm_created))
    self.assertEqual(self.register_component_callback.call_count, 1)

    # Ensure that we have at least one component node.
    self.assertEqual(1, self.manager.num_workers())

    # Ensure that the build info exists.
    self.assertIsNotNone(self.manager._build_uuid_to_info.get(BUILD_UUID))

    raise Return(test_component)

  @async_test
  def test_schedule_and_complete(self):
    # Test that a job is properly registered with all of the managers
    test_component = yield From(self._setup_job_for_managers())

    # Take the job ourselves
    yield From(self.manager.build_component_ready(test_component))

    realm_key = os.path.join('realm/', REALM_ID)
    self.etcd_client_mock.read.assert_called_with(realm_key)
    self.etcd_client_mock.delete.assert_called_once_with(realm_key)
    self.etcd_client_mock.delete.reset_mock()

    self.assertIsNotNone(self.manager._build_uuid_to_info.get(BUILD_UUID))

    # Finish the job
    yield From(self.manager.job_completed(self.mock_job, BuildJobResult.COMPLETE, test_component))

    # Ensure that the executor kills the job.
    self.assertEqual(self.test_executor.stop_builder.call_count, 1)
    self.etcd_client_mock.delete.assert_called_once_with(self.mock_job_key)

    # Ensure the build information is cleaned up.
    self.assertIsNone(self.manager._build_uuid_to_info.get(BUILD_UUID))
    self.assertEqual(0, self.manager.num_workers())

  @async_test
  def test_another_manager_takes_job(self):
    # Prepare a job to be taken by another manager
    test_component = yield From(self._setup_job_for_managers())

    realm_deleted = Mock(spec=etcd.EtcdResult)
    realm_deleted.action = EtcdAction.DELETE
    realm_deleted.key = os.path.join('realm/', REALM_ID)

    realm_deleted._prev_node = Mock(spec=etcd.EtcdResult)
    realm_deleted._prev_node.value = json.dumps({
      'realm': REALM_ID,
      'token': 'beef',
      'execution_id': '123',
      'job_queue_item': self.mock_job.job_item,
    })

    yield From(self.manager._handle_realm_change(realm_deleted))

    self.unregister_component_callback.assert_called_once_with(test_component)

    # Ensure that the executor does not kill the job.
    self.assertEqual(self.test_executor.stop_builder.call_count, 0)

    # Ensure that we still have the build info, but not the component.
    self.assertEqual(0, self.manager.num_workers())
    self.assertIsNotNone(self.manager._build_uuid_to_info.get(BUILD_UUID))

    # Delete the job once it has "completed".
    job_deleted = self._create_job_change(EtcdAction.DELETE, had_heartbeat=False)
    yield From(self.manager._handle_job_change(job_deleted))

    # Ensure the job was removed from the info, but stop was not called.
    self.assertIsNone(self.manager._build_uuid_to_info.get(BUILD_UUID))
    self.assertEqual(self.test_executor.stop_builder.call_count, 0)

  @async_test
  def test_job_started_by_other_manager(self):
    # Test that we are watching before anything else happens
    self._assert_watching_jobs()

    # Send a signal to the callback that the job has been created.
    job_created = self._create_job_change(EtcdAction.CREATE, had_heartbeat=False)

    # Ensure the create does nothing.
    yield From(self.manager._handle_job_change(job_created))
    self.assertEqual(self.test_executor.stop_builder.call_count, 0)

  @async_test
  def test_expiring_worker_not_started(self):
    # Test that we are watching before anything else happens
    self._assert_watching_jobs()

    # Send a signal to the callback that a worker has expired
    job_expired = self._create_job_change(EtcdAction.EXPIRE, had_heartbeat=True)

    # Since the realm was never registered, expiration should do nothing.
    yield From(self.manager._handle_job_change(job_expired))
    self.assertEqual(self.test_executor.stop_builder.call_count, 0)

  @async_test
  def test_expiring_worker_started(self):
    yield From(self._setup_job_for_managers())

    # Test that we are watching before anything else happens
    self._assert_watching_jobs()

    # Send a signal to the callback that a worker has expired
    job_expired = self._create_job_change(EtcdAction.EXPIRE, had_heartbeat=True)
    yield From(self.manager._handle_job_change(job_expired))

    self.test_executor.stop_builder.assert_called_once_with('123')
    self.assertEqual(self.test_executor.stop_builder.call_count, 1)

  @async_test
  def test_buildjob_deleted(self):
    yield From(self._setup_job_for_managers())

    # Test that we are watching before anything else happens
    self._assert_watching_jobs()

    # Send a signal to the callback that the job key has been deleted
    job_deleted = self._create_job_change(EtcdAction.DELETE, had_heartbeat=False)
    yield From(self.manager._handle_job_change(job_deleted))

    self.assertEqual(self.test_executor.stop_builder.call_count, 0)
    self.assertEqual(self.job_complete_callback.call_count, 0)
    self.assertIsNone(self.manager._build_uuid_to_info.get(BUILD_UUID))

  @async_test
  def test_builder_never_starts(self):
    yield From(self._setup_job_for_managers())

    # Test that we are watching before anything else happens
    self._assert_watching_jobs()

    # Send a signal to the callback that a worker has expired without ever sending a heartbeat
    job_expired = self._create_job_change(EtcdAction.EXPIRE, had_heartbeat=False)
    yield From(self.manager._handle_job_change(job_expired))

    self.test_executor.stop_builder.assert_called_once_with('123')
    self.assertEqual(self.test_executor.stop_builder.call_count, 1)

    # Ensure the job was marked as incomplete, with an update_phase to True (so the DB record and
    # logs are updated as well)
    self.job_complete_callback.assert_called_once_with(ANY, BuildJobResult.INCOMPLETE,
                                                       'MockExecutor', update_phase=True)

  @async_test
  def test_change_worker(self):
    # Send a signal to the callback that a worker key has been changed
    set_result = Mock(spec=etcd.EtcdResult)
    set_result.action = 'set'
    set_result.key = self.mock_job_key

    # The handler is a coroutine, so it has to be yielded for its body to actually run.
    yield From(self.manager._handle_job_change(set_result))
    self.assertEqual(self.test_executor.stop_builder.call_count, 0)

  @async_test
  def test_realm_expired(self):
    yield From(self._setup_job_for_managers())

    # Send a signal to the callback that a realm has expired
    expired_result = Mock(spec=etcd.EtcdResult)
    expired_result.action = EtcdAction.EXPIRE
    expired_result.key = self.mock_job_key
    expired_result._prev_node = Mock(spec=etcd.EtcdResult)
    expired_result._prev_node.value = json.dumps({
      'realm': REALM_ID,
      'execution_id': 'foobar',
      'executor_name': 'MockExecutor',
      'job_queue_item': {'body': '{"build_uuid": "fakeid"}'},
    })

    yield From(self.manager._handle_realm_change(expired_result))

    # Ensure that the cleanup code for the executor was called.
    self.test_executor.stop_builder.assert_called_once_with('foobar')
    self.assertEqual(self.test_executor.stop_builder.call_count, 1)

  @async_test
  def test_heartbeat_response(self):
    yield From(self.assertHeartbeatWithExpiration(100, self.manager.heartbeat_period_sec * 2))

  @async_test
  def test_heartbeat_future_expiration(self):
    yield From(self.assertHeartbeatWithExpiration(10, 10, ranged=True))

  @async_test
  def test_heartbeat_expired(self):
    yield From(self.assertHeartbeatWithExpiration(-60, 0))

  @coroutine
  def assertHeartbeatWithExpiration(self, expires_in_sec, expected_ttl, ranged=False):
    """ Sends a heartbeat for the mock job whose etcd entry expires `expires_in_sec` seconds from
        now, and checks the single resulting etcd write. The written TTL must equal
        `expected_ttl`, or be at most `expected_ttl` when `ranged` is set.
    """
    expiration_timestamp = time.time() + expires_in_sec
    builder_result = Mock(spec=etcd.EtcdResult)
    builder_result.value = json.dumps({
      'expiration': expiration_timestamp,
      'max_expiration': expiration_timestamp,
    })
    self.etcd_client_mock.read = Mock(side_effect=self._create_completed_future(builder_result))

    yield From(self.manager.job_heartbeat(self.mock_job))

    self.job_heartbeat_callback.assert_called_once_with(self.mock_job)
    self.assertEqual(self.etcd_client_mock.write.call_count, 1)

    write_args, write_kwargs = self.etcd_client_mock.write.call_args_list[0]
    self.assertEqual(write_args[0], self.mock_job_key)

    job_key_data = json.loads(write_args[1])
    self.assertTrue(job_key_data['had_heartbeat'])
    self.assertEqual(self.mock_job.job_item, job_key_data['job_queue_item'])

    if not ranged:
      self.assertEqual(expected_ttl, write_kwargs['ttl'])
    else:
      self.assertLessEqual(write_kwargs['ttl'], expected_ttl)
|
|
|
|
|
|
class TestEphemeral(EphemeralBuilderTestCase):
  """ Simple unit tests for the ephemeral builder around config management, starting and stopping
      jobs.
  """

  def setUp(self):
    super(TestEphemeral, self).setUp()

    unregister_component_callback = Mock()
    job_heartbeat_callback = Mock()
    job_complete_callback = Mock()

    self.manager = EphemeralBuilderManager(
      self._register_component,
      unregister_component_callback,
      job_heartbeat_callback,
      job_complete_callback,
      '127.0.0.1',
      30,
      etcd_creator=self._create_mock_etcd_client,
    )

  def tearDown(self):
    super(TestEphemeral, self).tearDown()
    self.manager.shutdown()

  def _clear_started_jobs(self):
    """ Forgets the started job recorded on every registered executor, so that the next
        scheduling attempt can be checked on its own.
    """
    for executor in self.manager.registered_executors:
      executor.job_started = None

  def test_verify_executor_oldconfig(self):
    EphemeralBuilderManager.EXECUTORS['test'] = TestExecutor
    self.manager.initialize({
      'EXECUTOR': 'test',
      'EXECUTOR_CONFIG': dict(MINIMUM_RETRY_THRESHOLD=42)
    })

    # Ensure that we have a single test executor.
    self.assertEqual(1, len(self.manager.registered_executors))
    self.assertEqual(42, self.manager.registered_executors[0].minimum_retry_threshold)
    self.assertEqual('TestExecutor', self.manager.registered_executors[0].name)

  def test_verify_executor_newconfig(self):
    EphemeralBuilderManager.EXECUTORS['test'] = TestExecutor
    self.manager.initialize({
      'EXECUTORS': [{
        'EXECUTOR': 'test',
        'MINIMUM_RETRY_THRESHOLD': 42
      }]
    })

    # Ensure that we have a single test executor.
    self.assertEqual(1, len(self.manager.registered_executors))
    self.assertEqual(42, self.manager.registered_executors[0].minimum_retry_threshold)

  def test_multiple_executors_samename(self):
    EphemeralBuilderManager.EXECUTORS['test'] = TestExecutor
    EphemeralBuilderManager.EXECUTORS['anotherexecutor'] = TestExecutor

    # Two executors sharing the same NAME must be rejected.
    with self.assertRaises(Exception):
      self.manager.initialize({
        'EXECUTORS': [
          {
            'NAME': 'primary',
            'EXECUTOR': 'test',
            'MINIMUM_RETRY_THRESHOLD': 42
          },
          {
            'NAME': 'primary',
            'EXECUTOR': 'anotherexecutor',
            'MINIMUM_RETRY_THRESHOLD': 24
          },
        ]
      })

  def test_verify_multiple_executors(self):
    EphemeralBuilderManager.EXECUTORS['test'] = TestExecutor
    EphemeralBuilderManager.EXECUTORS['anotherexecutor'] = TestExecutor

    self.manager.initialize({
      'EXECUTORS': [
        {
          'NAME': 'primary',
          'EXECUTOR': 'test',
          'MINIMUM_RETRY_THRESHOLD': 42
        },
        {
          'NAME': 'secondary',
          'EXECUTOR': 'anotherexecutor',
          'MINIMUM_RETRY_THRESHOLD': 24
        },
      ]
    })

    # Ensure that we have two test executors.
    self.assertEqual(2, len(self.manager.registered_executors))
    self.assertEqual(42, self.manager.registered_executors[0].minimum_retry_threshold)
    self.assertEqual(24, self.manager.registered_executors[1].minimum_retry_threshold)

  def test_skip_invalid_executor(self):
    self.manager.initialize({
      'EXECUTORS': [
        {
          'EXECUTOR': 'unknown',
          'MINIMUM_RETRY_THRESHOLD': 42
        },
      ]
    })

    self.assertEqual(0, len(self.manager.registered_executors))

  @async_test
  def test_schedule_job_namespace_filter(self):
    EphemeralBuilderManager.EXECUTORS['test'] = TestExecutor
    self.manager.initialize({
      'EXECUTORS': [{
        'EXECUTOR': 'test',
        'NAMESPACE_WHITELIST': ['something'],
      }]
    })

    # Try with a build job in an invalid namespace.
    build_job = self._create_build_job(namespace='somethingelse')
    result = yield From(self.manager.schedule(build_job))
    self.assertFalse(result[0])

    # Try with a valid namespace.
    build_job = self._create_build_job(namespace='something')
    result = yield From(self.manager.schedule(build_job))
    self.assertTrue(result[0])

  @async_test
  def test_schedule_job_retries_filter(self):
    EphemeralBuilderManager.EXECUTORS['test'] = TestExecutor
    self.manager.initialize({
      'EXECUTORS': [{
        'EXECUTOR': 'test',
        'MINIMUM_RETRY_THRESHOLD': 2,
      }]
    })

    # Try with a build job that has too few retries.
    build_job = self._create_build_job(retries=1)
    result = yield From(self.manager.schedule(build_job))
    self.assertFalse(result[0])

    # Try with a valid job.
    build_job = self._create_build_job(retries=2)
    result = yield From(self.manager.schedule(build_job))
    self.assertTrue(result[0])

  @async_test
  def test_schedule_job_executor_fallback(self):
    EphemeralBuilderManager.EXECUTORS['primary'] = TestExecutor
    EphemeralBuilderManager.EXECUTORS['secondary'] = TestExecutor

    self.manager.initialize({
      'EXECUTORS': [
        {
          'NAME': 'primary',
          'EXECUTOR': 'primary',
          'NAMESPACE_WHITELIST': ['something'],
          'MINIMUM_RETRY_THRESHOLD': 3,
        },
        {
          'NAME': 'secondary',
          'EXECUTOR': 'secondary',
          'MINIMUM_RETRY_THRESHOLD': 2,
        },
      ]
    })

    # Try a job not matching the primary's namespace filter. Should schedule on secondary.
    build_job = self._create_build_job(namespace='somethingelse')
    result = yield From(self.manager.schedule(build_job))
    self.assertTrue(result[0])

    self.assertIsNone(self.manager.registered_executors[0].job_started)
    self.assertIsNotNone(self.manager.registered_executors[1].job_started)
    self._clear_started_jobs()

    # Try a job not matching the primary's retry minimum. Should schedule on secondary.
    build_job = self._create_build_job(namespace='something', retries=2)
    result = yield From(self.manager.schedule(build_job))
    self.assertTrue(result[0])

    self.assertIsNone(self.manager.registered_executors[0].job_started)
    self.assertIsNotNone(self.manager.registered_executors[1].job_started)
    self._clear_started_jobs()

    # Try a job matching the primary. Should schedule on the primary.
    build_job = self._create_build_job(namespace='something', retries=3)
    result = yield From(self.manager.schedule(build_job))
    self.assertTrue(result[0])

    self.assertIsNotNone(self.manager.registered_executors[0].job_started)
    self.assertIsNone(self.manager.registered_executors[1].job_started)
    self._clear_started_jobs()

    # Try a job not matching either's restrictions.
    build_job = self._create_build_job(namespace='somethingelse', retries=1)
    result = yield From(self.manager.schedule(build_job))
    self.assertFalse(result[0])

    self.assertIsNone(self.manager.registered_executors[0].job_started)
    self.assertIsNone(self.manager.registered_executors[1].job_started)
    self._clear_started_jobs()

  @async_test
  def test_schedule_job_single_executor(self):
    EphemeralBuilderManager.EXECUTORS['test'] = TestExecutor

    self.manager.initialize({
      'EXECUTOR': 'test',
      'EXECUTOR_CONFIG': {},
    })

    build_job = self._create_build_job(namespace='something', retries=3)
    result = yield From(self.manager.schedule(build_job))
    self.assertTrue(result[0])

    self.assertIsNotNone(self.manager.registered_executors[0].job_started)
    self._clear_started_jobs()

    # A job with no retries remaining is still scheduled on the single executor.
    build_job = self._create_build_job(namespace='something', retries=0)
    result = yield From(self.manager.schedule(build_job))
    self.assertTrue(result[0])

    self.assertIsNotNone(self.manager.registered_executors[0].job_started)
    self._clear_started_jobs()

  @async_test
  def test_executor_exception(self):
    EphemeralBuilderManager.EXECUTORS['bad'] = BadExecutor

    self.manager.initialize({
      'EXECUTOR': 'bad',
      'EXECUTOR_CONFIG': {},
    })

    # The executor raises on start, so scheduling must report failure rather than propagate.
    build_job = self._create_build_job(namespace='something', retries=3)
    result = yield From(self.manager.schedule(build_job))
    self.assertFalse(result[0])

  @async_test
  def test_schedule_and_stop(self):
    EphemeralBuilderManager.EXECUTORS['test'] = TestExecutor

    self.manager.initialize({
      'EXECUTOR': 'test',
      'EXECUTOR_CONFIG': {},
    })

    # Start the build job.
    build_job = self._create_build_job(namespace='something', retries=3)
    result = yield From(self.manager.schedule(build_job))
    self.assertTrue(result[0])

    executor = self.manager.registered_executors[0]
    self.assertIsNotNone(executor.job_started)

    # Register the realm so the build information is added.
    yield From(self.manager._register_realm({
      'realm': str(uuid.uuid4()),
      'token': str(uuid.uuid4()),
      'execution_id': executor.job_started,
      'executor_name': 'TestExecutor',
      'build_uuid': build_job.build_uuid,
      'job_queue_item': build_job.job_item,
    }))

    # Stop the build job.
    yield From(self.manager.kill_builder_executor(build_job.build_uuid))
    self.assertEqual(executor.job_stopped, executor.job_started)
|
|
|
|
|
|
if __name__ == '__main__':
  # Run every test case in this module when the file is executed as a script.
  unittest.main()
|