450 lines
16 KiB
Python
450 lines
16 KiB
Python
import logging
|
|
import os
|
|
import uuid
|
|
import threading
|
|
import boto.ec2
|
|
import requests
|
|
import cachetools
|
|
import trollius
|
|
import datetime
|
|
import release
|
|
import socket
|
|
|
|
from jinja2 import FileSystemLoader, Environment
|
|
from trollius import coroutine, From, Return, get_event_loop
|
|
from functools import partial
|
|
|
|
from buildman.asyncutil import AsyncWrapper
|
|
from container_cloud_config import CloudConfigContext
|
|
from app import metric_queue, app
|
|
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
|
|
ONE_HOUR = 60*60
|
|
|
|
_TAG_RETRY_COUNT = 3 # Number of times to retry adding tags.
|
|
_TAG_RETRY_SLEEP = 2 # Number of seconds to wait between tag retries.
|
|
|
|
ENV = Environment(loader=FileSystemLoader('buildman/templates'))
|
|
TEMPLATE = ENV.get_template('cloudconfig.yaml')
|
|
CloudConfigContext().populate_jinja_environment(ENV)
|
|
|
|
class ExecutorException(Exception):
|
|
""" Exception raised when there is a problem starting or stopping a builder.
|
|
"""
|
|
pass
|
|
|
|
|
|
class BuilderExecutor(object):
|
|
def __init__(self, executor_config, manager_hostname):
|
|
""" Interface which can be plugged into the EphemeralNodeManager to provide a strategy for
|
|
starting and stopping builders.
|
|
"""
|
|
self.executor_config = executor_config
|
|
self.manager_hostname = manager_hostname
|
|
|
|
default_websocket_scheme = 'wss' if app.config['PREFERRED_URL_SCHEME'] == 'https' else 'ws'
|
|
self.websocket_scheme = executor_config.get("WEBSOCKET_SCHEME", default_websocket_scheme)
|
|
|
|
@coroutine
|
|
def start_builder(self, realm, token, build_uuid):
|
|
""" Create a builder with the specified config. Returns a unique id which can be used to manage
|
|
the builder.
|
|
"""
|
|
raise NotImplementedError
|
|
|
|
@coroutine
|
|
def stop_builder(self, builder_id):
|
|
""" Stop a builder which is currently running.
|
|
"""
|
|
raise NotImplementedError
|
|
|
|
def allowed_for_namespace(self, namespace):
|
|
""" Returns true if this executor can be used for builds in the given namespace. """
|
|
namespace_whitelist = self.executor_config.get('NAMESPACE_WHITELIST')
|
|
if namespace_whitelist is not None:
|
|
return namespace in namespace_whitelist
|
|
|
|
return True
|
|
|
|
@property
|
|
def minimum_retry_threshold(self):
|
|
""" Returns the minimum number of retries required for this executor to be used or 0 if
|
|
none. """
|
|
return self.executor_config.get('MINIMUM_RETRY_THRESHOLD', 0)
|
|
|
|
def generate_cloud_config(self, realm, token, coreos_channel, manager_hostname,
|
|
quay_username=None, quay_password=None):
|
|
if quay_username is None:
|
|
quay_username = self.executor_config['QUAY_USERNAME']
|
|
|
|
if quay_password is None:
|
|
quay_password = self.executor_config['QUAY_PASSWORD']
|
|
|
|
return TEMPLATE.render(
|
|
realm=realm,
|
|
token=token,
|
|
quay_username=quay_username,
|
|
quay_password=quay_password,
|
|
manager_hostname=manager_hostname,
|
|
websocket_scheme=self.websocket_scheme,
|
|
coreos_channel=coreos_channel,
|
|
worker_tag=self.executor_config['WORKER_TAG'],
|
|
logentries_token=self.executor_config.get('LOGENTRIES_TOKEN', None),
|
|
volume_size=self.executor_config.get('VOLUME_SIZE', '42G'),
|
|
)
|
|
|
|
|
|
class EC2Executor(BuilderExecutor):
|
|
""" Implementation of BuilderExecutor which uses libcloud to start machines on a variety of cloud
|
|
providers.
|
|
"""
|
|
COREOS_STACK_URL = 'http://%s.release.core-os.net/amd64-usr/current/coreos_production_ami_hvm.txt'
|
|
|
|
def __init__(self, *args, **kwargs):
|
|
self._loop = get_event_loop()
|
|
super(EC2Executor, self).__init__(*args, **kwargs)
|
|
|
|
def _get_conn(self):
|
|
""" Creates an ec2 connection which can be used to manage instances.
|
|
"""
|
|
return AsyncWrapper(boto.ec2.connect_to_region(
|
|
self.executor_config['EC2_REGION'],
|
|
aws_access_key_id=self.executor_config['AWS_ACCESS_KEY'],
|
|
aws_secret_access_key=self.executor_config['AWS_SECRET_KEY'],
|
|
))
|
|
|
|
@classmethod
|
|
@cachetools.ttl_cache(ttl=ONE_HOUR)
|
|
def _get_coreos_ami(cls, ec2_region, coreos_channel):
|
|
""" Retrieve the CoreOS AMI id from the canonical listing.
|
|
"""
|
|
stack_list_string = requests.get(EC2Executor.COREOS_STACK_URL % coreos_channel).text
|
|
stack_amis = dict([stack.split('=') for stack in stack_list_string.split('|')])
|
|
return stack_amis[ec2_region]
|
|
|
|
@coroutine
|
|
def start_builder(self, realm, token, build_uuid):
|
|
region = self.executor_config['EC2_REGION']
|
|
channel = self.executor_config.get('COREOS_CHANNEL', 'stable')
|
|
|
|
coreos_ami = self.executor_config.get('COREOS_AMI', None)
|
|
if coreos_ami is None:
|
|
get_ami_callable = partial(self._get_coreos_ami, region, channel)
|
|
coreos_ami = yield From(self._loop.run_in_executor(None, get_ami_callable))
|
|
|
|
user_data = self.generate_cloud_config(realm, token, channel, self.manager_hostname)
|
|
logger.debug('Generated cloud config for build %s: %s', build_uuid, user_data)
|
|
|
|
ec2_conn = self._get_conn()
|
|
|
|
ssd_root_ebs = boto.ec2.blockdevicemapping.BlockDeviceType(
|
|
size=int(self.executor_config.get('BLOCK_DEVICE_SIZE', 48)),
|
|
volume_type='gp2',
|
|
delete_on_termination=True,
|
|
)
|
|
block_devices = boto.ec2.blockdevicemapping.BlockDeviceMapping()
|
|
block_devices['/dev/xvda'] = ssd_root_ebs
|
|
|
|
interfaces = None
|
|
if self.executor_config.get('EC2_VPC_SUBNET_ID', None) is not None:
|
|
interface = boto.ec2.networkinterface.NetworkInterfaceSpecification(
|
|
subnet_id=self.executor_config['EC2_VPC_SUBNET_ID'],
|
|
groups=self.executor_config['EC2_SECURITY_GROUP_IDS'],
|
|
associate_public_ip_address=True,
|
|
)
|
|
interfaces = boto.ec2.networkinterface.NetworkInterfaceCollection(interface)
|
|
|
|
try:
|
|
reservation = yield From(ec2_conn.run_instances(
|
|
coreos_ami,
|
|
instance_type=self.executor_config['EC2_INSTANCE_TYPE'],
|
|
key_name=self.executor_config.get('EC2_KEY_NAME', None),
|
|
user_data=user_data,
|
|
instance_initiated_shutdown_behavior='terminate',
|
|
block_device_map=block_devices,
|
|
network_interfaces=interfaces,
|
|
))
|
|
except boto.exception.EC2ResponseError as ec2e:
|
|
logger.exception('Unable to spawn builder instance')
|
|
metric_queue.put_deprecated('EC2BuildStartFailure', 1, unit='Count')
|
|
metric_queue.ephemeral_build_worker_failure.Inc(labelvalues=[build_uuid])
|
|
raise ec2e
|
|
|
|
if not reservation.instances:
|
|
raise ExecutorException('Unable to spawn builder instance.')
|
|
elif len(reservation.instances) != 1:
|
|
raise ExecutorException('EC2 started wrong number of instances!')
|
|
|
|
launched = AsyncWrapper(reservation.instances[0])
|
|
|
|
# Sleep a few seconds to wait for AWS to spawn the instance.
|
|
yield From(trollius.sleep(_TAG_RETRY_SLEEP))
|
|
|
|
# Tag the instance with its metadata.
|
|
for i in range(0, _TAG_RETRY_COUNT):
|
|
try:
|
|
yield From(launched.add_tags({
|
|
'Name': 'Quay Ephemeral Builder',
|
|
'Realm': realm,
|
|
'Token': token,
|
|
'BuildUUID': build_uuid,
|
|
}))
|
|
except boto.exception.EC2ResponseError as ec2e:
|
|
if ec2e.error_code == 'InvalidInstanceID.NotFound':
|
|
if i < _TAG_RETRY_COUNT - 1:
|
|
logger.warning('Failed to write EC2 tags for instance %s for build %s (attempt #%s)',
|
|
launched.id, build_uuid, i)
|
|
yield From(trollius.sleep(_TAG_RETRY_SLEEP))
|
|
continue
|
|
|
|
raise ExecutorException('Unable to find builder instance.')
|
|
|
|
logger.exception('Failed to write EC2 tags (attempt #%s)', i)
|
|
|
|
logger.debug('Machine with ID %s started for build %s', launched.id, build_uuid)
|
|
raise Return(launched.id)
|
|
|
|
@coroutine
|
|
def stop_builder(self, builder_id):
|
|
try:
|
|
ec2_conn = self._get_conn()
|
|
terminated_instances = yield From(ec2_conn.terminate_instances([builder_id]))
|
|
except boto.exception.EC2ResponseError as ec2e:
|
|
if ec2e.error_code == 'InvalidInstanceID.NotFound':
|
|
logger.debug('Instance %s already terminated', builder_id)
|
|
return
|
|
|
|
logger.exception('Exception when trying to terminate instance %s', builder_id)
|
|
raise
|
|
|
|
if builder_id not in [si.id for si in terminated_instances]:
|
|
raise ExecutorException('Unable to terminate instance: %s' % builder_id)
|
|
|
|
|
|
class PopenExecutor(BuilderExecutor):
|
|
""" Implementation of BuilderExecutor which uses Popen to fork a quay-builder process.
|
|
"""
|
|
def __init__(self, executor_config, manager_hostname):
|
|
self._jobs = {}
|
|
|
|
super(PopenExecutor, self).__init__(executor_config, manager_hostname)
|
|
|
|
""" Executor which uses Popen to fork a quay-builder process.
|
|
"""
|
|
@coroutine
|
|
def start_builder(self, realm, token, build_uuid):
|
|
# Now start a machine for this job, adding the machine id to the etcd information
|
|
logger.debug('Forking process for build')
|
|
import subprocess
|
|
|
|
ws_host = os.environ.get("BUILDMAN_WS_HOST", "localhost")
|
|
ws_port = os.environ.get("BUILDMAN_WS_PORT", "8787")
|
|
builder_env = {
|
|
'TOKEN': token,
|
|
'REALM': realm,
|
|
'ENDPOINT': 'ws://%s:%s' % (ws_host, ws_port),
|
|
'DOCKER_TLS_VERIFY': os.environ.get('DOCKER_TLS_VERIFY', ''),
|
|
'DOCKER_CERT_PATH': os.environ.get('DOCKER_CERT_PATH', ''),
|
|
'DOCKER_HOST': os.environ.get('DOCKER_HOST', ''),
|
|
}
|
|
|
|
logpipe = LogPipe(logging.INFO)
|
|
spawned = subprocess.Popen('/Users/jake/bin/quay-builder', stdout=logpipe, stderr=logpipe,
|
|
env=builder_env)
|
|
|
|
builder_id = str(uuid.uuid4())
|
|
self._jobs[builder_id] = (spawned, logpipe)
|
|
logger.debug('Builder spawned with id: %s', builder_id)
|
|
raise Return(builder_id)
|
|
|
|
@coroutine
|
|
def stop_builder(self, builder_id):
|
|
if builder_id not in self._jobs:
|
|
raise ExecutorException('Builder id not being tracked by executor.')
|
|
|
|
logger.debug('Killing builder with id: %s', builder_id)
|
|
spawned, logpipe = self._jobs[builder_id]
|
|
|
|
if spawned.poll() is None:
|
|
spawned.kill()
|
|
logpipe.close()
|
|
|
|
|
|
class KubernetesExecutor(BuilderExecutor):
|
|
""" Executes build jobs by creating Kubernetes jobs which run a qemu-kvm virtual
|
|
machine in a pod """
|
|
def __init__(self, *args, **kwargs):
|
|
super(KubernetesExecutor, self).__init__(*args, **kwargs)
|
|
self._loop = get_event_loop()
|
|
self.namespace = self.executor_config.get('BUILDER_NAMESPACE', 'builder')
|
|
self.image = self.executor_config.get('BUILDER_VM_CONTAINER_IMAGE',
|
|
'quay.io/quay/quay-builder-qemu-coreos:stable')
|
|
|
|
@coroutine
|
|
def _request(self, method, path, **kwargs):
|
|
request_options = dict(kwargs)
|
|
|
|
tls_cert = self.executor_config.get('K8S_API_TLS_CERT')
|
|
tls_key = self.executor_config.get('K8S_API_TLS_KEY')
|
|
tls_ca = self.executor_config.get('K8S_API_TLS_CA')
|
|
|
|
if 'timeout' not in request_options:
|
|
request_options['timeout'] = self.executor_config.get("K8S_API_TIMEOUT", 20)
|
|
|
|
if tls_cert and tls_key:
|
|
scheme = 'https'
|
|
request_options['cert'] = (tls_cert, tls_key)
|
|
if tls_ca:
|
|
request_options['verify'] = tls_ca
|
|
else:
|
|
scheme = 'http'
|
|
|
|
server = self.executor_config.get('K8S_API_SERVER', 'localhost:8080')
|
|
url = '%s://%s%s' % (scheme, server, path)
|
|
|
|
logger.debug('Executor config: %s', self.executor_config)
|
|
logger.debug('Kubernetes request: %s %s: %s', method, url, request_options)
|
|
res = requests.request(method, url, **request_options)
|
|
logger.debug('Kubernetes response: %s: %s', res.status_code, res.text)
|
|
raise Return(res)
|
|
|
|
def _jobs_path(self):
|
|
return '/apis/batch/v1/namespaces/%s/jobs' % self.namespace
|
|
|
|
def _job_path(self, build_uuid):
|
|
return '%s/%s' % (self._jobs_path(), build_uuid)
|
|
|
|
def _job_resource(self, build_uuid, user_data, coreos_channel='stable'):
|
|
vm_memory_limit = self.executor_config.get('VM_MEMORY_LIMIT', '8G')
|
|
|
|
# Max values for this container
|
|
container_limits = {
|
|
'memory' : self.executor_config.get('CONTAINER_MEMORY_LIMIT', '8Gi'),
|
|
'cpu' : self.executor_config.get('CONTAINER_CPU_LIMIT', "2"),
|
|
}
|
|
|
|
# Minimum acceptable free resources for this container to "fit" in a quota
|
|
container_requests = {
|
|
'memory' : self.executor_config.get('CONTAINER_MEMORY_REQUEST', '8Gi'),
|
|
'cpu' : self.executor_config.get('CONTAINER_CPU_REQUEST', "2"),
|
|
}
|
|
|
|
release_sha = release.GIT_HEAD or 'none'
|
|
if ' ' in release_sha:
|
|
release_sha = 'HEAD'
|
|
|
|
return {
|
|
'apiVersion': 'batch/v1',
|
|
'kind': 'Job',
|
|
'metadata': {
|
|
'namespace': self.namespace,
|
|
'generateName': build_uuid,
|
|
'labels': {
|
|
'build': build_uuid,
|
|
'time': datetime.datetime.now().strftime('%Y-%m-%d-%H'),
|
|
'manager': socket.gethostname(),
|
|
'quay-sha': release_sha,
|
|
},
|
|
},
|
|
'spec' : {
|
|
'activeDeadlineSeconds': self.executor_config.get('MAXIMUM_JOB_TIME', 7200),
|
|
'template': {
|
|
'metadata': {
|
|
'labels': {
|
|
'build': build_uuid,
|
|
'time': datetime.datetime.now().strftime('%Y-%m-%d-%H'),
|
|
'manager': socket.gethostname(),
|
|
'quay-sha': release_sha,
|
|
},
|
|
},
|
|
'spec': {
|
|
'containers': [
|
|
{
|
|
'name': 'builder',
|
|
'imagePullPolicy': 'IfNotPresent',
|
|
'image': self.image,
|
|
'securityContext': {'privileged': True},
|
|
'env': [
|
|
{'name': 'USERDATA', 'value': user_data},
|
|
{'name': 'VM_MEMORY', 'value': vm_memory_limit},
|
|
],
|
|
'limits' : container_limits,
|
|
'requests' : container_requests,
|
|
},
|
|
],
|
|
'imagePullSecrets': [{'name': 'builder'}],
|
|
'restartPolicy': 'Never',
|
|
},
|
|
},
|
|
},
|
|
}
|
|
|
|
@coroutine
|
|
def start_builder(self, realm, token, build_uuid):
|
|
# generate resource
|
|
channel = self.executor_config.get('COREOS_CHANNEL', 'stable')
|
|
user_data = self.generate_cloud_config(realm, token, channel, self.manager_hostname)
|
|
resource = self._job_resource(build_uuid, user_data, channel)
|
|
logger.debug('Generated kubernetes resource:\n%s', resource)
|
|
|
|
# schedule
|
|
create_job = yield From(self._request('POST', self._jobs_path(), json=resource))
|
|
if int(create_job.status_code / 100) != 2:
|
|
raise ExecutorException('Failed to create job: %s: %s: %s' %
|
|
(build_uuid, create_job.status_code, create_job.text))
|
|
|
|
job = create_job.json()
|
|
raise Return(job['metadata']['name'])
|
|
|
|
@coroutine
|
|
def stop_builder(self, builder_id):
|
|
pods_path = '/api/v1/namespaces/%s/pods' % self.namespace
|
|
|
|
# Delete the pod(s) for the job.
|
|
selectorString = "job-name=%s" % builder_id
|
|
try:
|
|
yield From(self._request('DELETE', pods_path, params=dict(labelSelector=selectorString)))
|
|
except:
|
|
logger.exception("Failed to send delete pod call for job %s", builder_id)
|
|
|
|
# Delete the job itself.
|
|
try:
|
|
yield From(self._request('DELETE', self._job_path(builder_id)))
|
|
except:
|
|
logger.exception('Failed to send delete job call for job %s', builder_id)
|
|
|
|
|
|
class LogPipe(threading.Thread):
|
|
""" Adapted from http://codereview.stackexchange.com/a/17959
|
|
"""
|
|
def __init__(self, level):
|
|
"""Setup the object with a logger and a loglevel
|
|
and start the thread
|
|
"""
|
|
threading.Thread.__init__(self)
|
|
self.daemon = False
|
|
self.level = level
|
|
self.fd_read, self.fd_write = os.pipe()
|
|
self.pipe_reader = os.fdopen(self.fd_read)
|
|
self.start()
|
|
|
|
def fileno(self):
|
|
"""Return the write file descriptor of the pipe
|
|
"""
|
|
return self.fd_write
|
|
|
|
def run(self):
|
|
"""Run the thread, logging everything.
|
|
"""
|
|
for line in iter(self.pipe_reader.readline, ''):
|
|
logging.log(self.level, line.strip('\n'))
|
|
|
|
self.pipe_reader.close()
|
|
|
|
def close(self):
|
|
"""Close the write end of the pipe.
|
|
"""
|
|
os.close(self.fd_write)
|