779f0f1b54
This adds an empty volume on a tmpfs to builder pods and mounts it over the directory Kubernetes uses for secrets, which should prevent pods from having access to the default service account.
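Concretely, the job's pod template (see KubernetesExecutor._job_resource below) gains an in-memory emptyDir volume that is mounted over the path where Kubernetes places the service-account token, so the token file is hidden from the builder container. The relevant fragment of the generated resource:

    'volumes': [
        {'name': 'secrets-mask', 'emptyDir': {'medium': 'Memory'}},
    ],
    ...
    'volumeMounts': [
        {'name': 'secrets-mask',
         'mountPath': '/var/run/secrets/kubernetes.io/serviceaccount'},
    ],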

import logging
import os
import uuid
import threading
import boto.ec2
import requests
import cachetools
import trollius
import datetime
import release
import socket
import hashlib

from jinja2 import FileSystemLoader, Environment
from trollius import coroutine, From, Return, get_event_loop
from functools import partial

from buildman.asyncutil import AsyncWrapper
from container_cloud_config import CloudConfigContext
from app import metric_queue, app
from util.metrics.metricqueue import duration_collector_async


logger = logging.getLogger(__name__)


ONE_HOUR = 60 * 60

_TAG_RETRY_COUNT = 3  # Number of times to retry adding tags.
_TAG_RETRY_SLEEP = 2  # Number of seconds to wait between tag retries.

ENV = Environment(loader=FileSystemLoader('buildman/templates'))
TEMPLATE = ENV.get_template('cloudconfig.yaml')
CloudConfigContext().populate_jinja_environment(ENV)


class ExecutorException(Exception):
  """ Exception raised when there is a problem starting or stopping a builder. """
  pass


class BuilderExecutor(object):
  """ Interface which can be plugged into the EphemeralNodeManager to provide a strategy for
      starting and stopping builders.
  """
  def __init__(self, executor_config, manager_hostname):
    self.executor_config = executor_config
    self.manager_hostname = manager_hostname

    default_websocket_scheme = 'wss' if app.config['PREFERRED_URL_SCHEME'] == 'https' else 'ws'
    self.websocket_scheme = executor_config.get('WEBSOCKET_SCHEME', default_websocket_scheme)

  @property
  def name(self):
    """ Returns the unique name for this executor. """
    return self.executor_config.get('NAME') or self.__class__.__name__

  @coroutine
  def start_builder(self, realm, token, build_uuid):
    """ Creates a builder with the specified config. Returns a unique id which can be used to
        manage the builder.
    """
    raise NotImplementedError

  @coroutine
  def stop_builder(self, builder_id):
    """ Stops a builder which is currently running. """
    raise NotImplementedError

  def allowed_for_namespace(self, namespace):
    """ Returns true if this executor can be used for builds in the given namespace. """

    # Check for an explicit namespace whitelist.
    namespace_whitelist = self.executor_config.get('NAMESPACE_WHITELIST')
    if namespace_whitelist is not None and namespace in namespace_whitelist:
      return True

    # Check for a staged rollout percentage. If found, we hash the namespace and, if it falls
    # within the first X% of the bucket space, we allow this executor to be used.
    staged_rollout = self.executor_config.get('STAGED_ROLLOUT')
    if staged_rollout is not None:
      bucket = int(hashlib.sha256(namespace).hexdigest()[-2:], 16)
      return bucket < (256 * staged_rollout)

    # If there are no restrictions in place, we are free to use this executor.
    return staged_rollout is None and namespace_whitelist is None
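
  # Rollout math (illustrative): with STAGED_ROLLOUT = 0.25, a namespace is
  # admitted only if its sha256-derived bucket (0-255) is below 256 * 0.25 = 64,
  # i.e. roughly the first quarter of namespaces; hashing keeps the decision
  # stable for a given namespace across calls.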

  @property
  def minimum_retry_threshold(self):
    """ Returns the minimum number of retries required for this executor to be used, or 0 if
        none.
    """
    return self.executor_config.get('MINIMUM_RETRY_THRESHOLD', 0)

  def generate_cloud_config(self, realm, token, coreos_channel, manager_hostname,
                            quay_username=None, quay_password=None):
    if quay_username is None:
      quay_username = self.executor_config['QUAY_USERNAME']

    if quay_password is None:
      quay_password = self.executor_config['QUAY_PASSWORD']

    return TEMPLATE.render(
      realm=realm,
      token=token,
      quay_username=quay_username,
      quay_password=quay_password,
      manager_hostname=manager_hostname,
      websocket_scheme=self.websocket_scheme,
      coreos_channel=coreos_channel,
      worker_tag=self.executor_config['WORKER_TAG'],
      logentries_token=self.executor_config.get('LOGENTRIES_TOKEN', None),
      volume_size=self.executor_config.get('VOLUME_SIZE', '42G'),
    )
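
  # Config sketch: executor_config must supply QUAY_USERNAME, QUAY_PASSWORD and
  # WORKER_TAG for the template above; LOGENTRIES_TOKEN and VOLUME_SIZE
  # (default '42G') are optional.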


class EC2Executor(BuilderExecutor):
  """ Implementation of BuilderExecutor which uses boto to start builder machines as EC2
      instances on AWS.
  """
  COREOS_STACK_URL = 'http://%s.release.core-os.net/amd64-usr/current/coreos_production_ami_hvm.txt'

  def __init__(self, *args, **kwargs):
    self._loop = get_event_loop()
    super(EC2Executor, self).__init__(*args, **kwargs)

  def _get_conn(self):
    """ Creates an EC2 connection which can be used to manage instances. """
    return AsyncWrapper(boto.ec2.connect_to_region(
      self.executor_config['EC2_REGION'],
      aws_access_key_id=self.executor_config['AWS_ACCESS_KEY'],
      aws_secret_access_key=self.executor_config['AWS_SECRET_KEY'],
    ))

  @classmethod
  @cachetools.ttl_cache(ttl=ONE_HOUR)
  def _get_coreos_ami(cls, ec2_region, coreos_channel):
    """ Retrieves the CoreOS AMI id from the canonical listing. """
    stack_list_string = requests.get(EC2Executor.COREOS_STACK_URL % coreos_channel).text
    stack_amis = dict([stack.split('=') for stack in stack_list_string.split('|')])
    return stack_amis[ec2_region]
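
  # Per the parsing above, the listing is expected to be a '|'-separated string
  # of region=AMI pairs, e.g. (made-up ids): 'us-east-1=ami-aaaa|us-west-2=ami-bbbb'.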

  @coroutine
  @duration_collector_async(metric_queue.builder_time_to_start, ['ec2'])
  def start_builder(self, realm, token, build_uuid):
    region = self.executor_config['EC2_REGION']
    channel = self.executor_config.get('COREOS_CHANNEL', 'stable')

    coreos_ami = self.executor_config.get('COREOS_AMI', None)
    if coreos_ami is None:
      get_ami_callable = partial(self._get_coreos_ami, region, channel)
      coreos_ami = yield From(self._loop.run_in_executor(None, get_ami_callable))

    user_data = self.generate_cloud_config(realm, token, channel, self.manager_hostname)
    logger.debug('Generated cloud config for build %s: %s', build_uuid, user_data)

    ec2_conn = self._get_conn()

    ssd_root_ebs = boto.ec2.blockdevicemapping.BlockDeviceType(
      size=int(self.executor_config.get('BLOCK_DEVICE_SIZE', 48)),
      volume_type='gp2',
      delete_on_termination=True,
    )
    block_devices = boto.ec2.blockdevicemapping.BlockDeviceMapping()
    block_devices['/dev/xvda'] = ssd_root_ebs

    interfaces = None
    if self.executor_config.get('EC2_VPC_SUBNET_ID', None) is not None:
      interface = boto.ec2.networkinterface.NetworkInterfaceSpecification(
        subnet_id=self.executor_config['EC2_VPC_SUBNET_ID'],
        groups=self.executor_config['EC2_SECURITY_GROUP_IDS'],
        associate_public_ip_address=True,
      )
      interfaces = boto.ec2.networkinterface.NetworkInterfaceCollection(interface)

    try:
      reservation = yield From(ec2_conn.run_instances(
        coreos_ami,
        instance_type=self.executor_config['EC2_INSTANCE_TYPE'],
        key_name=self.executor_config.get('EC2_KEY_NAME', None),
        user_data=user_data,
        instance_initiated_shutdown_behavior='terminate',
        block_device_map=block_devices,
        network_interfaces=interfaces,
      ))
    except boto.exception.EC2ResponseError as ec2e:
      logger.exception('Unable to spawn builder instance')
      metric_queue.put_deprecated('EC2BuildStartFailure', 1, unit='Count')
      metric_queue.ephemeral_build_worker_failure.Inc(labelvalues=[build_uuid])
      raise ec2e

    if not reservation.instances:
      raise ExecutorException('Unable to spawn builder instance.')
    elif len(reservation.instances) != 1:
      raise ExecutorException('EC2 started the wrong number of instances!')

    launched = AsyncWrapper(reservation.instances[0])

    # Sleep a few seconds to wait for AWS to spawn the instance.
    yield From(trollius.sleep(_TAG_RETRY_SLEEP))

    # Tag the instance with its metadata.
    for i in range(0, _TAG_RETRY_COUNT):
      try:
        yield From(launched.add_tags({
          'Name': 'Quay Ephemeral Builder',
          'Realm': realm,
          'Token': token,
          'BuildUUID': build_uuid,
        }))
      except boto.exception.EC2ResponseError as ec2e:
        if ec2e.error_code == 'InvalidInstanceID.NotFound':
          if i < _TAG_RETRY_COUNT - 1:
            logger.warning('Failed to write EC2 tags for instance %s for build %s (attempt #%s)',
                           launched.id, build_uuid, i)
            yield From(trollius.sleep(_TAG_RETRY_SLEEP))
            continue

          raise ExecutorException('Unable to find builder instance.')

        logger.exception('Failed to write EC2 tags (attempt #%s)', i)
      else:
        # Tags were written successfully; no need to retry.
        break

    logger.debug('Machine with ID %s started for build %s', launched.id, build_uuid)
    raise Return(launched.id)

  @coroutine
  def stop_builder(self, builder_id):
    try:
      ec2_conn = self._get_conn()
      terminated_instances = yield From(ec2_conn.terminate_instances([builder_id]))
    except boto.exception.EC2ResponseError as ec2e:
      if ec2e.error_code == 'InvalidInstanceID.NotFound':
        logger.debug('Instance %s already terminated', builder_id)
        return

      logger.exception('Exception when trying to terminate instance %s', builder_id)
      raise

    if builder_id not in [si.id for si in terminated_instances]:
      raise ExecutorException('Unable to terminate instance: %s' % builder_id)


class PopenExecutor(BuilderExecutor):
  """ Implementation of BuilderExecutor which uses Popen to fork a quay-builder process. """
  def __init__(self, executor_config, manager_hostname):
    self._jobs = {}

    super(PopenExecutor, self).__init__(executor_config, manager_hostname)

  @coroutine
  @duration_collector_async(metric_queue.builder_time_to_start, ['fork'])
  def start_builder(self, realm, token, build_uuid):
    # Fork a local quay-builder process for this job.
    logger.debug('Forking process for build')
    import subprocess

    ws_host = os.environ.get('BUILDMAN_WS_HOST', 'localhost')
    ws_port = os.environ.get('BUILDMAN_WS_PORT', '8787')
    builder_env = {
      'TOKEN': token,
      'REALM': realm,
      'ENDPOINT': 'ws://%s:%s' % (ws_host, ws_port),
      'DOCKER_TLS_VERIFY': os.environ.get('DOCKER_TLS_VERIFY', ''),
      'DOCKER_CERT_PATH': os.environ.get('DOCKER_CERT_PATH', ''),
      'DOCKER_HOST': os.environ.get('DOCKER_HOST', ''),
    }

    logpipe = LogPipe(logging.INFO)
    # NOTE: hardcoded development path to the quay-builder binary.
    spawned = subprocess.Popen('/Users/jake/bin/quay-builder', stdout=logpipe, stderr=logpipe,
                               env=builder_env)

    builder_id = str(uuid.uuid4())
    self._jobs[builder_id] = (spawned, logpipe)
    logger.debug('Builder spawned with id: %s', builder_id)
    raise Return(builder_id)

  @coroutine
  def stop_builder(self, builder_id):
    if builder_id not in self._jobs:
      raise ExecutorException('Builder id not being tracked by executor.')

    logger.debug('Killing builder with id: %s', builder_id)
    spawned, logpipe = self._jobs[builder_id]

    if spawned.poll() is None:
      spawned.kill()
    logpipe.close()


class KubernetesExecutor(BuilderExecutor):
  """ Executes build jobs by creating Kubernetes jobs which run a qemu-kvm virtual machine
      in a pod.
  """
  def __init__(self, *args, **kwargs):
    super(KubernetesExecutor, self).__init__(*args, **kwargs)
    self._loop = get_event_loop()
    self.namespace = self.executor_config.get('BUILDER_NAMESPACE', 'builder')
    self.image = self.executor_config.get('BUILDER_VM_CONTAINER_IMAGE',
                                          'quay.io/quay/quay-builder-qemu-coreos:stable')

  @coroutine
  def _request(self, method, path, **kwargs):
    request_options = dict(kwargs)

    tls_cert = self.executor_config.get('K8S_API_TLS_CERT')
    tls_key = self.executor_config.get('K8S_API_TLS_KEY')
    tls_ca = self.executor_config.get('K8S_API_TLS_CA')

    if 'timeout' not in request_options:
      request_options['timeout'] = self.executor_config.get('K8S_API_TIMEOUT', 20)

    if tls_cert and tls_key:
      scheme = 'https'
      request_options['cert'] = (tls_cert, tls_key)
      if tls_ca:
        request_options['verify'] = tls_ca
    else:
      scheme = 'http'

    server = self.executor_config.get('K8S_API_SERVER', 'localhost:8080')
    url = '%s://%s%s' % (scheme, server, path)

    logger.debug('Executor config: %s', self.executor_config)
    logger.debug('Kubernetes request: %s %s: %s', method, url, request_options)
    res = requests.request(method, url, **request_options)
    logger.debug('Kubernetes response: %s: %s', res.status_code, res.text)
    raise Return(res)
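
  # Illustrative (hypothetical host): with K8S_API_SERVER = 'k8s.example.com:6443'
  # and a TLS client cert configured, a job creation request becomes
  #   POST https://k8s.example.com:6443/apis/batch/v1/namespaces/builder/jobs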

  def _jobs_path(self):
    return '/apis/batch/v1/namespaces/%s/jobs' % self.namespace

  def _job_path(self, build_uuid):
    return '%s/%s' % (self._jobs_path(), build_uuid)

  def _job_resource(self, build_uuid, user_data, coreos_channel='stable'):
    vm_memory_limit = self.executor_config.get('VM_MEMORY_LIMIT', '4G')
    vm_volume_size = self.executor_config.get('VOLUME_SIZE', '32G')

    # Minimum acceptable free resources for this container to "fit" in a quota.
    # These may be lower than the absolute limits if the cluster is knowingly
    # oversubscribed by some amount.
    container_requests = {
      'memory': self.executor_config.get('CONTAINER_MEMORY_REQUEST', '3968Mi'),
    }

    release_sha = release.GIT_HEAD or 'none'
    if ' ' in release_sha:
      release_sha = 'HEAD'

    return {
      'apiVersion': 'batch/v1',
      'kind': 'Job',
      'metadata': {
        'namespace': self.namespace,
        'generateName': build_uuid + '-',
        'labels': {
          'build': build_uuid,
          'time': datetime.datetime.now().strftime('%Y-%m-%d-%H'),
          'manager': socket.gethostname(),
          'quay-sha': release_sha,
        },
      },
      'spec': {
        'activeDeadlineSeconds': self.executor_config.get('MAXIMUM_JOB_TIME', 7200),
        'template': {
          'metadata': {
            'labels': {
              'build': build_uuid,
              'time': datetime.datetime.now().strftime('%Y-%m-%d-%H'),
              'manager': socket.gethostname(),
              'quay-sha': release_sha,
            },
          },
          'spec': {
            # This volume is a hack to mask the token for the namespace's
            # default service account, which is placed in a file mounted under
            # `/var/run/secrets/kubernetes.io/serviceaccount` in all pods.
            # There's currently no other way to just disable the service
            # account at either the pod or namespace level.
            #
            # https://github.com/kubernetes/kubernetes/issues/16779
            #
            'volumes': [
              {
                'name': 'secrets-mask',
                'emptyDir': {
                  'medium': 'Memory',
                },
              },
            ],
            'containers': [
              {
                'name': 'builder',
                'imagePullPolicy': 'Always',
                'image': self.image,
                'securityContext': {'privileged': True},
                'env': [
                  {'name': 'USERDATA', 'value': user_data},
                  {'name': 'VM_MEMORY', 'value': vm_memory_limit},
                  {'name': 'VM_VOLUME_SIZE', 'value': vm_volume_size},
                ],
                'resources': {
                  'requests': container_requests,
                },
                'volumeMounts': [
                  {
                    'name': 'secrets-mask',
                    'mountPath': '/var/run/secrets/kubernetes.io/serviceaccount',
                  },
                ],
              },
            ],
            'imagePullSecrets': [{'name': 'builder'}],
            'restartPolicy': 'Never',
            'dnsPolicy': 'Default',
          },
        },
      },
    }
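
  # Note: 'generateName' (rather than 'name') makes the API server append a
  # random suffix to build_uuid, so each Job gets a unique name; start_builder
  # below returns that generated name as the builder id.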

  @coroutine
  @duration_collector_async(metric_queue.builder_time_to_start, ['k8s'])
  def start_builder(self, realm, token, build_uuid):
    # Generate the job resource.
    channel = self.executor_config.get('COREOS_CHANNEL', 'stable')
    user_data = self.generate_cloud_config(realm, token, channel, self.manager_hostname)
    resource = self._job_resource(build_uuid, user_data, channel)
    logger.debug('Generated kubernetes resource:\n%s', resource)

    # Schedule the job.
    create_job = yield From(self._request('POST', self._jobs_path(), json=resource))
    if int(create_job.status_code / 100) != 2:
      raise ExecutorException('Failed to create job: %s: %s: %s' %
                              (build_uuid, create_job.status_code, create_job.text))

    job = create_job.json()
    raise Return(job['metadata']['name'])

  @coroutine
  def stop_builder(self, builder_id):
    pods_path = '/api/v1/namespaces/%s/pods' % self.namespace

    # Delete the job itself.
    try:
      yield From(self._request('DELETE', self._job_path(builder_id)))
    except Exception:
      logger.exception('Failed to send delete job call for job %s', builder_id)

    # Delete the pod(s) for the job.
    selector_string = 'job-name=%s' % builder_id
    try:
      yield From(self._request('DELETE', pods_path, params=dict(labelSelector=selector_string)))
    except Exception:
      logger.exception('Failed to send delete pod call for job %s', builder_id)


class LogPipe(threading.Thread):
  """ Adapted from http://codereview.stackexchange.com/a/17959 """
  def __init__(self, level):
    """ Sets up the object with a logger and a loglevel, and starts the thread. """
    threading.Thread.__init__(self)
    self.daemon = False
    self.level = level
    self.fd_read, self.fd_write = os.pipe()
    self.pipe_reader = os.fdopen(self.fd_read)
    self.start()

  def fileno(self):
    """ Returns the write file descriptor of the pipe. """
    return self.fd_write

  def run(self):
    """ Runs the thread, logging everything read from the pipe. """
    for line in iter(self.pipe_reader.readline, ''):
      logging.log(self.level, line.strip('\n'))

    self.pipe_reader.close()

  def close(self):
    """ Closes the write end of the pipe. """
    os.close(self.fd_write)
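
# Usage sketch (illustrative; mirrors PopenExecutor.start_builder above):
#
#   logpipe = LogPipe(logging.INFO)
#   spawned = subprocess.Popen('quay-builder', stdout=logpipe, stderr=logpipe)
#   spawned.wait()
#   logpipe.close()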