Bug fixes, refactoring and "new" tests for the build manager

- Fixes various bugs introduced in the most recent build system commit
- Refactors state management in the build manager to be cleaner and more contained
- Adds back in the mock-based tests, fixed to not use threads and adjusted for the refactoring
- Adds some more simplified unit tests around non-etch related flows
This commit is contained in:
Joseph Schorr 2016-07-15 18:28:48 -04:00
parent 9f6b47ad1f
commit 2c1880b944
4 changed files with 503 additions and 145 deletions

View file

@ -57,10 +57,15 @@ class BuildJob(object):
@lru_cache(maxsize=1)
def _load_repo_build(self):
try:
return model.build.get_repository_build(self.job_details['build_uuid'])
return model.build.get_repository_build(self.build_uuid)
except model.InvalidRepositoryBuildException:
raise BuildJobLoadException(
'Could not load repository build with ID %s' % self.job_details['build_uuid'])
'Could not load repository build with ID %s' % self.build_uuid)
@property
def build_uuid(self):
""" Returns the unique UUID for this build job. """
return self.job_details['build_uuid']
@property
def namespace(self):

View file

@ -5,6 +5,7 @@ import calendar
import os.path
import json
from collections import namedtuple
from datetime import datetime, timedelta
from trollius import From, coroutine, Return, async
from concurrent.futures import ThreadPoolExecutor
@ -28,12 +29,6 @@ RETRY_IMMEDIATELY_TIMEOUT = 0
NO_WORKER_AVAILABLE_TIMEOUT = 10
DEFAULT_EPHEMERAL_API_TIMEOUT = 20
EXECUTORS = {
'popen': PopenExecutor,
'ec2': EC2Executor,
'kubernetes': KubernetesExecutor,
}
class EtcdAction(object):
GET = 'get'
SET = 'set'
@ -44,13 +39,28 @@ class EtcdAction(object):
COMPARE_AND_SWAP = 'compareAndSwap'
COMPARE_AND_DELETE = 'compareAndDelete'
BuildInfo = namedtuple('BuildInfo', ['component', 'build_job', 'execution_id', 'executor_name'])
def createAsyncEtcdClient(worker_threads=1, **kwargs):
client = etcd.Client(**kwargs)
async_executor = ThreadPoolExecutor(worker_threads)
return AsyncWrapper(client, executor=async_executor), async_executor
class EphemeralBuilderManager(BaseManager):
""" Build manager implementation for the Enterprise Registry. """
_etcd_client_klass = etcd.Client
EXECUTORS = {
'popen': PopenExecutor,
'ec2': EC2Executor,
'kubernetes': KubernetesExecutor,
}
def __init__(self, *args, **kwargs):
self._etcd_client_creator = kwargs.pop('etcd_creator', createAsyncEtcdClient)
super(EphemeralBuilderManager, self).__init__(*args, **kwargs)
self._shutting_down = False
self._manager_config = None
@ -58,22 +68,24 @@ class EphemeralBuilderManager(BaseManager):
self._etcd_client = None
self._etcd_realm_prefix = None
self._etcd_builder_prefix = None
self._etcd_job_prefix = None
self._etcd_lock_prefix = None
self._ephemeral_api_timeout = DEFAULT_EPHEMERAL_API_TIMEOUT
self._component_to_job = {}
self._job_uuid_to_component = {}
self._component_to_builder = {}
self._job_to_executor = {}
# The registered executors available for running jobs, in order.
self._ordered_executors = []
self._executors = []
# The registered executors, mapped by their unique name.
self._executor_name_to_executor = {}
# Map of etcd keys being watched to the tasks watching them
self._watch_tasks = {}
super(EphemeralBuilderManager, self).__init__(*args, **kwargs)
# Map from builder component to its associated job.
self._component_to_job = {}
# Map from build UUID to a BuildInfo tuple with information about the build.
self._build_uuid_to_info = {}
def _watch_etcd(self, etcd_key, change_callback, start_index=None, recursive=True,
restarter=None):
@ -129,7 +141,6 @@ class EphemeralBuilderManager(BaseManager):
if not self._shutting_down:
logger.debug('Scheduling watch of key: %s%s at start index %s', etcd_key,
'*' if recursive else '', start_index)
watch_future = self._etcd_client.watch(etcd_key, recursive=recursive, index=start_index,
timeout=ETCD_MAX_WATCH_TIMEOUT)
watch_future.add_done_callback(callback_wrapper)
@ -137,7 +148,8 @@ class EphemeralBuilderManager(BaseManager):
self._watch_tasks[watch_task_key] = async(watch_future)
@coroutine
def _handle_builder_expiration(self, etcd_result):
def _handle_job_expiration(self, etcd_result):
""" Handler invoked whenever a job expires in etcd. """
if etcd_result is None:
return
@ -145,26 +157,26 @@ class EphemeralBuilderManager(BaseManager):
# Handle the expiration
logger.debug('Builder expired, clean up the old build node')
job_metadata = json.loads(etcd_result._prev_node.value)
build_job = BuildJob(AttrDict(job_metadata['job_queue_item']))
build_info = self._build_uuid_to_info.get(build_job.build_uuid)
if build_info is None:
logger.error('Could not find build info for job %s under etcd expire with metadata: %s',
build_job.build_uuid, job_metadata)
return
if 'builder_id' in job_metadata:
builder_id = job_metadata['builder_id']
execution_id = build_info.execution_id
# Before we delete the build node, we take a lock to make sure that only one manager
# can terminate the node.
try:
lock_key = self._etcd_lock_key(builder_id)
yield From(self._etcd_client.write(lock_key, '', prevExist=False, ttl=self.setup_time()))
except (KeyError, etcd.EtcdKeyError):
logger.debug('Somebody else is cleaning up the build node: %s', builder_id)
return
# If we have not yet received a heartbeat, then the node failed to boot in some way. We mark
# the job as incomplete here.
if not job_metadata.get('had_heartbeat', True):
logger.warning('Build executor failed to successfully boot with execution id %s',
execution_id)
self.job_complete_callback(build_job, BuildJobResult.INCOMPLETE)
if not job_metadata.get('had_heartbeat', True):
logger.warning('Build node failed to successfully boot: %s', builder_id)
build_job = BuildJob(AttrDict(job_metadata['job_queue_item']))
self.job_complete_callback(build_job, BuildJobResult.INCOMPLETE)
logger.info('Terminating expired build node: %s', builder_id)
yield From(self._job_to_executor[builder_id].stop_builder(builder_id))
# Finally, we terminate the build execution for the job.
logger.info('Terminating expired build executor for job %s with execution id %s',
build_job.build_uuid, execution_id)
yield From(self.kill_builder_executor(build_job.build_uuid))
def _handle_realm_change(self, etcd_result):
if etcd_result is None:
@ -180,13 +192,14 @@ class EphemeralBuilderManager(BaseManager):
# connection
realm_spec = json.loads(etcd_result._prev_node.value)
build_job = BuildJob(AttrDict(realm_spec['job_queue_item']))
component = self._job_uuid_to_component.pop(build_job.job_details['build_uuid'], None)
if component is not None:
build_uuid = build_job.build_uuid
build_info = self._build_uuid_to_info.pop(build_uuid, None)
if build_info is not None:
# We were not the manager which the worker connected to, remove the bookkeeping for it
logger.debug('Unregistering unused component on realm: %s', realm_spec['realm'])
del self._component_to_job[component]
del self._component_to_builder[component]
self.unregister_component(component)
logger.debug('Unregistering unused component for build %s', build_uuid)
self._component_to_job.pop(build_info.component, None)
self.unregister_component(build_info.component)
else:
logger.warning('Unexpected action (%s) on realm key: %s', etcd_result.action, etcd_result.key)
@ -200,15 +213,21 @@ class EphemeralBuilderManager(BaseManager):
logger.debug('Realm already registered with manager: %s', realm_spec['realm'])
return component
# Create the build information block for the registered realm.
build_job = BuildJob(AttrDict(realm_spec['job_queue_item']))
execution_id = realm_spec['execution_id']
executor_name = realm_spec['executor_name']
build_info = BuildInfo(component=component, build_job=build_job, execution_id=execution_id,
executor_name=executor_name)
self._component_to_job[component] = build_job
self._component_to_builder[component] = realm_spec['builder_id']
self._job_uuid_to_component[build_job.job_details['build_uuid']] = component
self._build_uuid_to_info[build_job.build_uuid] = build_info
return component
@property
def registered_executors(self):
return self._executors
return self._ordered_executors
@coroutine
def _register_existing_realms(self):
@ -223,22 +242,27 @@ class EphemeralBuilderManager(BaseManager):
encountered.add(component)
# Remove any components not encountered so we can clean up.
for found in list(self._component_to_job.keys()):
if not found in encountered:
self._component_to_job.pop(component)
self._component_to_builder.pop(component)
for component, job in list(self._component_to_job.items()):
if not component in encountered:
self._component_to_job.pop(component, None)
self._build_uuid_to_info.pop(job.build_uuid, None)
except (KeyError, etcd.EtcdKeyError):
# no realms have been registered yet
pass
def _load_executor(self, executor_class_name, executor_config):
executor_klass = EXECUTORS.get(executor_class_name)
def _load_executor(self, executor_kind_name, executor_config):
executor_klass = EphemeralBuilderManager.EXECUTORS.get(executor_kind_name)
if executor_klass is None:
logger.error('Unknown executor %s; skipping install', executor_class_name)
logger.error('Unknown executor %s; skipping install', executor_kind_name)
return
self._executors.append(executor_klass(executor_config, self.manager_hostname))
executor = executor_klass(executor_config, self.manager_hostname)
if executor.name in self._executor_name_to_executor:
raise Exception('Executor with name %s already registered' % executor.name)
self._ordered_executors.append(executor)
self._executor_name_to_executor[executor.name] = executor
def initialize(self, manager_config):
logger.debug('Calling initialize')
@ -265,21 +289,17 @@ class EphemeralBuilderManager(BaseManager):
logger.debug('Connecting to etcd on %s:%s', etcd_host, etcd_port)
worker_threads = self._manager_config.get('ETCD_WORKER_THREADS', 5)
self._async_thread_executor = ThreadPoolExecutor(worker_threads)
self._etcd_client = AsyncWrapper(self._etcd_client_klass(host=etcd_host, port=etcd_port,
cert=etcd_auth, ca_cert=etcd_ca_cert,
protocol=etcd_protocol,
read_timeout=5),
executor=self._async_thread_executor)
(self._etcd_client, self._async_thread_executor) = self._etcd_client_creator(worker_threads,
host=etcd_host, port=etcd_port, cert=etcd_auth, ca_cert=etcd_ca_cert,
protocol=etcd_protocol, read_timeout=5)
self._etcd_builder_prefix = self._manager_config.get('ETCD_BUILDER_PREFIX', 'building/')
self._watch_etcd(self._etcd_builder_prefix, self._handle_builder_expiration)
self._etcd_job_prefix = self._manager_config.get('ETCD_BUILDER_PREFIX', 'building/')
self._watch_etcd(self._etcd_job_prefix, self._handle_job_expiration)
self._etcd_realm_prefix = self._manager_config.get('ETCD_REALM_PREFIX', 'realm/')
self._watch_etcd(self._etcd_realm_prefix, self._handle_realm_change,
restarter=self._register_existing_realms)
self._etcd_lock_prefix = self._manager_config.get('ETCD_LOCK_PREFIX', 'locks/')
self._ephemeral_api_timeout = self._manager_config.get('API_TIMEOUT',
DEFAULT_EPHEMERAL_API_TIMEOUT)
@ -310,8 +330,8 @@ class EphemeralBuilderManager(BaseManager):
# Check if there are worker slots avialable by checking the number of jobs in etcd
allowed_worker_count = self._manager_config.get('ALLOWED_WORKER_COUNT', 1)
try:
building = yield From(self._etcd_client.read(self._etcd_builder_prefix, recursive=True))
workers_alive = sum(1 for child in building.children if not child.dir)
active_jobs = yield From(self._etcd_client.read(self._etcd_job_prefix, recursive=True))
workers_alive = sum(1 for child in active_jobs.children if not child.dir)
except (KeyError, etcd.EtcdKeyError):
workers_alive = 0
except etcd.EtcdException:
@ -359,40 +379,40 @@ class EphemeralBuilderManager(BaseManager):
raise Return(False, RETRY_IMMEDIATELY_TIMEOUT)
started_with_executor = None
builder_id = None
execution_id = None
logger.debug("Registered executors are: %s", [ex.__class__.__name__ for ex in self._executors])
for executor in self._executors:
executor_type = executor.__class__.__name__
logger.debug("Registered executors are: %s", [ex.name for ex in self._ordered_executors])
for executor in self._ordered_executors:
# Check if we can use this executor based on its whitelist, by namespace.
namespace = build_job.namespace
if not executor.allowed_for_namespace(namespace):
logger.debug('Job %s (namespace: %s) cannot use executor %s', build_uuid, namespace,
executor_type)
executor.name)
continue
# Check if we can use this executor based on the retries remaining.
if executor.minimum_retry_threshold > build_job.retries_remaining:
logger.debug('Job %s cannot use executor %s as it is below retry threshold (retry #: %s)',
build_uuid, executor_type, build_job.retries_remaining)
logger.debug('Job %s cannot use executor %s as it is below retry threshold %s (retry #%s)',
build_uuid, executor.name, executor.minimum_retry_threshold,
build_job.retries_remaining)
continue
logger.debug('Starting builder for job %s with selected executor: %s', build_uuid,
executor_type)
executor.name)
try:
builder_id = yield From(executor.start_builder(realm, token, build_uuid))
execution_id = yield From(executor.start_builder(realm, token, build_uuid))
except:
logger.exception('Exception when starting builder for job: %s', build_uuid)
continue
try:
metric_queue.put_deprecated('EphemeralBuilderStarted', 1, unit='Count')
metric_queue.ephemeral_build_workers.Inc(labelvalues=[builder_id, build_uuid])
metric_queue.ephemeral_build_workers.Inc(labelvalues=[execution_id, build_uuid])
except:
logger.exception('Exception when writing start metrics for builder %s for job %s',
builder_id, build_uuid)
logger.exception('Exception when writing start metrics for execution %s for job %s',
execution_id, build_uuid)
started_with_executor = executor
@ -403,23 +423,15 @@ class EphemeralBuilderManager(BaseManager):
logger.error('Could not start ephemeral worker for build %s', build_uuid)
raise Return(False, self._ephemeral_api_timeout)
logger.debug('Started builder with ID %s for job: %s with executor: %s', builder_id, build_uuid,
started_with_executor.__class__.__name__)
# Store the builder in etcd associated with the job id
try:
payload['builder_id'] = builder_id
yield From(self._etcd_client.write(job_key, json.dumps(payload), prevValue=lock_payload,
ttl=setup_time))
except etcd.EtcdException:
logger.exception('Exception when writing job %s to etcd', build_uuid)
raise Return(False, self._ephemeral_api_timeout)
logger.debug('Started execution with ID %s for job: %s with executor: %s',
execution_id, build_uuid, started_with_executor.name)
# Store the realm spec which will allow any manager to accept this builder when it connects
realm_spec = json.dumps({
'realm': realm,
'token': token,
'builder_id': builder_id,
'execution_id': execution_id,
'executor_name': started_with_executor.name,
'job_queue_item': build_job.job_item,
})
@ -434,22 +446,27 @@ class EphemeralBuilderManager(BaseManager):
logger.exception('Exception when writing realm %s to etcd for job %s', realm, build_uuid)
raise Return(False, setup_time)
self._job_to_executor[builder_id] = started_with_executor
logger.debug('Builder spawn complete for job %s using executor %s with ID %s ', build_uuid,
started_with_executor.__class__.__name__, builder_id)
started_with_executor.name, execution_id)
raise Return(True, None)
@coroutine
def build_component_ready(self, build_component):
try:
# Clean up the bookkeeping for allowing any manager to take the job
# Pop off the job for the component. We do so before we send out the etcd watch below,
# as it will also remove this mapping.
job = self._component_to_job.pop(build_component)
del self._job_uuid_to_component[job.job_details['build_uuid']]
if job is None:
logger.error('Could not find job for the build component on realm %s',
build_component.builder_realm)
return
# Clean up the bookkeeping for allowing any manager to take the job.
yield From(self._etcd_client.delete(self._etcd_realm_key(build_component.builder_realm)))
# Start the build job.
logger.debug('Sending build %s to newly ready component on realm %s',
job.job_details['build_uuid'], build_component.builder_realm)
job.build_uuid, build_component.builder_realm)
yield From(build_component.start_build(job))
except (KeyError, etcd.EtcdKeyError):
logger.warning('Builder is asking for more work, but work already completed')
@ -460,21 +477,46 @@ class EphemeralBuilderManager(BaseManager):
@coroutine
def job_completed(self, build_job, job_status, build_component):
logger.debug('Calling job_completed with status: %s', job_status)
logger.debug('Calling job_completed for job %s with status: %s',
build_job.build_uuid, job_status)
# Kill the ephmeral builder
builder_id = self._component_to_builder.pop(build_component)
yield From(self._job_to_executor[builder_id].stop_builder(builder_id))
del self._job_to_executor[builder_id]
# Mark the job as completed.
self.job_complete_callback(build_job, job_status)
# Release the lock in etcd
# Kill the ephmeral builder.
yield From(self.kill_builder_executor(build_job.build_uuid))
# Delete the build job from etcd.
job_key = self._etcd_job_key(build_job)
try:
yield From(self._etcd_client.delete(job_key))
except (KeyError, etcd.EtcdKeyError):
logger.debug('Builder is asking for job to be removed, but work already completed')
self.job_complete_callback(build_job, job_status)
logger.debug('job_completed for job %s with status: %s', build_job.build_uuid, job_status)
@coroutine
def kill_builder_executor(self, build_uuid):
logger.info('Starting termination of executor for job %s', build_uuid)
build_info = self._build_uuid_to_info.pop(build_uuid, None)
if build_info is None:
logger.error('Could not find build information for build %s', build_uuid)
return
# Remove the build's component.
self._component_to_job.pop(build_info.component, None)
# Stop the build node/executor itself.
executor = self._executor_name_to_executor.get(build_info.executor_name)
if executor is None:
logger.error('Could not find registered executor %s for build %s',
build_info.executor_name, build_uuid)
return
# Terminate the executor's execution.
logger.info('Terminating executor for job %s with execution id %s',
build_uuid, build_info.execution_id)
yield From(executor.stop_builder(build_info.execution_id))
@coroutine
def job_heartbeat(self, build_job):
@ -484,7 +526,7 @@ class EphemeralBuilderManager(BaseManager):
try:
build_job_metadata_response = yield From(self._etcd_client.read(job_key))
except (KeyError, etcd.EtcdKeyError):
logger.info('Job %s no longer exists in etcd', build_job.job_details['build_uuid'])
logger.info('Job %s no longer exists in etcd', build_job.build_uuid)
return
build_job_metadata = json.loads(build_job_metadata_response.value)
@ -498,25 +540,18 @@ class EphemeralBuilderManager(BaseManager):
payload = {
'expiration': calendar.timegm(new_expiration.timetuple()),
'builder_id': build_job_metadata['builder_id'],
'job_queue_item': build_job.job_item,
'max_expiration': build_job_metadata['max_expiration'],
'had_heartbeat': True,
}
yield From(self._etcd_client.write(job_key, json.dumps(payload), ttl=ttl))
self.job_heartbeat_callback(build_job)
def _etcd_job_key(self, build_job):
""" Create a key which is used to track a job in etcd.
"""
return os.path.join(self._etcd_builder_prefix, build_job.job_details['build_uuid'])
def _etcd_lock_key(self, unique_lock_id):
""" Create a key which is used to create a temporary lock in etcd.
"""
return os.path.join(self._etcd_lock_prefix, unique_lock_id)
return os.path.join(self._etcd_job_prefix, build_job.job_details['build_uuid'])
def _etcd_realm_key(self, realm):
""" Create a key which is used to track an incoming connection on a realm.
@ -526,4 +561,4 @@ class EphemeralBuilderManager(BaseManager):
def num_workers(self):
""" Return the number of workers we're managing locally.
"""
return len(self._component_to_builder)
return len(self._component_to_job)

View file

@ -48,6 +48,11 @@ class BuilderExecutor(object):
default_websocket_scheme = 'wss' if app.config['PREFERRED_URL_SCHEME'] == 'https' else 'ws'
self.websocket_scheme = executor_config.get("WEBSOCKET_SCHEME", default_websocket_scheme)
@property
def name(self):
""" Name returns the unique name for this executor. """
return self.executor_config.get('NAME') or self.__class__.__name__
@coroutine
def start_builder(self, realm, token, build_uuid):
""" Create a builder with the specified config. Returns a unique id which can be used to manage

View file

@ -3,13 +3,16 @@ import etcd
import time
import json
import uuid
import os
from trollius import coroutine, get_event_loop, From, Future, Return
from mock import Mock
from threading import Event
from mock import Mock, ANY
from buildman.manager.executor import BuilderExecutor, ExecutorException
from buildman.manager.ephemeral import EphemeralBuilderManager, EXECUTORS
from buildman.manager.ephemeral import (EphemeralBuilderManager, EtcdAction,
ETCD_MAX_WATCH_TIMEOUT)
from buildman.component.buildcomponent import BuildComponent
from buildman.server import BuildJobResult
BUILD_UUID = 'deadbeef-dead-beef-dead-deadbeefdead'
@ -27,11 +30,16 @@ def async_test(f):
class TestExecutor(BuilderExecutor):
job_started = None
job_stopped = None
@coroutine
def start_builder(self, realm, token, build_uuid):
self.job_started = True
raise Return(str(uuid.uuid4))
self.job_started = str(uuid.uuid4())
raise Return(self.job_started)
@coroutine
def stop_builder(self, execution_id):
self.job_stopped = execution_id
@ -41,23 +49,22 @@ class BadExecutor(BuilderExecutor):
raise ExecutorException('raised on purpose!')
class TestEphemeral(unittest.TestCase):
class EphemeralBuilderTestCase(unittest.TestCase):
def __init__(self, *args, **kwargs):
self.etcd_client_mock = None
self.etcd_wait_event = Event()
self.test_executor = None
super(TestEphemeral, self).__init__(*args, **kwargs)
super(EphemeralBuilderTestCase, self).__init__(*args, **kwargs)
def _create_mock_etcd_client(self, *args, **kwargs):
def hang_until_event(*args, **kwargs):
time.sleep(.01) # 10ms to simulate network latency
self.etcd_wait_event.wait()
def create_future(*args, **kwargs):
return Future()
self.etcd_client_mock = Mock(spec=etcd.Client, name='etcd.Client')
self.etcd_client_mock.watch = Mock(side_effect=hang_until_event)
self.etcd_client_mock.read = Mock(side_effect=KeyError)
self.etcd_client_mock.write = Mock()
return self.etcd_client_mock
self.etcd_client_mock.delete = Mock(side_effect=self._create_completed_future())
self.etcd_client_mock.watch = Mock(side_effect=create_future)
self.etcd_client_mock.write = Mock(side_effect=self._create_completed_future('some_exec_id'))
return (self.etcd_client_mock, None)
def _create_completed_future(self, result=None):
def inner(*args, **kwargs):
@ -66,6 +73,16 @@ class TestEphemeral(unittest.TestCase):
return new_future
return inner
def setUp(self):
self._existing_executors = dict(EphemeralBuilderManager.EXECUTORS)
def tearDown(self):
EphemeralBuilderManager.EXECUTORS = self._existing_executors
@coroutine
def _register_component(self, realm_spec, build_component, token):
raise Return('hello')
def _create_build_job(self, namespace='namespace', retries=3):
mock_job = Mock()
mock_job.job_details = {
@ -78,15 +95,38 @@ class TestEphemeral(unittest.TestCase):
mock_job.namespace = namespace
mock_job.retries_remaining = retries
mock_job.build_uuid = BUILD_UUID
return mock_job
class TestEphemeralLifecycle(EphemeralBuilderTestCase):
""" Tests the various lifecycles of the ephemeral builder and its interaction with etcd. """
def __init__(self, *args, **kwargs):
super(TestEphemeralLifecycle, self).__init__(*args, **kwargs)
self.etcd_client_mock = None
self.test_executor = None
def _create_completed_future(self, result=None):
def inner(*args, **kwargs):
new_future = Future()
new_future.set_result(result)
return new_future
return inner
def _create_mock_executor(self, *args, **kwargs):
self.test_executor = Mock(spec=BuilderExecutor)
self.test_executor.start_builder = Mock(side_effect=self._create_completed_future('123'))
self.test_executor.stop_builder = Mock(side_effect=self._create_completed_future())
self.test_executor.name = 'MockExecutor'
self.test_executor.minimum_retry_threshold = 0
return self.test_executor
def setUp(self):
self._existing_executors = dict(EXECUTORS)
super(TestEphemeralLifecycle, self).setUp()
self.old_etcd_client_klass = EphemeralBuilderManager._etcd_client_klass
EphemeralBuilderManager._etcd_client_klass = self._create_mock_etcd_client
self.etcd_wait_event.clear()
EphemeralBuilderManager.EXECUTORS['test'] = self._create_mock_executor
self.register_component_callback = Mock()
self.unregister_component_callback = Mock()
@ -100,17 +140,231 @@ class TestEphemeral(unittest.TestCase):
self.job_complete_callback,
'127.0.0.1',
30,
etcd_creator=self._create_mock_etcd_client,
)
self.manager.initialize({'EXECUTOR': 'test'})
# Test that we are watching the realm and jobs key once initialized.
self.etcd_client_mock.watch.assert_any_call('realm/', recursive=True, index=None,
timeout=ETCD_MAX_WATCH_TIMEOUT)
self.etcd_client_mock.watch.assert_any_call('building/', recursive=True, index=None,
timeout=ETCD_MAX_WATCH_TIMEOUT)
self.mock_job = self._create_build_job()
self.mock_job_key = os.path.join('building/', BUILD_UUID)
def tearDown(self):
super(TestEphemeralLifecycle, self).tearDown()
self.manager.shutdown()
@coroutine
def _setup_job_for_managers(self):
self.etcd_client_mock.read = Mock(side_effect=KeyError)
test_component = Mock(spec=BuildComponent)
test_component.builder_realm = REALM_ID
test_component.start_build = Mock(side_effect=self._create_completed_future())
self.register_component_callback.return_value = test_component
# Ask for a builder to be scheduled
self.etcd_client_mock.write.reset()
is_scheduled = yield From(self.manager.schedule(self.mock_job))
self.assertTrue(is_scheduled)
self.assertEqual(self.test_executor.start_builder.call_count, 1)
# Ensure the job and realm were added to etcd.
self.assertEqual(self.etcd_client_mock.write.call_args_list[0][0][0], self.mock_job_key)
self.assertTrue(self.etcd_client_mock.write.call_args_list[1][0][0].find('realm/') == 0)
realm_data = json.loads(self.etcd_client_mock.write.call_args_list[1][0][1])
realm_data['realm'] = REALM_ID
# Right now the job is not registered with any managers because etcd has not accepted the job
self.assertEqual(self.register_component_callback.call_count, 0)
# Fire off a realm changed with the same data.
realm_created = Mock(spec=etcd.EtcdResult)
realm_created.action = EtcdAction.CREATE
realm_created.key = os.path.join('realm/', REALM_ID)
realm_created.value = json.dumps(realm_data)
self.manager._handle_realm_change(realm_created)
self.assertEqual(self.register_component_callback.call_count, 1)
# Ensure that we have at least one component node.
self.assertEquals(1, self.manager.num_workers())
raise Return(test_component)
@async_test
def test_schedule_and_complete(self):
# Test that a job is properly registered with all of the managers
test_component = yield From(self._setup_job_for_managers())
# Take the job ourselves
yield From(self.manager.build_component_ready(test_component))
self.etcd_client_mock.delete.assert_called_once_with(os.path.join('realm/', REALM_ID))
self.etcd_client_mock.delete.reset_mock()
# Finish the job
yield From(self.manager.job_completed(self.mock_job, BuildJobResult.COMPLETE, test_component))
# Ensure that the executor kills the job.
self.assertEqual(self.test_executor.stop_builder.call_count, 1)
self.etcd_client_mock.delete.assert_called_once_with(self.mock_job_key)
@async_test
def test_another_manager_takes_job(self):
# Prepare a job to be taken by another manager
test_component = yield From(self._setup_job_for_managers())
realm_deleted = Mock(spec=etcd.EtcdResult)
realm_deleted.action = EtcdAction.DELETE
realm_deleted.key = os.path.join('realm/', REALM_ID)
realm_deleted._prev_node = Mock(spec=etcd.EtcdResult)
realm_deleted._prev_node.value = json.dumps({
'realm': REALM_ID,
'token': 'beef',
'builder_id': '123',
'job_queue_item': self.mock_job.job_item,
})
self.manager._handle_realm_change(realm_deleted)
self.unregister_component_callback.assert_called_once_with(test_component)
@async_test
def test_expiring_worker_not_started(self):
# Test that we are watching before anything else happens
self.etcd_client_mock.watch.assert_any_call('building/', recursive=True,
timeout=ETCD_MAX_WATCH_TIMEOUT, index=None)
# Send a signal to the callback that a worker has expired
expired_result = Mock(spec=etcd.EtcdResult)
expired_result.action = EtcdAction.EXPIRE
expired_result.key = self.mock_job_key
expired_result._prev_node = Mock(spec=etcd.EtcdResult)
expired_result._prev_node.value = json.dumps({
'had_heartbeat': True,
'job_queue_item': self.mock_job.job_item,
})
# Since the realm was never registered, expiration should do nothing.
yield From(self.manager._handle_job_expiration(expired_result))
self.assertEqual(self.test_executor.stop_builder.call_count, 0)
@async_test
def test_expiring_worker_started(self):
test_component = yield From(self._setup_job_for_managers())
# Test that we are watching before anything else happens
self.etcd_client_mock.watch.assert_any_call('building/', recursive=True,
timeout=ETCD_MAX_WATCH_TIMEOUT, index=None)
# Send a signal to the callback that a worker has expired
expired_result = Mock(spec=etcd.EtcdResult)
expired_result.action = EtcdAction.EXPIRE
expired_result.key = self.mock_job_key
expired_result._prev_node = Mock(spec=etcd.EtcdResult)
expired_result._prev_node.value = json.dumps({
'had_heartbeat': True,
'job_queue_item': self.mock_job.job_item,
})
yield From(self.manager._handle_job_expiration(expired_result))
self.test_executor.stop_builder.assert_called_once_with('123')
self.assertEqual(self.test_executor.stop_builder.call_count, 1)
@async_test
def test_builder_never_starts(self):
test_component = yield From(self._setup_job_for_managers())
# Test that we are watching before anything else happens
self.etcd_client_mock.watch.assert_any_call('building/', recursive=True,
timeout=ETCD_MAX_WATCH_TIMEOUT, index=None)
# Send a signal to the callback that a worker has expired
expired_result = Mock(spec=etcd.EtcdResult)
expired_result.action = EtcdAction.EXPIRE
expired_result.key = self.mock_job_key
expired_result._prev_node = Mock(spec=etcd.EtcdResult)
expired_result._prev_node.value = json.dumps({
'had_heartbeat': False,
'job_queue_item': self.mock_job.job_item,
})
yield From(self.manager._handle_job_expiration(expired_result))
self.test_executor.stop_builder.assert_called_once_with('123')
self.assertEqual(self.test_executor.stop_builder.call_count, 1)
self.job_complete_callback.assert_called_once_with(ANY, BuildJobResult.INCOMPLETE)
@async_test
def test_change_worker(self):
# Send a signal to the callback that a worker key has been changed
set_result = Mock(sepc=etcd.EtcdResult)
set_result.action = 'set'
set_result.key = self.mock_job_key
self.manager._handle_job_expiration(set_result)
self.assertEquals(self.test_executor.stop_builder.call_count, 0)
@async_test
def test_heartbeat_response(self):
expiration_timestamp = time.time() + 60
builder_result = Mock(spec=etcd.EtcdResult)
builder_result.value = json.dumps({
'expiration': expiration_timestamp,
'max_expiration': expiration_timestamp,
})
self.etcd_client_mock.read = Mock(side_effect=self._create_completed_future(builder_result))
yield From(self.manager.job_heartbeat(self.mock_job))
self.job_heartbeat_callback.assert_called_once_with(self.mock_job)
self.assertEqual(self.etcd_client_mock.write.call_count, 1)
self.assertEqual(self.etcd_client_mock.write.call_args_list[0][0][0], self.mock_job_key)
job_key_data = json.loads(self.etcd_client_mock.write.call_args_list[0][0][1])
self.assertTrue(job_key_data['had_heartbeat'])
self.assertEquals(self.mock_job.job_item, job_key_data['job_queue_item'])
class TestEphemeral(EphemeralBuilderTestCase):
""" Simple unit tests for the ephemeral builder around config management, starting and stopping
jobs.
"""
def setUp(self):
super(TestEphemeral, self).setUp()
unregister_component_callback = Mock()
job_heartbeat_callback = Mock()
job_complete_callback = Mock()
self.manager = EphemeralBuilderManager(
self._register_component,
unregister_component_callback,
job_heartbeat_callback,
job_complete_callback,
'127.0.0.1',
30,
etcd_creator=self._create_mock_etcd_client,
)
def tearDown(self):
self.etcd_wait_event.set()
super(TestEphemeral, self).tearDown()
self.manager.shutdown()
EXECUTORS = self._existing_executors
EphemeralBuilderManager._etcd_client_klass = self.old_etcd_client_klass
def test_verify_executor_oldconfig(self):
EXECUTORS['test'] = TestExecutor
EphemeralBuilderManager.EXECUTORS['test'] = TestExecutor
self.manager.initialize({
'EXECUTOR': 'test',
'EXECUTOR_CONFIG': dict(MINIMUM_RETRY_THRESHOLD=42)
@ -119,9 +373,10 @@ class TestEphemeral(unittest.TestCase):
# Ensure that we have a single test executor.
self.assertEquals(1, len(self.manager.registered_executors))
self.assertEquals(42, self.manager.registered_executors[0].minimum_retry_threshold)
self.assertEquals('TestExecutor', self.manager.registered_executors[0].name)
def test_verify_executor_newconfig(self):
EXECUTORS['test'] = TestExecutor
EphemeralBuilderManager.EXECUTORS['test'] = TestExecutor
self.manager.initialize({
'EXECUTORS': [{
'EXECUTOR': 'test',
@ -133,17 +388,41 @@ class TestEphemeral(unittest.TestCase):
self.assertEquals(1, len(self.manager.registered_executors))
self.assertEquals(42, self.manager.registered_executors[0].minimum_retry_threshold)
def test_multiple_executors_samename(self):
EphemeralBuilderManager.EXECUTORS['test'] = TestExecutor
EphemeralBuilderManager.EXECUTORS['anotherexecutor'] = TestExecutor
with self.assertRaises(Exception):
self.manager.initialize({
'EXECUTORS': [
{
'NAME': 'primary',
'EXECUTOR': 'test',
'MINIMUM_RETRY_THRESHOLD': 42
},
{
'NAME': 'primary',
'EXECUTOR': 'anotherexecutor',
'MINIMUM_RETRY_THRESHOLD': 24
},
]
})
def test_verify_multiple_executors(self):
EXECUTORS['test'] = TestExecutor
EXECUTORS['anotherexecutor'] = TestExecutor
EphemeralBuilderManager.EXECUTORS['test'] = TestExecutor
EphemeralBuilderManager.EXECUTORS['anotherexecutor'] = TestExecutor
self.manager.initialize({
'EXECUTORS': [
{
'NAME': 'primary',
'EXECUTOR': 'test',
'MINIMUM_RETRY_THRESHOLD': 42
},
{
'NAME': 'secondary',
'EXECUTOR': 'anotherexecutor',
'MINIMUM_RETRY_THRESHOLD': 24
},
@ -169,7 +448,7 @@ class TestEphemeral(unittest.TestCase):
@async_test
def test_schedule_job_namespace_filter(self):
EXECUTORS['test'] = TestExecutor
EphemeralBuilderManager.EXECUTORS['test'] = TestExecutor
self.manager.initialize({
'EXECUTORS': [{
'EXECUTOR': 'test',
@ -189,7 +468,7 @@ class TestEphemeral(unittest.TestCase):
@async_test
def test_schedule_job_retries_filter(self):
EXECUTORS['test'] = TestExecutor
EphemeralBuilderManager.EXECUTORS['test'] = TestExecutor
self.manager.initialize({
'EXECUTORS': [{
'EXECUTOR': 'test',
@ -210,17 +489,19 @@ class TestEphemeral(unittest.TestCase):
@async_test
def test_schedule_job_executor_fallback(self):
EXECUTORS['primary'] = TestExecutor
EXECUTORS['secondary'] = TestExecutor
EphemeralBuilderManager.EXECUTORS['primary'] = TestExecutor
EphemeralBuilderManager.EXECUTORS['secondary'] = TestExecutor
self.manager.initialize({
'EXECUTORS': [
{
'NAME': 'primary',
'EXECUTOR': 'primary',
'NAMESPACE_WHITELIST': ['something'],
'MINIMUM_RETRY_THRESHOLD': 3,
},
{
'NAME': 'secondary',
'EXECUTOR': 'secondary',
'MINIMUM_RETRY_THRESHOLD': 2,
},
@ -274,7 +555,7 @@ class TestEphemeral(unittest.TestCase):
@async_test
def test_schedule_job_single_executor(self):
EXECUTORS['test'] = TestExecutor
EphemeralBuilderManager.EXECUTORS['test'] = TestExecutor
self.manager.initialize({
'EXECUTOR': 'test',
@ -299,7 +580,7 @@ class TestEphemeral(unittest.TestCase):
@async_test
def test_executor_exception(self):
EXECUTORS['bad'] = BadExecutor
EphemeralBuilderManager.EXECUTORS['bad'] = BadExecutor
self.manager.initialize({
'EXECUTOR': 'bad',
@ -311,6 +592,38 @@ class TestEphemeral(unittest.TestCase):
self.assertFalse(result[0])
@async_test
def test_schedule_and_stop(self):
EphemeralBuilderManager.EXECUTORS['test'] = TestExecutor
self.manager.initialize({
'EXECUTOR': 'test',
'EXECUTOR_CONFIG': {},
})
# Start the build job.
build_job = self._create_build_job(namespace='something', retries=3)
result = yield From(self.manager.schedule(build_job))
self.assertTrue(result[0])
executor = self.manager.registered_executors[0]
self.assertIsNotNone(executor.job_started)
# Register the realm so the build information is added.
yield From(self.manager._register_realm({
'realm': str(uuid.uuid4()),
'token': str(uuid.uuid4()),
'execution_id': executor.job_started,
'executor_name': 'TestExecutor',
'build_uuid': build_job.build_uuid,
'job_queue_item': build_job.job_item,
}))
# Stop the build job.
yield From(self.manager.kill_builder_executor(build_job.build_uuid))
self.assertEquals(executor.job_stopped, executor.job_started)
if __name__ == '__main__':
unittest.main()