quay/buildman/manager/ephemeral.py

import logging
import etcd
import uuid
import calendar
import os.path
import json
import time
from collections import namedtuple
from datetime import datetime, timedelta
from trollius import From, coroutine, Return, async
from concurrent.futures import ThreadPoolExecutor
from urllib3.exceptions import ReadTimeoutError, ProtocolError
from app import metric_queue
from buildman.manager.basemanager import BaseManager
from buildman.manager.executor import PopenExecutor, EC2Executor, KubernetesExecutor
from buildman.component.buildcomponent import BuildComponent
from buildman.jobutil.buildjob import BuildJob
from buildman.asyncutil import AsyncWrapper
from buildman.server import BuildJobResult
from util.morecollections import AttrDict
logger = logging.getLogger(__name__)
ETCD_MAX_WATCH_TIMEOUT = 30
ETCD_ATOMIC_OP_TIMEOUT = 10000
RETRY_IMMEDIATELY_TIMEOUT = 0
NO_WORKER_AVAILABLE_TIMEOUT = 10
DEFAULT_EPHEMERAL_API_TIMEOUT = 20
DEFAULT_EPHEMERAL_SETUP_TIMEOUT = 500
class EtcdAction(object):
""" Enumeration of the various kinds of etcd actions we can observe via a watch. """
GET = 'get'
SET = 'set'
EXPIRE = 'expire'
UPDATE = 'update'
DELETE = 'delete'
CREATE = 'create'
COMPARE_AND_SWAP = 'compareAndSwap'
COMPARE_AND_DELETE = 'compareAndDelete'
BuildInfo = namedtuple('BuildInfo', ['component', 'build_job', 'execution_id', 'executor_name'])
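# An illustrative sketch of the etcd key layout used by this manager. The
# prefixes shown are the defaults read in initialize(); deployments may
# override them via the ETCD_*_PREFIX config keys, so the literal paths here
# are assumptions, not guarantees:
#
#   building/<build_uuid>   TTL-bound JSON job metadata; expiry or deletion is
#                           handled by _handle_job_change.
#   realm/<realm_uuid>      JSON realm spec that lets any manager accept the
#                           builder when it connects; watched by
#                           _handle_realm_change.
#   lock/<path>/<args...>   short-lived keys written with prevExist=False,
#                           used as cluster-wide atomic locks.
#   metric/<realm_uuid>     start time and executor name, used for duration
#                           metrics.
#   cancel/...              keys whose value is a build UUID to cancel.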
def _create_async_etcd_client(worker_threads=1, **kwargs):
client = etcd.Client(**kwargs)
async_executor = ThreadPoolExecutor(worker_threads)
return AsyncWrapper(client, executor=async_executor), async_executor
class EphemeralBuilderManager(BaseManager):
""" Build manager implementation for the Enterprise Registry. """
EXECUTORS = {
'popen': PopenExecutor,
'ec2': EC2Executor,
'kubernetes': KubernetesExecutor,
}
def __init__(self, *args, **kwargs):
self._etcd_client_creator = kwargs.pop('etcd_creator', _create_async_etcd_client)
super(EphemeralBuilderManager, self).__init__(*args, **kwargs)
self._shutting_down = False
self._manager_config = None
self._async_thread_executor = None
self._etcd_client = None
self._etcd_realm_prefix = None
self._etcd_job_prefix = None
self._etcd_lock_prefix = None
self._etcd_metric_prefix = None
self._etcd_cancel_build_prefix = None
self._ephemeral_api_timeout = DEFAULT_EPHEMERAL_API_TIMEOUT
self._ephemeral_setup_timeout = DEFAULT_EPHEMERAL_SETUP_TIMEOUT
# The registered executors available for running jobs, in order.
self._ordered_executors = []
# The registered executors, mapped by their unique name.
self._executor_name_to_executor = {}
# Map of etcd keys being watched to the tasks watching them
self._watch_tasks = {}
# Map from builder component to its associated job.
self._component_to_job = {}
# Map from build UUID to a BuildInfo tuple with information about the build.
self._build_uuid_to_info = {}
def _watch_etcd(self, etcd_key, change_coroutine_callback, start_index=None, recursive=True,
restarter=None):
watch_task_key = (etcd_key, recursive)
def callback_wrapper(changed_key_future):
new_index = start_index
etcd_result = None
if not changed_key_future.cancelled():
try:
etcd_result = changed_key_future.result()
existing_index = getattr(etcd_result, 'etcd_index', None)
new_index = etcd_result.modifiedIndex + 1
logger.debug('Got watch of key: %s%s at #%s with result: %s', etcd_key,
'*' if recursive else '', existing_index, etcd_result)
except ReadTimeoutError:
logger.debug('Read-timeout on etcd watch %s, rescheduling', etcd_key)
except etcd.EtcdEventIndexCleared:
# This happens if etcd2 has moved forward too fast for us to start watching
# at the index we retrieved. We therefore start a new watch at HEAD and
# (if specified) call the restarter method which should conduct a read and
# reset the state of the manager.
logger.exception('Etcd moved forward too quickly. Restarting watch cycle.')
new_index = None
if restarter is not None:
async(restarter())
except (KeyError, etcd.EtcdKeyError):
logger.debug('Etcd key already cleared: %s', etcd_key)
return
except etcd.EtcdException as eex:
# TODO(jschorr): This is a quick and dirty hack and should be replaced
# with a proper exception check.
if str(eex.message).find('Read timed out') >= 0:
logger.debug('Read-timeout on etcd watch %s, rescheduling', etcd_key)
else:
logger.exception('Exception on etcd watch: %s', etcd_key)
except ProtocolError:
logger.exception('Exception on etcd watch: %s', etcd_key)
if watch_task_key not in self._watch_tasks or self._watch_tasks[watch_task_key].done():
self._watch_etcd(etcd_key, change_coroutine_callback, start_index=new_index,
restarter=restarter)
if etcd_result:
async(change_coroutine_callback(etcd_result))
if not self._shutting_down:
logger.debug('Scheduling watch of key: %s%s at start index %s', etcd_key,
'*' if recursive else '', start_index)
watch_future = self._etcd_client.watch(etcd_key, recursive=recursive, index=start_index,
timeout=ETCD_MAX_WATCH_TIMEOUT)
watch_future.add_done_callback(callback_wrapper)
self._watch_tasks[watch_task_key] = async(watch_future)
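# Note: the pattern above is a self-rescheduling watch. The done-callback
# re-invokes _watch_etcd with the next modifiedIndex (or None after an
# index-cleared error) before dispatching change_coroutine_callback, so a
# single logical watch survives timeouts and etcd errors until shutdown.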
@coroutine
def _handle_job_change(self, etcd_result):
""" Handler invoked whenever a job expires or is deleted in etcd. """
if etcd_result is None:
raise Return()
if etcd_result.action in (EtcdAction.CREATE, EtcdAction.SET):
raise Return()
elif etcd_result.action in (EtcdAction.DELETE, EtcdAction.EXPIRE):
# Handle the expiration/deletion.
job_metadata = json.loads(etcd_result._prev_node.value)
build_job = BuildJob(AttrDict(job_metadata['job_queue_item']))
logger.debug('Got "%s" of job %s', etcd_result.action, build_job.build_uuid)
# Get the build info.
build_info = self._build_uuid_to_info.get(build_job.build_uuid, None)
if build_info is None:
logger.debug('No build info for "%s" job %s (%s); probably already deleted by this manager',
etcd_result.action, build_job.build_uuid, job_metadata)
raise Return()
# If the etcd action was not an expiration, then it was already deleted by some manager and
# the execution was therefore already shutdown.
if etcd_result.action != EtcdAction.EXPIRE:
# Build information will no longer be needed; pop it off.
self._build_uuid_to_info.pop(build_job.build_uuid, None)
raise Return()
executor_name = build_info.executor_name
execution_id = build_info.execution_id
# If we have not yet received a heartbeat, then the node failed to boot in some way. We mark
# the job as incomplete here.
if not job_metadata.get('had_heartbeat', False):
logger.warning('Build executor failed to successfully boot with execution id %s',
execution_id)
# Take a lock to ensure that only one manager reports the build as incomplete for this
# execution.
got_lock = yield From(self._take_etcd_atomic_lock('job-expired', build_job.build_uuid,
execution_id))
if got_lock:
logger.error('[BUILD INTERNAL ERROR: etcd %s] Build ID: %s. Exec name: %s. Exec ID: %s',
etcd_result.action, build_job.build_uuid, executor_name, execution_id)
yield From(self.job_complete_callback(build_job, BuildJobResult.INCOMPLETE, executor_name,
update_phase=True))
# Finally, we terminate the build execution for the job. We don't do this under a lock as
# terminating a node is an atomic operation; better to make sure it is terminated than not.
logger.info('Terminating expired build executor for job %s with execution id %s',
build_job.build_uuid, execution_id)
yield From(self.kill_builder_executor(build_job.build_uuid))
else:
logger.warning('Unexpected action (%s) on job key: %s', etcd_result.action, etcd_result.key)
@coroutine
def _handle_realm_change(self, etcd_result):
if etcd_result is None:
raise Return()
if etcd_result.action == EtcdAction.CREATE:
# We must listen on the realm created by ourselves or another worker
realm_spec = json.loads(etcd_result.value)
self._register_realm(realm_spec)
elif etcd_result.action in (EtcdAction.DELETE, EtcdAction.EXPIRE):
# We must stop listening for new connections on the specified realm, if we did not get the
# connection
realm_spec = json.loads(etcd_result._prev_node.value)
realm_id = realm_spec['realm']
build_job = BuildJob(AttrDict(realm_spec['job_queue_item']))
build_uuid = build_job.build_uuid
logger.debug('Realm key %s for build %s was %s', realm_id, build_uuid, etcd_result.action)
build_info = self._build_uuid_to_info.get(build_uuid, None)
if build_info is not None:
# Pop the component off. If we find one, then the build has not connected to this manager,
# so we can safely unregister its component.
component = self._component_to_job.pop(build_info.component, None)
if component is not None:
# We were not the manager the worker connected to, so remove the bookkeeping for it
logger.debug('Unregistering unused component for build %s', build_uuid)
self.unregister_component(build_info.component)
# If the realm has expired, then perform cleanup of the executor.
if etcd_result.action == EtcdAction.EXPIRE:
execution_id = realm_spec.get('execution_id', None)
executor_name = realm_spec.get('executor_name', 'EC2Executor')
logger.info('Realm %s expired for job %s, terminating executor %s with execution id %s',
realm_id, build_uuid, executor_name, execution_id)
yield From(self.terminate_executor(executor_name, execution_id))
else:
logger.warning('Unexpected action (%s) on realm key: %s', etcd_result.action, etcd_result.key)
def _register_realm(self, realm_spec):
logger.debug('Got call to register realm %s with manager', realm_spec['realm'])
# Create the build information block for the registered realm.
build_job = BuildJob(AttrDict(realm_spec['job_queue_item']))
execution_id = realm_spec.get('execution_id', None)
executor_name = realm_spec.get('executor_name', 'EC2Executor')
logger.debug('Registering realm %s with manager: %s', realm_spec['realm'], realm_spec)
component = self.register_component(realm_spec['realm'], BuildComponent,
token=realm_spec['token'])
build_info = BuildInfo(component=component, build_job=build_job, execution_id=execution_id,
executor_name=executor_name)
self._component_to_job[component] = build_job
self._build_uuid_to_info[build_job.build_uuid] = build_info
logger.debug('Registered realm %s with manager', realm_spec['realm'])
return component
@property
def registered_executors(self):
return self._ordered_executors
@coroutine
def _register_existing_realms(self):
try:
all_realms = yield From(self._etcd_client.read(self._etcd_realm_prefix, recursive=True))
# Register all existing realms found.
encountered = set()
for realm in all_realms.children:
if not realm.dir:
component = self._register_realm(json.loads(realm.value))
encountered.add(component)
# Remove any components not encountered so we can clean up.
for component, job in list(self._component_to_job.items()):
if component not in encountered:
self._component_to_job.pop(component, None)
self._build_uuid_to_info.pop(job.build_uuid, None)
except (KeyError, etcd.EtcdKeyError):
# no realms have been registered yet
pass
def _load_executor(self, executor_kind_name, executor_config):
executor_klass = EphemeralBuilderManager.EXECUTORS.get(executor_kind_name)
if executor_klass is None:
logger.error('Unknown executor %s; skipping install', executor_kind_name)
return
executor = executor_klass(executor_config, self.manager_hostname)
if executor.name in self._executor_name_to_executor:
raise Exception('Executor with name %s already registered' % executor.name)
self._ordered_executors.append(executor)
self._executor_name_to_executor[executor.name] = executor
def initialize(self, manager_config):
logger.debug('Calling initialize')
self._manager_config = manager_config
# Note: Executor config can be defined either as a single block of EXECUTOR_CONFIG (old style)
# or as a new set of executor configurations, with the order determining how we fallback. We
# check for both here to ensure backwards compatibility.
if manager_config.get('EXECUTORS'):
for executor_config in manager_config['EXECUTORS']:
self._load_executor(executor_config.get('EXECUTOR'), executor_config)
else:
self._load_executor(manager_config.get('EXECUTOR'), manager_config.get('EXECUTOR_CONFIG'))
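# For illustration only: the two accepted shapes of the builder config look
# roughly like the following YAML (executor-specific settings are omitted and
# the values are made-up examples, not defaults):
#
#   EXECUTORS:
#     - EXECUTOR: ec2
#     - EXECUTOR: kubernetes
#
# or, old style:
#
#   EXECUTOR: ec2
#   EXECUTOR_CONFIG: { ... }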
etcd_host = self._manager_config.get('ETCD_HOST', '127.0.0.1')
etcd_port = self._manager_config.get('ETCD_PORT', 2379)
etcd_ca_cert = self._manager_config.get('ETCD_CA_CERT', None)
etcd_auth = self._manager_config.get('ETCD_CERT_AND_KEY', None)
if etcd_auth is not None:
etcd_auth = tuple(etcd_auth) # Convert YAML list to a tuple
etcd_protocol = 'http' if etcd_auth is None else 'https'
logger.debug('Connecting to etcd on %s:%s', etcd_host, etcd_port)
worker_threads = self._manager_config.get('ETCD_WORKER_THREADS', 5)
(self._etcd_client, self._async_thread_executor) = self._etcd_client_creator(
worker_threads,
host=etcd_host,
port=etcd_port,
cert=etcd_auth,
ca_cert=etcd_ca_cert,
protocol=etcd_protocol,
read_timeout=5,
)
self._etcd_job_prefix = self._manager_config.get('ETCD_BUILDER_PREFIX', 'building/')
self._watch_etcd(self._etcd_job_prefix, self._handle_job_change)
self._etcd_realm_prefix = self._manager_config.get('ETCD_REALM_PREFIX', 'realm/')
self._watch_etcd(self._etcd_realm_prefix, self._handle_realm_change,
restarter=self._register_existing_realms)
self._etcd_cancel_build_prefix = self._manager_config.get('ETCD_CANCEL_PREFIX', 'cancel/')
self._watch_etcd(self._etcd_cancel_build_prefix, self._cancel_build)
self._etcd_lock_prefix = self._manager_config.get('ETCD_LOCK_PREFIX', 'lock/')
self._etcd_metric_prefix = self._manager_config.get('ETCD_METRIC_PREFIX', 'metric/')
self._ephemeral_api_timeout = self._manager_config.get('API_TIMEOUT',
DEFAULT_EPHEMERAL_API_TIMEOUT)
self._ephemeral_setup_timeout = self._manager_config.get('SETUP_TIMEOUT',
DEFAULT_EPHEMERAL_SETUP_TIMEOUT)
# Load components for all realms currently known to the cluster
async(self._register_existing_realms())
def overall_setup_time(self):
return self._ephemeral_setup_timeout
def shutdown(self):
logger.debug('Shutting down worker.')
self._shutting_down = True
for (etcd_key, _), task in self._watch_tasks.items():
if not task.done():
logger.debug('Canceling watch task for %s', etcd_key)
task.cancel()
if self._async_thread_executor is not None:
logger.debug('Shutting down thread pool executor.')
self._async_thread_executor.shutdown()
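# Rough summary of the scheduling flow implemented in schedule() below (a
# sketch, not normative documentation):
#   1. Count live job keys under the job prefix and bail out with a retry
#      timeout if ALLOWED_WORKER_COUNT has been reached.
#   2. Atomically write the job key (prevExist=False) to claim the build.
#   3. Walk the ordered executors and start the first one whose namespace
#      whitelist and retry threshold allow this job.
#   4. Record a metric key and a realm key so that any manager can accept the
#      builder once it connects back.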
@coroutine
def schedule(self, build_job):
build_uuid = build_job.job_details['build_uuid']
logger.debug('Calling schedule with job: %s', build_uuid)
# Check if there are worker slots available by checking the number of jobs in etcd
allowed_worker_count = self._manager_config.get('ALLOWED_WORKER_COUNT', 1)
try:
active_jobs = yield From(self._etcd_client.read(self._etcd_job_prefix, recursive=True))
workers_alive = sum(1 for child in active_jobs.children if not child.dir)
except (KeyError, etcd.EtcdKeyError):
workers_alive = 0
except etcd.EtcdException:
logger.exception('Exception when reading job count from etcd for job: %s', build_uuid)
raise Return(False, RETRY_IMMEDIATELY_TIMEOUT)
logger.debug('Total jobs (scheduling job %s): %s', build_uuid, workers_alive)
if workers_alive >= allowed_worker_count:
logger.info('Too many workers alive, unable to start new worker for build job: %s. %s >= %s',
build_uuid, workers_alive, allowed_worker_count)
raise Return(False, NO_WORKER_AVAILABLE_TIMEOUT)
job_key = self._etcd_job_key(build_job)
# First try to take a lock for this job, meaning we will be responsible for its lifeline
realm = str(uuid.uuid4())
token = str(uuid.uuid4())
nonce = str(uuid.uuid4())
machine_max_expiration = self._manager_config.get('MACHINE_MAX_TIME', 7200)
max_expiration = datetime.utcnow() + timedelta(seconds=machine_max_expiration)
payload = {
'max_expiration': calendar.timegm(max_expiration.timetuple()),
'nonce': nonce,
'had_heartbeat': False,
'job_queue_item': build_job.job_item,
}
lock_payload = json.dumps(payload)
logger.debug('Writing key for job %s with expiration in %s seconds', build_uuid,
self._ephemeral_setup_timeout)
try:
yield From(self._etcd_client.write(job_key, lock_payload, prevExist=False,
ttl=self._ephemeral_setup_timeout))
except (KeyError, etcd.EtcdKeyError):
# The job was already taken by someone else, we are probably a retry
logger.warning('Job: %s already exists in etcd, timeout may be misconfigured', build_uuid)
raise Return(False, self._ephemeral_api_timeout)
except etcd.EtcdException:
logger.exception('Exception when writing job %s to etcd', build_uuid)
raise Return(False, RETRY_IMMEDIATELY_TIMEOUT)
# Got a lock, now lets boot the job via one of the registered executors.
started_with_executor = None
execution_id = None
logger.debug("Registered executors are: %s", [ex.name for ex in self._ordered_executors])
for executor in self._ordered_executors:
# Check if we can use this executor based on its whitelist, by namespace.
namespace = build_job.namespace
if not executor.allowed_for_namespace(namespace):
logger.debug('Job %s (namespace: %s) cannot use executor %s', build_uuid, namespace,
executor.name)
continue
# Check if we can use this executor based on the retries remaining.
if executor.minimum_retry_threshold > build_job.retries_remaining:
metric_queue.builder_fallback.Inc()
logger.debug('Job %s cannot use executor %s as it is below retry threshold %s (retry #%s)',
build_uuid, executor.name, executor.minimum_retry_threshold,
build_job.retries_remaining)
continue
logger.debug('Starting builder for job %s with selected executor: %s', build_uuid,
executor.name)
try:
execution_id = yield From(executor.start_builder(realm, token, build_uuid))
except:
logger.exception('Exception when starting builder for job: %s', build_uuid)
continue
try:
metric_queue.put_deprecated('EphemeralBuilderStarted', 1, unit='Count')
metric_queue.ephemeral_build_workers.Inc(labelvalues=[execution_id, build_uuid])
except:
logger.exception('Exception when writing start metrics for execution %s for job %s',
execution_id, build_uuid)
started_with_executor = executor
# Break out of the loop now that we've started a builder successfully.
break
# If we didn't start the job, cleanup and return it to the queue.
if started_with_executor is None:
logger.error('Could not start ephemeral worker for build %s', build_uuid)
# Delete the associated build job record.
try:
yield From(self._etcd_client.delete(job_key))
except (KeyError, etcd.EtcdKeyError):
logger.warning('Could not delete job key %s', job_key)
raise Return(False, self._ephemeral_api_timeout)
# Job was started!
logger.debug('Started execution with ID %s for job: %s with executor: %s',
execution_id, build_uuid, started_with_executor.name)
# Store metric data
metric_spec = json.dumps({
'executor_name': started_with_executor.name,
'start_time': time.time(),
})
try:
yield From(self._etcd_client.write(self._etcd_metric_key(realm), metric_spec, prevExist=False,
ttl=machine_max_expiration + 10))
except (KeyError, etcd.EtcdKeyError):
logger.error('Realm %s already exists in etcd for job %s; ' +
'UUID collision or something is very very wrong.', realm, build_uuid)
except etcd.EtcdException:
logger.exception('Exception when writing realm %s to etcd for job %s', realm, build_uuid)
# Store the realm spec which will allow any manager to accept this builder when it connects
realm_spec = json.dumps({
'realm': realm,
'token': token,
'execution_id': execution_id,
'executor_name': started_with_executor.name,
'job_queue_item': build_job.job_item,
})
try:
setup_time = started_with_executor.setup_time or self.overall_setup_time()
logger.debug('Writing job key for job %s using executor %s with ID %s and ttl %s', build_uuid,
started_with_executor.name, execution_id, setup_time)
yield From(self._etcd_client.write(self._etcd_realm_key(realm), realm_spec, prevExist=False,
ttl=setup_time))
except (KeyError, etcd.EtcdKeyError):
logger.error('Realm %s already exists in etcd for job %s; ' +
'UUID collision or something is very very wrong.', realm, build_uuid)
raise Return(False, setup_time)
except etcd.EtcdException:
logger.exception('Exception when writing realm %s to etcd for job %s', realm, build_uuid)
raise Return(False, setup_time)
logger.debug('Builder spawn complete for job %s using executor %s with ID %s ', build_uuid,
started_with_executor.name, execution_id)
raise Return(True, None)
@coroutine
def build_component_ready(self, build_component):
logger.debug('Got component ready for component with realm %s', build_component.builder_realm)
# Pop off the job for the component. We do so before we send out the etcd watch below,
# as it will also remove this mapping.
job = self._component_to_job.pop(build_component, None)
if job is None:
# This will occur once the build finishes, so no need to worry about it. We log in case it
# happens outside of the expected flow.
logger.debug('Could not find job for the build component on realm %s; component is ready',
build_component.builder_realm)
raise Return()
# Start the build job.
logger.debug('Sending build %s to newly ready component on realm %s',
job.build_uuid, build_component.builder_realm)
yield From(build_component.start_build(job))
yield From(self._write_duration_metric(metric_queue.builder_time_to_build,
build_component.builder_realm))
try:
# Clean up the bookkeeping for allowing any manager to take the job.
yield From(self._etcd_client.delete(self._etcd_realm_key(build_component.builder_realm)))
except (KeyError, etcd.EtcdKeyError):
logger.warning('Could not delete realm key %s', build_component.builder_realm)
def build_component_disposed(self, build_component, timed_out):
logger.debug('Calling build_component_disposed.')
self.unregister_component(build_component)
@coroutine
def job_completed(self, build_job, job_status, build_component):
logger.debug('Calling job_completed for job %s with status: %s',
build_job.build_uuid, job_status)
yield From(self._write_duration_metric(metric_queue.build_time, build_component.builder_realm))
# Mark the job as completed. Since this is being invoked from the component, we don't need
# to ask for the phase to be updated as well.
build_info = self._build_uuid_to_info.get(build_job.build_uuid, None)
executor_name = build_info.executor_name if build_info else None
yield From(self.job_complete_callback(build_job, job_status, executor_name, update_phase=False))
# Kill the ephemeral builder.
yield From(self.kill_builder_executor(build_job.build_uuid))
# Delete the build job from etcd.
job_key = self._etcd_job_key(build_job)
try:
yield From(self._etcd_client.delete(job_key))
except (KeyError, etcd.EtcdKeyError):
logger.debug('Builder is asking for job to be removed, but work already completed')
# Delete the metric from etcd
metric_key = self._etcd_metric_key(build_component.builder_realm)
try:
yield From(self._etcd_client.delete(metric_key))
except (KeyError, etcd.EtcdKeyError):
logger.debug('Builder is asking for metric to be removed, but key not found')
logger.debug('job_completed for job %s with status: %s', build_job.build_uuid, job_status)
@coroutine
def kill_builder_executor(self, build_uuid):
logger.info('Starting termination of executor for job %s', build_uuid)
build_info = self._build_uuid_to_info.pop(build_uuid, None)
if build_info is None:
logger.debug('Build information not found for build %s; skipping termination', build_uuid)
raise Return()
# Remove the build's component.
self._component_to_job.pop(build_info.component, None)
# Stop the build node/executor itself.
yield From(self.terminate_executor(build_info.executor_name, build_info.execution_id))
@coroutine
def terminate_executor(self, executor_name, execution_id):
executor = self._executor_name_to_executor.get(executor_name)
if executor is None:
logger.error('Could not find registered executor %s', executor_name)
raise Return()
# Terminate the executor's execution.
logger.info('Terminating executor %s with execution id %s', executor_name, execution_id)
yield From(executor.stop_builder(execution_id))
@coroutine
def job_heartbeat(self, build_job):
# Extend the queue item.
self.job_heartbeat_callback(build_job)
# Extend the deadline in etcd.
job_key = self._etcd_job_key(build_job)
try:
build_job_metadata_response = yield From(self._etcd_client.read(job_key))
except (KeyError, etcd.EtcdKeyError):
logger.info('Job %s no longer exists in etcd', build_job.build_uuid)
raise Return()
build_job_metadata = json.loads(build_job_metadata_response.value)
max_expiration = datetime.utcfromtimestamp(build_job_metadata['max_expiration'])
max_expiration_remaining = max_expiration - datetime.utcnow()
max_expiration_sec = max(0, int(max_expiration_remaining.total_seconds()))
ttl = min(self.heartbeat_period_sec * 2, max_expiration_sec)
payload = {
'job_queue_item': build_job.job_item,
'max_expiration': build_job_metadata['max_expiration'],
'had_heartbeat': True,
}
# Note: A TTL of < 0 in etcd results in the key *never being expired*. We use a max here
# to ensure that if the TTL is < 0, the key will expire immediately.
etcd_ttl = max(ttl, 0)
yield From(self._etcd_client.write(job_key, json.dumps(payload), ttl=etcd_ttl))
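# Worked example of the TTL math above, with illustrative numbers: given a
# heartbeat_period_sec of 30 and roughly 600 seconds left until
# max_expiration, the key is rewritten with ttl = min(30 * 2, 600) = 60, so a
# builder that stops heartbeating loses its job key about two heartbeat
# periods later.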
@coroutine
def _take_etcd_atomic_lock(self, path, *args):
""" Takes a lock for atomic operations via etcd over the given path. Returns true if the lock
was granted and false otherwise.
"""
pieces = [self._etcd_lock_prefix, path]
pieces.extend(args)
lock_key = os.path.join(*pieces)
try:
yield From(self._etcd_client.write(lock_key, {}, prevExist=False, ttl=ETCD_ATOMIC_OP_TIMEOUT))
raise Return(True)
except (KeyError, etcd.EtcdKeyError):
raise Return(False)
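# Minimal usage sketch for the lock above, mirroring the call made in
# _handle_job_change (argument values are placeholders, and the default
# 'lock/' prefix is assumed):
#
#   got_lock = yield From(self._take_etcd_atomic_lock('job-expired',
#                                                     build_uuid, execution_id))
#
# This writes lock/job-expired/<build_uuid>/<execution_id> with
# prevExist=False, so only the first manager to write the key proceeds.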
@coroutine
def _write_duration_metric(self, metric, realm):
""" Returns true if the metric was written and and false otherwise.
"""
try:
metric_data = yield From(self._etcd_client.read(self._etcd_metric_key(realm)))
parsed_metric_data = json.loads(metric_data.value)
start_time = parsed_metric_data['start_time']
metric.Observe(time.time() - start_time,
labelvalues=[parsed_metric_data.get('executor_name',
'unknown')])
except Exception:
logger.exception("Could not write metric for realm %s", realm)
def _etcd_metric_key(self, realm):
""" Create a key which is used to track a job in etcd.
"""
return os.path.join(self._etcd_metric_prefix, realm)
def _etcd_job_key(self, build_job):
""" Create a key which is used to track a job in etcd.
"""
return os.path.join(self._etcd_job_prefix, build_job.job_details['build_uuid'])
def _etcd_realm_key(self, realm):
""" Create a key which is used to track an incoming connection on a realm.
"""
return os.path.join(self._etcd_realm_prefix, realm)
def num_workers(self):
""" Return the number of workers we're managing locally.
"""
return len(self._component_to_job)
@coroutine
def _cancel_build(self, etcd_result):
""" Listens for etcd event and then cancels the build
"""
if etcd_result is None:
raise Return(False)
if etcd_result.action not in (EtcdAction.CREATE, EtcdAction.SET):
raise Return(False)
build_uuid = etcd_result.value
build_info = self._build_uuid_to_info.get(build_uuid, None)
if build_info is None:
logger.debug('No build info for "%s" job %s', etcd_result.action, build_uuid)
raise Return(False)
got_lock = yield From(self._take_etcd_atomic_lock('job-cancelled', build_uuid, build_info.execution_id))
if got_lock:
yield From(self.kill_builder_executor(build_uuid))
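# Cancellation sketch: _cancel_build is driven by the watch registered on the
# cancel prefix in initialize(). A hypothetical trigger would write the build
# UUID as the *value* of a key under that prefix (assuming the default
# 'cancel/' prefix and the etcd v2 CLI), e.g.:
#
#   etcdctl set cancel/<some-key> <build_uuid>
#
# Whichever manager still holds the build's info then takes the
# 'job-cancelled' lock and terminates the build's executor.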