"""Builder executors: strategies for starting and stopping builders."""

import logging
import os
import uuid
import threading
import boto.ec2
import requests
import cachetools
import trollius

from jinja2 import FileSystemLoader, Environment
from trollius import coroutine, From, Return, get_event_loop
from functools import partial

from buildman.asyncutil import AsyncWrapper
from container_cloud_config import CloudConfigContext
from app import metric_queue


logger = logging.getLogger(__name__)


ONE_HOUR = 60*60

_TAG_RETRY_COUNT = 3  # Number of times to retry adding tags.
_TAG_RETRY_SLEEP = 2  # Number of seconds to wait between tag retries.

ENV = Environment(loader=FileSystemLoader('buildman/templates'))
TEMPLATE = ENV.get_template('cloudconfig.yaml')
CloudConfigContext().populate_jinja_environment(ENV)


class ExecutorException(Exception):
    """ Exception raised when there is a problem starting or stopping a builder.
    """
    pass


class BuilderExecutor(object):
    """ Interface which can be plugged into the EphemeralNodeManager to provide a strategy for
    starting and stopping builders.
    """

    def __init__(self, executor_config, manager_hostname):
        """ Store the executor configuration mapping and the manager hostname.
        """
        self.executor_config = executor_config
        self.manager_hostname = manager_hostname

    @coroutine
    def start_builder(self, realm, token, build_uuid):
        """ Create a builder with the specified config. Returns a unique id which can be used to
        manage the builder.
        """
        raise NotImplementedError

    @coroutine
    def stop_builder(self, builder_id):
        """ Stop a builder which is currently running.
        """
        raise NotImplementedError

    def get_manager_websocket_url(self):
        """ Return the websocket URL template for the manager.
        """
        # NOTE(review): the template is returned unformatted (no hostname or port is
        # substituted); this looks unfinished -- confirm the intended value with callers.
        return 'ws://{0}:'

    def generate_cloud_config(self, realm, token, coreos_channel, manager_hostname,
                              quay_username=None, quay_password=None):
        """ Render the cloud config template for a builder.

        quay_username and quay_password fall back to the QUAY_USERNAME and QUAY_PASSWORD
        entries of the executor config when they are not given. Returns the rendered
        template.
        """
        if quay_username is None:
            quay_username = self.executor_config['QUAY_USERNAME']

        if quay_password is None:
            quay_password = self.executor_config['QUAY_PASSWORD']

        return TEMPLATE.render(
            realm=realm,
            token=token,
            quay_username=quay_username,
            quay_password=quay_password,
            manager_hostname=manager_hostname,
            coreos_channel=coreos_channel,
            worker_tag=self.executor_config['WORKER_TAG'],
            logentries_token=self.executor_config.get('LOGENTRIES_TOKEN', None),
            volume_size=self.executor_config.get('VOLUME_SIZE', '42G'),
        )


class EC2Executor(BuilderExecutor):
    """ Implementation of BuilderExecutor which uses boto to start builder instances on EC2.
    """
    COREOS_STACK_URL = 'http://%s.release.core-os.net/amd64-usr/current/coreos_production_ami_hvm.txt'

    def __init__(self, *args, **kwargs):
        """ Capture the current event loop, then initialize the base executor.
        """
        self._loop = get_event_loop()
        super(EC2Executor, self).__init__(*args, **kwargs)

    def _get_conn(self):
        """ Creates an ec2 connection which can be used to manage instances.
        """
        return AsyncWrapper(boto.ec2.connect_to_region(
            self.executor_config['EC2_REGION'],
            aws_access_key_id=self.executor_config['AWS_ACCESS_KEY'],
            aws_secret_access_key=self.executor_config['AWS_SECRET_KEY'],
        ))

    @classmethod
    @cachetools.ttl_cache(ttl=ONE_HOUR)
    def _get_coreos_ami(cls, ec2_region, coreos_channel):
        """ Retrieve the CoreOS AMI id from the canonical listing.

        The listing is parsed as '|'-separated 'region=ami' pairs. Raises KeyError when
        ec2_region is not present in the listing.
        """
        # Strip surrounding whitespace so that a trailing newline in the response cannot
        # end up as part of the last AMI id.
        stack_list_string = requests.get(EC2Executor.COREOS_STACK_URL % coreos_channel).text.strip()
        stack_amis = dict(stack.split('=') for stack in stack_list_string.split('|'))
        return stack_amis[ec2_region]

    @coroutine
    def start_builder(self, realm, token, build_uuid):
        """ Launch a single EC2 instance for the build and tag it.

        Returns (via Return) the id of the launched instance. Raises ExecutorException when
        the reservation does not contain exactly one instance or when the instance cannot be
        found while tagging; EC2ResponseError from the launch call is re-raised.
        """
        region = self.executor_config['EC2_REGION']
        channel = self.executor_config.get('COREOS_CHANNEL', 'stable')

        # The AMI lookup performs a blocking HTTP request, so run it in the loop's executor.
        get_ami_callable = partial(self._get_coreos_ami, region, channel)
        coreos_ami = yield From(self._loop.run_in_executor(None, get_ami_callable))

        user_data = self.generate_cloud_config(realm, token, channel, self.manager_hostname)
        # NOTE(review): the rendered config is given the token and quay password as template
        # inputs; confirm that logging it at debug level is acceptable.
        logger.debug('Generated cloud config: %s', user_data)

        ec2_conn = self._get_conn()

        ssd_root_ebs = boto.ec2.blockdevicemapping.BlockDeviceType(
            size=int(self.executor_config.get('BLOCK_DEVICE_SIZE', 48)),
            volume_type='gp2',
            delete_on_termination=True,
        )
        block_devices = boto.ec2.blockdevicemapping.BlockDeviceMapping()
        block_devices['/dev/xvda'] = ssd_root_ebs

        # Only build a network interface specification when a VPC subnet is configured.
        interfaces = None
        if self.executor_config.get('EC2_VPC_SUBNET_ID', None) is not None:
            interface = boto.ec2.networkinterface.NetworkInterfaceSpecification(
                subnet_id=self.executor_config['EC2_VPC_SUBNET_ID'],
                groups=self.executor_config['EC2_SECURITY_GROUP_IDS'],
                associate_public_ip_address=True,
            )
            interfaces = boto.ec2.networkinterface.NetworkInterfaceCollection(interface)

        try:
            reservation = yield From(ec2_conn.run_instances(
                coreos_ami,
                instance_type=self.executor_config['EC2_INSTANCE_TYPE'],
                key_name=self.executor_config.get('EC2_KEY_NAME', None),
                user_data=user_data,
                instance_initiated_shutdown_behavior='terminate',
                block_device_map=block_devices,
                network_interfaces=interfaces,
            ))
        except boto.exception.EC2ResponseError as ec2e:
            logger.exception('Unable to spawn builder instance')
            metric_queue.put('EC2BuildStartFailure', 1, unit='Count')
            raise ec2e

        if not reservation.instances:
            raise ExecutorException('Unable to spawn builder instance.')
        elif len(reservation.instances) != 1:
            raise ExecutorException('EC2 started wrong number of instances!')

        launched = AsyncWrapper(reservation.instances[0])

        for i in range(0, _TAG_RETRY_COUNT):
            try:
                yield From(launched.add_tags({
                    'Name': 'Quay Ephemeral Builder',
                    'Realm': realm,
                    'Token': token,
                    'BuildUUID': build_uuid,
                }))
            except boto.exception.EC2ResponseError as ec2e:
                if ec2e.error_code == 'InvalidInstanceID.NotFound':
                    # The instance may not be visible yet: wait and retry unless this was
                    # the final attempt.
                    if i < _TAG_RETRY_COUNT - 1:
                        logger.warning('Failed to write EC2 tags (attempt #%s)', i)
                        yield From(trollius.sleep(_TAG_RETRY_SLEEP))
                        continue

                    raise ExecutorException('Unable to find builder instance.')

                # Any other error is logged and the next attempt is made immediately.
                logger.exception('Failed to write EC2 tags (attempt #%s)', i)
            else:
                # Tags were written successfully; do not write them again.
                break

        raise Return(launched.id)

    @coroutine
    def stop_builder(self, builder_id):
        """ Terminate the EC2 instance with the given id.

        An instance which EC2 reports as not found is treated as already terminated. Raises
        ExecutorException when the instance is not among the terminated instances; other
        EC2ResponseErrors are logged and re-raised.
        """
        try:
            ec2_conn = self._get_conn()
            terminated_instances = yield From(ec2_conn.terminate_instances([builder_id]))
        except boto.exception.EC2ResponseError as ec2e:
            if ec2e.error_code == 'InvalidInstanceID.NotFound':
                logger.debug('Instance %s already terminated', builder_id)
                return

            logger.exception('Exception when trying to terminate instance %s', builder_id)
            raise

        if builder_id not in [si.id for si in terminated_instances]:
            raise ExecutorException('Unable to terminate instance: %s' % builder_id)


class PopenExecutor(BuilderExecutor):
    """ Implementation of BuilderExecutor which uses Popen to fork a quay-builder process.
    """

    def __init__(self, executor_config, manager_hostname):
        """ Set up the job table, then initialize the base executor.
        """
        # Maps builder id -> (Popen object, LogPipe) for each running builder.
        self._jobs = {}

        super(PopenExecutor, self).__init__(executor_config, manager_hostname)

    @coroutine
    def start_builder(self, realm, token, build_uuid):
        """ Fork a quay-builder process and return (via Return) a new id for it.
        """
        # Now start a machine for this job, adding the machine id to the etcd information
        logger.debug('Forking process for build')
        import subprocess
        builder_env = {
            'TOKEN': token,
            'REALM': realm,
            'ENDPOINT': 'ws://localhost:8787',
            'DOCKER_TLS_VERIFY': os.environ.get('DOCKER_TLS_VERIFY', ''),
            'DOCKER_CERT_PATH': os.environ.get('DOCKER_CERT_PATH', ''),
            'DOCKER_HOST': os.environ.get('DOCKER_HOST', ''),
        }

        # Both output streams of the child are forwarded to the logging module.
        logpipe = LogPipe(logging.INFO)
        spawned = subprocess.Popen('/Users/jake/bin/quay-builder', stdout=logpipe,
                                   stderr=logpipe, env=builder_env)

        builder_id = str(uuid.uuid4())
        self._jobs[builder_id] = (spawned, logpipe)
        logger.debug('Builder spawned with id: %s', builder_id)
        raise Return(builder_id)

    @coroutine
    def stop_builder(self, builder_id):
        """ Kill the builder process with the given id if it is still running.

        Raises ExecutorException when the id is not tracked by this executor.
        """
        if builder_id not in self._jobs:
            raise ExecutorException('Builder id not being tracked by executor.')

        logger.debug('Killing builder with id: %s', builder_id)
        # Remove the job from the table so stopped builders do not accumulate and the
        # pipe's write end is never closed twice.
        spawned, logpipe = self._jobs.pop(builder_id)

        if spawned.poll() is None:
            spawned.kill()
        logpipe.close()


class LogPipe(threading.Thread):
    """ Thread which reads lines from a pipe and writes them to the logging module.

    Adapted from http://codereview.stackexchange.com/a/17959
    """

    def __init__(self, level):
        """Setup the object with a loglevel, create the pipe and start the thread
        """
        threading.Thread.__init__(self)
        self.daemon = False
        self.level = level
        self.fd_read, self.fd_write = os.pipe()
        self.pipe_reader = os.fdopen(self.fd_read)
        self.start()

    def fileno(self):
        """Return the write file descriptor of the pipe
        """
        return self.fd_write

    def run(self):
        """Run the thread, logging everything.
        """
        # readline returns '' once the read end reaches end-of-file, which ends the loop.
        for line in iter(self.pipe_reader.readline, ''):
            logging.log(self.level, line.strip('\n'))

        self.pipe_reader.close()

    def close(self):
        """Close the write end of the pipe.
        """
        os.close(self.fd_write)