This repository has been archived on 2020-03-24. You can view files and clone it, but cannot push or open issues or pull requests.
quay/buildman/manager/executor.py

204 lines
6.4 KiB
Python

import logging
import os
import uuid
import threading
import boto.ec2
import requests
import cachetools
from jinja2 import FileSystemLoader, Environment
logger = logging.getLogger(__name__)
ONE_HOUR = 60*60
ENV = Environment(loader=FileSystemLoader('buildman/templates'))
TEMPLATE = ENV.get_template('cloudconfig.yaml')
class ExecutorException(Exception):
""" Exception raised when there is a problem starting or stopping a builder.
"""
pass
class BuilderExecutor(object):
def __init__(self, executor_config, manager_public_ip):
self.executor_config = executor_config
self.manager_public_ip = manager_public_ip
""" Interface which can be plugged into the EphemeralNodeManager to provide a strategy for
starting and stopping builders.
"""
def start_builder(self, realm, token):
""" Create a builder with the specified config. Returns a unique id which can be used to manage
the builder.
"""
raise NotImplementedError
def stop_builder(self, builder_id):
""" Stop a builder which is currently running.
"""
raise NotImplementedError
def get_manager_websocket_url(self):
return 'ws://{0}:'
def generate_cloud_config(self, realm, token, coreos_channel, manager_ip,
quay_username=None, quay_password=None, etcd_token=None):
if quay_username is None:
quay_username = self.executor_config['QUAY_USERNAME']
if quay_password is None:
quay_password = self.executor_config['QUAY_PASSWORD']
if etcd_token is None:
etcd_token = self.executor_config['ETCD_DISCOVERY_TOKEN']
return TEMPLATE.render(
realm=realm,
token=token,
quay_username=quay_username,
quay_password=quay_password,
etcd_token=etcd_token,
manager_ip=manager_ip,
coreos_channel=coreos_channel,
)
class EC2Executor(BuilderExecutor):
""" Implementation of BuilderExecutor which uses libcloud to start machines on a variety of cloud
providers.
"""
COREOS_STACK_URL = 'http://%s.release.core-os.net/amd64-usr/current/coreos_production_ami_hvm.txt'
def _get_conn(self):
""" Creates an ec2 connection which can be used to manage instances.
"""
return boto.ec2.connect_to_region(
self.executor_config['EC2_REGION'],
aws_access_key_id=self.executor_config['AWS_ACCESS_KEY'],
aws_secret_access_key=self.executor_config['AWS_SECRET_KEY'],
)
@classmethod
@cachetools.ttl_cache(ttl=ONE_HOUR)
def _get_coreos_ami(cls, ec2_region, coreos_channel):
""" Retrieve the CoreOS AMI id from the canonical listing.
"""
stack_list_string = requests.get(EC2Executor.COREOS_STACK_URL % coreos_channel).text
stack_amis = dict([stack.split('=') for stack in stack_list_string.split('|')])
return stack_amis[ec2_region]
def start_builder(self, realm, token):
region = self.executor_config['EC2_REGION']
channel = self.executor_config.get('COREOS_CHANNEL', 'stable')
coreos_ami = self._get_coreos_ami(region, channel)
user_data = self.generate_cloud_config(realm, token, channel, self.manager_public_ip)
logger.debug('Generated cloud config: %s', user_data)
ec2_conn = self._get_conn()
# class FakeReservation(object):
# def __init__(self):
# self.instances = None
# reservation = FakeReservation()
reservation = ec2_conn.run_instances(
coreos_ami,
instance_type=self.executor_config['EC2_INSTANCE_TYPE'],
security_groups=self.executor_config['EC2_SECURITY_GROUP_IDS'],
key_name=self.executor_config.get('EC2_KEY_NAME', None),
user_data=user_data,
)
if not reservation.instances:
raise ExecutorException('Unable to spawn builder instance.')
elif len(reservation.instances) != 1:
raise ExecutorException('EC2 started wrong number of instances!')
return reservation.instances[0].id
def stop_builder(self, builder_id):
ec2_conn = self._get_conn()
stopped_instance_ids = [si.id for si in ec2_conn.stop_instances([builder_id], force=True)]
if builder_id not in stopped_instance_ids:
raise ExecutorException('Unable to stop instance: %s' % builder_id)
class PopenExecutor(BuilderExecutor):
""" Implementation of BuilderExecutor which uses Popen to fork a quay-builder process.
"""
def __init__(self, executor_config, manager_public_ip):
self._jobs = {}
super(PopenExecutor, self).__init__(executor_config, manager_public_ip)
""" Executor which uses Popen to fork a quay-builder process.
"""
def start_builder(self, realm, token):
# Now start a machine for this job, adding the machine id to the etcd information
logger.debug('Forking process for build')
import subprocess
builder_env = {
'TOKEN': token,
'REALM': realm,
'ENDPOINT': 'ws://localhost:8787',
'DOCKER_TLS_VERIFY': os.environ.get('DOCKER_TLS_VERIFY', ''),
'DOCKER_CERT_PATH': os.environ.get('DOCKER_CERT_PATH', ''),
'DOCKER_HOST': os.environ.get('DOCKER_HOST', ''),
}
logpipe = LogPipe(logging.INFO)
spawned = subprocess.Popen('/Users/jake/bin/quay-builder', stdout=logpipe, stderr=logpipe,
env=builder_env)
builder_id = str(uuid.uuid4())
self._jobs[builder_id] = (spawned, logpipe)
logger.debug('Builder spawned with id: %s', builder_id)
return builder_id
def stop_builder(self, builder_id):
if builder_id not in self._jobs:
raise ExecutorException('Builder id not being tracked by executor.')
logger.debug('Killing builder with id: %s', builder_id)
spawned, logpipe = self._jobs[builder_id]
if spawned.poll() is None:
spawned.kill()
logpipe.close()
class LogPipe(threading.Thread):
""" Adapted from http://codereview.stackexchange.com/a/17959
"""
def __init__(self, level):
"""Setup the object with a logger and a loglevel
and start the thread
"""
threading.Thread.__init__(self)
self.daemon = False
self.level = level
self.fd_read, self.fd_write = os.pipe()
self.pipe_reader = os.fdopen(self.fd_read)
self.start()
def fileno(self):
"""Return the write file descriptor of the pipe
"""
return self.fd_write
def run(self):
"""Run the thread, logging everything.
"""
for line in iter(self.pipe_reader.readline, ''):
logging.log(self.level, line.strip('\n'))
self.pipe_reader.close()
def close(self):
"""Close the write end of the pipe.
"""
os.close(self.fd_write)