This repository has been archived on 2020-03-24. You can view files and clone it, but cannot push or open issues or pull requests.
quay/buildman/manager/executor.py
Joseph Schorr 2c1880b944 Bug fixes, refactoring and "new" tests for the build manager
- Fixes various bugs introduced in the most recent build system commit
- Refactors state management in the build manager to be cleaner and more contained
- Adds back in the mock-based tests, fixed to not use threads and adjusted for the refactoring
- Adds some more simplified unit tests around non-etch related flows
2016-07-18 13:46:48 -04:00

455 lines
16 KiB
Python

import logging
import os
import uuid
import threading
import boto.ec2
import requests
import cachetools
import trollius
import datetime
import release
import socket
from jinja2 import FileSystemLoader, Environment
from trollius import coroutine, From, Return, get_event_loop
from functools import partial
from buildman.asyncutil import AsyncWrapper
from container_cloud_config import CloudConfigContext
from app import metric_queue, app
logger = logging.getLogger(__name__)
ONE_HOUR = 60*60
_TAG_RETRY_COUNT = 3 # Number of times to retry adding tags.
_TAG_RETRY_SLEEP = 2 # Number of seconds to wait between tag retries.
ENV = Environment(loader=FileSystemLoader('buildman/templates'))
TEMPLATE = ENV.get_template('cloudconfig.yaml')
CloudConfigContext().populate_jinja_environment(ENV)
class ExecutorException(Exception):
""" Exception raised when there is a problem starting or stopping a builder.
"""
pass
class BuilderExecutor(object):
def __init__(self, executor_config, manager_hostname):
""" Interface which can be plugged into the EphemeralNodeManager to provide a strategy for
starting and stopping builders.
"""
self.executor_config = executor_config
self.manager_hostname = manager_hostname
default_websocket_scheme = 'wss' if app.config['PREFERRED_URL_SCHEME'] == 'https' else 'ws'
self.websocket_scheme = executor_config.get("WEBSOCKET_SCHEME", default_websocket_scheme)
@property
def name(self):
""" Name returns the unique name for this executor. """
return self.executor_config.get('NAME') or self.__class__.__name__
@coroutine
def start_builder(self, realm, token, build_uuid):
""" Create a builder with the specified config. Returns a unique id which can be used to manage
the builder.
"""
raise NotImplementedError
@coroutine
def stop_builder(self, builder_id):
""" Stop a builder which is currently running.
"""
raise NotImplementedError
def allowed_for_namespace(self, namespace):
""" Returns true if this executor can be used for builds in the given namespace. """
namespace_whitelist = self.executor_config.get('NAMESPACE_WHITELIST')
if namespace_whitelist is not None:
return namespace in namespace_whitelist
return True
@property
def minimum_retry_threshold(self):
""" Returns the minimum number of retries required for this executor to be used or 0 if
none. """
return self.executor_config.get('MINIMUM_RETRY_THRESHOLD', 0)
def generate_cloud_config(self, realm, token, coreos_channel, manager_hostname,
quay_username=None, quay_password=None):
if quay_username is None:
quay_username = self.executor_config['QUAY_USERNAME']
if quay_password is None:
quay_password = self.executor_config['QUAY_PASSWORD']
return TEMPLATE.render(
realm=realm,
token=token,
quay_username=quay_username,
quay_password=quay_password,
manager_hostname=manager_hostname,
websocket_scheme=self.websocket_scheme,
coreos_channel=coreos_channel,
worker_tag=self.executor_config['WORKER_TAG'],
logentries_token=self.executor_config.get('LOGENTRIES_TOKEN', None),
volume_size=self.executor_config.get('VOLUME_SIZE', '42G'),
)
class EC2Executor(BuilderExecutor):
""" Implementation of BuilderExecutor which uses libcloud to start machines on a variety of cloud
providers.
"""
COREOS_STACK_URL = 'http://%s.release.core-os.net/amd64-usr/current/coreos_production_ami_hvm.txt'
def __init__(self, *args, **kwargs):
self._loop = get_event_loop()
super(EC2Executor, self).__init__(*args, **kwargs)
def _get_conn(self):
""" Creates an ec2 connection which can be used to manage instances.
"""
return AsyncWrapper(boto.ec2.connect_to_region(
self.executor_config['EC2_REGION'],
aws_access_key_id=self.executor_config['AWS_ACCESS_KEY'],
aws_secret_access_key=self.executor_config['AWS_SECRET_KEY'],
))
@classmethod
@cachetools.ttl_cache(ttl=ONE_HOUR)
def _get_coreos_ami(cls, ec2_region, coreos_channel):
""" Retrieve the CoreOS AMI id from the canonical listing.
"""
stack_list_string = requests.get(EC2Executor.COREOS_STACK_URL % coreos_channel).text
stack_amis = dict([stack.split('=') for stack in stack_list_string.split('|')])
return stack_amis[ec2_region]
@coroutine
def start_builder(self, realm, token, build_uuid):
region = self.executor_config['EC2_REGION']
channel = self.executor_config.get('COREOS_CHANNEL', 'stable')
coreos_ami = self.executor_config.get('COREOS_AMI', None)
if coreos_ami is None:
get_ami_callable = partial(self._get_coreos_ami, region, channel)
coreos_ami = yield From(self._loop.run_in_executor(None, get_ami_callable))
user_data = self.generate_cloud_config(realm, token, channel, self.manager_hostname)
logger.debug('Generated cloud config for build %s: %s', build_uuid, user_data)
ec2_conn = self._get_conn()
ssd_root_ebs = boto.ec2.blockdevicemapping.BlockDeviceType(
size=int(self.executor_config.get('BLOCK_DEVICE_SIZE', 48)),
volume_type='gp2',
delete_on_termination=True,
)
block_devices = boto.ec2.blockdevicemapping.BlockDeviceMapping()
block_devices['/dev/xvda'] = ssd_root_ebs
interfaces = None
if self.executor_config.get('EC2_VPC_SUBNET_ID', None) is not None:
interface = boto.ec2.networkinterface.NetworkInterfaceSpecification(
subnet_id=self.executor_config['EC2_VPC_SUBNET_ID'],
groups=self.executor_config['EC2_SECURITY_GROUP_IDS'],
associate_public_ip_address=True,
)
interfaces = boto.ec2.networkinterface.NetworkInterfaceCollection(interface)
try:
reservation = yield From(ec2_conn.run_instances(
coreos_ami,
instance_type=self.executor_config['EC2_INSTANCE_TYPE'],
key_name=self.executor_config.get('EC2_KEY_NAME', None),
user_data=user_data,
instance_initiated_shutdown_behavior='terminate',
block_device_map=block_devices,
network_interfaces=interfaces,
))
except boto.exception.EC2ResponseError as ec2e:
logger.exception('Unable to spawn builder instance')
metric_queue.put_deprecated('EC2BuildStartFailure', 1, unit='Count')
metric_queue.ephemeral_build_worker_failure.Inc(labelvalues=[build_uuid])
raise ec2e
if not reservation.instances:
raise ExecutorException('Unable to spawn builder instance.')
elif len(reservation.instances) != 1:
raise ExecutorException('EC2 started wrong number of instances!')
launched = AsyncWrapper(reservation.instances[0])
# Sleep a few seconds to wait for AWS to spawn the instance.
yield From(trollius.sleep(_TAG_RETRY_SLEEP))
# Tag the instance with its metadata.
for i in range(0, _TAG_RETRY_COUNT):
try:
yield From(launched.add_tags({
'Name': 'Quay Ephemeral Builder',
'Realm': realm,
'Token': token,
'BuildUUID': build_uuid,
}))
except boto.exception.EC2ResponseError as ec2e:
if ec2e.error_code == 'InvalidInstanceID.NotFound':
if i < _TAG_RETRY_COUNT - 1:
logger.warning('Failed to write EC2 tags for instance %s for build %s (attempt #%s)',
launched.id, build_uuid, i)
yield From(trollius.sleep(_TAG_RETRY_SLEEP))
continue
raise ExecutorException('Unable to find builder instance.')
logger.exception('Failed to write EC2 tags (attempt #%s)', i)
logger.debug('Machine with ID %s started for build %s', launched.id, build_uuid)
raise Return(launched.id)
@coroutine
def stop_builder(self, builder_id):
try:
ec2_conn = self._get_conn()
terminated_instances = yield From(ec2_conn.terminate_instances([builder_id]))
except boto.exception.EC2ResponseError as ec2e:
if ec2e.error_code == 'InvalidInstanceID.NotFound':
logger.debug('Instance %s already terminated', builder_id)
return
logger.exception('Exception when trying to terminate instance %s', builder_id)
raise
if builder_id not in [si.id for si in terminated_instances]:
raise ExecutorException('Unable to terminate instance: %s' % builder_id)
class PopenExecutor(BuilderExecutor):
""" Implementation of BuilderExecutor which uses Popen to fork a quay-builder process.
"""
def __init__(self, executor_config, manager_hostname):
self._jobs = {}
super(PopenExecutor, self).__init__(executor_config, manager_hostname)
""" Executor which uses Popen to fork a quay-builder process.
"""
@coroutine
def start_builder(self, realm, token, build_uuid):
# Now start a machine for this job, adding the machine id to the etcd information
logger.debug('Forking process for build')
import subprocess
ws_host = os.environ.get("BUILDMAN_WS_HOST", "localhost")
ws_port = os.environ.get("BUILDMAN_WS_PORT", "8787")
builder_env = {
'TOKEN': token,
'REALM': realm,
'ENDPOINT': 'ws://%s:%s' % (ws_host, ws_port),
'DOCKER_TLS_VERIFY': os.environ.get('DOCKER_TLS_VERIFY', ''),
'DOCKER_CERT_PATH': os.environ.get('DOCKER_CERT_PATH', ''),
'DOCKER_HOST': os.environ.get('DOCKER_HOST', ''),
}
logpipe = LogPipe(logging.INFO)
spawned = subprocess.Popen('/Users/jake/bin/quay-builder', stdout=logpipe, stderr=logpipe,
env=builder_env)
builder_id = str(uuid.uuid4())
self._jobs[builder_id] = (spawned, logpipe)
logger.debug('Builder spawned with id: %s', builder_id)
raise Return(builder_id)
@coroutine
def stop_builder(self, builder_id):
if builder_id not in self._jobs:
raise ExecutorException('Builder id not being tracked by executor.')
logger.debug('Killing builder with id: %s', builder_id)
spawned, logpipe = self._jobs[builder_id]
if spawned.poll() is None:
spawned.kill()
logpipe.close()
class KubernetesExecutor(BuilderExecutor):
""" Executes build jobs by creating Kubernetes jobs which run a qemu-kvm virtual
machine in a pod """
def __init__(self, *args, **kwargs):
super(KubernetesExecutor, self).__init__(*args, **kwargs)
self._loop = get_event_loop()
self.namespace = self.executor_config.get('BUILDER_NAMESPACE', 'builder')
self.image = self.executor_config.get('BUILDER_VM_CONTAINER_IMAGE',
'quay.io/quay/quay-builder-qemu-coreos:stable')
@coroutine
def _request(self, method, path, **kwargs):
request_options = dict(kwargs)
tls_cert = self.executor_config.get('K8S_API_TLS_CERT')
tls_key = self.executor_config.get('K8S_API_TLS_KEY')
tls_ca = self.executor_config.get('K8S_API_TLS_CA')
if 'timeout' not in request_options:
request_options['timeout'] = self.executor_config.get("K8S_API_TIMEOUT", 20)
if tls_cert and tls_key:
scheme = 'https'
request_options['cert'] = (tls_cert, tls_key)
if tls_ca:
request_options['verify'] = tls_ca
else:
scheme = 'http'
server = self.executor_config.get('K8S_API_SERVER', 'localhost:8080')
url = '%s://%s%s' % (scheme, server, path)
logger.debug('Executor config: %s', self.executor_config)
logger.debug('Kubernetes request: %s %s: %s', method, url, request_options)
res = requests.request(method, url, **request_options)
logger.debug('Kubernetes response: %s: %s', res.status_code, res.text)
raise Return(res)
def _jobs_path(self):
return '/apis/batch/v1/namespaces/%s/jobs' % self.namespace
def _job_path(self, build_uuid):
return '%s/%s' % (self._jobs_path(), build_uuid)
def _job_resource(self, build_uuid, user_data, coreos_channel='stable'):
vm_memory_limit = self.executor_config.get('VM_MEMORY_LIMIT', '8G')
# Max values for this container
container_limits = {
'memory' : self.executor_config.get('CONTAINER_MEMORY_LIMIT', '8Gi'),
'cpu' : self.executor_config.get('CONTAINER_CPU_LIMIT', "2"),
}
# Minimum acceptable free resources for this container to "fit" in a quota
container_requests = {
'memory' : self.executor_config.get('CONTAINER_MEMORY_REQUEST', '8Gi'),
'cpu' : self.executor_config.get('CONTAINER_CPU_REQUEST', "2"),
}
release_sha = release.GIT_HEAD or 'none'
if ' ' in release_sha:
release_sha = 'HEAD'
return {
'apiVersion': 'batch/v1',
'kind': 'Job',
'metadata': {
'namespace': self.namespace,
'generateName': build_uuid,
'labels': {
'build': build_uuid,
'time': datetime.datetime.now().strftime('%Y-%m-%d-%H'),
'manager': socket.gethostname(),
'quay-sha': release_sha,
},
},
'spec' : {
'activeDeadlineSeconds': self.executor_config.get('MAXIMUM_JOB_TIME', 7200),
'template': {
'metadata': {
'labels': {
'build': build_uuid,
'time': datetime.datetime.now().strftime('%Y-%m-%d-%H'),
'manager': socket.gethostname(),
'quay-sha': release_sha,
},
},
'spec': {
'containers': [
{
'name': 'builder',
'imagePullPolicy': 'IfNotPresent',
'image': self.image,
'securityContext': {'privileged': True},
'env': [
{'name': 'USERDATA', 'value': user_data},
{'name': 'VM_MEMORY', 'value': vm_memory_limit},
],
'limits' : container_limits,
'requests' : container_requests,
},
],
'imagePullSecrets': [{'name': 'builder'}],
'restartPolicy': 'Never',
},
},
},
}
@coroutine
def start_builder(self, realm, token, build_uuid):
# generate resource
channel = self.executor_config.get('COREOS_CHANNEL', 'stable')
user_data = self.generate_cloud_config(realm, token, channel, self.manager_hostname)
resource = self._job_resource(build_uuid, user_data, channel)
logger.debug('Generated kubernetes resource:\n%s', resource)
# schedule
create_job = yield From(self._request('POST', self._jobs_path(), json=resource))
if int(create_job.status_code / 100) != 2:
raise ExecutorException('Failed to create job: %s: %s: %s' %
(build_uuid, create_job.status_code, create_job.text))
job = create_job.json()
raise Return(job['metadata']['name'])
@coroutine
def stop_builder(self, builder_id):
pods_path = '/api/v1/namespaces/%s/pods' % self.namespace
# Delete the pod(s) for the job.
selectorString = "job-name=%s" % builder_id
try:
yield From(self._request('DELETE', pods_path, params=dict(labelSelector=selectorString)))
except:
logger.exception("Failed to send delete pod call for job %s", builder_id)
# Delete the job itself.
try:
yield From(self._request('DELETE', self._job_path(builder_id)))
except:
logger.exception('Failed to send delete job call for job %s', builder_id)
class LogPipe(threading.Thread):
""" Adapted from http://codereview.stackexchange.com/a/17959
"""
def __init__(self, level):
"""Setup the object with a logger and a loglevel
and start the thread
"""
threading.Thread.__init__(self)
self.daemon = False
self.level = level
self.fd_read, self.fd_write = os.pipe()
self.pipe_reader = os.fdopen(self.fd_read)
self.start()
def fileno(self):
"""Return the write file descriptor of the pipe
"""
return self.fd_write
def run(self):
"""Run the thread, logging everything.
"""
for line in iter(self.pipe_reader.readline, ''):
logging.log(self.level, line.strip('\n'))
self.pipe_reader.close()
def close(self):
"""Close the write end of the pipe.
"""
os.close(self.fd_write)