Switch a few of the buildman methods to coroutines in order to support network calls in methods. Add a test for the ephemeral build manager.

This commit is contained in:
Jake Moshenko 2014-12-22 12:14:16 -05:00
parent a280bbcb6d
commit 12ee8e0fc0
11 changed files with 233 additions and 52 deletions

27
buildman/asyncutil.py Normal file
View file

@ -0,0 +1,27 @@
from functools import partial, wraps
from trollius import get_event_loop
class AsyncWrapper(object):
  """ Wrapper class which will transform a synchronous library to one that can be used with
      trollius coroutines.

      Every callable attribute looked up on the wrapper is shelled out to the executor (or the
      loop's default thread pool when executor is None) and returns a future; non-callable
      attributes are passed through unchanged.
  """
  def __init__(self, delegate, loop=None, executor=None):
    # delegate: the synchronous object being wrapped.
    # loop: event loop used to schedule executor calls; defaults to the current loop.
    # executor: optional concurrent.futures executor; None selects the loop's default.
    self._loop = loop if loop is not None else get_event_loop()
    self._delegate = delegate
    self._executor = executor

  def __getattr__(self, attrib):
    delegate_attr = getattr(self._delegate, attrib)

    if not callable(delegate_attr):
      return delegate_attr

    # wraps preserves the delegate method's name/docstring on the proxy, which keeps
    # logging and debugging output meaningful.
    @wraps(delegate_attr)
    def wrapper(*args, **kwargs):
      """ Wraps the delegate_attr with primitives that will transform sync calls to ones shelled
          out to a thread pool.
      """
      callable_delegate_attr = partial(delegate_attr, *args, **kwargs)
      return self._loop.run_in_executor(self._executor, callable_delegate_attr)

    return wrapper

View file

@ -6,7 +6,6 @@ import trollius
import re
from autobahn.wamp.exception import ApplicationError
from trollius.coroutines import From
from buildman.server import BuildJobResult
from buildman.component.basecomponent import BaseComponent
@ -54,10 +53,10 @@ class BuildComponent(BaseComponent):
def onJoin(self, details):
logger.debug('Registering methods and listeners for component %s', self.builder_realm)
yield From(self.register(self._on_ready, u'io.quay.buildworker.ready'))
yield From(self.register(self._ping, u'io.quay.buildworker.ping'))
yield From(self.subscribe(self._on_heartbeat, 'io.quay.builder.heartbeat'))
yield From(self.subscribe(self._on_log_message, 'io.quay.builder.logmessage'))
yield trollius.From(self.register(self._on_ready, u'io.quay.buildworker.ready'))
yield trollius.From(self.register(self._ping, u'io.quay.buildworker.ping'))
yield trollius.From(self.subscribe(self._on_heartbeat, 'io.quay.builder.heartbeat'))
yield trollius.From(self.subscribe(self._on_log_message, 'io.quay.builder.logmessage'))
self._set_status(ComponentStatus.WAITING)
@ -270,9 +269,10 @@ class BuildComponent(BaseComponent):
else:
self._build_finished(BuildJobResult.ERROR)
@trollius.coroutine
def _build_finished(self, job_status):
""" Alerts the parent that a build has completed and sets the status back to running. """
self.parent_manager.job_completed(self._current_job, job_status, self)
yield trollius.From(self.parent_manager.job_completed(self._current_job, job_status, self))
self._current_job = None
# Set the component back to a running state.
@ -313,7 +313,7 @@ class BuildComponent(BaseComponent):
def _on_heartbeat(self):
""" Updates the last known heartbeat. """
self._last_heartbeat = datetime.datetime.now()
self._last_heartbeat = datetime.datetime.utcnow()
@trollius.coroutine
def _heartbeat(self):
@ -321,7 +321,7 @@ class BuildComponent(BaseComponent):
and updating the heartbeat in the build status dictionary (if applicable). This allows
the build system to catch crashes from either end.
"""
yield From(trollius.sleep(INITIAL_TIMEOUT))
yield trollius.From(trollius.sleep(INITIAL_TIMEOUT))
while True:
# If the component is no longer running or actively building, nothing more to do.
@ -335,7 +335,6 @@ class BuildComponent(BaseComponent):
with build_status as status_dict:
status_dict['heartbeat'] = int(time.time())
# Mark the build item.
current_job = self._current_job
if current_job is not None:
@ -343,11 +342,12 @@ class BuildComponent(BaseComponent):
# Check the heartbeat from the worker.
logger.debug('Checking heartbeat on realm %s', self.builder_realm)
if self._last_heartbeat and self._last_heartbeat < datetime.datetime.now() - HEARTBEAT_DELTA:
if (self._last_heartbeat and
self._last_heartbeat < datetime.datetime.utcnow() - HEARTBEAT_DELTA):
self._timeout()
return
yield From(trollius.sleep(HEARTBEAT_TIMEOUT))
yield trollius.From(trollius.sleep(HEARTBEAT_TIMEOUT))
def _timeout(self):
self._set_status(ComponentStatus.TIMED_OUT)

View file

@ -1,3 +1,5 @@
from trollius import coroutine
class BaseManager(object):
""" Base for all worker managers. """
def __init__(self, register_component, unregister_component, job_heartbeat_callback,
@ -26,6 +28,7 @@ class BaseManager(object):
"""
raise NotImplementedError
@coroutine
def schedule(self, build_job, loop):
""" Schedules a queue item to be built. Returns True if the item was properly scheduled
and False if all workers are busy.
@ -48,8 +51,11 @@ class BaseManager(object):
"""
raise NotImplementedError
@coroutine
def job_completed(self, build_job, job_status, build_component):
""" Method invoked once a job_item has completed, in some manner. The job_status will be
one of: incomplete, error, complete. If incomplete, the job should be requeued.
one of: incomplete, error, complete. Implementations of this method should call
self.job_complete_callback with a status of Incomplete if they wish for the job to be
automatically requeued.
"""
raise NotImplementedError

View file

@ -5,7 +5,7 @@ from buildman.component.basecomponent import BaseComponent
from buildman.component.buildcomponent import BuildComponent
from buildman.manager.basemanager import BaseManager
from trollius.coroutines import From
from trollius.coroutines import From, Return, coroutine
REGISTRATION_REALM = 'registration'
logger = logging.getLogger(__name__)
@ -50,14 +50,15 @@ class EnterpriseManager(BaseManager):
self.register_component(realm, BuildComponent, token="")
return realm
@coroutine
def schedule(self, build_job, loop):
""" Schedules a build for an Enterprise Registry. """
if self.shutting_down or not self.ready_components:
return False
raise Return(False)
component = self.ready_components.pop()
loop.call_soon(component.start_build, build_job)
return True
raise Return(True)
def build_component_ready(self, build_component, loop):
self.ready_components.add(build_component)
@ -65,6 +66,7 @@ class EnterpriseManager(BaseManager):
def shutdown(self):
self.shutting_down = True
@coroutine
def job_completed(self, build_job, job_status, build_component):
self.job_complete_callback(build_job, job_status)

View file

@ -1,12 +1,15 @@
import logging
import etcd
import uuid
import calendar
from datetime import datetime, timedelta
from trollius import From, coroutine, Return
from buildman.manager.basemanager import BaseManager
from buildman.manager.executor import PopenExecutor, EC2Executor
from buildman.component.buildcomponent import BuildComponent
from buildman.asyncutil import AsyncWrapper
logger = logging.getLogger(__name__)
@ -32,6 +35,13 @@ class EphemeralBuilderManager(BaseManager):
""" Build manager implementation for the Enterprise Registry. """
shutting_down = False
_executors = {
'popen': PopenExecutor,
'ec2': EC2Executor,
}
_etcd_client_klass = etcd.Client
def __init__(self, *args, **kwargs):
self._manager_config = None
self._etcd_client = None
@ -39,10 +49,6 @@ class EphemeralBuilderManager(BaseManager):
self._component_to_job = {}
self._component_to_builder = {}
self._executors = {
'popen': PopenExecutor,
'ec2': EC2Executor,
}
self._executor = None
super(EphemeralBuilderManager, self).__init__(*args, **kwargs)
@ -58,9 +64,8 @@ class EphemeralBuilderManager(BaseManager):
etcd_host = self._manager_config.get('ETCD_HOST', '127.0.0.1')
etcd_port = self._manager_config.get('ETCD_PORT', 2379)
logger.debug('Connecting to etcd on %s:%s', etcd_host, etcd_port)
self._etcd_client = etcd.Client(host=etcd_host, port=etcd_port)
clear_etcd(self._etcd_client)
self._etcd_client = AsyncWrapper(self._etcd_client_klass(host=etcd_host, port=etcd_port))
def setup_time(self):
setup_time = self._manager_config.get('MACHINE_SETUP_TIME', 300)
@ -71,13 +76,14 @@ class EphemeralBuilderManager(BaseManager):
logger.debug('Calling shutdown.')
raise NotImplementedError
@coroutine
def schedule(self, build_job, loop):
logger.debug('Calling schedule with job: %s', build_job.repo_build.uuid)
logger.debug('Calling schedule with job: %s', build_job.job_details['build_uuid'])
# Check if there are worker slots available by checking the number of jobs in etcd
allowed_worker_count = self._manager_config.get('ALLOWED_WORKER_COUNT', 2)
allowed_worker_count = self._manager_config.get('ALLOWED_WORKER_COUNT', 1)
try:
building = self._etcd_client.read(ETCD_BUILDER_PREFIX, recursive=True)
building = yield From(self._etcd_client.read(ETCD_BUILDER_PREFIX, recursive=True))
workers_alive = sum(1 for child in building.children if not child.dir)
except KeyError:
workers_alive = 0
@ -87,7 +93,7 @@ class EphemeralBuilderManager(BaseManager):
if workers_alive >= allowed_worker_count:
logger.info('Too many workers alive, unable to start new worker. %s >= %s', workers_alive,
allowed_worker_count)
return False
raise Return(False)
job_key = self._etcd_job_key(build_job)
@ -97,28 +103,33 @@ class EphemeralBuilderManager(BaseManager):
expiration = datetime.utcnow() + timedelta(seconds=self.setup_time())
payload = {
'expiration': expiration.isoformat(),
'expiration': calendar.timegm(expiration.timetuple()),
}
try:
self._etcd_client.write(job_key, payload, prevExist=False)
yield From(self._etcd_client.write(job_key, payload, prevExist=False))
component = self.register_component(realm, BuildComponent, token=token)
self._component_to_job[component] = build_job
except KeyError:
# The job was already taken by someone else, we are probably a retry
logger.warning('Job already exists in etcd, did an old worker die?')
return False
logger.error('Job already exists in etcd, are timeouts misconfigured or is the queue broken?')
raise Return(False)
builder_id = self._executor.start_builder(realm, token)
logger.debug('Starting builder with executor: %s', self._executor)
builder_id = yield From(self._executor.start_builder(realm, token))
self._component_to_builder[component] = builder_id
return True
# Store the builder in etcd associated with the job id
payload['builder_id'] = builder_id
yield From(self._etcd_client.write(job_key, payload, prevExist=True))
raise Return(True)
def build_component_ready(self, build_component, loop):
try:
job = self._component_to_job.pop(build_component)
logger.debug('Sending build %s to newly ready component on realm %s', job.repo_build.uuid,
build_component.builder_realm)
logger.debug('Sending build %s to newly ready component on realm %s',
job.job_details['build_uuid'], build_component.builder_realm)
loop.call_soon(build_component.start_build, job)
except KeyError:
logger.warning('Builder is asking for more work, but work already completed')
@ -126,6 +137,7 @@ class EphemeralBuilderManager(BaseManager):
def build_component_disposed(self, build_component, timed_out):
logger.debug('Calling build_component_disposed.')
@coroutine
def job_completed(self, build_job, job_status, build_component):
logger.debug('Calling job_completed with status: %s', job_status)
@ -134,12 +146,24 @@ class EphemeralBuilderManager(BaseManager):
# Release the lock in etcd
job_key = self._etcd_job_key(build_job)
self._etcd_client.delete(job_key)
yield From(self._etcd_client.delete(job_key))
self.job_complete_callback(build_job, job_status)
@coroutine
def _clean_up_old_builder(self, job_key, job_payload):
""" Terminate an old builder once the expiration date has passed.
"""
logger.debug('Cleaning up the old builder for job: %s', job_key)
if 'builder_id' in job_payload:
logger.info('Terminating expired build node.')
yield From(self._executor.stop_builder(job_payload['builder_id']))
yield From(self._etcd_client.delete(job_key))
@staticmethod
def _etcd_job_key(build_job):
""" Create a key which is used to track a job in etcd.
"""
return '{0}{1}'.format(ETCD_BUILDER_PREFIX, build_job.repo_build.uuid)
return '{0}{1}'.format(ETCD_BUILDER_PREFIX, build_job.job_details['build_uuid'])

View file

@ -7,6 +7,10 @@ import requests
import cachetools
from jinja2 import FileSystemLoader, Environment
from trollius import coroutine, From, Return, get_event_loop
from functools import partial
from buildman.asyncutil import AsyncWrapper
logger = logging.getLogger(__name__)
@ -32,12 +36,14 @@ class BuilderExecutor(object):
""" Interface which can be plugged into the EphemeralNodeManager to provide a strategy for
starting and stopping builders.
"""
@coroutine
def start_builder(self, realm, token):
""" Create a builder with the specified config. Returns a unique id which can be used to manage
the builder.
"""
raise NotImplementedError
@coroutine
def stop_builder(self, builder_id):
""" Stop a builder which is currently running.
"""
@ -74,14 +80,18 @@ class EC2Executor(BuilderExecutor):
"""
COREOS_STACK_URL = 'http://%s.release.core-os.net/amd64-usr/current/coreos_production_ami_hvm.txt'
def __init__(self, *args, **kwargs):
self._loop = get_event_loop()
super(EC2Executor, self).__init__(*args, **kwargs)
def _get_conn(self):
""" Creates an ec2 connection which can be used to manage instances.
"""
return boto.ec2.connect_to_region(
return AsyncWrapper(boto.ec2.connect_to_region(
self.executor_config['EC2_REGION'],
aws_access_key_id=self.executor_config['AWS_ACCESS_KEY'],
aws_secret_access_key=self.executor_config['AWS_SECRET_KEY'],
)
))
@classmethod
@cachetools.ttl_cache(ttl=ONE_HOUR)
@ -92,25 +102,24 @@ class EC2Executor(BuilderExecutor):
stack_amis = dict([stack.split('=') for stack in stack_list_string.split('|')])
return stack_amis[ec2_region]
@coroutine
def start_builder(self, realm, token):
region = self.executor_config['EC2_REGION']
channel = self.executor_config.get('COREOS_CHANNEL', 'stable')
coreos_ami = self._get_coreos_ami(region, channel)
get_ami_callable = partial(self._get_coreos_ami, region, channel)
coreos_ami = yield From(self._loop.run_in_executor(None, get_ami_callable))
user_data = self.generate_cloud_config(realm, token, channel, self.manager_public_ip)
logger.debug('Generated cloud config: %s', user_data)
ec2_conn = self._get_conn()
# class FakeReservation(object):
# def __init__(self):
# self.instances = None
# reservation = FakeReservation()
reservation = ec2_conn.run_instances(
reservation = yield ec2_conn.run_instances(
coreos_ami,
instance_type=self.executor_config['EC2_INSTANCE_TYPE'],
security_groups=self.executor_config['EC2_SECURITY_GROUP_IDS'],
key_name=self.executor_config.get('EC2_KEY_NAME', None),
user_data=user_data,
instance_initiated_shutdown_behavior='terminate',
)
if not reservation.instances:
@ -124,12 +133,13 @@ class EC2Executor(BuilderExecutor):
'Realm': realm,
'Token': token,
})
return launched.id
raise Return(launched.id)
@coroutine
def stop_builder(self, builder_id):
ec2_conn = self._get_conn()
stopped_instance_ids = [si.id for si in ec2_conn.stop_instances([builder_id], force=True)]
if builder_id not in stopped_instance_ids:
stopped_instances = yield ec2_conn.stop_instances([builder_id], force=True)
if builder_id not in [si.id for si in stopped_instances]:
raise ExecutorException('Unable to stop instance: %s' % builder_id)
class PopenExecutor(BuilderExecutor):
@ -142,6 +152,7 @@ class PopenExecutor(BuilderExecutor):
""" Executor which uses Popen to fork a quay-builder process.
"""
@coroutine
def start_builder(self, realm, token):
# Now start a machine for this job, adding the machine id to the etcd information
logger.debug('Forking process for build')
@ -162,9 +173,9 @@ class PopenExecutor(BuilderExecutor):
builder_id = str(uuid.uuid4())
self._jobs[builder_id] = (spawned, logpipe)
logger.debug('Builder spawned with id: %s', builder_id)
return builder_id
raise Return(builder_id)
@coroutine
def stop_builder(self, builder_id):
if builder_id not in self._jobs:
raise ExecutorException('Builder id not being tracked by executor.')

View file

@ -154,7 +154,8 @@ class BuilderServer(object):
self._queue.incomplete(job_item, restore_retry=False)
logger.debug('Build job found. Checking for an avaliable worker.')
if self._lifecycle_manager.schedule(build_job, self._loop):
scheduled = yield From(self._lifecycle_manager.schedule(build_job, self._loop))
if scheduled:
self._job_count = self._job_count + 1
logger.debug('Build job scheduled. Running: %s', self._job_count)
else:

View file

@ -29,10 +29,10 @@ coreos:
After=docker.service
[Service]
Restart=always
TimeoutStartSec=600
TimeoutStopSec=2000
ExecStartPre=/usr/bin/sudo /bin/sh -xc "echo '{{ manager_ip }} buildman.quay.io' >> /etc/hosts; exit 0"
ExecStartPre=/usr/bin/docker login -u {{ quay_username }} -p {{ quay_password }} -e unused quay.io
ExecStart=/usr/bin/docker run --rm --net=host --name quay-builder --privileged --env-file /root/overrides.list -v /var/run/docker.sock:/var/run/docker.sock quay.io/coreos/registry-build-worker:latest
ExecStop=/usr/bin/docker stop quay-builder
ExecStopPost=/usr/bin/sudo /bin/sh -xc "/bin/sleep 600; /sbin/shutdown -h now"

View file

@ -72,8 +72,8 @@ def build_status_view(build_obj, can_write=False):
# minutes. If not, then the build timed out.
if phase != database.BUILD_PHASE.COMPLETE and phase != database.BUILD_PHASE.ERROR:
if status is not None and 'heartbeat' in status and status['heartbeat']:
heartbeat = datetime.datetime.fromtimestamp(status['heartbeat'])
if datetime.datetime.now() - heartbeat > datetime.timedelta(minutes=1):
heartbeat = datetime.datetime.utcfromtimestamp(status['heartbeat'])
if datetime.datetime.utcnow() - heartbeat > datetime.timedelta(minutes=1):
phase = database.BUILD_PHASE.INTERNAL_ERROR
logger.debug('Can write: %s job_config: %s', can_write, build_obj.job_config)

View file

@ -43,3 +43,4 @@ git+https://github.com/DevTable/avatar-generator.git
gipc
python-etcd
cachetools
mock

109
test/test_buildman.py Normal file
View file

@ -0,0 +1,109 @@
import unittest
import etcd
from trollius import coroutine, get_event_loop, From, Future
from mock import Mock
from functools import partial
from buildman.manager.executor import BuilderExecutor
from buildman.manager.ephemeral import EphemeralBuilderManager, ETCD_BUILDER_PREFIX
from buildman.server import BuildJobResult
from buildman.component.buildcomponent import BuildComponent
BUILD_UUID = 'deadbeef-dead-beef-dead-deadbeefdead'
import logging
logging.basicConfig(level=logging.DEBUG)
logger = logging.getLogger(__name__)
def async_test(f):
  """ Decorator which turns a test body into a coroutine and runs it to completion on the
      event loop, so tests may use `yield From(...)`.
  """
  def wrapper(*args, **kwargs):
    task = coroutine(f)(*args, **kwargs)
    get_event_loop().run_until_complete(task)
  return wrapper
class TestEphemeral(unittest.TestCase):
  """ Tests for the EphemeralBuilderManager with etcd and the builder executor mocked out. """

  def __init__(self, *args, **kwargs):
    # Populated lazily by the factory methods injected in setUp.
    self.etcd_client_mock = None
    self.test_executor = None
    super(TestEphemeral, self).__init__(*args, **kwargs)

  def _create_mock_etcd_client(self, *args, **kwargs):
    """ Factory injected in place of etcd.Client; records the mock for later assertions. """
    self.etcd_client_mock = Mock(spec=etcd.Client, name='etcd.Client')
    return self.etcd_client_mock

  def _create_mock_executor(self, *args, **kwargs):
    """ Factory injected as the 'test' executor; its start/stop methods resolve immediately
        with already-completed futures so coroutines under test do not block.
    """
    def create_completed_future(result=None):
      def inner(*args, **kwargs):
        new_future = Future()
        new_future.set_result(result)
        return new_future
      return inner

    self.test_executor = Mock(spec=BuilderExecutor)
    self.test_executor.start_builder = Mock(side_effect=create_completed_future('123'))
    self.test_executor.stop_builder = Mock(side_effect=create_completed_future())
    return self.test_executor

  def _create_build_job(self):
    """ Returns a mock build job carrying only the build_uuid detail the manager reads. """
    mock_job = Mock()
    mock_job.job_details = {
      'build_uuid': BUILD_UUID,
    }
    return mock_job

  def setUp(self):
    # Register the mock executor and swap in the mock etcd client class; both are restored
    # in tearDown so other tests see the real implementations.
    EphemeralBuilderManager._executors['test'] = self._create_mock_executor

    self.old_etcd_client_klass = EphemeralBuilderManager._etcd_client_klass
    EphemeralBuilderManager._etcd_client_klass = self._create_mock_etcd_client

    self.register_component_callback = Mock()
    self.unregister_component_callback = Mock()
    self.job_heartbeat_callback = Mock()
    self.job_complete_callback = Mock()

    self.manager = EphemeralBuilderManager(
      self.register_component_callback,
      self.unregister_component_callback,
      self.job_heartbeat_callback,
      self.job_complete_callback,
      '127.0.0.1'
    )

    self.manager.initialize({'EXECUTOR': 'test'})

  def tearDown(self):
    del EphemeralBuilderManager._executors['test']
    EphemeralBuilderManager._etcd_client_klass = self.old_etcd_client_klass

  @async_test
  def test_schedule_and_complete(self):
    mock_job = self._create_build_job()

    # No existing job keys in etcd -> zero workers alive, so scheduling is allowed.
    self.etcd_client_mock.read = Mock(side_effect=KeyError)
    test_component = BuildComponent(None)
    self.register_component_callback.return_value = test_component

    # Ask for a builder to be scheduled
    loop = get_event_loop()
    is_scheduled = yield From(self.manager.schedule(mock_job, loop))

    self.assertTrue(is_scheduled)

    job_key = ETCD_BUILDER_PREFIX + mock_job.job_details['build_uuid']
    self.etcd_client_mock.read.assert_called_once_with(ETCD_BUILDER_PREFIX, recursive=True)
    self.assertEqual(len(self.test_executor.start_builder.call_args_list), 1)
    # The job key is written twice: once to claim the slot, once to record the builder id.
    self.assertEqual(self.etcd_client_mock.write.call_args_list[0][0][0], job_key)
    self.assertEqual(self.etcd_client_mock.write.call_args_list[1][0][0], job_key)

    self.assertEqual(len(self.register_component_callback.call_args_list), 1)

    yield From(self.manager.job_completed(mock_job, BuildJobResult.COMPLETE, test_component))

    self.assertEqual(len(self.test_executor.stop_builder.call_args_list), 1)
    self.etcd_client_mock.delete.assert_called_once_with(job_key)