Etcd watches can miss events if they arrive quickly enough, so when we get an exception indicating that we've missed an index, we reset our local tracking structures by re-reading the *full* list of realms and starting a new watch at HEAD.
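Roughly, that recovery pattern looks like the sketch below (a minimal, synchronous illustration using python-etcd's etcd.Client; handle_change and reload_state are placeholders here, whereas the manager below does the reload via _register_existing_realms and reschedules its watch from a done-callback rather than looping):

# Minimal sketch of the recovery pattern; handle_change/reload_state are placeholders.
import etcd

def watch_with_reset(client, key, handle_change, reload_state):
  index = None  # None means "watch at HEAD"
  while True:
    try:
      result = client.watch(key, index=index, recursive=True)
      index = result.modifiedIndex + 1
      handle_change(result)
    except etcd.EtcdEventIndexCleared:
      # etcd has moved past the index we asked for, so events may have been missed:
      # re-read the full state and resume watching at HEAD.
      reload_state()
      index = None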
438 lines · 17 KiB · Python
import logging
import etcd
import uuid
import calendar
import os.path
import json

from datetime import datetime, timedelta

from trollius import From, coroutine, Return, async
from concurrent.futures import ThreadPoolExecutor
from urllib3.exceptions import ReadTimeoutError, ProtocolError

from buildman.manager.basemanager import BaseManager
from buildman.manager.executor import PopenExecutor, EC2Executor
from buildman.component.buildcomponent import BuildComponent
from buildman.jobutil.buildjob import BuildJob
from buildman.asyncutil import AsyncWrapper
from buildman.server import BuildJobResult
from util.morecollections import AttrDict


logger = logging.getLogger(__name__)


ETCD_DISABLE_TIMEOUT = 0
EC2_API_TIMEOUT = 20
RETRY_IMMEDIATELY_TIMEOUT = 0


class EtcdAction(object):
  GET = 'get'
  SET = 'set'
  EXPIRE = 'expire'
  UPDATE = 'update'
  DELETE = 'delete'
  CREATE = 'create'
  COMPARE_AND_SWAP = 'compareAndSwap'
  COMPARE_AND_DELETE = 'compareAndDelete'

class EphemeralBuilderManager(BaseManager):
  """ Build manager implementation for the Enterprise Registry. """
  _executors = {
    'popen': PopenExecutor,
    'ec2': EC2Executor,
  }

  _etcd_client_klass = etcd.Client

  def __init__(self, *args, **kwargs):
    self._shutting_down = False

    self._manager_config = None
    self._async_thread_executor = None
    self._etcd_client = None

    self._etcd_realm_prefix = None
    self._etcd_builder_prefix = None

    self._component_to_job = {}
    self._job_uuid_to_component = {}
    self._component_to_builder = {}

    self._executor = None

    # Map of etcd keys being watched to the tasks watching them
    self._watch_tasks = {}

    super(EphemeralBuilderManager, self).__init__(*args, **kwargs)

  def _watch_etcd(self, etcd_key, change_callback, start_index=None, recursive=True):
    """ Watch the given etcd key (recursively by default) and invoke change_callback with each
        result. The watch re-registers itself from its done-callback, resuming just past the
        last modifiedIndex seen.
    """
    watch_task_key = (etcd_key, recursive)
    def callback_wrapper(changed_key_future):
      new_index = start_index
      etcd_result = None

      if not changed_key_future.cancelled():
        try:
          etcd_result = changed_key_future.result()
          existing_index = getattr(etcd_result, 'etcd_index', None)
          new_index = etcd_result.modifiedIndex + 1

          logger.debug('Got watch of key: %s%s at #%s with result: %s', etcd_key,
                       '*' if recursive else '', existing_index, etcd_result)

        except ReadTimeoutError:
          logger.debug('Read-timeout on etcd watch: %s', etcd_key)

        except etcd.EtcdEventIndexCleared:
          # This happens if etcd2 has moved forward too fast for us to start watching
          # at the index we retrieved. We therefore start a new watch at HEAD and
          # call the method to read the key and load ALL realms to handle any we might
          # have missed.
          # TODO: Remove this hack once Etcd is fixed.
          logger.exception('Etcd moved forward too quickly. Restarting watch cycle.')
          new_index = None
          async(self._register_existing_realms())

        except (ProtocolError, etcd.EtcdException):
          # Note: this clause must come after EtcdEventIndexCleared, which subclasses
          # EtcdException and would otherwise never be reached.
          logger.exception('Exception on etcd watch: %s', etcd_key)

      if watch_task_key not in self._watch_tasks or self._watch_tasks[watch_task_key].done():
        self._watch_etcd(etcd_key, change_callback, start_index=new_index, recursive=recursive)

      if etcd_result:
        change_callback(etcd_result)

    if not self._shutting_down:
      logger.debug('Scheduling watch of key: %s%s at start index %s', etcd_key,
                   '*' if recursive else '', start_index)

      watch_future = self._etcd_client.watch(etcd_key, recursive=recursive, index=start_index,
                                             timeout=ETCD_DISABLE_TIMEOUT)
      watch_future.add_done_callback(callback_wrapper)

      self._watch_tasks[watch_task_key] = async(watch_future)

  @coroutine
  def _handle_builder_expiration(self, etcd_result):
    """ Callback invoked when a key under the builder prefix changes; if a job entry has
        expired, takes a lock and terminates the corresponding build node.
    """
    if etcd_result is None:
      return

    if etcd_result.action == EtcdAction.EXPIRE:
      # Handle the expiration
      logger.debug('Builder expired, clean up the old build node')
      job_metadata = json.loads(etcd_result._prev_node.value)

      if 'builder_id' in job_metadata:
        builder_id = job_metadata['builder_id']

        # Before we delete the build node, we take a lock to make sure that only one manager
        # can terminate the node.
        try:
          lock_key = self._etcd_lock_key(builder_id)
          yield From(self._etcd_client.write(lock_key, '', prevExist=False, ttl=self.setup_time()))
        except (KeyError, etcd.EtcdKeyError):
          logger.debug('Somebody else is cleaning up the build node: %s', builder_id)
          return

        if not job_metadata.get('had_heartbeat', True):
          logger.warning('Build node failed to successfully boot: %s', builder_id)
          build_job = BuildJob(AttrDict(job_metadata['job_queue_item']))
          self.job_complete_callback(build_job, BuildJobResult.INCOMPLETE)

        logger.info('Terminating expired build node: %s', builder_id)
        yield From(self._executor.stop_builder(builder_id))

  def _handle_realm_change(self, etcd_result):
    """ Callback invoked when a key under the realm prefix changes; registers newly created
        realms and unregisters realms that were claimed elsewhere or expired.
    """
    if etcd_result is None:
      return

    if etcd_result.action == EtcdAction.CREATE:
      # We must listen on the realm created by ourselves or another worker
      realm_spec = json.loads(etcd_result.value)
      self._register_realm(realm_spec)

    elif etcd_result.action == EtcdAction.DELETE or etcd_result.action == EtcdAction.EXPIRE:
      # We must stop listening for new connections on the specified realm, if we did not get
      # the connection
      realm_spec = json.loads(etcd_result._prev_node.value)
      build_job = BuildJob(AttrDict(realm_spec['job_queue_item']))
      component = self._job_uuid_to_component.pop(build_job.job_details['build_uuid'], None)
      if component is not None:
        # We were not the manager which the worker connected to, remove the bookkeeping for it
        logger.debug('Unregistering unused component on realm: %s', realm_spec['realm'])
        del self._component_to_job[component]
        del self._component_to_builder[component]
        self.unregister_component(component)

    else:
      logger.warning('Unexpected action (%s) on realm key: %s', etcd_result.action, etcd_result.key)

  def _register_realm(self, realm_spec):
    """ Register a component for the given realm spec (if one is not already registered) and
        record its job and builder bookkeeping.
    """
    logger.debug('Registering realm with manager: %s', realm_spec['realm'])
    component = self.register_component(realm_spec['realm'], BuildComponent,
                                        token=realm_spec['token'])

    if component in self._component_to_job:
      logger.debug('Realm already registered with manager: %s', realm_spec['realm'])
      return component

    build_job = BuildJob(AttrDict(realm_spec['job_queue_item']))
    self._component_to_job[component] = build_job
    self._component_to_builder[component] = realm_spec['builder_id']
    self._job_uuid_to_component[build_job.job_details['build_uuid']] = component
    return component

  @coroutine
  def _register_existing_realms(self):
    """ Read the full set of realms from etcd, register a component for each, and drop
        bookkeeping for any component whose realm is no longer present.
    """
    try:
      all_realms = yield From(self._etcd_client.read(self._etcd_realm_prefix, recursive=True))

      # Register all existing realms found.
      encountered = set()
      for realm in all_realms.children:
        if not realm.dir:
          component = self._register_realm(json.loads(realm.value))
          encountered.add(component)

      # Remove any components not encountered so we can clean up.
      for found in list(self._component_to_job.keys()):
        if found not in encountered:
          self._component_to_job.pop(found)
          self._component_to_builder.pop(found)

    except (KeyError, etcd.EtcdKeyError):
      # No realms have been registered yet.
      pass

  def initialize(self, manager_config):
    logger.debug('Calling initialize')
    self._manager_config = manager_config

    executor_klass = self._executors.get(manager_config.get('EXECUTOR', ''), PopenExecutor)
    self._executor = executor_klass(manager_config.get('EXECUTOR_CONFIG', {}),
                                    self.manager_hostname)

    etcd_host = self._manager_config.get('ETCD_HOST', '127.0.0.1')
    etcd_port = self._manager_config.get('ETCD_PORT', 2379)
    etcd_ca_cert = self._manager_config.get('ETCD_CA_CERT', None)

    etcd_auth = self._manager_config.get('ETCD_CERT_AND_KEY', None)
    if etcd_auth is not None:
      etcd_auth = tuple(etcd_auth)  # Convert YAML list to a tuple

    etcd_protocol = 'http' if etcd_auth is None else 'https'
    logger.debug('Connecting to etcd on %s:%s', etcd_host, etcd_port)

    worker_threads = self._manager_config.get('ETCD_WORKER_THREADS', 5)
    self._async_thread_executor = ThreadPoolExecutor(worker_threads)
    self._etcd_client = AsyncWrapper(self._etcd_client_klass(host=etcd_host, port=etcd_port,
                                                             cert=etcd_auth, ca_cert=etcd_ca_cert,
                                                             protocol=etcd_protocol,
                                                             read_timeout=5),
                                     executor=self._async_thread_executor)

    self._etcd_builder_prefix = self._manager_config.get('ETCD_BUILDER_PREFIX', 'building/')
    self._watch_etcd(self._etcd_builder_prefix, self._handle_builder_expiration)

    self._etcd_realm_prefix = self._manager_config.get('ETCD_REALM_PREFIX', 'realm/')
    self._watch_etcd(self._etcd_realm_prefix, self._handle_realm_change)

    self._etcd_lock_prefix = self._manager_config.get('ETCD_LOCK_PREFIX', 'locks/')

    # Load components for all realms currently known to the cluster
    async(self._register_existing_realms())

  def setup_time(self):
    setup_time = self._manager_config.get('MACHINE_SETUP_TIME', 300)
    return setup_time

  def shutdown(self):
    logger.debug('Shutting down worker.')
    self._shutting_down = True

    for (etcd_key, _), task in self._watch_tasks.items():
      if not task.done():
        logger.debug('Canceling watch task for %s', etcd_key)
        task.cancel()

    if self._async_thread_executor is not None:
      logger.debug('Shutting down thread pool executor.')
      self._async_thread_executor.shutdown()

  @coroutine
  def schedule(self, build_job):
    """ Attempt to take responsibility for the given build job: claim the job key in etcd,
        start a build node via the executor, and publish the realm spec so that any manager
        can accept the builder when it connects. Returns (scheduled, retry_timeout).
    """
    build_uuid = build_job.job_details['build_uuid']
    logger.debug('Calling schedule with job: %s', build_uuid)

    # Check if there are worker slots available by checking the number of jobs in etcd
    allowed_worker_count = self._manager_config.get('ALLOWED_WORKER_COUNT', 1)
    try:
      building = yield From(self._etcd_client.read(self._etcd_builder_prefix, recursive=True))
      workers_alive = sum(1 for child in building.children if not child.dir)
    except (KeyError, etcd.EtcdKeyError):
      workers_alive = 0
    except etcd.EtcdException:
      logger.exception('Exception when reading job count from etcd for job: %s', build_uuid)
      raise Return(False, RETRY_IMMEDIATELY_TIMEOUT)

    logger.debug('Total jobs: %s', workers_alive)

    if workers_alive >= allowed_worker_count:
      logger.info('Too many workers alive, unable to start new worker for build job: %s. %s >= %s',
                  build_uuid, workers_alive, allowed_worker_count)
      raise Return(False, RETRY_IMMEDIATELY_TIMEOUT)

    job_key = self._etcd_job_key(build_job)

    # First try to take a lock for this job, meaning we will be responsible for its lifeline
    realm = str(uuid.uuid4())
    token = str(uuid.uuid4())
    nonce = str(uuid.uuid4())
    setup_time = self.setup_time()
    expiration = datetime.utcnow() + timedelta(seconds=setup_time)

    machine_max_expiration = self._manager_config.get('MACHINE_MAX_TIME', 7200)
    max_expiration = datetime.utcnow() + timedelta(seconds=machine_max_expiration)

    payload = {
      'expiration': calendar.timegm(expiration.timetuple()),
      'max_expiration': calendar.timegm(max_expiration.timetuple()),
      'nonce': nonce,
      'had_heartbeat': False,
      'job_queue_item': build_job.job_item,
    }
    lock_payload = json.dumps(payload)

    try:
      yield From(self._etcd_client.write(job_key, lock_payload, prevExist=False,
                                         ttl=EC2_API_TIMEOUT))
    except (KeyError, etcd.EtcdKeyError):
      # The job was already taken by someone else, we are probably a retry
      logger.error('Job: %s already exists in etcd, timeout may be misconfigured', build_uuid)
      raise Return(False, EC2_API_TIMEOUT)
    except etcd.EtcdException:
      logger.exception('Exception when writing job %s to etcd', build_uuid)
      raise Return(False, RETRY_IMMEDIATELY_TIMEOUT)

    executor_type = self._executor.__class__.__name__
    logger.debug('Starting builder for job: %s with executor: %s', build_uuid, executor_type)

    try:
      builder_id = yield From(self._executor.start_builder(realm, token, build_uuid))
    except:
      logger.exception('Exception when starting builder for job: %s', build_uuid)
      raise Return(False, EC2_API_TIMEOUT)

    # Store the builder in etcd, associated with the job id
    try:
      payload['builder_id'] = builder_id
      yield From(self._etcd_client.write(job_key, json.dumps(payload), prevValue=lock_payload,
                                         ttl=setup_time))
    except etcd.EtcdException:
      logger.exception('Exception when writing job %s to etcd', build_uuid)
      raise Return(False, EC2_API_TIMEOUT)

    # Store the realm spec, which will allow any manager to accept this builder when it connects
    realm_spec = json.dumps({
      'realm': realm,
      'token': token,
      'builder_id': builder_id,
      'job_queue_item': build_job.job_item,
    })

    try:
      yield From(self._etcd_client.write(self._etcd_realm_key(realm), realm_spec, prevExist=False,
                                         ttl=setup_time))
    except (KeyError, etcd.EtcdKeyError):
      logger.error('Realm %s already exists in etcd for job %s; ' +
                   'UUID collision or something is very very wrong.', realm, build_uuid)
      raise Return(False, setup_time)
    except etcd.EtcdException:
      logger.exception('Exception when writing realm %s to etcd for job %s', realm, build_uuid)
      raise Return(False, setup_time)

    raise Return(True, None)

  @coroutine
  def build_component_ready(self, build_component):
    try:
      # Clean up the bookkeeping for allowing any manager to take the job
      job = self._component_to_job.pop(build_component)
      del self._job_uuid_to_component[job.job_details['build_uuid']]
      yield From(self._etcd_client.delete(self._etcd_realm_key(build_component.builder_realm)))

      logger.debug('Sending build %s to newly ready component on realm %s',
                   job.job_details['build_uuid'], build_component.builder_realm)
      yield From(build_component.start_build(job))
    except (KeyError, etcd.EtcdKeyError):
      logger.warning('Builder is asking for more work, but work already completed')

  def build_component_disposed(self, build_component, timed_out):
    logger.debug('Calling build_component_disposed.')
    self.unregister_component(build_component)

  @coroutine
  def job_completed(self, build_job, job_status, build_component):
    logger.debug('Calling job_completed with status: %s', job_status)

    # Kill the ephemeral builder
    yield From(self._executor.stop_builder(self._component_to_builder.pop(build_component)))

    # Release the lock in etcd
    job_key = self._etcd_job_key(build_job)
    try:
      yield From(self._etcd_client.delete(job_key))
    except (KeyError, etcd.EtcdKeyError):
      logger.exception('Builder is asking for job to be removed, but work already completed')

    self.job_complete_callback(build_job, job_status)

  @coroutine
  def job_heartbeat(self, build_job):
    """ Extend the job's expiration in etcd (bounded by its max_expiration) and record that a
        heartbeat was received.
    """
    job_key = self._etcd_job_key(build_job)
    build_job_metadata_response = yield From(self._etcd_client.read(job_key))
    build_job_metadata = json.loads(build_job_metadata_response.value)

    max_expiration = datetime.utcfromtimestamp(build_job_metadata['max_expiration'])
    max_expiration_remaining = max_expiration - datetime.utcnow()
    max_expiration_sec = max(0, int(max_expiration_remaining.total_seconds()))

    ttl = min(self.heartbeat_period_sec * 2, max_expiration_sec)
    new_expiration = datetime.utcnow() + timedelta(seconds=ttl)

    payload = {
      'expiration': calendar.timegm(new_expiration.timetuple()),
      'builder_id': build_job_metadata['builder_id'],
      'job_queue_item': build_job.job_item,
      'max_expiration': build_job_metadata['max_expiration'],
      'had_heartbeat': True,
    }

    yield From(self._etcd_client.write(job_key, json.dumps(payload), ttl=ttl))

    self.job_heartbeat_callback(build_job)

  def _etcd_job_key(self, build_job):
    """ Create a key which is used to track a job in etcd. """
    return os.path.join(self._etcd_builder_prefix, build_job.job_details['build_uuid'])

  def _etcd_lock_key(self, unique_lock_id):
    """ Create a key which is used to create a temporary lock in etcd. """
    return os.path.join(self._etcd_lock_prefix, unique_lock_id)

  def _etcd_realm_key(self, realm):
    """ Create a key which is used to track an incoming connection on a realm. """
    return os.path.join(self._etcd_realm_prefix, realm)

  def num_workers(self):
    """ Return the number of workers we're managing locally. """
    return len(self._component_to_builder)
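For reference, the configuration keys read in initialize(), setup_time(), and schedule() above, with the defaults used in the code, amount to a dict like the following (an illustrative sketch, not a recommended deployment configuration):

# Illustrative manager_config; values shown are the in-code defaults.
EXAMPLE_MANAGER_CONFIG = {
  'EXECUTOR': 'popen',               # or 'ec2'; unknown values fall back to PopenExecutor
  'EXECUTOR_CONFIG': {},
  'ETCD_HOST': '127.0.0.1',
  'ETCD_PORT': 2379,
  'ETCD_CA_CERT': None,
  'ETCD_CERT_AND_KEY': None,         # a [cert, key] pair switches the client to https
  'ETCD_WORKER_THREADS': 5,
  'ETCD_BUILDER_PREFIX': 'building/',
  'ETCD_REALM_PREFIX': 'realm/',
  'ETCD_LOCK_PREFIX': 'locks/',
  'ALLOWED_WORKER_COUNT': 1,
  'MACHINE_SETUP_TIME': 300,         # seconds a builder has to boot and heartbeat
  'MACHINE_MAX_TIME': 7200,          # maximum seconds a build machine may live
}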