diff --git a/buildman/jobutil/buildjob.py b/buildman/jobutil/buildjob.py
index dbbb8113f..f5971018f 100644
--- a/buildman/jobutil/buildjob.py
+++ b/buildman/jobutil/buildjob.py
@@ -25,6 +25,10 @@ class BuildJob(object):
         'Could not parse build queue item config with ID %s' % self.job_details['build_uuid']
       )
 
+  @property
+  def retries_remaining(self):
+    return self.job_item.retries_remaining
+
   def has_retries_remaining(self):
     return self.job_item.retries_remaining > 0
 
@@ -58,6 +62,11 @@ class BuildJob(object):
       raise BuildJobLoadException(
         'Could not load repository build with ID %s' % self.job_details['build_uuid'])
 
+  @property
+  def namespace(self):
+    """ Returns the namespace under which this build is running. """
+    return self.repo_build.repository.namespace_user.username
+
   @property
   def repo_build(self):
     return self._load_repo_build()
diff --git a/buildman/manager/ephemeral.py b/buildman/manager/ephemeral.py
index fd2745075..e6d3403e8 100644
--- a/buildman/manager/ephemeral.py
+++ b/buildman/manager/ephemeral.py
@@ -59,7 +59,7 @@ class EphemeralBuilderManager(BaseManager):
     self._etcd_realm_prefix = None
     self._etcd_builder_prefix = None
-    self._etcd_lock_prefix = Nopne
+    self._etcd_lock_prefix = None
 
     self._ephemeral_api_timeout = DEFAULT_EPHEMERAL_API_TIMEOUT
 
     self._component_to_job = {}
@@ -205,6 +205,10 @@ class EphemeralBuilderManager(BaseManager):
     self._job_uuid_to_component[build_job.job_details['build_uuid']] = component
     return component
 
+  @property
+  def registered_executors(self):
+    return self._executors
+
   @coroutine
   def _register_existing_realms(self):
     try:
@@ -227,16 +231,26 @@ class EphemeralBuilderManager(BaseManager):
       # no realms have been registered yet
       pass
 
+  def _load_executor(self, executor_class_name, executor_config):
+    executor_klass = EXECUTORS.get(executor_class_name)
+    if executor_klass is None:
+      logger.error('Unknown executor %s; skipping install', executor_class_name)
+      return
+
+    self._executors.append(executor_klass(executor_config, self.manager_hostname))
+
   def initialize(self, manager_config):
     logger.debug('Calling initialize')
     self._manager_config = manager_config
 
-    # TODO(jschorr): We need to make this backwards compatible with existing config, as well as test(s)
-    for config in manager_config.get('EXECUTORS', []):
-      executor_klass = EXECUTORS.get(config['EXECUTOR'])
-      executor_config = config.get('CONFIG', {})
-      executor_config.update(manager_config.get('EXECUTOR_CONFIG', {}))
-      self._executors.append(executor_klass(executor_config, self.manager_hostname))
+    # Note: Executor config can be defined either as a single block of EXECUTOR_CONFIG (old style)
+    # or as a new set of executor configurations, with the order determining how we fall back. We
+    # check for both here to ensure backwards compatibility.
+    if manager_config.get('EXECUTORS'):
+      for executor_config in manager_config['EXECUTORS']:
+        self._load_executor(executor_config.get('EXECUTOR'), executor_config)
+    else:
+      self._load_executor(manager_config.get('EXECUTOR'), manager_config.get('EXECUTOR_CONFIG'))
 
     etcd_host = self._manager_config.get('ETCD_HOST', '127.0.0.1')
     etcd_port = self._manager_config.get('ETCD_PORT', 2379)
@@ -265,14 +279,14 @@ class EphemeralBuilderManager(BaseManager):
                      restarter=self._register_existing_realms)
 
     self._etcd_lock_prefix = self._manager_config.get('ETCD_LOCK_PREFIX', 'locks/')
-    self._ephemeral_api_timeout = self._manager_config.get('API_TIMEOUT', DEFAULT_EPHEMERAL_API_TIMEOUT)
+    self._ephemeral_api_timeout = self._manager_config.get('API_TIMEOUT',
+                                                           DEFAULT_EPHEMERAL_API_TIMEOUT)
 
     # Load components for all realms currently known to the cluster
     async(self._register_existing_realms())
 
   def setup_time(self):
-    setup_time = self._manager_config.get('MACHINE_SETUP_TIME', 300)
-    return setup_time
+    return self._manager_config.get('MACHINE_SETUP_TIME', 300)
 
   def shutdown(self):
     logger.debug('Shutting down worker.')
@@ -343,27 +357,43 @@ class EphemeralBuilderManager(BaseManager):
       logger.exception('Exception when writing job %s to etcd', build_uuid)
       raise Return(False, RETRY_IMMEDIATELY_TIMEOUT)
 
-    started = False
+    started_with_executor = None
     logger.debug("executors are: %s", self._executors)
+
     for executor in self._executors:
-      # TODO(jschorr): gate on whitelist logic
       executor_type = executor.__class__.__name__
+
+      # Check if we can use this executor based on its whitelist, by namespace.
+      namespace = build_job.namespace
+      if not executor.allowed_for_namespace(namespace):
+        logger.debug('Job %s (namespace: %s) cannot use executor %s', build_uuid, namespace,
+                     executor_type)
+        continue
+
+      # Check if we can use this executor based on the retries remaining.
+      if executor.minimum_retry_threshold > build_job.retries_remaining:
+        logger.debug('Job %s cannot use executor %s due to not meeting retry threshold', build_uuid,
+                     executor_type)
+        continue
+
       logger.debug('Starting builder for job: %s with executor: %s', build_uuid, executor_type)
 
       try:
         builder_id = yield From(executor.start_builder(realm, token, build_uuid))
         metric_queue.put_deprecated('EphemeralBuilderStarted', 1, unit='Count')
         metric_queue.ephemeral_build_workers.Inc(labelvalues=[builder_id, build_uuid])
-        started = True
+        started_with_executor = executor
        break
       except:
         logger.exception('Exception when starting builder for job: %s', build_uuid)
         continue
 
-    if not started:
-      logger.error('Could not start any ephemeral workers.')
+    if started_with_executor is None:
+      logger.error('Could not start ephemeral worker for build %s', build_uuid)
       raise Return(False, self._ephemeral_api_timeout)
 
+    logger.debug('Started builder for job: %s with executor: %s', build_uuid, executor_type)
+
     # Store the builder in etcd associated with the job id
     try:
       payload['builder_id'] = builder_id
@@ -392,8 +422,7 @@ class EphemeralBuilderManager(BaseManager):
       logger.exception('Exception when writing realm %s to etcd for job %s', realm, build_uuid)
       raise Return(False, setup_time)
 
-    self._job_to_executor[builder_id] = executor
-
+    self._job_to_executor[builder_id] = started_with_executor
     raise Return(True, None)
 
   @coroutine
diff --git a/buildman/manager/executor.py b/buildman/manager/executor.py
index e87aba451..f3b23b07f 100644
--- a/buildman/manager/executor.py
+++ b/buildman/manager/executor.py
@@ -61,6 +61,20 @@ class BuilderExecutor(object):
     """
     raise NotImplementedError
 
+  def allowed_for_namespace(self, namespace):
+    """ Returns true if this executor can be used for builds in the given namespace. """
+    namespace_whitelist = self.executor_config.get('NAMESPACE_WHITELIST')
+    if namespace_whitelist is not None:
+      return namespace in namespace_whitelist
+
+    return True
+
+  @property
+  def minimum_retry_threshold(self):
+    """ Returns the minimum number of retries required for this executor to be used or 0 if
+        none. """
""" + return self.executor_config.get('MINIMUM_RETRY_THRESHOLD', 0) + def generate_cloud_config(self, realm, token, coreos_channel, manager_hostname, quay_username=None, quay_password=None): if quay_username is None: diff --git a/test/test_buildman.py b/test/test_buildman.py index 012e15254..163ef37ec 100644 --- a/test/test_buildman.py +++ b/test/test_buildman.py @@ -1,18 +1,15 @@ import unittest import etcd -import os.path import time import json +import uuid -from trollius import coroutine, get_event_loop, From, Future, sleep, Return -from mock import Mock, ANY +from trollius import coroutine, get_event_loop, From, Future, Return +from mock import Mock from threading import Event -from urllib3.exceptions import ReadTimeoutError from buildman.manager.executor import BuilderExecutor -from buildman.manager.ephemeral import EphemeralBuilderManager, EtcdAction -from buildman.server import BuildJobResult -from buildman.component.buildcomponent import BuildComponent +from buildman.manager.ephemeral import EphemeralBuilderManager, EXECUTORS BUILD_UUID = 'deadbeef-dead-beef-dead-deadbeefdead' @@ -27,6 +24,16 @@ def async_test(f): loop.run_until_complete(future) return wrapper + +class TestExecutor(BuilderExecutor): + job_started = None + + @coroutine + def start_builder(self, realm, token, build_uuid): + self.job_started = True + raise Return(str(uuid.uuid4)) + + class TestEphemeral(unittest.TestCase): def __init__(self, *args, **kwargs): self.etcd_client_mock = None @@ -41,6 +48,8 @@ class TestEphemeral(unittest.TestCase): self.etcd_client_mock = Mock(spec=etcd.Client, name='etcd.Client') self.etcd_client_mock.watch = Mock(side_effect=hang_until_event) + self.etcd_client_mock.read = Mock(side_effect=KeyError) + self.etcd_client_mock.write = Mock() return self.etcd_client_mock def _create_completed_future(self, result=None): @@ -50,13 +59,7 @@ class TestEphemeral(unittest.TestCase): return new_future return inner - def _create_mock_executor(self, *args, **kwargs): - self.test_executor = Mock(spec=BuilderExecutor) - self.test_executor.start_builder = Mock(side_effect=self._create_completed_future('123')) - self.test_executor.stop_builder = Mock(side_effect=self._create_completed_future()) - return self.test_executor - - def _create_build_job(self): + def _create_build_job(self, namespace='namespace', retries=3): mock_job = Mock() mock_job.job_details = { 'build_uuid': BUILD_UUID, @@ -65,13 +68,17 @@ class TestEphemeral(unittest.TestCase): 'body': json.dumps(mock_job.job_details), 'id': 1, } + + mock_job.namespace = namespace + mock_job.retries_remaining = retries return mock_job def setUp(self): - EphemeralBuilderManager.EXECUTORS['test'] = self._create_mock_executor + self._existing_executors = dict(EXECUTORS) self.old_etcd_client_klass = EphemeralBuilderManager._etcd_client_klass EphemeralBuilderManager._etcd_client_klass = self._create_mock_etcd_client + self.etcd_wait_event.clear() self.register_component_callback = Mock() @@ -88,175 +95,174 @@ class TestEphemeral(unittest.TestCase): 30, ) - self.manager.initialize({'EXECUTOR': 'test'}) - - self.mock_job = self._create_build_job() - self.mock_job_key = os.path.join('building/', BUILD_UUID) - def tearDown(self): self.etcd_wait_event.set() - self.manager.shutdown() - del EphemeralBuilderManager.EXECUTORS['test'] + EXECUTORS = self._existing_executors EphemeralBuilderManager._etcd_client_klass = self.old_etcd_client_klass - @coroutine - def _setup_job_for_managers(self): - # Test that we are watching the realm location before anything else 
-    self.etcd_client_mock.watch.assert_any_call('realm/', recursive=True, timeout=30, index=None)
-
-    self.etcd_client_mock.read = Mock(side_effect=KeyError)
-    test_component = Mock(spec=BuildComponent)
-    test_component.builder_realm = REALM_ID
-    test_component.start_build = Mock(side_effect=self._create_completed_future())
-    self.register_component_callback.return_value = test_component
-
-    # Ask for a builder to be scheduled
-    is_scheduled = yield From(self.manager.schedule(self.mock_job))
-
-    self.assertTrue(is_scheduled)
-
-    self.etcd_client_mock.read.assert_called_once_with('building/', recursive=True)
-    self.assertEqual(self.test_executor.start_builder.call_count, 1)
-    self.assertEqual(self.etcd_client_mock.write.call_args_list[0][0][0], self.mock_job_key)
-    self.assertEqual(self.etcd_client_mock.write.call_args_list[1][0][0], self.mock_job_key)
-
-    # Right now the job is not registered with any managers because etcd has not accepted the job
-    self.assertEqual(self.register_component_callback.call_count, 0)
-
-    realm_created = Mock(spec=etcd.EtcdResult)
-    realm_created.action = EtcdAction.CREATE
-    realm_created.key = os.path.join('realm/', REALM_ID)
-    realm_created.value = json.dumps({
-      'realm': REALM_ID,
-      'token': 'beef',
-      'builder_id': '123',
-      'job_queue_item': self.mock_job.job_item,
+  def test_verify_executor_oldconfig(self):
+    EXECUTORS['test'] = TestExecutor
+    self.manager.initialize({
+      'EXECUTOR': 'test',
+      'EXECUTOR_CONFIG': dict(MINIMUM_RETRY_THRESHOLD=42)
     })
 
-    self.manager._handle_realm_change(realm_created)
+    # Ensure that we have a single test executor.
+    self.assertEquals(1, len(self.manager.registered_executors))
+    self.assertEquals(42, self.manager.registered_executors[0].minimum_retry_threshold)
 
-    self.assertEqual(self.register_component_callback.call_count, 1)
-
-    raise Return(test_component)
-
-  @async_test
-  @unittest.skip('this test is flaky on Quay.io builders')
-  def test_schedule_and_complete(self):
-    # Test that a job is properly registered with all of the managers
-    test_component = yield From(self._setup_job_for_managers())
-
-    # Take the job ourselves
-    yield From(self.manager.build_component_ready(test_component))
-
-    self.etcd_client_mock.delete.assert_called_once_with(os.path.join('realm/', REALM_ID))
-    self.etcd_client_mock.delete.reset_mock()
-
-    # Finish the job
-    yield From(self.manager.job_completed(self.mock_job, BuildJobResult.COMPLETE, test_component))
-
-    self.assertEqual(self.test_executor.stop_builder.call_count, 1)
-    self.etcd_client_mock.delete.assert_called_once_with(self.mock_job_key)
-
-  @async_test
-  @unittest.skip('this test is flaky on Quay.io builders')
-  def test_another_manager_takes_job(self):
-    # Prepare a job to be taken by another manager
-    test_component = yield From(self._setup_job_for_managers())
-
-    realm_deleted = Mock(spec=etcd.EtcdResult)
-    realm_deleted.action = EtcdAction.DELETE
-    realm_deleted.key = os.path.join('realm/', REALM_ID)
-
-    realm_deleted._prev_node = Mock(spec=etcd.EtcdResult)
-    realm_deleted._prev_node.value = json.dumps({
-      'realm': REALM_ID,
-      'token': 'beef',
-      'builder_id': '123',
-      'job_queue_item': self.mock_job.job_item,
+  def test_verify_executor_newconfig(self):
+    EXECUTORS['test'] = TestExecutor
+    self.manager.initialize({
+      'EXECUTORS': [{
+        'EXECUTOR': 'test',
+        'MINIMUM_RETRY_THRESHOLD': 42
+      }]
     })
 
-    self.manager._handle_realm_change(realm_deleted)
+    # Ensure that we have a single test executor.
+    self.assertEquals(1, len(self.manager.registered_executors))
+    self.assertEquals(42, self.manager.registered_executors[0].minimum_retry_threshold)
 
-    self.unregister_component_callback.assert_called_once_with(test_component)
+  def test_verify_multiple_executors(self):
+    EXECUTORS['test'] = TestExecutor
+    EXECUTORS['anotherexecutor'] = TestExecutor
 
-  @async_test
-  @unittest.skip('this test is flaky on Quay.io builders')
-  def test_expiring_worker(self):
-    # Test that we are watching before anything else happens
-    self.etcd_client_mock.watch.assert_any_call('building/', recursive=True, timeout=30, index=None)
-
-    # Send a signal to the callback that a worker has expired
-    expired_result = Mock(spec=etcd.EtcdResult)
-    expired_result.action = EtcdAction.EXPIRE
-    expired_result.key = self.mock_job_key
-    expired_result._prev_node = Mock(spec=etcd.EtcdResult)
-    expired_result._prev_node.value = json.dumps({'builder_id': '1234'})
-
-    yield From(self.manager._handle_builder_expiration(expired_result))
-
-    self.test_executor.stop_builder.assert_called_once_with('1234')
-    self.assertEqual(self.test_executor.stop_builder.call_count, 1)
-
-  @async_test
-  @unittest.skip('this test is flaky on Quay.io builders')
-  def test_builder_never_starts(self):
-    test_component = yield From(self._setup_job_for_managers())
-
-    # Test that we are watching before anything else happens
-    self.etcd_client_mock.watch.assert_any_call('building/', recursive=True, timeout=30, index=None)
-
-    # Send a signal to the callback that a worker has expired
-    expired_result = Mock(spec=etcd.EtcdResult)
-    expired_result.action = EtcdAction.EXPIRE
-    expired_result.key = self.mock_job_key
-    expired_result._prev_node = Mock(spec=etcd.EtcdResult)
-    expired_result._prev_node.value = json.dumps({
-      'builder_id': '1234',
-      'had_heartbeat': False,
-      'job_queue_item': self.mock_job.job_item,
+    self.manager.initialize({
+      'EXECUTORS': [
+        {
+          'EXECUTOR': 'test',
+          'MINIMUM_RETRY_THRESHOLD': 42
+        },
+        {
+          'EXECUTOR': 'anotherexecutor',
+          'MINIMUM_RETRY_THRESHOLD': 24
+        },
+      ]
     })
 
-    yield From(self.manager._handle_builder_expiration(expired_result))
+    # Ensure that we have two test executors.
+    self.assertEquals(2, len(self.manager.registered_executors))
+    self.assertEquals(42, self.manager.registered_executors[0].minimum_retry_threshold)
+    self.assertEquals(24, self.manager.registered_executors[1].minimum_retry_threshold)
 
-    self.test_executor.stop_builder.assert_called_once_with('1234')
-    self.assertEqual(self.test_executor.stop_builder.call_count, 1)
-
-    self.job_complete_callback.assert_called_once_with(ANY, BuildJobResult.INCOMPLETE)
-
-  @async_test
-  def test_change_worker(self):
-    # Send a signal to the callback that a worker key has been changed
-    set_result = Mock(spec=etcd.EtcdResult)
-    set_result.action = 'set'
-    set_result.key = self.mock_job_key
-
-    self.manager._handle_builder_expiration(set_result)
-
-    yield From(sleep(.01))
-
-    self.assertEquals(self.test_executor.stop_builder.call_count, 0)
-
-  @async_test
-  def test_heartbeat_response(self):
-    expiration_timestamp = time.time() + 60
-    builder_result = Mock(spec=etcd.EtcdResult)
-    builder_result.value = json.dumps({
-      'builder_id': '123',
-      'expiration': expiration_timestamp,
-      'max_expiration': expiration_timestamp,
+  def test_skip_invalid_executor(self):
+    self.manager.initialize({
+      'EXECUTORS': [
+        {
+          'EXECUTOR': 'unknown',
+          'MINIMUM_RETRY_THRESHOLD': 42
+        },
+      ]
     })
-    self.etcd_client_mock.read = Mock(return_value=builder_result)
-    yield From(self.manager.job_heartbeat(self.mock_job))
+    self.assertEquals(0, len(self.manager.registered_executors))
 
-    # Wait for threads to complete
-    yield From(sleep(.01))
+  @async_test
+  def test_schedule_job_namespace_filter(self):
+    EXECUTORS['test'] = TestExecutor
+    self.manager.initialize({
+      'EXECUTORS': [{
+        'EXECUTOR': 'test',
+        'NAMESPACE_WHITELIST': ['something'],
+      }]
     })
 
-    self.job_heartbeat_callback.assert_called_once_with(self.mock_job)
-    self.assertEqual(self.etcd_client_mock.write.call_count, 1)
-    self.assertEqual(self.etcd_client_mock.write.call_args_list[0][0][0], self.mock_job_key)
+    # Try with a build job in an invalid namespace.
+    build_job = self._create_build_job(namespace='somethingelse')
+    result = yield From(self.manager.schedule(build_job))
+    self.assertFalse(result[0])
+
+    # Try with a valid namespace.
+    build_job = self._create_build_job(namespace='something')
+    result = yield From(self.manager.schedule(build_job))
+    self.assertTrue(result[0])
+
+  @async_test
+  def test_schedule_job_retries_filter(self):
+    EXECUTORS['test'] = TestExecutor
+    self.manager.initialize({
+      'EXECUTORS': [{
+        'EXECUTOR': 'test',
+        'MINIMUM_RETRY_THRESHOLD': 2,
+      }]
     })
+
+    # Try with a build job that has too few retries.
+    build_job = self._create_build_job(retries=1)
+    result = yield From(self.manager.schedule(build_job))
+    self.assertFalse(result[0])
+
+    # Try with a valid job.
+    build_job = self._create_build_job(retries=2)
+    result = yield From(self.manager.schedule(build_job))
+    self.assertTrue(result[0])
+
+
+  @async_test
+  def test_schedule_job_executor_fallback(self):
+    EXECUTORS['primary'] = TestExecutor
+    EXECUTORS['secondary'] = TestExecutor
+
+    self.manager.initialize({
+      'EXECUTORS': [
+        {
+          'EXECUTOR': 'primary',
+          'NAMESPACE_WHITELIST': ['something'],
+          'MINIMUM_RETRY_THRESHOLD': 3,
+        },
+        {
+          'EXECUTOR': 'secondary',
+          'MINIMUM_RETRY_THRESHOLD': 2,
+        },
+      ]
     })
+
+    # Try a job not matching the primary's namespace filter. Should schedule on secondary.
+    build_job = self._create_build_job(namespace='somethingelse')
+    result = yield From(self.manager.schedule(build_job))
+    self.assertTrue(result[0])
+
+    self.assertIsNone(self.manager.registered_executors[0].job_started)
+    self.assertIsNotNone(self.manager.registered_executors[1].job_started)
+
+    self.manager.registered_executors[0].job_started = None
+    self.manager.registered_executors[1].job_started = None
+
+    # Try a job not matching the primary's retry minimum. Should schedule on secondary.
+    build_job = self._create_build_job(namespace='something', retries=2)
+    result = yield From(self.manager.schedule(build_job))
+    self.assertTrue(result[0])
+
+    self.assertIsNone(self.manager.registered_executors[0].job_started)
+    self.assertIsNotNone(self.manager.registered_executors[1].job_started)
+
+    self.manager.registered_executors[0].job_started = None
+    self.manager.registered_executors[1].job_started = None
+
+    # Try a job matching the primary. Should schedule on the primary.
+    build_job = self._create_build_job(namespace='something', retries=3)
+    result = yield From(self.manager.schedule(build_job))
+    self.assertTrue(result[0])
+
+    self.assertIsNotNone(self.manager.registered_executors[0].job_started)
+    self.assertIsNone(self.manager.registered_executors[1].job_started)
+
+    self.manager.registered_executors[0].job_started = None
+    self.manager.registered_executors[1].job_started = None
+
+    # Try a job not matching either's restrictions.
+    build_job = self._create_build_job(namespace='somethingelse', retries=1)
+    result = yield From(self.manager.schedule(build_job))
+    self.assertFalse(result[0])
+
+    self.assertIsNone(self.manager.registered_executors[0].job_started)
+    self.assertIsNone(self.manager.registered_executors[1].job_started)
+
+    self.manager.registered_executors[0].job_started = None
+    self.manager.registered_executors[1].job_started = None
 
 if __name__ == '__main__':
   unittest.main()
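Usage note (editor's illustration, not part of the patch): with this change, EphemeralBuilderManager.initialize() accepts either the legacy single-executor form (EXECUTOR plus EXECUTOR_CONFIG) or an ordered EXECUTORS list, and schedule() starts the build on the first executor whose NAMESPACE_WHITELIST allows the job's namespace and whose MINIMUM_RETRY_THRESHOLD does not exceed the job's retries_remaining. The sketch below mirrors the shapes exercised in test_buildman.py; the executor names and numeric thresholds are illustrative only.

    # Minimal sketch of manager_config values, modeled on the unit tests above.
    # 'primary' and 'secondary' are hypothetical keys that would have to exist in EXECUTORS.
    legacy_config = {
      'EXECUTOR': 'test',
      'EXECUTOR_CONFIG': {'MINIMUM_RETRY_THRESHOLD': 42},
    }

    fallback_config = {
      'EXECUTORS': [
        # Tried first: only builds in namespace 'something' with retries_remaining >= 3.
        {'EXECUTOR': 'primary', 'NAMESPACE_WHITELIST': ['something'], 'MINIMUM_RETRY_THRESHOLD': 3},
        # Fallback: any namespace, used when retries_remaining >= 2.
        {'EXECUTOR': 'secondary', 'MINIMUM_RETRY_THRESHOLD': 2},
      ],
    }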