diff --git a/buildman/manager/ephemeral.py b/buildman/manager/ephemeral.py index c7730bbae..fd2745075 100644 --- a/buildman/manager/ephemeral.py +++ b/buildman/manager/ephemeral.py @@ -59,6 +59,9 @@ class EphemeralBuilderManager(BaseManager): self._etcd_realm_prefix = None self._etcd_builder_prefix = None + self._etcd_lock_prefix = Nopne + self._ephemeral_api_timeout = DEFAULT_EPHEMERAL_API_TIMEOUT + self._component_to_job = {} self._job_uuid_to_component = {} self._component_to_builder = {} @@ -95,7 +98,6 @@ class EphemeralBuilderManager(BaseManager): # at the index we retrieved. We therefore start a new watch at HEAD and # (if specified) call the restarter method which should conduct a read and # reset the state of the manager. - # TODO: Remove this hack once Etcd is fixed. logger.exception('Etcd moved forward too quickly. Restarting watch cycle.') new_index = None if restarter is not None: @@ -321,12 +323,13 @@ class EphemeralBuilderManager(BaseManager): max_expiration = datetime.utcnow() + timedelta(seconds=machine_max_expiration) payload = { - 'expiration': calendar.timegm(expiration.timetuple()), - 'max_expiration': calendar.timegm(max_expiration.timetuple()), - 'nonce': nonce, - 'had_heartbeat': False, - 'job_queue_item': build_job.job_item, + 'expiration': calendar.timegm(expiration.timetuple()), + 'max_expiration': calendar.timegm(max_expiration.timetuple()), + 'nonce': nonce, + 'had_heartbeat': False, + 'job_queue_item': build_job.job_item, } + lock_payload = json.dumps(payload) try: @@ -372,10 +375,10 @@ class EphemeralBuilderManager(BaseManager): # Store the realm spec which will allow any manager to accept this builder when it connects realm_spec = json.dumps({ - 'realm': realm, - 'token': token, - 'builder_id': builder_id, - 'job_queue_item': build_job.job_item, + 'realm': realm, + 'token': token, + 'builder_id': builder_id, + 'job_queue_item': build_job.job_item, }) try: @@ -388,6 +391,7 @@ class EphemeralBuilderManager(BaseManager): except etcd.EtcdException: logger.exception('Exception when writing realm %s to etcd for job %s', realm, build_uuid) raise Return(False, setup_time) + self._job_to_executor[builder_id] = executor raise Return(True, None) @@ -449,11 +453,11 @@ class EphemeralBuilderManager(BaseManager): new_expiration = datetime.utcnow() + timedelta(seconds=ttl) payload = { - 'expiration': calendar.timegm(new_expiration.timetuple()), - 'builder_id': build_job_metadata['builder_id'], - 'job_queue_item': build_job.job_item, - 'max_expiration': build_job_metadata['max_expiration'], - 'had_heartbeat': True, + 'expiration': calendar.timegm(new_expiration.timetuple()), + 'builder_id': build_job_metadata['builder_id'], + 'job_queue_item': build_job.job_item, + 'max_expiration': build_job_metadata['max_expiration'], + 'had_heartbeat': True, } yield From(self._etcd_client.write(job_key, json.dumps(payload), ttl=ttl)) diff --git a/buildman/manager/executor.py b/buildman/manager/executor.py index 664ff6535..e87aba451 100644 --- a/buildman/manager/executor.py +++ b/buildman/manager/executor.py @@ -6,7 +6,6 @@ import boto.ec2 import requests import cachetools import trollius -import json import datetime import release import socket @@ -62,9 +61,6 @@ class BuilderExecutor(object): """ raise NotImplementedError - def get_manager_websocket_url(self): - return 'ws://{0}:' - def generate_cloud_config(self, realm, token, coreos_channel, manager_hostname, quay_username=None, quay_password=None): if quay_username is None: @@ -101,9 +97,9 @@ class EC2Executor(BuilderExecutor): """ Creates an ec2 connection which can be used to manage instances. """ return AsyncWrapper(boto.ec2.connect_to_region( - self.executor_config['EC2_REGION'], - aws_access_key_id=self.executor_config['AWS_ACCESS_KEY'], - aws_secret_access_key=self.executor_config['AWS_SECRET_KEY'], + self.executor_config['EC2_REGION'], + aws_access_key_id=self.executor_config['AWS_ACCESS_KEY'], + aws_secret_access_key=self.executor_config['AWS_SECRET_KEY'], )) @classmethod @@ -131,9 +127,9 @@ class EC2Executor(BuilderExecutor): ec2_conn = self._get_conn() ssd_root_ebs = boto.ec2.blockdevicemapping.BlockDeviceType( - size=int(self.executor_config.get('BLOCK_DEVICE_SIZE', 48)), - volume_type='gp2', - delete_on_termination=True, + size=int(self.executor_config.get('BLOCK_DEVICE_SIZE', 48)), + volume_type='gp2', + delete_on_termination=True, ) block_devices = boto.ec2.blockdevicemapping.BlockDeviceMapping() block_devices['/dev/xvda'] = ssd_root_ebs @@ -141,21 +137,21 @@ class EC2Executor(BuilderExecutor): interfaces = None if self.executor_config.get('EC2_VPC_SUBNET_ID', None) is not None: interface = boto.ec2.networkinterface.NetworkInterfaceSpecification( - subnet_id=self.executor_config['EC2_VPC_SUBNET_ID'], - groups=self.executor_config['EC2_SECURITY_GROUP_IDS'], - associate_public_ip_address=True, + subnet_id=self.executor_config['EC2_VPC_SUBNET_ID'], + groups=self.executor_config['EC2_SECURITY_GROUP_IDS'], + associate_public_ip_address=True, ) interfaces = boto.ec2.networkinterface.NetworkInterfaceCollection(interface) try: reservation = yield From(ec2_conn.run_instances( - coreos_ami, - instance_type=self.executor_config['EC2_INSTANCE_TYPE'], - key_name=self.executor_config.get('EC2_KEY_NAME', None), - user_data=user_data, - instance_initiated_shutdown_behavior='terminate', - block_device_map=block_devices, - network_interfaces=interfaces, + coreos_ami, + instance_type=self.executor_config['EC2_INSTANCE_TYPE'], + key_name=self.executor_config.get('EC2_KEY_NAME', None), + user_data=user_data, + instance_initiated_shutdown_behavior='terminate', + block_device_map=block_devices, + network_interfaces=interfaces, )) except boto.exception.EC2ResponseError as ec2e: logger.exception('Unable to spawn builder instance') @@ -173,10 +169,10 @@ class EC2Executor(BuilderExecutor): for i in range(0, _TAG_RETRY_COUNT): try: yield From(launched.add_tags({ - 'Name': 'Quay Ephemeral Builder', - 'Realm': realm, - 'Token': token, - 'BuildUUID': build_uuid, + 'Name': 'Quay Ephemeral Builder', + 'Realm': realm, + 'Token': token, + 'BuildUUID': build_uuid, })) except boto.exception.EC2ResponseError as ec2e: if ec2e.error_code == 'InvalidInstanceID.NotFound': @@ -227,12 +223,12 @@ class PopenExecutor(BuilderExecutor): ws_host = os.environ.get("BUILDMAN_WS_HOST", "localhost") ws_port = os.environ.get("BUILDMAN_WS_PORT", "8787") builder_env = { - 'TOKEN': token, - 'REALM': realm, - 'ENDPOINT': 'ws://%s:%s' % (ws_host,ws_port), - 'DOCKER_TLS_VERIFY': os.environ.get('DOCKER_TLS_VERIFY', ''), - 'DOCKER_CERT_PATH': os.environ.get('DOCKER_CERT_PATH', ''), - 'DOCKER_HOST': os.environ.get('DOCKER_HOST', ''), + 'TOKEN': token, + 'REALM': realm, + 'ENDPOINT': 'ws://%s:%s' % (ws_host, ws_port), + 'DOCKER_TLS_VERIFY': os.environ.get('DOCKER_TLS_VERIFY', ''), + 'DOCKER_CERT_PATH': os.environ.get('DOCKER_CERT_PATH', ''), + 'DOCKER_HOST': os.environ.get('DOCKER_HOST', ''), } logpipe = LogPipe(logging.INFO) @@ -258,7 +254,8 @@ class PopenExecutor(BuilderExecutor): class KubernetesExecutor(BuilderExecutor): - """ Executes build jobs by creating Kubernetes jobs which run a qemu-kvm virtual machine in a pod """ + """ Executes build jobs by creating Kubernetes jobs which run a qemu-kvm virtual + machine in a pod """ def __init__(self, *args, **kwargs): self._loop = get_event_loop() super(KubernetesExecutor, self).__init__(*args, **kwargs) @@ -286,7 +283,8 @@ class KubernetesExecutor(BuilderExecutor): server = self.executor_config.get('K8S_API_SERVER', 'localhost:8080') url = '%s://%s%s' % (scheme, server, path) - logger.debug('EXEC CFG: %s',self.executor_config) + + logger.debug('Executor config: %s', self.executor_config) logger.debug('Kubernetes request: %s %s: %s', method, url, request_options) res = requests.request(method, url, **request_options) logger.debug('Kubernetes response: %s: %s', res.status_code, res.text) @@ -343,16 +341,16 @@ class KubernetesExecutor(BuilderExecutor): 'name': 'builder', 'image': '%s:%s' % (self.image, coreos_channel), 'imagePullPolicy': 'Always', - 'securityContext': { 'privileged': True }, + 'securityContext': {'privileged': True}, 'env': [ - { 'name': 'USERDATA', 'value': user_data }, - { 'name': 'VM_MEMORY', 'value': vm_memory_limit }, + {'name': 'USERDATA', 'value': user_data}, + {'name': 'VM_MEMORY', 'value': vm_memory_limit}, ], 'limits' : container_limits, 'requests' : container_requests, }, ], - 'imagePullSecrets': [{ 'name': 'builder' }], + 'imagePullSecrets': [{'name': 'builder'}], 'restartPolicy': 'Never', }, }, @@ -370,8 +368,8 @@ class KubernetesExecutor(BuilderExecutor): # schedule create_job = yield From(self._request('POST', self._jobs_path(), json=resource)) if int(create_job.status_code / 100) != 2: - raise ExecutorException('Failed to create job: %s: %s: %s' % ( - build_uuid, create_job.status_code, create_job.text)) + raise ExecutorException('Failed to create job: %s: %s: %s' % + (build_uuid, create_job.status_code, create_job.text)) job = create_job.json() raise Return(job['metadata']['name']) @@ -380,21 +378,19 @@ class KubernetesExecutor(BuilderExecutor): def stop_builder(self, builder_id): pods_path = '/api/v1/namespaces/%s/pods' % self.namespace + # Delete the pod(s) for the job. selectorString = "job-name=%s" % builder_id try: - delete_pod = yield From(self._request('DELETE', pods_path, params=dict(labelSelector=selectorString))) + yield From(self._request('DELETE', pods_path, params=dict(labelSelector=selectorString))) except: - # if the pod does not exist, we will not get an error here. this covers lack of api connectivity, etc - logger.exception("Failed to delete pod for job %s", builder_id) - raise - - logger.debug("Got successful delete pod response: %s", delete_pod.text) + logger.exception("Failed to send delete pod call for job %s", builder_id) + # Delete the job itself. try: - delete_job = yield From(self._request('DELETE', self._job_path(builder_id))) + yield From(self._request('DELETE', self._job_path(builder_id))) except: - logger.exception('Exception when trying to terminate job %s', builder_id) - raise + logger.exception('Failed to send delete job call for job %s', builder_id) + class LogPipe(threading.Thread): """ Adapted from http://codereview.stackexchange.com/a/17959 diff --git a/buildman/templates/cloudconfig.yaml b/buildman/templates/cloudconfig.yaml index dcc9878ad..ede42973b 100644 --- a/buildman/templates/cloudconfig.yaml +++ b/buildman/templates/cloudconfig.yaml @@ -19,7 +19,7 @@ write_files: content: | REALM={{ realm }} TOKEN={{ token }} - SERVER={{websocket_scheme}}://{{ manager_hostname }} + SERVER={{ websocket_scheme }}://{{ manager_hostname }} {% if logentries_token -%} LOGENTRIES_TOKEN={{ logentries_token }} {%- endif %}