import logging
import etcd
import uuid
import calendar
import os.path
import json

from datetime import datetime, timedelta
from trollius import From, coroutine, Return, async
from concurrent.futures import ThreadPoolExecutor
from urllib3.exceptions import ReadTimeoutError

from buildman.manager.basemanager import BaseManager
from buildman.manager.executor import PopenExecutor, EC2Executor
from buildman.component.buildcomponent import BuildComponent
from buildman.asyncutil import AsyncWrapper


logger = logging.getLogger(__name__)


ETCD_BUILDER_PREFIX = 'building/'
ETCD_EXPIRE_RESULT = 'expire'
ETCD_DISABLE_TIMEOUT = 0
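
# Note: jobs are tracked as TTL'd keys under ETCD_BUILDER_PREFIX; for a
# (hypothetical) build UUID 'abcd-1234' the job key would be
# 'building/abcd-1234'. When a key's TTL lapses, etcd reports an 'expire'
# action, which is what ETCD_EXPIRE_RESULT matches against. The name of
# ETCD_DISABLE_TIMEOUT suggests that passing 0 disables the client-side
# watch timeout, though that behavior depends on the python-etcd version.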


class EphemeralBuilderManager(BaseManager):
  """ Build manager implementation for the Enterprise Registry. """
  _executors = {
    'popen': PopenExecutor,
    'ec2': EC2Executor,
  }

  _etcd_client_klass = etcd.Client
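
  # Both _executors and _etcd_client_klass are class attributes, presumably so
  # subclasses and tests can substitute alternate executors or a stub etcd
  # client (an assumption; this file alone does not confirm it).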

  def __init__(self, *args, **kwargs):
    self._shutting_down = False

    self._manager_config = None
    self._async_thread_executor = None
    self._etcd_client = None

    self._component_to_job = {}
    self._component_to_builder = {}

    self._executor = None

    self._worker_watch_task = None

    super(EphemeralBuilderManager, self).__init__(*args, **kwargs)

  def _watch_builders(self):
    """ Watch the builders key for expirations. """
    if not self._shutting_down:
      workers_future = self._etcd_client.watch(ETCD_BUILDER_PREFIX, recursive=True,
                                               timeout=ETCD_DISABLE_TIMEOUT)
      workers_future.add_done_callback(self._handle_key_expiration)
      logger.debug('Scheduling watch task.')
      self._worker_watch_task = async(workers_future)
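
  # Each etcd watch resolves (or times out) at most once, so the handler below
  # re-arms the watch before processing the result, forming a continuous
  # watch/handle loop until shutdown() flips _shutting_down.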
  def _handle_key_expiration(self, changed_key_future):
    """ Handle when a builder expires. """
    if self._worker_watch_task is None or self._worker_watch_task.done():
      self._watch_builders()

    if changed_key_future.cancelled():
      # The watch was cancelled (e.g. during shutdown); there is nothing to handle
      return

    try:
      etcd_result = changed_key_future.result()
    except ReadTimeoutError:
      return

    if etcd_result.action == ETCD_EXPIRE_RESULT:
      # The TTL lapsed on a job key: clean up the expired build node
      logger.debug('Builder expired, clean up the old build node')
      job_metadata = json.loads(etcd_result._prev_node.value)
      async(self._clean_up_old_builder(etcd_result.key, job_metadata))
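
  # For reference, a minimal (hypothetical) manager_config for this class
  # could look like the following; every key below is read somewhere in this
  # file, but the values shown are illustrative only:
  #
  #   {
  #     'EXECUTOR': 'ec2',
  #     'EXECUTOR_CONFIG': {...},
  #     'ETCD_HOST': '127.0.0.1',
  #     'ETCD_PORT': 2379,
  #     'ETCD_WORKER_THREADS': 5,
  #     'MACHINE_SETUP_TIME': 300,
  #     'MACHINE_MAX_TIME': 7200,
  #     'ALLOWED_WORKER_COUNT': 1,
  #   }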
  def initialize(self, manager_config):
    logger.debug('Calling initialize')
    self._manager_config = manager_config

    executor_klass = self._executors.get(manager_config.get('EXECUTOR', ''), PopenExecutor)
    self._executor = executor_klass(manager_config.get('EXECUTOR_CONFIG', {}),
                                    self.public_ip_address)

    etcd_host = self._manager_config.get('ETCD_HOST', '127.0.0.1')
    etcd_port = self._manager_config.get('ETCD_PORT', 2379)
    logger.debug('Connecting to etcd on %s:%s', etcd_host, etcd_port)

    worker_threads = self._manager_config.get('ETCD_WORKER_THREADS', 5)
    self._async_thread_executor = ThreadPoolExecutor(worker_threads)
    self._etcd_client = AsyncWrapper(self._etcd_client_klass(host=etcd_host, port=etcd_port),
                                     executor=self._async_thread_executor)

    self._watch_builders()

  def setup_time(self):
    setup_time = self._manager_config.get('MACHINE_SETUP_TIME', 300)
    logger.debug('Returning setup_time: %s', setup_time)
    return setup_time

  def shutdown(self):
    logger.debug('Shutting down worker.')
    self._shutting_down = True

    if self._worker_watch_task is not None:
      logger.debug('Canceling watch task.')
      self._worker_watch_task.cancel()
      self._worker_watch_task = None

    if self._async_thread_executor is not None:
      logger.debug('Shutting down thread pool executor.')
      self._async_thread_executor.shutdown()

  @coroutine
  def schedule(self, build_job, loop):
    logger.debug('Calling schedule with job: %s', build_job.job_details['build_uuid'])

    # Check if there are worker slots available by counting the number of jobs in etcd
    allowed_worker_count = self._manager_config.get('ALLOWED_WORKER_COUNT', 1)
    try:
      building = yield From(self._etcd_client.read(ETCD_BUILDER_PREFIX, recursive=True))
      workers_alive = sum(1 for child in building.children if not child.dir)
    except KeyError:
      workers_alive = 0

    logger.debug('Total jobs: %s', workers_alive)

    if workers_alive >= allowed_worker_count:
      logger.info('Too many workers alive, unable to start new worker. %s >= %s', workers_alive,
                  allowed_worker_count)
      raise Return(False)
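
    # Note: this read-then-count check is not atomic across managers, so the
    # ALLOWED_WORKER_COUNT cap is best-effort. The prevExist=False write below
    # does, however, guarantee that at most one manager claims a given job.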
    job_key = self._etcd_job_key(build_job)

    # First try to take a lock for this job, meaning we will be responsible for its lifecycle
    realm = str(uuid.uuid4())
    token = str(uuid.uuid4())
    ttl = self.setup_time()
    expiration = datetime.utcnow() + timedelta(seconds=ttl)

    machine_max_expiration = self._manager_config.get('MACHINE_MAX_TIME', 7200)
    max_expiration = datetime.utcnow() + timedelta(seconds=machine_max_expiration)

    payload = {
      'expiration': calendar.timegm(expiration.timetuple()),
      'max_expiration': calendar.timegm(max_expiration.timetuple()),
    }
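
    # Writing with prevExist=False acts as the lock acquisition: the write
    # fails (surfacing as the KeyError caught below) if another manager has
    # already created this job key. The key's TTL doubles as the lock's lease.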
    try:
      yield From(self._etcd_client.write(job_key, json.dumps(payload), prevExist=False, ttl=ttl))
      component = self.register_component(realm, BuildComponent, token=token)
      self._component_to_job[component] = build_job
    except KeyError:
      # The job was already taken by someone else; we are probably a retry
      logger.error('Job already exists in etcd, are timeouts misconfigured or is the queue broken?')
      raise Return(False)

    logger.debug('Starting builder with executor: %s', self._executor)
    builder_id = yield From(self._executor.start_builder(realm, token))
    self._component_to_builder[component] = builder_id

    # Store the builder in etcd, associated with the job id
    payload['builder_id'] = builder_id
    yield From(self._etcd_client.write(job_key, json.dumps(payload), prevExist=True, ttl=ttl))

    raise Return(True)

  def build_component_ready(self, build_component, loop):
    try:
      job = self._component_to_job.pop(build_component)
      logger.debug('Sending build %s to newly ready component on realm %s',
                   job.job_details['build_uuid'], build_component.builder_realm)
      loop.call_soon(build_component.start_build, job)
    except KeyError:
      logger.warning('Builder is asking for more work, but work already completed')

  def build_component_disposed(self, build_component, timed_out):
    logger.debug('Calling build_component_disposed.')

    # TODO make it so that I don't have to unregister the component if it timed out
    self.unregister_component(build_component)

  @coroutine
  def job_completed(self, build_job, job_status, build_component):
    logger.debug('Calling job_completed with status: %s', job_status)

    # Kill the ephemeral builder. stop_builder is a coroutine, so it must be
    # yielded from (as in _clean_up_old_builder below) or it would never run.
    yield From(self._executor.stop_builder(self._component_to_builder.pop(build_component)))

    # Release the lock in etcd
    job_key = self._etcd_job_key(build_job)
    yield From(self._etcd_client.delete(job_key))

    self.job_complete_callback(build_job, job_status)

  @coroutine
  def job_heartbeat(self, build_job):
    # Extend the deadline in etcd
    job_key = self._etcd_job_key(build_job)
    build_job_metadata_response = yield From(self._etcd_client.read(job_key))
    build_job_metadata = json.loads(build_job_metadata_response.value)

    max_expiration = datetime.utcfromtimestamp(build_job_metadata['max_expiration'])
    max_expiration_remaining = max_expiration - datetime.utcnow()
    max_expiration_sec = max(0, int(max_expiration_remaining.total_seconds()))
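
    # The refreshed TTL is clamped by the job's hard deadline. As a worked
    # example (with a hypothetical heartbeat_period_sec of 30): with 3600s
    # left until max_expiration, ttl = min(30 * 2, 3600) = 60; with only 45s
    # left, ttl = min(60, 45) = 45, so the key can never outlive the deadline.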
    ttl = min(self.heartbeat_period_sec * 2, max_expiration_sec)
    new_expiration = datetime.utcnow() + timedelta(seconds=ttl)

    payload = {
      'expiration': calendar.timegm(new_expiration.timetuple()),
      'builder_id': build_job_metadata['builder_id'],
      'max_expiration': build_job_metadata['max_expiration'],
    }

    yield From(self._etcd_client.write(job_key, json.dumps(payload), ttl=ttl))

    self.job_heartbeat_callback(build_job)

  @coroutine
  def _clean_up_old_builder(self, job_key, job_payload):
    """ Terminate an old builder once its expiration date has passed. """
    logger.debug('Cleaning up the old builder for job: %s', job_key)
    if 'builder_id' in job_payload:
      logger.info('Terminating expired build node.')
      yield From(self._executor.stop_builder(job_payload['builder_id']))

    yield From(self._etcd_client.delete(job_key))

  @staticmethod
  def _etcd_job_key(build_job):
    """ Create a key which is used to track a job in etcd. """
    return os.path.join(ETCD_BUILDER_PREFIX, build_job.job_details['build_uuid'])
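
  # For example (hypothetical UUID): a job with build_uuid 'abcd-1234' is
  # tracked at 'building/abcd-1234', since os.path.join collapses the
  # trailing slash already present in ETCD_BUILDER_PREFIX.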

  def num_workers(self):
    """ Return the number of workers we're managing locally. """
    return len(self._component_to_builder)