diff --git a/buildman/builder.py b/buildman/builder.py
index 4e88d3ed7..e1c7a852b 100644
--- a/buildman/builder.py
+++ b/buildman/builder.py
@@ -41,8 +41,11 @@ def run_build_manager():
   if manager_klass is None:
     return
 
-  public_ip = os.environ.get('PUBLIC_IP', '127.0.0.1')
-  logger.debug('Will pass public IP address %s to builders for websocket connection', public_ip)
+  manager_hostname = os.environ.get('BUILDMAN_HOSTNAME',
+                                    app.config.get('BUILDMAN_HOSTNAME',
+                                                   app.config['SERVER_HOSTNAME']))
+  logger.debug('Will pass buildman hostname %s to builders for websocket connection',
+               manager_hostname)
 
   logger.debug('Starting build manager with lifecycle "%s"', build_manager_config[0])
   ssl_context = None
@@ -53,7 +56,7 @@ def run_build_manager():
                                 os.path.join(os.environ.get('SSL_CONFIG'), 'ssl.key'))
 
   server = BuilderServer(app.config['SERVER_HOSTNAME'], dockerfile_build_queue, build_logs,
-                         user_files, manager_klass, build_manager_config[1], public_ip)
+                         user_files, manager_klass, build_manager_config[1], manager_hostname)
   server.run('0.0.0.0', ssl=ssl_context)
 
 if __name__ == '__main__':
diff --git a/buildman/component/buildcomponent.py b/buildman/component/buildcomponent.py
index 726435bcc..42e6696f2 100644
--- a/buildman/component/buildcomponent.py
+++ b/buildman/component/buildcomponent.py
@@ -9,6 +9,7 @@ from autobahn.wamp.exception import ApplicationError
 
 from buildman.server import BuildJobResult
 from buildman.component.basecomponent import BaseComponent
+from buildman.jobutil.buildjob import BuildJobLoadException
 from buildman.jobutil.buildpack import BuildPackage, BuildPackageException
 from buildman.jobutil.buildstatus import StatusHandler
 from buildman.jobutil.workererror import WorkerError
@@ -58,19 +59,20 @@ class BuildComponent(BaseComponent):
     yield trollius.From(self.subscribe(self._on_heartbeat, 'io.quay.builder.heartbeat'))
     yield trollius.From(self.subscribe(self._on_log_message, 'io.quay.builder.logmessage'))
 
-    self._set_status(ComponentStatus.WAITING)
+    yield trollius.From(self._set_status(ComponentStatus.WAITING))
 
   def is_ready(self):
     """ Determines whether a build component is ready to begin a build. """
     return self._component_status == ComponentStatus.RUNNING
 
+  @trollius.coroutine
   def start_build(self, build_job):
     """ Starts a build. """
     self._current_job = build_job
     self._build_status = StatusHandler(self.build_logs, build_job.repo_build.uuid)
     self._image_info = {}
 
-    self._set_status(ComponentStatus.BUILDING)
+    yield trollius.From(self._set_status(ComponentStatus.BUILDING))
 
     # Retrieve the job's buildpack.
     buildpack_url = self.user_files.get_file_url(build_job.repo_build.resource_key,
@@ -82,23 +84,27 @@ class BuildComponent(BaseComponent):
       buildpack = BuildPackage.from_url(buildpack_url)
     except BuildPackageException as bpe:
       self._build_failure('Could not retrieve build package', bpe)
-      return
+      raise trollius.Return()
 
     # Extract the base image information from the Dockerfile.
     parsed_dockerfile = None
     logger.debug('Parsing dockerfile')
 
-    build_config = build_job.build_config
+    try:
+      build_config = build_job.build_config
+    except BuildJobLoadException as irbe:
+      self._build_failure('Could not load build job information', irbe)
+
     try:
       parsed_dockerfile = buildpack.parse_dockerfile(build_config.get('build_subdir'))
     except BuildPackageException as bpe:
       self._build_failure('Could not find Dockerfile in build package', bpe)
-      return
+      raise trollius.Return()
 
     image_and_tag_tuple = parsed_dockerfile.get_image_and_tag()
     if image_and_tag_tuple is None or image_and_tag_tuple[0] is None:
       self._build_failure('Missing FROM line in Dockerfile')
-      return
+      raise trollius.Return()
 
     base_image_information = {
       'repository': image_and_tag_tuple[0],
@@ -147,9 +153,7 @@ class BuildComponent(BaseComponent):
     logger.debug('Invoking build: %s', self.builder_realm)
     logger.debug('With Arguments: %s', build_arguments)
 
-    return (self
-            .call("io.quay.builder.build", **build_arguments)
-            .add_done_callback(self._build_complete))
+    self.call("io.quay.builder.build", **build_arguments).add_done_callback(self._build_complete)
 
   @staticmethod
   def _total_completion(statuses, total_images):
@@ -276,38 +280,42 @@ class BuildComponent(BaseComponent):
     self._current_job = None
 
     # Set the component back to a running state.
-    self._set_status(ComponentStatus.RUNNING)
+    yield trollius.From(self._set_status(ComponentStatus.RUNNING))
 
   @staticmethod
   def _ping():
     """ Ping pong. """
     return 'pong'
 
+  @trollius.coroutine
   def _on_ready(self, token, version):
     if not version in SUPPORTED_WORKER_VERSIONS:
-      logger.warning('Build component (token "%s") is running an out-of-date version: %s', version)
-      return False
+      logger.warning('Build component (token "%s") is running an out-of-date version: %s', token,
+                     version)
+      raise trollius.Return(False)
 
     if self._component_status != 'waiting':
       logger.warning('Build component (token "%s") is already connected', self.expected_token)
-      return False
+      raise trollius.Return(False)
 
     if token != self.expected_token:
-      logger.warning('Builder token mismatch. Expected: "%s". Found: "%s"', self.expected_token, token)
-      return False
+      logger.warning('Builder token mismatch. Expected: "%s". Found: "%s"', self.expected_token,
+                     token)
+      raise trollius.Return(False)
 
-    self._set_status(ComponentStatus.RUNNING)
+    yield trollius.From(self._set_status(ComponentStatus.RUNNING))
 
     # Start the heartbeat check and updating loop.
     loop = trollius.get_event_loop()
     loop.create_task(self._heartbeat())
 
     logger.debug('Build worker %s is connected and ready', self.builder_realm)
-    return True
+    raise trollius.Return(True)
 
+  @trollius.coroutine
   def _set_status(self, phase):
     if phase == ComponentStatus.RUNNING:
       loop = trollius.get_event_loop()
-      self.parent_manager.build_component_ready(self, loop)
+      yield trollius.From(self.parent_manager.build_component_ready(self, loop))
 
     self._component_status = phase
@@ -344,13 +352,14 @@ class BuildComponent(BaseComponent):
       logger.debug('Checking heartbeat on realm %s', self.builder_realm)
       if (self._last_heartbeat and
           self._last_heartbeat < datetime.datetime.utcnow() - HEARTBEAT_DELTA):
-        self._timeout()
+        yield trollius.From(self._timeout())
         return
 
       yield trollius.From(trollius.sleep(HEARTBEAT_TIMEOUT))
 
+  @trollius.coroutine
   def _timeout(self):
-    self._set_status(ComponentStatus.TIMED_OUT)
+    yield trollius.From(self._set_status(ComponentStatus.TIMED_OUT))
     logger.warning('Build component with realm %s has timed out', self.builder_realm)
     self._dispose(timed_out=True)
 
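Note on the conversion above: the component callbacks become trollius coroutines. Because trollius is the Python 2 backport of asyncio, a generator-based coroutine cannot `return value`; results are handed back by raising `trollius.Return(...)`, and callers drive sub-coroutines with `yield trollius.From(...)`. A minimal sketch of that calling convention follows; the names in it are illustrative only and are not part of this patch.

    # Illustrative only -- the trollius From/Return convention, not Quay code.
    import trollius
    from trollius import From, Return

    @trollius.coroutine
    def is_token_valid(token, expected):
        yield From(trollius.sleep(0))   # stand-in for real async work
        # Python 2 generators cannot `return token == expected`, so the value
        # rides back to the caller inside the Return exception.
        raise Return(token == expected)

    @trollius.coroutine
    def on_ready(token):
        ok = yield From(is_token_valid(token, 'beef'))  # unwraps the Return value
        raise Return(ok)

    if __name__ == '__main__':
        loop = trollius.get_event_loop()
        print(loop.run_until_complete(on_ready('beef')))  # True
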
""" @@ -42,7 +42,8 @@ class BaseManager(object): """ raise NotImplementedError - def build_component_ready(self, build_component, loop): + @coroutine + def build_component_ready(self, build_component): """ Method invoked whenever a build component announces itself as ready. """ raise NotImplementedError diff --git a/buildman/manager/enterprise.py b/buildman/manager/enterprise.py index 5a97c0955..d7fdea39a 100644 --- a/buildman/manager/enterprise.py +++ b/buildman/manager/enterprise.py @@ -5,7 +5,7 @@ from buildman.component.basecomponent import BaseComponent from buildman.component.buildcomponent import BuildComponent from buildman.manager.basemanager import BaseManager -from trollius.coroutines import From, Return, coroutine +from trollius import From, Return, coroutine, async REGISTRATION_REALM = 'registration' logger = logging.getLogger(__name__) @@ -51,16 +51,19 @@ class EnterpriseManager(BaseManager): return realm @coroutine - def schedule(self, build_job, loop): + def schedule(self, build_job): """ Schedules a build for an Enterprise Registry. """ if self.shutting_down or not self.ready_components: raise Return(False) component = self.ready_components.pop() - loop.call_soon(component.start_build, build_job) + + yield From(component.start_build(build_job)) + raise Return(True) - def build_component_ready(self, build_component, loop): + @coroutine + def build_component_ready(self, build_component): self.ready_components.add(build_component) def shutdown(self): diff --git a/buildman/manager/ephemeral.py b/buildman/manager/ephemeral.py index 63f03a6b7..7126ec836 100644 --- a/buildman/manager/ephemeral.py +++ b/buildman/manager/ephemeral.py @@ -13,16 +13,28 @@ from urllib3.exceptions import ReadTimeoutError from buildman.manager.basemanager import BaseManager from buildman.manager.executor import PopenExecutor, EC2Executor from buildman.component.buildcomponent import BuildComponent +from buildman.jobutil.buildjob import BuildJob from buildman.asyncutil import AsyncWrapper +from util.morecollections import AttrDict logger = logging.getLogger(__name__) ETCD_BUILDER_PREFIX = 'building/' -ETCD_EXPIRE_RESULT = 'expire' +ETCD_REALM_PREFIX = 'realm/' ETCD_DISABLE_TIMEOUT = 0 +class EtcdAction(object): + GET = 'get' + SET = 'set' + EXPIRE = 'expire' + UPDATE = 'update' + DELETE = 'delete' + CREATE = 'create' + COMPARE_AND_SWAP = 'compareAndSwap' + COMPARE_AND_DELETE = 'compareAndDelete' + class EphemeralBuilderManager(BaseManager): """ Build manager implementation for the Enterprise Registry. """ @@ -41,52 +53,82 @@ class EphemeralBuilderManager(BaseManager): self._etcd_client = None self._component_to_job = {} + self._job_uuid_to_component = {} self._component_to_builder = {} self._executor = None - self._worker_watch_task = None + # Map of etcd keys being watched to the tasks watching them + self._watch_tasks = {} super(EphemeralBuilderManager, self).__init__(*args, **kwargs) - def _watch_builders(self): - """ Watch the builders key for expirations. 
- """ + def _watch_etcd(self, etcd_key, change_callback, recursive=True): + watch_task_key = (etcd_key, recursive) + def callback_wrapper(changed_key_future): + + if watch_task_key not in self._watch_tasks or self._watch_tasks[watch_task_key].done(): + self._watch_etcd(etcd_key, change_callback) + + if changed_key_future.cancelled(): + # Due to lack of interest, tomorrow has been cancelled + return + + try: + etcd_result = changed_key_future.result() + except ReadTimeoutError: + return + + change_callback(etcd_result) + if not self._shutting_down: - workers_future = self._etcd_client.watch(ETCD_BUILDER_PREFIX, recursive=True, - timeout=ETCD_DISABLE_TIMEOUT) - workers_future.add_done_callback(self._handle_key_expiration) - logger.debug('Scheduling watch task.') - self._worker_watch_task = async(workers_future) + watch_future = self._etcd_client.watch(etcd_key, recursive=recursive, + timeout=ETCD_DISABLE_TIMEOUT) + watch_future.add_done_callback(callback_wrapper) + logger.debug('Scheduling watch of key: %s%s', etcd_key, '/*' if recursive else '') + self._watch_tasks[watch_task_key] = async(watch_future) - def _handle_key_expiration(self, changed_key_future): - """ Handle when a builder expires - """ - if self._worker_watch_task is None or self._worker_watch_task.done(): - self._watch_builders() - - if changed_key_future.cancelled(): - # Due to lack of interest, tomorrow has been cancelled - return - - try: - etcd_result = changed_key_future.result() - except ReadTimeoutError: - return - - if etcd_result.action == ETCD_EXPIRE_RESULT: + def _handle_builder_expiration(self, etcd_result): + if etcd_result.action == EtcdAction.EXPIRE: # Handle the expiration logger.debug('Builder expired, clean up the old build node') job_metadata = json.loads(etcd_result._prev_node.value) async(self._clean_up_old_builder(etcd_result.key, job_metadata)) + def _handle_realm_change(self, etcd_result): + if etcd_result.action == EtcdAction.SET: + # We must listen on the realm created by ourselves or another worker + realm_spec = json.loads(etcd_result.value) + component = self.register_component(realm_spec['realm'], BuildComponent, + token=realm_spec['token']) + build_job = BuildJob(AttrDict(realm_spec['job_queue_item'])) + self._component_to_job[component] = build_job + self._component_to_builder[component] = realm_spec['builder_id'] + self._job_uuid_to_component[build_job.job_details['build_uuid']] = component + + elif etcd_result.action == EtcdAction.DELETE or etcd_result.action == EtcdAction.EXPIRE: + # We must stop listening for new connections on the specified realm, if we did not get the + # connection + realm_spec = json.loads(etcd_result._prev_node.value) + build_job = BuildJob(AttrDict(realm_spec['job_queue_item'])) + component = self._job_uuid_to_component.pop(build_job.job_details['build_uuid'], None) + if component is not None: + # We were not the manager which the worker connected to, remove the bookkeeping for it + logger.debug('Unregistering unused component on realm: %s', realm_spec['realm']) + del self._component_to_job[component] + del self._component_to_builder[component] + self.unregister_component(component) + + else: + logger.warning('Unexpected action (%s) on realm key: %s', etcd_result.action, etcd_result.key) + def initialize(self, manager_config): logger.debug('Calling initialize') self._manager_config = manager_config executor_klass = self._executors.get(manager_config.get('EXECUTOR', ''), PopenExecutor) self._executor = executor_klass(manager_config.get('EXECUTOR_CONFIG', {}), - 
self.public_ip_address) + self.manager_hostname) etcd_host = self._manager_config.get('ETCD_HOST', '127.0.0.1') etcd_port = self._manager_config.get('ETCD_PORT', 2379) @@ -97,7 +139,8 @@ class EphemeralBuilderManager(BaseManager): self._etcd_client = AsyncWrapper(self._etcd_client_klass(host=etcd_host, port=etcd_port), executor=self._async_thread_executor) - self._watch_builders() + self._watch_etcd(ETCD_BUILDER_PREFIX, self._handle_builder_expiration) + self._watch_etcd(ETCD_REALM_PREFIX, self._handle_realm_change) def setup_time(self): setup_time = self._manager_config.get('MACHINE_SETUP_TIME', 300) @@ -108,17 +151,17 @@ class EphemeralBuilderManager(BaseManager): logger.debug('Shutting down worker.') self._shutting_down = True - if self._worker_watch_task is not None: - logger.debug('Canceling watch task.') - self._worker_watch_task.cancel() - self._worker_watch_task = None + for (etcd_key, _), task in self._watch_tasks.items(): + if not task.done(): + logger.debug('Canceling watch task for %s', etcd_key) + task.cancel() if self._async_thread_executor is not None: logger.debug('Shutting down thread pool executor.') self._async_thread_executor.shutdown() @coroutine - def schedule(self, build_job, loop): + def schedule(self, build_job): logger.debug('Calling schedule with job: %s', build_job.job_details['build_uuid']) # Check if there are worker slots avialable by checking the number of jobs in etcd @@ -154,8 +197,6 @@ class EphemeralBuilderManager(BaseManager): try: yield From(self._etcd_client.write(job_key, json.dumps(payload), prevExist=False, ttl=ttl)) - component = self.register_component(realm, BuildComponent, token=token) - self._component_to_job[component] = build_job except KeyError: # The job was already taken by someone else, we are probably a retry logger.error('Job already exists in etcd, are timeouts misconfigured or is the queue broken?') @@ -163,20 +204,38 @@ class EphemeralBuilderManager(BaseManager): logger.debug('Starting builder with executor: %s', self._executor) builder_id = yield From(self._executor.start_builder(realm, token)) - self._component_to_builder[component] = builder_id # Store the builder in etcd associated with the job id payload['builder_id'] = builder_id yield From(self._etcd_client.write(job_key, json.dumps(payload), prevExist=True, ttl=ttl)) + # Store the realm spec which will allow any manager to accept this builder when it connects + realm_spec = json.dumps({ + 'realm': realm, + 'token': token, + 'builder_id': builder_id, + 'job_queue_item': build_job.job_item, + }) + try: + yield From(self._etcd_client.write(self._etcd_realm_key(realm), realm_spec, prevExist=False, + ttl=ttl)) + except KeyError: + logger.error('Realm already exists in etcd. 
UUID collision or something is very very wrong.') + raise Return(False) + raise Return(True) - def build_component_ready(self, build_component, loop): + @coroutine + def build_component_ready(self, build_component): try: + # Clean up the bookkeeping for allowing any manager to take the job job = self._component_to_job.pop(build_component) + del self._job_uuid_to_component[job.job_details['build_uuid']] + yield From(self._etcd_client.delete(self._etcd_realm_key(build_component.builder_realm))) + logger.debug('Sending build %s to newly ready component on realm %s', job.job_details['build_uuid'], build_component.builder_realm) - loop.call_soon(build_component.start_build, job) + yield From(build_component.start_build(job)) except KeyError: logger.warning('Builder is asking for more work, but work already completed') @@ -240,6 +299,12 @@ class EphemeralBuilderManager(BaseManager): """ return os.path.join(ETCD_BUILDER_PREFIX, build_job.job_details['build_uuid']) + @staticmethod + def _etcd_realm_key(realm): + """ Create a key which is used to track an incoming connection on a realm. + """ + return os.path.join(ETCD_REALM_PREFIX, realm) + def num_workers(self): """ Return the number of workers we're managing locally. """ diff --git a/buildman/manager/executor.py b/buildman/manager/executor.py index 814b95a5b..c4b38366d 100644 --- a/buildman/manager/executor.py +++ b/buildman/manager/executor.py @@ -29,9 +29,9 @@ class ExecutorException(Exception): class BuilderExecutor(object): - def __init__(self, executor_config, manager_public_ip): + def __init__(self, executor_config, manager_hostname): self.executor_config = executor_config - self.manager_public_ip = manager_public_ip + self.manager_hostname = manager_hostname """ Interface which can be plugged into the EphemeralNodeManager to provide a strategy for starting and stopping builders. @@ -52,7 +52,7 @@ class BuilderExecutor(object): def get_manager_websocket_url(self): return 'ws://{0}:' - def generate_cloud_config(self, realm, token, coreos_channel, manager_ip, + def generate_cloud_config(self, realm, token, coreos_channel, manager_hostname, quay_username=None, quay_password=None, etcd_token=None): if quay_username is None: quay_username = self.executor_config['QUAY_USERNAME'] @@ -69,7 +69,7 @@ class BuilderExecutor(object): quay_username=quay_username, quay_password=quay_password, etcd_token=etcd_token, - manager_ip=manager_ip, + manager_hostname=manager_hostname, coreos_channel=coreos_channel, ) @@ -108,7 +108,7 @@ class EC2Executor(BuilderExecutor): channel = self.executor_config.get('COREOS_CHANNEL', 'stable') get_ami_callable = partial(self._get_coreos_ami, region, channel) coreos_ami = yield From(self._loop.run_in_executor(None, get_ami_callable)) - user_data = self.generate_cloud_config(realm, token, channel, self.manager_public_ip) + user_data = self.generate_cloud_config(realm, token, channel, self.manager_hostname) logger.debug('Generated cloud config: %s', user_data) @@ -155,10 +155,10 @@ class EC2Executor(BuilderExecutor): class PopenExecutor(BuilderExecutor): """ Implementation of BuilderExecutor which uses Popen to fork a quay-builder process. """ - def __init__(self, executor_config, manager_public_ip): + def __init__(self, executor_config, manager_hostname): self._jobs = {} - super(PopenExecutor, self).__init__(executor_config, manager_public_ip) + super(PopenExecutor, self).__init__(executor_config, manager_hostname) """ Executor which uses Popen to fork a quay-builder process. 
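Note on `_watch_etcd` above: a single etcd watch resolves after one change (or a timeout), so the completion callback re-arms the watch before dispatching the result, and `shutdown()` cancels any outstanding watch tasks. The sketch below shows only that self-re-arming shape, driven by a fake single-shot watch instead of a real etcd client; everything in it is illustrative and not part of the patch.

    # Illustrative only -- the self-re-arming watch pattern, with a fake watch source.
    from __future__ import print_function

    import trollius
    from trollius import From, Return

    CHANGES = ['set realm/1234-realm', 'expire building/deadbeef']

    @trollius.coroutine
    def watch_once():
        # Stand-in for etcd_client.watch(): resolves with at most one change.
        yield From(trollius.sleep(0.02))
        raise Return(CHANGES.pop(0) if CHANGES else None)

    def watch(loop, change_callback, tasks):
        def callback_wrapper(done_future):
            # Re-arm the watch first so changes arriving while we handle this one are not missed.
            if 'watch' not in tasks or tasks['watch'].done():
                watch(loop, change_callback, tasks)

            result = done_future.result()
            if result is not None:
                change_callback(result)

        task = loop.create_task(watch_once())
        task.add_done_callback(callback_wrapper)
        tasks['watch'] = task

    if __name__ == '__main__':
        loop = trollius.get_event_loop()
        watch(loop, lambda change: print('observed:', change), {})
        loop.run_until_complete(trollius.sleep(0.1))
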
""" diff --git a/buildman/server.py b/buildman/server.py index ba9536c1e..e1175f718 100644 --- a/buildman/server.py +++ b/buildman/server.py @@ -37,7 +37,7 @@ class BuilderServer(object): controller. """ def __init__(self, registry_hostname, queue, build_logs, user_files, lifecycle_manager_klass, - lifecycle_manager_config, manager_public_ip): + lifecycle_manager_config, manager_hostname): self._loop = None self._current_status = 'starting' self._current_components = [] @@ -53,7 +53,7 @@ class BuilderServer(object): self._unregister_component, self._job_heartbeat, self._job_complete, - manager_public_ip, + manager_hostname, HEARTBEAT_PERIOD_SEC, ) self._lifecycle_manager_config = lifecycle_manager_config @@ -158,7 +158,7 @@ class BuilderServer(object): self._queue.incomplete(job_item, restore_retry=False) logger.debug('Build job found. Checking for an avaliable worker.') - scheduled = yield From(self._lifecycle_manager.schedule(build_job, self._loop)) + scheduled = yield From(self._lifecycle_manager.schedule(build_job)) if scheduled: self._job_count = self._job_count + 1 logger.debug('Build job scheduled. Running: %s', self._job_count) @@ -168,7 +168,6 @@ class BuilderServer(object): yield From(trollius.sleep(WORK_CHECK_TIMEOUT)) - @trollius.coroutine def _initialize(self, loop, host, ssl=None): self._loop = loop diff --git a/buildman/templates/cloudconfig.yaml b/buildman/templates/cloudconfig.yaml index 3bebde670..d6ae3aeca 100644 --- a/buildman/templates/cloudconfig.yaml +++ b/buildman/templates/cloudconfig.yaml @@ -6,7 +6,7 @@ write_files: content: | REALM={{ realm }} TOKEN={{ token }} - ENDPOINT=wss://buildman.quay.io:8787 + SERVER=wss://{{ manager_hostname }} coreos: update: @@ -31,7 +31,6 @@ coreos: [Service] TimeoutStartSec=600 TimeoutStopSec=2000 - ExecStartPre=/bin/sh -xc "echo '{{ manager_ip }} buildman.quay.io' >> /etc/hosts; exit 0" ExecStartPre=/usr/bin/docker login -u {{ quay_username }} -p {{ quay_password }} -e unused quay.io ExecStart=/usr/bin/docker run --rm --net=host --name quay-builder --privileged --env-file /root/overrides.list -v /var/run/docker.sock:/var/run/docker.sock quay.io/coreos/registry-build-worker:latest ExecStop=/usr/bin/docker stop quay-builder diff --git a/data/queue.py b/data/queue.py index 5c720eed2..865511519 100644 --- a/data/queue.py +++ b/data/queue.py @@ -78,7 +78,8 @@ class WorkQueue(object): def get(self, processing_time=300): """ Get an available item and mark it as unavailable for the default of five - minutes. + minutes. The result of this method must always be composed of simple + python objects which are JSON serializable for network portability reasons. 
""" now = datetime.utcnow() diff --git a/test/test_buildman.py b/test/test_buildman.py index 0d0b6ced2..f10ba473e 100644 --- a/test/test_buildman.py +++ b/test/test_buildman.py @@ -4,19 +4,20 @@ import os.path import time import json -from trollius import coroutine, get_event_loop, From, Future, sleep +from trollius import coroutine, get_event_loop, From, Future, sleep, Return from mock import Mock from threading import Event from urllib3.exceptions import ReadTimeoutError from buildman.manager.executor import BuilderExecutor from buildman.manager.ephemeral import (EphemeralBuilderManager, ETCD_BUILDER_PREFIX, - ETCD_EXPIRE_RESULT) + ETCD_REALM_PREFIX, EtcdAction) from buildman.server import BuildJobResult from buildman.component.buildcomponent import BuildComponent BUILD_UUID = 'deadbeef-dead-beef-dead-deadbeefdead' +REALM_ID = '1234-realm' def async_test(f): @@ -43,17 +44,17 @@ class TestEphemeral(unittest.TestCase): self.etcd_client_mock.watch = Mock(side_effect=hang_until_event) return self.etcd_client_mock - def _create_mock_executor(self, *args, **kwargs): - def create_completed_future(result=None): - def inner(*args, **kwargs): - new_future = Future() - new_future.set_result(result) - return new_future - return inner + def _create_completed_future(self, result=None): + def inner(*args, **kwargs): + new_future = Future() + new_future.set_result(result) + return new_future + return inner + def _create_mock_executor(self, *args, **kwargs): self.test_executor = Mock(spec=BuilderExecutor) - self.test_executor.start_builder = Mock(side_effect=create_completed_future('123')) - self.test_executor.stop_builder = Mock(side_effect=create_completed_future()) + self.test_executor.start_builder = Mock(side_effect=self._create_completed_future('123')) + self.test_executor.stop_builder = Mock(side_effect=self._create_completed_future()) return self.test_executor def _create_build_job(self): @@ -61,6 +62,10 @@ class TestEphemeral(unittest.TestCase): mock_job.job_details = { 'build_uuid': BUILD_UUID, } + mock_job.job_item = { + 'body': json.dumps(mock_job.job_details), + 'id': 1, + } return mock_job def setUp(self): @@ -71,13 +76,13 @@ class TestEphemeral(unittest.TestCase): self.etcd_wait_event.clear() self.register_component_callback = Mock() - self.uniregister_component_callback = Mock() + self.unregister_component_callback = Mock() self.job_heartbeat_callback = Mock() self.job_complete_callback = Mock() self.manager = EphemeralBuilderManager( self.register_component_callback, - self.uniregister_component_callback, + self.unregister_component_callback, self.job_heartbeat_callback, self.job_complete_callback, '127.0.0.1', @@ -97,15 +102,19 @@ class TestEphemeral(unittest.TestCase): del EphemeralBuilderManager._executors['test'] EphemeralBuilderManager._etcd_client_klass = self.old_etcd_client_klass - @async_test - def test_schedule_and_complete(self): + @coroutine + def _setup_job_for_managers(self): + # Test that we are watching the realm location before anything else happens + self.etcd_client_mock.watch.assert_any_call(ETCD_REALM_PREFIX, recursive=True, timeout=0) + self.etcd_client_mock.read = Mock(side_effect=KeyError) - test_component = BuildComponent(None) + test_component = Mock(spec=BuildComponent) + test_component.builder_realm = REALM_ID + test_component.start_build = Mock(side_effect=self._create_completed_future()) self.register_component_callback.return_value = test_component # Ask for a builder to be scheduled - loop = get_event_loop() - is_scheduled = yield 
From(self.manager.schedule(self.mock_job, loop)) + is_scheduled = yield From(self.manager.schedule(self.mock_job)) self.assertTrue(is_scheduled) @@ -114,29 +123,76 @@ class TestEphemeral(unittest.TestCase): self.assertEqual(self.etcd_client_mock.write.call_args_list[0][0][0], self.mock_job_key) self.assertEqual(self.etcd_client_mock.write.call_args_list[1][0][0], self.mock_job_key) + # Right now the job is not registered with any managers because etcd has not accepted the job + self.assertEqual(self.register_component_callback.call_count, 0) + + realm_created = Mock(spec=etcd.EtcdResult) + realm_created.action = EtcdAction.SET + realm_created.key = os.path.join(ETCD_REALM_PREFIX, REALM_ID) + realm_created.value = json.dumps({ + 'realm': REALM_ID, + 'token': 'beef', + 'builder_id': '123', + 'job_queue_item': self.mock_job.job_item, + }) + + self.manager._handle_realm_change(realm_created) + self.assertEqual(self.register_component_callback.call_count, 1) + raise Return(test_component) + + @async_test + def test_schedule_and_complete(self): + # Test that a job is properly registered with all of the managers + test_component = yield From(self._setup_job_for_managers()) + + # Take the job ourselves + yield From(self.manager.build_component_ready(test_component)) + + self.etcd_client_mock.delete.assert_called_once_with(os.path.join(ETCD_REALM_PREFIX, REALM_ID)) + self.etcd_client_mock.delete.reset_mock() + + # Finish the job yield From(self.manager.job_completed(self.mock_job, BuildJobResult.COMPLETE, test_component)) self.assertEqual(self.test_executor.stop_builder.call_count, 1) self.etcd_client_mock.delete.assert_called_once_with(self.mock_job_key) + @async_test + def test_another_manager_takes_job(self): + # Prepare a job to be taken by another manager + test_component = yield From(self._setup_job_for_managers()) + + realm_deleted = Mock(spec=etcd.EtcdResult) + realm_deleted.action = EtcdAction.DELETE + realm_deleted.key = os.path.join(ETCD_REALM_PREFIX, REALM_ID) + + realm_deleted._prev_node = Mock(spec=etcd.EtcdResult) + realm_deleted._prev_node.value = json.dumps({ + 'realm': REALM_ID, + 'token': 'beef', + 'builder_id': '123', + 'job_queue_item': self.mock_job.job_item, + }) + + self.manager._handle_realm_change(realm_deleted) + + self.unregister_component_callback.assert_called_once_with(test_component) + @async_test def test_expiring_worker(self): # Test that we are watching before anything else happens - self.etcd_client_mock.watch.assert_called_once_with(ETCD_BUILDER_PREFIX, recursive=True, - timeout=0) + self.etcd_client_mock.watch.assert_any_call(ETCD_BUILDER_PREFIX, recursive=True, timeout=0) # Send a signal to the callback that a worker has expired expired_result = Mock(spec=etcd.EtcdResult) - expired_result.action = ETCD_EXPIRE_RESULT + expired_result.action = EtcdAction.EXPIRE expired_result.key = self.mock_job_key expired_result._prev_node = Mock(spec=etcd.EtcdResult) expired_result._prev_node.value = json.dumps({'builder_id': '1234'}) - expired_future = Future() - expired_future.set_result(expired_result) - self.manager._handle_key_expiration(expired_future) + self.manager._handle_builder_expiration(expired_result) yield From(sleep(.01)) @@ -151,10 +207,8 @@ class TestEphemeral(unittest.TestCase): set_result = Mock(sepc=etcd.EtcdResult) set_result.action = 'set' set_result.key = self.mock_job_key - set_future = Future() - set_future.set_result(set_result) - self.manager._handle_key_expiration(set_future) + self.manager._handle_builder_expiration(set_result) yield 
From(sleep(.01)) @@ -179,15 +233,3 @@ class TestEphemeral(unittest.TestCase): self.job_heartbeat_callback.assert_called_once_with(self.mock_job) self.assertEqual(self.etcd_client_mock.write.call_count, 1) self.assertEqual(self.etcd_client_mock.write.call_args_list[0][0][0], self.mock_job_key) - - @async_test - def test_etcd_read_timeout(self): - # Send a signal to the callback that a worker key has been changed - read_timeout_future = Future() - read_timeout_future.set_exception(ReadTimeoutError(None, None, None)) - - self.manager._handle_key_expiration(read_timeout_future) - - yield From(sleep(.01)) - - self.assertEquals(self.test_executor.stop_builder.call_count, 0)
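Note on the tests above: the coroutine tests are driven by the `@async_test` decorator, whose body is outside the hunks shown here. The usual shape of such a decorator is sketched below under the assumption that it simply wraps the test in `coroutine()` and runs it to completion on the event loop; the project's actual implementation may differ.

    # Illustrative only -- one common shape for an async_test decorator with trollius.
    from functools import wraps

    import trollius
    from trollius import coroutine, From

    def async_test(f):
        f = coroutine(f)

        @wraps(f)
        def wrapper(*args, **kwargs):
            loop = trollius.get_event_loop()
            return loop.run_until_complete(f(*args, **kwargs))
        return wrapper

    class Example(object):
        @async_test
        def test_answer(self):
            yield From(trollius.sleep(0))
            assert 2 + 2 == 4

    if __name__ == '__main__':
        Example().test_answer()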