Add support to the build system for tracking if/when the build manager crashes and make sure builds are restarted within a few minutes
This commit is contained in:
parent
25b5062bb6
commit
b8e873b00b
13 changed files with 73 additions and 15 deletions
|
@ -258,7 +258,8 @@ class BuildComponent(BaseComponent):
|
|||
worker_error = WorkerError(aex.error, aex.kwargs.get('base_error'))
|
||||
|
||||
# Write the error to the log.
|
||||
self._build_status.set_error(worker_error.public_message(), worker_error.extra_data())
|
||||
self._build_status.set_error(worker_error.public_message(), worker_error.extra_data(),
|
||||
internal_error=worker_error.is_internal_error())
|
||||
|
||||
# Mark the build as completed.
|
||||
if worker_error.is_internal_error():
|
||||
|
@ -326,6 +327,12 @@ class BuildComponent(BaseComponent):
|
|||
with build_status as status_dict:
|
||||
status_dict['heartbeat'] = int(time.time())
|
||||
|
||||
|
||||
# Mark the build item.
|
||||
current_job = self._current_job
|
||||
if current_job is not None:
|
||||
self.parent_manager.job_heartbeat(current_job)
|
||||
|
||||
# Check the heartbeat from the worker.
|
||||
LOGGER.debug('Checking heartbeat on realm %s', self.builder_realm)
|
||||
if not self._last_heartbeat:
|
||||
|
@ -348,7 +355,7 @@ class BuildComponent(BaseComponent):
|
|||
# manager.
|
||||
if self._current_job is not None:
|
||||
if timed_out:
|
||||
self._build_status.set_error('Build worker timed out. Build has been requeued.')
|
||||
self._build_status.set_error('Build worker timed out', internal_error=True)
|
||||
|
||||
self.parent_manager.job_completed(self._current_job, BuildJobResult.INCOMPLETE, self)
|
||||
self._build_status = None
|
||||
|
|
|
@ -28,8 +28,11 @@ class StatusHandler(object):
|
|||
def set_command(self, command, extra_data=None):
|
||||
self._append_log_message(command, self._build_logs.COMMAND, extra_data)
|
||||
|
||||
def set_error(self, error_message, extra_data=None):
|
||||
self.set_phase(BUILD_PHASE.ERROR)
|
||||
def set_error(self, error_message, extra_data=None, internal_error=False):
|
||||
self.set_phase(BUILD_PHASE.INTERNAL_ERROR if internal_error else BUILD_PHASE.ERROR)
|
||||
|
||||
extra_data = extra_data or {}
|
||||
extra_data['internal_error'] = internal_error
|
||||
self._append_log_message(error_message, self._build_logs.ERROR, extra_data)
|
||||
|
||||
def set_phase(self, phase, extra_data=None):
|
||||
|
|
|
@ -1,10 +1,23 @@
|
|||
class BaseManager(object):
|
||||
""" Base for all worker managers. """
|
||||
def __init__(self, register_component, unregister_component, job_complete_callback):
|
||||
def __init__(self, register_component, unregister_component, job_heartbeat_callback,
|
||||
job_complete_callback):
|
||||
self.register_component = register_component
|
||||
self.unregister_component = unregister_component
|
||||
self.job_heartbeat_callback = job_heartbeat_callback
|
||||
self.job_complete_callback = job_complete_callback
|
||||
|
||||
def job_heartbeat(self, build_job):
|
||||
""" Method invoked to tell the manager that a job is still running. This method will be called
|
||||
every few minutes. """
|
||||
self.job_heartbeat_callback(build_job)
|
||||
|
||||
def setup_time(self):
|
||||
""" Returns the number of seconds that the build system should wait before allowing the job
|
||||
to be picked up again after called 'schedule'.
|
||||
"""
|
||||
raise NotImplementedError
|
||||
|
||||
def shutdown(self):
|
||||
""" Indicates that the build controller server is in a shutdown state and that no new jobs
|
||||
or workers should be performed. Existing workers should be cleaned up once their jobs
|
||||
|
|
|
@ -36,6 +36,11 @@ class EnterpriseManager(BaseManager):
|
|||
# production, build workers in enterprise are long-lived and register dynamically.
|
||||
self.register_component(REGISTRATION_REALM, DynamicRegistrationComponent)
|
||||
|
||||
def setup_time(self):
|
||||
# Builders are already registered, so the setup time should be essentially instant. We therefore
|
||||
# only return a minute here.
|
||||
return 60
|
||||
|
||||
def add_build_component(self):
|
||||
""" Adds a new build component for an Enterprise Registry. """
|
||||
# Generate a new unique realm ID for the build worker.
|
||||
|
|
|
@ -9,14 +9,17 @@ from aiowsgi import create_server as create_wsgi_server
|
|||
from flask import Flask
|
||||
from threading import Event
|
||||
from trollius.coroutines import From
|
||||
from datetime import datetime, timedelta
|
||||
|
||||
from buildman.buildjob import BuildJob, BuildJobLoadException
|
||||
from data.queue import WorkQueue
|
||||
|
||||
LOGGER = logging.getLogger(__name__)
|
||||
|
||||
WORK_CHECK_TIMEOUT = 10
|
||||
TIMEOUT_PERIOD_MINUTES = 20
|
||||
RESERVATION_SECONDS = (TIMEOUT_PERIOD_MINUTES + 5) * 60
|
||||
JOB_TIMEOUT_SECONDS = 300
|
||||
MINIMUM_JOB_EXTENSION = timedelta(minutes=2)
|
||||
|
||||
class BuildJobResult(object):
|
||||
""" Build job result enum """
|
||||
|
@ -42,6 +45,7 @@ class BuilderServer(object):
|
|||
self._lifecycle_manager = lifecycle_manager_klass(
|
||||
self._register_component,
|
||||
self._unregister_component,
|
||||
self._job_heartbeat,
|
||||
self._job_complete
|
||||
)
|
||||
|
||||
|
@ -107,6 +111,10 @@ class BuilderServer(object):
|
|||
self._current_components.remove(component)
|
||||
self._session_factory.remove(component)
|
||||
|
||||
def _job_heartbeat(self, build_job):
|
||||
WorkQueue.extend_processing(build_job.job_item(), seconds_from_now=JOB_TIMEOUT_SECONDS,
|
||||
retry_count=1, minimum_extension=MINIMUM_JOB_EXTENSION)
|
||||
|
||||
def _job_complete(self, build_job, job_status):
|
||||
if job_status == BuildJobResult.INCOMPLETE:
|
||||
self._queue.incomplete(build_job.job_item(), restore_retry=True, retry_after=30)
|
||||
|
@ -126,7 +134,7 @@ class BuilderServer(object):
|
|||
def _work_checker(self):
|
||||
while self._current_status == 'running':
|
||||
LOGGER.debug('Checking for more work')
|
||||
job_item = self._queue.get(processing_time=RESERVATION_SECONDS)
|
||||
job_item = self._queue.get(processing_time=self._lifecycle_manager.setup_time())
|
||||
if job_item is None:
|
||||
LOGGER.debug('No additional work found. Going to sleep for %s seconds', WORK_CHECK_TIMEOUT)
|
||||
yield From(trollius.sleep(WORK_CHECK_TIMEOUT))
|
||||
|
|
|
@ -73,9 +73,6 @@ class WorkerError(object):
|
|||
if handler.get('show_base_error', False) and self._base_message:
|
||||
message = message + ': ' + self._base_message
|
||||
|
||||
if handler.get('is_internal', False):
|
||||
message = message + '\nThe build will be retried shortly'
|
||||
|
||||
return message
|
||||
|
||||
def extra_data(self):
|
||||
|
|
Reference in a new issue