Merge pull request #1624 from coreos-inc/builder-cleanup-tests

Bug fixes, refactoring and "new" tests for the build manager
This commit is contained in:
josephschorr 2016-07-21 13:50:41 -04:00 committed by GitHub
commit cf630838f0
4 changed files with 513 additions and 148 deletions

View file

@ -57,10 +57,15 @@ class BuildJob(object):
@lru_cache(maxsize=1) @lru_cache(maxsize=1)
def _load_repo_build(self): def _load_repo_build(self):
try: try:
return model.build.get_repository_build(self.job_details['build_uuid']) return model.build.get_repository_build(self.build_uuid)
except model.InvalidRepositoryBuildException: except model.InvalidRepositoryBuildException:
raise BuildJobLoadException( raise BuildJobLoadException(
'Could not load repository build with ID %s' % self.job_details['build_uuid']) 'Could not load repository build with ID %s' % self.build_uuid)
@property
def build_uuid(self):
""" Returns the unique UUID for this build job. """
return self.job_details['build_uuid']
@property @property
def namespace(self): def namespace(self):

View file

@ -5,6 +5,7 @@ import calendar
import os.path import os.path
import json import json
from collections import namedtuple
from datetime import datetime, timedelta from datetime import datetime, timedelta
from trollius import From, coroutine, Return, async from trollius import From, coroutine, Return, async
from concurrent.futures import ThreadPoolExecutor from concurrent.futures import ThreadPoolExecutor
@ -28,12 +29,6 @@ RETRY_IMMEDIATELY_TIMEOUT = 0
NO_WORKER_AVAILABLE_TIMEOUT = 10 NO_WORKER_AVAILABLE_TIMEOUT = 10
DEFAULT_EPHEMERAL_API_TIMEOUT = 20 DEFAULT_EPHEMERAL_API_TIMEOUT = 20
EXECUTORS = {
'popen': PopenExecutor,
'ec2': EC2Executor,
'kubernetes': KubernetesExecutor,
}
class EtcdAction(object): class EtcdAction(object):
GET = 'get' GET = 'get'
SET = 'set' SET = 'set'
@ -44,13 +39,28 @@ class EtcdAction(object):
COMPARE_AND_SWAP = 'compareAndSwap' COMPARE_AND_SWAP = 'compareAndSwap'
COMPARE_AND_DELETE = 'compareAndDelete' COMPARE_AND_DELETE = 'compareAndDelete'
BuildInfo = namedtuple('BuildInfo', ['component', 'build_job', 'execution_id', 'executor_name'])
def _create_async_etcd_client(worker_threads=1, **kwargs):
client = etcd.Client(**kwargs)
async_executor = ThreadPoolExecutor(worker_threads)
return AsyncWrapper(client, executor=async_executor), async_executor
class EphemeralBuilderManager(BaseManager): class EphemeralBuilderManager(BaseManager):
""" Build manager implementation for the Enterprise Registry. """ """ Build manager implementation for the Enterprise Registry. """
_etcd_client_klass = etcd.Client EXECUTORS = {
'popen': PopenExecutor,
'ec2': EC2Executor,
'kubernetes': KubernetesExecutor,
}
def __init__(self, *args, **kwargs): def __init__(self, *args, **kwargs):
self._etcd_client_creator = kwargs.pop('etcd_creator', _create_async_etcd_client)
super(EphemeralBuilderManager, self).__init__(*args, **kwargs)
self._shutting_down = False self._shutting_down = False
self._manager_config = None self._manager_config = None
@ -58,22 +68,24 @@ class EphemeralBuilderManager(BaseManager):
self._etcd_client = None self._etcd_client = None
self._etcd_realm_prefix = None self._etcd_realm_prefix = None
self._etcd_builder_prefix = None self._etcd_job_prefix = None
self._etcd_lock_prefix = None
self._ephemeral_api_timeout = DEFAULT_EPHEMERAL_API_TIMEOUT self._ephemeral_api_timeout = DEFAULT_EPHEMERAL_API_TIMEOUT
self._component_to_job = {} # The registered executors available for running jobs, in order.
self._job_uuid_to_component = {} self._ordered_executors = []
self._component_to_builder = {}
self._job_to_executor = {}
self._executors = [] # The registered executors, mapped by their unique name.
self._executor_name_to_executor = {}
# Map of etcd keys being watched to the tasks watching them # Map of etcd keys being watched to the tasks watching them
self._watch_tasks = {} self._watch_tasks = {}
super(EphemeralBuilderManager, self).__init__(*args, **kwargs) # Map from builder component to its associated job.
self._component_to_job = {}
# Map from build UUID to a BuildInfo tuple with information about the build.
self._build_uuid_to_info = {}
def _watch_etcd(self, etcd_key, change_callback, start_index=None, recursive=True, def _watch_etcd(self, etcd_key, change_callback, start_index=None, recursive=True,
restarter=None): restarter=None):
@ -129,7 +141,6 @@ class EphemeralBuilderManager(BaseManager):
if not self._shutting_down: if not self._shutting_down:
logger.debug('Scheduling watch of key: %s%s at start index %s', etcd_key, logger.debug('Scheduling watch of key: %s%s at start index %s', etcd_key,
'*' if recursive else '', start_index) '*' if recursive else '', start_index)
watch_future = self._etcd_client.watch(etcd_key, recursive=recursive, index=start_index, watch_future = self._etcd_client.watch(etcd_key, recursive=recursive, index=start_index,
timeout=ETCD_MAX_WATCH_TIMEOUT) timeout=ETCD_MAX_WATCH_TIMEOUT)
watch_future.add_done_callback(callback_wrapper) watch_future.add_done_callback(callback_wrapper)
@ -137,34 +148,37 @@ class EphemeralBuilderManager(BaseManager):
self._watch_tasks[watch_task_key] = async(watch_future) self._watch_tasks[watch_task_key] = async(watch_future)
@coroutine @coroutine
def _handle_builder_expiration(self, etcd_result): def _handle_job_expiration(self, etcd_result):
""" Handler invoked whenever a job expires in etcd. """
if etcd_result is None: if etcd_result is None:
return return
if etcd_result.action == EtcdAction.EXPIRE: if etcd_result.action != EtcdAction.EXPIRE:
# Handle the expiration return
logger.debug('Builder expired, clean up the old build node')
job_metadata = json.loads(etcd_result._prev_node.value)
if 'builder_id' in job_metadata: # Handle the expiration
builder_id = job_metadata['builder_id'] logger.debug('Builder expired, clean up the old build node')
job_metadata = json.loads(etcd_result._prev_node.value)
build_job = BuildJob(AttrDict(job_metadata['job_queue_item']))
build_info = self._build_uuid_to_info.get(build_job.build_uuid)
if build_info is None:
logger.error('Could not find build info for job %s under etcd expire with metadata: %s',
build_job.build_uuid, job_metadata)
return
# Before we delete the build node, we take a lock to make sure that only one manager execution_id = build_info.execution_id
# can terminate the node.
try:
lock_key = self._etcd_lock_key(builder_id)
yield From(self._etcd_client.write(lock_key, '', prevExist=False, ttl=self.setup_time()))
except (KeyError, etcd.EtcdKeyError):
logger.debug('Somebody else is cleaning up the build node: %s', builder_id)
return
if not job_metadata.get('had_heartbeat', True): # If we have not yet received a heartbeat, then the node failed to boot in some way. We mark
logger.warning('Build node failed to successfully boot: %s', builder_id) # the job as incomplete here.
build_job = BuildJob(AttrDict(job_metadata['job_queue_item'])) if not job_metadata.get('had_heartbeat', True):
self.job_complete_callback(build_job, BuildJobResult.INCOMPLETE) logger.warning('Build executor failed to successfully boot with execution id %s',
execution_id)
self.job_complete_callback(build_job, BuildJobResult.INCOMPLETE)
logger.info('Terminating expired build node: %s', builder_id) # Finally, we terminate the build execution for the job.
yield From(self._job_to_executor[builder_id].stop_builder(builder_id)) logger.info('Terminating expired build executor for job %s with execution id %s',
build_job.build_uuid, execution_id)
yield From(self.kill_builder_executor(build_job.build_uuid))
def _handle_realm_change(self, etcd_result): def _handle_realm_change(self, etcd_result):
if etcd_result is None: if etcd_result is None:
@ -180,13 +194,14 @@ class EphemeralBuilderManager(BaseManager):
# connection # connection
realm_spec = json.loads(etcd_result._prev_node.value) realm_spec = json.loads(etcd_result._prev_node.value)
build_job = BuildJob(AttrDict(realm_spec['job_queue_item'])) build_job = BuildJob(AttrDict(realm_spec['job_queue_item']))
component = self._job_uuid_to_component.pop(build_job.job_details['build_uuid'], None) build_uuid = build_job.build_uuid
if component is not None:
build_info = self._build_uuid_to_info.pop(build_uuid, None)
if build_info is not None:
# We were not the manager which the worker connected to, remove the bookkeeping for it # We were not the manager which the worker connected to, remove the bookkeeping for it
logger.debug('Unregistering unused component on realm: %s', realm_spec['realm']) logger.debug('Unregistering unused component for build %s', build_uuid)
del self._component_to_job[component] self._component_to_job.pop(build_info.component, None)
del self._component_to_builder[component] self.unregister_component(build_info.component)
self.unregister_component(component)
else: else:
logger.warning('Unexpected action (%s) on realm key: %s', etcd_result.action, etcd_result.key) logger.warning('Unexpected action (%s) on realm key: %s', etcd_result.action, etcd_result.key)
@ -200,15 +215,23 @@ class EphemeralBuilderManager(BaseManager):
logger.debug('Realm already registered with manager: %s', realm_spec['realm']) logger.debug('Realm already registered with manager: %s', realm_spec['realm'])
return component return component
# Create the build information block for the registered realm.
build_job = BuildJob(AttrDict(realm_spec['job_queue_item'])) build_job = BuildJob(AttrDict(realm_spec['job_queue_item']))
# TODO(jschorr): Remove the back-compat lookups once we've finished the rollout.
execution_id = realm_spec.get('execution_id', realm_spec.get('builder_id', None))
executor_name = realm_spec.get('executor_name', 'EC2Executor')
build_info = BuildInfo(component=component, build_job=build_job, execution_id=execution_id,
executor_name=executor_name)
self._component_to_job[component] = build_job self._component_to_job[component] = build_job
self._component_to_builder[component] = realm_spec['builder_id'] self._build_uuid_to_info[build_job.build_uuid] = build_info
self._job_uuid_to_component[build_job.job_details['build_uuid']] = component
return component return component
@property @property
def registered_executors(self): def registered_executors(self):
return self._executors return self._ordered_executors
@coroutine @coroutine
def _register_existing_realms(self): def _register_existing_realms(self):
@ -223,22 +246,27 @@ class EphemeralBuilderManager(BaseManager):
encountered.add(component) encountered.add(component)
# Remove any components not encountered so we can clean up. # Remove any components not encountered so we can clean up.
for found in list(self._component_to_job.keys()): for component, job in list(self._component_to_job.items()):
if not found in encountered: if not component in encountered:
self._component_to_job.pop(component) self._component_to_job.pop(component, None)
self._component_to_builder.pop(component) self._build_uuid_to_info.pop(job.build_uuid, None)
except (KeyError, etcd.EtcdKeyError): except (KeyError, etcd.EtcdKeyError):
# no realms have been registered yet # no realms have been registered yet
pass pass
def _load_executor(self, executor_class_name, executor_config): def _load_executor(self, executor_kind_name, executor_config):
executor_klass = EXECUTORS.get(executor_class_name) executor_klass = EphemeralBuilderManager.EXECUTORS.get(executor_kind_name)
if executor_klass is None: if executor_klass is None:
logger.error('Unknown executor %s; skipping install', executor_class_name) logger.error('Unknown executor %s; skipping install', executor_kind_name)
return return
self._executors.append(executor_klass(executor_config, self.manager_hostname)) executor = executor_klass(executor_config, self.manager_hostname)
if executor.name in self._executor_name_to_executor:
raise Exception('Executor with name %s already registered' % executor.name)
self._ordered_executors.append(executor)
self._executor_name_to_executor[executor.name] = executor
def initialize(self, manager_config): def initialize(self, manager_config):
logger.debug('Calling initialize') logger.debug('Calling initialize')
@ -265,21 +293,17 @@ class EphemeralBuilderManager(BaseManager):
logger.debug('Connecting to etcd on %s:%s', etcd_host, etcd_port) logger.debug('Connecting to etcd on %s:%s', etcd_host, etcd_port)
worker_threads = self._manager_config.get('ETCD_WORKER_THREADS', 5) worker_threads = self._manager_config.get('ETCD_WORKER_THREADS', 5)
self._async_thread_executor = ThreadPoolExecutor(worker_threads) (self._etcd_client, self._async_thread_executor) = self._etcd_client_creator(worker_threads,
self._etcd_client = AsyncWrapper(self._etcd_client_klass(host=etcd_host, port=etcd_port, host=etcd_host, port=etcd_port, cert=etcd_auth, ca_cert=etcd_ca_cert,
cert=etcd_auth, ca_cert=etcd_ca_cert, protocol=etcd_protocol, read_timeout=5)
protocol=etcd_protocol,
read_timeout=5),
executor=self._async_thread_executor)
self._etcd_builder_prefix = self._manager_config.get('ETCD_BUILDER_PREFIX', 'building/') self._etcd_job_prefix = self._manager_config.get('ETCD_BUILDER_PREFIX', 'building/')
self._watch_etcd(self._etcd_builder_prefix, self._handle_builder_expiration) self._watch_etcd(self._etcd_job_prefix, self._handle_job_expiration)
self._etcd_realm_prefix = self._manager_config.get('ETCD_REALM_PREFIX', 'realm/') self._etcd_realm_prefix = self._manager_config.get('ETCD_REALM_PREFIX', 'realm/')
self._watch_etcd(self._etcd_realm_prefix, self._handle_realm_change, self._watch_etcd(self._etcd_realm_prefix, self._handle_realm_change,
restarter=self._register_existing_realms) restarter=self._register_existing_realms)
self._etcd_lock_prefix = self._manager_config.get('ETCD_LOCK_PREFIX', 'locks/')
self._ephemeral_api_timeout = self._manager_config.get('API_TIMEOUT', self._ephemeral_api_timeout = self._manager_config.get('API_TIMEOUT',
DEFAULT_EPHEMERAL_API_TIMEOUT) DEFAULT_EPHEMERAL_API_TIMEOUT)
@ -310,8 +334,8 @@ class EphemeralBuilderManager(BaseManager):
# Check if there are worker slots available by checking the number of jobs in etcd # Check if there are worker slots available by checking the number of jobs in etcd
allowed_worker_count = self._manager_config.get('ALLOWED_WORKER_COUNT', 1) allowed_worker_count = self._manager_config.get('ALLOWED_WORKER_COUNT', 1)
try: try:
building = yield From(self._etcd_client.read(self._etcd_builder_prefix, recursive=True)) active_jobs = yield From(self._etcd_client.read(self._etcd_job_prefix, recursive=True))
workers_alive = sum(1 for child in building.children if not child.dir) workers_alive = sum(1 for child in active_jobs.children if not child.dir)
except (KeyError, etcd.EtcdKeyError): except (KeyError, etcd.EtcdKeyError):
workers_alive = 0 workers_alive = 0
except etcd.EtcdException: except etcd.EtcdException:
@ -359,40 +383,40 @@ class EphemeralBuilderManager(BaseManager):
raise Return(False, RETRY_IMMEDIATELY_TIMEOUT) raise Return(False, RETRY_IMMEDIATELY_TIMEOUT)
started_with_executor = None started_with_executor = None
builder_id = None execution_id = None
logger.debug("Registered executors are: %s", [ex.__class__.__name__ for ex in self._executors]) logger.debug("Registered executors are: %s", [ex.name for ex in self._ordered_executors])
for executor in self._executors:
executor_type = executor.__class__.__name__
for executor in self._ordered_executors:
# Check if we can use this executor based on its whitelist, by namespace. # Check if we can use this executor based on its whitelist, by namespace.
namespace = build_job.namespace namespace = build_job.namespace
if not executor.allowed_for_namespace(namespace): if not executor.allowed_for_namespace(namespace):
logger.debug('Job %s (namespace: %s) cannot use executor %s', build_uuid, namespace, logger.debug('Job %s (namespace: %s) cannot use executor %s', build_uuid, namespace,
executor_type) executor.name)
continue continue
# Check if we can use this executor based on the retries remaining. # Check if we can use this executor based on the retries remaining.
if executor.minimum_retry_threshold > build_job.retries_remaining: if executor.minimum_retry_threshold > build_job.retries_remaining:
logger.debug('Job %s cannot use executor %s as it is below retry threshold (retry #: %s)', logger.debug('Job %s cannot use executor %s as it is below retry threshold %s (retry #%s)',
build_uuid, executor_type, build_job.retries_remaining) build_uuid, executor.name, executor.minimum_retry_threshold,
build_job.retries_remaining)
continue continue
logger.debug('Starting builder for job %s with selected executor: %s', build_uuid, logger.debug('Starting builder for job %s with selected executor: %s', build_uuid,
executor_type) executor.name)
try: try:
builder_id = yield From(executor.start_builder(realm, token, build_uuid)) execution_id = yield From(executor.start_builder(realm, token, build_uuid))
except: except:
logger.exception('Exception when starting builder for job: %s', build_uuid) logger.exception('Exception when starting builder for job: %s', build_uuid)
continue continue
try: try:
metric_queue.put_deprecated('EphemeralBuilderStarted', 1, unit='Count') metric_queue.put_deprecated('EphemeralBuilderStarted', 1, unit='Count')
metric_queue.ephemeral_build_workers.Inc(labelvalues=[builder_id, build_uuid]) metric_queue.ephemeral_build_workers.Inc(labelvalues=[execution_id, build_uuid])
except: except:
logger.exception('Exception when writing start metrics for builder %s for job %s', logger.exception('Exception when writing start metrics for execution %s for job %s',
builder_id, build_uuid) execution_id, build_uuid)
started_with_executor = executor started_with_executor = executor
@ -403,24 +427,19 @@ class EphemeralBuilderManager(BaseManager):
logger.error('Could not start ephemeral worker for build %s', build_uuid) logger.error('Could not start ephemeral worker for build %s', build_uuid)
raise Return(False, self._ephemeral_api_timeout) raise Return(False, self._ephemeral_api_timeout)
logger.debug('Started builder with ID %s for job: %s with executor: %s', builder_id, build_uuid, logger.debug('Started execution with ID %s for job: %s with executor: %s',
started_with_executor.__class__.__name__) execution_id, build_uuid, started_with_executor.name)
# Store the builder in etcd associated with the job id
try:
payload['builder_id'] = builder_id
yield From(self._etcd_client.write(job_key, json.dumps(payload), prevValue=lock_payload,
ttl=setup_time))
except etcd.EtcdException:
logger.exception('Exception when writing job %s to etcd', build_uuid)
raise Return(False, self._ephemeral_api_timeout)
# Store the realm spec which will allow any manager to accept this builder when it connects # Store the realm spec which will allow any manager to accept this builder when it connects
realm_spec = json.dumps({ realm_spec = json.dumps({
'realm': realm, 'realm': realm,
'token': token, 'token': token,
'builder_id': builder_id, 'execution_id': execution_id,
'executor_name': started_with_executor.name,
'job_queue_item': build_job.job_item, 'job_queue_item': build_job.job_item,
# TODO: remove this back-compat field once we finish the rollout.
'builder_id': execution_id,
}) })
try: try:
@ -434,22 +453,27 @@ class EphemeralBuilderManager(BaseManager):
logger.exception('Exception when writing realm %s to etcd for job %s', realm, build_uuid) logger.exception('Exception when writing realm %s to etcd for job %s', realm, build_uuid)
raise Return(False, setup_time) raise Return(False, setup_time)
self._job_to_executor[builder_id] = started_with_executor
logger.debug('Builder spawn complete for job %s using executor %s with ID %s ', build_uuid, logger.debug('Builder spawn complete for job %s using executor %s with ID %s ', build_uuid,
started_with_executor.__class__.__name__, builder_id) started_with_executor.name, execution_id)
raise Return(True, None) raise Return(True, None)
@coroutine @coroutine
def build_component_ready(self, build_component): def build_component_ready(self, build_component):
try: try:
# Clean up the bookkeeping for allowing any manager to take the job # Pop off the job for the component. We do so before we send out the etcd watch below,
# as it will also remove this mapping.
job = self._component_to_job.pop(build_component) job = self._component_to_job.pop(build_component)
del self._job_uuid_to_component[job.job_details['build_uuid']] if job is None:
logger.error('Could not find job for the build component on realm %s',
build_component.builder_realm)
return
# Clean up the bookkeeping for allowing any manager to take the job.
yield From(self._etcd_client.delete(self._etcd_realm_key(build_component.builder_realm))) yield From(self._etcd_client.delete(self._etcd_realm_key(build_component.builder_realm)))
# Start the build job.
logger.debug('Sending build %s to newly ready component on realm %s', logger.debug('Sending build %s to newly ready component on realm %s',
job.job_details['build_uuid'], build_component.builder_realm) job.build_uuid, build_component.builder_realm)
yield From(build_component.start_build(job)) yield From(build_component.start_build(job))
except (KeyError, etcd.EtcdKeyError): except (KeyError, etcd.EtcdKeyError):
logger.warning('Builder is asking for more work, but work already completed') logger.warning('Builder is asking for more work, but work already completed')
@ -460,21 +484,46 @@ class EphemeralBuilderManager(BaseManager):
@coroutine @coroutine
def job_completed(self, build_job, job_status, build_component): def job_completed(self, build_job, job_status, build_component):
logger.debug('Calling job_completed with status: %s', job_status) logger.debug('Calling job_completed for job %s with status: %s',
build_job.build_uuid, job_status)
# Kill the ephemeral builder # Mark the job as completed.
builder_id = self._component_to_builder.pop(build_component) self.job_complete_callback(build_job, job_status)
yield From(self._job_to_executor[builder_id].stop_builder(builder_id))
del self._job_to_executor[builder_id]
# Release the lock in etcd # Kill the ephemeral builder.
yield From(self.kill_builder_executor(build_job.build_uuid))
# Delete the build job from etcd.
job_key = self._etcd_job_key(build_job) job_key = self._etcd_job_key(build_job)
try: try:
yield From(self._etcd_client.delete(job_key)) yield From(self._etcd_client.delete(job_key))
except (KeyError, etcd.EtcdKeyError): except (KeyError, etcd.EtcdKeyError):
logger.debug('Builder is asking for job to be removed, but work already completed') logger.debug('Builder is asking for job to be removed, but work already completed')
self.job_complete_callback(build_job, job_status) logger.debug('job_completed for job %s with status: %s', build_job.build_uuid, job_status)
@coroutine
def kill_builder_executor(self, build_uuid):
logger.info('Starting termination of executor for job %s', build_uuid)
build_info = self._build_uuid_to_info.pop(build_uuid, None)
if build_info is None:
logger.error('Could not find build information for build %s', build_uuid)
return
# Remove the build's component.
self._component_to_job.pop(build_info.component, None)
# Stop the build node/executor itself.
executor = self._executor_name_to_executor.get(build_info.executor_name)
if executor is None:
logger.error('Could not find registered executor %s for build %s',
build_info.executor_name, build_uuid)
return
# Terminate the executor's execution.
logger.info('Terminating executor for job %s with execution id %s',
build_uuid, build_info.execution_id)
yield From(executor.stop_builder(build_info.execution_id))
@coroutine @coroutine
def job_heartbeat(self, build_job): def job_heartbeat(self, build_job):
@ -484,7 +533,7 @@ class EphemeralBuilderManager(BaseManager):
try: try:
build_job_metadata_response = yield From(self._etcd_client.read(job_key)) build_job_metadata_response = yield From(self._etcd_client.read(job_key))
except (KeyError, etcd.EtcdKeyError): except (KeyError, etcd.EtcdKeyError):
logger.info('Job %s no longer exists in etcd', build_job.job_details['build_uuid']) logger.info('Job %s no longer exists in etcd', build_job.build_uuid)
return return
build_job_metadata = json.loads(build_job_metadata_response.value) build_job_metadata = json.loads(build_job_metadata_response.value)
@ -498,25 +547,18 @@ class EphemeralBuilderManager(BaseManager):
payload = { payload = {
'expiration': calendar.timegm(new_expiration.timetuple()), 'expiration': calendar.timegm(new_expiration.timetuple()),
'builder_id': build_job_metadata['builder_id'],
'job_queue_item': build_job.job_item, 'job_queue_item': build_job.job_item,
'max_expiration': build_job_metadata['max_expiration'], 'max_expiration': build_job_metadata['max_expiration'],
'had_heartbeat': True, 'had_heartbeat': True,
} }
yield From(self._etcd_client.write(job_key, json.dumps(payload), ttl=ttl)) yield From(self._etcd_client.write(job_key, json.dumps(payload), ttl=ttl))
self.job_heartbeat_callback(build_job) self.job_heartbeat_callback(build_job)
def _etcd_job_key(self, build_job): def _etcd_job_key(self, build_job):
""" Create a key which is used to track a job in etcd. """ Create a key which is used to track a job in etcd.
""" """
return os.path.join(self._etcd_builder_prefix, build_job.job_details['build_uuid']) return os.path.join(self._etcd_job_prefix, build_job.job_details['build_uuid'])
def _etcd_lock_key(self, unique_lock_id):
""" Create a key which is used to create a temporary lock in etcd.
"""
return os.path.join(self._etcd_lock_prefix, unique_lock_id)
def _etcd_realm_key(self, realm): def _etcd_realm_key(self, realm):
""" Create a key which is used to track an incoming connection on a realm. """ Create a key which is used to track an incoming connection on a realm.
@ -526,4 +568,4 @@ class EphemeralBuilderManager(BaseManager):
def num_workers(self): def num_workers(self):
""" Return the number of workers we're managing locally. """ Return the number of workers we're managing locally.
""" """
return len(self._component_to_builder) return len(self._component_to_job)

View file

@ -48,6 +48,11 @@ class BuilderExecutor(object):
default_websocket_scheme = 'wss' if app.config['PREFERRED_URL_SCHEME'] == 'https' else 'ws' default_websocket_scheme = 'wss' if app.config['PREFERRED_URL_SCHEME'] == 'https' else 'ws'
self.websocket_scheme = executor_config.get("WEBSOCKET_SCHEME", default_websocket_scheme) self.websocket_scheme = executor_config.get("WEBSOCKET_SCHEME", default_websocket_scheme)
@property
def name(self):
""" Name returns the unique name for this executor. """
return self.executor_config.get('NAME') or self.__class__.__name__
@coroutine @coroutine
def start_builder(self, realm, token, build_uuid): def start_builder(self, realm, token, build_uuid):
""" Create a builder with the specified config. Returns a unique id which can be used to manage """ Create a builder with the specified config. Returns a unique id which can be used to manage

View file

@ -3,13 +3,16 @@ import etcd
import time import time
import json import json
import uuid import uuid
import os
from trollius import coroutine, get_event_loop, From, Future, Return from trollius import coroutine, get_event_loop, From, Future, Return
from mock import Mock from mock import Mock, ANY
from threading import Event
from buildman.manager.executor import BuilderExecutor, ExecutorException from buildman.manager.executor import BuilderExecutor, ExecutorException
from buildman.manager.ephemeral import EphemeralBuilderManager, EXECUTORS from buildman.manager.ephemeral import (EphemeralBuilderManager, EtcdAction,
ETCD_MAX_WATCH_TIMEOUT)
from buildman.component.buildcomponent import BuildComponent
from buildman.server import BuildJobResult
BUILD_UUID = 'deadbeef-dead-beef-dead-deadbeefdead' BUILD_UUID = 'deadbeef-dead-beef-dead-deadbeefdead'
@ -27,11 +30,16 @@ def async_test(f):
class TestExecutor(BuilderExecutor): class TestExecutor(BuilderExecutor):
job_started = None job_started = None
job_stopped = None
@coroutine @coroutine
def start_builder(self, realm, token, build_uuid): def start_builder(self, realm, token, build_uuid):
self.job_started = True self.job_started = str(uuid.uuid4())
raise Return(str(uuid.uuid4)) raise Return(self.job_started)
@coroutine
def stop_builder(self, execution_id):
self.job_stopped = execution_id
@ -41,23 +49,22 @@ class BadExecutor(BuilderExecutor):
raise ExecutorException('raised on purpose!') raise ExecutorException('raised on purpose!')
class TestEphemeral(unittest.TestCase): class EphemeralBuilderTestCase(unittest.TestCase):
def __init__(self, *args, **kwargs): def __init__(self, *args, **kwargs):
self.etcd_client_mock = None self.etcd_client_mock = None
self.etcd_wait_event = Event() super(EphemeralBuilderTestCase, self).__init__(*args, **kwargs)
self.test_executor = None
super(TestEphemeral, self).__init__(*args, **kwargs)
def _create_mock_etcd_client(self, *args, **kwargs): def _create_mock_etcd_client(self, *args, **kwargs):
def hang_until_event(*args, **kwargs): def create_future(*args, **kwargs):
time.sleep(.01) # 10ms to simulate network latency return Future()
self.etcd_wait_event.wait()
self.etcd_client_mock = Mock(spec=etcd.Client, name='etcd.Client') self.etcd_client_mock = Mock(spec=etcd.Client, name='etcd.Client')
self.etcd_client_mock.watch = Mock(side_effect=hang_until_event)
self.etcd_client_mock.read = Mock(side_effect=KeyError) self.etcd_client_mock.read = Mock(side_effect=KeyError)
self.etcd_client_mock.write = Mock() self.etcd_client_mock.delete = Mock(side_effect=self._create_completed_future())
return self.etcd_client_mock self.etcd_client_mock.watch = Mock(side_effect=create_future)
self.etcd_client_mock.write = Mock(side_effect=self._create_completed_future('some_exec_id'))
return (self.etcd_client_mock, None)
def _create_completed_future(self, result=None): def _create_completed_future(self, result=None):
def inner(*args, **kwargs): def inner(*args, **kwargs):
@ -66,6 +73,16 @@ class TestEphemeral(unittest.TestCase):
return new_future return new_future
return inner return inner
def setUp(self):
self._existing_executors = dict(EphemeralBuilderManager.EXECUTORS)
def tearDown(self):
EphemeralBuilderManager.EXECUTORS = self._existing_executors
@coroutine
def _register_component(self, realm_spec, build_component, token):
raise Return('hello')
def _create_build_job(self, namespace='namespace', retries=3): def _create_build_job(self, namespace='namespace', retries=3):
mock_job = Mock() mock_job = Mock()
mock_job.job_details = { mock_job.job_details = {
@ -78,15 +95,38 @@ class TestEphemeral(unittest.TestCase):
mock_job.namespace = namespace mock_job.namespace = namespace
mock_job.retries_remaining = retries mock_job.retries_remaining = retries
mock_job.build_uuid = BUILD_UUID
return mock_job return mock_job
class TestEphemeralLifecycle(EphemeralBuilderTestCase):
""" Tests the various lifecycles of the ephemeral builder and its interaction with etcd. """
def __init__(self, *args, **kwargs):
super(TestEphemeralLifecycle, self).__init__(*args, **kwargs)
self.etcd_client_mock = None
self.test_executor = None
def _create_completed_future(self, result=None):
def inner(*args, **kwargs):
new_future = Future()
new_future.set_result(result)
return new_future
return inner
def _create_mock_executor(self, *args, **kwargs):
self.test_executor = Mock(spec=BuilderExecutor)
self.test_executor.start_builder = Mock(side_effect=self._create_completed_future('123'))
self.test_executor.stop_builder = Mock(side_effect=self._create_completed_future())
self.test_executor.name = 'MockExecutor'
self.test_executor.minimum_retry_threshold = 0
return self.test_executor
def setUp(self): def setUp(self):
self._existing_executors = dict(EXECUTORS) super(TestEphemeralLifecycle, self).setUp()
self.old_etcd_client_klass = EphemeralBuilderManager._etcd_client_klass EphemeralBuilderManager.EXECUTORS['test'] = self._create_mock_executor
EphemeralBuilderManager._etcd_client_klass = self._create_mock_etcd_client
self.etcd_wait_event.clear()
self.register_component_callback = Mock() self.register_component_callback = Mock()
self.unregister_component_callback = Mock() self.unregister_component_callback = Mock()
@ -100,17 +140,231 @@ class TestEphemeral(unittest.TestCase):
self.job_complete_callback, self.job_complete_callback,
'127.0.0.1', '127.0.0.1',
30, 30,
etcd_creator=self._create_mock_etcd_client,
)
self.manager.initialize({'EXECUTOR': 'test'})
# Test that we are watching the realm and jobs key once initialized.
self.etcd_client_mock.watch.assert_any_call('realm/', recursive=True, index=None,
timeout=ETCD_MAX_WATCH_TIMEOUT)
self.etcd_client_mock.watch.assert_any_call('building/', recursive=True, index=None,
timeout=ETCD_MAX_WATCH_TIMEOUT)
self.mock_job = self._create_build_job()
self.mock_job_key = os.path.join('building/', BUILD_UUID)
def tearDown(self):
  """ Runs the parent class teardown, then shuts down the manager created in setUp. """
  # Parent cleanup runs first; the manager is shut down only afterwards.
  super(TestEphemeralLifecycle, self).tearDown()
  self.manager.shutdown()
@coroutine
def _setup_job_for_managers(self):
  """ Schedules `self.mock_job` on the manager and replays the realm creation event for it.

      Along the way this asserts that the executor was started, that the job and realm keys
      were written to etcd (in that order), and that the build component is registered only
      once the realm change has been handled.

      Returns (via `Return`) the mocked build component handed out by the register callback.
  """
  # Every etcd read raises KeyError while the job is being set up.
  # NOTE(review): presumably this makes the job key look absent to schedule() -- confirm.
  self.etcd_client_mock.read = Mock(side_effect=KeyError)

  test_component = Mock(spec=BuildComponent)
  test_component.builder_realm = REALM_ID
  test_component.start_build = Mock(side_effect=self._create_completed_future())
  self.register_component_callback.return_value = test_component

  # Forget any earlier writes so the positional checks below only see this schedule's writes.
  # `reset_mock()` is the Mock API for that; a bare `reset()` is merely recorded as a call on
  # an auto-created child mock and clears nothing.
  self.etcd_client_mock.write.reset_mock()

  # Ask for a builder to be scheduled. The other tests in this file index the result of
  # schedule(), so check its first element: a non-empty tuple would always be truthy.
  schedule_result = yield From(self.manager.schedule(self.mock_job))
  self.assertTrue(schedule_result[0])
  self.assertEqual(self.test_executor.start_builder.call_count, 1)

  # Ensure the job and realm were added to etcd.
  write_calls = self.etcd_client_mock.write.call_args_list
  self.assertEqual(write_calls[0][0][0], self.mock_job_key)
  self.assertTrue(write_calls[1][0][0].startswith('realm/'))

  realm_data = json.loads(write_calls[1][0][1])
  # Overwrite the realm with the fixed test ID so it agrees with the key of the event below.
  realm_data['realm'] = REALM_ID

  # Right now the job is not registered with any managers because etcd has not accepted the job
  self.assertEqual(self.register_component_callback.call_count, 0)

  # Fire off a realm changed with the same data.
  realm_created = Mock(spec=etcd.EtcdResult)
  realm_created.action = EtcdAction.CREATE
  realm_created.key = os.path.join('realm/', REALM_ID)
  realm_created.value = json.dumps(realm_data)

  self.manager._handle_realm_change(realm_created)
  self.assertEqual(self.register_component_callback.call_count, 1)

  # Ensure that we have at least one component node.
  self.assertEqual(1, self.manager.num_workers())

  raise Return(test_component)
@async_test
def test_schedule_and_complete(self):
  """ Walks a job through scheduling, pickup by this manager and successful completion. """
  # Get the job registered with all of the managers.
  component = yield From(self._setup_job_for_managers())

  realm_key = os.path.join('realm/', REALM_ID)
  etcd_delete = self.etcd_client_mock.delete

  # Take the job ourselves; exactly one delete, of the realm key, is expected.
  yield From(self.manager.build_component_ready(component))
  etcd_delete.assert_called_once_with(realm_key)
  etcd_delete.reset_mock()

  # Finish the job.
  yield From(self.manager.job_completed(self.mock_job, BuildJobResult.COMPLETE, component))

  # The executor must have stopped the builder, and the job key must have been deleted.
  stop_count = self.test_executor.stop_builder.call_count
  self.assertEqual(stop_count, 1)
  etcd_delete.assert_called_once_with(self.mock_job_key)
@async_test
def test_another_manager_takes_job(self):
  """ A delete event on the realm key must unregister the component that this manager had
      registered for that realm.
  """
  # Prepare a job to be taken by another manager.
  component = yield From(self._setup_job_for_managers())

  # The node as it looked before the deletion.
  previous_realm = Mock(spec=etcd.EtcdResult)
  previous_realm.value = json.dumps({
    'realm': REALM_ID,
    'token': 'beef',
    'builder_id': '123',
    'job_queue_item': self.mock_job.job_item,
  })

  deletion_event = Mock(spec=etcd.EtcdResult)
  deletion_event.key = os.path.join('realm/', REALM_ID)
  deletion_event.action = EtcdAction.DELETE
  deletion_event._prev_node = previous_realm

  self.manager._handle_realm_change(deletion_event)
  self.unregister_component_callback.assert_called_once_with(component)
@async_test
def test_expiring_worker_not_started(self):
  """ An expiration for a job whose realm was never registered must not stop any builder. """
  # The manager must already be watching the jobs prefix before anything else happens.
  expected_watch = dict(recursive=True, timeout=ETCD_MAX_WATCH_TIMEOUT, index=None)
  self.etcd_client_mock.watch.assert_any_call('building/', **expected_watch)

  # Build the event announcing that a worker has expired.
  previous_node = Mock(spec=etcd.EtcdResult)
  previous_node.value = json.dumps({
    'had_heartbeat': True,
    'job_queue_item': self.mock_job.job_item,
  })

  expiration = Mock(spec=etcd.EtcdResult)
  expiration.key = self.mock_job_key
  expiration.action = EtcdAction.EXPIRE
  expiration._prev_node = previous_node

  # Since the realm was never registered, expiration should do nothing.
  yield From(self.manager._handle_job_expiration(expiration))
  self.assertEqual(self.test_executor.stop_builder.call_count, 0)
@async_test
def test_expiring_worker_started(self):
  """ An expiration for a scheduled job that had a heartbeat must stop builder '123' once. """
  # Only the side effects of the setup are needed here, not the returned component.
  yield From(self._setup_job_for_managers())

  # The manager must be watching the jobs prefix.
  expected_watch = dict(recursive=True, timeout=ETCD_MAX_WATCH_TIMEOUT, index=None)
  self.etcd_client_mock.watch.assert_any_call('building/', **expected_watch)

  # Build the event announcing that a worker has expired.
  previous_node = Mock(spec=etcd.EtcdResult)
  previous_node.value = json.dumps({
    'had_heartbeat': True,
    'job_queue_item': self.mock_job.job_item,
  })

  expiration = Mock(spec=etcd.EtcdResult)
  expiration.key = self.mock_job_key
  expiration.action = EtcdAction.EXPIRE
  expiration._prev_node = previous_node

  yield From(self.manager._handle_job_expiration(expiration))

  stop_builder = self.test_executor.stop_builder
  stop_builder.assert_called_once_with('123')
  self.assertEqual(stop_builder.call_count, 1)
@async_test
def test_builder_never_starts(self):
  """ An expiration for a scheduled job that never had a heartbeat must stop builder '123'
      once and report the job as INCOMPLETE through the completion callback.
  """
  # Only the side effects of the setup are needed here, not the returned component.
  yield From(self._setup_job_for_managers())

  # The manager must be watching the jobs prefix.
  expected_watch = dict(recursive=True, timeout=ETCD_MAX_WATCH_TIMEOUT, index=None)
  self.etcd_client_mock.watch.assert_any_call('building/', **expected_watch)

  # Build the event announcing that a worker has expired without ever sending a heartbeat.
  previous_node = Mock(spec=etcd.EtcdResult)
  previous_node.value = json.dumps({
    'had_heartbeat': False,
    'job_queue_item': self.mock_job.job_item,
  })

  expiration = Mock(spec=etcd.EtcdResult)
  expiration.key = self.mock_job_key
  expiration.action = EtcdAction.EXPIRE
  expiration._prev_node = previous_node

  yield From(self.manager._handle_job_expiration(expiration))

  stop_builder = self.test_executor.stop_builder
  stop_builder.assert_called_once_with('123')
  self.assertEqual(stop_builder.call_count, 1)

  self.job_complete_callback.assert_called_once_with(ANY, BuildJobResult.INCOMPLETE)
@async_test
def test_change_worker(self):
  """ A 'set' event on a job key is not an expiration, so no builder may be stopped. """
  # Send a signal to the callback that a worker key has been changed. The keyword must be
  # spelled `spec`; a misspelled keyword only sets a plain attribute on the mock and leaves
  # it unconstrained.
  set_result = Mock(spec=etcd.EtcdResult)
  set_result.action = EtcdAction.SET
  set_result.key = self.mock_job_key

  # The other tests drive this handler with `yield From`; calling it without doing so never
  # runs its body, which would make the assertion below pass vacuously.
  yield From(self.manager._handle_job_expiration(set_result))
  self.assertEqual(self.test_executor.stop_builder.call_count, 0)
@async_test
def test_heartbeat_response(self):
  """ A heartbeat must invoke the heartbeat callback and rewrite the job key in etcd exactly
      once, flagging `had_heartbeat` and carrying the original queue item.
  """
  # etcd answers reads with a builder entry expiring (and maxing out) one minute from now.
  one_minute_out = time.time() + 60
  stored_entry = Mock(spec=etcd.EtcdResult)
  stored_entry.value = json.dumps({
    'expiration': one_minute_out,
    'max_expiration': one_minute_out,
  })
  self.etcd_client_mock.read = Mock(side_effect=self._create_completed_future(stored_entry))

  yield From(self.manager.job_heartbeat(self.mock_job))

  self.job_heartbeat_callback.assert_called_once_with(self.mock_job)

  # Exactly one write, addressed to the job key.
  etcd_write = self.etcd_client_mock.write
  self.assertEqual(etcd_write.call_count, 1)

  positional = etcd_write.call_args_list[0][0]
  self.assertEqual(positional[0], self.mock_job_key)

  job_key_data = json.loads(positional[1])
  self.assertTrue(job_key_data['had_heartbeat'])
  self.assertEquals(self.mock_job.job_item, job_key_data['job_queue_item'])
class TestEphemeral(EphemeralBuilderTestCase):
""" Simple unit tests for the ephemeral builder around config management, starting and stopping
jobs.
"""
def setUp(self):
super(TestEphemeral, self).setUp()
unregister_component_callback = Mock()
job_heartbeat_callback = Mock()
job_complete_callback = Mock()
self.manager = EphemeralBuilderManager(
self._register_component,
unregister_component_callback,
job_heartbeat_callback,
job_complete_callback,
'127.0.0.1',
30,
etcd_creator=self._create_mock_etcd_client,
) )
def tearDown(self): def tearDown(self):
self.etcd_wait_event.set() super(TestEphemeral, self).tearDown()
self.manager.shutdown() self.manager.shutdown()
EXECUTORS = self._existing_executors
EphemeralBuilderManager._etcd_client_klass = self.old_etcd_client_klass
def test_verify_executor_oldconfig(self): def test_verify_executor_oldconfig(self):
EXECUTORS['test'] = TestExecutor EphemeralBuilderManager.EXECUTORS['test'] = TestExecutor
self.manager.initialize({ self.manager.initialize({
'EXECUTOR': 'test', 'EXECUTOR': 'test',
'EXECUTOR_CONFIG': dict(MINIMUM_RETRY_THRESHOLD=42) 'EXECUTOR_CONFIG': dict(MINIMUM_RETRY_THRESHOLD=42)
@ -119,9 +373,10 @@ class TestEphemeral(unittest.TestCase):
# Ensure that we have a single test executor. # Ensure that we have a single test executor.
self.assertEquals(1, len(self.manager.registered_executors)) self.assertEquals(1, len(self.manager.registered_executors))
self.assertEquals(42, self.manager.registered_executors[0].minimum_retry_threshold) self.assertEquals(42, self.manager.registered_executors[0].minimum_retry_threshold)
self.assertEquals('TestExecutor', self.manager.registered_executors[0].name)
def test_verify_executor_newconfig(self): def test_verify_executor_newconfig(self):
EXECUTORS['test'] = TestExecutor EphemeralBuilderManager.EXECUTORS['test'] = TestExecutor
self.manager.initialize({ self.manager.initialize({
'EXECUTORS': [{ 'EXECUTORS': [{
'EXECUTOR': 'test', 'EXECUTOR': 'test',
@ -133,17 +388,41 @@ class TestEphemeral(unittest.TestCase):
self.assertEquals(1, len(self.manager.registered_executors)) self.assertEquals(1, len(self.manager.registered_executors))
self.assertEquals(42, self.manager.registered_executors[0].minimum_retry_threshold) self.assertEquals(42, self.manager.registered_executors[0].minimum_retry_threshold)
def test_multiple_executors_samename(self):
  """ Initializing with two executor entries that share one NAME must raise. """
  for executor_kind in ('test', 'anotherexecutor'):
    EphemeralBuilderManager.EXECUTORS[executor_kind] = TestExecutor

  # Both entries are named 'primary'; only the executor kind and retry threshold differ.
  clashing_config = {
    'EXECUTORS': [
      {
        'NAME': 'primary',
        'EXECUTOR': 'test',
        'MINIMUM_RETRY_THRESHOLD': 42
      },
      {
        'NAME': 'primary',
        'EXECUTOR': 'anotherexecutor',
        'MINIMUM_RETRY_THRESHOLD': 24
      },
    ]
  }

  with self.assertRaises(Exception):
    self.manager.initialize(clashing_config)
def test_verify_multiple_executors(self): def test_verify_multiple_executors(self):
EXECUTORS['test'] = TestExecutor EphemeralBuilderManager.EXECUTORS['test'] = TestExecutor
EXECUTORS['anotherexecutor'] = TestExecutor EphemeralBuilderManager.EXECUTORS['anotherexecutor'] = TestExecutor
self.manager.initialize({ self.manager.initialize({
'EXECUTORS': [ 'EXECUTORS': [
{ {
'NAME': 'primary',
'EXECUTOR': 'test', 'EXECUTOR': 'test',
'MINIMUM_RETRY_THRESHOLD': 42 'MINIMUM_RETRY_THRESHOLD': 42
}, },
{ {
'NAME': 'secondary',
'EXECUTOR': 'anotherexecutor', 'EXECUTOR': 'anotherexecutor',
'MINIMUM_RETRY_THRESHOLD': 24 'MINIMUM_RETRY_THRESHOLD': 24
}, },
@ -169,7 +448,7 @@ class TestEphemeral(unittest.TestCase):
@async_test @async_test
def test_schedule_job_namespace_filter(self): def test_schedule_job_namespace_filter(self):
EXECUTORS['test'] = TestExecutor EphemeralBuilderManager.EXECUTORS['test'] = TestExecutor
self.manager.initialize({ self.manager.initialize({
'EXECUTORS': [{ 'EXECUTORS': [{
'EXECUTOR': 'test', 'EXECUTOR': 'test',
@ -189,7 +468,7 @@ class TestEphemeral(unittest.TestCase):
@async_test @async_test
def test_schedule_job_retries_filter(self): def test_schedule_job_retries_filter(self):
EXECUTORS['test'] = TestExecutor EphemeralBuilderManager.EXECUTORS['test'] = TestExecutor
self.manager.initialize({ self.manager.initialize({
'EXECUTORS': [{ 'EXECUTORS': [{
'EXECUTOR': 'test', 'EXECUTOR': 'test',
@ -210,17 +489,19 @@ class TestEphemeral(unittest.TestCase):
@async_test @async_test
def test_schedule_job_executor_fallback(self): def test_schedule_job_executor_fallback(self):
EXECUTORS['primary'] = TestExecutor EphemeralBuilderManager.EXECUTORS['primary'] = TestExecutor
EXECUTORS['secondary'] = TestExecutor EphemeralBuilderManager.EXECUTORS['secondary'] = TestExecutor
self.manager.initialize({ self.manager.initialize({
'EXECUTORS': [ 'EXECUTORS': [
{ {
'NAME': 'primary',
'EXECUTOR': 'primary', 'EXECUTOR': 'primary',
'NAMESPACE_WHITELIST': ['something'], 'NAMESPACE_WHITELIST': ['something'],
'MINIMUM_RETRY_THRESHOLD': 3, 'MINIMUM_RETRY_THRESHOLD': 3,
}, },
{ {
'NAME': 'secondary',
'EXECUTOR': 'secondary', 'EXECUTOR': 'secondary',
'MINIMUM_RETRY_THRESHOLD': 2, 'MINIMUM_RETRY_THRESHOLD': 2,
}, },
@ -274,7 +555,7 @@ class TestEphemeral(unittest.TestCase):
@async_test @async_test
def test_schedule_job_single_executor(self): def test_schedule_job_single_executor(self):
EXECUTORS['test'] = TestExecutor EphemeralBuilderManager.EXECUTORS['test'] = TestExecutor
self.manager.initialize({ self.manager.initialize({
'EXECUTOR': 'test', 'EXECUTOR': 'test',
@ -299,7 +580,7 @@ class TestEphemeral(unittest.TestCase):
@async_test @async_test
def test_executor_exception(self): def test_executor_exception(self):
EXECUTORS['bad'] = BadExecutor EphemeralBuilderManager.EXECUTORS['bad'] = BadExecutor
self.manager.initialize({ self.manager.initialize({
'EXECUTOR': 'bad', 'EXECUTOR': 'bad',
@ -311,6 +592,38 @@ class TestEphemeral(unittest.TestCase):
self.assertFalse(result[0]) self.assertFalse(result[0])
@async_test
def test_schedule_and_stop(self):
  """ Schedules a job on the test executor, registers its realm and then kills the builder,
      checking that the execution stopped is the one that was started.
  """
  EphemeralBuilderManager.EXECUTORS['test'] = TestExecutor

  manager_config = {
    'EXECUTOR': 'test',
    'EXECUTOR_CONFIG': {},
  }
  self.manager.initialize(manager_config)

  # Start the build job.
  build_job = self._create_build_job(namespace='something', retries=3)
  schedule_result = yield From(self.manager.schedule(build_job))
  self.assertTrue(schedule_result[0])

  executor = self.manager.registered_executors[0]
  self.assertIsNotNone(executor.job_started)

  # Register the realm so the build information is added.
  realm_spec = {
    'realm': str(uuid.uuid4()),
    'token': str(uuid.uuid4()),
    'execution_id': executor.job_started,
    'executor_name': 'TestExecutor',
    'build_uuid': build_job.build_uuid,
    'job_queue_item': build_job.job_item,
  }
  yield From(self.manager._register_realm(realm_spec))

  # Stop the build job.
  yield From(self.manager.kill_builder_executor(build_job.build_uuid))
  self.assertEquals(executor.job_stopped, executor.job_started)
if __name__ == '__main__': if __name__ == '__main__':
unittest.main() unittest.main()