This repository has been archived on 2020-03-24. You can view files and clone it, but cannot push or open issues or pull requests.
quay/test/test_buildman.py

686 lines
24 KiB
Python
Raw Normal View History

import unittest
import etcd
import time
import json
import uuid
import os
from trollius import coroutine, get_event_loop, From, Future, Return
from mock import Mock, ANY
2016-07-14 15:49:01 +00:00
from buildman.manager.executor import BuilderExecutor, ExecutorException
from buildman.manager.ephemeral import (EphemeralBuilderManager, EtcdAction,
ETCD_MAX_WATCH_TIMEOUT)
from buildman.component.buildcomponent import BuildComponent
from buildman.server import BuildJobResult
BUILD_UUID = 'deadbeef-dead-beef-dead-deadbeefdead'
REALM_ID = '1234-realm'
def async_test(f):
def wrapper(*args, **kwargs):
coro = coroutine(f)
future = coro(*args, **kwargs)
loop = get_event_loop()
loop.run_until_complete(future)
return wrapper
class TestExecutor(BuilderExecutor):
job_started = None
job_stopped = None
@coroutine
def start_builder(self, realm, token, build_uuid):
self.job_started = str(uuid.uuid4())
raise Return(self.job_started)
@coroutine
def stop_builder(self, execution_id):
self.job_stopped = execution_id
2016-07-14 15:49:01 +00:00
class BadExecutor(BuilderExecutor):
@coroutine
def start_builder(self, realm, token, build_uuid):
raise ExecutorException('raised on purpose!')
class EphemeralBuilderTestCase(unittest.TestCase):
def __init__(self, *args, **kwargs):
self.etcd_client_mock = None
super(EphemeralBuilderTestCase, self).__init__(*args, **kwargs)
def _create_mock_etcd_client(self, *args, **kwargs):
def create_future(*args, **kwargs):
return Future()
self.etcd_client_mock = Mock(spec=etcd.Client, name='etcd.Client')
self.etcd_client_mock.read = Mock(side_effect=KeyError)
self.etcd_client_mock.delete = Mock(side_effect=self._create_completed_future())
self.etcd_client_mock.watch = Mock(side_effect=create_future)
self.etcd_client_mock.write = Mock(side_effect=self._create_completed_future('some_exec_id'))
return (self.etcd_client_mock, None)
def _create_completed_future(self, result=None):
def inner(*args, **kwargs):
new_future = Future()
new_future.set_result(result)
return new_future
return inner
def setUp(self):
self._existing_executors = dict(EphemeralBuilderManager.EXECUTORS)
def tearDown(self):
EphemeralBuilderManager.EXECUTORS = self._existing_executors
@coroutine
def _register_component(self, realm_spec, build_component, token):
raise Return('hello')
def _create_build_job(self, namespace='namespace', retries=3):
mock_job = Mock()
mock_job.job_details = {
'build_uuid': BUILD_UUID,
}
mock_job.job_item = {
'body': json.dumps(mock_job.job_details),
'id': 1,
}
mock_job.namespace = namespace
mock_job.retries_remaining = retries
mock_job.build_uuid = BUILD_UUID
return mock_job
class TestEphemeralLifecycle(EphemeralBuilderTestCase):
""" Tests the various lifecycles of the ephemeral builder and its interaction with etcd. """
def __init__(self, *args, **kwargs):
super(TestEphemeralLifecycle, self).__init__(*args, **kwargs)
self.etcd_client_mock = None
self.test_executor = None
def _create_completed_future(self, result=None):
def inner(*args, **kwargs):
new_future = Future()
new_future.set_result(result)
return new_future
return inner
def _create_mock_executor(self, *args, **kwargs):
self.test_executor = Mock(spec=BuilderExecutor)
self.test_executor.start_builder = Mock(side_effect=self._create_completed_future('123'))
self.test_executor.stop_builder = Mock(side_effect=self._create_completed_future())
self.test_executor.name = 'MockExecutor'
self.test_executor.minimum_retry_threshold = 0
return self.test_executor
def setUp(self):
super(TestEphemeralLifecycle, self).setUp()
EphemeralBuilderManager.EXECUTORS['test'] = self._create_mock_executor
self.register_component_callback = Mock()
self.unregister_component_callback = Mock()
self.job_heartbeat_callback = Mock()
self.job_complete_callback = Mock()
self.manager = EphemeralBuilderManager(
self.register_component_callback,
self.unregister_component_callback,
self.job_heartbeat_callback,
self.job_complete_callback,
'127.0.0.1',
30,
etcd_creator=self._create_mock_etcd_client,
)
self.manager.initialize({'EXECUTOR': 'test'})
# Test that we are watching the realm and jobs key once initialized.
self.etcd_client_mock.watch.assert_any_call('realm/', recursive=True, index=None,
timeout=ETCD_MAX_WATCH_TIMEOUT)
self.etcd_client_mock.watch.assert_any_call('building/', recursive=True, index=None,
timeout=ETCD_MAX_WATCH_TIMEOUT)
self.mock_job = self._create_build_job()
self.mock_job_key = os.path.join('building/', BUILD_UUID)
def tearDown(self):
super(TestEphemeralLifecycle, self).tearDown()
self.manager.shutdown()
@coroutine
def _setup_job_for_managers(self):
self.etcd_client_mock.read = Mock(side_effect=KeyError)
test_component = Mock(spec=BuildComponent)
test_component.builder_realm = REALM_ID
test_component.start_build = Mock(side_effect=self._create_completed_future())
self.register_component_callback.return_value = test_component
# Ask for a builder to be scheduled
self.etcd_client_mock.write.reset()
is_scheduled = yield From(self.manager.schedule(self.mock_job))
self.assertTrue(is_scheduled)
self.assertEqual(self.test_executor.start_builder.call_count, 1)
# Ensure the job and realm were added to etcd.
self.assertEqual(self.etcd_client_mock.write.call_args_list[0][0][0], self.mock_job_key)
self.assertTrue(self.etcd_client_mock.write.call_args_list[1][0][0].find('realm/') == 0)
realm_data = json.loads(self.etcd_client_mock.write.call_args_list[1][0][1])
realm_data['realm'] = REALM_ID
# Right now the job is not registered with any managers because etcd has not accepted the job
self.assertEqual(self.register_component_callback.call_count, 0)
# Fire off a realm changed with the same data.
realm_created = Mock(spec=etcd.EtcdResult)
realm_created.action = EtcdAction.CREATE
realm_created.key = os.path.join('realm/', REALM_ID)
realm_created.value = json.dumps(realm_data)
self.manager._handle_realm_change(realm_created)
self.assertEqual(self.register_component_callback.call_count, 1)
# Ensure that we have at least one component node.
self.assertEquals(1, self.manager.num_workers())
# Ensure that the build info exists.
self.assertIsNotNone(self.manager._build_uuid_to_info.get(BUILD_UUID))
raise Return(test_component)
@async_test
def test_schedule_and_complete(self):
# Test that a job is properly registered with all of the managers
test_component = yield From(self._setup_job_for_managers())
# Take the job ourselves
yield From(self.manager.build_component_ready(test_component))
self.etcd_client_mock.delete.assert_called_once_with(os.path.join('realm/', REALM_ID))
self.etcd_client_mock.delete.reset_mock()
self.assertIsNotNone(self.manager._build_uuid_to_info.get(BUILD_UUID))
# Finish the job
yield From(self.manager.job_completed(self.mock_job, BuildJobResult.COMPLETE, test_component))
# Ensure that the executor kills the job.
self.assertEqual(self.test_executor.stop_builder.call_count, 1)
self.etcd_client_mock.delete.assert_called_once_with(self.mock_job_key)
# Ensure the build information is cleaned up.
self.assertIsNone(self.manager._build_uuid_to_info.get(BUILD_UUID))
self.assertEquals(0, self.manager.num_workers())
@async_test
def test_another_manager_takes_job(self):
# Prepare a job to be taken by another manager
test_component = yield From(self._setup_job_for_managers())
realm_deleted = Mock(spec=etcd.EtcdResult)
realm_deleted.action = EtcdAction.DELETE
realm_deleted.key = os.path.join('realm/', REALM_ID)
realm_deleted._prev_node = Mock(spec=etcd.EtcdResult)
realm_deleted._prev_node.value = json.dumps({
'realm': REALM_ID,
'token': 'beef',
'builder_id': '123',
'job_queue_item': self.mock_job.job_item,
})
self.manager._handle_realm_change(realm_deleted)
self.unregister_component_callback.assert_called_once_with(test_component)
# Ensure that the executor does not kill the job.
self.assertEqual(self.test_executor.stop_builder.call_count, 0)
# Ensure that we still have the build info, but not the component.
self.assertEquals(0, self.manager.num_workers())
self.assertIsNotNone(self.manager._build_uuid_to_info.get(BUILD_UUID))
# Delete the job once it has "completed".
expired_result = Mock(spec=etcd.EtcdResult)
expired_result.action = EtcdAction.DELETE
expired_result.key = self.mock_job_key
expired_result._prev_node = Mock(spec=etcd.EtcdResult)
expired_result._prev_node.value = json.dumps({
'had_heartbeat': False,
'job_queue_item': self.mock_job.job_item,
})
yield From(self.manager._handle_job_expiration_or_delete(expired_result))
# Ensure the job was removed from the info, but stop was not called.
self.assertIsNone(self.manager._build_uuid_to_info.get(BUILD_UUID))
self.assertEqual(self.test_executor.stop_builder.call_count, 0)
@async_test
def test_expiring_worker_not_started(self):
# Test that we are watching before anything else happens
self.etcd_client_mock.watch.assert_any_call('building/', recursive=True,
timeout=ETCD_MAX_WATCH_TIMEOUT, index=None)
# Send a signal to the callback that a worker has expired
expired_result = Mock(spec=etcd.EtcdResult)
expired_result.action = EtcdAction.EXPIRE
expired_result.key = self.mock_job_key
expired_result._prev_node = Mock(spec=etcd.EtcdResult)
expired_result._prev_node.value = json.dumps({
'had_heartbeat': True,
'job_queue_item': self.mock_job.job_item,
})
# Since the realm was never registered, expiration should do nothing.
yield From(self.manager._handle_job_expiration_or_delete(expired_result))
self.assertEqual(self.test_executor.stop_builder.call_count, 0)
@async_test
def test_expiring_worker_started(self):
test_component = yield From(self._setup_job_for_managers())
# Test that we are watching before anything else happens
self.etcd_client_mock.watch.assert_any_call('building/', recursive=True,
timeout=ETCD_MAX_WATCH_TIMEOUT, index=None)
# Send a signal to the callback that a worker has expired
expired_result = Mock(spec=etcd.EtcdResult)
expired_result.action = EtcdAction.EXPIRE
expired_result.key = self.mock_job_key
expired_result._prev_node = Mock(spec=etcd.EtcdResult)
expired_result._prev_node.value = json.dumps({
'had_heartbeat': True,
'job_queue_item': self.mock_job.job_item,
})
yield From(self.manager._handle_job_expiration_or_delete(expired_result))
self.test_executor.stop_builder.assert_called_once_with('123')
self.assertEqual(self.test_executor.stop_builder.call_count, 1)
@async_test
def test_buildjob_deleted(self):
test_component = yield From(self._setup_job_for_managers())
# Test that we are watching before anything else happens
self.etcd_client_mock.watch.assert_any_call('building/', recursive=True,
timeout=ETCD_MAX_WATCH_TIMEOUT, index=None)
# Send a signal to the callback that a worker has expired
expired_result = Mock(spec=etcd.EtcdResult)
expired_result.action = EtcdAction.DELETE
expired_result.key = self.mock_job_key
expired_result._prev_node = Mock(spec=etcd.EtcdResult)
expired_result._prev_node.value = json.dumps({
'had_heartbeat': False,
'job_queue_item': self.mock_job.job_item,
})
yield From(self.manager._handle_job_expiration_or_delete(expired_result))
self.assertEqual(self.test_executor.stop_builder.call_count, 0)
self.assertEqual(self.job_complete_callback.call_count, 0)
self.assertIsNone(self.manager._build_uuid_to_info.get(BUILD_UUID))
@async_test
def test_builder_never_starts(self):
test_component = yield From(self._setup_job_for_managers())
# Test that we are watching before anything else happens
self.etcd_client_mock.watch.assert_any_call('building/', recursive=True,
timeout=ETCD_MAX_WATCH_TIMEOUT, index=None)
# Send a signal to the callback that a worker has expired
expired_result = Mock(spec=etcd.EtcdResult)
expired_result.action = EtcdAction.EXPIRE
expired_result.key = self.mock_job_key
expired_result._prev_node = Mock(spec=etcd.EtcdResult)
expired_result._prev_node.value = json.dumps({
'had_heartbeat': False,
'job_queue_item': self.mock_job.job_item,
})
yield From(self.manager._handle_job_expiration_or_delete(expired_result))
self.test_executor.stop_builder.assert_called_once_with('123')
self.assertEqual(self.test_executor.stop_builder.call_count, 1)
self.job_complete_callback.assert_called_once_with(ANY, BuildJobResult.INCOMPLETE)
@async_test
def test_change_worker(self):
# Send a signal to the callback that a worker key has been changed
set_result = Mock(sepc=etcd.EtcdResult)
set_result.action = 'set'
set_result.key = self.mock_job_key
self.manager._handle_job_expiration_or_delete(set_result)
self.assertEquals(self.test_executor.stop_builder.call_count, 0)
@async_test
def test_heartbeat_response(self):
expiration_timestamp = time.time() + 60
builder_result = Mock(spec=etcd.EtcdResult)
builder_result.value = json.dumps({
'expiration': expiration_timestamp,
'max_expiration': expiration_timestamp,
})
self.etcd_client_mock.read = Mock(side_effect=self._create_completed_future(builder_result))
yield From(self.manager.job_heartbeat(self.mock_job))
self.job_heartbeat_callback.assert_called_once_with(self.mock_job)
self.assertEqual(self.etcd_client_mock.write.call_count, 1)
self.assertEqual(self.etcd_client_mock.write.call_args_list[0][0][0], self.mock_job_key)
job_key_data = json.loads(self.etcd_client_mock.write.call_args_list[0][0][1])
self.assertTrue(job_key_data['had_heartbeat'])
self.assertEquals(self.mock_job.job_item, job_key_data['job_queue_item'])
class TestEphemeral(EphemeralBuilderTestCase):
""" Simple unit tests for the ephemeral builder around config management, starting and stopping
jobs.
"""
def setUp(self):
super(TestEphemeral, self).setUp()
unregister_component_callback = Mock()
job_heartbeat_callback = Mock()
job_complete_callback = Mock()
self.manager = EphemeralBuilderManager(
self._register_component,
unregister_component_callback,
job_heartbeat_callback,
job_complete_callback,
'127.0.0.1',
30,
etcd_creator=self._create_mock_etcd_client,
)
def tearDown(self):
super(TestEphemeral, self).tearDown()
self.manager.shutdown()
def test_verify_executor_oldconfig(self):
EphemeralBuilderManager.EXECUTORS['test'] = TestExecutor
self.manager.initialize({
'EXECUTOR': 'test',
'EXECUTOR_CONFIG': dict(MINIMUM_RETRY_THRESHOLD=42)
})
# Ensure that we have a single test executor.
self.assertEquals(1, len(self.manager.registered_executors))
self.assertEquals(42, self.manager.registered_executors[0].minimum_retry_threshold)
self.assertEquals('TestExecutor', self.manager.registered_executors[0].name)
def test_verify_executor_newconfig(self):
EphemeralBuilderManager.EXECUTORS['test'] = TestExecutor
self.manager.initialize({
'EXECUTORS': [{
'EXECUTOR': 'test',
'MINIMUM_RETRY_THRESHOLD': 42
}]
})
# Ensure that we have a single test executor.
self.assertEquals(1, len(self.manager.registered_executors))
self.assertEquals(42, self.manager.registered_executors[0].minimum_retry_threshold)
def test_multiple_executors_samename(self):
EphemeralBuilderManager.EXECUTORS['test'] = TestExecutor
EphemeralBuilderManager.EXECUTORS['anotherexecutor'] = TestExecutor
with self.assertRaises(Exception):
self.manager.initialize({
'EXECUTORS': [
{
'NAME': 'primary',
'EXECUTOR': 'test',
'MINIMUM_RETRY_THRESHOLD': 42
},
{
'NAME': 'primary',
'EXECUTOR': 'anotherexecutor',
'MINIMUM_RETRY_THRESHOLD': 24
},
]
})
def test_verify_multiple_executors(self):
EphemeralBuilderManager.EXECUTORS['test'] = TestExecutor
EphemeralBuilderManager.EXECUTORS['anotherexecutor'] = TestExecutor
self.manager.initialize({
'EXECUTORS': [
{
'NAME': 'primary',
'EXECUTOR': 'test',
'MINIMUM_RETRY_THRESHOLD': 42
},
{
'NAME': 'secondary',
'EXECUTOR': 'anotherexecutor',
'MINIMUM_RETRY_THRESHOLD': 24
},
]
})
# Ensure that we have a two test executors.
self.assertEquals(2, len(self.manager.registered_executors))
self.assertEquals(42, self.manager.registered_executors[0].minimum_retry_threshold)
self.assertEquals(24, self.manager.registered_executors[1].minimum_retry_threshold)
def test_skip_invalid_executor(self):
self.manager.initialize({
'EXECUTORS': [
{
'EXECUTOR': 'unknown',
'MINIMUM_RETRY_THRESHOLD': 42
},
]
})
self.assertEquals(0, len(self.manager.registered_executors))
@async_test
def test_schedule_job_namespace_filter(self):
EphemeralBuilderManager.EXECUTORS['test'] = TestExecutor
self.manager.initialize({
'EXECUTORS': [{
'EXECUTOR': 'test',
'NAMESPACE_WHITELIST': ['something'],
}]
})
# Try with a build job in an invalid namespace.
build_job = self._create_build_job(namespace='somethingelse')
result = yield From(self.manager.schedule(build_job))
self.assertFalse(result[0])
# Try with a valid namespace.
build_job = self._create_build_job(namespace='something')
result = yield From(self.manager.schedule(build_job))
self.assertTrue(result[0])
@async_test
def test_schedule_job_retries_filter(self):
EphemeralBuilderManager.EXECUTORS['test'] = TestExecutor
self.manager.initialize({
'EXECUTORS': [{
'EXECUTOR': 'test',
'MINIMUM_RETRY_THRESHOLD': 2,
}]
})
# Try with a build job that has too few retries.
build_job = self._create_build_job(retries=1)
result = yield From(self.manager.schedule(build_job))
self.assertFalse(result[0])
# Try with a valid job.
build_job = self._create_build_job(retries=2)
result = yield From(self.manager.schedule(build_job))
self.assertTrue(result[0])
@async_test
def test_schedule_job_executor_fallback(self):
EphemeralBuilderManager.EXECUTORS['primary'] = TestExecutor
EphemeralBuilderManager.EXECUTORS['secondary'] = TestExecutor
self.manager.initialize({
'EXECUTORS': [
{
'NAME': 'primary',
'EXECUTOR': 'primary',
'NAMESPACE_WHITELIST': ['something'],
'MINIMUM_RETRY_THRESHOLD': 3,
},
{
'NAME': 'secondary',
'EXECUTOR': 'secondary',
'MINIMUM_RETRY_THRESHOLD': 2,
},
]
})
# Try a job not matching the primary's namespace filter. Should schedule on secondary.
build_job = self._create_build_job(namespace='somethingelse')
result = yield From(self.manager.schedule(build_job))
self.assertTrue(result[0])
self.assertIsNone(self.manager.registered_executors[0].job_started)
self.assertIsNotNone(self.manager.registered_executors[1].job_started)
self.manager.registered_executors[0].job_started = None
self.manager.registered_executors[1].job_started = None
# Try a job not matching the primary's retry minimum. Should schedule on secondary.
build_job = self._create_build_job(namespace='something', retries=2)
result = yield From(self.manager.schedule(build_job))
self.assertTrue(result[0])
self.assertIsNone(self.manager.registered_executors[0].job_started)
self.assertIsNotNone(self.manager.registered_executors[1].job_started)
self.manager.registered_executors[0].job_started = None
self.manager.registered_executors[1].job_started = None
# Try a job matching the primary. Should schedule on the primary.
build_job = self._create_build_job(namespace='something', retries=3)
result = yield From(self.manager.schedule(build_job))
self.assertTrue(result[0])
self.assertIsNotNone(self.manager.registered_executors[0].job_started)
self.assertIsNone(self.manager.registered_executors[1].job_started)
self.manager.registered_executors[0].job_started = None
self.manager.registered_executors[1].job_started = None
# Try a job not matching either's restrictions.
build_job = self._create_build_job(namespace='somethingelse', retries=1)
result = yield From(self.manager.schedule(build_job))
self.assertFalse(result[0])
self.assertIsNone(self.manager.registered_executors[0].job_started)
self.assertIsNone(self.manager.registered_executors[1].job_started)
self.manager.registered_executors[0].job_started = None
self.manager.registered_executors[1].job_started = None
2016-07-14 15:49:01 +00:00
@async_test
def test_schedule_job_single_executor(self):
EphemeralBuilderManager.EXECUTORS['test'] = TestExecutor
2016-07-14 15:49:01 +00:00
self.manager.initialize({
'EXECUTOR': 'test',
'EXECUTOR_CONFIG': {},
})
build_job = self._create_build_job(namespace='something', retries=3)
result = yield From(self.manager.schedule(build_job))
self.assertTrue(result[0])
self.assertIsNotNone(self.manager.registered_executors[0].job_started)
self.manager.registered_executors[0].job_started = None
build_job = self._create_build_job(namespace='something', retries=0)
result = yield From(self.manager.schedule(build_job))
self.assertTrue(result[0])
self.assertIsNotNone(self.manager.registered_executors[0].job_started)
self.manager.registered_executors[0].job_started = None
@async_test
def test_executor_exception(self):
EphemeralBuilderManager.EXECUTORS['bad'] = BadExecutor
2016-07-14 15:49:01 +00:00
self.manager.initialize({
'EXECUTOR': 'bad',
'EXECUTOR_CONFIG': {},
})
build_job = self._create_build_job(namespace='something', retries=3)
result = yield From(self.manager.schedule(build_job))
self.assertFalse(result[0])
@async_test
def test_schedule_and_stop(self):
EphemeralBuilderManager.EXECUTORS['test'] = TestExecutor
self.manager.initialize({
'EXECUTOR': 'test',
'EXECUTOR_CONFIG': {},
})
# Start the build job.
build_job = self._create_build_job(namespace='something', retries=3)
result = yield From(self.manager.schedule(build_job))
self.assertTrue(result[0])
executor = self.manager.registered_executors[0]
self.assertIsNotNone(executor.job_started)
# Register the realm so the build information is added.
yield From(self.manager._register_realm({
'realm': str(uuid.uuid4()),
'token': str(uuid.uuid4()),
'execution_id': executor.job_started,
'executor_name': 'TestExecutor',
'build_uuid': build_job.build_uuid,
'job_queue_item': build_job.job_item,
}))
# Stop the build job.
yield From(self.manager.kill_builder_executor(build_job.build_uuid))
self.assertEquals(executor.job_stopped, executor.job_started)
if __name__ == '__main__':
unittest.main()