c29f9ccc7f
Until now, once the heartbeat deadline had passed, we would issue a negative TTL, which causes etcd to either raise an exception or silently ignore the expiration (depending on the etcd version). This change ensures that an expired key is removed immediately by writing it with a TTL of 0. Also adds tests for this case and for the normal expiration case.
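For reference, a minimal sketch of the clamping this change introduces, mirroring the logic in job_heartbeat below; compute_job_ttl and its arguments are illustrative names only, not part of the module:

from datetime import datetime, timedelta

def compute_job_ttl(max_expiration, heartbeat_period_sec, now=None):
  """Illustrative helper: compute the TTL written back to etcd on a heartbeat."""
  now = now or datetime.utcnow()
  remaining_sec = int((max_expiration - now).total_seconds())
  ttl = min(heartbeat_period_sec * 2, remaining_sec)

  # A negative TTL makes etcd either raise an exception or ignore the expiration
  # (depending on the etcd version); clamping to 0 makes etcd expire the key immediately.
  return max(ttl, 0)

# A heartbeat arriving after max_expiration now yields 0 rather than a negative TTL.
assert compute_job_ttl(datetime.utcnow() - timedelta(seconds=30), heartbeat_period_sec=60) == 0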
596 lines
24 KiB
Python
import logging
import etcd
import uuid
import calendar
import os.path
import json

from collections import namedtuple
from datetime import datetime, timedelta
from trollius import From, coroutine, Return, async
from concurrent.futures import ThreadPoolExecutor
from urllib3.exceptions import ReadTimeoutError, ProtocolError

from app import metric_queue
from buildman.manager.basemanager import BaseManager
from buildman.manager.executor import PopenExecutor, EC2Executor, KubernetesExecutor
from buildman.component.buildcomponent import BuildComponent
from buildman.jobutil.buildjob import BuildJob
from buildman.asyncutil import AsyncWrapper
from buildman.server import BuildJobResult
from util.morecollections import AttrDict


logger = logging.getLogger(__name__)


ETCD_MAX_WATCH_TIMEOUT = 30
RETRY_IMMEDIATELY_TIMEOUT = 0
NO_WORKER_AVAILABLE_TIMEOUT = 10
DEFAULT_EPHEMERAL_API_TIMEOUT = 20
DEFAULT_EPHEMERAL_SETUP_TIMEOUT = 300

class EtcdAction(object):
  GET = 'get'
  SET = 'set'
  EXPIRE = 'expire'
  UPDATE = 'update'
  DELETE = 'delete'
  CREATE = 'create'
  COMPARE_AND_SWAP = 'compareAndSwap'
  COMPARE_AND_DELETE = 'compareAndDelete'

BuildInfo = namedtuple('BuildInfo', ['component', 'build_job', 'execution_id', 'executor_name'])

def _create_async_etcd_client(worker_threads=1, **kwargs):
  client = etcd.Client(**kwargs)
  async_executor = ThreadPoolExecutor(worker_threads)
  return AsyncWrapper(client, executor=async_executor), async_executor


class EphemeralBuilderManager(BaseManager):
  """ Build manager implementation for the Enterprise Registry. """

  EXECUTORS = {
    'popen': PopenExecutor,
    'ec2': EC2Executor,
    'kubernetes': KubernetesExecutor,
  }

  def __init__(self, *args, **kwargs):
    self._etcd_client_creator = kwargs.pop('etcd_creator', _create_async_etcd_client)

    super(EphemeralBuilderManager, self).__init__(*args, **kwargs)

    self._shutting_down = False

    self._manager_config = None
    self._async_thread_executor = None
    self._etcd_client = None

    self._etcd_realm_prefix = None
    self._etcd_job_prefix = None

    self._ephemeral_api_timeout = DEFAULT_EPHEMERAL_API_TIMEOUT
    self._ephemeral_setup_timeout = DEFAULT_EPHEMERAL_SETUP_TIMEOUT

    # The registered executors available for running jobs, in order.
    self._ordered_executors = []

    # The registered executors, mapped by their unique name.
    self._executor_name_to_executor = {}

    # Map of etcd keys being watched to the tasks watching them
    self._watch_tasks = {}

    # Map from builder component to its associated job.
    self._component_to_job = {}

    # Map from build UUID to a BuildInfo tuple with information about the build.
    self._build_uuid_to_info = {}

  def _watch_etcd(self, etcd_key, change_callback, start_index=None, recursive=True,
                  restarter=None):
    watch_task_key = (etcd_key, recursive)
    def callback_wrapper(changed_key_future):
      new_index = start_index
      etcd_result = None

      if not changed_key_future.cancelled():
        try:
          etcd_result = changed_key_future.result()
          existing_index = getattr(etcd_result, 'etcd_index', None)
          new_index = etcd_result.modifiedIndex + 1

          logger.debug('Got watch of key: %s%s at #%s with result: %s', etcd_key,
                       '*' if recursive else '', existing_index, etcd_result)

        except ReadTimeoutError:
          logger.debug('Read-timeout on etcd watch %s, rescheduling', etcd_key)

        except etcd.EtcdEventIndexCleared:
          # This happens if etcd2 has moved forward too fast for us to start watching
          # at the index we retrieved. We therefore start a new watch at HEAD and
          # (if specified) call the restarter method which should conduct a read and
          # reset the state of the manager.
          logger.exception('Etcd moved forward too quickly. Restarting watch cycle.')
          new_index = None
          if restarter is not None:
            async(restarter())

        except (KeyError, etcd.EtcdKeyError):
          logger.debug('Etcd key already cleared: %s', etcd_key)
          return

        except etcd.EtcdException as eex:
          # TODO(jschorr): This is a quick and dirty hack and should be replaced
          # with a proper exception check.
          if str(eex.message).find('Read timed out') >= 0:
            logger.debug('Read-timeout on etcd watch %s, rescheduling', etcd_key)
          else:
            logger.exception('Exception on etcd watch: %s', etcd_key)

        except ProtocolError:
          logger.exception('Exception on etcd watch: %s', etcd_key)

      if watch_task_key not in self._watch_tasks or self._watch_tasks[watch_task_key].done():
        self._watch_etcd(etcd_key, change_callback, start_index=new_index, restarter=restarter)

      if etcd_result:
        change_callback(etcd_result)

    if not self._shutting_down:
      logger.debug('Scheduling watch of key: %s%s at start index %s', etcd_key,
                   '*' if recursive else '', start_index)
      watch_future = self._etcd_client.watch(etcd_key, recursive=recursive, index=start_index,
                                             timeout=ETCD_MAX_WATCH_TIMEOUT)
      watch_future.add_done_callback(callback_wrapper)

      self._watch_tasks[watch_task_key] = async(watch_future)

  @coroutine
  def _handle_job_expiration_or_delete(self, etcd_result):
    """ Handler invoked whenever a job expires or is deleted in etcd. """
    if etcd_result is None:
      return

    # Handle the expiration/deletion
    job_metadata = json.loads(etcd_result._prev_node.value)
    build_job = BuildJob(AttrDict(job_metadata['job_queue_item']))
    logger.debug('Job %s %s', etcd_result.action, build_job.build_uuid)

    # Pop the build info.
    build_info = self._build_uuid_to_info.get(build_job.build_uuid, None)
    if build_info is None:
      logger.debug('No build info for %s job %s (%s); was probably already deleted by this manager',
                   etcd_result.action, build_job.build_uuid, job_metadata)
      return

    # If the etcd action was not an expiration, then it was already deleted and the execution
    # shutdown.
    if etcd_result.action != EtcdAction.EXPIRE:
      # Build information will no longer be needed; pop it off.
      self._build_uuid_to_info.pop(build_job.build_uuid, None)
      return

    execution_id = build_info.execution_id

    # If we have not yet received a heartbeat, then the node failed to boot in some way. We mark
    # the job as incomplete here.
    if not job_metadata.get('had_heartbeat', True):
      logger.warning('Build executor failed to successfully boot with execution id %s',
                     execution_id)
      self.job_complete_callback(build_job, BuildJobResult.INCOMPLETE)

    # Finally, we terminate the build execution for the job.
    logger.info('Terminating expired build executor for job %s with execution id %s',
                build_job.build_uuid, execution_id)
    yield From(self.kill_builder_executor(build_job.build_uuid))

  def _handle_realm_change(self, etcd_result):
    if etcd_result is None:
      return

    if etcd_result.action == EtcdAction.CREATE:
      # We must listen on the realm created by ourselves or another worker
      realm_spec = json.loads(etcd_result.value)
      self._register_realm(realm_spec)

    elif etcd_result.action == EtcdAction.DELETE or etcd_result.action == EtcdAction.EXPIRE:
      # We must stop listening for new connections on the specified realm, if we did not get the
      # connection
      realm_spec = json.loads(etcd_result._prev_node.value)
      build_job = BuildJob(AttrDict(realm_spec['job_queue_item']))
      build_uuid = build_job.build_uuid

      logger.debug('Realm key %s for build %s', etcd_result.action, build_uuid)
      build_info = self._build_uuid_to_info.get(build_uuid, None)
      if build_info is not None:
        # Pop the component off. If we find one, then the build has not connected to this manager,
        # so we can safely unregister its component.
        component = self._component_to_job.pop(build_info.component, None)
        if component is not None:
          # We were not the manager which the worker connected to; remove the bookkeeping for it.
          logger.debug('Unregistering unused component for build %s', build_uuid)
          self.unregister_component(build_info.component)

    else:
      logger.warning('Unexpected action (%s) on realm key: %s', etcd_result.action, etcd_result.key)

  def _register_realm(self, realm_spec):
    logger.debug('Registering realm with manager: %s', realm_spec['realm'])
    component = self.register_component(realm_spec['realm'], BuildComponent,
                                        token=realm_spec['token'])

    if component in self._component_to_job:
      logger.debug('Realm already registered with manager: %s', realm_spec['realm'])
      return component

    # Create the build information block for the registered realm.
    build_job = BuildJob(AttrDict(realm_spec['job_queue_item']))

    # TODO(jschorr): Remove the back-compat lookups once we've finished the rollout.
    execution_id = realm_spec.get('execution_id', realm_spec.get('builder_id', None))
    executor_name = realm_spec.get('executor_name', 'EC2Executor')

    build_info = BuildInfo(component=component, build_job=build_job, execution_id=execution_id,
                           executor_name=executor_name)

    self._component_to_job[component] = build_job
    self._build_uuid_to_info[build_job.build_uuid] = build_info
    return component

  @property
  def registered_executors(self):
    return self._ordered_executors

  @coroutine
  def _register_existing_realms(self):
    try:
      all_realms = yield From(self._etcd_client.read(self._etcd_realm_prefix, recursive=True))

      # Register all existing realms found.
      encountered = set()
      for realm in all_realms.children:
        if not realm.dir:
          component = self._register_realm(json.loads(realm.value))
          encountered.add(component)

      # Remove any components not encountered so we can clean up.
      for component, job in list(self._component_to_job.items()):
        if component not in encountered:
          self._component_to_job.pop(component, None)
          self._build_uuid_to_info.pop(job.build_uuid, None)

    except (KeyError, etcd.EtcdKeyError):
      # no realms have been registered yet
      pass

  def _load_executor(self, executor_kind_name, executor_config):
    executor_klass = EphemeralBuilderManager.EXECUTORS.get(executor_kind_name)
    if executor_klass is None:
      logger.error('Unknown executor %s; skipping install', executor_kind_name)
      return

    executor = executor_klass(executor_config, self.manager_hostname)
    if executor.name in self._executor_name_to_executor:
      raise Exception('Executor with name %s already registered' % executor.name)

    self._ordered_executors.append(executor)
    self._executor_name_to_executor[executor.name] = executor

  def initialize(self, manager_config):
    logger.debug('Calling initialize')
    self._manager_config = manager_config

    # Note: Executor config can be defined either as a single block of EXECUTOR_CONFIG (old style)
    # or as a new set of executor configurations, with the order determining how we fallback. We
    # check for both here to ensure backwards compatibility.
    if manager_config.get('EXECUTORS'):
      for executor_config in manager_config['EXECUTORS']:
        self._load_executor(executor_config.get('EXECUTOR'), executor_config)
    else:
      self._load_executor(manager_config.get('EXECUTOR'), manager_config.get('EXECUTOR_CONFIG'))

    etcd_host = self._manager_config.get('ETCD_HOST', '127.0.0.1')
    etcd_port = self._manager_config.get('ETCD_PORT', 2379)
    etcd_ca_cert = self._manager_config.get('ETCD_CA_CERT', None)

    etcd_auth = self._manager_config.get('ETCD_CERT_AND_KEY', None)
    if etcd_auth is not None:
      etcd_auth = tuple(etcd_auth)  # Convert YAML list to a tuple

    etcd_protocol = 'http' if etcd_auth is None else 'https'
    logger.debug('Connecting to etcd on %s:%s', etcd_host, etcd_port)

    worker_threads = self._manager_config.get('ETCD_WORKER_THREADS', 5)
    (self._etcd_client, self._async_thread_executor) = self._etcd_client_creator(
      worker_threads,
      host=etcd_host,
      port=etcd_port,
      cert=etcd_auth,
      ca_cert=etcd_ca_cert,
      protocol=etcd_protocol,
      read_timeout=5,
    )

    self._etcd_job_prefix = self._manager_config.get('ETCD_BUILDER_PREFIX', 'building/')
    self._watch_etcd(self._etcd_job_prefix, self._handle_job_expiration_or_delete)

    self._etcd_realm_prefix = self._manager_config.get('ETCD_REALM_PREFIX', 'realm/')
    self._watch_etcd(self._etcd_realm_prefix, self._handle_realm_change,
                     restarter=self._register_existing_realms)

    self._ephemeral_api_timeout = self._manager_config.get('API_TIMEOUT',
                                                           DEFAULT_EPHEMERAL_API_TIMEOUT)

    self._ephemeral_setup_timeout = self._manager_config.get('SETUP_TIMEOUT',
                                                             DEFAULT_EPHEMERAL_SETUP_TIMEOUT)

    # Load components for all realms currently known to the cluster
    async(self._register_existing_realms())

  def setup_time(self):
    return self._manager_config.get('MACHINE_SETUP_TIME', 300)

  def shutdown(self):
    logger.debug('Shutting down worker.')
    self._shutting_down = True

    for (etcd_key, _), task in self._watch_tasks.items():
      if not task.done():
        logger.debug('Canceling watch task for %s', etcd_key)
        task.cancel()

    if self._async_thread_executor is not None:
      logger.debug('Shutting down thread pool executor.')
      self._async_thread_executor.shutdown()

  @coroutine
  def schedule(self, build_job):
    build_uuid = build_job.job_details['build_uuid']
    logger.debug('Calling schedule with job: %s', build_uuid)

    # Check if there are worker slots available by checking the number of jobs in etcd
    allowed_worker_count = self._manager_config.get('ALLOWED_WORKER_COUNT', 1)
    try:
      active_jobs = yield From(self._etcd_client.read(self._etcd_job_prefix, recursive=True))
      workers_alive = sum(1 for child in active_jobs.children if not child.dir)
    except (KeyError, etcd.EtcdKeyError):
      workers_alive = 0
    except etcd.EtcdException:
      logger.exception('Exception when reading job count from etcd for job: %s', build_uuid)
      raise Return(False, RETRY_IMMEDIATELY_TIMEOUT)

    logger.debug('Total jobs (scheduling job %s): %s', build_uuid, workers_alive)

    if workers_alive >= allowed_worker_count:
      logger.info('Too many workers alive, unable to start new worker for build job: %s. %s >= %s',
                  build_uuid, workers_alive, allowed_worker_count)
      raise Return(False, NO_WORKER_AVAILABLE_TIMEOUT)

    job_key = self._etcd_job_key(build_job)

    # First try to take a lock for this job, meaning we will be responsible for its lifeline
    realm = str(uuid.uuid4())
    token = str(uuid.uuid4())
    nonce = str(uuid.uuid4())
    setup_time = self.setup_time()
    expiration = datetime.utcnow() + timedelta(seconds=setup_time)

    machine_max_expiration = self._manager_config.get('MACHINE_MAX_TIME', 7200)
    max_expiration = datetime.utcnow() + timedelta(seconds=machine_max_expiration)

    payload = {
      # TODO: remove expiration (but not max_expiration) after migration; not used.
      'expiration': calendar.timegm(expiration.timetuple()),
      'max_expiration': calendar.timegm(max_expiration.timetuple()),
      'nonce': nonce,
      'had_heartbeat': False,
      'job_queue_item': build_job.job_item,
    }

    lock_payload = json.dumps(payload)

    try:
      yield From(self._etcd_client.write(job_key, lock_payload, prevExist=False,
                                         ttl=self._ephemeral_setup_timeout))
    except (KeyError, etcd.EtcdKeyError):
      # The job was already taken by someone else, we are probably a retry
      logger.error('Job: %s already exists in etcd, timeout may be misconfigured', build_uuid)
      raise Return(False, self._ephemeral_api_timeout)
    except etcd.EtcdException:
      logger.exception('Exception when writing job %s to etcd', build_uuid)
      raise Return(False, RETRY_IMMEDIATELY_TIMEOUT)

    started_with_executor = None
    execution_id = None

    logger.debug("Registered executors are: %s", [ex.name for ex in self._ordered_executors])

    for executor in self._ordered_executors:
      # Check if we can use this executor based on its whitelist, by namespace.
      namespace = build_job.namespace
      if not executor.allowed_for_namespace(namespace):
        logger.debug('Job %s (namespace: %s) cannot use executor %s', build_uuid, namespace,
                     executor.name)
        continue

      # Check if we can use this executor based on the retries remaining.
      if executor.minimum_retry_threshold > build_job.retries_remaining:
        logger.debug('Job %s cannot use executor %s as it is below retry threshold %s (retry #%s)',
                     build_uuid, executor.name, executor.minimum_retry_threshold,
                     build_job.retries_remaining)
        continue

      logger.debug('Starting builder for job %s with selected executor: %s', build_uuid,
                   executor.name)

      try:
        execution_id = yield From(executor.start_builder(realm, token, build_uuid))
      except:
        logger.exception('Exception when starting builder for job: %s', build_uuid)
        continue

      try:
        metric_queue.put_deprecated('EphemeralBuilderStarted', 1, unit='Count')
        metric_queue.ephemeral_build_workers.Inc(labelvalues=[execution_id, build_uuid])
      except:
        logger.exception('Exception when writing start metrics for execution %s for job %s',
                         execution_id, build_uuid)

      started_with_executor = executor

      # Break out of the loop now that we've started a builder successfully.
      break

    if started_with_executor is None:
      logger.error('Could not start ephemeral worker for build %s', build_uuid)
      raise Return(False, self._ephemeral_api_timeout)

    logger.debug('Started execution with ID %s for job: %s with executor: %s',
                 execution_id, build_uuid, started_with_executor.name)

    # Store the realm spec which will allow any manager to accept this builder when it connects
    realm_spec = json.dumps({
      'realm': realm,
      'token': token,
      'execution_id': execution_id,
      'executor_name': started_with_executor.name,
      'job_queue_item': build_job.job_item,

      # TODO: remove this back-compat field once we finish the rollout.
      'builder_id': execution_id,
    })

    try:
      yield From(self._etcd_client.write(self._etcd_realm_key(realm), realm_spec, prevExist=False,
                                         ttl=setup_time))
    except (KeyError, etcd.EtcdKeyError):
      logger.error('Realm %s already exists in etcd for job %s; ' +
                   'UUID collision or something is very very wrong.', realm, build_uuid)
      raise Return(False, setup_time)
    except etcd.EtcdException:
      logger.exception('Exception when writing realm %s to etcd for job %s', realm, build_uuid)
      raise Return(False, setup_time)

    logger.debug('Builder spawn complete for job %s using executor %s with ID %s ', build_uuid,
                 started_with_executor.name, execution_id)
    raise Return(True, None)

  @coroutine
  def build_component_ready(self, build_component):
    try:
      # Pop off the job for the component. We do so before we send out the etcd watch below,
      # as it will also remove this mapping.
      job = self._component_to_job.pop(build_component)
      if job is None:
        logger.error('Could not find job for the build component on realm %s',
                     build_component.builder_realm)
        return

      # Clean up the bookkeeping for allowing any manager to take the job.
      yield From(self._etcd_client.delete(self._etcd_realm_key(build_component.builder_realm)))

      # Start the build job.
      logger.debug('Sending build %s to newly ready component on realm %s',
                   job.build_uuid, build_component.builder_realm)
      yield From(build_component.start_build(job))
    except (KeyError, etcd.EtcdKeyError):
      logger.warning('Builder is asking for more work, but work already completed')

  def build_component_disposed(self, build_component, timed_out):
    logger.debug('Calling build_component_disposed.')
    self.unregister_component(build_component)

  @coroutine
  def job_completed(self, build_job, job_status, build_component):
    logger.debug('Calling job_completed for job %s with status: %s',
                 build_job.build_uuid, job_status)

    # Mark the job as completed.
    self.job_complete_callback(build_job, job_status)

    # Kill the ephemeral builder.
    yield From(self.kill_builder_executor(build_job.build_uuid))

    # Delete the build job from etcd.
    job_key = self._etcd_job_key(build_job)
    try:
      yield From(self._etcd_client.delete(job_key))
    except (KeyError, etcd.EtcdKeyError):
      logger.debug('Builder is asking for job to be removed, but work already completed')

    logger.debug('job_completed for job %s with status: %s', build_job.build_uuid, job_status)

  @coroutine
  def kill_builder_executor(self, build_uuid):
    logger.info('Starting termination of executor for job %s', build_uuid)
    build_info = self._build_uuid_to_info.pop(build_uuid, None)
    if build_info is None:
      logger.debug('Build information not found for build %s; skipping termination', build_uuid)
      return

    # Remove the build's component.
    self._component_to_job.pop(build_info.component, None)

    # Stop the build node/executor itself.
    executor = self._executor_name_to_executor.get(build_info.executor_name)
    if executor is None:
      logger.error('Could not find registered executor %s for build %s',
                   build_info.executor_name, build_uuid)
      return

    # Terminate the executor's execution.
    logger.info('Terminating executor for job %s with execution id %s',
                build_uuid, build_info.execution_id)
    yield From(executor.stop_builder(build_info.execution_id))

  @coroutine
  def job_heartbeat(self, build_job):
    # Extend the deadline in etcd
    job_key = self._etcd_job_key(build_job)

    try:
      build_job_metadata_response = yield From(self._etcd_client.read(job_key))
    except (KeyError, etcd.EtcdKeyError):
      logger.info('Job %s no longer exists in etcd', build_job.build_uuid)
      return

    build_job_metadata = json.loads(build_job_metadata_response.value)

    max_expiration = datetime.utcfromtimestamp(build_job_metadata['max_expiration'])
    max_expiration_remaining = max_expiration - datetime.utcnow()
    max_expiration_sec = max(0, int(max_expiration_remaining.total_seconds()))

    ttl = min(self.heartbeat_period_sec * 2, max_expiration_sec)
    new_expiration = datetime.utcnow() + timedelta(seconds=ttl)

    payload = {
      # TODO: remove expiration (but not max_expiration) after migration; not used.
      'expiration': calendar.timegm(new_expiration.timetuple()),
      'job_queue_item': build_job.job_item,
      'max_expiration': build_job_metadata['max_expiration'],
      'had_heartbeat': True,
    }

    # Note: A TTL of < 0 in etcd results in the key *never being expired*. We use a max here
    # to ensure that if the TTL is < 0, the key will expire immediately.
    etcd_ttl = max(ttl, 0)
    yield From(self._etcd_client.write(job_key, json.dumps(payload), ttl=etcd_ttl))
    self.job_heartbeat_callback(build_job)

  def _etcd_job_key(self, build_job):
    """ Create a key which is used to track a job in etcd.
    """
    return os.path.join(self._etcd_job_prefix, build_job.job_details['build_uuid'])

  def _etcd_realm_key(self, realm):
    """ Create a key which is used to track an incoming connection on a realm.
    """
    return os.path.join(self._etcd_realm_prefix, realm)

  def num_workers(self):
    """ Return the number of workers we're managing locally.
    """
    return len(self._component_to_job)