2014-12-16 13:41:30 -05:00
import logging
import etcd
import uuid
2014-12-22 12:14:16 -05:00
import calendar
2014-12-22 16:22:07 -05:00
import os.path
2014-12-23 14:09:04 -05:00
import json
2014-12-16 13:41:30 -05:00
from datetime import datetime, timedelta
2014-12-22 16:22:07 -05:00
from trollius import From, coroutine, Return, async
from concurrent.futures import ThreadPoolExecutor
2014-12-23 12:13:49 -05:00
from urllib3.exceptions import ReadTimeoutError
2014-12-16 13:41:30 -05:00
from buildman.manager.basemanager import BaseManager
from buildman.manager.executor import PopenExecutor, EC2Executor
from buildman.component.buildcomponent import BuildComponent
2014-12-22 12:14:16 -05:00
from buildman.asyncutil import AsyncWrapper
2014-12-16 13:41:30 -05:00
logger = logging.getLogger(__name__)
2014-12-22 16:22:07 -05:00
2014-12-23 14:54:34 -05:00
2014-12-16 13:41:30 -05:00
class EphemeralBuilderManager(BaseManager):
""" Build manager implementation for the Enterprise Registry. """
2014-12-22 12:14:16 -05:00
_executors = {
'popen': PopenExecutor,
'ec2': EC2Executor,
_etcd_client_klass = etcd.Client
2014-12-16 13:41:30 -05:00
def __init__(self, *args, **kwargs):
2014-12-22 16:22:07 -05:00
self._shutting_down = False
2014-12-16 13:41:30 -05:00
self._manager_config = None
2014-12-22 16:22:07 -05:00
self._async_thread_executor = None
2014-12-16 13:41:30 -05:00
self._etcd_client = None
self._component_to_job = {}
self._component_to_builder = {}
self._executor = None
2014-12-22 16:22:07 -05:00
self._worker_watch_task = None
2014-12-16 13:41:30 -05:00
super(EphemeralBuilderManager, self).__init__(*args, **kwargs)
2014-12-22 16:22:07 -05:00
def _watch_builders(self):
""" Watch the builders key for expirations.
if not self._shutting_down:
2014-12-23 14:54:34 -05:00
workers_future = self._etcd_client.watch(ETCD_BUILDER_PREFIX, recursive=True,
2014-12-22 16:22:07 -05:00
logger.debug('Scheduling watch task.')
self._worker_watch_task = async(workers_future)
def _handle_key_expiration(self, changed_key_future):
""" Handle when a builder expires
if self._worker_watch_task is None or self._worker_watch_task.done():
if changed_key_future.cancelled():
# Due to lack of interest, tomorrow has been cancelled
2014-12-23 12:13:49 -05:00
etcd_result = changed_key_future.result()
except ReadTimeoutError:
2014-12-22 16:22:07 -05:00
if etcd_result.action == ETCD_EXPIRE_RESULT:
# Handle the expiration
logger.debug('Builder expired, clean up the old build node')
2014-12-23 14:09:04 -05:00
job_metadata = json.loads(etcd_result._prev_node.value)
async(self._clean_up_old_builder(etcd_result.key, job_metadata))
2014-12-22 16:22:07 -05:00
2014-12-16 13:41:30 -05:00
def initialize(self, manager_config):
logger.debug('Calling initialize')
self._manager_config = manager_config
executor_klass = self._executors.get(manager_config.get('EXECUTOR', ''), PopenExecutor)
self._executor = executor_klass(manager_config.get('EXECUTOR_CONFIG', {}),
etcd_host = self._manager_config.get('ETCD_HOST', '')
etcd_port = self._manager_config.get('ETCD_PORT', 2379)
logger.debug('Connecting to etcd on %s:%s', etcd_host, etcd_port)
2014-12-22 17:24:44 -05:00
worker_threads = self._manager_config.get('ETCD_WORKER_THREADS', 5)
self._async_thread_executor = ThreadPoolExecutor(worker_threads)
2014-12-22 16:22:07 -05:00
self._etcd_client = AsyncWrapper(self._etcd_client_klass(host=etcd_host, port=etcd_port),
2014-12-16 13:41:30 -05:00
def setup_time(self):
setup_time = self._manager_config.get('MACHINE_SETUP_TIME', 300)
logger.debug('Returning setup_time: %s', setup_time)
return setup_time
def shutdown(self):
2014-12-22 16:22:07 -05:00
logger.debug('Shutting down worker.')
self._shutting_down = True
if self._worker_watch_task is not None:
logger.debug('Canceling watch task.')
self._worker_watch_task = None
if self._async_thread_executor is not None:
logger.debug('Shutting down thread pool executor.')
2014-12-16 13:41:30 -05:00
2014-12-22 12:14:16 -05:00
2014-12-16 13:41:30 -05:00
def schedule(self, build_job, loop):
2014-12-22 12:14:16 -05:00
logger.debug('Calling schedule with job: %s', build_job.job_details['build_uuid'])
2014-12-16 13:41:30 -05:00
# Check if there are worker slots avialable by checking the number of jobs in etcd
2014-12-22 12:14:16 -05:00
allowed_worker_count = self._manager_config.get('ALLOWED_WORKER_COUNT', 1)
2014-12-16 13:41:30 -05:00
2014-12-22 12:14:16 -05:00
building = yield From(self._etcd_client.read(ETCD_BUILDER_PREFIX, recursive=True))
2014-12-16 13:41:30 -05:00
workers_alive = sum(1 for child in building.children if not child.dir)
except KeyError:
workers_alive = 0
logger.debug('Total jobs: %s', workers_alive)
if workers_alive >= allowed_worker_count:
logger.info('Too many workers alive, unable to start new worker. %s >= %s', workers_alive,
2014-12-22 12:14:16 -05:00
raise Return(False)
2014-12-16 13:41:30 -05:00
job_key = self._etcd_job_key(build_job)
# First try to take a lock for this job, meaning we will be responsible for its lifeline
realm = str(uuid.uuid4())
token = str(uuid.uuid4())
2014-12-22 17:24:44 -05:00
ttl = self.setup_time()
expiration = datetime.utcnow() + timedelta(seconds=ttl)
2014-12-16 13:41:30 -05:00
2014-12-23 11:18:10 -05:00
machine_max_expiration = self._manager_config.get('MACHINE_MAX_TIME', 7200)
max_expiration = datetime.utcnow() + timedelta(seconds=machine_max_expiration)
2014-12-16 13:41:30 -05:00
payload = {
2014-12-22 12:14:16 -05:00
'expiration': calendar.timegm(expiration.timetuple()),
2014-12-23 11:18:10 -05:00
'max_expiration': calendar.timegm(max_expiration.timetuple()),
2014-12-16 13:41:30 -05:00
2014-12-23 14:09:04 -05:00
yield From(self._etcd_client.write(job_key, json.dumps(payload), prevExist=False, ttl=ttl))
2014-12-16 13:41:30 -05:00
component = self.register_component(realm, BuildComponent, token=token)
self._component_to_job[component] = build_job
except KeyError:
# The job was already taken by someone else, we are probably a retry
2014-12-22 12:14:16 -05:00
logger.error('Job already exists in etcd, are timeouts misconfigured or is the queue broken?')
raise Return(False)
2014-12-16 13:41:30 -05:00
2014-12-22 12:14:16 -05:00
logger.debug('Starting builder with executor: %s', self._executor)
builder_id = yield From(self._executor.start_builder(realm, token))
2014-12-16 13:41:30 -05:00
self._component_to_builder[component] = builder_id
2014-12-22 12:14:16 -05:00
# Store the builder in etcd associated with the job id
payload['builder_id'] = builder_id
2014-12-23 14:09:04 -05:00
yield From(self._etcd_client.write(job_key, json.dumps(payload), prevExist=True, ttl=ttl))
2014-12-22 12:14:16 -05:00
raise Return(True)
2014-12-16 13:41:30 -05:00
def build_component_ready(self, build_component, loop):
job = self._component_to_job.pop(build_component)
2014-12-22 12:14:16 -05:00
logger.debug('Sending build %s to newly ready component on realm %s',
job.job_details['build_uuid'], build_component.builder_realm)
2014-12-16 13:41:30 -05:00
loop.call_soon(build_component.start_build, job)
except KeyError:
logger.warning('Builder is asking for more work, but work already completed')
def build_component_disposed(self, build_component, timed_out):
logger.debug('Calling build_component_disposed.')
2014-12-22 17:24:44 -05:00
# TODO make it so that I don't have to unregister the component if it timed out
2014-12-22 12:14:16 -05:00
2014-12-16 13:41:30 -05:00
def job_completed(self, build_job, job_status, build_component):
logger.debug('Calling job_completed with status: %s', job_status)
2014-12-22 17:24:44 -05:00
# Kill the ephmeral builder
2014-12-16 13:41:30 -05:00
# Release the lock in etcd
job_key = self._etcd_job_key(build_job)
2014-12-22 12:14:16 -05:00
yield From(self._etcd_client.delete(job_key))
2014-12-16 13:41:30 -05:00
self.job_complete_callback(build_job, job_status)
2014-12-22 17:24:44 -05:00
def job_heartbeat(self, build_job):
# Extend the deadline in etcd
job_key = self._etcd_job_key(build_job)
2014-12-23 14:09:04 -05:00
build_job_metadata_response = yield From(self._etcd_client.read(job_key))
build_job_metadata = json.loads(build_job_metadata_response.value)
2014-12-22 17:24:44 -05:00
2014-12-23 14:09:04 -05:00
max_expiration = datetime.utcfromtimestamp(build_job_metadata['max_expiration'])
2014-12-23 11:18:10 -05:00
max_expiration_remaining = max_expiration - datetime.utcnow()
max_expiration_sec = max(0, int(max_expiration_remaining.total_seconds()))
ttl = min(self.heartbeat_period_sec * 2, max_expiration_sec)
2014-12-22 17:24:44 -05:00
new_expiration = datetime.utcnow() + timedelta(seconds=ttl)
payload = {
'expiration': calendar.timegm(new_expiration.timetuple()),
2014-12-23 14:09:04 -05:00
'builder_id': build_job_metadata['builder_id'],
'max_expiration': build_job_metadata['max_expiration'],
2014-12-22 17:24:44 -05:00
2014-12-23 14:09:04 -05:00
yield From(self._etcd_client.write(job_key, json.dumps(payload), ttl=ttl))
2014-12-22 17:24:44 -05:00
2014-12-22 12:14:16 -05:00
def _clean_up_old_builder(self, job_key, job_payload):
""" Terminate an old builders once the expiration date has passed.
logger.debug('Cleaning up the old builder for job: %s', job_key)
if 'builder_id' in job_payload:
logger.info('Terminating expired build node.')
yield From(self._executor.stop_builder(job_payload['builder_id']))
yield From(self._etcd_client.delete(job_key))
2014-12-16 13:41:30 -05:00
def _etcd_job_key(build_job):
""" Create a key which is used to track a job in etcd.
2014-12-22 16:22:07 -05:00
return os.path.join(ETCD_BUILDER_PREFIX, build_job.job_details['build_uuid'])
2014-12-22 17:24:44 -05:00
def num_workers(self):
""" Return the number of workers we're managing locally.
return len(self._component_to_builder)