Kubernetes build worker
This commit is contained in:
parent
3044f8ecbd
commit
bc13333f20
7 changed files with 255 additions and 34 deletions
|
@ -12,7 +12,7 @@ from urllib3.exceptions import ReadTimeoutError, ProtocolError
|
|||
|
||||
from app import metric_queue
|
||||
from buildman.manager.basemanager import BaseManager
|
||||
from buildman.manager.executor import PopenExecutor, EC2Executor
|
||||
from buildman.manager.executor import PopenExecutor, EC2Executor, KubernetesExecutor
|
||||
from buildman.component.buildcomponent import BuildComponent
|
||||
from buildman.jobutil.buildjob import BuildJob
|
||||
from buildman.asyncutil import AsyncWrapper
|
||||
|
@ -24,9 +24,14 @@ logger = logging.getLogger(__name__)
|
|||
|
||||
|
||||
ETCD_MAX_WATCH_TIMEOUT = 30
|
||||
EC2_API_TIMEOUT = 20
|
||||
RETRY_IMMEDIATELY_TIMEOUT = 0
|
||||
DEFAULT_EPHEMERAL_API_TIMEOUT = 20
|
||||
|
||||
EXECUTORS = {
|
||||
'popen': PopenExecutor,
|
||||
'ec2': EC2Executor,
|
||||
'kubernetes': KubernetesExecutor,
|
||||
}
|
||||
|
||||
class EtcdAction(object):
|
||||
GET = 'get'
|
||||
|
@ -41,10 +46,6 @@ class EtcdAction(object):
|
|||
|
||||
class EphemeralBuilderManager(BaseManager):
|
||||
""" Build manager implementation for the Enterprise Registry. """
|
||||
_executors = {
|
||||
'popen': PopenExecutor,
|
||||
'ec2': EC2Executor,
|
||||
}
|
||||
|
||||
_etcd_client_klass = etcd.Client
|
||||
|
||||
|
@ -61,8 +62,9 @@ class EphemeralBuilderManager(BaseManager):
|
|||
self._component_to_job = {}
|
||||
self._job_uuid_to_component = {}
|
||||
self._component_to_builder = {}
|
||||
self._job_to_executor = {}
|
||||
|
||||
self._executor = None
|
||||
self._executors = []
|
||||
|
||||
# Map of etcd keys being watched to the tasks watching them
|
||||
self._watch_tasks = {}
|
||||
|
@ -159,8 +161,7 @@ class EphemeralBuilderManager(BaseManager):
|
|||
self.job_complete_callback(build_job, BuildJobResult.INCOMPLETE)
|
||||
|
||||
logger.info('Terminating expired build node: %s', builder_id)
|
||||
yield From(self._executor.stop_builder(builder_id))
|
||||
|
||||
yield From(self._job_to_executor[builder_id].stop_builder(builder_id))
|
||||
|
||||
def _handle_realm_change(self, etcd_result):
|
||||
if etcd_result is None:
|
||||
|
@ -228,9 +229,12 @@ class EphemeralBuilderManager(BaseManager):
|
|||
logger.debug('Calling initialize')
|
||||
self._manager_config = manager_config
|
||||
|
||||
executor_klass = self._executors.get(manager_config.get('EXECUTOR', ''), PopenExecutor)
|
||||
self._executor = executor_klass(manager_config.get('EXECUTOR_CONFIG', {}),
|
||||
self.manager_hostname)
|
||||
# TODO(jschorr): We need to make this backwards compatible with existing config, as well as test(s)
|
||||
for config in manager_config.get('EXECUTORS', []):
|
||||
executor_klass = EXECUTORS.get(config['EXECUTOR'])
|
||||
executor_config = config.get('CONFIG', {})
|
||||
executor_config.update(manager_config.get('EXECUTOR_CONFIG', {}))
|
||||
self._executors.append(executor_klass(executor_config, self.manager_hostname))
|
||||
|
||||
etcd_host = self._manager_config.get('ETCD_HOST', '127.0.0.1')
|
||||
etcd_port = self._manager_config.get('ETCD_PORT', 2379)
|
||||
|
@ -259,6 +263,7 @@ class EphemeralBuilderManager(BaseManager):
|
|||
restarter=self._register_existing_realms)
|
||||
|
||||
self._etcd_lock_prefix = self._manager_config.get('ETCD_LOCK_PREFIX', 'locks/')
|
||||
self._ephemeral_api_timeout = self._manager_config.get('API_TIMEOUT', DEFAULT_EPHEMERAL_API_TIMEOUT)
|
||||
|
||||
# Load components for all realms currently known to the cluster
|
||||
async(self._register_existing_realms())
|
||||
|
@ -326,25 +331,35 @@ class EphemeralBuilderManager(BaseManager):
|
|||
|
||||
try:
|
||||
yield From(self._etcd_client.write(job_key, lock_payload, prevExist=False,
|
||||
ttl=EC2_API_TIMEOUT))
|
||||
ttl=self._ephemeral_api_timeout))
|
||||
except (KeyError, etcd.EtcdKeyError):
|
||||
# The job was already taken by someone else, we are probably a retry
|
||||
logger.error('Job: %s already exists in etcd, timeout may be misconfigured', build_uuid)
|
||||
raise Return(False, EC2_API_TIMEOUT)
|
||||
raise Return(False, self._ephemeral_api_timeout)
|
||||
except etcd.EtcdException:
|
||||
logger.exception('Exception when writing job %s to etcd', build_uuid)
|
||||
raise Return(False, RETRY_IMMEDIATELY_TIMEOUT)
|
||||
|
||||
executor_type = self._executor.__class__.__name__
|
||||
logger.debug('Starting builder for job: %s with executor: %s', build_uuid, executor_type)
|
||||
started = False
|
||||
logger.debug("executors are: %s", self._executors)
|
||||
for executor in self._executors:
|
||||
# TODO(jschorr): gate on whitelist logic
|
||||
executor_type = executor.__class__.__name__
|
||||
logger.debug('Starting builder for job: %s with executor: %s', build_uuid, executor_type)
|
||||
|
||||
try:
|
||||
builder_id = yield From(self._executor.start_builder(realm, token, build_uuid))
|
||||
metric_queue.put_deprecated('EC2BuilderStarted', 1, unit='Count')
|
||||
metric_queue.ephemeral_build_workers.Inc(labelvalues=[builder_id, build_uuid])
|
||||
except:
|
||||
logger.exception('Exception when starting builder for job: %s', build_uuid)
|
||||
raise Return(False, EC2_API_TIMEOUT)
|
||||
try:
|
||||
builder_id = yield From(executor.start_builder(realm, token, build_uuid))
|
||||
metric_queue.put_deprecated('EphemeralBuilderStarted', 1, unit='Count')
|
||||
metric_queue.ephemeral_build_workers.Inc(labelvalues=[builder_id, build_uuid])
|
||||
started = True
|
||||
break
|
||||
except:
|
||||
logger.exception('Exception when starting builder for job: %s', build_uuid)
|
||||
continue
|
||||
|
||||
if not started:
|
||||
logger.error('Could not start any ephemeral workers.')
|
||||
raise Return(False, self._ephemeral_api_timeout)
|
||||
|
||||
# Store the builder in etcd associated with the job id
|
||||
try:
|
||||
|
@ -353,7 +368,7 @@ class EphemeralBuilderManager(BaseManager):
|
|||
ttl=setup_time))
|
||||
except etcd.EtcdException:
|
||||
logger.exception('Exception when writing job %s to etcd', build_uuid)
|
||||
raise Return(False, EC2_API_TIMEOUT)
|
||||
raise Return(False, self._ephemeral_api_timeout)
|
||||
|
||||
# Store the realm spec which will allow any manager to accept this builder when it connects
|
||||
realm_spec = json.dumps({
|
||||
|
@ -373,6 +388,7 @@ class EphemeralBuilderManager(BaseManager):
|
|||
except etcd.EtcdException:
|
||||
logger.exception('Exception when writing realm %s to etcd for job %s', realm, build_uuid)
|
||||
raise Return(False, setup_time)
|
||||
self._job_to_executor[builder_id] = executor
|
||||
|
||||
raise Return(True, None)
|
||||
|
||||
|
@ -399,7 +415,9 @@ class EphemeralBuilderManager(BaseManager):
|
|||
logger.debug('Calling job_completed with status: %s', job_status)
|
||||
|
||||
# Kill the ephemeral builder
|
||||
yield From(self._executor.stop_builder(self._component_to_builder.pop(build_component)))
|
||||
builder_id = self._component_to_builder.pop(build_component)
|
||||
yield From(self._job_to_executor[builder_id].stop_builder(builder_id))
|
||||
del self._job_to_executor[builder_id]
|
||||
|
||||
# Release the lock in etcd
|
||||
job_key = self._etcd_job_key(build_job)
|
||||
|
|
|
@ -6,7 +6,10 @@ import boto.ec2
|
|||
import requests
|
||||
import cachetools
|
||||
import trollius
|
||||
|
||||
import json
|
||||
import datetime
|
||||
import release
|
||||
import socket
|
||||
|
||||
from jinja2 import FileSystemLoader, Environment
|
||||
from trollius import coroutine, From, Return, get_event_loop
|
||||
|
@ -14,7 +17,7 @@ from functools import partial
|
|||
|
||||
from buildman.asyncutil import AsyncWrapper
|
||||
from container_cloud_config import CloudConfigContext
|
||||
from app import metric_queue
|
||||
from app import metric_queue, app
|
||||
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
@ -37,12 +40,15 @@ class ExecutorException(Exception):
|
|||
|
||||
class BuilderExecutor(object):
|
||||
def __init__(self, executor_config, manager_hostname):
|
||||
""" Interface which can be plugged into the EphemeralNodeManager to provide a strategy for
|
||||
starting and stopping builders.
|
||||
"""
|
||||
self.executor_config = executor_config
|
||||
self.manager_hostname = manager_hostname
|
||||
|
||||
""" Interface which can be plugged into the EphemeralNodeManager to provide a strategy for
|
||||
starting and stopping builders.
|
||||
"""
|
||||
default_websocket_scheme = 'wss' if app.config['PREFERRED_URL_SCHEME'] == 'https' else 'ws'
|
||||
self.websocket_scheme = executor_config.get("WEBSOCKET_SCHEME", default_websocket_scheme)
|
||||
|
||||
@coroutine
|
||||
def start_builder(self, realm, token, build_uuid):
|
||||
""" Create a builder with the specified config. Returns a unique id which can be used to manage
|
||||
|
@ -73,6 +79,7 @@ class BuilderExecutor(object):
|
|||
quay_username=quay_username,
|
||||
quay_password=quay_password,
|
||||
manager_hostname=manager_hostname,
|
||||
websocket_scheme=self.websocket_scheme,
|
||||
coreos_channel=coreos_channel,
|
||||
worker_tag=self.executor_config['WORKER_TAG'],
|
||||
logentries_token=self.executor_config.get('LOGENTRIES_TOKEN', None),
|
||||
|
@ -216,10 +223,13 @@ class PopenExecutor(BuilderExecutor):
|
|||
# Now start a machine for this job, adding the machine id to the etcd information
|
||||
logger.debug('Forking process for build')
|
||||
import subprocess
|
||||
|
||||
ws_host = os.environ.get("BUILDMAN_WS_HOST", "localhost")
|
||||
ws_port = os.environ.get("BUILDMAN_WS_PORT", "8787")
|
||||
builder_env = {
|
||||
'TOKEN': token,
|
||||
'REALM': realm,
|
||||
'ENDPOINT': 'ws://localhost:8787',
|
||||
'ENDPOINT': 'ws://%s:%s' % (ws_host,ws_port),
|
||||
'DOCKER_TLS_VERIFY': os.environ.get('DOCKER_TLS_VERIFY', ''),
|
||||
'DOCKER_CERT_PATH': os.environ.get('DOCKER_CERT_PATH', ''),
|
||||
'DOCKER_HOST': os.environ.get('DOCKER_HOST', ''),
|
||||
|
@ -247,6 +257,146 @@ class PopenExecutor(BuilderExecutor):
|
|||
logpipe.close()
|
||||
|
||||
|
||||
class KubernetesExecutor(BuilderExecutor):
  """ Executes build jobs by creating Kubernetes Jobs, each of which runs a qemu-kvm
      virtual machine inside a privileged pod.
  """
  def __init__(self, *args, **kwargs):
    self._loop = get_event_loop()
    super(KubernetesExecutor, self).__init__(*args, **kwargs)
    # Namespace and builder image are configurable per executor.
    self.namespace = self.executor_config.get('BUILDER_NAMESPACE', 'builder')
    self.image = self.executor_config.get('BUILDER_IMAGE',
                                          'quay.io/quay/quay-builder-qemu-coreos')

  @coroutine
  def _request(self, method, path, **kwargs):
    """ Issues a request against the Kubernetes API server and returns the `requests`
        response object. Uses TLS client authentication when both a client cert and
        key are configured; otherwise falls back to plain HTTP.
    """
    request_options = dict(kwargs)

    tls_cert = self.executor_config.get('K8S_API_TLS_CERT')
    tls_key = self.executor_config.get('K8S_API_TLS_KEY')
    tls_ca = self.executor_config.get('K8S_API_TLS_CA')

    if 'timeout' not in request_options:
      request_options['timeout'] = self.executor_config.get('K8S_API_TIMEOUT', 20)

    if tls_cert and tls_key:
      scheme = 'https'
      request_options['cert'] = (tls_cert, tls_key)
      if tls_ca:
        request_options['verify'] = tls_ca
    else:
      scheme = 'http'

    server = self.executor_config.get('K8S_API_SERVER', 'localhost:8080')
    url = '%s://%s%s' % (scheme, server, path)

    # NOTE(review): removed a leftover debug line that logged the entire executor
    # config on every request; it can contain TLS key material and credentials.
    logger.debug('Kubernetes request: %s %s: %s', method, url, request_options)
    res = requests.request(method, url, **request_options)
    logger.debug('Kubernetes response: %s: %s', res.status_code, res.text)
    raise Return(res)

  def _jobs_path(self):
    """ Returns the API path of the Jobs collection in the builder namespace. """
    return '/apis/batch/v1/namespaces/%s/jobs' % self.namespace

  def _job_path(self, build_uuid):
    """ Returns the API path of the Job backing the given build. """
    return '%s/%s' % (self._jobs_path(), build_uuid)

  def _job_resource(self, build_uuid, user_data, coreos_channel='stable'):
    """ Builds the Kubernetes Job manifest (as a dict) for a single build. """
    vm_memory_limit = self.executor_config.get('VM_MEMORY_LIMIT', '8G')

    # Max values for this container
    container_limits = {
      'memory': self.executor_config.get('CONTAINER_MEMORY_LIMIT', '8Gi'),
      'cpu': self.executor_config.get('CONTAINER_CPU_LIMIT', "2"),
    }

    # Minimum acceptable free resources for this container to "fit" in a quota
    container_requests = {
      'memory': self.executor_config.get('CONTAINER_MEMORY_REQUEST', '8Gi'),
      'cpu': self.executor_config.get('CONTAINER_CPU_REQUEST', "2"),
    }

    # Identical labels on the Job and its pod template so builds can be traced
    # back to the build id, worker host and Quay release. Built once so the
    # timestamp cannot differ between the two label sets.
    labels = {
      'build': build_uuid,
      'time': datetime.datetime.now().strftime('%Y-%m-%d-%H'),
      'worker': socket.gethostname(),
      'quay-sha': release.GIT_HEAD or 'none',
    }

    return {
      'apiVersion': 'batch/v1',
      'kind': 'Job',
      'metadata': {
        'namespace': self.namespace,
        'generateName': build_uuid,
        'labels': labels,
      },
      'spec': {
        'activeDeadlineSeconds': 7200,
        'template': {
          'metadata': {
            'labels': labels,
          },
          'spec': {
            'containers': [
              {
                'name': 'builder',
                'image': '%s:%s' % (self.image, coreos_channel),
                'imagePullPolicy': 'Always',
                # Privileged is required to run qemu-kvm inside the pod.
                'securityContext': {'privileged': True},
                'env': [
                  {'name': 'USERDATA', 'value': user_data},
                  {'name': 'VM_MEMORY', 'value': vm_memory_limit},
                ],
                # BUGFIX: limits/requests must be nested under `resources`; as
                # top-level container fields they are not valid per the v1
                # Container spec and the constraints were being ignored.
                'resources': {
                  'limits': container_limits,
                  'requests': container_requests,
                },
              },
            ],
            'imagePullSecrets': [{'name': 'builder'}],
            'restartPolicy': 'Never',
          },
        },
      },
    }

  @coroutine
  def start_builder(self, realm, token, build_uuid):
    """ Schedules a Kubernetes Job for the build and returns the generated job name,
        which serves as the builder id. Raises ExecutorException when the API server
        rejects the Job.
    """
    # generate resource
    channel = self.executor_config.get('COREOS_CHANNEL', 'stable')
    user_data = self.generate_cloud_config(realm, token, channel, self.manager_hostname)
    resource = self._job_resource(build_uuid, user_data, channel)
    logger.debug('Generated kubernetes resource:\n%s', resource)

    # schedule
    create_job = yield From(self._request('POST', self._jobs_path(), json=resource))
    # Any 2xx status counts as success.
    if create_job.status_code // 100 != 2:
      raise ExecutorException('Failed to create job: %s: %s: %s' % (
        build_uuid, create_job.status_code, create_job.text))

    job = create_job.json()
    raise Return(job['metadata']['name'])

  @coroutine
  def stop_builder(self, builder_id):
    """ Deletes the pod(s) backing the given Job, then the Job itself. Exceptions
        from the API calls are logged and re-raised to the caller.
    """
    pods_path = '/api/v1/namespaces/%s/pods' % self.namespace

    # Pods are deleted explicitly via the job-name label selector; deleting the
    # Job alone does not remove them.
    selector = 'job-name=%s' % builder_id
    try:
      delete_pod = yield From(self._request('DELETE', pods_path,
                                            params=dict(labelSelector=selector)))
    except Exception:
      # if the pod does not exist, we will not get an error here. this covers lack of api connectivity, etc
      logger.exception("Failed to delete pod for job %s", builder_id)
      raise

    logger.debug("Got successful delete pod response: %s", delete_pod.text)

    try:
      yield From(self._request('DELETE', self._job_path(builder_id)))
    except Exception:
      logger.exception('Exception when trying to terminate job %s', builder_id)
      raise
|
||||
|
||||
class LogPipe(threading.Thread):
|
||||
""" Adapted from http://codereview.stackexchange.com/a/17959
|
||||
"""
|
||||
|
|
Reference in a new issue