From 12ee8e0fc02a7abe1c0cf457698fa618a7a26a76 Mon Sep 17 00:00:00 2001
From: Jake Moshenko
Date: Mon, 22 Dec 2014 12:14:16 -0500
Subject: [PATCH] Switch a few of the buildman methods to coroutines in order
 to support network calls in methods. Add a test for the ephemeral build
 manager.

---
 buildman/asyncutil.py                |  27 ++++++++
 buildman/component/buildcomponent.py |  22 +++---
 buildman/manager/basemanager.py      |   8 +-
 buildman/manager/enterprise.py       |   8 +-
 buildman/manager/ephemeral.py        |  64 +++++++++++-----
 buildman/manager/executor.py         |  37 +++++----
 buildman/server.py                   |   3 +-
 buildman/templates/cloudconfig.yaml  |   2 +-
 endpoints/api/build.py               |   4 +-
 requirements-nover.txt               |   1 +
 test/test_buildman.py                | 109 +++++++++++++++++++++++++++
 11 files changed, 233 insertions(+), 52 deletions(-)
 create mode 100644 buildman/asyncutil.py
 create mode 100644 test/test_buildman.py

diff --git a/buildman/asyncutil.py b/buildman/asyncutil.py
new file mode 100644
index 000000000..4f2d4e1a9
--- /dev/null
+++ b/buildman/asyncutil.py
@@ -0,0 +1,27 @@
+from functools import partial, wraps
+from trollius import get_event_loop
+
+
+class AsyncWrapper(object):
+  """ Wrapper class which will transform a synchronous library to one that can be used with
+      trollius coroutines.
+  """
+  def __init__(self, delegate, loop=None, executor=None):
+    self._loop = loop if loop is not None else get_event_loop()
+    self._delegate = delegate
+    self._executor = executor
+
+  def __getattr__(self, attrib):
+    delegate_attr = getattr(self._delegate, attrib)
+
+    if not callable(delegate_attr):
+      return delegate_attr
+
+    def wrapper(*args, **kwargs):
+      """ Wraps the delegate_attr with primitives that will transform sync calls to ones shelled
+          out to a thread pool.
+      """
+      callable_delegate_attr = partial(delegate_attr, *args, **kwargs)
+      return self._loop.run_in_executor(self._executor, callable_delegate_attr)
+
+    return wrapper
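Note: the intended consumption pattern for an AsyncWrapper-ed client looks like the following. This is an illustrative sketch only, not part of the patch; the host, port, and key name are placeholders:

```python
# Sketch: driving a wrapped, normally-blocking client from a trollius coroutine.
import etcd
from trollius import From, coroutine, get_event_loop

from buildman.asyncutil import AsyncWrapper


@coroutine
def read_keys():
  client = AsyncWrapper(etcd.Client(host='127.0.0.1', port=2379))
  # Attribute lookup returns a function that runs the real etcd.Client.read
  # on the loop's thread-pool executor and hands back a future.
  result = yield From(client.read('/builders/', recursive=True))
  for child in result.children:
    print(child.key)


get_event_loop().run_until_complete(read_keys())
```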
""" - self.parent_manager.job_completed(self._current_job, job_status, self) + yield trollius.From(self.parent_manager.job_completed(self._current_job, job_status, self)) self._current_job = None # Set the component back to a running state. @@ -313,7 +313,7 @@ class BuildComponent(BaseComponent): def _on_heartbeat(self): """ Updates the last known heartbeat. """ - self._last_heartbeat = datetime.datetime.now() + self._last_heartbeat = datetime.datetime.utcnow() @trollius.coroutine def _heartbeat(self): @@ -321,7 +321,7 @@ class BuildComponent(BaseComponent): and updating the heartbeat in the build status dictionary (if applicable). This allows the build system to catch crashes from either end. """ - yield From(trollius.sleep(INITIAL_TIMEOUT)) + yield trollius.From(trollius.sleep(INITIAL_TIMEOUT)) while True: # If the component is no longer running or actively building, nothing more to do. @@ -335,7 +335,6 @@ class BuildComponent(BaseComponent): with build_status as status_dict: status_dict['heartbeat'] = int(time.time()) - # Mark the build item. current_job = self._current_job if current_job is not None: @@ -343,11 +342,12 @@ class BuildComponent(BaseComponent): # Check the heartbeat from the worker. logger.debug('Checking heartbeat on realm %s', self.builder_realm) - if self._last_heartbeat and self._last_heartbeat < datetime.datetime.now() - HEARTBEAT_DELTA: + if (self._last_heartbeat and + self._last_heartbeat < datetime.datetime.utcnow() - HEARTBEAT_DELTA): self._timeout() return - yield From(trollius.sleep(HEARTBEAT_TIMEOUT)) + yield trollius.From(trollius.sleep(HEARTBEAT_TIMEOUT)) def _timeout(self): self._set_status(ComponentStatus.TIMED_OUT) diff --git a/buildman/manager/basemanager.py b/buildman/manager/basemanager.py index f71971997..fc9fd70cf 100644 --- a/buildman/manager/basemanager.py +++ b/buildman/manager/basemanager.py @@ -1,3 +1,5 @@ +from trollius import coroutine + class BaseManager(object): """ Base for all worker managers. """ def __init__(self, register_component, unregister_component, job_heartbeat_callback, @@ -26,6 +28,7 @@ class BaseManager(object): """ raise NotImplementedError + @coroutine def schedule(self, build_job, loop): """ Schedules a queue item to be built. Returns True if the item was properly scheduled and False if all workers are busy. @@ -48,8 +51,11 @@ class BaseManager(object): """ raise NotImplementedError + @coroutine def job_completed(self, build_job, job_status, build_component): """ Method invoked once a job_item has completed, in some manner. The job_status will be - one of: incomplete, error, complete. If incomplete, the job should be requeued. + one of: incomplete, error, complete. Implementations of this method should call + self.job_complete_callback with a status of Incomplete if they wish for the job to be + automatically requeued. 
""" raise NotImplementedError diff --git a/buildman/manager/enterprise.py b/buildman/manager/enterprise.py index 1eedf2790..516464ff3 100644 --- a/buildman/manager/enterprise.py +++ b/buildman/manager/enterprise.py @@ -5,7 +5,7 @@ from buildman.component.basecomponent import BaseComponent from buildman.component.buildcomponent import BuildComponent from buildman.manager.basemanager import BaseManager -from trollius.coroutines import From +from trollius.coroutines import From, Return, coroutine REGISTRATION_REALM = 'registration' logger = logging.getLogger(__name__) @@ -50,14 +50,15 @@ class EnterpriseManager(BaseManager): self.register_component(realm, BuildComponent, token="") return realm + @coroutine def schedule(self, build_job, loop): """ Schedules a build for an Enterprise Registry. """ if self.shutting_down or not self.ready_components: - return False + raise Return(False) component = self.ready_components.pop() loop.call_soon(component.start_build, build_job) - return True + raise Return(True) def build_component_ready(self, build_component, loop): self.ready_components.add(build_component) @@ -65,6 +66,7 @@ class EnterpriseManager(BaseManager): def shutdown(self): self.shutting_down = True + @coroutine def job_completed(self, build_job, job_status, build_component): self.job_complete_callback(build_job, job_status) diff --git a/buildman/manager/ephemeral.py b/buildman/manager/ephemeral.py index 68af9de0e..ed2da908e 100644 --- a/buildman/manager/ephemeral.py +++ b/buildman/manager/ephemeral.py @@ -1,12 +1,15 @@ import logging import etcd import uuid +import calendar from datetime import datetime, timedelta +from trollius import From, coroutine, Return from buildman.manager.basemanager import BaseManager from buildman.manager.executor import PopenExecutor, EC2Executor from buildman.component.buildcomponent import BuildComponent +from buildman.asyncutil import AsyncWrapper logger = logging.getLogger(__name__) @@ -32,6 +35,13 @@ class EphemeralBuilderManager(BaseManager): """ Build manager implementation for the Enterprise Registry. 
""" shutting_down = False + _executors = { + 'popen': PopenExecutor, + 'ec2': EC2Executor, + } + + _etcd_client_klass = etcd.Client + def __init__(self, *args, **kwargs): self._manager_config = None self._etcd_client = None @@ -39,10 +49,6 @@ class EphemeralBuilderManager(BaseManager): self._component_to_job = {} self._component_to_builder = {} - self._executors = { - 'popen': PopenExecutor, - 'ec2': EC2Executor, - } self._executor = None super(EphemeralBuilderManager, self).__init__(*args, **kwargs) @@ -58,9 +64,8 @@ class EphemeralBuilderManager(BaseManager): etcd_host = self._manager_config.get('ETCD_HOST', '127.0.0.1') etcd_port = self._manager_config.get('ETCD_PORT', 2379) logger.debug('Connecting to etcd on %s:%s', etcd_host, etcd_port) - self._etcd_client = etcd.Client(host=etcd_host, port=etcd_port) - clear_etcd(self._etcd_client) + self._etcd_client = AsyncWrapper(self._etcd_client_klass(host=etcd_host, port=etcd_port)) def setup_time(self): setup_time = self._manager_config.get('MACHINE_SETUP_TIME', 300) @@ -71,13 +76,14 @@ class EphemeralBuilderManager(BaseManager): logger.debug('Calling shutdown.') raise NotImplementedError + @coroutine def schedule(self, build_job, loop): - logger.debug('Calling schedule with job: %s', build_job.repo_build.uuid) + logger.debug('Calling schedule with job: %s', build_job.job_details['build_uuid']) # Check if there are worker slots avialable by checking the number of jobs in etcd - allowed_worker_count = self._manager_config.get('ALLOWED_WORKER_COUNT', 2) + allowed_worker_count = self._manager_config.get('ALLOWED_WORKER_COUNT', 1) try: - building = self._etcd_client.read(ETCD_BUILDER_PREFIX, recursive=True) + building = yield From(self._etcd_client.read(ETCD_BUILDER_PREFIX, recursive=True)) workers_alive = sum(1 for child in building.children if not child.dir) except KeyError: workers_alive = 0 @@ -87,7 +93,7 @@ class EphemeralBuilderManager(BaseManager): if workers_alive >= allowed_worker_count: logger.info('Too many workers alive, unable to start new worker. 
diff --git a/buildman/manager/executor.py b/buildman/manager/executor.py
index b35a90c97..82b98ef5c 100644
--- a/buildman/manager/executor.py
+++ b/buildman/manager/executor.py
@@ -7,6 +7,10 @@ import requests
 import cachetools
 
 from jinja2 import FileSystemLoader, Environment
+from trollius import coroutine, From, Return, get_event_loop
+from functools import partial
+
+from buildman.asyncutil import AsyncWrapper
 
 logger = logging.getLogger(__name__)
@@ -32,12 +36,14 @@ class BuilderExecutor(object):
   """ Interface which can be plugged into the EphemeralNodeManager to provide a strategy for
       starting and stopping builders.
   """
+  @coroutine
   def start_builder(self, realm, token):
     """ Create a builder with the specified config. Returns a unique id which can be used to manage
         the builder.
     """
     raise NotImplementedError
 
+  @coroutine
   def stop_builder(self, builder_id):
     """ Stop a builder which is currently running.
     """
     raise NotImplementedError
@@ -74,14 +80,18 @@ class EC2Executor(BuilderExecutor):
   """
   COREOS_STACK_URL = 'http://%s.release.core-os.net/amd64-usr/current/coreos_production_ami_hvm.txt'
 
+  def __init__(self, *args, **kwargs):
+    self._loop = get_event_loop()
+    super(EC2Executor, self).__init__(*args, **kwargs)
+
   def _get_conn(self):
     """ Creates an ec2 connection which can be used to manage instances.
     """
-    return boto.ec2.connect_to_region(
+    return AsyncWrapper(boto.ec2.connect_to_region(
       self.executor_config['EC2_REGION'],
       aws_access_key_id=self.executor_config['AWS_ACCESS_KEY'],
       aws_secret_access_key=self.executor_config['AWS_SECRET_KEY'],
-    )
+    ))
 
   @classmethod
   @cachetools.ttl_cache(ttl=ONE_HOUR)
@@ -92,25 +102,24 @@ class EC2Executor(BuilderExecutor):
     stack_amis = dict([stack.split('=') for stack in stack_list_string.split('|')])
     return stack_amis[ec2_region]
 
+  @coroutine
   def start_builder(self, realm, token):
     region = self.executor_config['EC2_REGION']
     channel = self.executor_config.get('COREOS_CHANNEL', 'stable')
-    coreos_ami = self._get_coreos_ami(region, channel)
+    get_ami_callable = partial(self._get_coreos_ami, region, channel)
+    coreos_ami = yield From(self._loop.run_in_executor(None, get_ami_callable))
     user_data = self.generate_cloud_config(realm, token, channel, self.manager_public_ip)
 
     logger.debug('Generated cloud config: %s', user_data)
 
     ec2_conn = self._get_conn()
-    # class FakeReservation(object):
-    #   def __init__(self):
-    #     self.instances = None
-    # reservation = FakeReservation()
-    reservation = ec2_conn.run_instances(
+    reservation = yield From(ec2_conn.run_instances(
       coreos_ami,
       instance_type=self.executor_config['EC2_INSTANCE_TYPE'],
       security_groups=self.executor_config['EC2_SECURITY_GROUP_IDS'],
       key_name=self.executor_config.get('EC2_KEY_NAME', None),
       user_data=user_data,
-    )
+      instance_initiated_shutdown_behavior='terminate',
+    ))
 
     if not reservation.instances:
@@ -124,12 +133,13 @@ class EC2Executor(BuilderExecutor):
       'Realm': realm,
       'Token': token,
     })
-    return launched.id
+    raise Return(launched.id)
 
+  @coroutine
   def stop_builder(self, builder_id):
     ec2_conn = self._get_conn()
-    stopped_instance_ids = [si.id for si in ec2_conn.stop_instances([builder_id], force=True)]
-    if builder_id not in stopped_instance_ids:
+    stopped_instances = yield From(ec2_conn.stop_instances([builder_id], force=True))
+    if builder_id not in [si.id for si in stopped_instances]:
       raise ExecutorException('Unable to stop instance: %s' % builder_id)
 
 class PopenExecutor(BuilderExecutor):
@@ -142,6 +152,7 @@ class PopenExecutor(BuilderExecutor):
   """ Executor which uses Popen to fork a quay-builder process.
   """
+  @coroutine
   def start_builder(self, realm, token):
     # Now start a machine for this job, adding the machine id to the etcd information
     logger.debug('Forking process for build')
@@ -162,9 +173,9 @@ class PopenExecutor(BuilderExecutor):
     builder_id = str(uuid.uuid4())
     self._jobs[builder_id] = (spawned, logpipe)
     logger.debug('Builder spawned with id: %s', builder_id)
-    return builder_id
-
+    raise Return(builder_id)
 
+  @coroutine
   def stop_builder(self, builder_id):
     if builder_id not in self._jobs:
       raise ExecutorException('Builder id not being tracked by executor.')
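Note: `_get_coreos_ami` stays a plain synchronous function so `cachetools.ttl_cache` can keep memoizing it; `start_builder` instead pushes the call onto the loop's thread pool. Since `run_in_executor` forwards positional but not keyword arguments, the arguments are bound with `partial` first. The general shape, with a made-up blocking function standing in for the AMI lookup:

```python
# Sketch: running a blocking, argument-taking callable on the thread pool.
from functools import partial

from trollius import From, coroutine, get_event_loop


def fetch_ami(region, channel='stable'):
  # Imagine a blocking HTTP lookup here (requests.get, boto, etc.).
  return '%s/%s-ami' % (region, channel)


@coroutine
def lookup():
  loop = get_event_loop()
  bound = partial(fetch_ami, 'us-east-1', channel='beta')
  ami = yield From(loop.run_in_executor(None, bound))
  print(ami)


get_event_loop().run_until_complete(lookup())
```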
""" - return '{0}{1}'.format(ETCD_BUILDER_PREFIX, build_job.repo_build.uuid) + return '{0}{1}'.format(ETCD_BUILDER_PREFIX, build_job.job_details['build_uuid']) diff --git a/buildman/manager/executor.py b/buildman/manager/executor.py index b35a90c97..82b98ef5c 100644 --- a/buildman/manager/executor.py +++ b/buildman/manager/executor.py @@ -7,6 +7,10 @@ import requests import cachetools from jinja2 import FileSystemLoader, Environment +from trollius import coroutine, From, Return, get_event_loop +from functools import partial + +from buildman.asyncutil import AsyncWrapper logger = logging.getLogger(__name__) @@ -32,12 +36,14 @@ class BuilderExecutor(object): """ Interface which can be plugged into the EphemeralNodeManager to provide a strategy for starting and stopping builders. """ + @coroutine def start_builder(self, realm, token): """ Create a builder with the specified config. Returns a unique id which can be used to manage the builder. """ raise NotImplementedError + @coroutine def stop_builder(self, builder_id): """ Stop a builder which is currently running. """ @@ -74,14 +80,18 @@ class EC2Executor(BuilderExecutor): """ COREOS_STACK_URL = 'http://%s.release.core-os.net/amd64-usr/current/coreos_production_ami_hvm.txt' + def __init__(self, *args, **kwargs): + self._loop = get_event_loop() + super(EC2Executor, self).__init__(*args, **kwargs) + def _get_conn(self): """ Creates an ec2 connection which can be used to manage instances. """ - return boto.ec2.connect_to_region( + return AsyncWrapper(boto.ec2.connect_to_region( self.executor_config['EC2_REGION'], aws_access_key_id=self.executor_config['AWS_ACCESS_KEY'], aws_secret_access_key=self.executor_config['AWS_SECRET_KEY'], - ) + )) @classmethod @cachetools.ttl_cache(ttl=ONE_HOUR) @@ -92,25 +102,24 @@ class EC2Executor(BuilderExecutor): stack_amis = dict([stack.split('=') for stack in stack_list_string.split('|')]) return stack_amis[ec2_region] + @coroutine def start_builder(self, realm, token): region = self.executor_config['EC2_REGION'] channel = self.executor_config.get('COREOS_CHANNEL', 'stable') - coreos_ami = self._get_coreos_ami(region, channel) + get_ami_callable = partial(self._get_coreos_ami, region, channel) + coreos_ami = yield From(self._loop.run_in_executor(None, get_ami_callable)) user_data = self.generate_cloud_config(realm, token, channel, self.manager_public_ip) logger.debug('Generated cloud config: %s', user_data) ec2_conn = self._get_conn() - # class FakeReservation(object): - # def __init__(self): - # self.instances = None - # reservation = FakeReservation() - reservation = ec2_conn.run_instances( + reservation = yield ec2_conn.run_instances( coreos_ami, instance_type=self.executor_config['EC2_INSTANCE_TYPE'], security_groups=self.executor_config['EC2_SECURITY_GROUP_IDS'], key_name=self.executor_config.get('EC2_KEY_NAME', None), user_data=user_data, + instance_initiated_shutdown_behavior='terminate', ) if not reservation.instances: @@ -124,12 +133,13 @@ class EC2Executor(BuilderExecutor): 'Realm': realm, 'Token': token, }) - return launched.id + raise Return(launched.id) + @coroutine def stop_builder(self, builder_id): ec2_conn = self._get_conn() - stopped_instance_ids = [si.id for si in ec2_conn.stop_instances([builder_id], force=True)] - if builder_id not in stopped_instance_ids: + stopped_instances = yield ec2_conn.stop_instances([builder_id], force=True) + if builder_id not in [si.id for si in stopped_instances]: raise ExecutorException('Unable to stop instance: %s' % builder_id) class 
diff --git a/buildman/templates/cloudconfig.yaml b/buildman/templates/cloudconfig.yaml
index ca9c6c16a..e75ce5626 100644
--- a/buildman/templates/cloudconfig.yaml
+++ b/buildman/templates/cloudconfig.yaml
@@ -29,10 +29,10 @@ coreos:
         After=docker.service
 
         [Service]
-        Restart=always
         TimeoutStartSec=600
         TimeoutStopSec=2000
         ExecStartPre=/usr/bin/sudo /bin/sh -xc "echo '{{ manager_ip }} buildman.quay.io' >> /etc/hosts; exit 0"
         ExecStartPre=/usr/bin/docker login -u {{ quay_username }} -p {{ quay_password }} -e unused quay.io
         ExecStart=/usr/bin/docker run --rm --net=host --name quay-builder --privileged --env-file /root/overrides.list -v /var/run/docker.sock:/var/run/docker.sock quay.io/coreos/registry-build-worker:latest
         ExecStop=/usr/bin/docker stop quay-builder
+        ExecStopPost=/usr/bin/sudo /bin/sh -xc "/bin/sleep 600; /sbin/shutdown -h now"
diff --git a/endpoints/api/build.py b/endpoints/api/build.py
index e7fdf2f11..506c250da 100644
--- a/endpoints/api/build.py
+++ b/endpoints/api/build.py
@@ -72,8 +72,8 @@ def build_status_view(build_obj, can_write=False):
   # minutes. If not, then the build timed out.
   if phase != database.BUILD_PHASE.COMPLETE and phase != database.BUILD_PHASE.ERROR:
     if status is not None and 'heartbeat' in status and status['heartbeat']:
-      heartbeat = datetime.datetime.fromtimestamp(status['heartbeat'])
-      if datetime.datetime.now() - heartbeat > datetime.timedelta(minutes=1):
+      heartbeat = datetime.datetime.utcfromtimestamp(status['heartbeat'])
+      if datetime.datetime.utcnow() - heartbeat > datetime.timedelta(minutes=1):
         phase = database.BUILD_PHASE.INTERNAL_ERROR
 
   logger.debug('Can write: %s job_config: %s', can_write, build_obj.job_config)
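Note: the heartbeat travels as a Unix timestamp (`int(time.time())` from the worker, `calendar.timegm(...)` for the etcd expiration), so it must be decoded with the UTC variants: `utcfromtimestamp` paired with `utcnow`. The local-time pair would skew the one-minute staleness check by the host's UTC offset. A quick round-trip check of the convention:

```python
# Why utcfromtimestamp/utcnow must be paired: the round-trip is lossless
# (up to sub-second precision), independent of the host timezone.
import calendar
import datetime

now = datetime.datetime.utcnow()
epoch = calendar.timegm(now.timetuple())          # datetime -> epoch seconds
back = datetime.datetime.utcfromtimestamp(epoch)  # epoch seconds -> datetime
assert abs((back - now).total_seconds()) < 1

# datetime.datetime.fromtimestamp(epoch) would return *local* time instead,
# making any comparison against utcnow() wrong by the UTC offset.
```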
diff --git a/requirements-nover.txt b/requirements-nover.txt
index 51cd42e3c..2993895d7 100644
--- a/requirements-nover.txt
+++ b/requirements-nover.txt
@@ -43,3 +43,4 @@ git+https://github.com/DevTable/avatar-generator.git
 gipc
 python-etcd
 cachetools
+mock
diff --git a/test/test_buildman.py b/test/test_buildman.py
new file mode 100644
index 000000000..0886b671a
--- /dev/null
+++ b/test/test_buildman.py
@@ -0,0 +1,109 @@
+import unittest
+import etcd
+import logging
+
+from trollius import coroutine, get_event_loop, From, Future
+from mock import Mock
+from functools import partial
+
+from buildman.manager.executor import BuilderExecutor
+from buildman.manager.ephemeral import EphemeralBuilderManager, ETCD_BUILDER_PREFIX
+from buildman.server import BuildJobResult
+from buildman.component.buildcomponent import BuildComponent
+
+
+BUILD_UUID = 'deadbeef-dead-beef-dead-deadbeefdead'
+
+logging.basicConfig(level=logging.DEBUG)
+logger = logging.getLogger(__name__)
+
+
+def async_test(f):
+  def wrapper(*args, **kwargs):
+    coro = coroutine(f)
+    future = coro(*args, **kwargs)
+    loop = get_event_loop()
+    loop.run_until_complete(future)
+  return wrapper
+
+
+class TestEphemeral(unittest.TestCase):
+  def __init__(self, *args, **kwargs):
+    self.etcd_client_mock = None
+    self.test_executor = None
+    super(TestEphemeral, self).__init__(*args, **kwargs)
+
+  def _create_mock_etcd_client(self, *args, **kwargs):
+    self.etcd_client_mock = Mock(spec=etcd.Client, name='etcd.Client')
+    return self.etcd_client_mock
+
+  def _create_mock_executor(self, *args, **kwargs):
+    def create_completed_future(result=None):
+      def inner(*args, **kwargs):
+        new_future = Future()
+        new_future.set_result(result)
+        return new_future
+      return inner
+
+    self.test_executor = Mock(spec=BuilderExecutor)
+    self.test_executor.start_builder = Mock(side_effect=create_completed_future('123'))
+    self.test_executor.stop_builder = Mock(side_effect=create_completed_future())
+    return self.test_executor
+
+  def _create_build_job(self):
+    mock_job = Mock()
+    mock_job.job_details = {
+      'build_uuid': BUILD_UUID,
+    }
+    return mock_job
+
+  def setUp(self):
+    EphemeralBuilderManager._executors['test'] = self._create_mock_executor
+
+    self.old_etcd_client_klass = EphemeralBuilderManager._etcd_client_klass
+    EphemeralBuilderManager._etcd_client_klass = self._create_mock_etcd_client
+
+    self.register_component_callback = Mock()
+    self.unregister_component_callback = Mock()
+    self.job_heartbeat_callback = Mock()
+    self.job_complete_callback = Mock()
+
+    self.manager = EphemeralBuilderManager(
+      self.register_component_callback,
+      self.unregister_component_callback,
+      self.job_heartbeat_callback,
+      self.job_complete_callback,
+      '127.0.0.1'
+    )
+
+    self.manager.initialize({'EXECUTOR': 'test'})
+
+  def tearDown(self):
+    del EphemeralBuilderManager._executors['test']
+    EphemeralBuilderManager._etcd_client_klass = self.old_etcd_client_klass
+
+  @async_test
+  def test_schedule_and_complete(self):
+    mock_job = self._create_build_job()
+
+    self.etcd_client_mock.read = Mock(side_effect=KeyError)
+    test_component = BuildComponent(None)
+    self.register_component_callback.return_value = test_component
+
+    # Ask for a builder to be scheduled
+    loop = get_event_loop()
+    is_scheduled = yield From(self.manager.schedule(mock_job, loop))
+
+    self.assertTrue(is_scheduled)
+
+    job_key = ETCD_BUILDER_PREFIX + mock_job.job_details['build_uuid']
+    self.etcd_client_mock.read.assert_called_once_with(ETCD_BUILDER_PREFIX, recursive=True)
+    self.assertEqual(len(self.test_executor.start_builder.call_args_list), 1)
+    self.assertEqual(self.etcd_client_mock.write.call_args_list[0][0][0], job_key)
+    self.assertEqual(self.etcd_client_mock.write.call_args_list[1][0][0], job_key)
+
+    self.assertEqual(len(self.register_component_callback.call_args_list), 1)
+
+    yield From(self.manager.job_completed(mock_job, BuildJobResult.COMPLETE, test_component))
+
+    self.assertEqual(len(self.test_executor.stop_builder.call_args_list), 1)
+    self.etcd_client_mock.delete.assert_called_once_with(job_key)
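Note: the `async_test` helper above is what lets a generator-style test body run under plain `unittest`: it wraps the body in a trollius coroutine and drives it to completion on the event loop, so an ordinary test runner can execute it (assuming the repository root is on `PYTHONPATH`, something like `python -m unittest test.test_buildman` should work). The same pattern, detached from the buildman fixtures:

```python
# Sketch: the async_test pattern in isolation.
import unittest

from trollius import From, coroutine, get_event_loop, sleep


def async_test(f):
  def wrapper(*args, **kwargs):
    future = coroutine(f)(*args, **kwargs)
    get_event_loop().run_until_complete(future)
  return wrapper


class ExampleTest(unittest.TestCase):
  @async_test
  def test_sleep_completes(self):
    yield From(sleep(0.01))
    self.assertTrue(True)


if __name__ == '__main__':
  unittest.main()
```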