First implementation of ephemeral build lifecycle manager.

This commit is contained in:
Jake Moshenko 2014-12-16 13:41:30 -05:00
parent 79b61e7709
commit 2d7e844753
10 changed files with 453 additions and 56 deletions

View file

@ -1,11 +1,12 @@
class BaseManager(object):
""" Base for all worker managers. """
def __init__(self, register_component, unregister_component, job_heartbeat_callback,
job_complete_callback):
job_complete_callback, public_ip_address):
self.register_component = register_component
self.unregister_component = unregister_component
self.job_heartbeat_callback = job_heartbeat_callback
self.job_complete_callback = job_complete_callback
self.public_ip_address = public_ip_address
def job_heartbeat(self, build_job):
""" Method invoked to tell the manager that a job is still running. This method will be called
@ -31,11 +32,16 @@ class BaseManager(object):
"""
raise NotImplementedError
def initialize(self):
def initialize(self, manager_config):
""" Runs any initialization code for the manager. Called once the server is in a ready state.
"""
raise NotImplementedError
def build_component_ready(self, build_component, loop):
""" Method invoked whenever a build component announces itself as ready.
"""
raise NotImplementedError
def build_component_disposed(self, build_component, timed_out):
""" Method invoked whenever a build component has been disposed. The timed_out boolean indicates
whether the component's heartbeat timed out.

View file

@ -28,10 +28,12 @@ class DynamicRegistrationComponent(BaseComponent):
class EnterpriseManager(BaseManager):
""" Build manager implementation for the Enterprise Registry. """
build_components = []
shutting_down = False
def initialize(self):
def __init__(self, *args, **kwargs):
self.ready_components = set()
self.shutting_down = False
def initialize(self, manager_config):
# Add a component which is used by build workers for dynamic registration. Unlike
# production, build workers in enterprise are long-lived and register dynamically.
self.register_component(REGISTRATION_REALM, DynamicRegistrationComponent)
@ -45,21 +47,20 @@ class EnterpriseManager(BaseManager):
""" Adds a new build component for an Enterprise Registry. """
# Generate a new unique realm ID for the build worker.
realm = str(uuid.uuid4())
component = self.register_component(realm, BuildComponent, token="")
self.build_components.append(component)
self.register_component(realm, BuildComponent, token="")
return realm
def schedule(self, build_job, loop):
""" Schedules a build for an Enterprise Registry. """
if self.shutting_down:
if self.shutting_down or not self.ready_components:
return False
for component in self.build_components:
if component.is_ready():
loop.call_soon(component.start_build, build_job)
return True
component = self.ready_components.pop()
loop.call_soon(component.start_build, build_job)
return True
return False
def build_component_ready(self, build_component, loop):
self.ready_components.add(build_component)
def shutdown(self):
self.shutting_down = True
@ -68,5 +69,6 @@ class EnterpriseManager(BaseManager):
self.job_complete_callback(build_job, job_status)
def build_component_disposed(self, build_component, timed_out):
self.build_components.remove(build_component)
if build_component in self.ready_components:
self.ready_components.remove(build_component)

View file

@ -0,0 +1,145 @@
import logging
import etcd
import uuid
from datetime import datetime, timedelta
from buildman.manager.basemanager import BaseManager
from buildman.manager.executor import PopenExecutor, EC2Executor
from buildman.component.buildcomponent import BuildComponent
logger = logging.getLogger(__name__)
ETCD_BUILDER_PREFIX = 'building/'
def clear_etcd(client):
""" Debugging method used to clear out the section of etcd we are using to track jobs in flight.
"""
try:
building = client.read(ETCD_BUILDER_PREFIX, recursive=True)
for child in building.leaves:
if not child.dir:
logger.warning('Deleting key: %s', child.key)
client.delete(child.key)
except KeyError:
pass
class EphemeralBuilderManager(BaseManager):
""" Build manager implementation for the Enterprise Registry. """
shutting_down = False
def __init__(self, *args, **kwargs):
self._manager_config = None
self._etcd_client = None
self._component_to_job = {}
self._component_to_builder = {}
self._executors = {
'popen': PopenExecutor,
'ec2': EC2Executor,
}
self._executor = None
super(EphemeralBuilderManager, self).__init__(*args, **kwargs)
def initialize(self, manager_config):
logger.debug('Calling initialize')
self._manager_config = manager_config
executor_klass = self._executors.get(manager_config.get('EXECUTOR', ''), PopenExecutor)
self._executor = executor_klass(manager_config.get('EXECUTOR_CONFIG', {}),
self.public_ip_address)
etcd_host = self._manager_config.get('ETCD_HOST', '127.0.0.1')
etcd_port = self._manager_config.get('ETCD_PORT', 2379)
logger.debug('Connecting to etcd on %s:%s', etcd_host, etcd_port)
self._etcd_client = etcd.Client(host=etcd_host, port=etcd_port)
clear_etcd(self._etcd_client)
def setup_time(self):
setup_time = self._manager_config.get('MACHINE_SETUP_TIME', 300)
logger.debug('Returning setup_time: %s', setup_time)
return setup_time
def shutdown(self):
logger.debug('Calling shutdown.')
raise NotImplementedError
def schedule(self, build_job, loop):
logger.debug('Calling schedule with job: %s', build_job.repo_build.uuid)
# Check if there are worker slots avialable by checking the number of jobs in etcd
allowed_worker_count = self._manager_config.get('ALLOWED_WORKER_COUNT', 2)
try:
building = self._etcd_client.read(ETCD_BUILDER_PREFIX, recursive=True)
workers_alive = sum(1 for child in building.children if not child.dir)
except KeyError:
workers_alive = 0
logger.debug('Total jobs: %s', workers_alive)
if workers_alive >= allowed_worker_count:
logger.info('Too many workers alive, unable to start new worker. %s >= %s', workers_alive,
allowed_worker_count)
return False
job_key = self._etcd_job_key(build_job)
# First try to take a lock for this job, meaning we will be responsible for its lifeline
realm = str(uuid.uuid4())
token = str(uuid.uuid4())
expiration = datetime.utcnow() + timedelta(seconds=self.setup_time())
payload = {
'expiration': expiration.isoformat(),
}
try:
self._etcd_client.write(job_key, payload, prevExist=False)
component = self.register_component(realm, BuildComponent, token=token)
self._component_to_job[component] = build_job
except KeyError:
# The job was already taken by someone else, we are probably a retry
logger.warning('Job already exists in etcd, did an old worker die?')
return False
builder_id = self._executor.start_builder(realm, token)
self._component_to_builder[component] = builder_id
return True
def build_component_ready(self, build_component, loop):
try:
job = self._component_to_job.pop(build_component)
logger.debug('Sending build %s to newly ready component on realm %s', job.repo_build.uuid,
build_component.builder_realm)
loop.call_soon(build_component.start_build, job)
except KeyError:
logger.warning('Builder is asking for more work, but work already completed')
def build_component_disposed(self, build_component, timed_out):
logger.debug('Calling build_component_disposed.')
def job_completed(self, build_job, job_status, build_component):
logger.debug('Calling job_completed with status: %s', job_status)
# Kill he ephmeral builder
self._executor.stop_builder(self._component_to_builder.pop(build_component))
# Release the lock in etcd
job_key = self._etcd_job_key(build_job)
self._etcd_client.delete(job_key)
self.job_complete_callback(build_job, job_status)
@staticmethod
def _etcd_job_key(build_job):
""" Create a key which is used to track a job in etcd.
"""
return '{0}{1}'.format(ETCD_BUILDER_PREFIX, build_job.repo_build.uuid)

View file

@ -0,0 +1,204 @@
import logging
import os
import uuid
import threading
import boto.ec2
import requests
import cachetools
from jinja2 import FileSystemLoader, Environment
logger = logging.getLogger(__name__)
ONE_HOUR = 60*60
ENV = Environment(loader=FileSystemLoader('buildman/templates'))
TEMPLATE = ENV.get_template('cloudconfig.yaml')
class ExecutorException(Exception):
""" Exception raised when there is a problem starting or stopping a builder.
"""
pass
class BuilderExecutor(object):
def __init__(self, executor_config, manager_public_ip):
self.executor_config = executor_config
self.manager_public_ip = manager_public_ip
""" Interface which can be plugged into the EphemeralNodeManager to provide a strategy for
starting and stopping builders.
"""
def start_builder(self, realm, token):
""" Create a builder with the specified config. Returns a unique id which can be used to manage
the builder.
"""
raise NotImplementedError
def stop_builder(self, builder_id):
""" Stop a builder which is currently running.
"""
raise NotImplementedError
def get_manager_websocket_url(self):
return 'ws://{0}:'
def generate_cloud_config(self, realm, token, coreos_channel, manager_ip,
quay_username=None, quay_password=None, etcd_token=None):
if quay_username is None:
quay_username = self.executor_config['QUAY_USERNAME']
if quay_password is None:
quay_password = self.executor_config['QUAY_PASSWORD']
if etcd_token is None:
etcd_token = self.executor_config['ETCD_DISCOVERY_TOKEN']
return TEMPLATE.render(
realm=realm,
token=token,
quay_username=quay_username,
quay_password=quay_password,
etcd_token=etcd_token,
manager_ip=manager_ip,
coreos_channel=coreos_channel,
)
class EC2Executor(BuilderExecutor):
""" Implementation of BuilderExecutor which uses libcloud to start machines on a variety of cloud
providers.
"""
COREOS_STACK_URL = 'http://%s.release.core-os.net/amd64-usr/current/coreos_production_ami_hvm.txt'
def _get_conn(self):
""" Creates an ec2 connection which can be used to manage instances.
"""
return boto.ec2.connect_to_region(
self.executor_config['EC2_REGION'],
aws_access_key_id=self.executor_config['AWS_ACCESS_KEY'],
aws_secret_access_key=self.executor_config['AWS_SECRET_KEY'],
)
@classmethod
@cachetools.ttl_cache(ttl=ONE_HOUR)
def _get_coreos_ami(cls, ec2_region, coreos_channel):
""" Retrieve the CoreOS AMI id from the canonical listing.
"""
stack_list_string = requests.get(EC2Executor.COREOS_STACK_URL % coreos_channel).text
stack_amis = dict([stack.split('=') for stack in stack_list_string.split('|')])
return stack_amis[ec2_region]
def start_builder(self, realm, token):
region = self.executor_config['EC2_REGION']
channel = self.executor_config.get('COREOS_CHANNEL', 'stable')
coreos_ami = self._get_coreos_ami(region, channel)
user_data = self.generate_cloud_config(realm, token, channel, self.manager_public_ip)
logger.debug('Generated cloud config: %s', user_data)
ec2_conn = self._get_conn()
# class FakeReservation(object):
# def __init__(self):
# self.instances = None
# reservation = FakeReservation()
reservation = ec2_conn.run_instances(
coreos_ami,
instance_type=self.executor_config['EC2_INSTANCE_TYPE'],
security_groups=self.executor_config['EC2_SECURITY_GROUP_IDS'],
key_name=self.executor_config.get('EC2_KEY_NAME', None),
user_data=user_data,
)
if not reservation.instances:
raise ExecutorException('Unable to spawn builder instance.')
elif len(reservation.instances) != 1:
raise ExecutorException('EC2 started wrong number of instances!')
return reservation.instances[0]
def stop_builder(self, builder_id):
ec2_conn = self._get_conn()
stopped_instances = ec2_conn.stop_instances([builder_id], force=True)
if builder_id not in stopped_instances:
raise ExecutorException('Unable to stop instance: %s' % builder_id)
class PopenExecutor(BuilderExecutor):
""" Implementation of BuilderExecutor which uses Popen to fork a quay-builder process.
"""
def __init__(self, executor_config, manager_public_ip):
self._jobs = {}
super(PopenExecutor, self).__init__(executor_config, manager_public_ip)
""" Executor which uses Popen to fork a quay-builder process.
"""
def start_builder(self, realm, token):
# Now start a machine for this job, adding the machine id to the etcd information
logger.debug('Forking process for build')
import subprocess
builder_env = {
'TOKEN': token,
'REALM': realm,
'ENDPOINT': 'ws://localhost:8787',
'DOCKER_TLS_VERIFY': os.environ.get('DOCKER_TLS_VERIFY', ''),
'DOCKER_CERT_PATH': os.environ.get('DOCKER_CERT_PATH', ''),
'DOCKER_HOST': os.environ.get('DOCKER_HOST', ''),
}
logpipe = LogPipe(logging.INFO)
spawned = subprocess.Popen('/Users/jake/bin/quay-builder', stdout=logpipe, stderr=logpipe,
env=builder_env)
builder_id = str(uuid.uuid4())
self._jobs[builder_id] = (spawned, logpipe)
logger.debug('Builder spawned with id: %s', builder_id)
return builder_id
def stop_builder(self, builder_id):
if builder_id not in self._jobs:
raise ExecutorException('Builder id not being tracked by executor.')
logger.debug('Killing builder with id: %s', builder_id)
spawned, logpipe = self._jobs[builder_id]
if spawned.poll() is None:
spawned.kill()
logpipe.close()
class LogPipe(threading.Thread):
""" Adapted from http://codereview.stackexchange.com/a/17959
"""
def __init__(self, level):
"""Setup the object with a logger and a loglevel
and start the thread
"""
threading.Thread.__init__(self)
self.daemon = False
self.level = level
self.fd_read, self.fd_write = os.pipe()
self.pipe_reader = os.fdopen(self.fd_read)
self.start()
def fileno(self):
"""Return the write file descriptor of the pipe
"""
return self.fd_write
def run(self):
"""Run the thread, logging everything.
"""
for line in iter(self.pipe_reader.readline, ''):
logging.log(self.level, line.strip('\n'))
self.pipe_reader.close()
def close(self):
"""Close the write end of the pipe.
"""
os.close(self.fd_write)