773e73861f
Fixes #1046
462 lines
18 KiB
Python
462 lines
18 KiB
Python
import logging
|
|
import etcd
|
|
import uuid
|
|
import calendar
|
|
import os.path
|
|
import json
|
|
|
|
from datetime import datetime, timedelta
|
|
from trollius import From, coroutine, Return, async
|
|
from concurrent.futures import ThreadPoolExecutor
|
|
from urllib3.exceptions import ReadTimeoutError, ProtocolError
|
|
|
|
from app import metric_queue
|
|
from buildman.manager.basemanager import BaseManager
|
|
from buildman.manager.executor import PopenExecutor, EC2Executor
|
|
from buildman.component.buildcomponent import BuildComponent
|
|
from buildman.jobutil.buildjob import BuildJob
|
|
from buildman.asyncutil import AsyncWrapper
|
|
from buildman.server import BuildJobResult
|
|
from util.morecollections import AttrDict
|
|
|
|
|
|
# Module-level logger, named after this module.
logger = logging.getLogger(__name__)

# Timeout passed to every etcd watch call issued by the manager.
ETCD_MAX_WATCH_TIMEOUT = 30

# TTL of the initial job key written to etcd before the builder is started. It is also
# the second element of the `Return(False, ...)` result when starting the builder fails
# (presumably the delay before the job is retried -- confirm against the caller).
EC2_API_TIMEOUT = 20

# Second element of the `Return(False, ...)` result for failures that, going by the
# name, should be retried without delay.
RETRY_IMMEDIATELY_TIMEOUT = 0
|
|
|
|
|
|
class EtcdAction(object):
    """ String constants compared against the `action` attribute of etcd watch results. """

    # Reads and writes.
    GET = 'get'
    SET = 'set'
    UPDATE = 'update'
    CREATE = 'create'

    # Key removal.
    DELETE = 'delete'
    EXPIRE = 'expire'

    # Conditional operations.
    COMPARE_AND_SWAP = 'compareAndSwap'
    COMPARE_AND_DELETE = 'compareAndDelete'
|
|
|
|
|
|
class EphemeralBuilderManager(BaseManager):
    """ Build manager implementation for the Enterprise Registry. """

    # Executor classes selectable by name through the `EXECUTOR` manager config value.
    _executors = dict(popen=PopenExecutor, ec2=EC2Executor)

    # Class instantiated to talk to etcd. Kept as a class attribute so it can be replaced
    # (presumably by tests -- confirm).
    _etcd_client_klass = etcd.Client
|
|
|
|
def __init__(self, *args, **kwargs):
    """ Declare all manager state with empty values, then defer to the base constructor.

        The real values (config, executor, etcd client, key prefixes) are filled in by
        `initialize`. All positional and keyword arguments are forwarded unchanged to
        `BaseManager.__init__`.
    """
    self._shutting_down = False

    # Populated by `initialize`.
    self._manager_config = None
    self._async_thread_executor = None
    self._etcd_client = None
    self._executor = None

    # etcd key prefixes, also populated by `initialize`. The lock prefix is declared here
    # alongside its siblings so that every attribute exists from construction onwards.
    self._etcd_realm_prefix = None
    self._etcd_builder_prefix = None
    self._etcd_lock_prefix = None

    # Bookkeeping linking registered components to their jobs and builders.
    self._component_to_job = {}
    self._job_uuid_to_component = {}
    self._component_to_builder = {}

    # Map of etcd keys being watched to the tasks watching them
    self._watch_tasks = {}

    super(EphemeralBuilderManager, self).__init__(*args, **kwargs)
|
|
|
|
def _watch_etcd(self, etcd_key, change_callback, start_index=None, recursive=True,
|
|
restarter=None):
|
|
watch_task_key = (etcd_key, recursive)
|
|
def callback_wrapper(changed_key_future):
|
|
new_index = start_index
|
|
etcd_result = None
|
|
|
|
if not changed_key_future.cancelled():
|
|
try:
|
|
etcd_result = changed_key_future.result()
|
|
existing_index = getattr(etcd_result, 'etcd_index', None)
|
|
new_index = etcd_result.modifiedIndex + 1
|
|
|
|
logger.debug('Got watch of key: %s%s at #%s with result: %s', etcd_key,
|
|
'*' if recursive else '', existing_index, etcd_result)
|
|
|
|
except ReadTimeoutError:
|
|
logger.debug('Read-timeout on etcd watch %s, rescheduling', etcd_key)
|
|
|
|
except etcd.EtcdEventIndexCleared:
|
|
# This happens if etcd2 has moved forward too fast for us to start watching
|
|
# at the index we retrieved. We therefore start a new watch at HEAD and
|
|
# (if specified) call the restarter method which should conduct a read and
|
|
# reset the state of the manager.
|
|
# TODO: Remove this hack once Etcd is fixed.
|
|
logger.exception('Etcd moved forward too quickly. Restarting watch cycle.')
|
|
new_index = None
|
|
if restarter is not None:
|
|
async(restarter())
|
|
|
|
except (KeyError, etcd.EtcdKeyError):
|
|
logger.debug('Etcd key already cleared: %s', etcd_key)
|
|
return
|
|
|
|
except etcd.EtcdException as eex:
|
|
# TODO(jschorr): This is a quick and dirty hack and should be replaced
|
|
# with a proper exception check.
|
|
if str(eex.message).find('Read timed out') >= 0:
|
|
logger.debug('Read-timeout on etcd watch %s, rescheduling', etcd_key)
|
|
else:
|
|
logger.exception('Exception on etcd watch: %s', etcd_key)
|
|
|
|
except ProtocolError:
|
|
logger.exception('Exception on etcd watch: %s', etcd_key)
|
|
|
|
|
|
if watch_task_key not in self._watch_tasks or self._watch_tasks[watch_task_key].done():
|
|
self._watch_etcd(etcd_key, change_callback, start_index=new_index, restarter=restarter)
|
|
|
|
if etcd_result:
|
|
change_callback(etcd_result)
|
|
|
|
if not self._shutting_down:
|
|
logger.debug('Scheduling watch of key: %s%s at start index %s', etcd_key,
|
|
'*' if recursive else '', start_index)
|
|
|
|
watch_future = self._etcd_client.watch(etcd_key, recursive=recursive, index=start_index,
|
|
timeout=ETCD_MAX_WATCH_TIMEOUT)
|
|
watch_future.add_done_callback(callback_wrapper)
|
|
|
|
self._watch_tasks[watch_task_key] = async(watch_future)
|
|
|
|
@coroutine
def _handle_builder_expiration(self, etcd_result):
    """ React to a watch result on the builder prefix by tearing down an expired builder.

        Only `expire` actions are handled; anything else (including None) is ignored. The
        metadata of the expired key is read from the result's previous node. If it names a
        builder, a lock key is created in etcd first so that only one manager stops it.
    """
    if etcd_result is None or etcd_result.action != EtcdAction.EXPIRE:
        return

    # Handle the expiration
    logger.debug('Builder expired, clean up the old build node')
    expired_metadata = json.loads(etcd_result._prev_node.value)

    if 'builder_id' not in expired_metadata:
        return

    builder_id = expired_metadata['builder_id']

    # Before we delete the build node, we take a lock to make sure that only one manager
    # can terminate the node.
    try:
        lock_key = self._etcd_lock_key(builder_id)
        lock_write = self._etcd_client.write(lock_key, '', prevExist=False,
                                             ttl=self.setup_time())
        yield From(lock_write)
    except (KeyError, etcd.EtcdKeyError):
        logger.debug('Somebody else is cleaning up the build node: %s', builder_id)
        return

    # A key that expired without ever recording a heartbeat is reported as incomplete.
    heartbeat_seen = expired_metadata.get('had_heartbeat', True)
    if not heartbeat_seen:
        logger.warning('Build node failed to successfully boot: %s', builder_id)
        build_job = BuildJob(AttrDict(expired_metadata['job_queue_item']))
        self.job_complete_callback(build_job, BuildJobResult.INCOMPLETE)

    logger.info('Terminating expired build node: %s', builder_id)
    yield From(self._executor.stop_builder(builder_id))
|
|
|
|
|
|
def _handle_realm_change(self, etcd_result):
|
|
if etcd_result is None:
|
|
return
|
|
|
|
if etcd_result.action == EtcdAction.CREATE:
|
|
# We must listen on the realm created by ourselves or another worker
|
|
realm_spec = json.loads(etcd_result.value)
|
|
self._register_realm(realm_spec)
|
|
|
|
elif etcd_result.action == EtcdAction.DELETE or etcd_result.action == EtcdAction.EXPIRE:
|
|
# We must stop listening for new connections on the specified realm, if we did not get the
|
|
# connection
|
|
realm_spec = json.loads(etcd_result._prev_node.value)
|
|
build_job = BuildJob(AttrDict(realm_spec['job_queue_item']))
|
|
component = self._job_uuid_to_component.pop(build_job.job_details['build_uuid'], None)
|
|
if component is not None:
|
|
# We were not the manager which the worker connected to, remove the bookkeeping for it
|
|
logger.debug('Unregistering unused component on realm: %s', realm_spec['realm'])
|
|
del self._component_to_job[component]
|
|
del self._component_to_builder[component]
|
|
self.unregister_component(component)
|
|
|
|
else:
|
|
logger.warning('Unexpected action (%s) on realm key: %s', etcd_result.action, etcd_result.key)
|
|
|
|
def _register_realm(self, realm_spec):
    """ Register a build component for the realm described by `realm_spec`.

        `realm_spec` is a dict with the keys `realm`, `token`, `builder_id` and
        `job_queue_item`. The component is registered first; the job and builder
        bookkeeping is only recorded when the component is not already tracked.

        Returns the registered component.
    """
    realm = realm_spec['realm']
    logger.debug('Registering realm with manager: %s', realm)
    component = self.register_component(realm, BuildComponent, token=realm_spec['token'])

    if component not in self._component_to_job:
        build_job = BuildJob(AttrDict(realm_spec['job_queue_item']))
        self._component_to_job[component] = build_job
        self._component_to_builder[component] = realm_spec['builder_id']
        self._job_uuid_to_component[build_job.job_details['build_uuid']] = component
    else:
        logger.debug('Realm already registered with manager: %s', realm)

    return component
|
|
|
|
@coroutine
def _register_existing_realms(self):
    """ Synchronize the local realm bookkeeping with the realms currently stored in etcd.

        Every non-directory child under the realm prefix is registered. Components that
        are still tracked locally but were not found in etcd are then dropped from the
        bookkeeping. A missing realm prefix (no realms yet) is silently ignored.
    """
    try:
        all_realms = yield From(self._etcd_client.read(self._etcd_realm_prefix, recursive=True))

        # Register all existing realms found.
        encountered = set()
        for realm in all_realms.children:
            if not realm.dir:
                encountered.add(self._register_realm(json.loads(realm.value)))

        # Remove any components not encountered so we can clean up. The stale list is
        # built up front because the dictionary is mutated while cleaning.
        stale_components = [found for found in self._component_to_job
                            if found not in encountered]
        for stale in stale_components:
            # Drop the stale component itself (not the last one registered above), along
            # with its reverse job-uuid mapping so no dangling entry is left behind.
            stale_job = self._component_to_job.pop(stale)
            self._component_to_builder.pop(stale, None)
            self._job_uuid_to_component.pop(stale_job.job_details['build_uuid'], None)
            # NOTE(review): the component stays registered with the server here, whereas
            # _handle_realm_change also calls unregister_component -- confirm intent.

    except (KeyError, etcd.EtcdKeyError):
        # no realms have been registered yet
        pass
|
|
|
|
def initialize(self, manager_config):
|
|
logger.debug('Calling initialize')
|
|
self._manager_config = manager_config
|
|
|
|
executor_klass = self._executors.get(manager_config.get('EXECUTOR', ''), PopenExecutor)
|
|
self._executor = executor_klass(manager_config.get('EXECUTOR_CONFIG', {}),
|
|
self.manager_hostname)
|
|
|
|
etcd_host = self._manager_config.get('ETCD_HOST', '127.0.0.1')
|
|
etcd_port = self._manager_config.get('ETCD_PORT', 2379)
|
|
etcd_ca_cert = self._manager_config.get('ETCD_CA_CERT', None)
|
|
|
|
etcd_auth = self._manager_config.get('ETCD_CERT_AND_KEY', None)
|
|
if etcd_auth is not None:
|
|
etcd_auth = tuple(etcd_auth) # Convert YAML list to a tuple
|
|
|
|
etcd_protocol = 'http' if etcd_auth is None else 'https'
|
|
logger.debug('Connecting to etcd on %s:%s', etcd_host, etcd_port)
|
|
|
|
worker_threads = self._manager_config.get('ETCD_WORKER_THREADS', 5)
|
|
self._async_thread_executor = ThreadPoolExecutor(worker_threads)
|
|
self._etcd_client = AsyncWrapper(self._etcd_client_klass(host=etcd_host, port=etcd_port,
|
|
cert=etcd_auth, ca_cert=etcd_ca_cert,
|
|
protocol=etcd_protocol,
|
|
read_timeout=5),
|
|
executor=self._async_thread_executor)
|
|
|
|
self._etcd_builder_prefix = self._manager_config.get('ETCD_BUILDER_PREFIX', 'building/')
|
|
self._watch_etcd(self._etcd_builder_prefix, self._handle_builder_expiration)
|
|
|
|
self._etcd_realm_prefix = self._manager_config.get('ETCD_REALM_PREFIX', 'realm/')
|
|
self._watch_etcd(self._etcd_realm_prefix, self._handle_realm_change,
|
|
restarter=self._register_existing_realms)
|
|
|
|
self._etcd_lock_prefix = self._manager_config.get('ETCD_LOCK_PREFIX', 'locks/')
|
|
|
|
# Load components for all realms currently known to the cluster
|
|
async(self._register_existing_realms())
|
|
|
|
def setup_time(self):
    """ Return the `MACHINE_SETUP_TIME` manager config value, defaulting to 300. """
    return self._manager_config.get('MACHINE_SETUP_TIME', 300)
|
|
|
|
def shutdown(self):
    """ Stop the manager: flag it as shutting down, cancel every unfinished watch task and
        shut down the thread pool (if one was created).
    """
    logger.debug('Shutting down worker.')
    self._shutting_down = True

    for (watched_key, _), watch_task in self._watch_tasks.items():
        if watch_task.done():
            continue

        logger.debug('Canceling watch task for %s', watched_key)
        watch_task.cancel()

    thread_pool = self._async_thread_executor
    if thread_pool is not None:
        logger.debug('Shutting down thread pool executor.')
        thread_pool.shutdown()
|
|
|
|
@coroutine
def schedule(self, build_job):
    """ Try to start an ephemeral builder for `build_job`.

        Steps, in order:
          1. Count the non-directory keys under the builder prefix and refuse to schedule
             when `ALLOWED_WORKER_COUNT` (default 1) is already reached.
          2. Create the job key in etcd (`prevExist=False`) with a short TTL.
          3. Start a builder through the executor.
          4. Rewrite the job key with the builder id and the setup-time TTL.
          5. Create the realm key that lets any manager accept the builder.

        The result is delivered via `Return(scheduled, retry_timeout)`: `(True, None)` on
        success, otherwise `(False, <timeout>)`.
    """
    build_uuid = build_job.job_details['build_uuid']
    logger.debug('Calling schedule with job: %s', build_uuid)

    # Check if there are worker slots available by checking the number of jobs in etcd
    allowed_worker_count = self._manager_config.get('ALLOWED_WORKER_COUNT', 1)
    try:
        building = yield From(self._etcd_client.read(self._etcd_builder_prefix, recursive=True))
        workers_alive = sum(1 for child in building.children if not child.dir)
    except (KeyError, etcd.EtcdKeyError):
        # The builder prefix does not exist yet, so nothing is running.
        workers_alive = 0
    except etcd.EtcdException:
        logger.exception('Exception when reading job count from etcd for job: %s', build_uuid)
        raise Return(False, RETRY_IMMEDIATELY_TIMEOUT)

    logger.debug('Total jobs: %s', workers_alive)

    if workers_alive >= allowed_worker_count:
        logger.info('Too many workers alive, unable to start new worker for build job: %s. %s >= %s',
                    build_uuid, workers_alive, allowed_worker_count)
        raise Return(False, RETRY_IMMEDIATELY_TIMEOUT)

    job_key = self._etcd_job_key(build_job)

    # First try to take a lock for this job, meaning we will be responsible for its lifeline
    realm = str(uuid.uuid4())
    token = str(uuid.uuid4())
    nonce = str(uuid.uuid4())
    setup_time = self.setup_time()
    expiration = datetime.utcnow() + timedelta(seconds=setup_time)

    machine_max_expiration = self._manager_config.get('MACHINE_MAX_TIME', 7200)
    max_expiration = datetime.utcnow() + timedelta(seconds=machine_max_expiration)

    payload = {
        'expiration': calendar.timegm(expiration.timetuple()),
        'max_expiration': calendar.timegm(max_expiration.timetuple()),
        'nonce': nonce,
        'had_heartbeat': False,
        'job_queue_item': build_job.job_item,
    }
    lock_payload = json.dumps(payload)

    try:
        yield From(self._etcd_client.write(job_key, lock_payload, prevExist=False,
                                           ttl=EC2_API_TIMEOUT))
    except (KeyError, etcd.EtcdKeyError):
        # The job was already taken by someone else, we are probably a retry
        logger.error('Job: %s already exists in etcd, timeout may be misconfigured', build_uuid)
        raise Return(False, EC2_API_TIMEOUT)
    except etcd.EtcdException:
        logger.exception('Exception when writing job %s to etcd', build_uuid)
        raise Return(False, RETRY_IMMEDIATELY_TIMEOUT)

    executor_type = self._executor.__class__.__name__
    logger.debug('Starting builder for job: %s with executor: %s', build_uuid, executor_type)

    try:
        builder_id = yield From(self._executor.start_builder(realm, token, build_uuid))
        metric_queue.put('EC2BuilderStarted', 1, unit='Count')
    except Exception:
        # Any executor failure is reported as "not scheduled". `Exception` (rather than a
        # bare except) lets interpreter-exit signals and generator teardown propagate.
        logger.exception('Exception when starting builder for job: %s', build_uuid)
        raise Return(False, EC2_API_TIMEOUT)

    # Store the builder in etcd associated with the job id
    # NOTE(review): if this write or the realm write below fails, the builder started
    # above is not stopped here -- confirm that something else reclaims it.
    try:
        payload['builder_id'] = builder_id
        yield From(self._etcd_client.write(job_key, json.dumps(payload), prevValue=lock_payload,
                                           ttl=setup_time))
    except etcd.EtcdException:
        logger.exception('Exception when writing job %s to etcd', build_uuid)
        raise Return(False, EC2_API_TIMEOUT)

    # Store the realm spec which will allow any manager to accept this builder when it connects
    realm_spec = json.dumps({
        'realm': realm,
        'token': token,
        'builder_id': builder_id,
        'job_queue_item': build_job.job_item,
    })

    try:
        yield From(self._etcd_client.write(self._etcd_realm_key(realm), realm_spec, prevExist=False,
                                           ttl=setup_time))
    except (KeyError, etcd.EtcdKeyError):
        logger.error('Realm %s already exists in etcd for job %s ' +
                     'UUID collision or something is very very wrong.', realm, build_uuid)
        raise Return(False, setup_time)
    except etcd.EtcdException:
        logger.exception('Exception when writing realm %s to etcd for job %s', realm, build_uuid)
        raise Return(False, setup_time)

    raise Return(True, None)
|
|
|
|
@coroutine
def build_component_ready(self, build_component):
    """ Hand the pending job to a build component that has just become ready.

        The job is removed from the local bookkeeping and its realm key is deleted from
        etcd before the build is started on the component. If the job or the realm key is
        already gone, a warning is logged and nothing is started.
    """
    try:
        # Clean up the bookkeeping for allowing any manager to take the job
        pending_job = self._component_to_job.pop(build_component)
        build_uuid = pending_job.job_details['build_uuid']
        del self._job_uuid_to_component[build_uuid]

        realm_key = self._etcd_realm_key(build_component.builder_realm)
        yield From(self._etcd_client.delete(realm_key))

        logger.debug('Sending build %s to newly ready component on realm %s',
                     build_uuid, build_component.builder_realm)
        yield From(build_component.start_build(pending_job))
    except (KeyError, etcd.EtcdKeyError):
        logger.warning('Builder is asking for more work, but work already completed')
|
|
|
|
def build_component_disposed(self, build_component, timed_out):
    """ Unregister a build component that has been disposed.

        `timed_out` is accepted but not used by this implementation.
    """
    logger.debug('Calling build_component_disposed.')
    self.unregister_component(build_component)
|
|
|
|
@coroutine
def job_completed(self, build_job, job_status, build_component):
    """ Finish a job: stop its builder, delete its key from etcd and report the status
        through `job_complete_callback`.

        A job key that is already missing from etcd is only logged.
    """
    logger.debug('Calling job_completed with status: %s', job_status)

    # Kill the ephemeral builder
    stop_builder = self._executor.stop_builder
    builder_id = self._component_to_builder.pop(build_component)
    yield From(stop_builder(builder_id))

    # Release the lock in etcd
    job_key = self._etcd_job_key(build_job)
    try:
        yield From(self._etcd_client.delete(job_key))
    except (KeyError, etcd.EtcdKeyError):
        logger.debug('Builder is asking for job to be removed, but work already completed')

    self.job_complete_callback(build_job, job_status)
|
|
|
|
@coroutine
def job_heartbeat(self, build_job):
    """ Extend the lifetime of the job's key in etcd and notify `job_heartbeat_callback`.

        The new TTL is twice the heartbeat period, capped by the time left until the
        job's stored `max_expiration`. If the job key no longer exists, this is logged
        and nothing else happens.
    """
    # Extend the deadline in etcd
    job_key = self._etcd_job_key(build_job)

    try:
        metadata_response = yield From(self._etcd_client.read(job_key))
    except (KeyError, etcd.EtcdKeyError):
        logger.info('Job %s no longer exists in etcd', build_job.job_details['build_uuid'])
        return

    stored_metadata = json.loads(metadata_response.value)

    # Never extend the key beyond the hard deadline recorded when the job was scheduled.
    hard_deadline = datetime.utcfromtimestamp(stored_metadata['max_expiration'])
    time_left = hard_deadline - datetime.utcnow()
    seconds_left = max(0, int(time_left.total_seconds()))

    ttl = min(self.heartbeat_period_sec * 2, seconds_left)
    new_expiration = datetime.utcnow() + timedelta(seconds=ttl)

    payload = {
        'expiration': calendar.timegm(new_expiration.timetuple()),
        'builder_id': stored_metadata['builder_id'],
        'job_queue_item': build_job.job_item,
        'max_expiration': stored_metadata['max_expiration'],
        'had_heartbeat': True,
    }

    yield From(self._etcd_client.write(job_key, json.dumps(payload), ttl=ttl))

    self.job_heartbeat_callback(build_job)
|
|
|
|
def _etcd_job_key(self, build_job):
|
|
""" Create a key which is used to track a job in etcd.
|
|
"""
|
|
return os.path.join(self._etcd_builder_prefix, build_job.job_details['build_uuid'])
|
|
|
|
def _etcd_lock_key(self, unique_lock_id):
|
|
""" Create a key which is used to create a temporary lock in etcd.
|
|
"""
|
|
return os.path.join(self._etcd_lock_prefix, unique_lock_id)
|
|
|
|
def _etcd_realm_key(self, realm):
|
|
""" Create a key which is used to track an incoming connection on a realm.
|
|
"""
|
|
return os.path.join(self._etcd_realm_prefix, realm)
|
|
|
|
def num_workers(self):
    """ Return the number of workers we're managing locally, i.e. the number of components
        that currently have a builder recorded.
    """
    managed_builders = self._component_to_builder
    return len(managed_builders)
|