Generalize the ephemeral build managers so that any manager may manage a builder spawned by any other manager.

This commit is contained in:
Jake Moshenko 2014-12-31 11:33:56 -05:00
parent ccb19571d6
commit cc70225043
11 changed files with 258 additions and 125 deletions

View file

@ -41,8 +41,11 @@ def run_build_manager():
if manager_klass is None:
return
public_ip = os.environ.get('PUBLIC_IP', '127.0.0.1')
logger.debug('Will pass public IP address %s to builders for websocket connection', public_ip)
manager_hostname = os.environ.get('BUILDMAN_HOSTNAME',
app.config.get('BUILDMAN_HOSTNAME',
app.config['SERVER_HOSTNAME']))
logger.debug('Will pass buildman hostname %s to builders for websocket connection',
manager_hostname)
logger.debug('Starting build manager with lifecycle "%s"', build_manager_config[0])
ssl_context = None
@ -53,7 +56,7 @@ def run_build_manager():
os.path.join(os.environ.get('SSL_CONFIG'), 'ssl.key'))
server = BuilderServer(app.config['SERVER_HOSTNAME'], dockerfile_build_queue, build_logs,
user_files, manager_klass, build_manager_config[1], public_ip)
user_files, manager_klass, build_manager_config[1], manager_hostname)
server.run('0.0.0.0', ssl=ssl_context)
if __name__ == '__main__':

View file

@ -9,6 +9,7 @@ from autobahn.wamp.exception import ApplicationError
from buildman.server import BuildJobResult
from buildman.component.basecomponent import BaseComponent
from buildman.jobutil.buildjob import BuildJobLoadException
from buildman.jobutil.buildpack import BuildPackage, BuildPackageException
from buildman.jobutil.buildstatus import StatusHandler
from buildman.jobutil.workererror import WorkerError
@ -58,19 +59,20 @@ class BuildComponent(BaseComponent):
yield trollius.From(self.subscribe(self._on_heartbeat, 'io.quay.builder.heartbeat'))
yield trollius.From(self.subscribe(self._on_log_message, 'io.quay.builder.logmessage'))
self._set_status(ComponentStatus.WAITING)
yield trollius.From(self._set_status(ComponentStatus.WAITING))
def is_ready(self):
""" Determines whether a build component is ready to begin a build. """
return self._component_status == ComponentStatus.RUNNING
@trollius.coroutine
def start_build(self, build_job):
""" Starts a build. """
self._current_job = build_job
self._build_status = StatusHandler(self.build_logs, build_job.repo_build.uuid)
self._image_info = {}
self._set_status(ComponentStatus.BUILDING)
yield trollius.From(self._set_status(ComponentStatus.BUILDING))
# Retrieve the job's buildpack.
buildpack_url = self.user_files.get_file_url(build_job.repo_build.resource_key,
@ -82,23 +84,27 @@ class BuildComponent(BaseComponent):
buildpack = BuildPackage.from_url(buildpack_url)
except BuildPackageException as bpe:
self._build_failure('Could not retrieve build package', bpe)
return
raise trollius.Return()
# Extract the base image information from the Dockerfile.
parsed_dockerfile = None
logger.debug('Parsing dockerfile')
try:
build_config = build_job.build_config
except BuildJobLoadException as irbe:
self._build_failure('Could not load build job information', irbe)
try:
parsed_dockerfile = buildpack.parse_dockerfile(build_config.get('build_subdir'))
except BuildPackageException as bpe:
self._build_failure('Could not find Dockerfile in build package', bpe)
return
raise trollius.Return()
image_and_tag_tuple = parsed_dockerfile.get_image_and_tag()
if image_and_tag_tuple is None or image_and_tag_tuple[0] is None:
self._build_failure('Missing FROM line in Dockerfile')
return
raise trollius.Return()
base_image_information = {
'repository': image_and_tag_tuple[0],
@ -147,9 +153,7 @@ class BuildComponent(BaseComponent):
logger.debug('Invoking build: %s', self.builder_realm)
logger.debug('With Arguments: %s', build_arguments)
return (self
.call("io.quay.builder.build", **build_arguments)
.add_done_callback(self._build_complete))
self.call("io.quay.builder.build", **build_arguments).add_done_callback(self._build_complete)
@staticmethod
def _total_completion(statuses, total_images):
@ -276,38 +280,42 @@ class BuildComponent(BaseComponent):
self._current_job = None
# Set the component back to a running state.
self._set_status(ComponentStatus.RUNNING)
yield trollius.From(self._set_status(ComponentStatus.RUNNING))
@staticmethod
def _ping():
""" Ping pong. """
return 'pong'
@trollius.coroutine
def _on_ready(self, token, version):
if not version in SUPPORTED_WORKER_VERSIONS:
logger.warning('Build component (token "%s") is running an out-of-date version: %s', version)
return False
logger.warning('Build component (token "%s") is running an out-of-date version: %s', token,
version)
raise trollius.Return(False)
if self._component_status != 'waiting':
logger.warning('Build component (token "%s") is already connected', self.expected_token)
return False
raise trollius.Return(False)
if token != self.expected_token:
logger.warning('Builder token mismatch. Expected: "%s". Found: "%s"', self.expected_token, token)
return False
logger.warning('Builder token mismatch. Expected: "%s". Found: "%s"', self.expected_token,
token)
raise trollius.Return(False)
self._set_status(ComponentStatus.RUNNING)
yield trollius.From(self._set_status(ComponentStatus.RUNNING))
# Start the heartbeat check and updating loop.
loop = trollius.get_event_loop()
loop.create_task(self._heartbeat())
logger.debug('Build worker %s is connected and ready', self.builder_realm)
return True
raise trollius.Return(True)
@trollius.coroutine
def _set_status(self, phase):
if phase == ComponentStatus.RUNNING:
loop = trollius.get_event_loop()
self.parent_manager.build_component_ready(self, loop)
yield trollius.From(self.parent_manager.build_component_ready(self, loop))
self._component_status = phase
@ -344,13 +352,14 @@ class BuildComponent(BaseComponent):
logger.debug('Checking heartbeat on realm %s', self.builder_realm)
if (self._last_heartbeat and
self._last_heartbeat < datetime.datetime.utcnow() - HEARTBEAT_DELTA):
self._timeout()
yield trollius.From(self._timeout())
return
yield trollius.From(trollius.sleep(HEARTBEAT_TIMEOUT))
@trollius.coroutine
def _timeout(self):
self._set_status(ComponentStatus.TIMED_OUT)
yield trollius.From(self._set_status(ComponentStatus.TIMED_OUT))
logger.warning('Build component with realm %s has timed out', self.builder_realm)
self._dispose(timed_out=True)

View file

@ -1,6 +1,9 @@
import json
from cachetools import lru_cache
from data import model
import json
class BuildJobLoadException(Exception):
""" Exception raised if a build job could not be instantiated for some reason. """
@ -18,14 +21,22 @@ class BuildJob(object):
'Could not parse build queue item config with ID %s' % self.job_details['build_uuid']
)
@lru_cache(maxsize=1)
def _load_repo_build(self):
try:
self.repo_build = model.get_repository_build(self.job_details['build_uuid'])
return model.get_repository_build(self.job_details['build_uuid'])
except model.InvalidRepositoryBuildException:
raise BuildJobLoadException(
'Could not load repository build with ID %s' % self.job_details['build_uuid'])
@property
def repo_build(self):
return self._load_repo_build()
@property
def build_config(self):
try:
self.build_config = json.loads(self.repo_build.job_config)
return json.loads(self.repo_build.job_config)
except ValueError:
raise BuildJobLoadException(
'Could not parse repository build job config with ID %s' % self.job_details['build_uuid']

View file

@ -3,12 +3,12 @@ from trollius import coroutine
class BaseManager(object):
""" Base for all worker managers. """
def __init__(self, register_component, unregister_component, job_heartbeat_callback,
job_complete_callback, public_ip_address, heartbeat_period_sec):
job_complete_callback, manager_hostname, heartbeat_period_sec):
self.register_component = register_component
self.unregister_component = unregister_component
self.job_heartbeat_callback = job_heartbeat_callback
self.job_complete_callback = job_complete_callback
self.public_ip_address = public_ip_address
self.manager_hostname = manager_hostname
self.heartbeat_period_sec = heartbeat_period_sec
@coroutine
@ -31,7 +31,7 @@ class BaseManager(object):
raise NotImplementedError
@coroutine
def schedule(self, build_job, loop):
def schedule(self, build_job):
""" Schedules a queue item to be built. Returns True if the item was properly scheduled
and False if all workers are busy.
"""
@ -42,7 +42,8 @@ class BaseManager(object):
"""
raise NotImplementedError
def build_component_ready(self, build_component, loop):
@coroutine
def build_component_ready(self, build_component):
""" Method invoked whenever a build component announces itself as ready.
"""
raise NotImplementedError

View file

@ -5,7 +5,7 @@ from buildman.component.basecomponent import BaseComponent
from buildman.component.buildcomponent import BuildComponent
from buildman.manager.basemanager import BaseManager
from trollius.coroutines import From, Return, coroutine
from trollius import From, Return, coroutine, async
REGISTRATION_REALM = 'registration'
logger = logging.getLogger(__name__)
@ -51,16 +51,19 @@ class EnterpriseManager(BaseManager):
return realm
@coroutine
def schedule(self, build_job, loop):
def schedule(self, build_job):
""" Schedules a build for an Enterprise Registry. """
if self.shutting_down or not self.ready_components:
raise Return(False)
component = self.ready_components.pop()
loop.call_soon(component.start_build, build_job)
yield From(component.start_build(build_job))
raise Return(True)
def build_component_ready(self, build_component, loop):
@coroutine
def build_component_ready(self, build_component):
self.ready_components.add(build_component)
def shutdown(self):

View file

@ -13,16 +13,28 @@ from urllib3.exceptions import ReadTimeoutError
from buildman.manager.basemanager import BaseManager
from buildman.manager.executor import PopenExecutor, EC2Executor
from buildman.component.buildcomponent import BuildComponent
from buildman.jobutil.buildjob import BuildJob
from buildman.asyncutil import AsyncWrapper
from util.morecollections import AttrDict
logger = logging.getLogger(__name__)
ETCD_BUILDER_PREFIX = 'building/'
ETCD_EXPIRE_RESULT = 'expire'
ETCD_REALM_PREFIX = 'realm/'
ETCD_DISABLE_TIMEOUT = 0
class EtcdAction(object):
GET = 'get'
SET = 'set'
EXPIRE = 'expire'
UPDATE = 'update'
DELETE = 'delete'
CREATE = 'create'
COMPARE_AND_SWAP = 'compareAndSwap'
COMPARE_AND_DELETE = 'compareAndDelete'
class EphemeralBuilderManager(BaseManager):
""" Build manager implementation for the Enterprise Registry. """
@ -41,29 +53,22 @@ class EphemeralBuilderManager(BaseManager):
self._etcd_client = None
self._component_to_job = {}
self._job_uuid_to_component = {}
self._component_to_builder = {}
self._executor = None
self._worker_watch_task = None
# Map of etcd keys being watched to the tasks watching them
self._watch_tasks = {}
super(EphemeralBuilderManager, self).__init__(*args, **kwargs)
def _watch_builders(self):
""" Watch the builders key for expirations.
"""
if not self._shutting_down:
workers_future = self._etcd_client.watch(ETCD_BUILDER_PREFIX, recursive=True,
timeout=ETCD_DISABLE_TIMEOUT)
workers_future.add_done_callback(self._handle_key_expiration)
logger.debug('Scheduling watch task.')
self._worker_watch_task = async(workers_future)
def _watch_etcd(self, etcd_key, change_callback, recursive=True):
watch_task_key = (etcd_key, recursive)
def callback_wrapper(changed_key_future):
def _handle_key_expiration(self, changed_key_future):
""" Handle when a builder expires
"""
if self._worker_watch_task is None or self._worker_watch_task.done():
self._watch_builders()
if watch_task_key not in self._watch_tasks or self._watch_tasks[watch_task_key].done():
self._watch_etcd(etcd_key, change_callback)
if changed_key_future.cancelled():
# Due to lack of interest, tomorrow has been cancelled
@ -74,19 +79,56 @@ class EphemeralBuilderManager(BaseManager):
except ReadTimeoutError:
return
if etcd_result.action == ETCD_EXPIRE_RESULT:
change_callback(etcd_result)
if not self._shutting_down:
watch_future = self._etcd_client.watch(etcd_key, recursive=recursive,
timeout=ETCD_DISABLE_TIMEOUT)
watch_future.add_done_callback(callback_wrapper)
logger.debug('Scheduling watch of key: %s%s', etcd_key, '/*' if recursive else '')
self._watch_tasks[watch_task_key] = async(watch_future)
def _handle_builder_expiration(self, etcd_result):
if etcd_result.action == EtcdAction.EXPIRE:
# Handle the expiration
logger.debug('Builder expired, clean up the old build node')
job_metadata = json.loads(etcd_result._prev_node.value)
async(self._clean_up_old_builder(etcd_result.key, job_metadata))
def _handle_realm_change(self, etcd_result):
if etcd_result.action == EtcdAction.SET:
# We must listen on the realm created by ourselves or another worker
realm_spec = json.loads(etcd_result.value)
component = self.register_component(realm_spec['realm'], BuildComponent,
token=realm_spec['token'])
build_job = BuildJob(AttrDict(realm_spec['job_queue_item']))
self._component_to_job[component] = build_job
self._component_to_builder[component] = realm_spec['builder_id']
self._job_uuid_to_component[build_job.job_details['build_uuid']] = component
elif etcd_result.action == EtcdAction.DELETE or etcd_result.action == EtcdAction.EXPIRE:
# We must stop listening for new connections on the specified realm, if we did not get the
# connection
realm_spec = json.loads(etcd_result._prev_node.value)
build_job = BuildJob(AttrDict(realm_spec['job_queue_item']))
component = self._job_uuid_to_component.pop(build_job.job_details['build_uuid'], None)
if component is not None:
# We were not the manager which the worker connected to, remove the bookkeeping for it
logger.debug('Unregistering unused component on realm: %s', realm_spec['realm'])
del self._component_to_job[component]
del self._component_to_builder[component]
self.unregister_component(component)
else:
logger.warning('Unexpected action (%s) on realm key: %s', etcd_result.action, etcd_result.key)
def initialize(self, manager_config):
logger.debug('Calling initialize')
self._manager_config = manager_config
executor_klass = self._executors.get(manager_config.get('EXECUTOR', ''), PopenExecutor)
self._executor = executor_klass(manager_config.get('EXECUTOR_CONFIG', {}),
self.public_ip_address)
self.manager_hostname)
etcd_host = self._manager_config.get('ETCD_HOST', '127.0.0.1')
etcd_port = self._manager_config.get('ETCD_PORT', 2379)
@ -97,7 +139,8 @@ class EphemeralBuilderManager(BaseManager):
self._etcd_client = AsyncWrapper(self._etcd_client_klass(host=etcd_host, port=etcd_port),
executor=self._async_thread_executor)
self._watch_builders()
self._watch_etcd(ETCD_BUILDER_PREFIX, self._handle_builder_expiration)
self._watch_etcd(ETCD_REALM_PREFIX, self._handle_realm_change)
def setup_time(self):
setup_time = self._manager_config.get('MACHINE_SETUP_TIME', 300)
@ -108,17 +151,17 @@ class EphemeralBuilderManager(BaseManager):
logger.debug('Shutting down worker.')
self._shutting_down = True
if self._worker_watch_task is not None:
logger.debug('Canceling watch task.')
self._worker_watch_task.cancel()
self._worker_watch_task = None
for (etcd_key, _), task in self._watch_tasks.items():
if not task.done():
logger.debug('Canceling watch task for %s', etcd_key)
task.cancel()
if self._async_thread_executor is not None:
logger.debug('Shutting down thread pool executor.')
self._async_thread_executor.shutdown()
@coroutine
def schedule(self, build_job, loop):
def schedule(self, build_job):
logger.debug('Calling schedule with job: %s', build_job.job_details['build_uuid'])
# Check if there are worker slots avialable by checking the number of jobs in etcd
@ -154,8 +197,6 @@ class EphemeralBuilderManager(BaseManager):
try:
yield From(self._etcd_client.write(job_key, json.dumps(payload), prevExist=False, ttl=ttl))
component = self.register_component(realm, BuildComponent, token=token)
self._component_to_job[component] = build_job
except KeyError:
# The job was already taken by someone else, we are probably a retry
logger.error('Job already exists in etcd, are timeouts misconfigured or is the queue broken?')
@ -163,20 +204,38 @@ class EphemeralBuilderManager(BaseManager):
logger.debug('Starting builder with executor: %s', self._executor)
builder_id = yield From(self._executor.start_builder(realm, token))
self._component_to_builder[component] = builder_id
# Store the builder in etcd associated with the job id
payload['builder_id'] = builder_id
yield From(self._etcd_client.write(job_key, json.dumps(payload), prevExist=True, ttl=ttl))
# Store the realm spec which will allow any manager to accept this builder when it connects
realm_spec = json.dumps({
'realm': realm,
'token': token,
'builder_id': builder_id,
'job_queue_item': build_job.job_item,
})
try:
yield From(self._etcd_client.write(self._etcd_realm_key(realm), realm_spec, prevExist=False,
ttl=ttl))
except KeyError:
logger.error('Realm already exists in etcd. UUID collision or something is very very wrong.')
raise Return(False)
raise Return(True)
def build_component_ready(self, build_component, loop):
@coroutine
def build_component_ready(self, build_component):
try:
# Clean up the bookkeeping for allowing any manager to take the job
job = self._component_to_job.pop(build_component)
del self._job_uuid_to_component[job.job_details['build_uuid']]
yield From(self._etcd_client.delete(self._etcd_realm_key(build_component.builder_realm)))
logger.debug('Sending build %s to newly ready component on realm %s',
job.job_details['build_uuid'], build_component.builder_realm)
loop.call_soon(build_component.start_build, job)
yield From(build_component.start_build(job))
except KeyError:
logger.warning('Builder is asking for more work, but work already completed')
@ -240,6 +299,12 @@ class EphemeralBuilderManager(BaseManager):
"""
return os.path.join(ETCD_BUILDER_PREFIX, build_job.job_details['build_uuid'])
@staticmethod
def _etcd_realm_key(realm):
""" Create a key which is used to track an incoming connection on a realm.
"""
return os.path.join(ETCD_REALM_PREFIX, realm)
def num_workers(self):
""" Return the number of workers we're managing locally.
"""

View file

@ -29,9 +29,9 @@ class ExecutorException(Exception):
class BuilderExecutor(object):
def __init__(self, executor_config, manager_public_ip):
def __init__(self, executor_config, manager_hostname):
self.executor_config = executor_config
self.manager_public_ip = manager_public_ip
self.manager_hostname = manager_hostname
""" Interface which can be plugged into the EphemeralNodeManager to provide a strategy for
starting and stopping builders.
@ -52,7 +52,7 @@ class BuilderExecutor(object):
def get_manager_websocket_url(self):
return 'ws://{0}:'
def generate_cloud_config(self, realm, token, coreos_channel, manager_ip,
def generate_cloud_config(self, realm, token, coreos_channel, manager_hostname,
quay_username=None, quay_password=None, etcd_token=None):
if quay_username is None:
quay_username = self.executor_config['QUAY_USERNAME']
@ -69,7 +69,7 @@ class BuilderExecutor(object):
quay_username=quay_username,
quay_password=quay_password,
etcd_token=etcd_token,
manager_ip=manager_ip,
manager_hostname=manager_hostname,
coreos_channel=coreos_channel,
)
@ -108,7 +108,7 @@ class EC2Executor(BuilderExecutor):
channel = self.executor_config.get('COREOS_CHANNEL', 'stable')
get_ami_callable = partial(self._get_coreos_ami, region, channel)
coreos_ami = yield From(self._loop.run_in_executor(None, get_ami_callable))
user_data = self.generate_cloud_config(realm, token, channel, self.manager_public_ip)
user_data = self.generate_cloud_config(realm, token, channel, self.manager_hostname)
logger.debug('Generated cloud config: %s', user_data)
@ -155,10 +155,10 @@ class EC2Executor(BuilderExecutor):
class PopenExecutor(BuilderExecutor):
""" Implementation of BuilderExecutor which uses Popen to fork a quay-builder process.
"""
def __init__(self, executor_config, manager_public_ip):
def __init__(self, executor_config, manager_hostname):
self._jobs = {}
super(PopenExecutor, self).__init__(executor_config, manager_public_ip)
super(PopenExecutor, self).__init__(executor_config, manager_hostname)
""" Executor which uses Popen to fork a quay-builder process.
"""

View file

@ -37,7 +37,7 @@ class BuilderServer(object):
controller.
"""
def __init__(self, registry_hostname, queue, build_logs, user_files, lifecycle_manager_klass,
lifecycle_manager_config, manager_public_ip):
lifecycle_manager_config, manager_hostname):
self._loop = None
self._current_status = 'starting'
self._current_components = []
@ -53,7 +53,7 @@ class BuilderServer(object):
self._unregister_component,
self._job_heartbeat,
self._job_complete,
manager_public_ip,
manager_hostname,
HEARTBEAT_PERIOD_SEC,
)
self._lifecycle_manager_config = lifecycle_manager_config
@ -158,7 +158,7 @@ class BuilderServer(object):
self._queue.incomplete(job_item, restore_retry=False)
logger.debug('Build job found. Checking for an avaliable worker.')
scheduled = yield From(self._lifecycle_manager.schedule(build_job, self._loop))
scheduled = yield From(self._lifecycle_manager.schedule(build_job))
if scheduled:
self._job_count = self._job_count + 1
logger.debug('Build job scheduled. Running: %s', self._job_count)
@ -168,7 +168,6 @@ class BuilderServer(object):
yield From(trollius.sleep(WORK_CHECK_TIMEOUT))
@trollius.coroutine
def _initialize(self, loop, host, ssl=None):
self._loop = loop

View file

@ -6,7 +6,7 @@ write_files:
content: |
REALM={{ realm }}
TOKEN={{ token }}
ENDPOINT=wss://buildman.quay.io:8787
SERVER=wss://{{ manager_hostname }}
coreos:
update:
@ -31,7 +31,6 @@ coreos:
[Service]
TimeoutStartSec=600
TimeoutStopSec=2000
ExecStartPre=/bin/sh -xc "echo '{{ manager_ip }} buildman.quay.io' >> /etc/hosts; exit 0"
ExecStartPre=/usr/bin/docker login -u {{ quay_username }} -p {{ quay_password }} -e unused quay.io
ExecStart=/usr/bin/docker run --rm --net=host --name quay-builder --privileged --env-file /root/overrides.list -v /var/run/docker.sock:/var/run/docker.sock quay.io/coreos/registry-build-worker:latest
ExecStop=/usr/bin/docker stop quay-builder

View file

@ -78,7 +78,8 @@ class WorkQueue(object):
def get(self, processing_time=300):
"""
Get an available item and mark it as unavailable for the default of five
minutes.
minutes. The result of this method must always be composed of simple
python objects which are JSON serializable for network portability reasons.
"""
now = datetime.utcnow()

View file

@ -4,19 +4,20 @@ import os.path
import time
import json
from trollius import coroutine, get_event_loop, From, Future, sleep
from trollius import coroutine, get_event_loop, From, Future, sleep, Return
from mock import Mock
from threading import Event
from urllib3.exceptions import ReadTimeoutError
from buildman.manager.executor import BuilderExecutor
from buildman.manager.ephemeral import (EphemeralBuilderManager, ETCD_BUILDER_PREFIX,
ETCD_EXPIRE_RESULT)
ETCD_REALM_PREFIX, EtcdAction)
from buildman.server import BuildJobResult
from buildman.component.buildcomponent import BuildComponent
BUILD_UUID = 'deadbeef-dead-beef-dead-deadbeefdead'
REALM_ID = '1234-realm'
def async_test(f):
@ -43,17 +44,17 @@ class TestEphemeral(unittest.TestCase):
self.etcd_client_mock.watch = Mock(side_effect=hang_until_event)
return self.etcd_client_mock
def _create_mock_executor(self, *args, **kwargs):
def create_completed_future(result=None):
def _create_completed_future(self, result=None):
def inner(*args, **kwargs):
new_future = Future()
new_future.set_result(result)
return new_future
return inner
def _create_mock_executor(self, *args, **kwargs):
self.test_executor = Mock(spec=BuilderExecutor)
self.test_executor.start_builder = Mock(side_effect=create_completed_future('123'))
self.test_executor.stop_builder = Mock(side_effect=create_completed_future())
self.test_executor.start_builder = Mock(side_effect=self._create_completed_future('123'))
self.test_executor.stop_builder = Mock(side_effect=self._create_completed_future())
return self.test_executor
def _create_build_job(self):
@ -61,6 +62,10 @@ class TestEphemeral(unittest.TestCase):
mock_job.job_details = {
'build_uuid': BUILD_UUID,
}
mock_job.job_item = {
'body': json.dumps(mock_job.job_details),
'id': 1,
}
return mock_job
def setUp(self):
@ -71,13 +76,13 @@ class TestEphemeral(unittest.TestCase):
self.etcd_wait_event.clear()
self.register_component_callback = Mock()
self.uniregister_component_callback = Mock()
self.unregister_component_callback = Mock()
self.job_heartbeat_callback = Mock()
self.job_complete_callback = Mock()
self.manager = EphemeralBuilderManager(
self.register_component_callback,
self.uniregister_component_callback,
self.unregister_component_callback,
self.job_heartbeat_callback,
self.job_complete_callback,
'127.0.0.1',
@ -97,15 +102,19 @@ class TestEphemeral(unittest.TestCase):
del EphemeralBuilderManager._executors['test']
EphemeralBuilderManager._etcd_client_klass = self.old_etcd_client_klass
@async_test
def test_schedule_and_complete(self):
@coroutine
def _setup_job_for_managers(self):
# Test that we are watching the realm location before anything else happens
self.etcd_client_mock.watch.assert_any_call(ETCD_REALM_PREFIX, recursive=True, timeout=0)
self.etcd_client_mock.read = Mock(side_effect=KeyError)
test_component = BuildComponent(None)
test_component = Mock(spec=BuildComponent)
test_component.builder_realm = REALM_ID
test_component.start_build = Mock(side_effect=self._create_completed_future())
self.register_component_callback.return_value = test_component
# Ask for a builder to be scheduled
loop = get_event_loop()
is_scheduled = yield From(self.manager.schedule(self.mock_job, loop))
is_scheduled = yield From(self.manager.schedule(self.mock_job))
self.assertTrue(is_scheduled)
@ -114,29 +123,76 @@ class TestEphemeral(unittest.TestCase):
self.assertEqual(self.etcd_client_mock.write.call_args_list[0][0][0], self.mock_job_key)
self.assertEqual(self.etcd_client_mock.write.call_args_list[1][0][0], self.mock_job_key)
# Right now the job is not registered with any managers because etcd has not accepted the job
self.assertEqual(self.register_component_callback.call_count, 0)
realm_created = Mock(spec=etcd.EtcdResult)
realm_created.action = EtcdAction.SET
realm_created.key = os.path.join(ETCD_REALM_PREFIX, REALM_ID)
realm_created.value = json.dumps({
'realm': REALM_ID,
'token': 'beef',
'builder_id': '123',
'job_queue_item': self.mock_job.job_item,
})
self.manager._handle_realm_change(realm_created)
self.assertEqual(self.register_component_callback.call_count, 1)
raise Return(test_component)
@async_test
def test_schedule_and_complete(self):
# Test that a job is properly registered with all of the managers
test_component = yield From(self._setup_job_for_managers())
# Take the job ourselves
yield From(self.manager.build_component_ready(test_component))
self.etcd_client_mock.delete.assert_called_once_with(os.path.join(ETCD_REALM_PREFIX, REALM_ID))
self.etcd_client_mock.delete.reset_mock()
# Finish the job
yield From(self.manager.job_completed(self.mock_job, BuildJobResult.COMPLETE, test_component))
self.assertEqual(self.test_executor.stop_builder.call_count, 1)
self.etcd_client_mock.delete.assert_called_once_with(self.mock_job_key)
@async_test
def test_another_manager_takes_job(self):
# Prepare a job to be taken by another manager
test_component = yield From(self._setup_job_for_managers())
realm_deleted = Mock(spec=etcd.EtcdResult)
realm_deleted.action = EtcdAction.DELETE
realm_deleted.key = os.path.join(ETCD_REALM_PREFIX, REALM_ID)
realm_deleted._prev_node = Mock(spec=etcd.EtcdResult)
realm_deleted._prev_node.value = json.dumps({
'realm': REALM_ID,
'token': 'beef',
'builder_id': '123',
'job_queue_item': self.mock_job.job_item,
})
self.manager._handle_realm_change(realm_deleted)
self.unregister_component_callback.assert_called_once_with(test_component)
@async_test
def test_expiring_worker(self):
# Test that we are watching before anything else happens
self.etcd_client_mock.watch.assert_called_once_with(ETCD_BUILDER_PREFIX, recursive=True,
timeout=0)
self.etcd_client_mock.watch.assert_any_call(ETCD_BUILDER_PREFIX, recursive=True, timeout=0)
# Send a signal to the callback that a worker has expired
expired_result = Mock(spec=etcd.EtcdResult)
expired_result.action = ETCD_EXPIRE_RESULT
expired_result.action = EtcdAction.EXPIRE
expired_result.key = self.mock_job_key
expired_result._prev_node = Mock(spec=etcd.EtcdResult)
expired_result._prev_node.value = json.dumps({'builder_id': '1234'})
expired_future = Future()
expired_future.set_result(expired_result)
self.manager._handle_key_expiration(expired_future)
self.manager._handle_builder_expiration(expired_result)
yield From(sleep(.01))
@ -151,10 +207,8 @@ class TestEphemeral(unittest.TestCase):
set_result = Mock(sepc=etcd.EtcdResult)
set_result.action = 'set'
set_result.key = self.mock_job_key
set_future = Future()
set_future.set_result(set_result)
self.manager._handle_key_expiration(set_future)
self.manager._handle_builder_expiration(set_result)
yield From(sleep(.01))
@ -179,15 +233,3 @@ class TestEphemeral(unittest.TestCase):
self.job_heartbeat_callback.assert_called_once_with(self.mock_job)
self.assertEqual(self.etcd_client_mock.write.call_count, 1)
self.assertEqual(self.etcd_client_mock.write.call_args_list[0][0][0], self.mock_job_key)
@async_test
def test_etcd_read_timeout(self):
# Send a signal to the callback that a worker key has been changed
read_timeout_future = Future()
read_timeout_future.set_exception(ReadTimeoutError(None, None, None))
self.manager._handle_key_expiration(read_timeout_future)
yield From(sleep(.01))
self.assertEquals(self.test_executor.stop_builder.call_count, 0)