740 lines
30 KiB
Python
740 lines
30 KiB
Python
import logging
|
|
import etcd
|
|
import uuid
|
|
import calendar
|
|
import os.path
|
|
import json
|
|
import time
|
|
|
|
from collections import namedtuple
|
|
from datetime import datetime, timedelta
|
|
from trollius import From, coroutine, Return, async
|
|
from concurrent.futures import ThreadPoolExecutor
|
|
from urllib3.exceptions import ReadTimeoutError, ProtocolError
|
|
|
|
from app import metric_queue
|
|
from buildman.manager.basemanager import BaseManager
|
|
from buildman.manager.executor import PopenExecutor, EC2Executor, KubernetesExecutor
|
|
from buildman.component.buildcomponent import BuildComponent
|
|
from buildman.jobutil.buildjob import BuildJob
|
|
from buildman.asyncutil import AsyncWrapper
|
|
from buildman.server import BuildJobResult
|
|
from util.morecollections import AttrDict
|
|
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
|
|
ETCD_MAX_WATCH_TIMEOUT = 30
|
|
ETCD_ATOMIC_OP_TIMEOUT = 10000
|
|
RETRY_IMMEDIATELY_TIMEOUT = 0
|
|
NO_WORKER_AVAILABLE_TIMEOUT = 10
|
|
DEFAULT_EPHEMERAL_API_TIMEOUT = 20
|
|
DEFAULT_EPHEMERAL_SETUP_TIMEOUT = 500
|
|
|
|
|
|
class EtcdAction(object):
|
|
""" Enumeration of the various kinds of etcd actions we can observe via a watch. """
|
|
GET = 'get'
|
|
SET = 'set'
|
|
EXPIRE = 'expire'
|
|
UPDATE = 'update'
|
|
DELETE = 'delete'
|
|
CREATE = 'create'
|
|
COMPARE_AND_SWAP = 'compareAndSwap'
|
|
COMPARE_AND_DELETE = 'compareAndDelete'
|
|
|
|
BuildInfo = namedtuple('BuildInfo', ['component', 'build_job', 'execution_id', 'executor_name'])
|
|
|
|
def _create_async_etcd_client(worker_threads=1, **kwargs):
|
|
client = etcd.Client(**kwargs)
|
|
async_executor = ThreadPoolExecutor(worker_threads)
|
|
return AsyncWrapper(client, executor=async_executor), async_executor
|
|
|
|
|
|
class EphemeralBuilderManager(BaseManager):
|
|
""" Build manager implementation for the Enterprise Registry. """
|
|
|
|
EXECUTORS = {
|
|
'popen': PopenExecutor,
|
|
'ec2': EC2Executor,
|
|
'kubernetes': KubernetesExecutor,
|
|
}
|
|
|
|
def __init__(self, *args, **kwargs):
|
|
self._etcd_client_creator = kwargs.pop('etcd_creator', _create_async_etcd_client)
|
|
|
|
super(EphemeralBuilderManager, self).__init__(*args, **kwargs)
|
|
|
|
self._shutting_down = False
|
|
|
|
self._manager_config = None
|
|
self._async_thread_executor = None
|
|
self._etcd_client = None
|
|
|
|
self._etcd_realm_prefix = None
|
|
self._etcd_job_prefix = None
|
|
self._etcd_lock_prefix = None
|
|
self._etcd_metric_prefix = None
|
|
self._etcd_cancel_build_prefix = None
|
|
|
|
self._ephemeral_api_timeout = DEFAULT_EPHEMERAL_API_TIMEOUT
|
|
self._ephemeral_setup_timeout = DEFAULT_EPHEMERAL_SETUP_TIMEOUT
|
|
|
|
# The registered executors available for running jobs, in order.
|
|
self._ordered_executors = []
|
|
|
|
# The registered executors, mapped by their unique name.
|
|
self._executor_name_to_executor = {}
|
|
|
|
# Map of etcd keys being watched to the tasks watching them
|
|
self._watch_tasks = {}
|
|
|
|
# Map from builder component to its associated job.
|
|
self._component_to_job = {}
|
|
|
|
# Map from build UUID to a BuildInfo tuple with information about the build.
|
|
self._build_uuid_to_info = {}
|
|
|
|
def _watch_etcd(self, etcd_key, change_coroutine_callback, start_index=None, recursive=True,
|
|
restarter=None):
|
|
watch_task_key = (etcd_key, recursive)
|
|
def callback_wrapper(changed_key_future):
|
|
new_index = start_index
|
|
etcd_result = None
|
|
|
|
if not changed_key_future.cancelled():
|
|
try:
|
|
etcd_result = changed_key_future.result()
|
|
existing_index = getattr(etcd_result, 'etcd_index', None)
|
|
new_index = etcd_result.modifiedIndex + 1
|
|
|
|
logger.debug('Got watch of key: %s%s at #%s with result: %s', etcd_key,
|
|
'*' if recursive else '', existing_index, etcd_result)
|
|
|
|
except ReadTimeoutError:
|
|
logger.debug('Read-timeout on etcd watch %s, rescheduling', etcd_key)
|
|
|
|
except etcd.EtcdEventIndexCleared:
|
|
# This happens if etcd2 has moved forward too fast for us to start watching
|
|
# at the index we retrieved. We therefore start a new watch at HEAD and
|
|
# (if specified) call the restarter method which should conduct a read and
|
|
# reset the state of the manager.
|
|
logger.exception('Etcd moved forward too quickly. Restarting watch cycle.')
|
|
new_index = None
|
|
if restarter is not None:
|
|
async(restarter())
|
|
|
|
except (KeyError, etcd.EtcdKeyError):
|
|
logger.debug('Etcd key already cleared: %s', etcd_key)
|
|
return
|
|
|
|
except etcd.EtcdException as eex:
|
|
# TODO(jschorr): This is a quick and dirty hack and should be replaced
|
|
# with a proper exception check.
|
|
if str(eex.message).find('Read timed out') >= 0:
|
|
logger.debug('Read-timeout on etcd watch %s, rescheduling', etcd_key)
|
|
else:
|
|
logger.exception('Exception on etcd watch: %s', etcd_key)
|
|
|
|
except ProtocolError:
|
|
logger.exception('Exception on etcd watch: %s', etcd_key)
|
|
|
|
if watch_task_key not in self._watch_tasks or self._watch_tasks[watch_task_key].done():
|
|
self._watch_etcd(etcd_key, change_coroutine_callback, start_index=new_index,
|
|
restarter=restarter)
|
|
|
|
if etcd_result:
|
|
async(change_coroutine_callback(etcd_result))
|
|
|
|
if not self._shutting_down:
|
|
logger.debug('Scheduling watch of key: %s%s at start index %s', etcd_key,
|
|
'*' if recursive else '', start_index)
|
|
watch_future = self._etcd_client.watch(etcd_key, recursive=recursive, index=start_index,
|
|
timeout=ETCD_MAX_WATCH_TIMEOUT)
|
|
watch_future.add_done_callback(callback_wrapper)
|
|
|
|
self._watch_tasks[watch_task_key] = async(watch_future)
|
|
|
|
@coroutine
|
|
def _handle_job_change(self, etcd_result):
|
|
""" Handler invoked whenever a job expires or is deleted in etcd. """
|
|
if etcd_result is None:
|
|
raise Return()
|
|
|
|
if etcd_result.action in (EtcdAction.CREATE, EtcdAction.SET):
|
|
raise Return()
|
|
|
|
elif etcd_result.action in (EtcdAction.DELETE, EtcdAction.EXPIRE):
|
|
# Handle the expiration/deletion.
|
|
job_metadata = json.loads(etcd_result._prev_node.value)
|
|
build_job = BuildJob(AttrDict(job_metadata['job_queue_item']))
|
|
logger.debug('Got "%s" of job %s', etcd_result.action, build_job.build_uuid)
|
|
|
|
# Get the build info.
|
|
build_info = self._build_uuid_to_info.get(build_job.build_uuid, None)
|
|
if build_info is None:
|
|
logger.debug('No build info for "%s" job %s (%s); probably already deleted by this manager',
|
|
etcd_result.action, build_job.build_uuid, job_metadata)
|
|
raise Return()
|
|
|
|
# If the etcd action was not an expiration, then it was already deleted by some manager and
|
|
# the execution was therefore already shutdown.
|
|
if etcd_result.action != EtcdAction.EXPIRE:
|
|
# Build information will no longer be needed; pop it off.
|
|
self._build_uuid_to_info.pop(build_job.build_uuid, None)
|
|
raise Return()
|
|
|
|
executor_name = build_info.executor_name
|
|
execution_id = build_info.execution_id
|
|
|
|
# If we have not yet received a heartbeat, then the node failed to boot in some way. We mark
|
|
# the job as incomplete here.
|
|
if not job_metadata.get('had_heartbeat', False):
|
|
logger.warning('Build executor failed to successfully boot with execution id %s',
|
|
execution_id)
|
|
|
|
# Take a lock to ensure that only one manager reports the build as incomplete for this
|
|
# execution.
|
|
got_lock = yield From(self._take_etcd_atomic_lock('job-expired', build_job.build_uuid,
|
|
execution_id))
|
|
if got_lock:
|
|
logger.error('[BUILD INTERNAL ERROR: etcd %s] Build ID: %s. Exec name: %s. Exec ID: %s',
|
|
etcd_result.action, build_job.build_uuid, executor_name, execution_id)
|
|
yield From(self.job_complete_callback(build_job, BuildJobResult.INCOMPLETE, executor_name,
|
|
update_phase=True))
|
|
|
|
# Finally, we terminate the build execution for the job. We don't do this under a lock as
|
|
# terminating a node is an atomic operation; better to make sure it is terminated than not.
|
|
logger.info('Terminating expired build executor for job %s with execution id %s',
|
|
build_job.build_uuid, execution_id)
|
|
yield From(self.kill_builder_executor(build_job.build_uuid))
|
|
|
|
else:
|
|
logger.warning('Unexpected action (%s) on job key: %s', etcd_result.action, etcd_result.key)
|
|
|
|
|
|
@coroutine
|
|
def _handle_realm_change(self, etcd_result):
|
|
if etcd_result is None:
|
|
raise Return()
|
|
|
|
if etcd_result.action == EtcdAction.CREATE:
|
|
# We must listen on the realm created by ourselves or another worker
|
|
realm_spec = json.loads(etcd_result.value)
|
|
self._register_realm(realm_spec)
|
|
|
|
elif etcd_result.action in (EtcdAction.DELETE, EtcdAction.EXPIRE):
|
|
# We must stop listening for new connections on the specified realm, if we did not get the
|
|
# connection
|
|
realm_spec = json.loads(etcd_result._prev_node.value)
|
|
realm_id = realm_spec['realm']
|
|
|
|
build_job = BuildJob(AttrDict(realm_spec['job_queue_item']))
|
|
build_uuid = build_job.build_uuid
|
|
|
|
logger.debug('Realm key %s for build %s was %s', realm_id, build_uuid, etcd_result.action)
|
|
build_info = self._build_uuid_to_info.get(build_uuid, None)
|
|
if build_info is not None:
|
|
# Pop the component off. If we find one, then the build has not connected to this manager,
|
|
# so we can safely unregister its component.
|
|
component = self._component_to_job.pop(build_info.component, None)
|
|
if component is not None:
|
|
# We were not the manager which the worker connected to, remove the bookkeeping for it
|
|
logger.debug('Unregistering unused component for build %s', build_uuid)
|
|
self.unregister_component(build_info.component)
|
|
|
|
# If the realm has expired, then perform cleanup of the executor.
|
|
if etcd_result.action == EtcdAction.EXPIRE:
|
|
execution_id = realm_spec.get('execution_id', None)
|
|
executor_name = realm_spec.get('executor_name', 'EC2Executor')
|
|
|
|
logger.info('Realm %s expired for job %s, terminating executor %s with execution id %s',
|
|
realm_id, build_uuid, executor_name, execution_id)
|
|
yield From(self.terminate_executor(executor_name, execution_id))
|
|
|
|
else:
|
|
logger.warning('Unexpected action (%s) on realm key: %s', etcd_result.action, etcd_result.key)
|
|
|
|
def _register_realm(self, realm_spec):
|
|
logger.debug('Got call to register realm %s with manager', realm_spec['realm'])
|
|
|
|
# Create the build information block for the registered realm.
|
|
build_job = BuildJob(AttrDict(realm_spec['job_queue_item']))
|
|
execution_id = realm_spec.get('execution_id', None)
|
|
executor_name = realm_spec.get('executor_name', 'EC2Executor')
|
|
|
|
logger.debug('Registering realm %s with manager: %s', realm_spec['realm'], realm_spec)
|
|
component = self.register_component(realm_spec['realm'], BuildComponent,
|
|
token=realm_spec['token'])
|
|
|
|
build_info = BuildInfo(component=component, build_job=build_job, execution_id=execution_id,
|
|
executor_name=executor_name)
|
|
|
|
self._component_to_job[component] = build_job
|
|
self._build_uuid_to_info[build_job.build_uuid] = build_info
|
|
|
|
logger.debug('Registered realm %s with manager', realm_spec['realm'])
|
|
return component
|
|
|
|
@property
|
|
def registered_executors(self):
|
|
return self._ordered_executors
|
|
|
|
@coroutine
|
|
def _register_existing_realms(self):
|
|
try:
|
|
all_realms = yield From(self._etcd_client.read(self._etcd_realm_prefix, recursive=True))
|
|
|
|
# Register all existing realms found.
|
|
encountered = set()
|
|
for realm in all_realms.children:
|
|
if not realm.dir:
|
|
component = self._register_realm(json.loads(realm.value))
|
|
encountered.add(component)
|
|
|
|
# Remove any components not encountered so we can clean up.
|
|
for component, job in list(self._component_to_job.items()):
|
|
if not component in encountered:
|
|
self._component_to_job.pop(component, None)
|
|
self._build_uuid_to_info.pop(job.build_uuid, None)
|
|
|
|
except (KeyError, etcd.EtcdKeyError):
|
|
# no realms have been registered yet
|
|
pass
|
|
|
|
def _load_executor(self, executor_kind_name, executor_config):
|
|
executor_klass = EphemeralBuilderManager.EXECUTORS.get(executor_kind_name)
|
|
if executor_klass is None:
|
|
logger.error('Unknown executor %s; skipping install', executor_kind_name)
|
|
return
|
|
|
|
executor = executor_klass(executor_config, self.manager_hostname)
|
|
if executor.name in self._executor_name_to_executor:
|
|
raise Exception('Executor with name %s already registered' % executor.name)
|
|
|
|
self._ordered_executors.append(executor)
|
|
self._executor_name_to_executor[executor.name] = executor
|
|
|
|
def initialize(self, manager_config):
|
|
logger.debug('Calling initialize')
|
|
self._manager_config = manager_config
|
|
|
|
# Note: Executor config can be defined either as a single block of EXECUTOR_CONFIG (old style)
|
|
# or as a new set of executor configurations, with the order determining how we fallback. We
|
|
# check for both here to ensure backwards compatibility.
|
|
if manager_config.get('EXECUTORS'):
|
|
for executor_config in manager_config['EXECUTORS']:
|
|
self._load_executor(executor_config.get('EXECUTOR'), executor_config)
|
|
else:
|
|
self._load_executor(manager_config.get('EXECUTOR'), manager_config.get('EXECUTOR_CONFIG'))
|
|
|
|
etcd_host = self._manager_config.get('ETCD_HOST', '127.0.0.1')
|
|
etcd_port = self._manager_config.get('ETCD_PORT', 2379)
|
|
etcd_ca_cert = self._manager_config.get('ETCD_CA_CERT', None)
|
|
|
|
etcd_auth = self._manager_config.get('ETCD_CERT_AND_KEY', None)
|
|
if etcd_auth is not None:
|
|
etcd_auth = tuple(etcd_auth) # Convert YAML list to a tuple
|
|
|
|
etcd_protocol = 'http' if etcd_auth is None else 'https'
|
|
logger.debug('Connecting to etcd on %s:%s', etcd_host, etcd_port)
|
|
|
|
worker_threads = self._manager_config.get('ETCD_WORKER_THREADS', 5)
|
|
(self._etcd_client, self._async_thread_executor) = self._etcd_client_creator(
|
|
worker_threads,
|
|
host=etcd_host,
|
|
port=etcd_port,
|
|
cert=etcd_auth,
|
|
ca_cert=etcd_ca_cert,
|
|
protocol=etcd_protocol,
|
|
read_timeout=5,
|
|
)
|
|
|
|
self._etcd_job_prefix = self._manager_config.get('ETCD_BUILDER_PREFIX', 'building/')
|
|
self._watch_etcd(self._etcd_job_prefix, self._handle_job_change)
|
|
|
|
self._etcd_realm_prefix = self._manager_config.get('ETCD_REALM_PREFIX', 'realm/')
|
|
self._watch_etcd(self._etcd_realm_prefix, self._handle_realm_change,
|
|
restarter=self._register_existing_realms)
|
|
|
|
self._etcd_cancel_build_prefix = self._manager_config.get('ETCD_CANCEL_PREFIX', 'cancel/')
|
|
self._watch_etcd(self._etcd_cancel_build_prefix, self._cancel_build)
|
|
|
|
self._etcd_lock_prefix = self._manager_config.get('ETCD_LOCK_PREFIX', 'lock/')
|
|
self._etcd_metric_prefix = self._manager_config.get('ETCD_METRIC_PREFIX', 'metric/')
|
|
|
|
self._ephemeral_api_timeout = self._manager_config.get('API_TIMEOUT',
|
|
DEFAULT_EPHEMERAL_API_TIMEOUT)
|
|
|
|
self._ephemeral_setup_timeout = self._manager_config.get('SETUP_TIMEOUT',
|
|
DEFAULT_EPHEMERAL_SETUP_TIMEOUT)
|
|
|
|
# Load components for all realms currently known to the cluster
|
|
async(self._register_existing_realms())
|
|
|
|
def overall_setup_time(self):
|
|
return self._ephemeral_setup_timeout
|
|
|
|
def shutdown(self):
|
|
logger.debug('Shutting down worker.')
|
|
self._shutting_down = True
|
|
|
|
for (etcd_key, _), task in self._watch_tasks.items():
|
|
if not task.done():
|
|
logger.debug('Canceling watch task for %s', etcd_key)
|
|
task.cancel()
|
|
|
|
if self._async_thread_executor is not None:
|
|
logger.debug('Shutting down thread pool executor.')
|
|
self._async_thread_executor.shutdown()
|
|
|
|
@coroutine
|
|
def schedule(self, build_job):
|
|
build_uuid = build_job.job_details['build_uuid']
|
|
logger.debug('Calling schedule with job: %s', build_uuid)
|
|
|
|
# Check if there are worker slots available by checking the number of jobs in etcd
|
|
allowed_worker_count = self._manager_config.get('ALLOWED_WORKER_COUNT', 1)
|
|
try:
|
|
active_jobs = yield From(self._etcd_client.read(self._etcd_job_prefix, recursive=True))
|
|
workers_alive = sum(1 for child in active_jobs.children if not child.dir)
|
|
except (KeyError, etcd.EtcdKeyError):
|
|
workers_alive = 0
|
|
except etcd.EtcdException:
|
|
logger.exception('Exception when reading job count from etcd for job: %s', build_uuid)
|
|
raise Return(False, RETRY_IMMEDIATELY_TIMEOUT)
|
|
|
|
logger.debug('Total jobs (scheduling job %s): %s', build_uuid, workers_alive)
|
|
|
|
if workers_alive >= allowed_worker_count:
|
|
logger.info('Too many workers alive, unable to start new worker for build job: %s. %s >= %s',
|
|
build_uuid, workers_alive, allowed_worker_count)
|
|
raise Return(False, NO_WORKER_AVAILABLE_TIMEOUT)
|
|
|
|
job_key = self._etcd_job_key(build_job)
|
|
|
|
# First try to take a lock for this job, meaning we will be responsible for its lifeline
|
|
realm = str(uuid.uuid4())
|
|
token = str(uuid.uuid4())
|
|
nonce = str(uuid.uuid4())
|
|
|
|
machine_max_expiration = self._manager_config.get('MACHINE_MAX_TIME', 7200)
|
|
max_expiration = datetime.utcnow() + timedelta(seconds=machine_max_expiration)
|
|
|
|
payload = {
|
|
'max_expiration': calendar.timegm(max_expiration.timetuple()),
|
|
'nonce': nonce,
|
|
'had_heartbeat': False,
|
|
'job_queue_item': build_job.job_item,
|
|
}
|
|
|
|
lock_payload = json.dumps(payload)
|
|
logger.debug('Writing key for job %s with expiration in %s seconds', build_uuid,
|
|
self._ephemeral_setup_timeout)
|
|
try:
|
|
yield From(self._etcd_client.write(job_key, lock_payload, prevExist=False,
|
|
ttl=self._ephemeral_setup_timeout))
|
|
except (KeyError, etcd.EtcdKeyError):
|
|
# The job was already taken by someone else, we are probably a retry
|
|
logger.warning('Job: %s already exists in etcd, timeout may be misconfigured', build_uuid)
|
|
raise Return(False, self._ephemeral_api_timeout)
|
|
except etcd.EtcdException:
|
|
logger.exception('Exception when writing job %s to etcd', build_uuid)
|
|
raise Return(False, RETRY_IMMEDIATELY_TIMEOUT)
|
|
|
|
# Got a lock, now lets boot the job via one of the registered executors.
|
|
started_with_executor = None
|
|
execution_id = None
|
|
|
|
logger.debug("Registered executors are: %s", [ex.name for ex in self._ordered_executors])
|
|
for executor in self._ordered_executors:
|
|
# Check if we can use this executor based on its whitelist, by namespace.
|
|
namespace = build_job.namespace
|
|
if not executor.allowed_for_namespace(namespace):
|
|
logger.debug('Job %s (namespace: %s) cannot use executor %s', build_uuid, namespace,
|
|
executor.name)
|
|
continue
|
|
|
|
# Check if we can use this executor based on the retries remaining.
|
|
if executor.minimum_retry_threshold > build_job.retries_remaining:
|
|
metric_queue.builder_fallback.Inc()
|
|
logger.debug('Job %s cannot use executor %s as it is below retry threshold %s (retry #%s)',
|
|
build_uuid, executor.name, executor.minimum_retry_threshold,
|
|
build_job.retries_remaining)
|
|
continue
|
|
|
|
logger.debug('Starting builder for job %s with selected executor: %s', build_uuid,
|
|
executor.name)
|
|
|
|
try:
|
|
execution_id = yield From(executor.start_builder(realm, token, build_uuid))
|
|
except:
|
|
logger.exception('Exception when starting builder for job: %s', build_uuid)
|
|
continue
|
|
|
|
try:
|
|
metric_queue.put_deprecated('EphemeralBuilderStarted', 1, unit='Count')
|
|
metric_queue.ephemeral_build_workers.Inc(labelvalues=[execution_id, build_uuid])
|
|
except:
|
|
logger.exception('Exception when writing start metrics for execution %s for job %s',
|
|
execution_id, build_uuid)
|
|
|
|
started_with_executor = executor
|
|
|
|
# Break out of the loop now that we've started a builder successfully.
|
|
break
|
|
|
|
# If we didn't start the job, cleanup and return it to the queue.
|
|
if started_with_executor is None:
|
|
logger.error('Could not start ephemeral worker for build %s', build_uuid)
|
|
|
|
# Delete the associated build job record.
|
|
yield From(self.delete_etcd_key(job_key))
|
|
raise Return(False, self._ephemeral_api_timeout)
|
|
|
|
# Job was started!
|
|
logger.debug('Started execution with ID %s for job: %s with executor: %s',
|
|
execution_id, build_uuid, started_with_executor.name)
|
|
|
|
# Store metric data
|
|
metric_spec = json.dumps({
|
|
'executor_name': started_with_executor.name,
|
|
'start_time': time.time(),
|
|
})
|
|
try:
|
|
yield From(self._etcd_client.write(self._etcd_metric_key(realm), metric_spec, prevExist=False,
|
|
ttl=machine_max_expiration + 10))
|
|
except (KeyError, etcd.EtcdKeyError):
|
|
logger.error('Realm %s already exists in etcd for job %s ' +
|
|
'UUID collision or something is very very wrong.', realm, build_uuid)
|
|
except etcd.EtcdException:
|
|
logger.exception('Exception when writing realm %s to etcd for job %s', realm, build_uuid)
|
|
|
|
# Store the realm spec which will allow any manager to accept this builder when it connects
|
|
realm_spec = json.dumps({
|
|
'realm': realm,
|
|
'token': token,
|
|
'execution_id': execution_id,
|
|
'executor_name': started_with_executor.name,
|
|
'job_queue_item': build_job.job_item,
|
|
})
|
|
|
|
try:
|
|
setup_time = started_with_executor.setup_time or self.overall_setup_time()
|
|
logger.debug('Writing job key for job %s using executor %s with ID %s and ttl %s', build_uuid,
|
|
started_with_executor.name, execution_id, setup_time)
|
|
yield From(self._etcd_client.write(self._etcd_realm_key(realm), realm_spec, prevExist=False,
|
|
ttl=setup_time))
|
|
except (KeyError, etcd.EtcdKeyError):
|
|
logger.error('Realm %s already exists in etcd for job %s ' +
|
|
'UUID collision or something is very very wrong.', realm, build_uuid)
|
|
raise Return(False, setup_time)
|
|
except etcd.EtcdException:
|
|
logger.exception('Exception when writing realm %s to etcd for job %s', realm, build_uuid)
|
|
raise Return(False, setup_time)
|
|
|
|
logger.debug('Builder spawn complete for job %s using executor %s with ID %s ', build_uuid,
|
|
started_with_executor.name, execution_id)
|
|
raise Return(True, None)
|
|
|
|
@coroutine
|
|
def build_component_ready(self, build_component):
|
|
logger.debug('Got component ready for component with realm %s', build_component.builder_realm)
|
|
|
|
# Pop off the job for the component. We do so before we send out the etcd watch below,
|
|
# as it will also remove this mapping.
|
|
job = self._component_to_job.pop(build_component, None)
|
|
if job is None:
|
|
# This will occur once the build finishes, so no need to worry about it. We log in case it
|
|
# happens outside of the expected flow.
|
|
logger.debug('Could not find job for the build component on realm %s; component is ready',
|
|
build_component.builder_realm)
|
|
raise Return()
|
|
|
|
# Start the build job.
|
|
logger.debug('Sending build %s to newly ready component on realm %s',
|
|
job.build_uuid, build_component.builder_realm)
|
|
yield From(build_component.start_build(job))
|
|
|
|
yield From(self._write_duration_metric(metric_queue.builder_time_to_build,
|
|
build_component.builder_realm))
|
|
|
|
try:
|
|
# Clean up the bookkeeping for allowing any manager to take the job.
|
|
yield From(self._etcd_client.delete(self._etcd_realm_key(build_component.builder_realm)))
|
|
except (KeyError, etcd.EtcdKeyError):
|
|
logger.warning('Could not delete realm key %s', build_component.builder_realm)
|
|
|
|
def build_component_disposed(self, build_component, timed_out):
|
|
logger.debug('Calling build_component_disposed.')
|
|
self.unregister_component(build_component)
|
|
|
|
@coroutine
|
|
def job_completed(self, build_job, job_status, build_component):
|
|
logger.debug('Calling job_completed for job %s with status: %s',
|
|
build_job.build_uuid, job_status)
|
|
|
|
yield From(self._write_duration_metric(metric_queue.build_time, build_component.builder_realm))
|
|
|
|
# Mark the job as completed. Since this is being invoked from the component, we don't need
|
|
# to ask for the phase to be updated as well.
|
|
build_info = self._build_uuid_to_info.get(build_job.build_uuid, None)
|
|
executor_name = build_info.executor_name if build_info else None
|
|
yield From(self.job_complete_callback(build_job, job_status, executor_name, update_phase=False))
|
|
|
|
# Kill the ephemeral builder.
|
|
yield From(self.kill_builder_executor(build_job.build_uuid))
|
|
|
|
# Delete the build job from etcd.
|
|
job_key = self._etcd_job_key(build_job)
|
|
try:
|
|
yield From(self._etcd_client.delete(job_key))
|
|
except (KeyError, etcd.EtcdKeyError):
|
|
logger.debug('Builder is asking for job to be removed, but work already completed')
|
|
|
|
# Delete the metric from etcd
|
|
metric_key = self._etcd_metric_key(build_component.builder_realm)
|
|
try:
|
|
yield From(self._etcd_client.delete(metric_key))
|
|
except (KeyError, etcd.EtcdKeyError):
|
|
logger.debug('Builder is asking for metric to be removed, but key not found')
|
|
|
|
logger.debug('job_completed for job %s with status: %s', build_job.build_uuid, job_status)
|
|
|
|
@coroutine
|
|
def kill_builder_executor(self, build_uuid):
|
|
logger.info('Starting termination of executor for job %s', build_uuid)
|
|
build_info = self._build_uuid_to_info.pop(build_uuid, None)
|
|
if build_info is None:
|
|
logger.debug('Build information not found for build %s; skipping termination', build_uuid)
|
|
raise Return()
|
|
|
|
# Remove the build's component.
|
|
self._component_to_job.pop(build_info.component, None)
|
|
|
|
# Stop the build node/executor itself.
|
|
yield From(self.terminate_executor(build_info.executor_name, build_info.execution_id))
|
|
|
|
@coroutine
|
|
def terminate_executor(self, executor_name, execution_id):
|
|
executor = self._executor_name_to_executor.get(executor_name)
|
|
if executor is None:
|
|
logger.error('Could not find registered executor %s', executor_name)
|
|
raise Return()
|
|
|
|
# Terminate the executor's execution.
|
|
logger.info('Terminating executor %s with execution id %s', executor_name, execution_id)
|
|
yield From(executor.stop_builder(execution_id))
|
|
|
|
@coroutine
|
|
def job_heartbeat(self, build_job):
|
|
# Extend the queue item.
|
|
self.job_heartbeat_callback(build_job)
|
|
|
|
# Extend the deadline in etcd.
|
|
job_key = self._etcd_job_key(build_job)
|
|
|
|
try:
|
|
build_job_metadata_response = yield From(self._etcd_client.read(job_key))
|
|
except (KeyError, etcd.EtcdKeyError):
|
|
logger.info('Job %s no longer exists in etcd', build_job.build_uuid)
|
|
raise Return()
|
|
|
|
build_job_metadata = json.loads(build_job_metadata_response.value)
|
|
|
|
max_expiration = datetime.utcfromtimestamp(build_job_metadata['max_expiration'])
|
|
max_expiration_remaining = max_expiration - datetime.utcnow()
|
|
max_expiration_sec = max(0, int(max_expiration_remaining.total_seconds()))
|
|
|
|
ttl = min(self.heartbeat_period_sec * 2, max_expiration_sec)
|
|
payload = {
|
|
'job_queue_item': build_job.job_item,
|
|
'max_expiration': build_job_metadata['max_expiration'],
|
|
'had_heartbeat': True,
|
|
}
|
|
|
|
# Note: A TTL of < 0 in etcd results in the key *never being expired*. We use a max here
|
|
# to ensure that if the TTL is < 0, the key will expire immediately.
|
|
etcd_ttl = max(ttl, 0)
|
|
yield From(self._etcd_client.write(job_key, json.dumps(payload), ttl=etcd_ttl))
|
|
|
|
|
|
@coroutine
|
|
def _take_etcd_atomic_lock(self, path, *args):
|
|
""" Takes a lock for atomic operations via etcd over the given path. Returns true if the lock
|
|
was granted and false otherwise.
|
|
"""
|
|
pieces = [self._etcd_lock_prefix, path]
|
|
pieces.extend(args)
|
|
|
|
lock_key = os.path.join(*pieces)
|
|
try:
|
|
yield From(self._etcd_client.write(lock_key, {}, prevExist=False, ttl=ETCD_ATOMIC_OP_TIMEOUT))
|
|
raise Return(True)
|
|
except (KeyError, etcd.EtcdKeyError):
|
|
raise Return(False)
|
|
|
|
@coroutine
|
|
def _write_duration_metric(self, metric, realm):
|
|
""" Returns true if the metric was written and and false otherwise.
|
|
"""
|
|
try:
|
|
metric_data = yield From(self._etcd_client.read(self._etcd_metric_key(realm)))
|
|
parsed_metric_data = json.loads(metric_data.value)
|
|
start_time = parsed_metric_data['start_time']
|
|
metric.Observe(time.time() - start_time,
|
|
labelvalues=[parsed_metric_data.get('executor_name',
|
|
'unknown')])
|
|
except Exception:
|
|
logger.exception("Could not write metric for realm %s", realm)
|
|
|
|
def _etcd_metric_key(self, realm):
|
|
""" Create a key which is used to track a job in etcd.
|
|
"""
|
|
return os.path.join(self._etcd_metric_prefix, realm)
|
|
|
|
def _etcd_job_key(self, build_job):
|
|
""" Create a key which is used to track a job in etcd.
|
|
"""
|
|
return os.path.join(self._etcd_job_prefix, build_job.job_details['build_uuid'])
|
|
|
|
def _etcd_realm_key(self, realm):
|
|
""" Create a key which is used to track an incoming connection on a realm.
|
|
"""
|
|
return os.path.join(self._etcd_realm_prefix, realm)
|
|
|
|
def num_workers(self):
|
|
""" Return the number of workers we're managing locally.
|
|
"""
|
|
return len(self._component_to_job)
|
|
|
|
@coroutine
|
|
def _cancel_build(self, etcd_result):
|
|
""" Listens for etcd event and then cancels the build
|
|
"""
|
|
if etcd_result is None:
|
|
raise Return(False)
|
|
|
|
if etcd_result.action not in (EtcdAction.CREATE, EtcdAction.SET):
|
|
raise Return(False)
|
|
|
|
build_uuid = etcd_result.value
|
|
build_info = self._build_uuid_to_info.get(build_uuid, None)
|
|
|
|
if build_info is None:
|
|
logger.debug('No build info for "%s" job %s', etcd_result.action, build_uuid)
|
|
raise Return(False)
|
|
got_lock = yield From(self._take_etcd_atomic_lock('job-cancelled', build_uuid, build_info.execution_id))
|
|
if got_lock:
|
|
yield From(self.delete_etcd_key(self._etcd_realm_key(build_info.component.builder_realm)))
|
|
yield From(self.delete_etcd_key(self._etcd_metric_key(build_info.component.builder_realm)))
|
|
yield From(self.delete_etcd_key(os.path.join(self._etcd_job_prefix, build_uuid)))
|
|
yield From(self.kill_builder_executor(build_uuid))
|
|
|
|
@coroutine
|
|
def delete_etcd_key(self, etcd_key):
|
|
try:
|
|
yield From(self._etcd_client.delete(etcd_key))
|
|
except (KeyError, etcd.EtcdKeyError):
|
|
logger.warning('Could not delete etcd key %s', etcd_key)
|