Add multiple executor and whitelist support to build manager

This commit is contained in:
Joseph Schorr 2016-07-08 14:52:14 -04:00
parent 6bdbe25cdc
commit 811413fe9c
4 changed files with 239 additions and 181 deletions

View file

@ -25,6 +25,10 @@ class BuildJob(object):
'Could not parse build queue item config with ID %s' % self.job_details['build_uuid']
)
@property
def retries_remaining(self):
return self.job_item.retries_remaining
def has_retries_remaining(self):
return self.job_item.retries_remaining > 0
@ -58,6 +62,11 @@ class BuildJob(object):
raise BuildJobLoadException(
'Could not load repository build with ID %s' % self.job_details['build_uuid'])
@property
def namespace(self):
""" Returns the namespace under which this build is running. """
return self.repo_build.repository.namespace_user.username
@property
def repo_build(self):
return self._load_repo_build()

View file

@ -59,7 +59,7 @@ class EphemeralBuilderManager(BaseManager):
self._etcd_realm_prefix = None
self._etcd_builder_prefix = None
self._etcd_lock_prefix = Nopne
self._etcd_lock_prefix = None
self._ephemeral_api_timeout = DEFAULT_EPHEMERAL_API_TIMEOUT
self._component_to_job = {}
@ -205,6 +205,10 @@ class EphemeralBuilderManager(BaseManager):
self._job_uuid_to_component[build_job.job_details['build_uuid']] = component
return component
@property
def registered_executors(self):
return self._executors
@coroutine
def _register_existing_realms(self):
try:
@ -227,16 +231,26 @@ class EphemeralBuilderManager(BaseManager):
# no realms have been registered yet
pass
def _load_executor(self, executor_class_name, executor_config):
executor_klass = EXECUTORS.get(executor_class_name)
if executor_klass is None:
logger.error('Unknown executor %s; skipping install', executor_class_name)
return
self._executors.append(executor_klass(executor_config, self.manager_hostname))
def initialize(self, manager_config):
logger.debug('Calling initialize')
self._manager_config = manager_config
# TODO(jschorr): We need to make this backwards compatible with existing config, as well as test(s)
for config in manager_config.get('EXECUTORS', []):
executor_klass = EXECUTORS.get(config['EXECUTOR'])
executor_config = config.get('CONFIG', {})
executor_config.update(manager_config.get('EXECUTOR_CONFIG', {}))
self._executors.append(executor_klass(executor_config, self.manager_hostname))
# Note: Executor config can be defined either as a single block of EXECUTOR_CONFIG (old style)
# or as a new set of executor configurations, with the order determining how we fallback. We
# check for both here to ensure backwards compatibility.
if manager_config.get('EXECUTORS'):
for executor_config in manager_config['EXECUTORS']:
self._load_executor(executor_config.get('EXECUTOR'), executor_config)
else:
self._load_executor(manager_config.get('EXECUTOR'), manager_config.get('EXECUTOR_CONFIG'))
etcd_host = self._manager_config.get('ETCD_HOST', '127.0.0.1')
etcd_port = self._manager_config.get('ETCD_PORT', 2379)
@ -265,14 +279,14 @@ class EphemeralBuilderManager(BaseManager):
restarter=self._register_existing_realms)
self._etcd_lock_prefix = self._manager_config.get('ETCD_LOCK_PREFIX', 'locks/')
self._ephemeral_api_timeout = self._manager_config.get('API_TIMEOUT', DEFAULT_EPHEMERAL_API_TIMEOUT)
self._ephemeral_api_timeout = self._manager_config.get('API_TIMEOUT',
DEFAULT_EPHEMERAL_API_TIMEOUT)
# Load components for all realms currently known to the cluster
async(self._register_existing_realms())
def setup_time(self):
setup_time = self._manager_config.get('MACHINE_SETUP_TIME', 300)
return setup_time
return self._manager_config.get('MACHINE_SETUP_TIME', 300)
def shutdown(self):
logger.debug('Shutting down worker.')
@ -343,27 +357,43 @@ class EphemeralBuilderManager(BaseManager):
logger.exception('Exception when writing job %s to etcd', build_uuid)
raise Return(False, RETRY_IMMEDIATELY_TIMEOUT)
started = False
started_with_executor = None
logger.debug("executors are: %s", self._executors)
for executor in self._executors:
# TODO(jschorr): gate on whitelist logic
executor_type = executor.__class__.__name__
# Check if we can use this executor based on its whitelist, by namespace.
namespace = build_job.namespace
if not executor.allowed_for_namespace(namespace):
logger.debug('Job %s (namespace: %s) cannot use executor %s', build_uuid, namespace,
executor_type)
continue
# Check if we can use this executor based on the retries remaining.
if executor.minimum_retry_threshold > build_job.retries_remaining:
logger.debug('Job %s cannot use executor %s due to not meeting retry threshold', build_uuid,
executor_type)
continue
logger.debug('Starting builder for job: %s with executor: %s', build_uuid, executor_type)
try:
builder_id = yield From(executor.start_builder(realm, token, build_uuid))
metric_queue.put_deprecated('EphemeralBuilderStarted', 1, unit='Count')
metric_queue.ephemeral_build_workers.Inc(labelvalues=[builder_id, build_uuid])
started = True
started_with_executor = executor
break
except:
logger.exception('Exception when starting builder for job: %s', build_uuid)
continue
if not started:
logger.error('Could not start any ephemeral workers.')
if started_with_executor is None:
logger.error('Could not start ephemeral worker for build %s', build_uuid)
raise Return(False, self._ephemeral_api_timeout)
logger.debug('Started builder for job: %s with executor: %s', build_uuid, executor_type)
# Store the builder in etcd associated with the job id
try:
payload['builder_id'] = builder_id
@ -392,8 +422,7 @@ class EphemeralBuilderManager(BaseManager):
logger.exception('Exception when writing realm %s to etcd for job %s', realm, build_uuid)
raise Return(False, setup_time)
self._job_to_executor[builder_id] = executor
self._job_to_executor[builder_id] = started_with_executor
raise Return(True, None)
@coroutine

View file

@ -61,6 +61,20 @@ class BuilderExecutor(object):
"""
raise NotImplementedError
def allowed_for_namespace(self, namespace):
""" Returns true if this executor can be used for builds in the given namespace. """
namespace_whitelist = self.executor_config.get('NAMESPACE_WHITELIST')
if namespace_whitelist is not None:
return namespace in namespace_whitelist
return True
@property
def minimum_retry_threshold(self):
""" Returns the minimum number of retries required for this executor to be used or 0 if
none. """
return self.executor_config.get('MINIMUM_RETRY_THRESHOLD', 0)
def generate_cloud_config(self, realm, token, coreos_channel, manager_hostname,
quay_username=None, quay_password=None):
if quay_username is None:

View file

@ -1,18 +1,15 @@
import unittest
import etcd
import os.path
import time
import json
import uuid
from trollius import coroutine, get_event_loop, From, Future, sleep, Return
from mock import Mock, ANY
from trollius import coroutine, get_event_loop, From, Future, Return
from mock import Mock
from threading import Event
from urllib3.exceptions import ReadTimeoutError
from buildman.manager.executor import BuilderExecutor
from buildman.manager.ephemeral import EphemeralBuilderManager, EtcdAction
from buildman.server import BuildJobResult
from buildman.component.buildcomponent import BuildComponent
from buildman.manager.ephemeral import EphemeralBuilderManager, EXECUTORS
BUILD_UUID = 'deadbeef-dead-beef-dead-deadbeefdead'
@ -27,6 +24,16 @@ def async_test(f):
loop.run_until_complete(future)
return wrapper
class TestExecutor(BuilderExecutor):
job_started = None
@coroutine
def start_builder(self, realm, token, build_uuid):
self.job_started = True
raise Return(str(uuid.uuid4))
class TestEphemeral(unittest.TestCase):
def __init__(self, *args, **kwargs):
self.etcd_client_mock = None
@ -41,6 +48,8 @@ class TestEphemeral(unittest.TestCase):
self.etcd_client_mock = Mock(spec=etcd.Client, name='etcd.Client')
self.etcd_client_mock.watch = Mock(side_effect=hang_until_event)
self.etcd_client_mock.read = Mock(side_effect=KeyError)
self.etcd_client_mock.write = Mock()
return self.etcd_client_mock
def _create_completed_future(self, result=None):
@ -50,13 +59,7 @@ class TestEphemeral(unittest.TestCase):
return new_future
return inner
def _create_mock_executor(self, *args, **kwargs):
self.test_executor = Mock(spec=BuilderExecutor)
self.test_executor.start_builder = Mock(side_effect=self._create_completed_future('123'))
self.test_executor.stop_builder = Mock(side_effect=self._create_completed_future())
return self.test_executor
def _create_build_job(self):
def _create_build_job(self, namespace='namespace', retries=3):
mock_job = Mock()
mock_job.job_details = {
'build_uuid': BUILD_UUID,
@ -65,13 +68,17 @@ class TestEphemeral(unittest.TestCase):
'body': json.dumps(mock_job.job_details),
'id': 1,
}
mock_job.namespace = namespace
mock_job.retries_remaining = retries
return mock_job
def setUp(self):
EphemeralBuilderManager.EXECUTORS['test'] = self._create_mock_executor
self._existing_executors = dict(EXECUTORS)
self.old_etcd_client_klass = EphemeralBuilderManager._etcd_client_klass
EphemeralBuilderManager._etcd_client_klass = self._create_mock_etcd_client
self.etcd_wait_event.clear()
self.register_component_callback = Mock()
@ -88,175 +95,174 @@ class TestEphemeral(unittest.TestCase):
30,
)
self.manager.initialize({'EXECUTOR': 'test'})
self.mock_job = self._create_build_job()
self.mock_job_key = os.path.join('building/', BUILD_UUID)
def tearDown(self):
self.etcd_wait_event.set()
self.manager.shutdown()
del EphemeralBuilderManager.EXECUTORS['test']
EXECUTORS = self._existing_executors
EphemeralBuilderManager._etcd_client_klass = self.old_etcd_client_klass
@coroutine
def _setup_job_for_managers(self):
# Test that we are watching the realm location before anything else happens
self.etcd_client_mock.watch.assert_any_call('realm/', recursive=True, timeout=30, index=None)
self.etcd_client_mock.read = Mock(side_effect=KeyError)
test_component = Mock(spec=BuildComponent)
test_component.builder_realm = REALM_ID
test_component.start_build = Mock(side_effect=self._create_completed_future())
self.register_component_callback.return_value = test_component
# Ask for a builder to be scheduled
is_scheduled = yield From(self.manager.schedule(self.mock_job))
self.assertTrue(is_scheduled)
self.etcd_client_mock.read.assert_called_once_with('building/', recursive=True)
self.assertEqual(self.test_executor.start_builder.call_count, 1)
self.assertEqual(self.etcd_client_mock.write.call_args_list[0][0][0], self.mock_job_key)
self.assertEqual(self.etcd_client_mock.write.call_args_list[1][0][0], self.mock_job_key)
# Right now the job is not registered with any managers because etcd has not accepted the job
self.assertEqual(self.register_component_callback.call_count, 0)
realm_created = Mock(spec=etcd.EtcdResult)
realm_created.action = EtcdAction.CREATE
realm_created.key = os.path.join('realm/', REALM_ID)
realm_created.value = json.dumps({
'realm': REALM_ID,
'token': 'beef',
'builder_id': '123',
'job_queue_item': self.mock_job.job_item,
def test_verify_executor_oldconfig(self):
EXECUTORS['test'] = TestExecutor
self.manager.initialize({
'EXECUTOR': 'test',
'EXECUTOR_CONFIG': dict(MINIMUM_RETRY_THRESHOLD=42)
})
self.manager._handle_realm_change(realm_created)
# Ensure that we have a single test executor.
self.assertEquals(1, len(self.manager.registered_executors))
self.assertEquals(42, self.manager.registered_executors[0].minimum_retry_threshold)
self.assertEqual(self.register_component_callback.call_count, 1)
raise Return(test_component)
@async_test
@unittest.skip('this test is flaky on Quay.io builders')
def test_schedule_and_complete(self):
# Test that a job is properly registered with all of the managers
test_component = yield From(self._setup_job_for_managers())
# Take the job ourselves
yield From(self.manager.build_component_ready(test_component))
self.etcd_client_mock.delete.assert_called_once_with(os.path.join('realm/', REALM_ID))
self.etcd_client_mock.delete.reset_mock()
# Finish the job
yield From(self.manager.job_completed(self.mock_job, BuildJobResult.COMPLETE, test_component))
self.assertEqual(self.test_executor.stop_builder.call_count, 1)
self.etcd_client_mock.delete.assert_called_once_with(self.mock_job_key)
@async_test
@unittest.skip('this test is flaky on Quay.io builders')
def test_another_manager_takes_job(self):
# Prepare a job to be taken by another manager
test_component = yield From(self._setup_job_for_managers())
realm_deleted = Mock(spec=etcd.EtcdResult)
realm_deleted.action = EtcdAction.DELETE
realm_deleted.key = os.path.join('realm/', REALM_ID)
realm_deleted._prev_node = Mock(spec=etcd.EtcdResult)
realm_deleted._prev_node.value = json.dumps({
'realm': REALM_ID,
'token': 'beef',
'builder_id': '123',
'job_queue_item': self.mock_job.job_item,
def test_verify_executor_newconfig(self):
EXECUTORS['test'] = TestExecutor
self.manager.initialize({
'EXECUTORS': [{
'EXECUTOR': 'test',
'MINIMUM_RETRY_THRESHOLD': 42
}]
})
self.manager._handle_realm_change(realm_deleted)
# Ensure that we have a single test executor.
self.assertEquals(1, len(self.manager.registered_executors))
self.assertEquals(42, self.manager.registered_executors[0].minimum_retry_threshold)
self.unregister_component_callback.assert_called_once_with(test_component)
def test_verify_multiple_executors(self):
EXECUTORS['test'] = TestExecutor
EXECUTORS['anotherexecutor'] = TestExecutor
@async_test
@unittest.skip('this test is flaky on Quay.io builders')
def test_expiring_worker(self):
# Test that we are watching before anything else happens
self.etcd_client_mock.watch.assert_any_call('building/', recursive=True, timeout=30, index=None)
# Send a signal to the callback that a worker has expired
expired_result = Mock(spec=etcd.EtcdResult)
expired_result.action = EtcdAction.EXPIRE
expired_result.key = self.mock_job_key
expired_result._prev_node = Mock(spec=etcd.EtcdResult)
expired_result._prev_node.value = json.dumps({'builder_id': '1234'})
yield From(self.manager._handle_builder_expiration(expired_result))
self.test_executor.stop_builder.assert_called_once_with('1234')
self.assertEqual(self.test_executor.stop_builder.call_count, 1)
@async_test
@unittest.skip('this test is flaky on Quay.io builders')
def test_builder_never_starts(self):
test_component = yield From(self._setup_job_for_managers())
# Test that we are watching before anything else happens
self.etcd_client_mock.watch.assert_any_call('building/', recursive=True, timeout=30, index=None)
# Send a signal to the callback that a worker has expired
expired_result = Mock(spec=etcd.EtcdResult)
expired_result.action = EtcdAction.EXPIRE
expired_result.key = self.mock_job_key
expired_result._prev_node = Mock(spec=etcd.EtcdResult)
expired_result._prev_node.value = json.dumps({
'builder_id': '1234',
'had_heartbeat': False,
'job_queue_item': self.mock_job.job_item,
self.manager.initialize({
'EXECUTORS': [
{
'EXECUTOR': 'test',
'MINIMUM_RETRY_THRESHOLD': 42
},
{
'EXECUTOR': 'anotherexecutor',
'MINIMUM_RETRY_THRESHOLD': 24
},
]
})
yield From(self.manager._handle_builder_expiration(expired_result))
# Ensure that we have a two test executors.
self.assertEquals(2, len(self.manager.registered_executors))
self.assertEquals(42, self.manager.registered_executors[0].minimum_retry_threshold)
self.assertEquals(24, self.manager.registered_executors[1].minimum_retry_threshold)
self.test_executor.stop_builder.assert_called_once_with('1234')
self.assertEqual(self.test_executor.stop_builder.call_count, 1)
self.job_complete_callback.assert_called_once_with(ANY, BuildJobResult.INCOMPLETE)
@async_test
def test_change_worker(self):
# Send a signal to the callback that a worker key has been changed
set_result = Mock(spec=etcd.EtcdResult)
set_result.action = 'set'
set_result.key = self.mock_job_key
self.manager._handle_builder_expiration(set_result)
yield From(sleep(.01))
self.assertEquals(self.test_executor.stop_builder.call_count, 0)
@async_test
def test_heartbeat_response(self):
expiration_timestamp = time.time() + 60
builder_result = Mock(spec=etcd.EtcdResult)
builder_result.value = json.dumps({
'builder_id': '123',
'expiration': expiration_timestamp,
'max_expiration': expiration_timestamp,
def test_skip_invalid_executor(self):
self.manager.initialize({
'EXECUTORS': [
{
'EXECUTOR': 'unknown',
'MINIMUM_RETRY_THRESHOLD': 42
},
]
})
self.etcd_client_mock.read = Mock(return_value=builder_result)
yield From(self.manager.job_heartbeat(self.mock_job))
self.assertEquals(0, len(self.manager.registered_executors))
# Wait for threads to complete
yield From(sleep(.01))
@async_test
def test_schedule_job_namespace_filter(self):
EXECUTORS['test'] = TestExecutor
self.manager.initialize({
'EXECUTORS': [{
'EXECUTOR': 'test',
'NAMESPACE_WHITELIST': ['something'],
}]
})
self.job_heartbeat_callback.assert_called_once_with(self.mock_job)
self.assertEqual(self.etcd_client_mock.write.call_count, 1)
self.assertEqual(self.etcd_client_mock.write.call_args_list[0][0][0], self.mock_job_key)
# Try with a build job in an invalid namespace.
build_job = self._create_build_job(namespace='somethingelse')
result = yield From(self.manager.schedule(build_job))
self.assertFalse(result[0])
# Try with a valid namespace.
build_job = self._create_build_job(namespace='something')
result = yield From(self.manager.schedule(build_job))
self.assertTrue(result[0])
@async_test
def test_schedule_job_retries_filter(self):
EXECUTORS['test'] = TestExecutor
self.manager.initialize({
'EXECUTORS': [{
'EXECUTOR': 'test',
'MINIMUM_RETRY_THRESHOLD': 2,
}]
})
# Try with a build job that has too few retries.
build_job = self._create_build_job(retries=1)
result = yield From(self.manager.schedule(build_job))
self.assertFalse(result[0])
# Try with a valid job.
build_job = self._create_build_job(retries=2)
result = yield From(self.manager.schedule(build_job))
self.assertTrue(result[0])
@async_test
def test_schedule_job_executor_fallback(self):
EXECUTORS['primary'] = TestExecutor
EXECUTORS['secondary'] = TestExecutor
self.manager.initialize({
'EXECUTORS': [
{
'EXECUTOR': 'primary',
'NAMESPACE_WHITELIST': ['something'],
'MINIMUM_RETRY_THRESHOLD': 3,
},
{
'EXECUTOR': 'secondary',
'MINIMUM_RETRY_THRESHOLD': 2,
},
]
})
# Try a job not matching the primary's namespace filter. Should schedule on secondary.
build_job = self._create_build_job(namespace='somethingelse')
result = yield From(self.manager.schedule(build_job))
self.assertTrue(result[0])
self.assertIsNone(self.manager.registered_executors[0].job_started)
self.assertIsNotNone(self.manager.registered_executors[1].job_started)
self.manager.registered_executors[0].job_started = None
self.manager.registered_executors[1].job_started = None
# Try a job not matching the primary's retry minimum. Should schedule on secondary.
build_job = self._create_build_job(namespace='something', retries=2)
result = yield From(self.manager.schedule(build_job))
self.assertTrue(result[0])
self.assertIsNone(self.manager.registered_executors[0].job_started)
self.assertIsNotNone(self.manager.registered_executors[1].job_started)
self.manager.registered_executors[0].job_started = None
self.manager.registered_executors[1].job_started = None
# Try a job matching the primary. Should schedule on the primary.
build_job = self._create_build_job(namespace='something', retries=3)
result = yield From(self.manager.schedule(build_job))
self.assertTrue(result[0])
self.assertIsNotNone(self.manager.registered_executors[0].job_started)
self.assertIsNone(self.manager.registered_executors[1].job_started)
self.manager.registered_executors[0].job_started = None
self.manager.registered_executors[1].job_started = None
# Try a job not matching either's restrictions.
build_job = self._create_build_job(namespace='somethingelse', retries=1)
result = yield From(self.manager.schedule(build_job))
self.assertFalse(result[0])
self.assertIsNone(self.manager.registered_executors[0].job_started)
self.assertIsNone(self.manager.registered_executors[1].job_started)
self.manager.registered_executors[0].job_started = None
self.manager.registered_executors[1].job_started = None
if __name__ == '__main__':
unittest.main()