From bef55f9f6d1596948cc7efcad54ebb5eba08b2ab Mon Sep 17 00:00:00 2001 From: Jimmy Zelinskie Date: Fri, 15 Jul 2016 16:32:34 -0400 Subject: [PATCH] Revert "Merge pull request #1605 from coreos-inc/kubernetes-builder" This reverts commit a69266c2820f8984d66ccef23b6d0dae3ed45d9e, reversing changes made to 3143da63928d71f1a28c009d5c0283243a8487a8. --- buildman/jobutil/buildjob.py | 9 - buildman/manager/ephemeral.py | 135 ++++-------- buildman/manager/executor.py | 230 +++------------------ buildman/qemu-coreos/Dockerfile | 20 -- buildman/qemu-coreos/README.md | 6 - buildman/qemu-coreos/start.sh | 21 -- buildman/templates/cloudconfig.yaml | 8 +- local-docker.sh | 10 +- test/test_buildman.py | 306 ++++++++++++++-------------- test/test_secscan.py | 4 +- 10 files changed, 230 insertions(+), 519 deletions(-) delete mode 100644 buildman/qemu-coreos/Dockerfile delete mode 100644 buildman/qemu-coreos/README.md delete mode 100644 buildman/qemu-coreos/start.sh diff --git a/buildman/jobutil/buildjob.py b/buildman/jobutil/buildjob.py index f5971018f..dbbb8113f 100644 --- a/buildman/jobutil/buildjob.py +++ b/buildman/jobutil/buildjob.py @@ -25,10 +25,6 @@ class BuildJob(object): 'Could not parse build queue item config with ID %s' % self.job_details['build_uuid'] ) - @property - def retries_remaining(self): - return self.job_item.retries_remaining - def has_retries_remaining(self): return self.job_item.retries_remaining > 0 @@ -62,11 +58,6 @@ class BuildJob(object): raise BuildJobLoadException( 'Could not load repository build with ID %s' % self.job_details['build_uuid']) - @property - def namespace(self): - """ Returns the namespace under which this build is running. """ - return self.repo_build.repository.namespace_user.username - @property def repo_build(self): return self._load_repo_build() diff --git a/buildman/manager/ephemeral.py b/buildman/manager/ephemeral.py index e6d3403e8..689d1fbea 100644 --- a/buildman/manager/ephemeral.py +++ b/buildman/manager/ephemeral.py @@ -12,7 +12,7 @@ from urllib3.exceptions import ReadTimeoutError, ProtocolError from app import metric_queue from buildman.manager.basemanager import BaseManager -from buildman.manager.executor import PopenExecutor, EC2Executor, KubernetesExecutor +from buildman.manager.executor import PopenExecutor, EC2Executor from buildman.component.buildcomponent import BuildComponent from buildman.jobutil.buildjob import BuildJob from buildman.asyncutil import AsyncWrapper @@ -24,14 +24,9 @@ logger = logging.getLogger(__name__) ETCD_MAX_WATCH_TIMEOUT = 30 +EC2_API_TIMEOUT = 20 RETRY_IMMEDIATELY_TIMEOUT = 0 -DEFAULT_EPHEMERAL_API_TIMEOUT = 20 -EXECUTORS = { - 'popen': PopenExecutor, - 'ec2': EC2Executor, - 'kubernetes': KubernetesExecutor, -} class EtcdAction(object): GET = 'get' @@ -46,6 +41,10 @@ class EtcdAction(object): class EphemeralBuilderManager(BaseManager): """ Build manager implementation for the Enterprise Registry. """ + _executors = { + 'popen': PopenExecutor, + 'ec2': EC2Executor, + } _etcd_client_klass = etcd.Client @@ -59,15 +58,11 @@ class EphemeralBuilderManager(BaseManager): self._etcd_realm_prefix = None self._etcd_builder_prefix = None - self._etcd_lock_prefix = None - self._ephemeral_api_timeout = DEFAULT_EPHEMERAL_API_TIMEOUT - self._component_to_job = {} self._job_uuid_to_component = {} self._component_to_builder = {} - self._job_to_executor = {} - self._executors = [] + self._executor = None # Map of etcd keys being watched to the tasks watching them self._watch_tasks = {} @@ -98,6 +93,7 @@ class EphemeralBuilderManager(BaseManager): # at the index we retrieved. We therefore start a new watch at HEAD and # (if specified) call the restarter method which should conduct a read and # reset the state of the manager. + # TODO: Remove this hack once Etcd is fixed. logger.exception('Etcd moved forward too quickly. Restarting watch cycle.') new_index = None if restarter is not None: @@ -163,7 +159,8 @@ class EphemeralBuilderManager(BaseManager): self.job_complete_callback(build_job, BuildJobResult.INCOMPLETE) logger.info('Terminating expired build node: %s', builder_id) - yield From(self._job_to_executor[builder_id].stop_builder(builder_id)) + yield From(self._executor.stop_builder(builder_id)) + def _handle_realm_change(self, etcd_result): if etcd_result is None: @@ -205,10 +202,6 @@ class EphemeralBuilderManager(BaseManager): self._job_uuid_to_component[build_job.job_details['build_uuid']] = component return component - @property - def registered_executors(self): - return self._executors - @coroutine def _register_existing_realms(self): try: @@ -231,26 +224,13 @@ class EphemeralBuilderManager(BaseManager): # no realms have been registered yet pass - def _load_executor(self, executor_class_name, executor_config): - executor_klass = EXECUTORS.get(executor_class_name) - if executor_klass is None: - logger.error('Unknown executor %s; skipping install', executor_class_name) - return - - self._executors.append(executor_klass(executor_config, self.manager_hostname)) - def initialize(self, manager_config): logger.debug('Calling initialize') self._manager_config = manager_config - # Note: Executor config can be defined either as a single block of EXECUTOR_CONFIG (old style) - # or as a new set of executor configurations, with the order determining how we fallback. We - # check for both here to ensure backwards compatibility. - if manager_config.get('EXECUTORS'): - for executor_config in manager_config['EXECUTORS']: - self._load_executor(executor_config.get('EXECUTOR'), executor_config) - else: - self._load_executor(manager_config.get('EXECUTOR'), manager_config.get('EXECUTOR_CONFIG')) + executor_klass = self._executors.get(manager_config.get('EXECUTOR', ''), PopenExecutor) + self._executor = executor_klass(manager_config.get('EXECUTOR_CONFIG', {}), + self.manager_hostname) etcd_host = self._manager_config.get('ETCD_HOST', '127.0.0.1') etcd_port = self._manager_config.get('ETCD_PORT', 2379) @@ -279,14 +259,13 @@ class EphemeralBuilderManager(BaseManager): restarter=self._register_existing_realms) self._etcd_lock_prefix = self._manager_config.get('ETCD_LOCK_PREFIX', 'locks/') - self._ephemeral_api_timeout = self._manager_config.get('API_TIMEOUT', - DEFAULT_EPHEMERAL_API_TIMEOUT) # Load components for all realms currently known to the cluster async(self._register_existing_realms()) def setup_time(self): - return self._manager_config.get('MACHINE_SETUP_TIME', 300) + setup_time = self._manager_config.get('MACHINE_SETUP_TIME', 300) + return setup_time def shutdown(self): logger.debug('Shutting down worker.') @@ -337,62 +316,35 @@ class EphemeralBuilderManager(BaseManager): max_expiration = datetime.utcnow() + timedelta(seconds=machine_max_expiration) payload = { - 'expiration': calendar.timegm(expiration.timetuple()), - 'max_expiration': calendar.timegm(max_expiration.timetuple()), - 'nonce': nonce, - 'had_heartbeat': False, - 'job_queue_item': build_job.job_item, + 'expiration': calendar.timegm(expiration.timetuple()), + 'max_expiration': calendar.timegm(max_expiration.timetuple()), + 'nonce': nonce, + 'had_heartbeat': False, + 'job_queue_item': build_job.job_item, } - lock_payload = json.dumps(payload) try: yield From(self._etcd_client.write(job_key, lock_payload, prevExist=False, - ttl=self._ephemeral_api_timeout)) + ttl=EC2_API_TIMEOUT)) except (KeyError, etcd.EtcdKeyError): # The job was already taken by someone else, we are probably a retry logger.error('Job: %s already exists in etcd, timeout may be misconfigured', build_uuid) - raise Return(False, self._ephemeral_api_timeout) + raise Return(False, EC2_API_TIMEOUT) except etcd.EtcdException: logger.exception('Exception when writing job %s to etcd', build_uuid) raise Return(False, RETRY_IMMEDIATELY_TIMEOUT) - started_with_executor = None - logger.debug("executors are: %s", self._executors) + executor_type = self._executor.__class__.__name__ + logger.debug('Starting builder for job: %s with executor: %s', build_uuid, executor_type) - for executor in self._executors: - executor_type = executor.__class__.__name__ - - # Check if we can use this executor based on its whitelist, by namespace. - namespace = build_job.namespace - if not executor.allowed_for_namespace(namespace): - logger.debug('Job %s (namespace: %s) cannot use executor %s', build_uuid, namespace, - executor_type) - continue - - # Check if we can use this executor based on the retries remaining. - if executor.minimum_retry_threshold > build_job.retries_remaining: - logger.debug('Job %s cannot use executor %s due to not meeting retry threshold', build_uuid, - executor_type) - continue - - logger.debug('Starting builder for job: %s with executor: %s', build_uuid, executor_type) - - try: - builder_id = yield From(executor.start_builder(realm, token, build_uuid)) - metric_queue.put_deprecated('EphemeralBuilderStarted', 1, unit='Count') - metric_queue.ephemeral_build_workers.Inc(labelvalues=[builder_id, build_uuid]) - started_with_executor = executor - break - except: - logger.exception('Exception when starting builder for job: %s', build_uuid) - continue - - if started_with_executor is None: - logger.error('Could not start ephemeral worker for build %s', build_uuid) - raise Return(False, self._ephemeral_api_timeout) - - logger.debug('Started builder for job: %s with executor: %s', build_uuid, executor_type) + try: + builder_id = yield From(self._executor.start_builder(realm, token, build_uuid)) + metric_queue.put_deprecated('EC2BuilderStarted', 1, unit='Count') + metric_queue.ephemeral_build_workers.Inc(labelvalues=[builder_id, build_uuid]) + except: + logger.exception('Exception when starting builder for job: %s', build_uuid) + raise Return(False, EC2_API_TIMEOUT) # Store the builder in etcd associated with the job id try: @@ -401,14 +353,14 @@ class EphemeralBuilderManager(BaseManager): ttl=setup_time)) except etcd.EtcdException: logger.exception('Exception when writing job %s to etcd', build_uuid) - raise Return(False, self._ephemeral_api_timeout) + raise Return(False, EC2_API_TIMEOUT) # Store the realm spec which will allow any manager to accept this builder when it connects realm_spec = json.dumps({ - 'realm': realm, - 'token': token, - 'builder_id': builder_id, - 'job_queue_item': build_job.job_item, + 'realm': realm, + 'token': token, + 'builder_id': builder_id, + 'job_queue_item': build_job.job_item, }) try: @@ -422,7 +374,6 @@ class EphemeralBuilderManager(BaseManager): logger.exception('Exception when writing realm %s to etcd for job %s', realm, build_uuid) raise Return(False, setup_time) - self._job_to_executor[builder_id] = started_with_executor raise Return(True, None) @coroutine @@ -448,9 +399,7 @@ class EphemeralBuilderManager(BaseManager): logger.debug('Calling job_completed with status: %s', job_status) # Kill the ephmeral builder - builder_id = self._component_to_builder.pop(build_component) - yield From(self._job_to_executor[builder_id].stop_builder(builder_id)) - del self._job_to_executor[builder_id] + yield From(self._executor.stop_builder(self._component_to_builder.pop(build_component))) # Release the lock in etcd job_key = self._etcd_job_key(build_job) @@ -482,11 +431,11 @@ class EphemeralBuilderManager(BaseManager): new_expiration = datetime.utcnow() + timedelta(seconds=ttl) payload = { - 'expiration': calendar.timegm(new_expiration.timetuple()), - 'builder_id': build_job_metadata['builder_id'], - 'job_queue_item': build_job.job_item, - 'max_expiration': build_job_metadata['max_expiration'], - 'had_heartbeat': True, + 'expiration': calendar.timegm(new_expiration.timetuple()), + 'builder_id': build_job_metadata['builder_id'], + 'job_queue_item': build_job.job_item, + 'max_expiration': build_job_metadata['max_expiration'], + 'had_heartbeat': True, } yield From(self._etcd_client.write(job_key, json.dumps(payload), ttl=ttl)) diff --git a/buildman/manager/executor.py b/buildman/manager/executor.py index c9b222f12..557128ac6 100644 --- a/buildman/manager/executor.py +++ b/buildman/manager/executor.py @@ -6,9 +6,7 @@ import boto.ec2 import requests import cachetools import trollius -import datetime -import release -import socket + from jinja2 import FileSystemLoader, Environment from trollius import coroutine, From, Return, get_event_loop @@ -16,7 +14,7 @@ from functools import partial from buildman.asyncutil import AsyncWrapper from container_cloud_config import CloudConfigContext -from app import metric_queue, app +from app import metric_queue logger = logging.getLogger(__name__) @@ -39,15 +37,12 @@ class ExecutorException(Exception): class BuilderExecutor(object): def __init__(self, executor_config, manager_hostname): - """ Interface which can be plugged into the EphemeralNodeManager to provide a strategy for - starting and stopping builders. - """ self.executor_config = executor_config self.manager_hostname = manager_hostname - default_websocket_scheme = 'wss' if app.config['PREFERRED_URL_SCHEME'] == 'https' else 'ws' - self.websocket_scheme = executor_config.get("WEBSOCKET_SCHEME", default_websocket_scheme) - + """ Interface which can be plugged into the EphemeralNodeManager to provide a strategy for + starting and stopping builders. + """ @coroutine def start_builder(self, realm, token, build_uuid): """ Create a builder with the specified config. Returns a unique id which can be used to manage @@ -61,19 +56,8 @@ class BuilderExecutor(object): """ raise NotImplementedError - def allowed_for_namespace(self, namespace): - """ Returns true if this executor can be used for builds in the given namespace. """ - namespace_whitelist = self.executor_config.get('NAMESPACE_WHITELIST') - if namespace_whitelist is not None: - return namespace in namespace_whitelist - - return True - - @property - def minimum_retry_threshold(self): - """ Returns the minimum number of retries required for this executor to be used or 0 if - none. """ - return self.executor_config.get('MINIMUM_RETRY_THRESHOLD', 0) + def get_manager_websocket_url(self): + return 'ws://{0}:' def generate_cloud_config(self, realm, token, coreos_channel, manager_hostname, quay_username=None, quay_password=None): @@ -89,7 +73,6 @@ class BuilderExecutor(object): quay_username=quay_username, quay_password=quay_password, manager_hostname=manager_hostname, - websocket_scheme=self.websocket_scheme, coreos_channel=coreos_channel, worker_tag=self.executor_config['WORKER_TAG'], logentries_token=self.executor_config.get('LOGENTRIES_TOKEN', None), @@ -111,9 +94,9 @@ class EC2Executor(BuilderExecutor): """ Creates an ec2 connection which can be used to manage instances. """ return AsyncWrapper(boto.ec2.connect_to_region( - self.executor_config['EC2_REGION'], - aws_access_key_id=self.executor_config['AWS_ACCESS_KEY'], - aws_secret_access_key=self.executor_config['AWS_SECRET_KEY'], + self.executor_config['EC2_REGION'], + aws_access_key_id=self.executor_config['AWS_ACCESS_KEY'], + aws_secret_access_key=self.executor_config['AWS_SECRET_KEY'], )) @classmethod @@ -141,9 +124,9 @@ class EC2Executor(BuilderExecutor): ec2_conn = self._get_conn() ssd_root_ebs = boto.ec2.blockdevicemapping.BlockDeviceType( - size=int(self.executor_config.get('BLOCK_DEVICE_SIZE', 48)), - volume_type='gp2', - delete_on_termination=True, + size=int(self.executor_config.get('BLOCK_DEVICE_SIZE', 48)), + volume_type='gp2', + delete_on_termination=True, ) block_devices = boto.ec2.blockdevicemapping.BlockDeviceMapping() block_devices['/dev/xvda'] = ssd_root_ebs @@ -151,21 +134,21 @@ class EC2Executor(BuilderExecutor): interfaces = None if self.executor_config.get('EC2_VPC_SUBNET_ID', None) is not None: interface = boto.ec2.networkinterface.NetworkInterfaceSpecification( - subnet_id=self.executor_config['EC2_VPC_SUBNET_ID'], - groups=self.executor_config['EC2_SECURITY_GROUP_IDS'], - associate_public_ip_address=True, + subnet_id=self.executor_config['EC2_VPC_SUBNET_ID'], + groups=self.executor_config['EC2_SECURITY_GROUP_IDS'], + associate_public_ip_address=True, ) interfaces = boto.ec2.networkinterface.NetworkInterfaceCollection(interface) try: reservation = yield From(ec2_conn.run_instances( - coreos_ami, - instance_type=self.executor_config['EC2_INSTANCE_TYPE'], - key_name=self.executor_config.get('EC2_KEY_NAME', None), - user_data=user_data, - instance_initiated_shutdown_behavior='terminate', - block_device_map=block_devices, - network_interfaces=interfaces, + coreos_ami, + instance_type=self.executor_config['EC2_INSTANCE_TYPE'], + key_name=self.executor_config.get('EC2_KEY_NAME', None), + user_data=user_data, + instance_initiated_shutdown_behavior='terminate', + block_device_map=block_devices, + network_interfaces=interfaces, )) except boto.exception.EC2ResponseError as ec2e: logger.exception('Unable to spawn builder instance') @@ -183,10 +166,10 @@ class EC2Executor(BuilderExecutor): for i in range(0, _TAG_RETRY_COUNT): try: yield From(launched.add_tags({ - 'Name': 'Quay Ephemeral Builder', - 'Realm': realm, - 'Token': token, - 'BuildUUID': build_uuid, + 'Name': 'Quay Ephemeral Builder', + 'Realm': realm, + 'Token': token, + 'BuildUUID': build_uuid, })) except boto.exception.EC2ResponseError as ec2e: if ec2e.error_code == 'InvalidInstanceID.NotFound': @@ -233,16 +216,13 @@ class PopenExecutor(BuilderExecutor): # Now start a machine for this job, adding the machine id to the etcd information logger.debug('Forking process for build') import subprocess - - ws_host = os.environ.get("BUILDMAN_WS_HOST", "localhost") - ws_port = os.environ.get("BUILDMAN_WS_PORT", "8787") builder_env = { - 'TOKEN': token, - 'REALM': realm, - 'ENDPOINT': 'ws://%s:%s' % (ws_host, ws_port), - 'DOCKER_TLS_VERIFY': os.environ.get('DOCKER_TLS_VERIFY', ''), - 'DOCKER_CERT_PATH': os.environ.get('DOCKER_CERT_PATH', ''), - 'DOCKER_HOST': os.environ.get('DOCKER_HOST', ''), + 'TOKEN': token, + 'REALM': realm, + 'ENDPOINT': 'ws://localhost:8787', + 'DOCKER_TLS_VERIFY': os.environ.get('DOCKER_TLS_VERIFY', ''), + 'DOCKER_CERT_PATH': os.environ.get('DOCKER_CERT_PATH', ''), + 'DOCKER_HOST': os.environ.get('DOCKER_HOST', ''), } logpipe = LogPipe(logging.INFO) @@ -267,150 +247,6 @@ class PopenExecutor(BuilderExecutor): logpipe.close() -class KubernetesExecutor(BuilderExecutor): - """ Executes build jobs by creating Kubernetes jobs which run a qemu-kvm virtual - machine in a pod """ - def __init__(self, *args, **kwargs): - super(KubernetesExecutor, self).__init__(*args, **kwargs) - self._loop = get_event_loop() - self.namespace = self.executor_config.get('BUILDER_NAMESPACE', 'builder') - self.image = self.executor_config.get('BUILDER_VM_CONTAINER_IMAGE', - 'quay.io/quay/quay-builder-qemu-coreos:stable') - - @coroutine - def _request(self, method, path, **kwargs): - request_options = dict(kwargs) - - tls_cert = self.executor_config.get('K8S_API_TLS_CERT') - tls_key = self.executor_config.get('K8S_API_TLS_KEY') - tls_ca = self.executor_config.get('K8S_API_TLS_CA') - - if 'timeout' not in request_options: - request_options['timeout'] = self.executor_config.get("K8S_API_TIMEOUT", 20) - - if tls_cert and tls_key: - scheme = 'https' - request_options['cert'] = (tls_cert, tls_key) - if tls_ca: - request_options['verify'] = tls_ca - else: - scheme = 'http' - - server = self.executor_config.get('K8S_API_SERVER', 'localhost:8080') - url = '%s://%s%s' % (scheme, server, path) - - logger.debug('Executor config: %s', self.executor_config) - logger.debug('Kubernetes request: %s %s: %s', method, url, request_options) - res = requests.request(method, url, **request_options) - logger.debug('Kubernetes response: %s: %s', res.status_code, res.text) - raise Return(res) - - def _jobs_path(self): - return '/apis/batch/v1/namespaces/%s/jobs' % self.namespace - - def _job_path(self, build_uuid): - return '%s/%s' % (self._jobs_path(), build_uuid) - - def _job_resource(self, build_uuid, user_data, coreos_channel='stable'): - vm_memory_limit = self.executor_config.get('VM_MEMORY_LIMIT', '8G') - - # Max values for this container - container_limits = { - 'memory' : self.executor_config.get('CONTAINER_MEMORY_LIMIT', '8Gi'), - 'cpu' : self.executor_config.get('CONTAINER_CPU_LIMIT', "2"), - } - - # Minimum acceptable free resources for this container to "fit" in a quota - container_requests = { - 'memory' : self.executor_config.get('CONTAINER_MEMORY_REQUEST', '8Gi'), - 'cpu' : self.executor_config.get('CONTAINER_CPU_REQUEST', "2"), - } - - release_sha = release.GIT_HEAD or 'none' - if ' ' in release_sha: - release_sha = 'HEAD' - - return { - 'apiVersion': 'batch/v1', - 'kind': 'Job', - 'metadata': { - 'namespace': self.namespace, - 'generateName': build_uuid, - 'labels': { - 'build': build_uuid, - 'time': datetime.datetime.now().strftime('%Y-%m-%d-%H'), - 'manager': socket.gethostname(), - 'quay-sha': release_sha, - }, - }, - 'spec' : { - 'activeDeadlineSeconds': self.executor_config.get('MAXIMUM_JOB_TIME', 7200), - 'template': { - 'metadata': { - 'labels': { - 'build': build_uuid, - 'time': datetime.datetime.now().strftime('%Y-%m-%d-%H'), - 'manager': socket.gethostname(), - 'quay-sha': release_sha, - }, - }, - 'spec': { - 'containers': [ - { - 'name': 'builder', - 'imagePullPolicy': 'IfNotPresent', - 'image': self.image, - 'securityContext': {'privileged': True}, - 'env': [ - {'name': 'USERDATA', 'value': user_data}, - {'name': 'VM_MEMORY', 'value': vm_memory_limit}, - ], - 'limits' : container_limits, - 'requests' : container_requests, - }, - ], - 'imagePullSecrets': [{'name': 'builder'}], - 'restartPolicy': 'Never', - }, - }, - }, - } - - @coroutine - def start_builder(self, realm, token, build_uuid): - # generate resource - channel = self.executor_config.get('COREOS_CHANNEL', 'stable') - user_data = self.generate_cloud_config(realm, token, channel, self.manager_hostname) - resource = self._job_resource(build_uuid, user_data, channel) - logger.debug('Generated kubernetes resource:\n%s', resource) - - # schedule - create_job = yield From(self._request('POST', self._jobs_path(), json=resource)) - if int(create_job.status_code / 100) != 2: - raise ExecutorException('Failed to create job: %s: %s: %s' % - (build_uuid, create_job.status_code, create_job.text)) - - job = create_job.json() - raise Return(job['metadata']['name']) - - @coroutine - def stop_builder(self, builder_id): - pods_path = '/api/v1/namespaces/%s/pods' % self.namespace - - # Delete the pod(s) for the job. - selectorString = "job-name=%s" % builder_id - try: - yield From(self._request('DELETE', pods_path, params=dict(labelSelector=selectorString))) - except: - logger.exception("Failed to send delete pod call for job %s", builder_id) - - # Delete the job itself. - try: - yield From(self._request('DELETE', self._job_path(builder_id))) - except: - logger.exception('Failed to send delete job call for job %s', builder_id) - - class LogPipe(threading.Thread): """ Adapted from http://codereview.stackexchange.com/a/17959 """ diff --git a/buildman/qemu-coreos/Dockerfile b/buildman/qemu-coreos/Dockerfile deleted file mode 100644 index 92f757102..000000000 --- a/buildman/qemu-coreos/Dockerfile +++ /dev/null @@ -1,20 +0,0 @@ -FROM debian - -RUN apt-get clean && apt-get update && apt-get install -y \ - bzip2 \ - curl \ - openssh-client \ - qemu-kvm - -ARG channel=stable - -RUN curl -s -O http://${channel}.release.core-os.net/amd64-usr/current/coreos_production_qemu_image.img.bz2 && \ - bzip2 -d coreos_production_qemu_image.img.bz2 - -RUN apt-get remove -y curl bzip2 && \ - apt-get clean && \ - rm -rf /var/lib/apt/lists/* /tmp/* /var/tmp/* - -COPY start.sh /start.sh - -ENTRYPOINT ["/bin/bash", "/start.sh"] diff --git a/buildman/qemu-coreos/README.md b/buildman/qemu-coreos/README.md deleted file mode 100644 index 2d7ca53a7..000000000 --- a/buildman/qemu-coreos/README.md +++ /dev/null @@ -1,6 +0,0 @@ -# Builder Image - -``` -CHANNEL=stable -docker build --build-arg channel=${CHANNEL} -t quay.io/quay/quay-builder-qemu-coreos:${CHANNEL} . -``` diff --git a/buildman/qemu-coreos/start.sh b/buildman/qemu-coreos/start.sh deleted file mode 100644 index b49c4f6e5..000000000 --- a/buildman/qemu-coreos/start.sh +++ /dev/null @@ -1,21 +0,0 @@ -#!/bin/bash - -set -e -set -x -set -o nounset - -mkdir -p /userdata/openstack/latest -echo "$USERDATA" > /userdata/openstack/latest/user_data - -qemu-system-x86_64 \ - -enable-kvm \ - -cpu host \ - -device virtio-9p-pci,fsdev=conf,mount_tag=config-2 \ - -nographic \ - -drive if=virtio,file=./coreos_production_qemu_image.img \ - -fsdev local,id=conf,security_model=none,readonly,path=/userdata \ - -m "${VM_MEMORY}" \ - -machine accel=kvm \ - -net nic,model=virtio \ - -net user,hostfwd=tcp::2222-:22 \ - -smp 2 diff --git a/buildman/templates/cloudconfig.yaml b/buildman/templates/cloudconfig.yaml index ede42973b..bee09a67a 100644 --- a/buildman/templates/cloudconfig.yaml +++ b/buildman/templates/cloudconfig.yaml @@ -1,17 +1,11 @@ #cloud-config -users: - groups: - - sudo - - docker - ssh_authorized_keys: - ssh-rsa AAAAB3NzaC1yc2EAAAADAQABAAABAQCC0m+hVmyR3vn/xoxJe9+atRWBxSK+YXgyufNVDMcb7H00Jfnc341QH3kDVYZamUbhVh/nyc2RP7YbnZR5zORFtgOaNSdkMYrPozzBvxjnvSUokkCCWbLqXDHvIKiR12r+UTSijPJE/Yk702Mb2ejAFuae1C3Ec+qKAoOCagDjpQ3THyb5oaKE7VPHdwCWjWIQLRhC+plu77ObhoXIFJLD13gCi01L/rp4mYVCxIc2lX5A8rkK+bZHnIZwWUQ4t8SIjWxIaUo0FE7oZ83nKuNkYj5ngmLHQLY23Nx2WhE9H6NBthUpik9SmqQPtVYbhIG+bISPoH9Xs8CLrFb0VRjz Joey Schorr - ssh-rsa AAAAB3NzaC1yc2EAAAADAQABAAABAQCo6FhAP7mFFOAzM91gtaKW7saahtaN4lur42FMMztz6aqUycIltCmvxo+3FmrXgCG30maMNU36Vm1+9QRtVQEd+eRuoIWP28t+8MT01Fh4zPuE2Wca3pOHSNo3X81FfWJLzmwEHiQKs9HPQqUhezR9PcVWVkbMyAzw85c0UycGmHGFNb0UiRd9HFY6XbgbxhZv/mvKLZ99xE3xkOzS1PNsdSNvjUKwZR7pSUPqNS5S/1NXyR4GhFTU24VPH/bTATOv2ATH+PSzsZ7Qyz9UHj38tKC+ALJHEDJ4HXGzobyOUP78cHGZOfCB5FYubq0zmOudAjKIAhwI8XTFvJ2DX1P3 Jimmy Zelinskie - ssh-rsa AAAAB3NzaC1yc2EAAAADAQABAAABAQDNvw8qo9m8np7yQ/Smv/oklM8bo8VyNRZriGYBDuolWDL/mZpYCQnZJXphQo7RFdNABYistikjJlBuuwUohLf2uSq0iKoFa2TgwI43wViWzvuzU4nA02/ITD5BZdmWAFNyIoqeB50Ol4qUgDwLAZ+7Kv7uCi6chcgr9gTi99jY3GHyZjrMiXMHGVGi+FExFuzhVC2drKjbz5q6oRfQeLtNfG4psl5GU3MQU6FkX4fgoCx0r9R48/b7l4+TT7pWblJQiRfeldixu6308vyoTUEHasdkU3/X0OTaGz/h5XqTKnGQc6stvvoED3w+L3QFp0H5Z8sZ9stSsitmCBrmbcKZ Jake Moshenko - ssh-rsa AAAAB3NzaC1yc2EAAAABIwAAAgEAo/JkbGO6R7g1ZxARi0xWVM7FOfN02snRAcIO6vT9M7xMUkWVLgD+hM/o91lk+UFiYdql0CATobpFWncRL36KaUqsbw9/1BlI40wg296XHXSSnxhxZ4L7ytf6G1tyN319HXlI2kh9vAf/fy++yDvkH8dI3k1oLoW+mZPET6Pff04/6AXXrRlS5mhmGv9irGwiDHtVKpj6lU8DN/UtOrv1tiQ0pgwEJq05fLGoQfgPNaBCnW2z4Ubpn2gyMcMBMpSwo4hCqJePd349e4bLmFcT+gXYg7Mnup1DoTDlowFFN56wpxQbdp96IxWzU+jYPaIAuRo+BJzCyOS8qBv0Z4RZrgop0qp2JYiVwmViO6TZhIDz6loQJXUOIleQmNgTbiZx8Bwv5GY2jMYoVwlBp7yy5bRjxfbFsJ0vU7TVzNAG7oEJy/74HmHmWzRQlSlQjesr8gRbm9zgR8wqc/L107UOWFg7Cgh8ZNjKuADbXqYuda1Y9m2upcfS26UPz5l5PW5uFRMHZSi8pb1XV6/0Z8H8vwsh37Ur6aLi/5jruRmKhdlsNrB1IiDicBsPW3yg7HHSIdPU4oBNPC77yDCT3l4CKr4el81RrZt7FbJPfY+Ig9Q5O+05f6I8+ZOlJGyZ/Qfyl2aVm1HnlJKuBqPxeic8tMng/9B5N7uZL6Y3k5jFU8c= Quentin Machu - ssh-rsa AAAAB3NzaC1yc2EAAAADAQABAAACAQC964SY8ojXZVfWknF+Pz+pTHpyb66VBH7OLYnGP+Tm452YKJVFb/rXCpZYHFlzSQtzz9hko8qBoEFXuD2humojx0P7nEtTy8wUClnKcifIqD5b/V1r7ZDa/5hL9Xog11gOXZ17TW1qjN+00qgXwoSh+jM8mAxD7V2ZLnanIDqmpYamT3ZlICz1k4bwYj35gnpSFpijAXeF9LXOEUfDtzNBjeaCvyniYlQyKzpKr8x+oIHumPlxwkFOzGhBMRGrCQ1Kzija8vVZQ6/Tjvxl19jwfgcNT0Zd9vLbHNowJPWQZhLYXdGIb3NxEfAqkGPvGCsaLfsfETYhcFwxr2g+zvf4xvyKgK35PHA/5t7TQryDSKDrQ1qTDUp3dAjzwsBFwEoQ0x68shGC661n/+APMNtj8qR5M9ueIH5WEqdRW10kKzlEm/ESvjyjEVRhXiwWyKkPch/OIUPKexKaEeOBdKocSnNx1+5ntk8OXWRQgjfwtQvm1NE/qD7fViBVUlTRk0c1SVpZaybIZkiMWmA1hzsdUbDP2mzPek1ydsVffw0I8z/dRo5gXQSPq06WfNIKpsiQF8LqP+KU+462A2tbHxFzq9VozI9PeFV+xO59wlJogv6q2yA0Jfv9BFgVgNzItIsUMvStrfkUBTYgaG9djp/vAm+SwMdnLSXILJtMO/3eRQ== Evan Cordell -- ssh-rsa AAAAB3NzaC1yc2EAAAADAQABAAABAQC3Q9+JcjEck8CylGEekvskypE8lT3hYnCCfGUoMTAURokD8STtEaVxr197efitQkvwSYxOnDXo2Qr59FqlQ6QtFeCynX87VerN49LJ0pUA1NoYBUCvWRzwpaa8CXGhYPRpfku12mJ0qjqmGFaR5jqhXTNfXmRcWePsXqS+b3FFEqw8BhKg6By1z7NLvKeaEno4Kd0wPpxzs+hFRnk38k2p+1YO1vZzZ2mgEVp9/2577t4TmP8ucnsb9X4vURRpOJwjG8HIgmmQFUVxHRST8Zu3zOXfg9Yv/n3JYhXhvvPxkV4JB6ZbVq0cLHasexFAxz7nTmF1gDWaPbGxmdZtaDe/ Colin Hom write_files: - path: /root/overrides.list @@ -19,7 +13,7 @@ write_files: content: | REALM={{ realm }} TOKEN={{ token }} - SERVER={{ websocket_scheme }}://{{ manager_hostname }} + SERVER=wss://{{ manager_hostname }} {% if logentries_token -%} LOGENTRIES_TOKEN={{ logentries_token }} {%- endif %} diff --git a/local-docker.sh b/local-docker.sh index 884346ec3..4545331a7 100755 --- a/local-docker.sh +++ b/local-docker.sh @@ -7,15 +7,7 @@ REPO=quay.io/quay/quay-dev d () { docker build -t $REPO -f dev.df --build-arg src_subdir=$(basename `pwd`) . - - #ENV_VARS="foo=bar key=value name=joe" - local envStr="" - if [[ "$ENV_VARS" != "" ]];then - for envVar in $ENV_VARS;do - envStr="${envStr} -e \"${envVar}\"" - done - fi - docker -- run --rm $envStr -v /var/run/docker.sock:/run/docker.sock -it --net=host -v $(pwd)/..:/src $REPO $* + docker -- run --rm -v /var/run/docker.sock:/run/docker.sock -it --net=host -v $(pwd)/..:/src $REPO $* } case $1 in diff --git a/test/test_buildman.py b/test/test_buildman.py index 163ef37ec..16dcc7754 100644 --- a/test/test_buildman.py +++ b/test/test_buildman.py @@ -1,15 +1,18 @@ import unittest import etcd +import os.path import time import json -import uuid -from trollius import coroutine, get_event_loop, From, Future, Return -from mock import Mock +from trollius import coroutine, get_event_loop, From, Future, sleep, Return +from mock import Mock, ANY from threading import Event +from urllib3.exceptions import ReadTimeoutError from buildman.manager.executor import BuilderExecutor -from buildman.manager.ephemeral import EphemeralBuilderManager, EXECUTORS +from buildman.manager.ephemeral import EphemeralBuilderManager, EtcdAction +from buildman.server import BuildJobResult +from buildman.component.buildcomponent import BuildComponent BUILD_UUID = 'deadbeef-dead-beef-dead-deadbeefdead' @@ -24,16 +27,6 @@ def async_test(f): loop.run_until_complete(future) return wrapper - -class TestExecutor(BuilderExecutor): - job_started = None - - @coroutine - def start_builder(self, realm, token, build_uuid): - self.job_started = True - raise Return(str(uuid.uuid4)) - - class TestEphemeral(unittest.TestCase): def __init__(self, *args, **kwargs): self.etcd_client_mock = None @@ -48,8 +41,6 @@ class TestEphemeral(unittest.TestCase): self.etcd_client_mock = Mock(spec=etcd.Client, name='etcd.Client') self.etcd_client_mock.watch = Mock(side_effect=hang_until_event) - self.etcd_client_mock.read = Mock(side_effect=KeyError) - self.etcd_client_mock.write = Mock() return self.etcd_client_mock def _create_completed_future(self, result=None): @@ -59,7 +50,13 @@ class TestEphemeral(unittest.TestCase): return new_future return inner - def _create_build_job(self, namespace='namespace', retries=3): + def _create_mock_executor(self, *args, **kwargs): + self.test_executor = Mock(spec=BuilderExecutor) + self.test_executor.start_builder = Mock(side_effect=self._create_completed_future('123')) + self.test_executor.stop_builder = Mock(side_effect=self._create_completed_future()) + return self.test_executor + + def _create_build_job(self): mock_job = Mock() mock_job.job_details = { 'build_uuid': BUILD_UUID, @@ -68,17 +65,13 @@ class TestEphemeral(unittest.TestCase): 'body': json.dumps(mock_job.job_details), 'id': 1, } - - mock_job.namespace = namespace - mock_job.retries_remaining = retries return mock_job def setUp(self): - self._existing_executors = dict(EXECUTORS) + EphemeralBuilderManager._executors['test'] = self._create_mock_executor self.old_etcd_client_klass = EphemeralBuilderManager._etcd_client_klass EphemeralBuilderManager._etcd_client_klass = self._create_mock_etcd_client - self.etcd_wait_event.clear() self.register_component_callback = Mock() @@ -95,174 +88,175 @@ class TestEphemeral(unittest.TestCase): 30, ) + self.manager.initialize({'EXECUTOR': 'test'}) + + self.mock_job = self._create_build_job() + self.mock_job_key = os.path.join('building/', BUILD_UUID) + def tearDown(self): self.etcd_wait_event.set() + self.manager.shutdown() - EXECUTORS = self._existing_executors + del EphemeralBuilderManager._executors['test'] EphemeralBuilderManager._etcd_client_klass = self.old_etcd_client_klass - def test_verify_executor_oldconfig(self): - EXECUTORS['test'] = TestExecutor - self.manager.initialize({ - 'EXECUTOR': 'test', - 'EXECUTOR_CONFIG': dict(MINIMUM_RETRY_THRESHOLD=42) + @coroutine + def _setup_job_for_managers(self): + # Test that we are watching the realm location before anything else happens + self.etcd_client_mock.watch.assert_any_call('realm/', recursive=True, timeout=30, index=None) + + self.etcd_client_mock.read = Mock(side_effect=KeyError) + test_component = Mock(spec=BuildComponent) + test_component.builder_realm = REALM_ID + test_component.start_build = Mock(side_effect=self._create_completed_future()) + self.register_component_callback.return_value = test_component + + # Ask for a builder to be scheduled + is_scheduled = yield From(self.manager.schedule(self.mock_job)) + + self.assertTrue(is_scheduled) + + self.etcd_client_mock.read.assert_called_once_with('building/', recursive=True) + self.assertEqual(self.test_executor.start_builder.call_count, 1) + self.assertEqual(self.etcd_client_mock.write.call_args_list[0][0][0], self.mock_job_key) + self.assertEqual(self.etcd_client_mock.write.call_args_list[1][0][0], self.mock_job_key) + + # Right now the job is not registered with any managers because etcd has not accepted the job + self.assertEqual(self.register_component_callback.call_count, 0) + + realm_created = Mock(spec=etcd.EtcdResult) + realm_created.action = EtcdAction.CREATE + realm_created.key = os.path.join('realm/', REALM_ID) + realm_created.value = json.dumps({ + 'realm': REALM_ID, + 'token': 'beef', + 'builder_id': '123', + 'job_queue_item': self.mock_job.job_item, }) - # Ensure that we have a single test executor. - self.assertEquals(1, len(self.manager.registered_executors)) - self.assertEquals(42, self.manager.registered_executors[0].minimum_retry_threshold) + self.manager._handle_realm_change(realm_created) - def test_verify_executor_newconfig(self): - EXECUTORS['test'] = TestExecutor - self.manager.initialize({ - 'EXECUTORS': [{ - 'EXECUTOR': 'test', - 'MINIMUM_RETRY_THRESHOLD': 42 - }] - }) + self.assertEqual(self.register_component_callback.call_count, 1) - # Ensure that we have a single test executor. - self.assertEquals(1, len(self.manager.registered_executors)) - self.assertEquals(42, self.manager.registered_executors[0].minimum_retry_threshold) - - def test_verify_multiple_executors(self): - EXECUTORS['test'] = TestExecutor - EXECUTORS['anotherexecutor'] = TestExecutor - - self.manager.initialize({ - 'EXECUTORS': [ - { - 'EXECUTOR': 'test', - 'MINIMUM_RETRY_THRESHOLD': 42 - }, - { - 'EXECUTOR': 'anotherexecutor', - 'MINIMUM_RETRY_THRESHOLD': 24 - }, - ] - }) - - # Ensure that we have a two test executors. - self.assertEquals(2, len(self.manager.registered_executors)) - self.assertEquals(42, self.manager.registered_executors[0].minimum_retry_threshold) - self.assertEquals(24, self.manager.registered_executors[1].minimum_retry_threshold) - - def test_skip_invalid_executor(self): - self.manager.initialize({ - 'EXECUTORS': [ - { - 'EXECUTOR': 'unknown', - 'MINIMUM_RETRY_THRESHOLD': 42 - }, - ] - }) - - self.assertEquals(0, len(self.manager.registered_executors)) + raise Return(test_component) @async_test - def test_schedule_job_namespace_filter(self): - EXECUTORS['test'] = TestExecutor - self.manager.initialize({ - 'EXECUTORS': [{ - 'EXECUTOR': 'test', - 'NAMESPACE_WHITELIST': ['something'], - }] - }) + @unittest.skip('this test is flaky on Quay.io builders') + def test_schedule_and_complete(self): + # Test that a job is properly registered with all of the managers + test_component = yield From(self._setup_job_for_managers()) - # Try with a build job in an invalid namespace. - build_job = self._create_build_job(namespace='somethingelse') - result = yield From(self.manager.schedule(build_job)) - self.assertFalse(result[0]) + # Take the job ourselves + yield From(self.manager.build_component_ready(test_component)) - # Try with a valid namespace. - build_job = self._create_build_job(namespace='something') - result = yield From(self.manager.schedule(build_job)) - self.assertTrue(result[0]) + self.etcd_client_mock.delete.assert_called_once_with(os.path.join('realm/', REALM_ID)) + self.etcd_client_mock.delete.reset_mock() + + # Finish the job + yield From(self.manager.job_completed(self.mock_job, BuildJobResult.COMPLETE, test_component)) + + self.assertEqual(self.test_executor.stop_builder.call_count, 1) + self.etcd_client_mock.delete.assert_called_once_with(self.mock_job_key) @async_test - def test_schedule_job_retries_filter(self): - EXECUTORS['test'] = TestExecutor - self.manager.initialize({ - 'EXECUTORS': [{ - 'EXECUTOR': 'test', - 'MINIMUM_RETRY_THRESHOLD': 2, - }] + @unittest.skip('this test is flaky on Quay.io builders') + def test_another_manager_takes_job(self): + # Prepare a job to be taken by another manager + test_component = yield From(self._setup_job_for_managers()) + + realm_deleted = Mock(spec=etcd.EtcdResult) + realm_deleted.action = EtcdAction.DELETE + realm_deleted.key = os.path.join('realm/', REALM_ID) + + realm_deleted._prev_node = Mock(spec=etcd.EtcdResult) + realm_deleted._prev_node.value = json.dumps({ + 'realm': REALM_ID, + 'token': 'beef', + 'builder_id': '123', + 'job_queue_item': self.mock_job.job_item, }) - # Try with a build job that has too few retries. - build_job = self._create_build_job(retries=1) - result = yield From(self.manager.schedule(build_job)) - self.assertFalse(result[0]) - - # Try with a valid job. - build_job = self._create_build_job(retries=2) - result = yield From(self.manager.schedule(build_job)) - self.assertTrue(result[0]) + self.manager._handle_realm_change(realm_deleted) + self.unregister_component_callback.assert_called_once_with(test_component) @async_test - def test_schedule_job_executor_fallback(self): - EXECUTORS['primary'] = TestExecutor - EXECUTORS['secondary'] = TestExecutor + @unittest.skip('this test is flaky on Quay.io builders') + def test_expiring_worker(self): + # Test that we are watching before anything else happens + self.etcd_client_mock.watch.assert_any_call('building/', recursive=True, timeout=30, index=None) - self.manager.initialize({ - 'EXECUTORS': [ - { - 'EXECUTOR': 'primary', - 'NAMESPACE_WHITELIST': ['something'], - 'MINIMUM_RETRY_THRESHOLD': 3, - }, - { - 'EXECUTOR': 'secondary', - 'MINIMUM_RETRY_THRESHOLD': 2, - }, - ] + # Send a signal to the callback that a worker has expired + expired_result = Mock(spec=etcd.EtcdResult) + expired_result.action = EtcdAction.EXPIRE + expired_result.key = self.mock_job_key + expired_result._prev_node = Mock(spec=etcd.EtcdResult) + expired_result._prev_node.value = json.dumps({'builder_id': '1234'}) + + yield From(self.manager._handle_builder_expiration(expired_result)) + + self.test_executor.stop_builder.assert_called_once_with('1234') + self.assertEqual(self.test_executor.stop_builder.call_count, 1) + + @async_test + @unittest.skip('this test is flaky on Quay.io builders') + def test_builder_never_starts(self): + test_component = yield From(self._setup_job_for_managers()) + + # Test that we are watching before anything else happens + self.etcd_client_mock.watch.assert_any_call('building/', recursive=True, timeout=30, index=None) + + # Send a signal to the callback that a worker has expired + expired_result = Mock(spec=etcd.EtcdResult) + expired_result.action = EtcdAction.EXPIRE + expired_result.key = self.mock_job_key + expired_result._prev_node = Mock(spec=etcd.EtcdResult) + expired_result._prev_node.value = json.dumps({ + 'builder_id': '1234', + 'had_heartbeat': False, + 'job_queue_item': self.mock_job.job_item, }) - # Try a job not matching the primary's namespace filter. Should schedule on secondary. - build_job = self._create_build_job(namespace='somethingelse') - result = yield From(self.manager.schedule(build_job)) - self.assertTrue(result[0]) + yield From(self.manager._handle_builder_expiration(expired_result)) - self.assertIsNone(self.manager.registered_executors[0].job_started) - self.assertIsNotNone(self.manager.registered_executors[1].job_started) + self.test_executor.stop_builder.assert_called_once_with('1234') + self.assertEqual(self.test_executor.stop_builder.call_count, 1) - self.manager.registered_executors[0].job_started = None - self.manager.registered_executors[1].job_started = None + self.job_complete_callback.assert_called_once_with(ANY, BuildJobResult.INCOMPLETE) - # Try a job not matching the primary's retry minimum. Should schedule on secondary. - build_job = self._create_build_job(namespace='something', retries=2) - result = yield From(self.manager.schedule(build_job)) - self.assertTrue(result[0]) + @async_test + def test_change_worker(self): + # Send a signal to the callback that a worker key has been changed + set_result = Mock(spec=etcd.EtcdResult) + set_result.action = 'set' + set_result.key = self.mock_job_key - self.assertIsNone(self.manager.registered_executors[0].job_started) - self.assertIsNotNone(self.manager.registered_executors[1].job_started) + self.manager._handle_builder_expiration(set_result) - self.manager.registered_executors[0].job_started = None - self.manager.registered_executors[1].job_started = None + yield From(sleep(.01)) - # Try a job matching the primary. Should schedule on the primary. - build_job = self._create_build_job(namespace='something', retries=3) - result = yield From(self.manager.schedule(build_job)) - self.assertTrue(result[0]) + self.assertEquals(self.test_executor.stop_builder.call_count, 0) - self.assertIsNotNone(self.manager.registered_executors[0].job_started) - self.assertIsNone(self.manager.registered_executors[1].job_started) + @async_test + def test_heartbeat_response(self): + expiration_timestamp = time.time() + 60 + builder_result = Mock(spec=etcd.EtcdResult) + builder_result.value = json.dumps({ + 'builder_id': '123', + 'expiration': expiration_timestamp, + 'max_expiration': expiration_timestamp, + }) + self.etcd_client_mock.read = Mock(return_value=builder_result) - self.manager.registered_executors[0].job_started = None - self.manager.registered_executors[1].job_started = None + yield From(self.manager.job_heartbeat(self.mock_job)) - # Try a job not matching either's restrictions. - build_job = self._create_build_job(namespace='somethingelse', retries=1) - result = yield From(self.manager.schedule(build_job)) - self.assertFalse(result[0]) + # Wait for threads to complete + yield From(sleep(.01)) - self.assertIsNone(self.manager.registered_executors[0].job_started) - self.assertIsNone(self.manager.registered_executors[1].job_started) - - self.manager.registered_executors[0].job_started = None - self.manager.registered_executors[1].job_started = None + self.job_heartbeat_callback.assert_called_once_with(self.mock_job) + self.assertEqual(self.etcd_client_mock.write.call_count, 1) + self.assertEqual(self.etcd_client_mock.write.call_args_list[0][0][0], self.mock_job_key) if __name__ == '__main__': unittest.main() diff --git a/test/test_secscan.py b/test/test_secscan.py index 03251d7bd..e63b70c5f 100644 --- a/test/test_secscan.py +++ b/test/test_secscan.py @@ -1,14 +1,16 @@ import unittest import json +import os from httmock import urlmatch, all_requests, HTTMock -from app import app, storage, notification_queue +from app import app, config_provider, storage, notification_queue from endpoints.notificationevent import VulnerabilityFoundEvent from initdb import setup_database_for_testing, finished_database_for_testing from util.secscan.api import SecurityScannerAPI, AnalyzeLayerException from util.secscan.analyzer import LayerAnalyzer from util.secscan.notifier import process_notification_data from data import model +from storage.basestorage import StoragePaths from workers.security_notification_worker import SecurityNotificationWorker from endpoints.v2 import v2_bp