This repository has been archived on 2020-03-24. You can view files and clone it, but cannot push or open issues or pull requests.
quay/test/test_buildman.py
Joseph Schorr c29f9ccc7f Fix TTL on heartbeat in etcd
Until now, once the heartbeat had expired, we would issue a negative TTL, which causes etcd to either raise an exception or simply ignore the expiration (depending on the version of etcd). This change ensures that once the key has expired, it is removed immediately by setting a TTL of 0. It also adds tests for this case and for the normal expiration case.
2016-08-03 11:15:03 -04:00

702 lines
24 KiB
Python

import functools
import json
import os
import time
import unittest
import uuid

import etcd
from mock import Mock, ANY
from trollius import coroutine, get_event_loop, From, Future, Return

from buildman.component.buildcomponent import BuildComponent
from buildman.manager.ephemeral import (EphemeralBuilderManager, EtcdAction,
                                        ETCD_MAX_WATCH_TIMEOUT)
from buildman.manager.executor import BuilderExecutor, ExecutorException
from buildman.server import BuildJobResult
# Identifiers shared by the fixtures below: the UUID assigned to every mock build job and the
# realm assigned to the mock build component.
BUILD_UUID = 'deadbeef-dead-beef-dead-deadbeefdead'
REALM_ID = '1234-realm'
def async_test(f):
    """ Decorator that runs a coroutine-style test to completion on the event loop.

    The decorated function is wrapped with ``coroutine`` and the resulting future is driven
    with ``run_until_complete``, so the test runner sees an ordinary blocking callable.

    Args:
        f: The test function, usually a generator that uses ``yield From(...)``.

    Returns:
        A plain function accepting the same arguments as ``f``. It returns None.
    """
    @functools.wraps(f)  # keep the test's own name and docstring instead of 'wrapper'
    def wrapper(*args, **kwargs):
        coro = coroutine(f)
        future = coro(*args, **kwargs)
        loop = get_event_loop()
        loop.run_until_complete(future)
    return wrapper
class TestExecutor(BuilderExecutor):
    """ Fake executor that records the builders it was asked to start and stop. """
    # Execution id returned by the most recent start_builder call (None until then).
    job_started = None
    # Execution id passed to the most recent stop_builder call (None until then).
    job_stopped = None

    @coroutine
    def start_builder(self, realm, token, build_uuid):
        """ Records a freshly generated execution id and returns it. """
        execution_id = str(uuid.uuid4())
        self.job_started = execution_id
        raise Return(execution_id)

    @coroutine
    def stop_builder(self, execution_id):
        """ Records the execution id that was asked to stop. """
        self.job_stopped = execution_id
class BadExecutor(BuilderExecutor):
    """ Executor whose start_builder always raises an ExecutorException. """

    @coroutine
    def start_builder(self, realm, token, build_uuid):
        failure = ExecutorException('raised on purpose!')
        raise failure
class EphemeralBuilderTestCase(unittest.TestCase):
    """ Base test case providing the mock etcd client and mock build job fixtures. """

    def __init__(self, *args, **kwargs):
        self.etcd_client_mock = None
        super(EphemeralBuilderTestCase, self).__init__(*args, **kwargs)

    def _create_mock_etcd_client(self, *args, **kwargs):
        """ Builds a mock etcd client, stores it on the test case and returns (client, None).

        Reads raise KeyError, watches return a Future that is never resolved here, and
        deletes/writes return already-completed futures.
        """
        def pending_future(*_args, **_kwargs):
            return Future()

        client = Mock(spec=etcd.Client, name='etcd.Client')
        client.read = Mock(side_effect=KeyError)
        client.delete = Mock(side_effect=self._create_completed_future())
        client.watch = Mock(side_effect=pending_future)
        client.write = Mock(side_effect=self._create_completed_future('some_exec_id'))

        self.etcd_client_mock = client
        return (client, None)

    def _create_completed_future(self, result=None):
        """ Returns a callable that yields a new Future already resolved with `result`. """
        def resolved(*_args, **_kwargs):
            done = Future()
            done.set_result(result)
            return done

        return resolved

    def setUp(self):
        # Snapshot the executor registry so that tests can register fakes freely.
        self._existing_executors = dict(EphemeralBuilderManager.EXECUTORS)

    def tearDown(self):
        # Put back the registry captured in setUp.
        EphemeralBuilderManager.EXECUTORS = self._existing_executors

    @coroutine
    def _register_component(self, realm_spec, build_component, token):
        """ Stand-in register-component callback; always returns 'hello'. """
        raise Return('hello')

    def _create_build_job(self, namespace='namespace', retries=3):
        """ Creates a mock build job for BUILD_UUID with the given namespace and retry count. """
        details = {
            'build_uuid': BUILD_UUID,
        }

        job = Mock()
        job.job_details = details
        job.job_item = {
            'body': json.dumps(details),
            'id': 1,
        }
        job.namespace = namespace
        job.retries_remaining = retries
        job.build_uuid = BUILD_UUID
        return job
class TestEphemeralLifecycle(EphemeralBuilderTestCase):
    """ Tests the various lifecycles of the ephemeral builder and its interaction with etcd. """

    def __init__(self, *args, **kwargs):
        super(TestEphemeralLifecycle, self).__init__(*args, **kwargs)
        self.etcd_client_mock = None
        self.test_executor = None

    # NOTE: _create_completed_future is inherited from EphemeralBuilderTestCase; the identical
    # override that used to be defined here was redundant.

    def _create_mock_executor(self, *args, **kwargs):
        """ Executor factory registered under 'test'.

        Creates a mock executor whose start_builder resolves to '123' and whose stop_builder
        resolves to None, stores it on the test case and returns it.
        """
        executor = Mock(spec=BuilderExecutor)
        executor.start_builder = Mock(side_effect=self._create_completed_future('123'))
        executor.stop_builder = Mock(side_effect=self._create_completed_future())
        executor.name = 'MockExecutor'
        executor.minimum_retry_threshold = 0

        self.test_executor = executor
        return executor

    def _assert_watching(self, prefix):
        """ Asserts that a recursive etcd watch was requested on the given key prefix. """
        self.etcd_client_mock.watch.assert_any_call(prefix, recursive=True, index=None,
                                                    timeout=ETCD_MAX_WATCH_TIMEOUT)

    def _create_job_event(self, action, had_heartbeat):
        """ Builds a mock etcd result for the mock job's key.

        Args:
            action: The etcd action to report (e.g. EtcdAction.EXPIRE or EtcdAction.DELETE).
            had_heartbeat: Value stored under 'had_heartbeat' in the previous node's payload.

        Returns:
            A Mock specced on etcd.EtcdResult whose `_prev_node.value` is the JSON payload.
        """
        event = Mock(spec=etcd.EtcdResult)
        event.action = action
        event.key = self.mock_job_key
        event._prev_node = Mock(spec=etcd.EtcdResult)
        event._prev_node.value = json.dumps({
            'had_heartbeat': had_heartbeat,
            'job_queue_item': self.mock_job.job_item,
        })
        return event

    def setUp(self):
        super(TestEphemeralLifecycle, self).setUp()

        EphemeralBuilderManager.EXECUTORS['test'] = self._create_mock_executor

        self.register_component_callback = Mock()
        self.unregister_component_callback = Mock()
        self.job_heartbeat_callback = Mock()
        self.job_complete_callback = Mock()

        self.manager = EphemeralBuilderManager(
            self.register_component_callback,
            self.unregister_component_callback,
            self.job_heartbeat_callback,
            self.job_complete_callback,
            '127.0.0.1',
            30,
            etcd_creator=self._create_mock_etcd_client,
        )

        self.manager.initialize({'EXECUTOR': 'test'})

        # Test that we are watching the realm and jobs key once initialized.
        self._assert_watching('realm/')
        self._assert_watching('building/')

        self.mock_job = self._create_build_job()
        self.mock_job_key = os.path.join('building/', BUILD_UUID)

    def tearDown(self):
        super(TestEphemeralLifecycle, self).tearDown()
        self.manager.shutdown()

    @coroutine
    def _setup_job_for_managers(self):
        """ Schedules the mock job and simulates etcd reporting the realm as created.

        Returns (via Return) the mock build component handed out by the register callback.
        """
        self.etcd_client_mock.read = Mock(side_effect=KeyError)
        test_component = Mock(spec=BuildComponent)
        test_component.builder_realm = REALM_ID
        test_component.start_build = Mock(side_effect=self._create_completed_future())
        self.register_component_callback.return_value = test_component

        # Ask for a builder to be scheduled. reset_mock() clears the recorded calls; the
        # previous `reset()` was not a Mock API and silently did nothing.
        self.etcd_client_mock.write.reset_mock()
        schedule_result = yield From(self.manager.schedule(self.mock_job))

        # schedule() returns a tuple (see the result[0] checks in TestEphemeral), so asserting
        # on the tuple itself would always pass; check its first element instead.
        self.assertTrue(schedule_result[0])
        self.assertEqual(self.test_executor.start_builder.call_count, 1)

        # Ensure the job and realm were added to etcd.
        write_calls = self.etcd_client_mock.write.call_args_list
        self.assertEqual(write_calls[0][0][0], self.mock_job_key)
        self.assertTrue(write_calls[1][0][0].startswith('realm/'))

        # Reuse the payload the manager wrote, but under the realm id the component expects.
        realm_data = json.loads(write_calls[1][0][1])
        realm_data['realm'] = REALM_ID

        # Right now the job is not registered with any managers because etcd has not accepted
        # the job.
        self.assertEqual(self.register_component_callback.call_count, 0)

        # Fire off a realm changed with the same data.
        realm_created = Mock(spec=etcd.EtcdResult)
        realm_created.action = EtcdAction.CREATE
        realm_created.key = os.path.join('realm/', REALM_ID)
        realm_created.value = json.dumps(realm_data)

        self.manager._handle_realm_change(realm_created)
        self.assertEqual(self.register_component_callback.call_count, 1)

        # Ensure that we have at least one component node.
        self.assertEqual(1, self.manager.num_workers())

        # Ensure that the build info exists.
        self.assertIsNotNone(self.manager._build_uuid_to_info.get(BUILD_UUID))

        raise Return(test_component)

    @async_test
    def test_schedule_and_complete(self):
        # Test that a job is properly registered with all of the managers
        test_component = yield From(self._setup_job_for_managers())

        # Take the job ourselves
        yield From(self.manager.build_component_ready(test_component))

        self.etcd_client_mock.delete.assert_called_once_with(os.path.join('realm/', REALM_ID))
        self.etcd_client_mock.delete.reset_mock()

        self.assertIsNotNone(self.manager._build_uuid_to_info.get(BUILD_UUID))

        # Finish the job
        yield From(self.manager.job_completed(self.mock_job, BuildJobResult.COMPLETE,
                                              test_component))

        # Ensure that the executor kills the job.
        self.assertEqual(self.test_executor.stop_builder.call_count, 1)
        self.etcd_client_mock.delete.assert_called_once_with(self.mock_job_key)

        # Ensure the build information is cleaned up.
        self.assertIsNone(self.manager._build_uuid_to_info.get(BUILD_UUID))
        self.assertEqual(0, self.manager.num_workers())

    @async_test
    def test_another_manager_takes_job(self):
        # Prepare a job to be taken by another manager
        test_component = yield From(self._setup_job_for_managers())

        realm_deleted = Mock(spec=etcd.EtcdResult)
        realm_deleted.action = EtcdAction.DELETE
        realm_deleted.key = os.path.join('realm/', REALM_ID)
        realm_deleted._prev_node = Mock(spec=etcd.EtcdResult)
        realm_deleted._prev_node.value = json.dumps({
            'realm': REALM_ID,
            'token': 'beef',
            'builder_id': '123',
            'job_queue_item': self.mock_job.job_item,
        })

        self.manager._handle_realm_change(realm_deleted)
        self.unregister_component_callback.assert_called_once_with(test_component)

        # Ensure that the executor does not kill the job.
        self.assertEqual(self.test_executor.stop_builder.call_count, 0)

        # Ensure that we still have the build info, but not the component.
        self.assertEqual(0, self.manager.num_workers())
        self.assertIsNotNone(self.manager._build_uuid_to_info.get(BUILD_UUID))

        # Delete the job once it has "completed".
        deleted_result = self._create_job_event(EtcdAction.DELETE, had_heartbeat=False)
        yield From(self.manager._handle_job_expiration_or_delete(deleted_result))

        # Ensure the job was removed from the info, but stop was not called.
        self.assertIsNone(self.manager._build_uuid_to_info.get(BUILD_UUID))
        self.assertEqual(self.test_executor.stop_builder.call_count, 0)

    @async_test
    def test_expiring_worker_not_started(self):
        # Test that we are watching before anything else happens
        self._assert_watching('building/')

        # Send a signal to the callback that a worker has expired
        expired_result = self._create_job_event(EtcdAction.EXPIRE, had_heartbeat=True)

        # Since the realm was never registered, expiration should do nothing.
        yield From(self.manager._handle_job_expiration_or_delete(expired_result))
        self.assertEqual(self.test_executor.stop_builder.call_count, 0)

    @async_test
    def test_expiring_worker_started(self):
        yield From(self._setup_job_for_managers())

        # Test that the jobs key is being watched
        self._assert_watching('building/')

        # Send a signal to the callback that a worker has expired
        expired_result = self._create_job_event(EtcdAction.EXPIRE, had_heartbeat=True)

        yield From(self.manager._handle_job_expiration_or_delete(expired_result))

        self.test_executor.stop_builder.assert_called_once_with('123')
        self.assertEqual(self.test_executor.stop_builder.call_count, 1)

    @async_test
    def test_buildjob_deleted(self):
        yield From(self._setup_job_for_managers())

        # Test that the jobs key is being watched
        self._assert_watching('building/')

        # Send a signal to the callback that the job key has been deleted
        deleted_result = self._create_job_event(EtcdAction.DELETE, had_heartbeat=False)

        yield From(self.manager._handle_job_expiration_or_delete(deleted_result))

        self.assertEqual(self.test_executor.stop_builder.call_count, 0)
        self.assertEqual(self.job_complete_callback.call_count, 0)
        self.assertIsNone(self.manager._build_uuid_to_info.get(BUILD_UUID))

    @async_test
    def test_builder_never_starts(self):
        yield From(self._setup_job_for_managers())

        # Test that the jobs key is being watched
        self._assert_watching('building/')

        # Send a signal to the callback that a worker has expired without ever heartbeating
        expired_result = self._create_job_event(EtcdAction.EXPIRE, had_heartbeat=False)

        yield From(self.manager._handle_job_expiration_or_delete(expired_result))

        self.test_executor.stop_builder.assert_called_once_with('123')
        self.assertEqual(self.test_executor.stop_builder.call_count, 1)

        self.job_complete_callback.assert_called_once_with(ANY, BuildJobResult.INCOMPLETE)

    @async_test
    def test_change_worker(self):
        # Send a signal to the callback that a worker key has been changed
        set_result = Mock(spec=etcd.EtcdResult)
        set_result.action = 'set'
        set_result.key = self.mock_job_key

        # The handler is a coroutine and must be yielded, otherwise it never runs and the
        # assertion below is vacuous.
        # NOTE(review): assumes the handler ignores actions other than expire/delete - confirm.
        yield From(self.manager._handle_job_expiration_or_delete(set_result))

        self.assertEqual(self.test_executor.stop_builder.call_count, 0)

    @async_test
    def test_heartbeat_response(self):
        yield From(self.assertHeartbeatWithExpiration(100, self.manager.heartbeat_period_sec * 2))

    @async_test
    def test_heartbeat_future_expiration(self):
        yield From(self.assertHeartbeatWithExpiration(10, 10, ranged=True))

    @async_test
    def test_heartbeat_expired(self):
        yield From(self.assertHeartbeatWithExpiration(-60, 0))

    @coroutine
    def assertHeartbeatWithExpiration(self, expires_in_sec, expected_ttl, ranged=False):
        """ Sends a heartbeat for the mock job and checks the TTL written back to etcd.

        Args:
            expires_in_sec: Offset from now, in seconds, used for both 'expiration' and
                'max_expiration' in the payload returned by the mocked etcd read.
            expected_ttl: The TTL expected on the resulting etcd write.
            ranged: If True, the written TTL only has to be <= expected_ttl rather than equal.
        """
        expiration_timestamp = time.time() + expires_in_sec
        builder_result = Mock(spec=etcd.EtcdResult)
        builder_result.value = json.dumps({
            'expiration': expiration_timestamp,
            'max_expiration': expiration_timestamp,
        })
        self.etcd_client_mock.read = Mock(
            side_effect=self._create_completed_future(builder_result))

        yield From(self.manager.job_heartbeat(self.mock_job))

        self.job_heartbeat_callback.assert_called_once_with(self.mock_job)
        self.assertEqual(self.etcd_client_mock.write.call_count, 1)

        write_args, write_kwargs = self.etcd_client_mock.write.call_args_list[0]
        self.assertEqual(write_args[0], self.mock_job_key)

        job_key_data = json.loads(write_args[1])
        self.assertTrue(job_key_data['had_heartbeat'])
        self.assertEqual(self.mock_job.job_item, job_key_data['job_queue_item'])

        if not ranged:
            self.assertEqual(expected_ttl, write_kwargs['ttl'])
        else:
            self.assertLessEqual(write_kwargs['ttl'], expected_ttl)
class TestEphemeral(EphemeralBuilderTestCase):
    """ Simple unit tests for the ephemeral builder around config management, starting and
        stopping jobs.
    """

    def setUp(self):
        super(TestEphemeral, self).setUp()

        unregister_component_callback = Mock()
        job_heartbeat_callback = Mock()
        job_complete_callback = Mock()

        self.manager = EphemeralBuilderManager(
            self._register_component,
            unregister_component_callback,
            job_heartbeat_callback,
            job_complete_callback,
            '127.0.0.1',
            30,
            etcd_creator=self._create_mock_etcd_client,
        )

    def tearDown(self):
        super(TestEphemeral, self).tearDown()
        self.manager.shutdown()

    def _reset_started_jobs(self):
        """ Clears the recorded `job_started` on every registered executor between cases. """
        for executor in self.manager.registered_executors:
            executor.job_started = None

    def test_verify_executor_oldconfig(self):
        EphemeralBuilderManager.EXECUTORS['test'] = TestExecutor
        self.manager.initialize({
            'EXECUTOR': 'test',
            'EXECUTOR_CONFIG': dict(MINIMUM_RETRY_THRESHOLD=42)
        })

        # Ensure that we have a single test executor.
        self.assertEqual(1, len(self.manager.registered_executors))
        self.assertEqual(42, self.manager.registered_executors[0].minimum_retry_threshold)
        self.assertEqual('TestExecutor', self.manager.registered_executors[0].name)

    def test_verify_executor_newconfig(self):
        EphemeralBuilderManager.EXECUTORS['test'] = TestExecutor
        self.manager.initialize({
            'EXECUTORS': [{
                'EXECUTOR': 'test',
                'MINIMUM_RETRY_THRESHOLD': 42
            }]
        })

        # Ensure that we have a single test executor.
        self.assertEqual(1, len(self.manager.registered_executors))
        self.assertEqual(42, self.manager.registered_executors[0].minimum_retry_threshold)

    def test_multiple_executors_samename(self):
        EphemeralBuilderManager.EXECUTORS['test'] = TestExecutor
        EphemeralBuilderManager.EXECUTORS['anotherexecutor'] = TestExecutor

        # Two executors sharing the NAME 'primary' must be rejected.
        with self.assertRaises(Exception):
            self.manager.initialize({
                'EXECUTORS': [
                    {
                        'NAME': 'primary',
                        'EXECUTOR': 'test',
                        'MINIMUM_RETRY_THRESHOLD': 42
                    },
                    {
                        'NAME': 'primary',
                        'EXECUTOR': 'anotherexecutor',
                        'MINIMUM_RETRY_THRESHOLD': 24
                    },
                ]
            })

    def test_verify_multiple_executors(self):
        EphemeralBuilderManager.EXECUTORS['test'] = TestExecutor
        EphemeralBuilderManager.EXECUTORS['anotherexecutor'] = TestExecutor

        self.manager.initialize({
            'EXECUTORS': [
                {
                    'NAME': 'primary',
                    'EXECUTOR': 'test',
                    'MINIMUM_RETRY_THRESHOLD': 42
                },
                {
                    'NAME': 'secondary',
                    'EXECUTOR': 'anotherexecutor',
                    'MINIMUM_RETRY_THRESHOLD': 24
                },
            ]
        })

        # Ensure that we have two test executors.
        self.assertEqual(2, len(self.manager.registered_executors))
        self.assertEqual(42, self.manager.registered_executors[0].minimum_retry_threshold)
        self.assertEqual(24, self.manager.registered_executors[1].minimum_retry_threshold)

    def test_skip_invalid_executor(self):
        self.manager.initialize({
            'EXECUTORS': [
                {
                    'EXECUTOR': 'unknown',
                    'MINIMUM_RETRY_THRESHOLD': 42
                },
            ]
        })

        self.assertEqual(0, len(self.manager.registered_executors))

    @async_test
    def test_schedule_job_namespace_filter(self):
        EphemeralBuilderManager.EXECUTORS['test'] = TestExecutor
        self.manager.initialize({
            'EXECUTORS': [{
                'EXECUTOR': 'test',
                'NAMESPACE_WHITELIST': ['something'],
            }]
        })

        # Try with a build job in an invalid namespace.
        build_job = self._create_build_job(namespace='somethingelse')
        result = yield From(self.manager.schedule(build_job))
        self.assertFalse(result[0])

        # Try with a valid namespace.
        build_job = self._create_build_job(namespace='something')
        result = yield From(self.manager.schedule(build_job))
        self.assertTrue(result[0])

    @async_test
    def test_schedule_job_retries_filter(self):
        EphemeralBuilderManager.EXECUTORS['test'] = TestExecutor
        self.manager.initialize({
            'EXECUTORS': [{
                'EXECUTOR': 'test',
                'MINIMUM_RETRY_THRESHOLD': 2,
            }]
        })

        # Try with a build job that has too few retries.
        build_job = self._create_build_job(retries=1)
        result = yield From(self.manager.schedule(build_job))
        self.assertFalse(result[0])

        # Try with a valid job.
        build_job = self._create_build_job(retries=2)
        result = yield From(self.manager.schedule(build_job))
        self.assertTrue(result[0])

    @async_test
    def test_schedule_job_executor_fallback(self):
        EphemeralBuilderManager.EXECUTORS['primary'] = TestExecutor
        EphemeralBuilderManager.EXECUTORS['secondary'] = TestExecutor

        self.manager.initialize({
            'EXECUTORS': [
                {
                    'NAME': 'primary',
                    'EXECUTOR': 'primary',
                    'NAMESPACE_WHITELIST': ['something'],
                    'MINIMUM_RETRY_THRESHOLD': 3,
                },
                {
                    'NAME': 'secondary',
                    'EXECUTOR': 'secondary',
                    'MINIMUM_RETRY_THRESHOLD': 2,
                },
            ]
        })

        primary, secondary = self.manager.registered_executors

        # Try a job not matching the primary's namespace filter. Should schedule on secondary.
        build_job = self._create_build_job(namespace='somethingelse')
        result = yield From(self.manager.schedule(build_job))
        self.assertTrue(result[0])

        self.assertIsNone(primary.job_started)
        self.assertIsNotNone(secondary.job_started)
        self._reset_started_jobs()

        # Try a job not matching the primary's retry minimum. Should schedule on secondary.
        build_job = self._create_build_job(namespace='something', retries=2)
        result = yield From(self.manager.schedule(build_job))
        self.assertTrue(result[0])

        self.assertIsNone(primary.job_started)
        self.assertIsNotNone(secondary.job_started)
        self._reset_started_jobs()

        # Try a job matching the primary. Should schedule on the primary.
        build_job = self._create_build_job(namespace='something', retries=3)
        result = yield From(self.manager.schedule(build_job))
        self.assertTrue(result[0])

        self.assertIsNotNone(primary.job_started)
        self.assertIsNone(secondary.job_started)
        self._reset_started_jobs()

        # Try a job not matching either's restrictions.
        build_job = self._create_build_job(namespace='somethingelse', retries=1)
        result = yield From(self.manager.schedule(build_job))
        self.assertFalse(result[0])

        self.assertIsNone(primary.job_started)
        self.assertIsNone(secondary.job_started)
        self._reset_started_jobs()

    @async_test
    def test_schedule_job_single_executor(self):
        EphemeralBuilderManager.EXECUTORS['test'] = TestExecutor

        self.manager.initialize({
            'EXECUTOR': 'test',
            'EXECUTOR_CONFIG': {},
        })

        build_job = self._create_build_job(namespace='something', retries=3)
        result = yield From(self.manager.schedule(build_job))
        self.assertTrue(result[0])

        self.assertIsNotNone(self.manager.registered_executors[0].job_started)
        self._reset_started_jobs()

        # A job with no retries remaining is still accepted by this executor configuration.
        build_job = self._create_build_job(namespace='something', retries=0)
        result = yield From(self.manager.schedule(build_job))
        self.assertTrue(result[0])

        self.assertIsNotNone(self.manager.registered_executors[0].job_started)
        self._reset_started_jobs()

    @async_test
    def test_executor_exception(self):
        EphemeralBuilderManager.EXECUTORS['bad'] = BadExecutor

        self.manager.initialize({
            'EXECUTOR': 'bad',
            'EXECUTOR_CONFIG': {},
        })

        # The executor raises on start, so scheduling must report failure rather than raise.
        build_job = self._create_build_job(namespace='something', retries=3)
        result = yield From(self.manager.schedule(build_job))
        self.assertFalse(result[0])

    @async_test
    def test_schedule_and_stop(self):
        EphemeralBuilderManager.EXECUTORS['test'] = TestExecutor

        self.manager.initialize({
            'EXECUTOR': 'test',
            'EXECUTOR_CONFIG': {},
        })

        # Start the build job.
        build_job = self._create_build_job(namespace='something', retries=3)
        result = yield From(self.manager.schedule(build_job))
        self.assertTrue(result[0])

        executor = self.manager.registered_executors[0]
        self.assertIsNotNone(executor.job_started)

        # Register the realm so the build information is added.
        yield From(self.manager._register_realm({
            'realm': str(uuid.uuid4()),
            'token': str(uuid.uuid4()),
            'execution_id': executor.job_started,
            'executor_name': 'TestExecutor',
            'build_uuid': build_job.build_uuid,
            'job_queue_item': build_job.job_item,
        }))

        # Stop the build job.
        yield From(self.manager.kill_builder_executor(build_job.build_uuid))
        self.assertEqual(executor.job_stopped, executor.job_started)
if __name__ == '__main__':
    # Run every test case in this module when it is executed as a script.
    unittest.main()