This means you can use legacy networking machines by simply changing the instance type and removing the specified 'EC2_VPC_SUBNET_ID' from the executor config.
		
			
				
	
	
		
			256 lines
		
	
	
	
		
			8.3 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
			
		
		
	
	
			256 lines
		
	
	
	
		
			8.3 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
| import logging
 | |
| import os
 | |
| import uuid
 | |
| import threading
 | |
| import boto.ec2
 | |
| import requests
 | |
| import cachetools
 | |
| 
 | |
| from jinja2 import FileSystemLoader, Environment
 | |
| from trollius import coroutine, From, Return, get_event_loop
 | |
| from functools import partial
 | |
| 
 | |
| from buildman.asyncutil import AsyncWrapper
 | |
| from container_cloud_config import CloudConfigContext
 | |
| 
 | |
| 
 | |
# Module-level logger, named after this module.
logger = logging.getLogger(__name__)


# One hour expressed in seconds; used as the TTL for the cached CoreOS AMI lookup.
ONE_HOUR = 60 * 60

# Jinja2 environment rooted at the buildman template directory. The cloud-config template is
# loaded first and the environment is then populated by CloudConfigContext; this order is kept
# exactly as it was written.
ENV = Environment(loader=FileSystemLoader('buildman/templates'))
TEMPLATE = ENV.get_template('cloudconfig.yaml')
CloudConfigContext().populate_jinja_environment(ENV)
 | |
| 
 | |
class ExecutorException(Exception):
  """ Signals that an executor was unable to start or stop a builder.
  """
 | |
| 
 | |
| 
 | |
class BuilderExecutor(object):
  """ Interface which can be plugged into the EphemeralNodeManager to provide a strategy for
      starting and stopping builders.
  """
  def __init__(self, executor_config, manager_hostname):
    """ Stores the executor configuration mapping and the hostname of the build manager.
    """
    self.executor_config = executor_config
    self.manager_hostname = manager_hostname

  @coroutine
  def start_builder(self, realm, token, build_uuid):
    """ Create a builder with the specified config. Returns a unique id which can be used to manage
        the builder.
    """
    raise NotImplementedError

  @coroutine
  def stop_builder(self, builder_id):
    """ Stop a builder which is currently running.
    """
    raise NotImplementedError

  def get_manager_websocket_url(self):
    """ Returns the websocket URL template for the manager.
    """
    # NOTE(review): the '{0}' placeholder is never filled in and no port follows the colon;
    # this looks unfinished. Behavior is left untouched because the intended port is not visible
    # here -- confirm with callers before changing.
    return 'ws://{0}:'

  def generate_cloud_config(self, realm, token, coreos_channel, manager_hostname,
                            quay_username=None, quay_password=None):
    """ Renders the cloud-config template for a builder machine and returns the rendered text.

        quay_username and quay_password fall back to the 'QUAY_USERNAME' and 'QUAY_PASSWORD'
        entries of the executor config when not given. 'WORKER_TAG' must be present in the
        executor config; 'LOGENTRIES_TOKEN' is optional and rendered as None when absent.
    """
    if quay_username is None:
      quay_username = self.executor_config['QUAY_USERNAME']

    if quay_password is None:
      quay_password = self.executor_config['QUAY_PASSWORD']

    return TEMPLATE.render(
        realm=realm,
        token=token,
        quay_username=quay_username,
        quay_password=quay_password,
        manager_hostname=manager_hostname,
        coreos_channel=coreos_channel,
        worker_tag=self.executor_config['WORKER_TAG'],
        logentries_token=self.executor_config.get('LOGENTRIES_TOKEN', None),
    )
 | |
| 
 | |
| 
 | |
class EC2Executor(BuilderExecutor):
  """ Implementation of BuilderExecutor which uses boto to start builder machines on EC2.
  """
  COREOS_STACK_URL = 'http://%s.release.core-os.net/amd64-usr/current/coreos_production_ami_hvm.txt'

  def __init__(self, *args, **kwargs):
    """ Captures the current event loop, then defers to BuilderExecutor for the config.
    """
    self._loop = get_event_loop()
    super(EC2Executor, self).__init__(*args, **kwargs)

  def _get_conn(self):
    """ Creates an ec2 connection which can be used to manage instances.
    """
    return AsyncWrapper(boto.ec2.connect_to_region(
        self.executor_config['EC2_REGION'],
        aws_access_key_id=self.executor_config['AWS_ACCESS_KEY'],
        aws_secret_access_key=self.executor_config['AWS_SECRET_KEY'],
    ))

  @classmethod
  @cachetools.ttl_cache(ttl=ONE_HOUR)
  def _get_coreos_ami(cls, ec2_region, coreos_channel):
    """ Retrieve the CoreOS AMI id from the canonical listing.

        The listing is parsed as '|'-separated 'region=ami' pairs. Raises KeyError when the
        requested region is not present in the listing.
    """
    stack_list_string = requests.get(EC2Executor.COREOS_STACK_URL % coreos_channel).text

    # Strip surrounding whitespace first so a trailing newline in the listing does not end up
    # inside the last AMI id.
    stack_amis = dict(stack.split('=', 1) for stack in stack_list_string.strip().split('|'))
    return stack_amis[ec2_region]

  @coroutine
  def start_builder(self, realm, token, build_uuid):
    """ Launches a single EC2 instance running the builder and returns its instance id.

        Reads 'EC2_REGION', 'EC2_INSTANCE_TYPE' and optionally 'COREOS_CHANNEL' (default
        'stable'), 'EC2_KEY_NAME', 'EC2_VPC_SUBNET_ID' and 'EC2_SECURITY_GROUP_IDS' from the
        executor config. Raises ExecutorException unless exactly one instance was started.
    """
    region = self.executor_config['EC2_REGION']
    channel = self.executor_config.get('COREOS_CHANNEL', 'stable')

    # The AMI lookup performs a blocking HTTP request, so it is run in the loop's executor.
    get_ami_callable = partial(self._get_coreos_ami, region, channel)
    coreos_ami = yield From(self._loop.run_in_executor(None, get_ami_callable))
    user_data = self.generate_cloud_config(realm, token, channel, self.manager_hostname)

    # NOTE(review): user_data is rendered with the token and registry credentials, so this debug
    # line may expose secrets in logs -- confirm this is acceptable.
    logger.debug('Generated cloud config: %s', user_data)

    ec2_conn = self._get_conn()

    ssd_root_ebs = boto.ec2.blockdevicemapping.BlockDeviceType(
        size=48,
        volume_type='gp2',
        delete_on_termination=True,
    )
    block_devices = boto.ec2.blockdevicemapping.BlockDeviceMapping()
    block_devices['/dev/xvda'] = ssd_root_ebs

    # A network interface is only specified when a VPC subnet is configured; otherwise None is
    # passed and no interface specification is sent with the launch request.
    interfaces = None
    if self.executor_config.get('EC2_VPC_SUBNET_ID', None) is not None:
      interface = boto.ec2.networkinterface.NetworkInterfaceSpecification(
          subnet_id=self.executor_config['EC2_VPC_SUBNET_ID'],
          groups=self.executor_config['EC2_SECURITY_GROUP_IDS'],
          associate_public_ip_address=True,
      )
      interfaces = boto.ec2.networkinterface.NetworkInterfaceCollection(interface)

    reservation = yield From(ec2_conn.run_instances(
        coreos_ami,
        instance_type=self.executor_config['EC2_INSTANCE_TYPE'],
        key_name=self.executor_config.get('EC2_KEY_NAME', None),
        user_data=user_data,
        instance_initiated_shutdown_behavior='terminate',
        block_device_map=block_devices,
        network_interfaces=interfaces,
    ))

    if not reservation.instances:
      raise ExecutorException('Unable to spawn builder instance.')
    elif len(reservation.instances) != 1:
      raise ExecutorException('EC2 started wrong number of instances!')

    launched = AsyncWrapper(reservation.instances[0])

    tags = {
        'Name': 'Quay Ephemeral Builder',
        'Realm': realm,
        'Token': token,
        'BuildUUID': build_uuid,
    }

    # Tagging is best-effort: try at most twice and stop as soon as one attempt succeeds.
    # A failure on both attempts is logged but does not fail the launch.
    for i in range(0, 2):
      try:
        yield From(launched.add_tags(tags))
        break
      except boto.exception.EC2ResponseError:
        logger.exception('Failed to write EC2 tags (attempt #%s)', i)

    raise Return(launched.id)

  @coroutine
  def stop_builder(self, builder_id):
    """ Terminates the EC2 instance with the given id.

        Returns quietly when EC2 reports that the instance no longer exists. Re-raises any other
        EC2ResponseError, and raises ExecutorException when the instance is not among those
        reported as terminated.
    """
    try:
      ec2_conn = self._get_conn()
      terminated_instances = yield From(ec2_conn.terminate_instances([builder_id]))
    except boto.exception.EC2ResponseError as ec2e:
      # boto exposes the EC2 error <Code> string in error_code; a missing instance is reported
      # as 'InvalidInstanceID.NotFound'. The previous comparison against 404 is kept as well.
      if ec2e.error_code in (404, 'InvalidInstanceID.NotFound'):
        logger.debug('Instance %s already terminated', builder_id)
        return

      logger.exception('Exception when trying to terminate instance %s', builder_id)
      raise

    if builder_id not in [si.id for si in terminated_instances]:
      raise ExecutorException('Unable to terminate instance: %s' % builder_id)
 | |
| 
 | |
| 
 | |
class PopenExecutor(BuilderExecutor):
  """ Implementation of BuilderExecutor which uses Popen to fork a quay-builder process.
  """
  def __init__(self, executor_config, manager_hostname):
    """ Sets up the table of running jobs, then defers to BuilderExecutor for the config.
    """
    # Maps builder id -> (Popen object, LogPipe) for every builder started by this executor.
    self._jobs = {}

    super(PopenExecutor, self).__init__(executor_config, manager_hostname)

  @coroutine
  def start_builder(self, realm, token, build_uuid):
    """ Forks a local quay-builder process and returns a freshly generated builder id.

        The child's stdout and stderr are forwarded to the logging module through a LogPipe.
        build_uuid is not used by this executor.
    """
    # Now start a machine for this job, adding the machine id to the etcd information
    logger.debug('Forking process for build')
    import subprocess
    builder_env = {
        'TOKEN': token,
        'REALM': realm,
        'ENDPOINT': 'ws://localhost:8787',
        'DOCKER_TLS_VERIFY': os.environ.get('DOCKER_TLS_VERIFY', ''),
        'DOCKER_CERT_PATH': os.environ.get('DOCKER_CERT_PATH', ''),
        'DOCKER_HOST': os.environ.get('DOCKER_HOST', ''),
    }

    logpipe = LogPipe(logging.INFO)
    # NOTE(review): the builder binary path is hard-coded to a developer's home directory.
    spawned = subprocess.Popen('/Users/jake/bin/quay-builder', stdout=logpipe, stderr=logpipe,
                               env=builder_env)

    builder_id = str(uuid.uuid4())
    self._jobs[builder_id] = (spawned, logpipe)
    logger.debug('Builder spawned with id: %s', builder_id)
    raise Return(builder_id)

  @coroutine
  def stop_builder(self, builder_id):
    """ Kills the builder process with the given id (if still running) and closes its log pipe.

        Raises ExecutorException when the id is not tracked by this executor, which includes a
        builder that has already been stopped.
    """
    if builder_id not in self._jobs:
      raise ExecutorException('Builder id not being tracked by executor.')

    logger.debug('Killing builder with id: %s', builder_id)

    # Remove the job so the table does not grow without bound and a repeated stop cannot close
    # the same pipe descriptor a second time.
    spawned, logpipe = self._jobs.pop(builder_id)

    try:
      if spawned.poll() is None:
        spawned.kill()
    finally:
      # Always release the write end of the pipe, even if kill() fails, so the reader thread
      # can reach end-of-file and exit.
      logpipe.close()
 | |
| 
 | |
| 
 | |
class LogPipe(threading.Thread):
  """ Thread which owns an OS pipe and logs every line written to it at a fixed level.

      The object exposes fileno(), so it can be handed to subprocess.Popen as stdout/stderr.
      Adapted from http://codereview.stackexchange.com/a/17959
  """
  def __init__(self, level):
    """Setup the object with a logger and a loglevel
    and start the thread
    """
    threading.Thread.__init__(self)
    self.daemon = False
    self.level = level
    self.fd_read, self.fd_write = os.pipe()
    self.pipe_reader = os.fdopen(self.fd_read)

    # Tracks whether close() has already released the write end, so close() is idempotent.
    self._write_closed = False
    self.start()

  def fileno(self):
    """Return the write file descriptor of the pipe
    """
    return self.fd_write

  def run(self):
    """Run the thread, logging everything.

    Reads lines until end-of-file, which is reached once every copy of the write end has
    been closed, then closes the read end.
    """
    for line in iter(self.pipe_reader.readline, ''):
      logging.log(self.level, line.strip('\n'))

    self.pipe_reader.close()

  def close(self):
    """Close the write end of the pipe.

    Safe to call more than once: only the first call closes the descriptor. Closing it again
    would raise OSError, or close an unrelated descriptor that reused the same number.
    """
    if self._write_closed:
      return

    self._write_closed = True
    os.close(self.fd_write)
 |