Merge pull request #1605 from coreos-inc/kubernetes-builder

Kubernetes builder
josephschorr 2016-07-12 14:49:10 -04:00 committed by GitHub
commit a69266c282
10 changed files with 533 additions and 244 deletions


@ -25,6 +25,10 @@ class BuildJob(object):
'Could not parse build queue item config with ID %s' % self.job_details['build_uuid']
)
@property
def retries_remaining(self):
return self.job_item.retries_remaining
def has_retries_remaining(self):
return self.job_item.retries_remaining > 0
@ -58,6 +62,11 @@ class BuildJob(object):
raise BuildJobLoadException(
'Could not load repository build with ID %s' % self.job_details['build_uuid'])
@property
def namespace(self):
""" Returns the namespace under which this build is running. """
return self.repo_build.repository.namespace_user.username
@property
def repo_build(self):
return self._load_repo_build()
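The namespace and retries_remaining properties added to BuildJob here are consumed by the ephemeral manager's new executor-selection loop in the next file. A self-contained sketch of that selection logic, assuming the allowed_for_namespace and minimum_retry_threshold hooks defined on BuilderExecutor later in this commit (pick_executor itself is a hypothetical helper, not part of the change):

```
def pick_executor(executors, namespace, retries_remaining):
    # Mirrors EphemeralBuilderManager.schedule: executors are tried in registration
    # order, and the first one whose namespace whitelist admits the build and whose
    # minimum retry threshold does not exceed the job's remaining retries wins.
    for executor in executors:
        if not executor.allowed_for_namespace(namespace):
            continue
        if executor.minimum_retry_threshold > retries_remaining:
            continue
        return executor
    return None  # no executor matched; the manager logs an error and gives up
```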


@ -12,7 +12,7 @@ from urllib3.exceptions import ReadTimeoutError, ProtocolError
from app import metric_queue
from buildman.manager.basemanager import BaseManager
from buildman.manager.executor import PopenExecutor, EC2Executor
from buildman.manager.executor import PopenExecutor, EC2Executor, KubernetesExecutor
from buildman.component.buildcomponent import BuildComponent
from buildman.jobutil.buildjob import BuildJob
from buildman.asyncutil import AsyncWrapper
@ -24,9 +24,14 @@ logger = logging.getLogger(__name__)
ETCD_MAX_WATCH_TIMEOUT = 30
EC2_API_TIMEOUT = 20
RETRY_IMMEDIATELY_TIMEOUT = 0
DEFAULT_EPHEMERAL_API_TIMEOUT = 20
EXECUTORS = {
'popen': PopenExecutor,
'ec2': EC2Executor,
'kubernetes': KubernetesExecutor,
}
class EtcdAction(object):
GET = 'get'
@ -41,10 +46,6 @@ class EtcdAction(object):
class EphemeralBuilderManager(BaseManager):
""" Build manager implementation for the Enterprise Registry. """
_executors = {
'popen': PopenExecutor,
'ec2': EC2Executor,
}
_etcd_client_klass = etcd.Client
@ -58,11 +59,15 @@ class EphemeralBuilderManager(BaseManager):
self._etcd_realm_prefix = None
self._etcd_builder_prefix = None
self._etcd_lock_prefix = None
self._ephemeral_api_timeout = DEFAULT_EPHEMERAL_API_TIMEOUT
self._component_to_job = {}
self._job_uuid_to_component = {}
self._component_to_builder = {}
self._job_to_executor = {}
self._executor = None
self._executors = []
# Map of etcd keys being watched to the tasks watching them
self._watch_tasks = {}
@ -93,7 +98,6 @@ class EphemeralBuilderManager(BaseManager):
# at the index we retrieved. We therefore start a new watch at HEAD and
# (if specified) call the restarter method which should conduct a read and
# reset the state of the manager.
# TODO: Remove this hack once Etcd is fixed.
logger.exception('Etcd moved forward too quickly. Restarting watch cycle.')
new_index = None
if restarter is not None:
@ -159,8 +163,7 @@ class EphemeralBuilderManager(BaseManager):
self.job_complete_callback(build_job, BuildJobResult.INCOMPLETE)
logger.info('Terminating expired build node: %s', builder_id)
yield From(self._executor.stop_builder(builder_id))
yield From(self._job_to_executor[builder_id].stop_builder(builder_id))
def _handle_realm_change(self, etcd_result):
if etcd_result is None:
@ -202,6 +205,10 @@ class EphemeralBuilderManager(BaseManager):
self._job_uuid_to_component[build_job.job_details['build_uuid']] = component
return component
@property
def registered_executors(self):
return self._executors
@coroutine
def _register_existing_realms(self):
try:
@ -224,13 +231,26 @@ class EphemeralBuilderManager(BaseManager):
# no realms have been registered yet
pass
def _load_executor(self, executor_class_name, executor_config):
executor_klass = EXECUTORS.get(executor_class_name)
if executor_klass is None:
logger.error('Unknown executor %s; skipping install', executor_class_name)
return
self._executors.append(executor_klass(executor_config, self.manager_hostname))
def initialize(self, manager_config):
logger.debug('Calling initialize')
self._manager_config = manager_config
executor_klass = self._executors.get(manager_config.get('EXECUTOR', ''), PopenExecutor)
self._executor = executor_klass(manager_config.get('EXECUTOR_CONFIG', {}),
self.manager_hostname)
# Note: Executor config can be defined either as a single block of EXECUTOR_CONFIG (old style)
# or as a new set of executor configurations, with the order determining how we fall back. We
# check for both here to ensure backwards compatibility.
if manager_config.get('EXECUTORS'):
for executor_config in manager_config['EXECUTORS']:
self._load_executor(executor_config.get('EXECUTOR'), executor_config)
else:
self._load_executor(manager_config.get('EXECUTOR'), manager_config.get('EXECUTOR_CONFIG'))
etcd_host = self._manager_config.get('ETCD_HOST', '127.0.0.1')
etcd_port = self._manager_config.get('ETCD_PORT', 2379)
@ -259,13 +279,14 @@ class EphemeralBuilderManager(BaseManager):
restarter=self._register_existing_realms)
self._etcd_lock_prefix = self._manager_config.get('ETCD_LOCK_PREFIX', 'locks/')
self._ephemeral_api_timeout = self._manager_config.get('API_TIMEOUT',
DEFAULT_EPHEMERAL_API_TIMEOUT)
# Load components for all realms currently known to the cluster
async(self._register_existing_realms())
def setup_time(self):
setup_time = self._manager_config.get('MACHINE_SETUP_TIME', 300)
return setup_time
return self._manager_config.get('MACHINE_SETUP_TIME', 300)
def shutdown(self):
logger.debug('Shutting down worker.')
@ -316,35 +337,62 @@ class EphemeralBuilderManager(BaseManager):
max_expiration = datetime.utcnow() + timedelta(seconds=machine_max_expiration)
payload = {
'expiration': calendar.timegm(expiration.timetuple()),
'max_expiration': calendar.timegm(max_expiration.timetuple()),
'nonce': nonce,
'had_heartbeat': False,
'job_queue_item': build_job.job_item,
'expiration': calendar.timegm(expiration.timetuple()),
'max_expiration': calendar.timegm(max_expiration.timetuple()),
'nonce': nonce,
'had_heartbeat': False,
'job_queue_item': build_job.job_item,
}
lock_payload = json.dumps(payload)
try:
yield From(self._etcd_client.write(job_key, lock_payload, prevExist=False,
ttl=EC2_API_TIMEOUT))
ttl=self._ephemeral_api_timeout))
except (KeyError, etcd.EtcdKeyError):
# The job was already taken by someone else, we are probably a retry
logger.error('Job: %s already exists in etcd, timeout may be misconfigured', build_uuid)
raise Return(False, EC2_API_TIMEOUT)
raise Return(False, self._ephemeral_api_timeout)
except etcd.EtcdException:
logger.exception('Exception when writing job %s to etcd', build_uuid)
raise Return(False, RETRY_IMMEDIATELY_TIMEOUT)
executor_type = self._executor.__class__.__name__
logger.debug('Starting builder for job: %s with executor: %s', build_uuid, executor_type)
started_with_executor = None
logger.debug("executors are: %s", self._executors)
try:
builder_id = yield From(self._executor.start_builder(realm, token, build_uuid))
metric_queue.put_deprecated('EC2BuilderStarted', 1, unit='Count')
metric_queue.ephemeral_build_workers.Inc(labelvalues=[builder_id, build_uuid])
except:
logger.exception('Exception when starting builder for job: %s', build_uuid)
raise Return(False, EC2_API_TIMEOUT)
for executor in self._executors:
executor_type = executor.__class__.__name__
# Check if we can use this executor based on its whitelist, by namespace.
namespace = build_job.namespace
if not executor.allowed_for_namespace(namespace):
logger.debug('Job %s (namespace: %s) cannot use executor %s', build_uuid, namespace,
executor_type)
continue
# Check if we can use this executor based on the retries remaining.
if executor.minimum_retry_threshold > build_job.retries_remaining:
logger.debug('Job %s cannot use executor %s due to not meeting retry threshold', build_uuid,
executor_type)
continue
logger.debug('Starting builder for job: %s with executor: %s', build_uuid, executor_type)
try:
builder_id = yield From(executor.start_builder(realm, token, build_uuid))
metric_queue.put_deprecated('EphemeralBuilderStarted', 1, unit='Count')
metric_queue.ephemeral_build_workers.Inc(labelvalues=[builder_id, build_uuid])
started_with_executor = executor
break
except:
logger.exception('Exception when starting builder for job: %s', build_uuid)
continue
if started_with_executor is None:
logger.error('Could not start ephemeral worker for build %s', build_uuid)
raise Return(False, self._ephemeral_api_timeout)
logger.debug('Started builder for job: %s with executor: %s', build_uuid, executor_type)
# Store the builder in etcd associated with the job id
try:
@ -353,14 +401,14 @@ class EphemeralBuilderManager(BaseManager):
ttl=setup_time))
except etcd.EtcdException:
logger.exception('Exception when writing job %s to etcd', build_uuid)
raise Return(False, EC2_API_TIMEOUT)
raise Return(False, self._ephemeral_api_timeout)
# Store the realm spec which will allow any manager to accept this builder when it connects
realm_spec = json.dumps({
'realm': realm,
'token': token,
'builder_id': builder_id,
'job_queue_item': build_job.job_item,
'realm': realm,
'token': token,
'builder_id': builder_id,
'job_queue_item': build_job.job_item,
})
try:
@ -374,6 +422,7 @@ class EphemeralBuilderManager(BaseManager):
logger.exception('Exception when writing realm %s to etcd for job %s', realm, build_uuid)
raise Return(False, setup_time)
self._job_to_executor[builder_id] = started_with_executor
raise Return(True, None)
@coroutine
@ -399,7 +448,9 @@ class EphemeralBuilderManager(BaseManager):
logger.debug('Calling job_completed with status: %s', job_status)
# Kill the ephemeral builder
yield From(self._executor.stop_builder(self._component_to_builder.pop(build_component)))
builder_id = self._component_to_builder.pop(build_component)
yield From(self._job_to_executor[builder_id].stop_builder(builder_id))
del self._job_to_executor[builder_id]
# Release the lock in etcd
job_key = self._etcd_job_key(build_job)
@ -431,11 +482,11 @@ class EphemeralBuilderManager(BaseManager):
new_expiration = datetime.utcnow() + timedelta(seconds=ttl)
payload = {
'expiration': calendar.timegm(new_expiration.timetuple()),
'builder_id': build_job_metadata['builder_id'],
'job_queue_item': build_job.job_item,
'max_expiration': build_job_metadata['max_expiration'],
'had_heartbeat': True,
'expiration': calendar.timegm(new_expiration.timetuple()),
'builder_id': build_job_metadata['builder_id'],
'job_queue_item': build_job.job_item,
'max_expiration': build_job_metadata['max_expiration'],
'had_heartbeat': True,
}
yield From(self._etcd_client.write(job_key, json.dumps(payload), ttl=ttl))
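The initialize() changes above keep the old single-executor settings working while introducing an ordered EXECUTORS list; entries are tried in order, so later entries act as fallbacks when an earlier executor's whitelist or retry threshold rejects a job. A minimal sketch of the two equivalent configuration shapes, with illustrative values for everything except the key names taken from this diff:

```
# Old style: one executor selected by EXECUTOR, configured via EXECUTOR_CONFIG.
old_style_manager_config = {
    'EXECUTOR': 'ec2',
    'EXECUTOR_CONFIG': {'EC2_REGION': 'us-east-1'},   # illustrative value
}

# New style: an ordered list; each entry names its EXECUTOR and carries its own config.
new_style_manager_config = {
    'EXECUTORS': [
        {
            'EXECUTOR': 'kubernetes',
            'NAMESPACE_WHITELIST': ['buildtest'],      # hypothetical namespace
            'MINIMUM_RETRY_THRESHOLD': 2,              # skipped once retries drop below 2
        },
        {
            'EXECUTOR': 'ec2',                         # no whitelist: catch-all fallback
            'EC2_REGION': 'us-east-1',                 # illustrative value
        },
    ],
}
```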


@ -6,7 +6,9 @@ import boto.ec2
import requests
import cachetools
import trollius
import datetime
import release
import socket
from jinja2 import FileSystemLoader, Environment
from trollius import coroutine, From, Return, get_event_loop
@ -14,7 +16,7 @@ from functools import partial
from buildman.asyncutil import AsyncWrapper
from container_cloud_config import CloudConfigContext
from app import metric_queue
from app import metric_queue, app
logger = logging.getLogger(__name__)
@ -37,12 +39,15 @@ class ExecutorException(Exception):
class BuilderExecutor(object):
def __init__(self, executor_config, manager_hostname):
""" Interface which can be plugged into the EphemeralNodeManager to provide a strategy for
starting and stopping builders.
"""
self.executor_config = executor_config
self.manager_hostname = manager_hostname
""" Interface which can be plugged into the EphemeralNodeManager to provide a strategy for
starting and stopping builders.
"""
default_websocket_scheme = 'wss' if app.config['PREFERRED_URL_SCHEME'] == 'https' else 'ws'
self.websocket_scheme = executor_config.get("WEBSOCKET_SCHEME", default_websocket_scheme)
@coroutine
def start_builder(self, realm, token, build_uuid):
""" Create a builder with the specified config. Returns a unique id which can be used to manage
@ -56,8 +61,19 @@ class BuilderExecutor(object):
"""
raise NotImplementedError
def get_manager_websocket_url(self):
return 'ws://{0}:'
def allowed_for_namespace(self, namespace):
""" Returns true if this executor can be used for builds in the given namespace. """
namespace_whitelist = self.executor_config.get('NAMESPACE_WHITELIST')
if namespace_whitelist is not None:
return namespace in namespace_whitelist
return True
@property
def minimum_retry_threshold(self):
""" Returns the minimum number of retries required for this executor to be used or 0 if
none. """
return self.executor_config.get('MINIMUM_RETRY_THRESHOLD', 0)
def generate_cloud_config(self, realm, token, coreos_channel, manager_hostname,
quay_username=None, quay_password=None):
@ -73,6 +89,7 @@ class BuilderExecutor(object):
quay_username=quay_username,
quay_password=quay_password,
manager_hostname=manager_hostname,
websocket_scheme=self.websocket_scheme,
coreos_channel=coreos_channel,
worker_tag=self.executor_config['WORKER_TAG'],
logentries_token=self.executor_config.get('LOGENTRIES_TOKEN', None),
@ -94,9 +111,9 @@ class EC2Executor(BuilderExecutor):
""" Creates an ec2 connection which can be used to manage instances.
"""
return AsyncWrapper(boto.ec2.connect_to_region(
self.executor_config['EC2_REGION'],
aws_access_key_id=self.executor_config['AWS_ACCESS_KEY'],
aws_secret_access_key=self.executor_config['AWS_SECRET_KEY'],
self.executor_config['EC2_REGION'],
aws_access_key_id=self.executor_config['AWS_ACCESS_KEY'],
aws_secret_access_key=self.executor_config['AWS_SECRET_KEY'],
))
@classmethod
@ -124,9 +141,9 @@ class EC2Executor(BuilderExecutor):
ec2_conn = self._get_conn()
ssd_root_ebs = boto.ec2.blockdevicemapping.BlockDeviceType(
size=int(self.executor_config.get('BLOCK_DEVICE_SIZE', 48)),
volume_type='gp2',
delete_on_termination=True,
size=int(self.executor_config.get('BLOCK_DEVICE_SIZE', 48)),
volume_type='gp2',
delete_on_termination=True,
)
block_devices = boto.ec2.blockdevicemapping.BlockDeviceMapping()
block_devices['/dev/xvda'] = ssd_root_ebs
@ -134,21 +151,21 @@ class EC2Executor(BuilderExecutor):
interfaces = None
if self.executor_config.get('EC2_VPC_SUBNET_ID', None) is not None:
interface = boto.ec2.networkinterface.NetworkInterfaceSpecification(
subnet_id=self.executor_config['EC2_VPC_SUBNET_ID'],
groups=self.executor_config['EC2_SECURITY_GROUP_IDS'],
associate_public_ip_address=True,
subnet_id=self.executor_config['EC2_VPC_SUBNET_ID'],
groups=self.executor_config['EC2_SECURITY_GROUP_IDS'],
associate_public_ip_address=True,
)
interfaces = boto.ec2.networkinterface.NetworkInterfaceCollection(interface)
try:
reservation = yield From(ec2_conn.run_instances(
coreos_ami,
instance_type=self.executor_config['EC2_INSTANCE_TYPE'],
key_name=self.executor_config.get('EC2_KEY_NAME', None),
user_data=user_data,
instance_initiated_shutdown_behavior='terminate',
block_device_map=block_devices,
network_interfaces=interfaces,
coreos_ami,
instance_type=self.executor_config['EC2_INSTANCE_TYPE'],
key_name=self.executor_config.get('EC2_KEY_NAME', None),
user_data=user_data,
instance_initiated_shutdown_behavior='terminate',
block_device_map=block_devices,
network_interfaces=interfaces,
))
except boto.exception.EC2ResponseError as ec2e:
logger.exception('Unable to spawn builder instance')
@ -166,10 +183,10 @@ class EC2Executor(BuilderExecutor):
for i in range(0, _TAG_RETRY_COUNT):
try:
yield From(launched.add_tags({
'Name': 'Quay Ephemeral Builder',
'Realm': realm,
'Token': token,
'BuildUUID': build_uuid,
'Name': 'Quay Ephemeral Builder',
'Realm': realm,
'Token': token,
'BuildUUID': build_uuid,
}))
except boto.exception.EC2ResponseError as ec2e:
if ec2e.error_code == 'InvalidInstanceID.NotFound':
@ -216,13 +233,16 @@ class PopenExecutor(BuilderExecutor):
# Now start a machine for this job, adding the machine id to the etcd information
logger.debug('Forking process for build')
import subprocess
ws_host = os.environ.get("BUILDMAN_WS_HOST", "localhost")
ws_port = os.environ.get("BUILDMAN_WS_PORT", "8787")
builder_env = {
'TOKEN': token,
'REALM': realm,
'ENDPOINT': 'ws://localhost:8787',
'DOCKER_TLS_VERIFY': os.environ.get('DOCKER_TLS_VERIFY', ''),
'DOCKER_CERT_PATH': os.environ.get('DOCKER_CERT_PATH', ''),
'DOCKER_HOST': os.environ.get('DOCKER_HOST', ''),
'TOKEN': token,
'REALM': realm,
'ENDPOINT': 'ws://%s:%s' % (ws_host, ws_port),
'DOCKER_TLS_VERIFY': os.environ.get('DOCKER_TLS_VERIFY', ''),
'DOCKER_CERT_PATH': os.environ.get('DOCKER_CERT_PATH', ''),
'DOCKER_HOST': os.environ.get('DOCKER_HOST', ''),
}
logpipe = LogPipe(logging.INFO)
@ -247,6 +267,150 @@ class PopenExecutor(BuilderExecutor):
logpipe.close()
class KubernetesExecutor(BuilderExecutor):
""" Executes build jobs by creating Kubernetes jobs which run a qemu-kvm virtual
machine in a pod """
def __init__(self, *args, **kwargs):
super(KubernetesExecutor, self).__init__(*args, **kwargs)
self._loop = get_event_loop()
self.namespace = self.executor_config.get('BUILDER_NAMESPACE', 'builder')
self.image = self.executor_config.get('BUILDER_VM_CONTAINER_IMAGE',
'quay.io/quay/quay-builder-qemu-coreos:stable')
@coroutine
def _request(self, method, path, **kwargs):
request_options = dict(kwargs)
tls_cert = self.executor_config.get('K8S_API_TLS_CERT')
tls_key = self.executor_config.get('K8S_API_TLS_KEY')
tls_ca = self.executor_config.get('K8S_API_TLS_CA')
if 'timeout' not in request_options:
request_options['timeout'] = self.executor_config.get("K8S_API_TIMEOUT", 20)
if tls_cert and tls_key:
scheme = 'https'
request_options['cert'] = (tls_cert, tls_key)
if tls_ca:
request_options['verify'] = tls_ca
else:
scheme = 'http'
server = self.executor_config.get('K8S_API_SERVER', 'localhost:8080')
url = '%s://%s%s' % (scheme, server, path)
logger.debug('Executor config: %s', self.executor_config)
logger.debug('Kubernetes request: %s %s: %s', method, url, request_options)
res = requests.request(method, url, **request_options)
logger.debug('Kubernetes response: %s: %s', res.status_code, res.text)
raise Return(res)
def _jobs_path(self):
return '/apis/batch/v1/namespaces/%s/jobs' % self.namespace
def _job_path(self, build_uuid):
return '%s/%s' % (self._jobs_path(), build_uuid)
def _job_resource(self, build_uuid, user_data, coreos_channel='stable'):
vm_memory_limit = self.executor_config.get('VM_MEMORY_LIMIT', '8G')
# Max values for this container
container_limits = {
'memory' : self.executor_config.get('CONTAINER_MEMORY_LIMIT', '8Gi'),
'cpu' : self.executor_config.get('CONTAINER_CPU_LIMIT', "2"),
}
# Minimum acceptable free resources for this container to "fit" in a quota
container_requests = {
'memory' : self.executor_config.get('CONTAINER_MEMORY_REQUEST', '8Gi'),
'cpu' : self.executor_config.get('CONTAINER_CPU_REQUEST', "2"),
}
release_sha = release.GIT_HEAD or 'none'
if ' ' in release_sha:
release_sha = 'HEAD'
return {
'apiVersion': 'batch/v1',
'kind': 'Job',
'metadata': {
'namespace': self.namespace,
'generateName': build_uuid,
'labels': {
'build': build_uuid,
'time': datetime.datetime.now().strftime('%Y-%m-%d-%H'),
'manager': socket.gethostname(),
'quay-sha': release_sha,
},
},
'spec' : {
'activeDeadlineSeconds': self.executor_config.get('MAXIMUM_JOB_TIME', 7200),
'template': {
'metadata': {
'labels': {
'build': build_uuid,
'time': datetime.datetime.now().strftime('%Y-%m-%d-%H'),
'manager': socket.gethostname(),
'quay-sha': release_sha,
},
},
'spec': {
'containers': [
{
'name': 'builder',
'imagePullPolicy': 'IfNotPresent',
'image': self.image,
'securityContext': {'privileged': True},
'env': [
{'name': 'USERDATA', 'value': user_data},
{'name': 'VM_MEMORY', 'value': vm_memory_limit},
],
'limits' : container_limits,
'requests' : container_requests,
},
],
'imagePullSecrets': [{'name': 'builder'}],
'restartPolicy': 'Never',
},
},
},
}
@coroutine
def start_builder(self, realm, token, build_uuid):
# generate resource
channel = self.executor_config.get('COREOS_CHANNEL', 'stable')
user_data = self.generate_cloud_config(realm, token, channel, self.manager_hostname)
resource = self._job_resource(build_uuid, user_data, channel)
logger.debug('Generated kubernetes resource:\n%s', resource)
# schedule
create_job = yield From(self._request('POST', self._jobs_path(), json=resource))
if int(create_job.status_code / 100) != 2:
raise ExecutorException('Failed to create job: %s: %s: %s' %
(build_uuid, create_job.status_code, create_job.text))
job = create_job.json()
raise Return(job['metadata']['name'])
@coroutine
def stop_builder(self, builder_id):
pods_path = '/api/v1/namespaces/%s/pods' % self.namespace
# Delete the pod(s) for the job.
selectorString = "job-name=%s" % builder_id
try:
yield From(self._request('DELETE', pods_path, params=dict(labelSelector=selectorString)))
except:
logger.exception("Failed to send delete pod call for job %s", builder_id)
# Delete the job itself.
try:
yield From(self._request('DELETE', self._job_path(builder_id)))
except:
logger.exception('Failed to send delete job call for job %s', builder_id)
class LogPipe(threading.Thread):
""" Adapted from http://codereview.stackexchange.com/a/17959
"""


@ -0,0 +1,20 @@
FROM debian
RUN apt-get clean && apt-get update && apt-get install -y \
bzip2 \
curl \
openssh-client \
qemu-kvm
ARG channel=stable
RUN curl -s -O http://${channel}.release.core-os.net/amd64-usr/current/coreos_production_qemu_image.img.bz2 && \
bzip2 -d coreos_production_qemu_image.img.bz2
RUN apt-get remove -y curl bzip2 && \
apt-get clean && \
rm -rf /var/lib/apt/lists/* /tmp/* /var/tmp/*
COPY start.sh /start.sh
ENTRYPOINT ["/bin/bash", "/start.sh"]


@ -0,0 +1,6 @@
# Builder Image
```
CHANNEL=stable
docker build --build-arg channel=${CHANNEL} -t quay.io/quay/quay-builder-qemu-coreos:${CHANNEL} .
```


@ -0,0 +1,21 @@
#!/bin/bash
set -e
set -x
set -o nounset
mkdir -p /userdata/openstack/latest
echo "$USERDATA" > /userdata/openstack/latest/user_data
qemu-system-x86_64 \
-enable-kvm \
-cpu host \
-device virtio-9p-pci,fsdev=conf,mount_tag=config-2 \
-nographic \
-drive if=virtio,file=./coreos_production_qemu_image.img \
-fsdev local,id=conf,security_model=none,readonly,path=/userdata \
-m "${VM_MEMORY}" \
-machine accel=kvm \
-net nic,model=virtio \
-net user,hostfwd=tcp::2222-:22 \
-smp 2


@ -1,11 +1,17 @@
#cloud-config
users:
groups:
- sudo
- docker
ssh_authorized_keys:
- ssh-rsa AAAAB3NzaC1yc2EAAAADAQABAAABAQCC0m+hVmyR3vn/xoxJe9+atRWBxSK+YXgyufNVDMcb7H00Jfnc341QH3kDVYZamUbhVh/nyc2RP7YbnZR5zORFtgOaNSdkMYrPozzBvxjnvSUokkCCWbLqXDHvIKiR12r+UTSijPJE/Yk702Mb2ejAFuae1C3Ec+qKAoOCagDjpQ3THyb5oaKE7VPHdwCWjWIQLRhC+plu77ObhoXIFJLD13gCi01L/rp4mYVCxIc2lX5A8rkK+bZHnIZwWUQ4t8SIjWxIaUo0FE7oZ83nKuNkYj5ngmLHQLY23Nx2WhE9H6NBthUpik9SmqQPtVYbhIG+bISPoH9Xs8CLrFb0VRjz Joey Schorr
- ssh-rsa AAAAB3NzaC1yc2EAAAADAQABAAABAQCo6FhAP7mFFOAzM91gtaKW7saahtaN4lur42FMMztz6aqUycIltCmvxo+3FmrXgCG30maMNU36Vm1+9QRtVQEd+eRuoIWP28t+8MT01Fh4zPuE2Wca3pOHSNo3X81FfWJLzmwEHiQKs9HPQqUhezR9PcVWVkbMyAzw85c0UycGmHGFNb0UiRd9HFY6XbgbxhZv/mvKLZ99xE3xkOzS1PNsdSNvjUKwZR7pSUPqNS5S/1NXyR4GhFTU24VPH/bTATOv2ATH+PSzsZ7Qyz9UHj38tKC+ALJHEDJ4HXGzobyOUP78cHGZOfCB5FYubq0zmOudAjKIAhwI8XTFvJ2DX1P3 Jimmy Zelinskie
- ssh-rsa AAAAB3NzaC1yc2EAAAADAQABAAABAQDNvw8qo9m8np7yQ/Smv/oklM8bo8VyNRZriGYBDuolWDL/mZpYCQnZJXphQo7RFdNABYistikjJlBuuwUohLf2uSq0iKoFa2TgwI43wViWzvuzU4nA02/ITD5BZdmWAFNyIoqeB50Ol4qUgDwLAZ+7Kv7uCi6chcgr9gTi99jY3GHyZjrMiXMHGVGi+FExFuzhVC2drKjbz5q6oRfQeLtNfG4psl5GU3MQU6FkX4fgoCx0r9R48/b7l4+TT7pWblJQiRfeldixu6308vyoTUEHasdkU3/X0OTaGz/h5XqTKnGQc6stvvoED3w+L3QFp0H5Z8sZ9stSsitmCBrmbcKZ Jake Moshenko
- ssh-rsa AAAAB3NzaC1yc2EAAAABIwAAAgEAo/JkbGO6R7g1ZxARi0xWVM7FOfN02snRAcIO6vT9M7xMUkWVLgD+hM/o91lk+UFiYdql0CATobpFWncRL36KaUqsbw9/1BlI40wg296XHXSSnxhxZ4L7ytf6G1tyN319HXlI2kh9vAf/fy++yDvkH8dI3k1oLoW+mZPET6Pff04/6AXXrRlS5mhmGv9irGwiDHtVKpj6lU8DN/UtOrv1tiQ0pgwEJq05fLGoQfgPNaBCnW2z4Ubpn2gyMcMBMpSwo4hCqJePd349e4bLmFcT+gXYg7Mnup1DoTDlowFFN56wpxQbdp96IxWzU+jYPaIAuRo+BJzCyOS8qBv0Z4RZrgop0qp2JYiVwmViO6TZhIDz6loQJXUOIleQmNgTbiZx8Bwv5GY2jMYoVwlBp7yy5bRjxfbFsJ0vU7TVzNAG7oEJy/74HmHmWzRQlSlQjesr8gRbm9zgR8wqc/L107UOWFg7Cgh8ZNjKuADbXqYuda1Y9m2upcfS26UPz5l5PW5uFRMHZSi8pb1XV6/0Z8H8vwsh37Ur6aLi/5jruRmKhdlsNrB1IiDicBsPW3yg7HHSIdPU4oBNPC77yDCT3l4CKr4el81RrZt7FbJPfY+Ig9Q5O+05f6I8+ZOlJGyZ/Qfyl2aVm1HnlJKuBqPxeic8tMng/9B5N7uZL6Y3k5jFU8c= Quentin Machu
- ssh-rsa AAAAB3NzaC1yc2EAAAADAQABAAACAQC964SY8ojXZVfWknF+Pz+pTHpyb66VBH7OLYnGP+Tm452YKJVFb/rXCpZYHFlzSQtzz9hko8qBoEFXuD2humojx0P7nEtTy8wUClnKcifIqD5b/V1r7ZDa/5hL9Xog11gOXZ17TW1qjN+00qgXwoSh+jM8mAxD7V2ZLnanIDqmpYamT3ZlICz1k4bwYj35gnpSFpijAXeF9LXOEUfDtzNBjeaCvyniYlQyKzpKr8x+oIHumPlxwkFOzGhBMRGrCQ1Kzija8vVZQ6/Tjvxl19jwfgcNT0Zd9vLbHNowJPWQZhLYXdGIb3NxEfAqkGPvGCsaLfsfETYhcFwxr2g+zvf4xvyKgK35PHA/5t7TQryDSKDrQ1qTDUp3dAjzwsBFwEoQ0x68shGC661n/+APMNtj8qR5M9ueIH5WEqdRW10kKzlEm/ESvjyjEVRhXiwWyKkPch/OIUPKexKaEeOBdKocSnNx1+5ntk8OXWRQgjfwtQvm1NE/qD7fViBVUlTRk0c1SVpZaybIZkiMWmA1hzsdUbDP2mzPek1ydsVffw0I8z/dRo5gXQSPq06WfNIKpsiQF8LqP+KU+462A2tbHxFzq9VozI9PeFV+xO59wlJogv6q2yA0Jfv9BFgVgNzItIsUMvStrfkUBTYgaG9djp/vAm+SwMdnLSXILJtMO/3eRQ== Evan Cordell
- ssh-rsa AAAAB3NzaC1yc2EAAAADAQABAAABAQC3Q9+JcjEck8CylGEekvskypE8lT3hYnCCfGUoMTAURokD8STtEaVxr197efitQkvwSYxOnDXo2Qr59FqlQ6QtFeCynX87VerN49LJ0pUA1NoYBUCvWRzwpaa8CXGhYPRpfku12mJ0qjqmGFaR5jqhXTNfXmRcWePsXqS+b3FFEqw8BhKg6By1z7NLvKeaEno4Kd0wPpxzs+hFRnk38k2p+1YO1vZzZ2mgEVp9/2577t4TmP8ucnsb9X4vURRpOJwjG8HIgmmQFUVxHRST8Zu3zOXfg9Yv/n3JYhXhvvPxkV4JB6ZbVq0cLHasexFAxz7nTmF1gDWaPbGxmdZtaDe/ Colin Hom
write_files:
- path: /root/overrides.list
@ -13,7 +19,7 @@ write_files:
content: |
REALM={{ realm }}
TOKEN={{ token }}
SERVER=wss://{{ manager_hostname }}
SERVER={{ websocket_scheme }}://{{ manager_hostname }}
{% if logentries_token -%}
LOGENTRIES_TOKEN={{ logentries_token }}
{%- endif %}


@ -7,7 +7,15 @@ REPO=quay.io/quay/quay-dev
d ()
{
docker build -t $REPO -f dev.df --build-arg src_subdir=$(basename `pwd`) .
docker -- run --rm -v /var/run/docker.sock:/run/docker.sock -it --net=host -v $(pwd)/..:/src $REPO $*
#ENV_VARS="foo=bar key=value name=joe"
local envStr=""
if [[ "$ENV_VARS" != "" ]];then
for envVar in $ENV_VARS;do
envStr="${envStr} -e \"${envVar}\""
done
fi
docker -- run --rm $envStr -v /var/run/docker.sock:/run/docker.sock -it --net=host -v $(pwd)/..:/src $REPO $*
}
case $1 in


@ -1,18 +1,15 @@
import unittest
import etcd
import os.path
import time
import json
import uuid
from trollius import coroutine, get_event_loop, From, Future, sleep, Return
from mock import Mock, ANY
from trollius import coroutine, get_event_loop, From, Future, Return
from mock import Mock
from threading import Event
from urllib3.exceptions import ReadTimeoutError
from buildman.manager.executor import BuilderExecutor
from buildman.manager.ephemeral import EphemeralBuilderManager, EtcdAction
from buildman.server import BuildJobResult
from buildman.component.buildcomponent import BuildComponent
from buildman.manager.ephemeral import EphemeralBuilderManager, EXECUTORS
BUILD_UUID = 'deadbeef-dead-beef-dead-deadbeefdead'
@ -27,6 +24,16 @@ def async_test(f):
loop.run_until_complete(future)
return wrapper
class TestExecutor(BuilderExecutor):
job_started = None
@coroutine
def start_builder(self, realm, token, build_uuid):
self.job_started = True
raise Return(str(uuid.uuid4))
class TestEphemeral(unittest.TestCase):
def __init__(self, *args, **kwargs):
self.etcd_client_mock = None
@ -41,6 +48,8 @@ class TestEphemeral(unittest.TestCase):
self.etcd_client_mock = Mock(spec=etcd.Client, name='etcd.Client')
self.etcd_client_mock.watch = Mock(side_effect=hang_until_event)
self.etcd_client_mock.read = Mock(side_effect=KeyError)
self.etcd_client_mock.write = Mock()
return self.etcd_client_mock
def _create_completed_future(self, result=None):
@ -50,13 +59,7 @@ class TestEphemeral(unittest.TestCase):
return new_future
return inner
def _create_mock_executor(self, *args, **kwargs):
self.test_executor = Mock(spec=BuilderExecutor)
self.test_executor.start_builder = Mock(side_effect=self._create_completed_future('123'))
self.test_executor.stop_builder = Mock(side_effect=self._create_completed_future())
return self.test_executor
def _create_build_job(self):
def _create_build_job(self, namespace='namespace', retries=3):
mock_job = Mock()
mock_job.job_details = {
'build_uuid': BUILD_UUID,
@ -65,13 +68,17 @@ class TestEphemeral(unittest.TestCase):
'body': json.dumps(mock_job.job_details),
'id': 1,
}
mock_job.namespace = namespace
mock_job.retries_remaining = retries
return mock_job
def setUp(self):
EphemeralBuilderManager._executors['test'] = self._create_mock_executor
self._existing_executors = dict(EXECUTORS)
self.old_etcd_client_klass = EphemeralBuilderManager._etcd_client_klass
EphemeralBuilderManager._etcd_client_klass = self._create_mock_etcd_client
self.etcd_wait_event.clear()
self.register_component_callback = Mock()
@ -88,175 +95,174 @@ class TestEphemeral(unittest.TestCase):
30,
)
self.manager.initialize({'EXECUTOR': 'test'})
self.mock_job = self._create_build_job()
self.mock_job_key = os.path.join('building/', BUILD_UUID)
def tearDown(self):
self.etcd_wait_event.set()
self.manager.shutdown()
del EphemeralBuilderManager._executors['test']
EXECUTORS = self._existing_executors
EphemeralBuilderManager._etcd_client_klass = self.old_etcd_client_klass
@coroutine
def _setup_job_for_managers(self):
# Test that we are watching the realm location before anything else happens
self.etcd_client_mock.watch.assert_any_call('realm/', recursive=True, timeout=30, index=None)
self.etcd_client_mock.read = Mock(side_effect=KeyError)
test_component = Mock(spec=BuildComponent)
test_component.builder_realm = REALM_ID
test_component.start_build = Mock(side_effect=self._create_completed_future())
self.register_component_callback.return_value = test_component
# Ask for a builder to be scheduled
is_scheduled = yield From(self.manager.schedule(self.mock_job))
self.assertTrue(is_scheduled)
self.etcd_client_mock.read.assert_called_once_with('building/', recursive=True)
self.assertEqual(self.test_executor.start_builder.call_count, 1)
self.assertEqual(self.etcd_client_mock.write.call_args_list[0][0][0], self.mock_job_key)
self.assertEqual(self.etcd_client_mock.write.call_args_list[1][0][0], self.mock_job_key)
# Right now the job is not registered with any managers because etcd has not accepted the job
self.assertEqual(self.register_component_callback.call_count, 0)
realm_created = Mock(spec=etcd.EtcdResult)
realm_created.action = EtcdAction.CREATE
realm_created.key = os.path.join('realm/', REALM_ID)
realm_created.value = json.dumps({
'realm': REALM_ID,
'token': 'beef',
'builder_id': '123',
'job_queue_item': self.mock_job.job_item,
def test_verify_executor_oldconfig(self):
EXECUTORS['test'] = TestExecutor
self.manager.initialize({
'EXECUTOR': 'test',
'EXECUTOR_CONFIG': dict(MINIMUM_RETRY_THRESHOLD=42)
})
self.manager._handle_realm_change(realm_created)
# Ensure that we have a single test executor.
self.assertEquals(1, len(self.manager.registered_executors))
self.assertEquals(42, self.manager.registered_executors[0].minimum_retry_threshold)
self.assertEqual(self.register_component_callback.call_count, 1)
raise Return(test_component)
@async_test
@unittest.skip('this test is flaky on Quay.io builders')
def test_schedule_and_complete(self):
# Test that a job is properly registered with all of the managers
test_component = yield From(self._setup_job_for_managers())
# Take the job ourselves
yield From(self.manager.build_component_ready(test_component))
self.etcd_client_mock.delete.assert_called_once_with(os.path.join('realm/', REALM_ID))
self.etcd_client_mock.delete.reset_mock()
# Finish the job
yield From(self.manager.job_completed(self.mock_job, BuildJobResult.COMPLETE, test_component))
self.assertEqual(self.test_executor.stop_builder.call_count, 1)
self.etcd_client_mock.delete.assert_called_once_with(self.mock_job_key)
@async_test
@unittest.skip('this test is flaky on Quay.io builders')
def test_another_manager_takes_job(self):
# Prepare a job to be taken by another manager
test_component = yield From(self._setup_job_for_managers())
realm_deleted = Mock(spec=etcd.EtcdResult)
realm_deleted.action = EtcdAction.DELETE
realm_deleted.key = os.path.join('realm/', REALM_ID)
realm_deleted._prev_node = Mock(spec=etcd.EtcdResult)
realm_deleted._prev_node.value = json.dumps({
'realm': REALM_ID,
'token': 'beef',
'builder_id': '123',
'job_queue_item': self.mock_job.job_item,
def test_verify_executor_newconfig(self):
EXECUTORS['test'] = TestExecutor
self.manager.initialize({
'EXECUTORS': [{
'EXECUTOR': 'test',
'MINIMUM_RETRY_THRESHOLD': 42
}]
})
self.manager._handle_realm_change(realm_deleted)
# Ensure that we have a single test executor.
self.assertEquals(1, len(self.manager.registered_executors))
self.assertEquals(42, self.manager.registered_executors[0].minimum_retry_threshold)
self.unregister_component_callback.assert_called_once_with(test_component)
def test_verify_multiple_executors(self):
EXECUTORS['test'] = TestExecutor
EXECUTORS['anotherexecutor'] = TestExecutor
@async_test
@unittest.skip('this test is flaky on Quay.io builders')
def test_expiring_worker(self):
# Test that we are watching before anything else happens
self.etcd_client_mock.watch.assert_any_call('building/', recursive=True, timeout=30, index=None)
# Send a signal to the callback that a worker has expired
expired_result = Mock(spec=etcd.EtcdResult)
expired_result.action = EtcdAction.EXPIRE
expired_result.key = self.mock_job_key
expired_result._prev_node = Mock(spec=etcd.EtcdResult)
expired_result._prev_node.value = json.dumps({'builder_id': '1234'})
yield From(self.manager._handle_builder_expiration(expired_result))
self.test_executor.stop_builder.assert_called_once_with('1234')
self.assertEqual(self.test_executor.stop_builder.call_count, 1)
@async_test
@unittest.skip('this test is flaky on Quay.io builders')
def test_builder_never_starts(self):
test_component = yield From(self._setup_job_for_managers())
# Test that we are watching before anything else happens
self.etcd_client_mock.watch.assert_any_call('building/', recursive=True, timeout=30, index=None)
# Send a signal to the callback that a worker has expired
expired_result = Mock(spec=etcd.EtcdResult)
expired_result.action = EtcdAction.EXPIRE
expired_result.key = self.mock_job_key
expired_result._prev_node = Mock(spec=etcd.EtcdResult)
expired_result._prev_node.value = json.dumps({
'builder_id': '1234',
'had_heartbeat': False,
'job_queue_item': self.mock_job.job_item,
self.manager.initialize({
'EXECUTORS': [
{
'EXECUTOR': 'test',
'MINIMUM_RETRY_THRESHOLD': 42
},
{
'EXECUTOR': 'anotherexecutor',
'MINIMUM_RETRY_THRESHOLD': 24
},
]
})
yield From(self.manager._handle_builder_expiration(expired_result))
# Ensure that we have two test executors.
self.assertEquals(2, len(self.manager.registered_executors))
self.assertEquals(42, self.manager.registered_executors[0].minimum_retry_threshold)
self.assertEquals(24, self.manager.registered_executors[1].minimum_retry_threshold)
self.test_executor.stop_builder.assert_called_once_with('1234')
self.assertEqual(self.test_executor.stop_builder.call_count, 1)
self.job_complete_callback.assert_called_once_with(ANY, BuildJobResult.INCOMPLETE)
@async_test
def test_change_worker(self):
# Send a signal to the callback that a worker key has been changed
set_result = Mock(spec=etcd.EtcdResult)
set_result.action = 'set'
set_result.key = self.mock_job_key
self.manager._handle_builder_expiration(set_result)
yield From(sleep(.01))
self.assertEquals(self.test_executor.stop_builder.call_count, 0)
@async_test
def test_heartbeat_response(self):
expiration_timestamp = time.time() + 60
builder_result = Mock(spec=etcd.EtcdResult)
builder_result.value = json.dumps({
'builder_id': '123',
'expiration': expiration_timestamp,
'max_expiration': expiration_timestamp,
def test_skip_invalid_executor(self):
self.manager.initialize({
'EXECUTORS': [
{
'EXECUTOR': 'unknown',
'MINIMUM_RETRY_THRESHOLD': 42
},
]
})
self.etcd_client_mock.read = Mock(return_value=builder_result)
yield From(self.manager.job_heartbeat(self.mock_job))
self.assertEquals(0, len(self.manager.registered_executors))
# Wait for threads to complete
yield From(sleep(.01))
@async_test
def test_schedule_job_namespace_filter(self):
EXECUTORS['test'] = TestExecutor
self.manager.initialize({
'EXECUTORS': [{
'EXECUTOR': 'test',
'NAMESPACE_WHITELIST': ['something'],
}]
})
self.job_heartbeat_callback.assert_called_once_with(self.mock_job)
self.assertEqual(self.etcd_client_mock.write.call_count, 1)
self.assertEqual(self.etcd_client_mock.write.call_args_list[0][0][0], self.mock_job_key)
# Try with a build job in an invalid namespace.
build_job = self._create_build_job(namespace='somethingelse')
result = yield From(self.manager.schedule(build_job))
self.assertFalse(result[0])
# Try with a valid namespace.
build_job = self._create_build_job(namespace='something')
result = yield From(self.manager.schedule(build_job))
self.assertTrue(result[0])
@async_test
def test_schedule_job_retries_filter(self):
EXECUTORS['test'] = TestExecutor
self.manager.initialize({
'EXECUTORS': [{
'EXECUTOR': 'test',
'MINIMUM_RETRY_THRESHOLD': 2,
}]
})
# Try with a build job that has too few retries.
build_job = self._create_build_job(retries=1)
result = yield From(self.manager.schedule(build_job))
self.assertFalse(result[0])
# Try with a valid job.
build_job = self._create_build_job(retries=2)
result = yield From(self.manager.schedule(build_job))
self.assertTrue(result[0])
@async_test
def test_schedule_job_executor_fallback(self):
EXECUTORS['primary'] = TestExecutor
EXECUTORS['secondary'] = TestExecutor
self.manager.initialize({
'EXECUTORS': [
{
'EXECUTOR': 'primary',
'NAMESPACE_WHITELIST': ['something'],
'MINIMUM_RETRY_THRESHOLD': 3,
},
{
'EXECUTOR': 'secondary',
'MINIMUM_RETRY_THRESHOLD': 2,
},
]
})
# Try a job not matching the primary's namespace filter. Should schedule on secondary.
build_job = self._create_build_job(namespace='somethingelse')
result = yield From(self.manager.schedule(build_job))
self.assertTrue(result[0])
self.assertIsNone(self.manager.registered_executors[0].job_started)
self.assertIsNotNone(self.manager.registered_executors[1].job_started)
self.manager.registered_executors[0].job_started = None
self.manager.registered_executors[1].job_started = None
# Try a job not matching the primary's retry minimum. Should schedule on secondary.
build_job = self._create_build_job(namespace='something', retries=2)
result = yield From(self.manager.schedule(build_job))
self.assertTrue(result[0])
self.assertIsNone(self.manager.registered_executors[0].job_started)
self.assertIsNotNone(self.manager.registered_executors[1].job_started)
self.manager.registered_executors[0].job_started = None
self.manager.registered_executors[1].job_started = None
# Try a job matching the primary. Should schedule on the primary.
build_job = self._create_build_job(namespace='something', retries=3)
result = yield From(self.manager.schedule(build_job))
self.assertTrue(result[0])
self.assertIsNotNone(self.manager.registered_executors[0].job_started)
self.assertIsNone(self.manager.registered_executors[1].job_started)
self.manager.registered_executors[0].job_started = None
self.manager.registered_executors[1].job_started = None
# Try a job not matching either's restrictions.
build_job = self._create_build_job(namespace='somethingelse', retries=1)
result = yield From(self.manager.schedule(build_job))
self.assertFalse(result[0])
self.assertIsNone(self.manager.registered_executors[0].job_started)
self.assertIsNone(self.manager.registered_executors[1].job_started)
self.manager.registered_executors[0].job_started = None
self.manager.registered_executors[1].job_started = None
if __name__ == '__main__':
unittest.main()


@ -1,16 +1,14 @@
import unittest
import json
import os
from httmock import urlmatch, all_requests, HTTMock
from app import app, config_provider, storage, notification_queue
from app import app, storage, notification_queue
from endpoints.notificationevent import VulnerabilityFoundEvent
from initdb import setup_database_for_testing, finished_database_for_testing
from util.secscan.api import SecurityScannerAPI, AnalyzeLayerException
from util.secscan.analyzer import LayerAnalyzer
from util.secscan.notifier import process_notification_data
from data import model
from storage.basestorage import StoragePaths
from workers.security_notification_worker import SecurityNotificationWorker
from endpoints.v2 import v2_bp