import logging
import etcd
import uuid
import calendar
import os.path
import json

from datetime import datetime, timedelta
from trollius import From, coroutine, Return, async
from concurrent.futures import ThreadPoolExecutor
from urllib3.exceptions import ReadTimeoutError

from buildman.manager.basemanager import BaseManager
from buildman.manager.executor import PopenExecutor, EC2Executor
from buildman.component.buildcomponent import BuildComponent
from buildman.asyncutil import AsyncWrapper


logger = logging.getLogger(__name__)

# All job-lock keys live under this etcd directory; watching it recursively
# lets the manager observe every job's TTL expiration.
ETCD_BUILDER_PREFIX = 'building/'

# Action string etcd reports when a key's TTL lapses.
ETCD_EXPIRE_RESULT = 'expire'

# Passed as the watch timeout: 0 disables the client-side timeout so the
# watch blocks until something actually changes.
ETCD_DISABLE_TIMEOUT = 0


class EphemeralBuilderManager(BaseManager):
    """ Build manager implementation for the Enterprise Registry.

    Schedules builds onto short-lived ("ephemeral") worker nodes. Each
    scheduled job takes a TTL'd lock key in etcd; workers that stop
    heartbeating let their key expire, which triggers cleanup of the
    corresponding build node.
    """

    # Maps the 'EXECUTOR' config value to the class that launches/stops
    # builder nodes. Unknown values fall back to PopenExecutor (see initialize).
    _executors = {
        'popen': PopenExecutor,
        'ec2': EC2Executor,
    }

    # Class attribute so tests can substitute a fake etcd client class.
    _etcd_client_klass = etcd.Client

    def __init__(self, *args, **kwargs):
        # Set before anything async starts so _watch_builders can check it.
        self._shutting_down = False

        # Populated in initialize(); None until then.
        self._manager_config = None
        self._async_thread_executor = None
        self._etcd_client = None

        # Live bookkeeping: component -> queued job, component -> builder node id.
        self._component_to_job = {}
        self._component_to_builder = {}

        self._executor = None

        # The trollius task wrapping the outstanding etcd watch, if any.
        self._worker_watch_task = None

        super(EphemeralBuilderManager, self).__init__(*args, **kwargs)

    def _watch_builders(self):
        """ Watch the builders key for expirations.

        Registers a single recursive watch on ETCD_BUILDER_PREFIX whose
        completion is handled by _handle_key_expiration. No-op once
        shutdown has begun.
        """
        if not self._shutting_down:
            workers_future = self._etcd_client.watch(ETCD_BUILDER_PREFIX, recursive=True,
                                                     timeout=ETCD_DISABLE_TIMEOUT)
            workers_future.add_done_callback(self._handle_key_expiration)
            logger.debug('Scheduling watch task.')
            self._worker_watch_task = async(workers_future)

    def _handle_key_expiration(self, changed_key_future):
        """ Handle when a builder expires.

        Done-callback for the etcd watch future. An etcd watch fires once,
        so the watch is re-armed first, before the result is examined.
        """
        # Re-register the watch immediately so no subsequent expiration is missed.
        if self._worker_watch_task is None or self._worker_watch_task.done():
            self._watch_builders()

        if changed_key_future.cancelled():
            # Due to lack of interest, tomorrow has been cancelled
            return

        try:
            etcd_result = changed_key_future.result()
        except ReadTimeoutError:
            # Watch timed out without an event; the re-armed watch above covers us.
            return

        if etcd_result.action == ETCD_EXPIRE_RESULT:
            # Handle the expiration: the expired key's payload (with the
            # builder id) is only available on the previous node, since the
            # key itself is already gone.
            # NOTE(review): _prev_node is a private attribute of python-etcd's
            # EtcdResult — confirm it is stable across client versions.
            logger.debug('Builder expired, clean up the old build node')
            job_metadata = json.loads(etcd_result._prev_node.value)
            async(self._clean_up_old_builder(etcd_result.key, job_metadata))

    def initialize(self, manager_config):
        """ Set up the executor, the etcd client (wrapped for async use via a
        thread pool), and start watching for job-key expirations.
        """
        logger.debug('Calling initialize')
        self._manager_config = manager_config

        # Unrecognized (or missing) 'EXECUTOR' values fall back to PopenExecutor.
        executor_klass = self._executors.get(manager_config.get('EXECUTOR', ''), PopenExecutor)
        self._executor = executor_klass(manager_config.get('EXECUTOR_CONFIG', {}),
                                        self.public_ip_address)

        etcd_host = self._manager_config.get('ETCD_HOST', '127.0.0.1')
        etcd_port = self._manager_config.get('ETCD_PORT', 2379)
        logger.debug('Connecting to etcd on %s:%s', etcd_host, etcd_port)

        # python-etcd is synchronous; AsyncWrapper runs its calls on this
        # thread pool so coroutines can 'yield From' them.
        worker_threads = self._manager_config.get('ETCD_WORKER_THREADS', 5)
        self._async_thread_executor = ThreadPoolExecutor(worker_threads)
        self._etcd_client = AsyncWrapper(self._etcd_client_klass(host=etcd_host, port=etcd_port),
                                         executor=self._async_thread_executor)

        self._watch_builders()

    def setup_time(self):
        """ Return the configured number of seconds a new machine is given to
        start up (default 300); also used as the initial job-lock TTL.
        """
        setup_time = self._manager_config.get('MACHINE_SETUP_TIME', 300)
        logger.debug('Returning setup_time: %s', setup_time)
        return setup_time

    def shutdown(self):
        """ Stop watching etcd and tear down the thread-pool executor. """
        logger.debug('Shutting down worker.')
        # Flag first so a racing _handle_key_expiration won't re-arm the watch.
        self._shutting_down = True

        if self._worker_watch_task is not None:
            logger.debug('Canceling watch task.')
            self._worker_watch_task.cancel()
            self._worker_watch_task = None

        if self._async_thread_executor is not None:
            logger.debug('Shutting down thread pool executor.')
            self._async_thread_executor.shutdown()

    @coroutine
    def schedule(self, build_job, loop):
        """ Attempt to schedule a build job on a fresh ephemeral builder.

        Returns (via trollius Return) True if a builder was started, False if
        capacity is exhausted or the job's lock already exists in etcd.
        """
        logger.debug('Calling schedule with job: %s', build_job.job_details['build_uuid'])

        # Check if there are worker slots available by checking the number of jobs in etcd
        allowed_worker_count = self._manager_config.get('ALLOWED_WORKER_COUNT', 1)
        try:
            building = yield From(self._etcd_client.read(ETCD_BUILDER_PREFIX, recursive=True))
            # Only leaf keys count as jobs; directories are skipped.
            workers_alive = sum(1 for child in building.children if not child.dir)
        except KeyError:
            # Prefix does not exist yet -> no jobs running.
            # NOTE(review): relies on python-etcd's not-found error being a
            # KeyError subclass — confirm for the pinned client version.
            workers_alive = 0

        logger.debug('Total jobs: %s', workers_alive)

        if workers_alive >= allowed_worker_count:
            logger.info('Too many workers alive, unable to start new worker. %s >= %s',
                        workers_alive, allowed_worker_count)
            raise Return(False)

        job_key = self._etcd_job_key(build_job)

        # First try to take a lock for this job, meaning we will be responsible for its lifeline
        realm = str(uuid.uuid4())
        token = str(uuid.uuid4())
        ttl = self.setup_time()
        expiration = datetime.utcnow() + timedelta(seconds=ttl)

        # Hard ceiling on total machine lifetime, independent of heartbeats.
        machine_max_expiration = self._manager_config.get('MACHINE_MAX_TIME', 7200)
        max_expiration = datetime.utcnow() + timedelta(seconds=machine_max_expiration)

        payload = {
            'expiration': calendar.timegm(expiration.timetuple()),
            'max_expiration': calendar.timegm(max_expiration.timetuple()),
        }

        try:
            # prevExist=False makes this an atomic create: it fails if another
            # manager already holds the lock for this job.
            yield From(self._etcd_client.write(job_key, json.dumps(payload), prevExist=False,
                                               ttl=ttl))
            component = self.register_component(realm, BuildComponent, token=token)
            self._component_to_job[component] = build_job
        except KeyError:
            # The job was already taken by someone else, we are probably a retry
            logger.error('Job already exists in etcd, are timeouts misconfigured or is the queue broken?')
            raise Return(False)

        logger.debug('Starting builder with executor: %s', self._executor)
        builder_id = yield From(self._executor.start_builder(realm, token))
        self._component_to_builder[component] = builder_id

        # Store the builder in etcd associated with the job id
        payload['builder_id'] = builder_id
        yield From(self._etcd_client.write(job_key, json.dumps(payload), prevExist=True, ttl=ttl))

        raise Return(True)

    def build_component_ready(self, build_component, loop):
        """ Hand the pending job to a component whose builder has connected.

        The component->job entry is popped so the job is dispatched at most once.
        """
        try:
            job = self._component_to_job.pop(build_component)
            logger.debug('Sending build %s to newly ready component on realm %s',
                         job.job_details['build_uuid'], build_component.builder_realm)
            loop.call_soon(build_component.start_build, job)
        except KeyError:
            logger.warning('Builder is asking for more work, but work already completed')

    def build_component_disposed(self, build_component, timed_out):
        """ Unregister a component once its build session is over. """
        logger.debug('Calling build_component_disposed.')

        # TODO make it so that I don't have to unregister the component if it timed out
        self.unregister_component(build_component)

    @coroutine
    def job_completed(self, build_job, job_status, build_component):
        """ Tear down the builder node and release the job's etcd lock, then
        report the final status upstream.
        """
        logger.debug('Calling job_completed with status: %s', job_status)

        # Kill the ephmeral builder
        yield From(self._executor.stop_builder(self._component_to_builder.pop(build_component)))

        # Release the lock in etcd
        job_key = self._etcd_job_key(build_job)
        yield From(self._etcd_client.delete(job_key))

        self.job_complete_callback(build_job, job_status)

    @coroutine
    def job_heartbeat(self, build_job):
        """ Refresh the TTL on the job's etcd key, clamped so it never
        outlives the machine's max_expiration deadline.
        """
        # Extend the deadline in etcd
        job_key = self._etcd_job_key(build_job)
        build_job_metadata_response = yield From(self._etcd_client.read(job_key))
        build_job_metadata = json.loads(build_job_metadata_response.value)

        max_expiration = datetime.utcfromtimestamp(build_job_metadata['max_expiration'])
        max_expiration_remaining = max_expiration - datetime.utcnow()
        # Never negative, even if the deadline has already passed.
        max_expiration_sec = max(0, int(max_expiration_remaining.total_seconds()))

        # Two heartbeat periods of slack, but never beyond the hard deadline.
        ttl = min(self.heartbeat_period_sec * 2, max_expiration_sec)
        new_expiration = datetime.utcnow() + timedelta(seconds=ttl)

        payload = {
            'expiration': calendar.timegm(new_expiration.timetuple()),
            'builder_id': build_job_metadata['builder_id'],
            'max_expiration': build_job_metadata['max_expiration'],
        }

        yield From(self._etcd_client.write(job_key, json.dumps(payload), ttl=ttl))

        self.job_heartbeat_callback(build_job)

    @coroutine
    def _clean_up_old_builder(self, job_key, job_payload):
        """ Terminate an old builder once the expiration date has passed.

        job_key: the expired etcd key; job_payload: its last JSON payload
        (may lack 'builder_id' if the node never finished starting).
        """
        logger.debug('Cleaning up the old builder for job: %s', job_key)

        if 'builder_id' in job_payload:
            logger.info('Terminating expired build node.')
            yield From(self._executor.stop_builder(job_payload['builder_id']))

        yield From(self._etcd_client.delete(job_key))

    @staticmethod
    def _etcd_job_key(build_job):
        """ Create a key which is used to track a job in etcd. """
        return os.path.join(ETCD_BUILDER_PREFIX, build_job.job_details['build_uuid'])

    def num_workers(self):
        """ Return the number of workers we're managing locally. """
        return len(self._component_to_builder)