Add additional logging, timeouts, and exception checks
This commit is contained in:
parent
328de0201f
commit
5dd78f76c7
2 changed files with 27 additions and 2 deletions
|
@ -4,6 +4,7 @@ import uuid
|
||||||
import calendar
|
import calendar
|
||||||
import os.path
|
import os.path
|
||||||
import json
|
import json
|
||||||
|
import boto
|
||||||
|
|
||||||
from datetime import datetime, timedelta
|
from datetime import datetime, timedelta
|
||||||
from trollius import From, coroutine, Return, async
|
from trollius import From, coroutine, Return, async
|
||||||
|
@ -45,6 +46,10 @@ class EphemeralBuilderManager(BaseManager):
|
||||||
_etcd_client_klass = etcd.Client
|
_etcd_client_klass = etcd.Client
|
||||||
|
|
||||||
def __init__(self, *args, **kwargs):
|
def __init__(self, *args, **kwargs):
|
||||||
|
# Set a timeout so that boto connections don't hang indefinitely.
|
||||||
|
boto.config.add_section('Boto')
|
||||||
|
boto.config.set('Boto', 'http_socket_timeout', '10')
|
||||||
|
|
||||||
self._shutting_down = False
|
self._shutting_down = False
|
||||||
|
|
||||||
self._manager_config = None
|
self._manager_config = None
|
||||||
|
@ -90,6 +95,9 @@ class EphemeralBuilderManager(BaseManager):
|
||||||
self._watch_tasks[watch_task_key] = async(watch_future)
|
self._watch_tasks[watch_task_key] = async(watch_future)
|
||||||
|
|
||||||
def _handle_builder_expiration(self, etcd_result):
|
def _handle_builder_expiration(self, etcd_result):
|
||||||
|
if etcd_result is None:
|
||||||
|
return
|
||||||
|
|
||||||
if etcd_result.action == EtcdAction.EXPIRE:
|
if etcd_result.action == EtcdAction.EXPIRE:
|
||||||
# Handle the expiration
|
# Handle the expiration
|
||||||
logger.debug('Builder expired, clean up the old build node')
|
logger.debug('Builder expired, clean up the old build node')
|
||||||
|
@ -100,6 +108,9 @@ class EphemeralBuilderManager(BaseManager):
|
||||||
async(self._executor.stop_builder(job_metadata['builder_id']))
|
async(self._executor.stop_builder(job_metadata['builder_id']))
|
||||||
|
|
||||||
def _handle_realm_change(self, etcd_result):
|
def _handle_realm_change(self, etcd_result):
|
||||||
|
if etcd_result is None:
|
||||||
|
return
|
||||||
|
|
||||||
if etcd_result.action == EtcdAction.CREATE:
|
if etcd_result.action == EtcdAction.CREATE:
|
||||||
# We must listen on the realm created by ourselves or another worker
|
# We must listen on the realm created by ourselves or another worker
|
||||||
realm_spec = json.loads(etcd_result.value)
|
realm_spec = json.loads(etcd_result.value)
|
||||||
|
@ -160,7 +171,8 @@ class EphemeralBuilderManager(BaseManager):
|
||||||
self._async_thread_executor = ThreadPoolExecutor(worker_threads)
|
self._async_thread_executor = ThreadPoolExecutor(worker_threads)
|
||||||
self._etcd_client = AsyncWrapper(self._etcd_client_klass(host=etcd_host, port=etcd_port,
|
self._etcd_client = AsyncWrapper(self._etcd_client_klass(host=etcd_host, port=etcd_port,
|
||||||
cert=etcd_auth, ca_cert=etcd_ca_cert,
|
cert=etcd_auth, ca_cert=etcd_ca_cert,
|
||||||
protocol=etcd_protocol),
|
protocol=etcd_protocol,
|
||||||
|
read_timeout=5),
|
||||||
executor=self._async_thread_executor)
|
executor=self._async_thread_executor)
|
||||||
|
|
||||||
self._etcd_builder_prefix = self._manager_config.get('ETCD_BUILDER_PREFIX', 'building/')
|
self._etcd_builder_prefix = self._manager_config.get('ETCD_BUILDER_PREFIX', 'building/')
|
||||||
|
@ -201,6 +213,9 @@ class EphemeralBuilderManager(BaseManager):
|
||||||
workers_alive = sum(1 for child in building.children if not child.dir)
|
workers_alive = sum(1 for child in building.children if not child.dir)
|
||||||
except KeyError:
|
except KeyError:
|
||||||
workers_alive = 0
|
workers_alive = 0
|
||||||
|
except etcd.EtcdException:
|
||||||
|
logger.exception('Exception when reading job count from etcd')
|
||||||
|
raise Return(False)
|
||||||
|
|
||||||
logger.debug('Total jobs: %s', workers_alive)
|
logger.debug('Total jobs: %s', workers_alive)
|
||||||
|
|
||||||
|
@ -231,6 +246,9 @@ class EphemeralBuilderManager(BaseManager):
|
||||||
# The job was already taken by someone else, we are probably a retry
|
# The job was already taken by someone else, we are probably a retry
|
||||||
logger.error('Job already exists in etcd, are timeouts misconfigured or is the queue broken?')
|
logger.error('Job already exists in etcd, are timeouts misconfigured or is the queue broken?')
|
||||||
raise Return(False)
|
raise Return(False)
|
||||||
|
except etcd.EtcdException:
|
||||||
|
logger.exception('Exception when writing job %s to etcd', build_uuid)
|
||||||
|
raise Return(False)
|
||||||
|
|
||||||
logger.debug('Starting builder with executor: %s', self._executor)
|
logger.debug('Starting builder with executor: %s', self._executor)
|
||||||
|
|
||||||
|
@ -255,12 +273,16 @@ class EphemeralBuilderManager(BaseManager):
|
||||||
'builder_id': builder_id,
|
'builder_id': builder_id,
|
||||||
'job_queue_item': build_job.job_item,
|
'job_queue_item': build_job.job_item,
|
||||||
})
|
})
|
||||||
|
|
||||||
try:
|
try:
|
||||||
yield From(self._etcd_client.write(self._etcd_realm_key(realm), realm_spec, prevExist=False,
|
yield From(self._etcd_client.write(self._etcd_realm_key(realm), realm_spec, prevExist=False,
|
||||||
ttl=ttl))
|
ttl=ttl))
|
||||||
except KeyError:
|
except KeyError:
|
||||||
logger.error('Realm already exists in etcd. UUID collision or something is very very wrong.')
|
logger.error('Realm already exists in etcd. UUID collision or something is very very wrong.')
|
||||||
raise Return(False)
|
raise Return(False)
|
||||||
|
except etcd.EtcdException:
|
||||||
|
logger.exception('Exception when writing realm %s to etcd', realm)
|
||||||
|
raise Return(False)
|
||||||
|
|
||||||
raise Return(True)
|
raise Return(True)
|
||||||
|
|
||||||
|
|
|
@ -150,6 +150,7 @@ class BuilderServer(object):
|
||||||
|
|
||||||
@trollius.coroutine
|
@trollius.coroutine
|
||||||
def _work_checker(self):
|
def _work_checker(self):
|
||||||
|
logger.debug('Initializing work checker')
|
||||||
while self._current_status == BuildServerStatus.RUNNING:
|
while self._current_status == BuildServerStatus.RUNNING:
|
||||||
with database.CloseForLongOperation(app.config):
|
with database.CloseForLongOperation(app.config):
|
||||||
yield From(trollius.sleep(WORK_CHECK_TIMEOUT))
|
yield From(trollius.sleep(WORK_CHECK_TIMEOUT))
|
||||||
|
@ -192,10 +193,12 @@ class BuilderServer(object):
|
||||||
def _queue_metrics_updater(self):
|
def _queue_metrics_updater(self):
|
||||||
logger.debug('Initializing queue metrics updater')
|
logger.debug('Initializing queue metrics updater')
|
||||||
while self._current_status == BuildServerStatus.RUNNING:
|
while self._current_status == BuildServerStatus.RUNNING:
|
||||||
yield From(trollius.sleep(30))
|
|
||||||
logger.debug('Writing metrics')
|
logger.debug('Writing metrics')
|
||||||
self._queue.update_metrics()
|
self._queue.update_metrics()
|
||||||
|
|
||||||
|
logger.debug('Metrics going to sleep for 30 seconds')
|
||||||
|
yield From(trollius.sleep(30))
|
||||||
|
|
||||||
@trollius.coroutine
|
@trollius.coroutine
|
||||||
def _initialize(self, loop, host, websocket_port, controller_port, ssl=None):
|
def _initialize(self, loop, host, websocket_port, controller_port, ssl=None):
|
||||||
self._loop = loop
|
self._loop = loop
|
||||||
|
|
Reference in a new issue