This repository has been archived on 2020-03-24. You can view files and clone it, but cannot push or open issues or pull requests.
quay/buildman/manager/executor.py

492 lines
18 KiB
Python
Raw Normal View History

import logging
import os
import uuid
import threading
import boto.ec2
import requests
import cachetools
import trollius
2015-11-20 20:32:32 +00:00
import datetime
import release
import socket
import hashlib
from jinja2 import FileSystemLoader, Environment
from trollius import coroutine, From, Return, get_event_loop
from functools import partial
from buildman.asyncutil import AsyncWrapper
from container_cloud_config import CloudConfigContext
2015-11-20 20:32:32 +00:00
from app import metric_queue, app
from util.metrics.metricqueue import duration_collector_async
# Module-level logger used by all of the executors defined in this file.
logger = logging.getLogger(__name__)
# Number of seconds in one hour; used as the TTL of the cached CoreOS AMI lookup.
ONE_HOUR = 60*60
_TAG_RETRY_COUNT = 3 # Number of times to retry adding tags.
# Also used as the initial delay before the first tagging attempt.
_TAG_RETRY_SLEEP = 2 # Number of seconds to wait between tag retries.
# Jinja2 environment which loads templates from the relative path 'buildman/templates'.
# NOTE(review): presumably resolved against the process working directory -- confirm.
ENV = Environment(loader=FileSystemLoader('buildman/templates'))
# Cloud config template which is rendered by BuilderExecutor.generate_cloud_config.
TEMPLATE = ENV.get_template('cloudconfig.yaml')
# Registers the container_cloud_config helpers on the shared environment. Note that this runs
# after the template has been loaded; keep the statement order as-is unless verified otherwise.
CloudConfigContext().populate_jinja_environment(ENV)
class ExecutorException(Exception):
    """ Error raised by an executor when a builder cannot be started or stopped. """
class BuilderExecutor(object):
    """ Interface which can be plugged into the EphemeralNodeManager to provide a strategy for
        starting and stopping builders.
    """

    def __init__(self, executor_config, manager_hostname):
        """ Stores the executor configuration and the manager hostname, and determines the
            websocket scheme handed to builders.

            The scheme defaults to 'wss' when the app's PREFERRED_URL_SCHEME is 'https' (and to
            'ws' otherwise), and can be overridden with the WEBSOCKET_SCHEME executor config key.
        """
        self.executor_config = executor_config
        self.manager_hostname = manager_hostname

        prefers_https = app.config['PREFERRED_URL_SCHEME'] == 'https'
        default_websocket_scheme = 'wss' if prefers_https else 'ws'
        self.websocket_scheme = executor_config.get("WEBSOCKET_SCHEME", default_websocket_scheme)

    @property
    def name(self):
        """ Name returns the unique name for this executor. """
        return self.executor_config.get('NAME') or self.__class__.__name__

    @coroutine
    def start_builder(self, realm, token, build_uuid):
        """ Create a builder with the specified config. Returns a unique id which can be used to
            manage the builder.
        """
        raise NotImplementedError

    @coroutine
    def stop_builder(self, builder_id):
        """ Stop a builder which is currently running.
        """
        raise NotImplementedError

    def allowed_for_namespace(self, namespace):
        """ Returns true if this executor can be used for builds in the given namespace. """
        # Check for an explicit namespace whitelist.
        namespace_whitelist = self.executor_config.get('NAMESPACE_WHITELIST')
        if namespace_whitelist is not None and namespace in namespace_whitelist:
            return True

        # Check for a staged rollout percentage. If found, we hash the namespace and, if it is
        # found in the first X% of the character space, we allow this executor to be used.
        staged_rollout = self.executor_config.get('STAGED_ROLLOUT')
        if staged_rollout is not None:
            # hashlib operates on bytes; encode text explicitly so that non-ASCII namespaces do
            # not raise. ASCII namespaces hash to the same bucket as before.
            if not isinstance(namespace, bytes):
                namespace = namespace.encode('utf-8')

            bucket = int(hashlib.sha256(namespace).hexdigest()[-2:], 16)
            return bucket < (256 * staged_rollout)

        # No staged rollout is configured at this point, so the executor is only free to use if
        # there is no whitelist restriction in place either.
        return namespace_whitelist is None

    @property
    def minimum_retry_threshold(self):
        """ Returns the minimum number of retries required for this executor to be used or 0 if
            none. """
        return self.executor_config.get('MINIMUM_RETRY_THRESHOLD', 0)

    def generate_cloud_config(self, realm, token, coreos_channel, manager_hostname,
                              quay_username=None, quay_password=None):
        """ Renders the cloud config template for a builder and returns the rendered text.

            If quay_username or quay_password is not given, it is read from the QUAY_USERNAME or
            QUAY_PASSWORD executor config key respectively. Raises KeyError if one of those keys
            or WORKER_TAG is required but missing from the executor config.
        """
        if quay_username is None:
            quay_username = self.executor_config['QUAY_USERNAME']

        if quay_password is None:
            quay_password = self.executor_config['QUAY_PASSWORD']

        return TEMPLATE.render(
            realm=realm,
            token=token,
            quay_username=quay_username,
            quay_password=quay_password,
            manager_hostname=manager_hostname,
            websocket_scheme=self.websocket_scheme,
            coreos_channel=coreos_channel,
            worker_tag=self.executor_config['WORKER_TAG'],
            logentries_token=self.executor_config.get('LOGENTRIES_TOKEN', None),
            volume_size=self.executor_config.get('VOLUME_SIZE', '42G'),
        )
class EC2Executor(BuilderExecutor):
    """ Implementation of BuilderExecutor which uses boto to start and terminate builder machines
        on EC2.
    """
    COREOS_STACK_URL = 'http://%s.release.core-os.net/amd64-usr/current/coreos_production_ami_hvm.txt'

    def __init__(self, *args, **kwargs):
        self._loop = get_event_loop()
        super(EC2Executor, self).__init__(*args, **kwargs)

    def _get_conn(self):
        """ Creates an ec2 connection which can be used to manage instances.
        """
        return AsyncWrapper(boto.ec2.connect_to_region(
            self.executor_config['EC2_REGION'],
            aws_access_key_id=self.executor_config['AWS_ACCESS_KEY'],
            aws_secret_access_key=self.executor_config['AWS_SECRET_KEY'],
        ))

    @classmethod
    @cachetools.ttl_cache(ttl=ONE_HOUR)
    def _get_coreos_ami(cls, ec2_region, coreos_channel):
        """ Retrieve the CoreOS AMI id from the canonical listing.

            The listing is parsed as '|'-separated 'region=ami' pairs. Results are cached for
            ONE_HOUR seconds. Raises KeyError if the region is not present in the listing.
        """
        stack_list_string = requests.get(EC2Executor.COREOS_STACK_URL % coreos_channel).text
        stack_amis = dict([stack.split('=') for stack in stack_list_string.split('|')])
        return stack_amis[ec2_region]

    @coroutine
    @duration_collector_async(metric_queue.builder_time_to_start, ['ec2'])
    def start_builder(self, realm, token, build_uuid):
        """ Launches a single EC2 instance for the build, tags it with the build metadata and
            returns (via Return) the id of the launched instance.

            Raises the underlying EC2ResponseError if the instance could not be launched, and
            ExecutorException if EC2 did not start exactly one instance or if the instance could
            not be found when tagging it.
        """
        region = self.executor_config['EC2_REGION']
        channel = self.executor_config.get('COREOS_CHANNEL', 'stable')

        coreos_ami = self.executor_config.get('COREOS_AMI', None)
        if coreos_ami is None:
            # The AMI lookup performs a blocking HTTP request, so run it off of the event loop.
            get_ami_callable = partial(self._get_coreos_ami, region, channel)
            coreos_ami = yield From(self._loop.run_in_executor(None, get_ami_callable))

        user_data = self.generate_cloud_config(realm, token, channel, self.manager_hostname)
        logger.debug('Generated cloud config for build %s: %s', build_uuid, user_data)

        ec2_conn = self._get_conn()

        ssd_root_ebs = boto.ec2.blockdevicemapping.BlockDeviceType(
            size=int(self.executor_config.get('BLOCK_DEVICE_SIZE', 48)),
            volume_type='gp2',
            delete_on_termination=True,
        )
        block_devices = boto.ec2.blockdevicemapping.BlockDeviceMapping()
        block_devices['/dev/xvda'] = ssd_root_ebs

        # Only specify network interfaces when a VPC subnet has been configured.
        interfaces = None
        if self.executor_config.get('EC2_VPC_SUBNET_ID', None) is not None:
            interface = boto.ec2.networkinterface.NetworkInterfaceSpecification(
                subnet_id=self.executor_config['EC2_VPC_SUBNET_ID'],
                groups=self.executor_config['EC2_SECURITY_GROUP_IDS'],
                associate_public_ip_address=True,
            )
            interfaces = boto.ec2.networkinterface.NetworkInterfaceCollection(interface)

        try:
            reservation = yield From(ec2_conn.run_instances(
                coreos_ami,
                instance_type=self.executor_config['EC2_INSTANCE_TYPE'],
                key_name=self.executor_config.get('EC2_KEY_NAME', None),
                user_data=user_data,
                instance_initiated_shutdown_behavior='terminate',
                block_device_map=block_devices,
                network_interfaces=interfaces,
            ))
        except boto.exception.EC2ResponseError:
            logger.exception('Unable to spawn builder instance')
            metric_queue.put_deprecated('EC2BuildStartFailure', 1, unit='Count')
            metric_queue.ephemeral_build_worker_failure.Inc(labelvalues=[build_uuid])
            # Bare raise so that the original traceback is preserved.
            raise

        if not reservation.instances:
            raise ExecutorException('Unable to spawn builder instance.')
        elif len(reservation.instances) != 1:
            raise ExecutorException('EC2 started wrong number of instances!')

        launched = AsyncWrapper(reservation.instances[0])

        # Sleep a few seconds to wait for AWS to spawn the instance.
        yield From(trollius.sleep(_TAG_RETRY_SLEEP))

        # Tag the instance with its metadata.
        for i in range(0, _TAG_RETRY_COUNT):
            try:
                yield From(launched.add_tags({
                    'Name': 'Quay Ephemeral Builder',
                    'Realm': realm,
                    'Token': token,
                    'BuildUUID': build_uuid,
                }))
            except boto.exception.EC2ResponseError as ec2e:
                if ec2e.error_code == 'InvalidInstanceID.NotFound':
                    if i < _TAG_RETRY_COUNT - 1:
                        logger.warning(
                            'Failed to write EC2 tags for instance %s for build %s (attempt #%s)',
                            launched.id, build_uuid, i)
                        yield From(trollius.sleep(_TAG_RETRY_SLEEP))
                        continue

                    raise ExecutorException('Unable to find builder instance.')

                # Any other tagging error is logged and the next attempt (if any) is made.
                logger.exception('Failed to write EC2 tags (attempt #%s)', i)
            else:
                # The tags were written successfully, so there is nothing left to retry.
                break

        logger.debug('Machine with ID %s started for build %s', launched.id, build_uuid)
        raise Return(launched.id)

    @coroutine
    def stop_builder(self, builder_id):
        """ Terminates the EC2 instance with the given id.

            An instance which EC2 reports as not found is treated as already terminated. Raises
            ExecutorException if the instance is not in the list of terminated instances.
        """
        try:
            ec2_conn = self._get_conn()
            terminated_instances = yield From(ec2_conn.terminate_instances([builder_id]))
        except boto.exception.EC2ResponseError as ec2e:
            if ec2e.error_code == 'InvalidInstanceID.NotFound':
                logger.debug('Instance %s already terminated', builder_id)
                return

            logger.exception('Exception when trying to terminate instance %s', builder_id)
            raise

        if builder_id not in [si.id for si in terminated_instances]:
            raise ExecutorException('Unable to terminate instance: %s' % builder_id)
class PopenExecutor(BuilderExecutor):
    """ Implementation of BuilderExecutor which uses Popen to fork a quay-builder process.
    """

    def __init__(self, executor_config, manager_hostname):
        # Maps builder id -> (Popen object, LogPipe) for every builder which is being tracked.
        self._jobs = {}
        super(PopenExecutor, self).__init__(executor_config, manager_hostname)

    @coroutine
    @duration_collector_async(metric_queue.builder_time_to_start, ['fork'])
    def start_builder(self, realm, token, build_uuid):
        """ Forks a quay-builder process for the build and returns (via Return) a newly generated
            id under which the process is tracked.

            The binary defaults to the historical hard-coded path and can be overridden with the
            BUILDER_BINARY_LOCATION environment variable. The websocket endpoint is built from
            the BUILDMAN_WS_HOST and BUILDMAN_WS_PORT environment variables.
        """
        # Now start a machine for this job, adding the machine id to the etcd information
        logger.debug('Forking process for build')
        import subprocess

        ws_host = os.environ.get("BUILDMAN_WS_HOST", "localhost")
        ws_port = os.environ.get("BUILDMAN_WS_PORT", "8787")
        builder_env = {
            'TOKEN': token,
            'REALM': realm,
            'ENDPOINT': 'ws://%s:%s' % (ws_host, ws_port),
            'DOCKER_TLS_VERIFY': os.environ.get('DOCKER_TLS_VERIFY', ''),
            'DOCKER_CERT_PATH': os.environ.get('DOCKER_CERT_PATH', ''),
            'DOCKER_HOST': os.environ.get('DOCKER_HOST', ''),
        }

        builder_binary = os.environ.get('BUILDER_BINARY_LOCATION', '/Users/jake/bin/quay-builder')

        # Both stdout and stderr of the builder are forwarded into the logging module.
        logpipe = LogPipe(logging.INFO)
        spawned = subprocess.Popen(builder_binary, stdout=logpipe, stderr=logpipe,
                                   env=builder_env)

        builder_id = str(uuid.uuid4())
        self._jobs[builder_id] = (spawned, logpipe)
        logger.debug('Builder spawned with id: %s', builder_id)
        raise Return(builder_id)

    @coroutine
    def stop_builder(self, builder_id):
        """ Kills the builder process with the given id (if it is still running) and closes its
            log pipe. Raises ExecutorException if the id is not being tracked.
        """
        # Pop the entry so that stopped builders are no longer tracked; otherwise the mapping
        # grows without bound and a repeated stop would close the pipe descriptor a second time.
        job = self._jobs.pop(builder_id, None)
        if job is None:
            raise ExecutorException('Builder id not being tracked by executor.')

        logger.debug('Killing builder with id: %s', builder_id)
        spawned, logpipe = job

        if spawned.poll() is None:
            spawned.kill()

        logpipe.close()
2015-11-20 20:32:32 +00:00
class KubernetesExecutor(BuilderExecutor):
    """ Executes build jobs by creating Kubernetes jobs which run a qemu-kvm virtual
        machine in a pod """

    def __init__(self, *args, **kwargs):
        super(KubernetesExecutor, self).__init__(*args, **kwargs)
        self._loop = get_event_loop()
        self.namespace = self.executor_config.get('BUILDER_NAMESPACE', 'builder')
        self.image = self.executor_config.get('BUILDER_VM_CONTAINER_IMAGE',
                                              'quay.io/quay/quay-builder-qemu-coreos:stable')

    @coroutine
    def _request(self, method, path, **kwargs):
        """ Performs an HTTP request against the configured Kubernetes API server and returns
            (via Return) the response object.

            Extra keyword arguments are passed through to requests.request. HTTPS with a client
            certificate is used when both K8S_API_TLS_CERT and K8S_API_TLS_KEY are configured;
            plain HTTP is used otherwise.

            NOTE(review): requests.request is called directly here, so the event loop is blocked
            for the duration of the HTTP call.
        """
        request_options = dict(kwargs)

        tls_cert = self.executor_config.get('K8S_API_TLS_CERT')
        tls_key = self.executor_config.get('K8S_API_TLS_KEY')
        tls_ca = self.executor_config.get('K8S_API_TLS_CA')

        if 'timeout' not in request_options:
            request_options['timeout'] = self.executor_config.get("K8S_API_TIMEOUT", 20)

        if tls_cert and tls_key:
            scheme = 'https'
            request_options['cert'] = (tls_cert, tls_key)
            if tls_ca:
                request_options['verify'] = tls_ca
        else:
            scheme = 'http'

        server = self.executor_config.get('K8S_API_SERVER', 'localhost:8080')
        url = '%s://%s%s' % (scheme, server, path)

        # NOTE(review): this logs the complete executor config, which may contain credentials;
        # confirm that this is acceptable at debug level.
        logger.debug('Executor config: %s', self.executor_config)
        logger.debug('Kubernetes request: %s %s: %s', method, url, request_options)
        res = requests.request(method, url, **request_options)
        logger.debug('Kubernetes response: %s: %s', res.status_code, res.text)
        raise Return(res)

    def _jobs_path(self):
        """ Returns the API path of the jobs collection in the builder namespace. """
        return '/apis/batch/v1/namespaces/%s/jobs' % self.namespace

    def _job_path(self, build_uuid):
        """ Returns the API path of the job with the given name. """
        return '%s/%s' % (self._jobs_path(), build_uuid)

    def _job_resource(self, build_uuid, user_data, coreos_channel='stable'):
        """ Builds and returns the Kubernetes Job resource (as a dict) for the given build.

            The coreos_channel parameter is accepted but not used by this method.
        """
        vm_memory_limit = self.executor_config.get('VM_MEMORY_LIMIT', '4G')
        vm_volume_size = self.executor_config.get('VOLUME_SIZE', '32G')

        # Minimum acceptable free resources for this container to "fit" in a quota
        # These may be lower than the absolute limits if the cluster is knowingly
        # oversubscribed by some amount.
        container_requests = {
            'memory': self.executor_config.get('CONTAINER_MEMORY_REQUEST', '3968Mi'),
        }

        release_sha = release.GIT_HEAD or 'none'
        if ' ' in release_sha:
            release_sha = 'HEAD'

        # The job and its pod template carry the same labels. Compute them once so that both
        # copies always agree (e.g. when the hour rolls over while building the resource).
        labels = {
            'build': build_uuid,
            'time': datetime.datetime.now().strftime('%Y-%m-%d-%H'),
            'manager': socket.gethostname(),
            'quay-sha': release_sha,
        }

        return {
            'apiVersion': 'batch/v1',
            'kind': 'Job',
            'metadata': {
                'namespace': self.namespace,
                'generateName': build_uuid + '-',
                'labels': dict(labels),
            },
            'spec': {
                'activeDeadlineSeconds': self.executor_config.get('MAXIMUM_JOB_TIME', 7200),
                'template': {
                    'metadata': {
                        'labels': dict(labels),
                    },
                    'spec': {
                        # This volume is a hack to mask the token for the namespace's
                        # default service account, which is placed in a file mounted under
                        # `/var/run/secrets/kubernetes.io/serviceaccount` in all pods.
                        # There's currently no other way to just disable the service
                        # account at either the pod or namespace level.
                        #
                        #   https://github.com/kubernetes/kubernetes/issues/16779
                        #
                        'volumes': [
                            {
                                'name': 'secrets-mask',
                                'emptyDir': {
                                    'medium': 'Memory',
                                },
                            },
                        ],
                        'containers': [
                            {
                                'name': 'builder',
                                'imagePullPolicy': 'Always',
                                'image': self.image,
                                'securityContext': {'privileged': True},
                                'env': [
                                    {'name': 'USERDATA', 'value': user_data},
                                    {'name': 'VM_MEMORY', 'value': vm_memory_limit},
                                    {'name': 'VM_VOLUME_SIZE', 'value': vm_volume_size},
                                ],
                                'resources': {
                                    'requests': container_requests,
                                },
                                'volumeMounts': [
                                    {
                                        'name': 'secrets-mask',
                                        'mountPath': '/var/run/secrets/kubernetes.io/serviceaccount',
                                    },
                                ],
                            },
                        ],
                        'imagePullSecrets': [{'name': 'builder'}],
                        'restartPolicy': 'Never',
                        'dnsPolicy': 'Default',
                    },
                },
            },
        }

    @coroutine
    @duration_collector_async(metric_queue.builder_time_to_start, ['k8s'])
    def start_builder(self, realm, token, build_uuid):
        """ Creates a Kubernetes job for the build and returns (via Return) the name of the
            created job. Raises ExecutorException if the API server does not answer with a 2xx
            status code.
        """
        # generate resource
        channel = self.executor_config.get('COREOS_CHANNEL', 'stable')
        user_data = self.generate_cloud_config(realm, token, channel, self.manager_hostname)
        resource = self._job_resource(build_uuid, user_data, channel)
        logger.debug('Generated kubernetes resource:\n%s', resource)

        # schedule
        create_job = yield From(self._request('POST', self._jobs_path(), json=resource))
        if create_job.status_code // 100 != 2:
            raise ExecutorException('Failed to create job: %s: %s: %s' %
                                    (build_uuid, create_job.status_code, create_job.text))

        job = create_job.json()
        raise Return(job['metadata']['name'])

    @coroutine
    def stop_builder(self, builder_id):
        """ Deletes the job with the given name and the pod(s) which belong to it.

            Both deletions are best-effort: failures to send a request are logged and not raised.
        """
        pods_path = '/api/v1/namespaces/%s/pods' % self.namespace

        # Delete the job itself.
        try:
            yield From(self._request('DELETE', self._job_path(builder_id)))
        except Exception:
            # Only swallow real errors; GeneratorExit and KeyboardInterrupt must propagate.
            logger.exception('Failed to send delete job call for job %s', builder_id)

        # Delete the pod(s) for the job.
        selectorString = "job-name=%s" % builder_id
        try:
            yield From(self._request('DELETE', pods_path, params=dict(labelSelector=selectorString)))
        except Exception:
            logger.exception("Failed to send delete pod call for job %s", builder_id)
2015-11-20 20:32:32 +00:00
class LogPipe(threading.Thread):
    """ Thread which reads lines from a pipe and forwards them to the logging module.

        The object exposes fileno(), returning the write end of the pipe, which allows it to be
        handed out as an output stream. close() must be called to close the write end again.

        Adapted from http://codereview.stackexchange.com/a/17959
    """

    def __init__(self, level):
        """Setup the object with a logger and a loglevel
           and start the thread
        """
        threading.Thread.__init__(self)
        self.daemon = False
        self.level = level
        self.fd_read, self.fd_write = os.pipe()
        self.pipe_reader = os.fdopen(self.fd_read)
        # Tracks whether the write end has been closed, so that close() is safe to call twice.
        self._write_closed = False
        self.start()

    def fileno(self):
        """Return the write file descriptor of the pipe
        """
        return self.fd_write

    def run(self):
        """Run the thread, logging everything.
        """
        try:
            # readline returns '' once every writer of the pipe has closed its end.
            for line in iter(self.pipe_reader.readline, ''):
                logging.log(self.level, line.strip('\n'))
        finally:
            # Always release the read end, even if logging a line raised.
            self.pipe_reader.close()

    def close(self):
        """Close the write end of the pipe.

           Calling this more than once is a no-op: closing the descriptor number a second time
           could otherwise close an unrelated descriptor which has since reused that number.
        """
        if self._write_closed:
            return

        self._write_closed = True
        os.close(self.fd_write)