diff --git a/buildman/jobutil/buildjob.py b/buildman/jobutil/buildjob.py index f5971018f..eecac1000 100644 --- a/buildman/jobutil/buildjob.py +++ b/buildman/jobutil/buildjob.py @@ -57,10 +57,15 @@ class BuildJob(object): @lru_cache(maxsize=1) def _load_repo_build(self): try: - return model.build.get_repository_build(self.job_details['build_uuid']) + return model.build.get_repository_build(self.build_uuid) except model.InvalidRepositoryBuildException: raise BuildJobLoadException( - 'Could not load repository build with ID %s' % self.job_details['build_uuid']) + 'Could not load repository build with ID %s' % self.build_uuid) + + @property + def build_uuid(self): + """ Returns the unique UUID for this build job. """ + return self.job_details['build_uuid'] @property def namespace(self): diff --git a/buildman/manager/ephemeral.py b/buildman/manager/ephemeral.py index 298cd681f..a2e178d17 100644 --- a/buildman/manager/ephemeral.py +++ b/buildman/manager/ephemeral.py @@ -5,6 +5,7 @@ import calendar import os.path import json +from collections import namedtuple from datetime import datetime, timedelta from trollius import From, coroutine, Return, async from concurrent.futures import ThreadPoolExecutor @@ -28,12 +29,6 @@ RETRY_IMMEDIATELY_TIMEOUT = 0 NO_WORKER_AVAILABLE_TIMEOUT = 10 DEFAULT_EPHEMERAL_API_TIMEOUT = 20 -EXECUTORS = { - 'popen': PopenExecutor, - 'ec2': EC2Executor, - 'kubernetes': KubernetesExecutor, -} - class EtcdAction(object): GET = 'get' SET = 'set' @@ -44,13 +39,28 @@ class EtcdAction(object): COMPARE_AND_SWAP = 'compareAndSwap' COMPARE_AND_DELETE = 'compareAndDelete' +BuildInfo = namedtuple('BuildInfo', ['component', 'build_job', 'execution_id', 'executor_name']) + +def _create_async_etcd_client(worker_threads=1, **kwargs): + client = etcd.Client(**kwargs) + async_executor = ThreadPoolExecutor(worker_threads) + return AsyncWrapper(client, executor=async_executor), async_executor + class EphemeralBuilderManager(BaseManager): """ Build manager 
implementation for the Enterprise Registry. """ - _etcd_client_klass = etcd.Client + EXECUTORS = { + 'popen': PopenExecutor, + 'ec2': EC2Executor, + 'kubernetes': KubernetesExecutor, + } def __init__(self, *args, **kwargs): + self._etcd_client_creator = kwargs.pop('etcd_creator', _create_async_etcd_client) + + super(EphemeralBuilderManager, self).__init__(*args, **kwargs) + self._shutting_down = False self._manager_config = None @@ -58,22 +68,24 @@ class EphemeralBuilderManager(BaseManager): self._etcd_client = None self._etcd_realm_prefix = None - self._etcd_builder_prefix = None + self._etcd_job_prefix = None - self._etcd_lock_prefix = None self._ephemeral_api_timeout = DEFAULT_EPHEMERAL_API_TIMEOUT - self._component_to_job = {} - self._job_uuid_to_component = {} - self._component_to_builder = {} - self._job_to_executor = {} + # The registered executors available for running jobs, in order. + self._ordered_executors = [] - self._executors = [] + # The registered executors, mapped by their unique name. + self._executor_name_to_executor = {} # Map of etcd keys being watched to the tasks watching them self._watch_tasks = {} - super(EphemeralBuilderManager, self).__init__(*args, **kwargs) + # Map from builder component to its associated job. + self._component_to_job = {} + + # Map from build UUID to a BuildInfo tuple with information about the build. 
+ self._build_uuid_to_info = {} def _watch_etcd(self, etcd_key, change_callback, start_index=None, recursive=True, restarter=None): @@ -129,7 +141,6 @@ class EphemeralBuilderManager(BaseManager): if not self._shutting_down: logger.debug('Scheduling watch of key: %s%s at start index %s', etcd_key, '*' if recursive else '', start_index) - watch_future = self._etcd_client.watch(etcd_key, recursive=recursive, index=start_index, timeout=ETCD_MAX_WATCH_TIMEOUT) watch_future.add_done_callback(callback_wrapper) @@ -137,34 +148,37 @@ class EphemeralBuilderManager(BaseManager): self._watch_tasks[watch_task_key] = async(watch_future) @coroutine - def _handle_builder_expiration(self, etcd_result): + def _handle_job_expiration(self, etcd_result): + """ Handler invoked whenever a job expires in etcd. """ if etcd_result is None: return - if etcd_result.action == EtcdAction.EXPIRE: - # Handle the expiration - logger.debug('Builder expired, clean up the old build node') - job_metadata = json.loads(etcd_result._prev_node.value) + if etcd_result.action != EtcdAction.EXPIRE: + return - if 'builder_id' in job_metadata: - builder_id = job_metadata['builder_id'] + # Handle the expiration + logger.debug('Builder expired, clean up the old build node') + job_metadata = json.loads(etcd_result._prev_node.value) + build_job = BuildJob(AttrDict(job_metadata['job_queue_item'])) + build_info = self._build_uuid_to_info.get(build_job.build_uuid) + if build_info is None: + logger.error('Could not find build info for job %s under etcd expire with metadata: %s', + build_job.build_uuid, job_metadata) + return - # Before we delete the build node, we take a lock to make sure that only one manager - # can terminate the node. 
- try: - lock_key = self._etcd_lock_key(builder_id) - yield From(self._etcd_client.write(lock_key, '', prevExist=False, ttl=self.setup_time())) - except (KeyError, etcd.EtcdKeyError): - logger.debug('Somebody else is cleaning up the build node: %s', builder_id) - return + execution_id = build_info.execution_id - if not job_metadata.get('had_heartbeat', True): - logger.warning('Build node failed to successfully boot: %s', builder_id) - build_job = BuildJob(AttrDict(job_metadata['job_queue_item'])) - self.job_complete_callback(build_job, BuildJobResult.INCOMPLETE) + # If we have not yet received a heartbeat, then the node failed to boot in some way. We mark + # the job as incomplete here. + if not job_metadata.get('had_heartbeat', True): + logger.warning('Build executor failed to successfully boot with execution id %s', + execution_id) + self.job_complete_callback(build_job, BuildJobResult.INCOMPLETE) - logger.info('Terminating expired build node: %s', builder_id) - yield From(self._job_to_executor[builder_id].stop_builder(builder_id)) + # Finally, we terminate the build execution for the job. 
+ logger.info('Terminating expired build executor for job %s with execution id %s', + build_job.build_uuid, execution_id) + yield From(self.kill_builder_executor(build_job.build_uuid)) def _handle_realm_change(self, etcd_result): if etcd_result is None: @@ -180,13 +194,14 @@ class EphemeralBuilderManager(BaseManager): # connection realm_spec = json.loads(etcd_result._prev_node.value) build_job = BuildJob(AttrDict(realm_spec['job_queue_item'])) - component = self._job_uuid_to_component.pop(build_job.job_details['build_uuid'], None) - if component is not None: + build_uuid = build_job.build_uuid + + build_info = self._build_uuid_to_info.pop(build_uuid, None) + if build_info is not None: # We were not the manager which the worker connected to, remove the bookkeeping for it - logger.debug('Unregistering unused component on realm: %s', realm_spec['realm']) - del self._component_to_job[component] - del self._component_to_builder[component] - self.unregister_component(component) + logger.debug('Unregistering unused component for build %s', build_uuid) + self._component_to_job.pop(build_info.component, None) + self.unregister_component(build_info.component) else: logger.warning('Unexpected action (%s) on realm key: %s', etcd_result.action, etcd_result.key) @@ -200,15 +215,23 @@ class EphemeralBuilderManager(BaseManager): logger.debug('Realm already registered with manager: %s', realm_spec['realm']) return component + # Create the build information block for the registered realm. build_job = BuildJob(AttrDict(realm_spec['job_queue_item'])) + + # TODO(jschorr): Remove the back-compat lookups once we've finished the rollout. 
+ execution_id = realm_spec.get('execution_id', realm_spec.get('builder_id', None)) + executor_name = realm_spec.get('executor_name', 'EC2Executor') + + build_info = BuildInfo(component=component, build_job=build_job, execution_id=execution_id, + executor_name=executor_name) + self._component_to_job[component] = build_job - self._component_to_builder[component] = realm_spec['builder_id'] - self._job_uuid_to_component[build_job.job_details['build_uuid']] = component + self._build_uuid_to_info[build_job.build_uuid] = build_info return component @property def registered_executors(self): - return self._executors + return self._ordered_executors @coroutine def _register_existing_realms(self): @@ -223,22 +246,27 @@ class EphemeralBuilderManager(BaseManager): encountered.add(component) # Remove any components not encountered so we can clean up. - for found in list(self._component_to_job.keys()): - if not found in encountered: - self._component_to_job.pop(component) - self._component_to_builder.pop(component) + for component, job in list(self._component_to_job.items()): + if not component in encountered: + self._component_to_job.pop(component, None) + self._build_uuid_to_info.pop(job.build_uuid, None) except (KeyError, etcd.EtcdKeyError): # no realms have been registered yet pass - def _load_executor(self, executor_class_name, executor_config): - executor_klass = EXECUTORS.get(executor_class_name) + def _load_executor(self, executor_kind_name, executor_config): + executor_klass = EphemeralBuilderManager.EXECUTORS.get(executor_kind_name) if executor_klass is None: - logger.error('Unknown executor %s; skipping install', executor_class_name) + logger.error('Unknown executor %s; skipping install', executor_kind_name) return - self._executors.append(executor_klass(executor_config, self.manager_hostname)) + executor = executor_klass(executor_config, self.manager_hostname) + if executor.name in self._executor_name_to_executor: + raise Exception('Executor with name %s already 
registered' % executor.name) + + self._ordered_executors.append(executor) + self._executor_name_to_executor[executor.name] = executor def initialize(self, manager_config): logger.debug('Calling initialize') @@ -265,21 +293,17 @@ class EphemeralBuilderManager(BaseManager): logger.debug('Connecting to etcd on %s:%s', etcd_host, etcd_port) worker_threads = self._manager_config.get('ETCD_WORKER_THREADS', 5) - self._async_thread_executor = ThreadPoolExecutor(worker_threads) - self._etcd_client = AsyncWrapper(self._etcd_client_klass(host=etcd_host, port=etcd_port, - cert=etcd_auth, ca_cert=etcd_ca_cert, - protocol=etcd_protocol, - read_timeout=5), - executor=self._async_thread_executor) + (self._etcd_client, self._async_thread_executor) = self._etcd_client_creator(worker_threads, + host=etcd_host, port=etcd_port, cert=etcd_auth, ca_cert=etcd_ca_cert, + protocol=etcd_protocol, read_timeout=5) - self._etcd_builder_prefix = self._manager_config.get('ETCD_BUILDER_PREFIX', 'building/') - self._watch_etcd(self._etcd_builder_prefix, self._handle_builder_expiration) + self._etcd_job_prefix = self._manager_config.get('ETCD_BUILDER_PREFIX', 'building/') + self._watch_etcd(self._etcd_job_prefix, self._handle_job_expiration) self._etcd_realm_prefix = self._manager_config.get('ETCD_REALM_PREFIX', 'realm/') self._watch_etcd(self._etcd_realm_prefix, self._handle_realm_change, restarter=self._register_existing_realms) - self._etcd_lock_prefix = self._manager_config.get('ETCD_LOCK_PREFIX', 'locks/') self._ephemeral_api_timeout = self._manager_config.get('API_TIMEOUT', DEFAULT_EPHEMERAL_API_TIMEOUT) @@ -310,8 +334,8 @@ class EphemeralBuilderManager(BaseManager): # Check if there are worker slots avialable by checking the number of jobs in etcd allowed_worker_count = self._manager_config.get('ALLOWED_WORKER_COUNT', 1) try: - building = yield From(self._etcd_client.read(self._etcd_builder_prefix, recursive=True)) - workers_alive = sum(1 for child in building.children if not child.dir) + 
active_jobs = yield From(self._etcd_client.read(self._etcd_job_prefix, recursive=True)) + workers_alive = sum(1 for child in active_jobs.children if not child.dir) except (KeyError, etcd.EtcdKeyError): workers_alive = 0 except etcd.EtcdException: @@ -359,40 +383,40 @@ class EphemeralBuilderManager(BaseManager): raise Return(False, RETRY_IMMEDIATELY_TIMEOUT) started_with_executor = None - builder_id = None + execution_id = None - logger.debug("Registered executors are: %s", [ex.__class__.__name__ for ex in self._executors]) - for executor in self._executors: - executor_type = executor.__class__.__name__ + logger.debug("Registered executors are: %s", [ex.name for ex in self._ordered_executors]) + for executor in self._ordered_executors: # Check if we can use this executor based on its whitelist, by namespace. namespace = build_job.namespace if not executor.allowed_for_namespace(namespace): logger.debug('Job %s (namespace: %s) cannot use executor %s', build_uuid, namespace, - executor_type) + executor.name) continue # Check if we can use this executor based on the retries remaining. 
if executor.minimum_retry_threshold > build_job.retries_remaining: - logger.debug('Job %s cannot use executor %s as it is below retry threshold (retry #: %s)', - build_uuid, executor_type, build_job.retries_remaining) + logger.debug('Job %s cannot use executor %s as it is below retry threshold %s (retry #%s)', + build_uuid, executor.name, executor.minimum_retry_threshold, + build_job.retries_remaining) continue logger.debug('Starting builder for job %s with selected executor: %s', build_uuid, - executor_type) + executor.name) try: - builder_id = yield From(executor.start_builder(realm, token, build_uuid)) + execution_id = yield From(executor.start_builder(realm, token, build_uuid)) except: logger.exception('Exception when starting builder for job: %s', build_uuid) continue try: metric_queue.put_deprecated('EphemeralBuilderStarted', 1, unit='Count') - metric_queue.ephemeral_build_workers.Inc(labelvalues=[builder_id, build_uuid]) + metric_queue.ephemeral_build_workers.Inc(labelvalues=[execution_id, build_uuid]) except: - logger.exception('Exception when writing start metrics for builder %s for job %s', - builder_id, build_uuid) + logger.exception('Exception when writing start metrics for execution %s for job %s', + execution_id, build_uuid) started_with_executor = executor @@ -403,24 +427,19 @@ class EphemeralBuilderManager(BaseManager): logger.error('Could not start ephemeral worker for build %s', build_uuid) raise Return(False, self._ephemeral_api_timeout) - logger.debug('Started builder with ID %s for job: %s with executor: %s', builder_id, build_uuid, - started_with_executor.__class__.__name__) - - # Store the builder in etcd associated with the job id - try: - payload['builder_id'] = builder_id - yield From(self._etcd_client.write(job_key, json.dumps(payload), prevValue=lock_payload, - ttl=setup_time)) - except etcd.EtcdException: - logger.exception('Exception when writing job %s to etcd', build_uuid) - raise Return(False, self._ephemeral_api_timeout) + 
logger.debug('Started execution with ID %s for job: %s with executor: %s', + execution_id, build_uuid, started_with_executor.name) # Store the realm spec which will allow any manager to accept this builder when it connects realm_spec = json.dumps({ 'realm': realm, 'token': token, - 'builder_id': builder_id, + 'execution_id': execution_id, + 'executor_name': started_with_executor.name, 'job_queue_item': build_job.job_item, + + # TODO: remove this back-compat field once we finish the rollout. + 'builder_id': execution_id, }) try: @@ -434,22 +453,27 @@ class EphemeralBuilderManager(BaseManager): logger.exception('Exception when writing realm %s to etcd for job %s', realm, build_uuid) raise Return(False, setup_time) - self._job_to_executor[builder_id] = started_with_executor - logger.debug('Builder spawn complete for job %s using executor %s with ID %s ', build_uuid, - started_with_executor.__class__.__name__, builder_id) + started_with_executor.name, execution_id) raise Return(True, None) @coroutine def build_component_ready(self, build_component): try: - # Clean up the bookkeeping for allowing any manager to take the job + # Pop off the job for the component. We do so before we send out the etcd watch below, + # as it will also remove this mapping. job = self._component_to_job.pop(build_component) - del self._job_uuid_to_component[job.job_details['build_uuid']] + if job is None: + logger.error('Could not find job for the build component on realm %s', + build_component.builder_realm) + return + + # Clean up the bookkeeping for allowing any manager to take the job. yield From(self._etcd_client.delete(self._etcd_realm_key(build_component.builder_realm))) + # Start the build job. 
logger.debug('Sending build %s to newly ready component on realm %s', - job.job_details['build_uuid'], build_component.builder_realm) + job.build_uuid, build_component.builder_realm) yield From(build_component.start_build(job)) except (KeyError, etcd.EtcdKeyError): logger.warning('Builder is asking for more work, but work already completed') @@ -460,21 +484,46 @@ class EphemeralBuilderManager(BaseManager): @coroutine def job_completed(self, build_job, job_status, build_component): - logger.debug('Calling job_completed with status: %s', job_status) + logger.debug('Calling job_completed for job %s with status: %s', + build_job.build_uuid, job_status) - # Kill the ephmeral builder - builder_id = self._component_to_builder.pop(build_component) - yield From(self._job_to_executor[builder_id].stop_builder(builder_id)) - del self._job_to_executor[builder_id] + # Mark the job as completed. + self.job_complete_callback(build_job, job_status) - # Release the lock in etcd + # Kill the ephmeral builder. + yield From(self.kill_builder_executor(build_job.build_uuid)) + + # Delete the build job from etcd. job_key = self._etcd_job_key(build_job) try: yield From(self._etcd_client.delete(job_key)) except (KeyError, etcd.EtcdKeyError): logger.debug('Builder is asking for job to be removed, but work already completed') - self.job_complete_callback(build_job, job_status) + logger.debug('job_completed for job %s with status: %s', build_job.build_uuid, job_status) + + @coroutine + def kill_builder_executor(self, build_uuid): + logger.info('Starting termination of executor for job %s', build_uuid) + build_info = self._build_uuid_to_info.pop(build_uuid, None) + if build_info is None: + logger.error('Could not find build information for build %s', build_uuid) + return + + # Remove the build's component. + self._component_to_job.pop(build_info.component, None) + + # Stop the build node/executor itself. 
+ executor = self._executor_name_to_executor.get(build_info.executor_name) + if executor is None: + logger.error('Could not find registered executor %s for build %s', + build_info.executor_name, build_uuid) + return + + # Terminate the executor's execution. + logger.info('Terminating executor for job %s with execution id %s', + build_uuid, build_info.execution_id) + yield From(executor.stop_builder(build_info.execution_id)) @coroutine def job_heartbeat(self, build_job): @@ -484,7 +533,7 @@ class EphemeralBuilderManager(BaseManager): try: build_job_metadata_response = yield From(self._etcd_client.read(job_key)) except (KeyError, etcd.EtcdKeyError): - logger.info('Job %s no longer exists in etcd', build_job.job_details['build_uuid']) + logger.info('Job %s no longer exists in etcd', build_job.build_uuid) return build_job_metadata = json.loads(build_job_metadata_response.value) @@ -498,25 +547,18 @@ class EphemeralBuilderManager(BaseManager): payload = { 'expiration': calendar.timegm(new_expiration.timetuple()), - 'builder_id': build_job_metadata['builder_id'], 'job_queue_item': build_job.job_item, 'max_expiration': build_job_metadata['max_expiration'], 'had_heartbeat': True, } yield From(self._etcd_client.write(job_key, json.dumps(payload), ttl=ttl)) - self.job_heartbeat_callback(build_job) def _etcd_job_key(self, build_job): """ Create a key which is used to track a job in etcd. """ - return os.path.join(self._etcd_builder_prefix, build_job.job_details['build_uuid']) - - def _etcd_lock_key(self, unique_lock_id): - """ Create a key which is used to create a temporary lock in etcd. - """ - return os.path.join(self._etcd_lock_prefix, unique_lock_id) + return os.path.join(self._etcd_job_prefix, build_job.job_details['build_uuid']) def _etcd_realm_key(self, realm): """ Create a key which is used to track an incoming connection on a realm. 
@@ -526,4 +568,4 @@ class EphemeralBuilderManager(BaseManager): def num_workers(self): """ Return the number of workers we're managing locally. """ - return len(self._component_to_builder) + return len(self._component_to_job) diff --git a/buildman/manager/executor.py b/buildman/manager/executor.py index f6ea79667..82a438fb5 100644 --- a/buildman/manager/executor.py +++ b/buildman/manager/executor.py @@ -48,6 +48,11 @@ class BuilderExecutor(object): default_websocket_scheme = 'wss' if app.config['PREFERRED_URL_SCHEME'] == 'https' else 'ws' self.websocket_scheme = executor_config.get("WEBSOCKET_SCHEME", default_websocket_scheme) + @property + def name(self): + """ Name returns the unique name for this executor. """ + return self.executor_config.get('NAME') or self.__class__.__name__ + @coroutine def start_builder(self, realm, token, build_uuid): """ Create a builder with the specified config. Returns a unique id which can be used to manage diff --git a/test/test_buildman.py b/test/test_buildman.py index b85e4fa05..100ca44ea 100644 --- a/test/test_buildman.py +++ b/test/test_buildman.py @@ -3,13 +3,16 @@ import etcd import time import json import uuid +import os from trollius import coroutine, get_event_loop, From, Future, Return -from mock import Mock -from threading import Event +from mock import Mock, ANY from buildman.manager.executor import BuilderExecutor, ExecutorException -from buildman.manager.ephemeral import EphemeralBuilderManager, EXECUTORS +from buildman.manager.ephemeral import (EphemeralBuilderManager, EtcdAction, + ETCD_MAX_WATCH_TIMEOUT) +from buildman.component.buildcomponent import BuildComponent +from buildman.server import BuildJobResult BUILD_UUID = 'deadbeef-dead-beef-dead-deadbeefdead' @@ -27,11 +30,16 @@ def async_test(f): class TestExecutor(BuilderExecutor): job_started = None + job_stopped = None @coroutine def start_builder(self, realm, token, build_uuid): - self.job_started = True - raise Return(str(uuid.uuid4)) + self.job_started = 
str(uuid.uuid4()) + raise Return(self.job_started) + + @coroutine + def stop_builder(self, execution_id): + self.job_stopped = execution_id @@ -41,23 +49,22 @@ class BadExecutor(BuilderExecutor): raise ExecutorException('raised on purpose!') -class TestEphemeral(unittest.TestCase): +class EphemeralBuilderTestCase(unittest.TestCase): def __init__(self, *args, **kwargs): self.etcd_client_mock = None - self.etcd_wait_event = Event() - self.test_executor = None - super(TestEphemeral, self).__init__(*args, **kwargs) + super(EphemeralBuilderTestCase, self).__init__(*args, **kwargs) def _create_mock_etcd_client(self, *args, **kwargs): - def hang_until_event(*args, **kwargs): - time.sleep(.01) # 10ms to simulate network latency - self.etcd_wait_event.wait() + def create_future(*args, **kwargs): + return Future() self.etcd_client_mock = Mock(spec=etcd.Client, name='etcd.Client') - self.etcd_client_mock.watch = Mock(side_effect=hang_until_event) self.etcd_client_mock.read = Mock(side_effect=KeyError) - self.etcd_client_mock.write = Mock() - return self.etcd_client_mock + self.etcd_client_mock.delete = Mock(side_effect=self._create_completed_future()) + self.etcd_client_mock.watch = Mock(side_effect=create_future) + self.etcd_client_mock.write = Mock(side_effect=self._create_completed_future('some_exec_id')) + + return (self.etcd_client_mock, None) def _create_completed_future(self, result=None): def inner(*args, **kwargs): @@ -66,6 +73,16 @@ class TestEphemeral(unittest.TestCase): return new_future return inner + def setUp(self): + self._existing_executors = dict(EphemeralBuilderManager.EXECUTORS) + + def tearDown(self): + EphemeralBuilderManager.EXECUTORS = self._existing_executors + + @coroutine + def _register_component(self, realm_spec, build_component, token): + raise Return('hello') + def _create_build_job(self, namespace='namespace', retries=3): mock_job = Mock() mock_job.job_details = { @@ -78,15 +95,38 @@ class TestEphemeral(unittest.TestCase): mock_job.namespace = 
namespace mock_job.retries_remaining = retries + mock_job.build_uuid = BUILD_UUID return mock_job + + +class TestEphemeralLifecycle(EphemeralBuilderTestCase): + """ Tests the various lifecycles of the ephemeral builder and its interaction with etcd. """ + + def __init__(self, *args, **kwargs): + super(TestEphemeralLifecycle, self).__init__(*args, **kwargs) + self.etcd_client_mock = None + self.test_executor = None + + def _create_completed_future(self, result=None): + def inner(*args, **kwargs): + new_future = Future() + new_future.set_result(result) + return new_future + return inner + + def _create_mock_executor(self, *args, **kwargs): + self.test_executor = Mock(spec=BuilderExecutor) + self.test_executor.start_builder = Mock(side_effect=self._create_completed_future('123')) + self.test_executor.stop_builder = Mock(side_effect=self._create_completed_future()) + self.test_executor.name = 'MockExecutor' + self.test_executor.minimum_retry_threshold = 0 + return self.test_executor + def setUp(self): - self._existing_executors = dict(EXECUTORS) + super(TestEphemeralLifecycle, self).setUp() - self.old_etcd_client_klass = EphemeralBuilderManager._etcd_client_klass - EphemeralBuilderManager._etcd_client_klass = self._create_mock_etcd_client - - self.etcd_wait_event.clear() + EphemeralBuilderManager.EXECUTORS['test'] = self._create_mock_executor self.register_component_callback = Mock() self.unregister_component_callback = Mock() @@ -100,17 +140,231 @@ class TestEphemeral(unittest.TestCase): self.job_complete_callback, '127.0.0.1', 30, + etcd_creator=self._create_mock_etcd_client, + ) + + self.manager.initialize({'EXECUTOR': 'test'}) + + # Test that we are watching the realm and jobs key once initialized. 
+ self.etcd_client_mock.watch.assert_any_call('realm/', recursive=True, index=None, + timeout=ETCD_MAX_WATCH_TIMEOUT) + + self.etcd_client_mock.watch.assert_any_call('building/', recursive=True, index=None, + timeout=ETCD_MAX_WATCH_TIMEOUT) + + + self.mock_job = self._create_build_job() + self.mock_job_key = os.path.join('building/', BUILD_UUID) + + def tearDown(self): + super(TestEphemeralLifecycle, self).tearDown() + self.manager.shutdown() + + + @coroutine + def _setup_job_for_managers(self): + self.etcd_client_mock.read = Mock(side_effect=KeyError) + test_component = Mock(spec=BuildComponent) + test_component.builder_realm = REALM_ID + test_component.start_build = Mock(side_effect=self._create_completed_future()) + self.register_component_callback.return_value = test_component + + # Ask for a builder to be scheduled + self.etcd_client_mock.write.reset() + + is_scheduled = yield From(self.manager.schedule(self.mock_job)) + self.assertTrue(is_scheduled) + self.assertEqual(self.test_executor.start_builder.call_count, 1) + + # Ensure the job and realm were added to etcd. + self.assertEqual(self.etcd_client_mock.write.call_args_list[0][0][0], self.mock_job_key) + self.assertTrue(self.etcd_client_mock.write.call_args_list[1][0][0].find('realm/') == 0) + realm_data = json.loads(self.etcd_client_mock.write.call_args_list[1][0][1]) + realm_data['realm'] = REALM_ID + + # Right now the job is not registered with any managers because etcd has not accepted the job + self.assertEqual(self.register_component_callback.call_count, 0) + + # Fire off a realm changed with the same data. + realm_created = Mock(spec=etcd.EtcdResult) + realm_created.action = EtcdAction.CREATE + realm_created.key = os.path.join('realm/', REALM_ID) + realm_created.value = json.dumps(realm_data) + + self.manager._handle_realm_change(realm_created) + self.assertEqual(self.register_component_callback.call_count, 1) + + # Ensure that we have at least one component node. 
+    self.assertEquals(1, self.manager.num_workers())
+
+    raise Return(test_component)
+
+  @async_test
+  def test_schedule_and_complete(self):
+    # Test that a job is properly registered with all of the managers
+    test_component = yield From(self._setup_job_for_managers())
+
+    # Take the job ourselves
+    yield From(self.manager.build_component_ready(test_component))
+
+    self.etcd_client_mock.delete.assert_called_once_with(os.path.join('realm/', REALM_ID))
+    self.etcd_client_mock.delete.reset_mock()
+
+    # Finish the job
+    yield From(self.manager.job_completed(self.mock_job, BuildJobResult.COMPLETE, test_component))
+
+    # Ensure that the executor kills the job.
+    self.assertEqual(self.test_executor.stop_builder.call_count, 1)
+    self.etcd_client_mock.delete.assert_called_once_with(self.mock_job_key)
+
+  @async_test
+  def test_another_manager_takes_job(self):
+    # Prepare a job to be taken by another manager
+    test_component = yield From(self._setup_job_for_managers())
+
+    realm_deleted = Mock(spec=etcd.EtcdResult)
+    realm_deleted.action = EtcdAction.DELETE
+    realm_deleted.key = os.path.join('realm/', REALM_ID)
+
+    realm_deleted._prev_node = Mock(spec=etcd.EtcdResult)
+    realm_deleted._prev_node.value = json.dumps({
+      'realm': REALM_ID,
+      'token': 'beef',
+      'builder_id': '123',
+      'job_queue_item': self.mock_job.job_item,
+    })
+
+    self.manager._handle_realm_change(realm_deleted)
+
+    self.unregister_component_callback.assert_called_once_with(test_component)
+
+  @async_test
+  def test_expiring_worker_not_started(self):
+    # Test that we are watching before anything else happens
+    self.etcd_client_mock.watch.assert_any_call('building/', recursive=True,
+                                                timeout=ETCD_MAX_WATCH_TIMEOUT, index=None)
+
+    # Send a signal to the callback that a worker has expired
+    expired_result = Mock(spec=etcd.EtcdResult)
+    expired_result.action = EtcdAction.EXPIRE
+    expired_result.key = self.mock_job_key
+    expired_result._prev_node = Mock(spec=etcd.EtcdResult)
+    expired_result._prev_node.value = json.dumps({
+      'had_heartbeat': True,
+      'job_queue_item': self.mock_job.job_item,
+    })
+
+    # Since the realm was never registered, expiration should do nothing.
+    yield From(self.manager._handle_job_expiration(expired_result))
+    self.assertEqual(self.test_executor.stop_builder.call_count, 0)
+
+  @async_test
+  def test_expiring_worker_started(self):
+    test_component = yield From(self._setup_job_for_managers())
+
+    # Test that we are watching before anything else happens
+    self.etcd_client_mock.watch.assert_any_call('building/', recursive=True,
+                                                timeout=ETCD_MAX_WATCH_TIMEOUT, index=None)
+
+    # Send a signal to the callback that a worker has expired
+    expired_result = Mock(spec=etcd.EtcdResult)
+    expired_result.action = EtcdAction.EXPIRE
+    expired_result.key = self.mock_job_key
+    expired_result._prev_node = Mock(spec=etcd.EtcdResult)
+    expired_result._prev_node.value = json.dumps({
+      'had_heartbeat': True,
+      'job_queue_item': self.mock_job.job_item,
+    })
+
+    yield From(self.manager._handle_job_expiration(expired_result))
+
+    self.test_executor.stop_builder.assert_called_once_with('123')
+    self.assertEqual(self.test_executor.stop_builder.call_count, 1)
+
+  @async_test
+  def test_builder_never_starts(self):
+    test_component = yield From(self._setup_job_for_managers())
+
+    # Test that we are watching before anything else happens
+    self.etcd_client_mock.watch.assert_any_call('building/', recursive=True,
+                                                timeout=ETCD_MAX_WATCH_TIMEOUT, index=None)
+
+    # Send a signal to the callback that a worker has expired
+    expired_result = Mock(spec=etcd.EtcdResult)
+    expired_result.action = EtcdAction.EXPIRE
+    expired_result.key = self.mock_job_key
+    expired_result._prev_node = Mock(spec=etcd.EtcdResult)
+    expired_result._prev_node.value = json.dumps({
+      'had_heartbeat': False,
+      'job_queue_item': self.mock_job.job_item,
+    })
+
+    yield From(self.manager._handle_job_expiration(expired_result))
+
+    self.test_executor.stop_builder.assert_called_once_with('123')
+    self.assertEqual(self.test_executor.stop_builder.call_count, 1)
+
+    self.job_complete_callback.assert_called_once_with(ANY, BuildJobResult.INCOMPLETE)
+
+  @async_test
+  def test_change_worker(self):
+    # A non-expiration change (SET) on the job key must not stop the builder.
+    set_result = Mock(spec=etcd.EtcdResult)
+    set_result.action = EtcdAction.SET
+    set_result.key = self.mock_job_key
+
+    yield From(self.manager._handle_job_expiration(set_result))
+    self.assertEquals(self.test_executor.stop_builder.call_count, 0)
+
+  @async_test
+  def test_heartbeat_response(self):
+    expiration_timestamp = time.time() + 60
+    builder_result = Mock(spec=etcd.EtcdResult)
+    builder_result.value = json.dumps({
+      'expiration': expiration_timestamp,
+      'max_expiration': expiration_timestamp,
+    })
+    self.etcd_client_mock.read = Mock(side_effect=self._create_completed_future(builder_result))
+
+    yield From(self.manager.job_heartbeat(self.mock_job))
+
+    self.job_heartbeat_callback.assert_called_once_with(self.mock_job)
+    self.assertEqual(self.etcd_client_mock.write.call_count, 1)
+    self.assertEqual(self.etcd_client_mock.write.call_args_list[0][0][0], self.mock_job_key)
+
+    job_key_data = json.loads(self.etcd_client_mock.write.call_args_list[0][0][1])
+    self.assertTrue(job_key_data['had_heartbeat'])
+    self.assertEquals(self.mock_job.job_item, job_key_data['job_queue_item'])
+
+
+class TestEphemeral(EphemeralBuilderTestCase):
+  """ Simple unit tests for the ephemeral builder around config management, starting and stopping
+      jobs.
+  """
+
+  def setUp(self):
+    super(TestEphemeral, self).setUp()
+
+    unregister_component_callback = Mock()
+    job_heartbeat_callback = Mock()
+    job_complete_callback = Mock()
+
+    self.manager = EphemeralBuilderManager(
+      self._register_component,
+      unregister_component_callback,
+      job_heartbeat_callback,
+      job_complete_callback,
+      '127.0.0.1',
+      30,
+      etcd_creator=self._create_mock_etcd_client,
     )
 
   def tearDown(self):
-    self.etcd_wait_event.set()
+    super(TestEphemeral, self).tearDown()
     self.manager.shutdown()
 
-    EXECUTORS = self._existing_executors
-    EphemeralBuilderManager._etcd_client_klass = self.old_etcd_client_klass
-
   def test_verify_executor_oldconfig(self):
-    EXECUTORS['test'] = TestExecutor
+    EphemeralBuilderManager.EXECUTORS['test'] = TestExecutor
     self.manager.initialize({
       'EXECUTOR': 'test',
       'EXECUTOR_CONFIG': dict(MINIMUM_RETRY_THRESHOLD=42)
@@ -119,9 +373,10 @@ class TestEphemeral(unittest.TestCase):
     # Ensure that we have a single test executor.
     self.assertEquals(1, len(self.manager.registered_executors))
     self.assertEquals(42, self.manager.registered_executors[0].minimum_retry_threshold)
+    self.assertEquals('TestExecutor', self.manager.registered_executors[0].name)
 
   def test_verify_executor_newconfig(self):
-    EXECUTORS['test'] = TestExecutor
+    EphemeralBuilderManager.EXECUTORS['test'] = TestExecutor
     self.manager.initialize({
       'EXECUTORS': [{
         'EXECUTOR': 'test',
@@ -133,17 +388,41 @@ class TestEphemeral(unittest.TestCase):
 
     self.assertEquals(1, len(self.manager.registered_executors))
     self.assertEquals(42, self.manager.registered_executors[0].minimum_retry_threshold)
 
+
+  def test_multiple_executors_samename(self):
+    EphemeralBuilderManager.EXECUTORS['test'] = TestExecutor
+    EphemeralBuilderManager.EXECUTORS['anotherexecutor'] = TestExecutor
+
+    with self.assertRaises(Exception):
+      self.manager.initialize({
+        'EXECUTORS': [
+          {
+            'NAME': 'primary',
+            'EXECUTOR': 'test',
+            'MINIMUM_RETRY_THRESHOLD': 42
+          },
+          {
+            'NAME': 'primary',
+            'EXECUTOR': 'anotherexecutor',
+            'MINIMUM_RETRY_THRESHOLD': 24
+          },
+        ]
+      })
+
+
   def test_verify_multiple_executors(self):
-    EXECUTORS['test'] = TestExecutor
-    EXECUTORS['anotherexecutor'] = TestExecutor
+    EphemeralBuilderManager.EXECUTORS['test'] = TestExecutor
+    EphemeralBuilderManager.EXECUTORS['anotherexecutor'] = TestExecutor
     self.manager.initialize({
       'EXECUTORS': [
         {
+          'NAME': 'primary',
           'EXECUTOR': 'test',
           'MINIMUM_RETRY_THRESHOLD': 42
         },
         {
+          'NAME': 'secondary',
           'EXECUTOR': 'anotherexecutor',
           'MINIMUM_RETRY_THRESHOLD': 24
         },
@@ -169,7 +448,7 @@ class TestEphemeral(unittest.TestCase):
 
   @async_test
   def test_schedule_job_namespace_filter(self):
-    EXECUTORS['test'] = TestExecutor
+    EphemeralBuilderManager.EXECUTORS['test'] = TestExecutor
     self.manager.initialize({
       'EXECUTORS': [{
         'EXECUTOR': 'test',
@@ -189,7 +468,7 @@ class TestEphemeral(unittest.TestCase):
 
   @async_test
   def test_schedule_job_retries_filter(self):
-    EXECUTORS['test'] = TestExecutor
+    EphemeralBuilderManager.EXECUTORS['test'] = TestExecutor
     self.manager.initialize({
       'EXECUTORS': [{
         'EXECUTOR': 'test',
@@ -210,17 +489,19 @@ class TestEphemeral(unittest.TestCase):
 
   @async_test
   def test_schedule_job_executor_fallback(self):
-    EXECUTORS['primary'] = TestExecutor
-    EXECUTORS['secondary'] = TestExecutor
+    EphemeralBuilderManager.EXECUTORS['primary'] = TestExecutor
+    EphemeralBuilderManager.EXECUTORS['secondary'] = TestExecutor
 
     self.manager.initialize({
       'EXECUTORS': [
         {
+          'NAME': 'primary',
           'EXECUTOR': 'primary',
           'NAMESPACE_WHITELIST': ['something'],
           'MINIMUM_RETRY_THRESHOLD': 3,
         },
         {
+          'NAME': 'secondary',
           'EXECUTOR': 'secondary',
           'MINIMUM_RETRY_THRESHOLD': 2,
         },
@@ -274,7 +555,7 @@ class TestEphemeral(unittest.TestCase):
 
   @async_test
   def test_schedule_job_single_executor(self):
-    EXECUTORS['test'] = TestExecutor
+    EphemeralBuilderManager.EXECUTORS['test'] = TestExecutor
 
     self.manager.initialize({
       'EXECUTOR': 'test',
@@ -299,7 +580,7 @@ class TestEphemeral(unittest.TestCase):
 
   @async_test
   def test_executor_exception(self):
-    EXECUTORS['bad'] = BadExecutor
+    EphemeralBuilderManager.EXECUTORS['bad'] = BadExecutor
 
     self.manager.initialize({
       'EXECUTOR': 'bad',
@@ -311,6 +592,38 @@ class TestEphemeral(unittest.TestCase):
 
     self.assertFalse(result[0])
 
+  @async_test
+  def test_schedule_and_stop(self):
+    EphemeralBuilderManager.EXECUTORS['test'] = TestExecutor
+
+    self.manager.initialize({
+      'EXECUTOR': 'test',
+      'EXECUTOR_CONFIG': {},
+    })
+
+    # Start the build job.
+    build_job = self._create_build_job(namespace='something', retries=3)
+    result = yield From(self.manager.schedule(build_job))
+    self.assertTrue(result[0])
+
+    executor = self.manager.registered_executors[0]
+    self.assertIsNotNone(executor.job_started)
+
+    # Register the realm so the build information is added.
+    yield From(self.manager._register_realm({
+      'realm': str(uuid.uuid4()),
+      'token': str(uuid.uuid4()),
+      'execution_id': executor.job_started,
+      'executor_name': 'TestExecutor',
+      'build_uuid': build_job.build_uuid,
+      'job_queue_item': build_job.job_item,
+    }))
+
+    # Stop the build job.
+    yield From(self.manager.kill_builder_executor(build_job.build_uuid))
+    self.assertEquals(executor.job_stopped, executor.job_started)
+
+
 
 if __name__ == '__main__':
   unittest.main()