Merge pull request #169 from jzelinskie/TROLLius
buildcomponent: use consistent trollius imports
This commit is contained in:
commit
92d6daa8ad
1 changed files with 28 additions and 25 deletions
|
@ -6,6 +6,7 @@ import trollius
|
||||||
import re
|
import re
|
||||||
|
|
||||||
from autobahn.wamp.exception import ApplicationError
|
from autobahn.wamp.exception import ApplicationError
|
||||||
|
from trollius import From, Return
|
||||||
|
|
||||||
from buildman.server import BuildJobResult
|
from buildman.server import BuildJobResult
|
||||||
from buildman.component.basecomponent import BaseComponent
|
from buildman.component.basecomponent import BaseComponent
|
||||||
|
@ -58,15 +59,15 @@ class BuildComponent(BaseComponent):
|
||||||
@trollius.coroutine
|
@trollius.coroutine
|
||||||
def onJoin(self, details):
|
def onJoin(self, details):
|
||||||
logger.debug('Registering methods and listeners for component %s', self.builder_realm)
|
logger.debug('Registering methods and listeners for component %s', self.builder_realm)
|
||||||
yield trollius.From(self.register(self._on_ready, u'io.quay.buildworker.ready'))
|
yield From(self.register(self._on_ready, u'io.quay.buildworker.ready'))
|
||||||
yield trollius.From(self.register(self._determine_cache_tag,
|
yield From(self.register(self._determine_cache_tag,
|
||||||
u'io.quay.buildworker.determinecachetag'))
|
u'io.quay.buildworker.determinecachetag'))
|
||||||
yield trollius.From(self.register(self._ping, u'io.quay.buildworker.ping'))
|
yield From(self.register(self._ping, u'io.quay.buildworker.ping'))
|
||||||
|
|
||||||
yield trollius.From(self.subscribe(self._on_heartbeat, 'io.quay.builder.heartbeat'))
|
yield From(self.subscribe(self._on_heartbeat, 'io.quay.builder.heartbeat'))
|
||||||
yield trollius.From(self.subscribe(self._on_log_message, 'io.quay.builder.logmessage'))
|
yield From(self.subscribe(self._on_log_message, 'io.quay.builder.logmessage'))
|
||||||
|
|
||||||
yield trollius.From(self._set_status(ComponentStatus.WAITING))
|
yield From(self._set_status(ComponentStatus.WAITING))
|
||||||
|
|
||||||
def is_ready(self):
|
def is_ready(self):
|
||||||
""" Determines whether a build component is ready to begin a build. """
|
""" Determines whether a build component is ready to begin a build. """
|
||||||
|
@ -82,7 +83,7 @@ class BuildComponent(BaseComponent):
|
||||||
self._build_status = StatusHandler(self.build_logs, build_job.repo_build.uuid)
|
self._build_status = StatusHandler(self.build_logs, build_job.repo_build.uuid)
|
||||||
self._image_info = {}
|
self._image_info = {}
|
||||||
|
|
||||||
yield trollius.From(self._set_status(ComponentStatus.BUILDING))
|
yield From(self._set_status(ComponentStatus.BUILDING))
|
||||||
|
|
||||||
# Send the notification that the build has started.
|
# Send the notification that the build has started.
|
||||||
build_job.send_notification('build_start')
|
build_job.send_notification('build_start')
|
||||||
|
@ -193,6 +194,7 @@ class BuildComponent(BaseComponent):
|
||||||
status_dict[status_completion_key] = \
|
status_dict[status_completion_key] = \
|
||||||
BuildComponent._total_completion(images, max(len(images), num_images))
|
BuildComponent._total_completion(images, max(len(images), num_images))
|
||||||
|
|
||||||
|
@trollius.coroutine
|
||||||
def _on_log_message(self, phase, json_data):
|
def _on_log_message(self, phase, json_data):
|
||||||
""" Tails log messages and updates the build status. """
|
""" Tails log messages and updates the build status. """
|
||||||
# Update the heartbeat.
|
# Update the heartbeat.
|
||||||
|
@ -254,7 +256,7 @@ class BuildComponent(BaseComponent):
|
||||||
base_image_name, base_image_tag, base_image_id)
|
base_image_name, base_image_tag, base_image_id)
|
||||||
|
|
||||||
tag_found = self._current_job.determine_cached_tag(base_image_id, command_comments)
|
tag_found = self._current_job.determine_cached_tag(base_image_id, command_comments)
|
||||||
raise trollius.Return(tag_found or '')
|
raise Return(tag_found or '')
|
||||||
|
|
||||||
def _build_failure(self, error_message, exception=None):
|
def _build_failure(self, error_message, exception=None):
|
||||||
""" Handles and logs a failed build. """
|
""" Handles and logs a failed build. """
|
||||||
|
@ -312,11 +314,11 @@ class BuildComponent(BaseComponent):
|
||||||
@trollius.coroutine
|
@trollius.coroutine
|
||||||
def _build_finished(self, job_status):
|
def _build_finished(self, job_status):
|
||||||
""" Alerts the parent that a build has completed and sets the status back to running. """
|
""" Alerts the parent that a build has completed and sets the status back to running. """
|
||||||
yield trollius.From(self.parent_manager.job_completed(self._current_job, job_status, self))
|
yield From(self.parent_manager.job_completed(self._current_job, job_status, self))
|
||||||
self._current_job = None
|
self._current_job = None
|
||||||
|
|
||||||
# Set the component back to a running state.
|
# Set the component back to a running state.
|
||||||
yield trollius.From(self._set_status(ComponentStatus.RUNNING))
|
yield From(self._set_status(ComponentStatus.RUNNING))
|
||||||
|
|
||||||
@staticmethod
|
@staticmethod
|
||||||
def _ping():
|
def _ping():
|
||||||
|
@ -331,32 +333,33 @@ class BuildComponent(BaseComponent):
|
||||||
if not version in SUPPORTED_WORKER_VERSIONS:
|
if not version in SUPPORTED_WORKER_VERSIONS:
|
||||||
logger.warning('Build component (token "%s") is running an out-of-date version: %s', token,
|
logger.warning('Build component (token "%s") is running an out-of-date version: %s', token,
|
||||||
version)
|
version)
|
||||||
raise trollius.Return(False)
|
raise Return(False)
|
||||||
|
|
||||||
if self._component_status != ComponentStatus.WAITING:
|
if self._component_status != ComponentStatus.WAITING:
|
||||||
logger.warning('Build component (token "%s") is already connected', self.expected_token)
|
logger.warning('Build component (token "%s") is already connected', self.expected_token)
|
||||||
raise trollius.Return(False)
|
raise Return(False)
|
||||||
|
|
||||||
if token != self.expected_token:
|
if token != self.expected_token:
|
||||||
logger.warning('Builder token mismatch. Expected: "%s". Found: "%s"', self.expected_token,
|
logger.warning('Builder token mismatch. Expected: "%s". Found: "%s"', self.expected_token,
|
||||||
token)
|
token)
|
||||||
raise trollius.Return(False)
|
raise Return(False)
|
||||||
|
|
||||||
yield trollius.From(self._set_status(ComponentStatus.RUNNING))
|
yield From(self._set_status(ComponentStatus.RUNNING))
|
||||||
|
|
||||||
# Start the heartbeat check and updating loop.
|
# Start the heartbeat check and updating loop.
|
||||||
loop = trollius.get_event_loop()
|
loop = trollius.get_event_loop()
|
||||||
loop.create_task(self._heartbeat())
|
loop.create_task(self._heartbeat())
|
||||||
logger.debug('Build worker %s is connected and ready', self.builder_realm)
|
logger.debug('Build worker %s is connected and ready', self.builder_realm)
|
||||||
raise trollius.Return(True)
|
raise Return(True)
|
||||||
|
|
||||||
@trollius.coroutine
|
@trollius.coroutine
|
||||||
def _set_status(self, phase):
|
def _set_status(self, phase):
|
||||||
if phase == ComponentStatus.RUNNING:
|
if phase == ComponentStatus.RUNNING:
|
||||||
yield trollius.From(self.parent_manager.build_component_ready(self))
|
yield From(self.parent_manager.build_component_ready(self))
|
||||||
|
|
||||||
self._component_status = phase
|
self._component_status = phase
|
||||||
|
|
||||||
|
@trollius.coroutine
|
||||||
def _on_heartbeat(self):
|
def _on_heartbeat(self):
|
||||||
""" Updates the last known heartbeat. """
|
""" Updates the last known heartbeat. """
|
||||||
if self._component_status == ComponentStatus.TIMED_OUT:
|
if self._component_status == ComponentStatus.TIMED_OUT:
|
||||||
|
@ -371,13 +374,13 @@ class BuildComponent(BaseComponent):
|
||||||
and updating the heartbeat in the build status dictionary (if applicable). This allows
|
and updating the heartbeat in the build status dictionary (if applicable). This allows
|
||||||
the build system to catch crashes from either end.
|
the build system to catch crashes from either end.
|
||||||
"""
|
"""
|
||||||
yield trollius.From(trollius.sleep(INITIAL_TIMEOUT))
|
yield From(trollius.sleep(INITIAL_TIMEOUT))
|
||||||
|
|
||||||
while True:
|
while True:
|
||||||
# If the component is no longer running or actively building, nothing more to do.
|
# If the component is no longer running or actively building, nothing more to do.
|
||||||
if (self._component_status != ComponentStatus.RUNNING and
|
if (self._component_status != ComponentStatus.RUNNING and
|
||||||
self._component_status != ComponentStatus.BUILDING):
|
self._component_status != ComponentStatus.BUILDING):
|
||||||
raise trollius.Return()
|
raise Return()
|
||||||
|
|
||||||
# If there is an active build, write the heartbeat to its status.
|
# If there is an active build, write the heartbeat to its status.
|
||||||
build_status = self._build_status
|
build_status = self._build_status
|
||||||
|
@ -388,7 +391,7 @@ class BuildComponent(BaseComponent):
|
||||||
# Mark the build item.
|
# Mark the build item.
|
||||||
current_job = self._current_job
|
current_job = self._current_job
|
||||||
if current_job is not None:
|
if current_job is not None:
|
||||||
yield trollius.From(self.parent_manager.job_heartbeat(current_job))
|
yield From(self.parent_manager.job_heartbeat(current_job))
|
||||||
|
|
||||||
# Check the heartbeat from the worker.
|
# Check the heartbeat from the worker.
|
||||||
logger.debug('Checking heartbeat on realm %s', self.builder_realm)
|
logger.debug('Checking heartbeat on realm %s', self.builder_realm)
|
||||||
|
@ -397,20 +400,20 @@ class BuildComponent(BaseComponent):
|
||||||
logger.debug('Heartbeat on realm %s has expired: %s', self.builder_realm,
|
logger.debug('Heartbeat on realm %s has expired: %s', self.builder_realm,
|
||||||
self._last_heartbeat)
|
self._last_heartbeat)
|
||||||
|
|
||||||
yield trollius.From(self._timeout())
|
yield From(self._timeout())
|
||||||
raise trollius.Return()
|
raise Return()
|
||||||
|
|
||||||
logger.debug('Heartbeat on realm %s is valid: %s.', self.builder_realm,
|
logger.debug('Heartbeat on realm %s is valid: %s.', self.builder_realm,
|
||||||
self._last_heartbeat)
|
self._last_heartbeat)
|
||||||
|
|
||||||
yield trollius.From(trollius.sleep(HEARTBEAT_TIMEOUT))
|
yield From(trollius.sleep(HEARTBEAT_TIMEOUT))
|
||||||
|
|
||||||
@trollius.coroutine
|
@trollius.coroutine
|
||||||
def _timeout(self):
|
def _timeout(self):
|
||||||
if self._component_status == ComponentStatus.TIMED_OUT:
|
if self._component_status == ComponentStatus.TIMED_OUT:
|
||||||
raise trollius.Return()
|
raise Return()
|
||||||
|
|
||||||
yield trollius.From(self._set_status(ComponentStatus.TIMED_OUT))
|
yield From(self._set_status(ComponentStatus.TIMED_OUT))
|
||||||
logger.warning('Build component with realm %s has timed out', self.builder_realm)
|
logger.warning('Build component with realm %s has timed out', self.builder_realm)
|
||||||
|
|
||||||
# If we still have a running job, then it has not completed and we need to tell the parent
|
# If we still have a running job, then it has not completed and we need to tell the parent
|
||||||
|
@ -419,7 +422,7 @@ class BuildComponent(BaseComponent):
|
||||||
self._build_status.set_error('Build worker timed out', internal_error=True,
|
self._build_status.set_error('Build worker timed out', internal_error=True,
|
||||||
requeued=self._current_job.has_retries_remaining())
|
requeued=self._current_job.has_retries_remaining())
|
||||||
|
|
||||||
yield trollius.From(self.parent_manager.job_completed(self._current_job,
|
yield From(self.parent_manager.job_completed(self._current_job,
|
||||||
BuildJobResult.INCOMPLETE,
|
BuildJobResult.INCOMPLETE,
|
||||||
self))
|
self))
|
||||||
self._build_status = None
|
self._build_status = None
|
||||||
|
|
Reference in a new issue