Small code cleanup before whitelist addition

This commit is contained in:
Joseph Schorr 2016-07-08 13:01:02 -04:00
parent 1e3351f3f4
commit 7471d0e35f
3 changed files with 63 additions and 63 deletions

View file

@ -59,6 +59,9 @@ class EphemeralBuilderManager(BaseManager):
self._etcd_realm_prefix = None self._etcd_realm_prefix = None
self._etcd_builder_prefix = None self._etcd_builder_prefix = None
self._etcd_lock_prefix = Nopne
self._ephemeral_api_timeout = DEFAULT_EPHEMERAL_API_TIMEOUT
self._component_to_job = {} self._component_to_job = {}
self._job_uuid_to_component = {} self._job_uuid_to_component = {}
self._component_to_builder = {} self._component_to_builder = {}
@ -95,7 +98,6 @@ class EphemeralBuilderManager(BaseManager):
# at the index we retrieved. We therefore start a new watch at HEAD and # at the index we retrieved. We therefore start a new watch at HEAD and
# (if specified) call the restarter method which should conduct a read and # (if specified) call the restarter method which should conduct a read and
# reset the state of the manager. # reset the state of the manager.
# TODO: Remove this hack once Etcd is fixed.
logger.exception('Etcd moved forward too quickly. Restarting watch cycle.') logger.exception('Etcd moved forward too quickly. Restarting watch cycle.')
new_index = None new_index = None
if restarter is not None: if restarter is not None:
@ -321,12 +323,13 @@ class EphemeralBuilderManager(BaseManager):
max_expiration = datetime.utcnow() + timedelta(seconds=machine_max_expiration) max_expiration = datetime.utcnow() + timedelta(seconds=machine_max_expiration)
payload = { payload = {
'expiration': calendar.timegm(expiration.timetuple()), 'expiration': calendar.timegm(expiration.timetuple()),
'max_expiration': calendar.timegm(max_expiration.timetuple()), 'max_expiration': calendar.timegm(max_expiration.timetuple()),
'nonce': nonce, 'nonce': nonce,
'had_heartbeat': False, 'had_heartbeat': False,
'job_queue_item': build_job.job_item, 'job_queue_item': build_job.job_item,
} }
lock_payload = json.dumps(payload) lock_payload = json.dumps(payload)
try: try:
@ -372,10 +375,10 @@ class EphemeralBuilderManager(BaseManager):
# Store the realm spec which will allow any manager to accept this builder when it connects # Store the realm spec which will allow any manager to accept this builder when it connects
realm_spec = json.dumps({ realm_spec = json.dumps({
'realm': realm, 'realm': realm,
'token': token, 'token': token,
'builder_id': builder_id, 'builder_id': builder_id,
'job_queue_item': build_job.job_item, 'job_queue_item': build_job.job_item,
}) })
try: try:
@ -388,6 +391,7 @@ class EphemeralBuilderManager(BaseManager):
except etcd.EtcdException: except etcd.EtcdException:
logger.exception('Exception when writing realm %s to etcd for job %s', realm, build_uuid) logger.exception('Exception when writing realm %s to etcd for job %s', realm, build_uuid)
raise Return(False, setup_time) raise Return(False, setup_time)
self._job_to_executor[builder_id] = executor self._job_to_executor[builder_id] = executor
raise Return(True, None) raise Return(True, None)
@ -449,11 +453,11 @@ class EphemeralBuilderManager(BaseManager):
new_expiration = datetime.utcnow() + timedelta(seconds=ttl) new_expiration = datetime.utcnow() + timedelta(seconds=ttl)
payload = { payload = {
'expiration': calendar.timegm(new_expiration.timetuple()), 'expiration': calendar.timegm(new_expiration.timetuple()),
'builder_id': build_job_metadata['builder_id'], 'builder_id': build_job_metadata['builder_id'],
'job_queue_item': build_job.job_item, 'job_queue_item': build_job.job_item,
'max_expiration': build_job_metadata['max_expiration'], 'max_expiration': build_job_metadata['max_expiration'],
'had_heartbeat': True, 'had_heartbeat': True,
} }
yield From(self._etcd_client.write(job_key, json.dumps(payload), ttl=ttl)) yield From(self._etcd_client.write(job_key, json.dumps(payload), ttl=ttl))

View file

@ -6,7 +6,6 @@ import boto.ec2
import requests import requests
import cachetools import cachetools
import trollius import trollius
import json
import datetime import datetime
import release import release
import socket import socket
@ -62,9 +61,6 @@ class BuilderExecutor(object):
""" """
raise NotImplementedError raise NotImplementedError
def get_manager_websocket_url(self):
return 'ws://{0}:'
def generate_cloud_config(self, realm, token, coreos_channel, manager_hostname, def generate_cloud_config(self, realm, token, coreos_channel, manager_hostname,
quay_username=None, quay_password=None): quay_username=None, quay_password=None):
if quay_username is None: if quay_username is None:
@ -101,9 +97,9 @@ class EC2Executor(BuilderExecutor):
""" Creates an ec2 connection which can be used to manage instances. """ Creates an ec2 connection which can be used to manage instances.
""" """
return AsyncWrapper(boto.ec2.connect_to_region( return AsyncWrapper(boto.ec2.connect_to_region(
self.executor_config['EC2_REGION'], self.executor_config['EC2_REGION'],
aws_access_key_id=self.executor_config['AWS_ACCESS_KEY'], aws_access_key_id=self.executor_config['AWS_ACCESS_KEY'],
aws_secret_access_key=self.executor_config['AWS_SECRET_KEY'], aws_secret_access_key=self.executor_config['AWS_SECRET_KEY'],
)) ))
@classmethod @classmethod
@ -131,9 +127,9 @@ class EC2Executor(BuilderExecutor):
ec2_conn = self._get_conn() ec2_conn = self._get_conn()
ssd_root_ebs = boto.ec2.blockdevicemapping.BlockDeviceType( ssd_root_ebs = boto.ec2.blockdevicemapping.BlockDeviceType(
size=int(self.executor_config.get('BLOCK_DEVICE_SIZE', 48)), size=int(self.executor_config.get('BLOCK_DEVICE_SIZE', 48)),
volume_type='gp2', volume_type='gp2',
delete_on_termination=True, delete_on_termination=True,
) )
block_devices = boto.ec2.blockdevicemapping.BlockDeviceMapping() block_devices = boto.ec2.blockdevicemapping.BlockDeviceMapping()
block_devices['/dev/xvda'] = ssd_root_ebs block_devices['/dev/xvda'] = ssd_root_ebs
@ -141,21 +137,21 @@ class EC2Executor(BuilderExecutor):
interfaces = None interfaces = None
if self.executor_config.get('EC2_VPC_SUBNET_ID', None) is not None: if self.executor_config.get('EC2_VPC_SUBNET_ID', None) is not None:
interface = boto.ec2.networkinterface.NetworkInterfaceSpecification( interface = boto.ec2.networkinterface.NetworkInterfaceSpecification(
subnet_id=self.executor_config['EC2_VPC_SUBNET_ID'], subnet_id=self.executor_config['EC2_VPC_SUBNET_ID'],
groups=self.executor_config['EC2_SECURITY_GROUP_IDS'], groups=self.executor_config['EC2_SECURITY_GROUP_IDS'],
associate_public_ip_address=True, associate_public_ip_address=True,
) )
interfaces = boto.ec2.networkinterface.NetworkInterfaceCollection(interface) interfaces = boto.ec2.networkinterface.NetworkInterfaceCollection(interface)
try: try:
reservation = yield From(ec2_conn.run_instances( reservation = yield From(ec2_conn.run_instances(
coreos_ami, coreos_ami,
instance_type=self.executor_config['EC2_INSTANCE_TYPE'], instance_type=self.executor_config['EC2_INSTANCE_TYPE'],
key_name=self.executor_config.get('EC2_KEY_NAME', None), key_name=self.executor_config.get('EC2_KEY_NAME', None),
user_data=user_data, user_data=user_data,
instance_initiated_shutdown_behavior='terminate', instance_initiated_shutdown_behavior='terminate',
block_device_map=block_devices, block_device_map=block_devices,
network_interfaces=interfaces, network_interfaces=interfaces,
)) ))
except boto.exception.EC2ResponseError as ec2e: except boto.exception.EC2ResponseError as ec2e:
logger.exception('Unable to spawn builder instance') logger.exception('Unable to spawn builder instance')
@ -173,10 +169,10 @@ class EC2Executor(BuilderExecutor):
for i in range(0, _TAG_RETRY_COUNT): for i in range(0, _TAG_RETRY_COUNT):
try: try:
yield From(launched.add_tags({ yield From(launched.add_tags({
'Name': 'Quay Ephemeral Builder', 'Name': 'Quay Ephemeral Builder',
'Realm': realm, 'Realm': realm,
'Token': token, 'Token': token,
'BuildUUID': build_uuid, 'BuildUUID': build_uuid,
})) }))
except boto.exception.EC2ResponseError as ec2e: except boto.exception.EC2ResponseError as ec2e:
if ec2e.error_code == 'InvalidInstanceID.NotFound': if ec2e.error_code == 'InvalidInstanceID.NotFound':
@ -227,12 +223,12 @@ class PopenExecutor(BuilderExecutor):
ws_host = os.environ.get("BUILDMAN_WS_HOST", "localhost") ws_host = os.environ.get("BUILDMAN_WS_HOST", "localhost")
ws_port = os.environ.get("BUILDMAN_WS_PORT", "8787") ws_port = os.environ.get("BUILDMAN_WS_PORT", "8787")
builder_env = { builder_env = {
'TOKEN': token, 'TOKEN': token,
'REALM': realm, 'REALM': realm,
'ENDPOINT': 'ws://%s:%s' % (ws_host,ws_port), 'ENDPOINT': 'ws://%s:%s' % (ws_host, ws_port),
'DOCKER_TLS_VERIFY': os.environ.get('DOCKER_TLS_VERIFY', ''), 'DOCKER_TLS_VERIFY': os.environ.get('DOCKER_TLS_VERIFY', ''),
'DOCKER_CERT_PATH': os.environ.get('DOCKER_CERT_PATH', ''), 'DOCKER_CERT_PATH': os.environ.get('DOCKER_CERT_PATH', ''),
'DOCKER_HOST': os.environ.get('DOCKER_HOST', ''), 'DOCKER_HOST': os.environ.get('DOCKER_HOST', ''),
} }
logpipe = LogPipe(logging.INFO) logpipe = LogPipe(logging.INFO)
@ -258,7 +254,8 @@ class PopenExecutor(BuilderExecutor):
class KubernetesExecutor(BuilderExecutor): class KubernetesExecutor(BuilderExecutor):
""" Executes build jobs by creating Kubernetes jobs which run a qemu-kvm virtual machine in a pod """ """ Executes build jobs by creating Kubernetes jobs which run a qemu-kvm virtual
machine in a pod """
def __init__(self, *args, **kwargs): def __init__(self, *args, **kwargs):
self._loop = get_event_loop() self._loop = get_event_loop()
super(KubernetesExecutor, self).__init__(*args, **kwargs) super(KubernetesExecutor, self).__init__(*args, **kwargs)
@ -286,7 +283,8 @@ class KubernetesExecutor(BuilderExecutor):
server = self.executor_config.get('K8S_API_SERVER', 'localhost:8080') server = self.executor_config.get('K8S_API_SERVER', 'localhost:8080')
url = '%s://%s%s' % (scheme, server, path) url = '%s://%s%s' % (scheme, server, path)
logger.debug('EXEC CFG: %s',self.executor_config)
logger.debug('Executor config: %s', self.executor_config)
logger.debug('Kubernetes request: %s %s: %s', method, url, request_options) logger.debug('Kubernetes request: %s %s: %s', method, url, request_options)
res = requests.request(method, url, **request_options) res = requests.request(method, url, **request_options)
logger.debug('Kubernetes response: %s: %s', res.status_code, res.text) logger.debug('Kubernetes response: %s: %s', res.status_code, res.text)
@ -343,16 +341,16 @@ class KubernetesExecutor(BuilderExecutor):
'name': 'builder', 'name': 'builder',
'image': '%s:%s' % (self.image, coreos_channel), 'image': '%s:%s' % (self.image, coreos_channel),
'imagePullPolicy': 'Always', 'imagePullPolicy': 'Always',
'securityContext': { 'privileged': True }, 'securityContext': {'privileged': True},
'env': [ 'env': [
{ 'name': 'USERDATA', 'value': user_data }, {'name': 'USERDATA', 'value': user_data},
{ 'name': 'VM_MEMORY', 'value': vm_memory_limit }, {'name': 'VM_MEMORY', 'value': vm_memory_limit},
], ],
'limits' : container_limits, 'limits' : container_limits,
'requests' : container_requests, 'requests' : container_requests,
}, },
], ],
'imagePullSecrets': [{ 'name': 'builder' }], 'imagePullSecrets': [{'name': 'builder'}],
'restartPolicy': 'Never', 'restartPolicy': 'Never',
}, },
}, },
@ -370,8 +368,8 @@ class KubernetesExecutor(BuilderExecutor):
# schedule # schedule
create_job = yield From(self._request('POST', self._jobs_path(), json=resource)) create_job = yield From(self._request('POST', self._jobs_path(), json=resource))
if int(create_job.status_code / 100) != 2: if int(create_job.status_code / 100) != 2:
raise ExecutorException('Failed to create job: %s: %s: %s' % ( raise ExecutorException('Failed to create job: %s: %s: %s' %
build_uuid, create_job.status_code, create_job.text)) (build_uuid, create_job.status_code, create_job.text))
job = create_job.json() job = create_job.json()
raise Return(job['metadata']['name']) raise Return(job['metadata']['name'])
@ -380,21 +378,19 @@ class KubernetesExecutor(BuilderExecutor):
def stop_builder(self, builder_id): def stop_builder(self, builder_id):
pods_path = '/api/v1/namespaces/%s/pods' % self.namespace pods_path = '/api/v1/namespaces/%s/pods' % self.namespace
# Delete the pod(s) for the job.
selectorString = "job-name=%s" % builder_id selectorString = "job-name=%s" % builder_id
try: try:
delete_pod = yield From(self._request('DELETE', pods_path, params=dict(labelSelector=selectorString))) yield From(self._request('DELETE', pods_path, params=dict(labelSelector=selectorString)))
except: except:
# if the pod does not exist, we will not get an error here. this covers lack of api connectivity, etc logger.exception("Failed to send delete pod call for job %s", builder_id)
logger.exception("Failed to delete pod for job %s", builder_id)
raise
logger.debug("Got successful delete pod response: %s", delete_pod.text)
# Delete the job itself.
try: try:
delete_job = yield From(self._request('DELETE', self._job_path(builder_id))) yield From(self._request('DELETE', self._job_path(builder_id)))
except: except:
logger.exception('Exception when trying to terminate job %s', builder_id) logger.exception('Failed to send delete job call for job %s', builder_id)
raise
class LogPipe(threading.Thread): class LogPipe(threading.Thread):
""" Adapted from http://codereview.stackexchange.com/a/17959 """ Adapted from http://codereview.stackexchange.com/a/17959

View file

@ -19,7 +19,7 @@ write_files:
content: | content: |
REALM={{ realm }} REALM={{ realm }}
TOKEN={{ token }} TOKEN={{ token }}
SERVER={{websocket_scheme}}://{{ manager_hostname }} SERVER={{ websocket_scheme }}://{{ manager_hostname }}
{% if logentries_token -%} {% if logentries_token -%}
LOGENTRIES_TOKEN={{ logentries_token }} LOGENTRIES_TOKEN={{ logentries_token }}
{%- endif %} {%- endif %}