Revert "Merge pull request #1605 from coreos-inc/kubernetes-builder"

This reverts commit a69266c282, reversing
changes made to 3143da6392.
This commit is contained in:
Jimmy Zelinskie 2016-07-15 16:32:34 -04:00
parent 30fa1b5906
commit bef55f9f6d
10 changed files with 230 additions and 519 deletions

View file

@ -12,7 +12,7 @@ from urllib3.exceptions import ReadTimeoutError, ProtocolError
from app import metric_queue
from buildman.manager.basemanager import BaseManager
from buildman.manager.executor import PopenExecutor, EC2Executor, KubernetesExecutor
from buildman.manager.executor import PopenExecutor, EC2Executor
from buildman.component.buildcomponent import BuildComponent
from buildman.jobutil.buildjob import BuildJob
from buildman.asyncutil import AsyncWrapper
@ -24,14 +24,9 @@ logger = logging.getLogger(__name__)
ETCD_MAX_WATCH_TIMEOUT = 30
EC2_API_TIMEOUT = 20
RETRY_IMMEDIATELY_TIMEOUT = 0
DEFAULT_EPHEMERAL_API_TIMEOUT = 20
EXECUTORS = {
'popen': PopenExecutor,
'ec2': EC2Executor,
'kubernetes': KubernetesExecutor,
}
class EtcdAction(object):
GET = 'get'
@ -46,6 +41,10 @@ class EtcdAction(object):
class EphemeralBuilderManager(BaseManager):
""" Build manager implementation for the Enterprise Registry. """
_executors = {
'popen': PopenExecutor,
'ec2': EC2Executor,
}
_etcd_client_klass = etcd.Client
@ -59,15 +58,11 @@ class EphemeralBuilderManager(BaseManager):
self._etcd_realm_prefix = None
self._etcd_builder_prefix = None
self._etcd_lock_prefix = None
self._ephemeral_api_timeout = DEFAULT_EPHEMERAL_API_TIMEOUT
self._component_to_job = {}
self._job_uuid_to_component = {}
self._component_to_builder = {}
self._job_to_executor = {}
self._executors = []
self._executor = None
# Map of etcd keys being watched to the tasks watching them
self._watch_tasks = {}
@ -98,6 +93,7 @@ class EphemeralBuilderManager(BaseManager):
# at the index we retrieved. We therefore start a new watch at HEAD and
# (if specified) call the restarter method which should conduct a read and
# reset the state of the manager.
# TODO: Remove this hack once Etcd is fixed.
logger.exception('Etcd moved forward too quickly. Restarting watch cycle.')
new_index = None
if restarter is not None:
@ -163,7 +159,8 @@ class EphemeralBuilderManager(BaseManager):
self.job_complete_callback(build_job, BuildJobResult.INCOMPLETE)
logger.info('Terminating expired build node: %s', builder_id)
yield From(self._job_to_executor[builder_id].stop_builder(builder_id))
yield From(self._executor.stop_builder(builder_id))
def _handle_realm_change(self, etcd_result):
if etcd_result is None:
@ -205,10 +202,6 @@ class EphemeralBuilderManager(BaseManager):
self._job_uuid_to_component[build_job.job_details['build_uuid']] = component
return component
@property
def registered_executors(self):
return self._executors
@coroutine
def _register_existing_realms(self):
try:
@ -231,26 +224,13 @@ class EphemeralBuilderManager(BaseManager):
# no realms have been registered yet
pass
def _load_executor(self, executor_class_name, executor_config):
executor_klass = EXECUTORS.get(executor_class_name)
if executor_klass is None:
logger.error('Unknown executor %s; skipping install', executor_class_name)
return
self._executors.append(executor_klass(executor_config, self.manager_hostname))
def initialize(self, manager_config):
logger.debug('Calling initialize')
self._manager_config = manager_config
# Note: Executor config can be defined either as a single block of EXECUTOR_CONFIG (old style)
# or as a new set of executor configurations, with the order determining how we fallback. We
# check for both here to ensure backwards compatibility.
if manager_config.get('EXECUTORS'):
for executor_config in manager_config['EXECUTORS']:
self._load_executor(executor_config.get('EXECUTOR'), executor_config)
else:
self._load_executor(manager_config.get('EXECUTOR'), manager_config.get('EXECUTOR_CONFIG'))
executor_klass = self._executors.get(manager_config.get('EXECUTOR', ''), PopenExecutor)
self._executor = executor_klass(manager_config.get('EXECUTOR_CONFIG', {}),
self.manager_hostname)
etcd_host = self._manager_config.get('ETCD_HOST', '127.0.0.1')
etcd_port = self._manager_config.get('ETCD_PORT', 2379)
@ -279,14 +259,13 @@ class EphemeralBuilderManager(BaseManager):
restarter=self._register_existing_realms)
self._etcd_lock_prefix = self._manager_config.get('ETCD_LOCK_PREFIX', 'locks/')
self._ephemeral_api_timeout = self._manager_config.get('API_TIMEOUT',
DEFAULT_EPHEMERAL_API_TIMEOUT)
# Load components for all realms currently known to the cluster
async(self._register_existing_realms())
def setup_time(self):
return self._manager_config.get('MACHINE_SETUP_TIME', 300)
setup_time = self._manager_config.get('MACHINE_SETUP_TIME', 300)
return setup_time
def shutdown(self):
logger.debug('Shutting down worker.')
@ -337,62 +316,35 @@ class EphemeralBuilderManager(BaseManager):
max_expiration = datetime.utcnow() + timedelta(seconds=machine_max_expiration)
payload = {
'expiration': calendar.timegm(expiration.timetuple()),
'max_expiration': calendar.timegm(max_expiration.timetuple()),
'nonce': nonce,
'had_heartbeat': False,
'job_queue_item': build_job.job_item,
'expiration': calendar.timegm(expiration.timetuple()),
'max_expiration': calendar.timegm(max_expiration.timetuple()),
'nonce': nonce,
'had_heartbeat': False,
'job_queue_item': build_job.job_item,
}
lock_payload = json.dumps(payload)
try:
yield From(self._etcd_client.write(job_key, lock_payload, prevExist=False,
ttl=self._ephemeral_api_timeout))
ttl=EC2_API_TIMEOUT))
except (KeyError, etcd.EtcdKeyError):
# The job was already taken by someone else, we are probably a retry
logger.error('Job: %s already exists in etcd, timeout may be misconfigured', build_uuid)
raise Return(False, self._ephemeral_api_timeout)
raise Return(False, EC2_API_TIMEOUT)
except etcd.EtcdException:
logger.exception('Exception when writing job %s to etcd', build_uuid)
raise Return(False, RETRY_IMMEDIATELY_TIMEOUT)
started_with_executor = None
logger.debug("executors are: %s", self._executors)
executor_type = self._executor.__class__.__name__
logger.debug('Starting builder for job: %s with executor: %s', build_uuid, executor_type)
for executor in self._executors:
executor_type = executor.__class__.__name__
# Check if we can use this executor based on its whitelist, by namespace.
namespace = build_job.namespace
if not executor.allowed_for_namespace(namespace):
logger.debug('Job %s (namespace: %s) cannot use executor %s', build_uuid, namespace,
executor_type)
continue
# Check if we can use this executor based on the retries remaining.
if executor.minimum_retry_threshold > build_job.retries_remaining:
logger.debug('Job %s cannot use executor %s due to not meeting retry threshold', build_uuid,
executor_type)
continue
logger.debug('Starting builder for job: %s with executor: %s', build_uuid, executor_type)
try:
builder_id = yield From(executor.start_builder(realm, token, build_uuid))
metric_queue.put_deprecated('EphemeralBuilderStarted', 1, unit='Count')
metric_queue.ephemeral_build_workers.Inc(labelvalues=[builder_id, build_uuid])
started_with_executor = executor
break
except:
logger.exception('Exception when starting builder for job: %s', build_uuid)
continue
if started_with_executor is None:
logger.error('Could not start ephemeral worker for build %s', build_uuid)
raise Return(False, self._ephemeral_api_timeout)
logger.debug('Started builder for job: %s with executor: %s', build_uuid, executor_type)
try:
builder_id = yield From(self._executor.start_builder(realm, token, build_uuid))
metric_queue.put_deprecated('EC2BuilderStarted', 1, unit='Count')
metric_queue.ephemeral_build_workers.Inc(labelvalues=[builder_id, build_uuid])
except:
logger.exception('Exception when starting builder for job: %s', build_uuid)
raise Return(False, EC2_API_TIMEOUT)
# Store the builder in etcd associated with the job id
try:
@ -401,14 +353,14 @@ class EphemeralBuilderManager(BaseManager):
ttl=setup_time))
except etcd.EtcdException:
logger.exception('Exception when writing job %s to etcd', build_uuid)
raise Return(False, self._ephemeral_api_timeout)
raise Return(False, EC2_API_TIMEOUT)
# Store the realm spec which will allow any manager to accept this builder when it connects
realm_spec = json.dumps({
'realm': realm,
'token': token,
'builder_id': builder_id,
'job_queue_item': build_job.job_item,
'realm': realm,
'token': token,
'builder_id': builder_id,
'job_queue_item': build_job.job_item,
})
try:
@ -422,7 +374,6 @@ class EphemeralBuilderManager(BaseManager):
logger.exception('Exception when writing realm %s to etcd for job %s', realm, build_uuid)
raise Return(False, setup_time)
self._job_to_executor[builder_id] = started_with_executor
raise Return(True, None)
@coroutine
@ -448,9 +399,7 @@ class EphemeralBuilderManager(BaseManager):
logger.debug('Calling job_completed with status: %s', job_status)
# Kill the ephmeral builder
builder_id = self._component_to_builder.pop(build_component)
yield From(self._job_to_executor[builder_id].stop_builder(builder_id))
del self._job_to_executor[builder_id]
yield From(self._executor.stop_builder(self._component_to_builder.pop(build_component)))
# Release the lock in etcd
job_key = self._etcd_job_key(build_job)
@ -482,11 +431,11 @@ class EphemeralBuilderManager(BaseManager):
new_expiration = datetime.utcnow() + timedelta(seconds=ttl)
payload = {
'expiration': calendar.timegm(new_expiration.timetuple()),
'builder_id': build_job_metadata['builder_id'],
'job_queue_item': build_job.job_item,
'max_expiration': build_job_metadata['max_expiration'],
'had_heartbeat': True,
'expiration': calendar.timegm(new_expiration.timetuple()),
'builder_id': build_job_metadata['builder_id'],
'job_queue_item': build_job.job_item,
'max_expiration': build_job_metadata['max_expiration'],
'had_heartbeat': True,
}
yield From(self._etcd_client.write(job_key, json.dumps(payload), ttl=ttl))