Make the redis client use AsyncWrapper and coroutines
Change all log messages to be synchronous
This commit is contained in:
parent
5935e93eb8
commit
f0ef4347e5
8 changed files with 76 additions and 57 deletions
|
@ -1,5 +1,5 @@
|
|||
from functools import partial
|
||||
from trollius import get_event_loop
|
||||
from trollius import get_event_loop, coroutine
|
||||
|
||||
|
||||
class AsyncWrapper(object):
|
||||
|
@ -25,3 +25,8 @@ class AsyncWrapper(object):
|
|||
return self._loop.run_in_executor(self._executor, callable_delegate_attr)
|
||||
|
||||
return wrapper
|
||||
|
||||
@coroutine
|
||||
def __call__(self, *args, **kwargs):
|
||||
callable_delegate_attr = partial(self._delegate, *args, **kwargs)
|
||||
return self._loop.run_in_executor(self._executor, callable_delegate_attr)
|
||||
|
|
|
@ -63,13 +63,11 @@ class BuildComponent(BaseComponent):
|
|||
def onJoin(self, details):
|
||||
logger.debug('Registering methods and listeners for component %s', self.builder_realm)
|
||||
yield From(self.register(self._on_ready, u'io.quay.buildworker.ready'))
|
||||
yield From(self.register(self._determine_cache_tag,
|
||||
u'io.quay.buildworker.determinecachetag'))
|
||||
yield From(self.register(self._determine_cache_tag, u'io.quay.buildworker.determinecachetag'))
|
||||
yield From(self.register(self._ping, u'io.quay.buildworker.ping'))
|
||||
yield From(self.register(self._on_log_message, u'io.quay.builder.logmessagesynchronously'))
|
||||
|
||||
yield From(self.subscribe(self._on_heartbeat, 'io.quay.builder.heartbeat'))
|
||||
yield From(self.subscribe(self._on_log_message, 'io.quay.builder.logmessage'))
|
||||
yield From(self.register(self._on_log_message_synchronously, 'io.quay.builder.logmessagesynchronously'))
|
||||
yield From(self.subscribe(self._on_heartbeat, u'io.quay.builder.heartbeat'))
|
||||
|
||||
yield From(self._set_status(ComponentStatus.WAITING))
|
||||
|
||||
|
@ -98,7 +96,7 @@ class BuildComponent(BaseComponent):
|
|||
try:
|
||||
build_config = build_job.build_config
|
||||
except BuildJobLoadException as irbe:
|
||||
self._build_failure('Could not load build job information', irbe)
|
||||
yield From(self._build_failure('Could not load build job information', irbe))
|
||||
raise Return()
|
||||
|
||||
base_image_information = {}
|
||||
|
@ -152,7 +150,7 @@ class BuildComponent(BaseComponent):
|
|||
if not build_arguments['build_package'] and not build_arguments['git']:
|
||||
logger.error('%s: insufficient build args: %s',
|
||||
self._current_job.repo_build.uuid, build_arguments)
|
||||
self._build_failure('Insufficient build arguments. No buildpack available.')
|
||||
yield From(self._build_failure('Insufficient build arguments. No buildpack available.'))
|
||||
raise Return()
|
||||
|
||||
# Invoke the build.
|
||||
|
@ -210,6 +208,8 @@ class BuildComponent(BaseComponent):
|
|||
status_dict[status_completion_key] = \
|
||||
BuildComponent._total_completion(images, max(len(images), num_images))
|
||||
|
||||
|
||||
@trollius.coroutine
|
||||
def _on_log_message(self, phase, json_data):
|
||||
""" Tails log messages and updates the build status. """
|
||||
# Update the heartbeat.
|
||||
|
@ -244,16 +244,17 @@ class BuildComponent(BaseComponent):
|
|||
# the pull/push progress, as well as the current step index.
|
||||
with self._build_status as status_dict:
|
||||
try:
|
||||
if self._build_status.set_phase(phase, log_data.get('status_data')):
|
||||
changed_phase = yield From(self._build_status.set_phase(phase, log_data.get('status_data')))
|
||||
if changed_phase:
|
||||
logger.debug('Build %s has entered a new phase: %s', self.builder_realm, phase)
|
||||
elif self._current_job.repo_build.phase == BUILD_PHASE.CANCELLED:
|
||||
build_id = self._current_job.repo_build.uuid
|
||||
logger.debug('Trying to move cancelled build into phase: %s with id: %s', phase, build_id)
|
||||
return False
|
||||
raise Return(False)
|
||||
except InvalidRepositoryBuildException:
|
||||
build_id = self._current_job.repo_build.uuid
|
||||
logger.info('Build %s was not found; repo was probably deleted', build_id)
|
||||
return False
|
||||
raise Return(False)
|
||||
|
||||
BuildComponent._process_pushpull_status(status_dict, phase, log_data, self._image_info)
|
||||
|
||||
|
@ -264,13 +265,13 @@ class BuildComponent(BaseComponent):
|
|||
|
||||
# If the json data contains an error, then something went wrong with a push or pull.
|
||||
if 'error' in log_data:
|
||||
self._build_status.set_error(log_data['error'])
|
||||
yield From(self._build_status.set_error(log_data['error']))
|
||||
|
||||
if current_step is not None:
|
||||
self._build_status.set_command(current_status_string)
|
||||
yield From(self._build_status.set_command(current_status_string))
|
||||
elif phase == BUILD_PHASE.BUILDING:
|
||||
self._build_status.append_log(current_status_string)
|
||||
return True
|
||||
yield From(self._build_status.append_log(current_status_string))
|
||||
raise Return(True)
|
||||
|
||||
@trollius.coroutine
|
||||
def _determine_cache_tag(self, command_comments, base_image_name, base_image_tag, base_image_id):
|
||||
|
@ -283,18 +284,20 @@ class BuildComponent(BaseComponent):
|
|||
tag_found = self._current_job.determine_cached_tag(base_image_id, command_comments)
|
||||
raise Return(tag_found or '')
|
||||
|
||||
@trollius.coroutine
|
||||
def _build_failure(self, error_message, exception=None):
|
||||
""" Handles and logs a failed build. """
|
||||
self._build_status.set_error(error_message, {
|
||||
yield From(self._build_status.set_error(error_message, {
|
||||
'internal_error': str(exception) if exception else None
|
||||
})
|
||||
}))
|
||||
|
||||
build_id = self._current_job.repo_build.uuid
|
||||
logger.warning('Build %s failed with message: %s', build_id, error_message)
|
||||
|
||||
# Mark that the build has finished (in an error state)
|
||||
trollius.async(self._build_finished(BuildJobResult.ERROR))
|
||||
yield From(self._build_finished(BuildJobResult.ERROR))
|
||||
|
||||
@trollius.coroutine
|
||||
def _build_complete(self, result):
|
||||
""" Wraps up a completed build. Handles any errors and calls self._build_finished. """
|
||||
build_id = self._current_job.repo_build.uuid
|
||||
|
@ -313,12 +316,12 @@ class BuildComponent(BaseComponent):
|
|||
pass
|
||||
|
||||
try:
|
||||
self._build_status.set_phase(BUILD_PHASE.COMPLETE)
|
||||
yield From(self._build_status.set_phase(BUILD_PHASE.COMPLETE))
|
||||
except InvalidRepositoryBuildException:
|
||||
logger.info('Build %s was not found; repo was probably deleted', build_id)
|
||||
return
|
||||
raise Return()
|
||||
|
||||
trollius.async(self._build_finished(BuildJobResult.COMPLETE))
|
||||
yield From(self._build_finished(BuildJobResult.COMPLETE))
|
||||
|
||||
# Label the pushed manifests with the build metadata.
|
||||
manifest_digests = kwargs.get('digests') or []
|
||||
|
@ -342,9 +345,10 @@ class BuildComponent(BaseComponent):
|
|||
worker_error = WorkerError(aex.error, aex.kwargs.get('base_error'))
|
||||
|
||||
# Write the error to the log.
|
||||
self._build_status.set_error(worker_error.public_message(), worker_error.extra_data(),
|
||||
internal_error=worker_error.is_internal_error(),
|
||||
requeued=self._current_job.has_retries_remaining())
|
||||
yield From(self._build_status.set_error(worker_error.public_message(),
|
||||
worker_error.extra_data(),
|
||||
internal_error=worker_error.is_internal_error(),
|
||||
requeued=self._current_job.has_retries_remaining()))
|
||||
|
||||
# Send the notification that the build has failed.
|
||||
self._current_job.send_notification('build_failure',
|
||||
|
@ -354,10 +358,10 @@ class BuildComponent(BaseComponent):
|
|||
if worker_error.is_internal_error():
|
||||
logger.exception('[BUILD INTERNAL ERROR: Remote] Build ID: %s: %s', build_id,
|
||||
worker_error.public_message())
|
||||
trollius.async(self._build_finished(BuildJobResult.INCOMPLETE))
|
||||
yield From(self._build_finished(BuildJobResult.INCOMPLETE))
|
||||
else:
|
||||
logger.debug('Got remote failure exception for build %s: %s', build_id, aex)
|
||||
trollius.async(self._build_finished(BuildJobResult.ERROR))
|
||||
yield From(self._build_finished(BuildJobResult.ERROR))
|
||||
|
||||
@trollius.coroutine
|
||||
def _build_finished(self, job_status):
|
||||
|
@ -465,8 +469,8 @@ class BuildComponent(BaseComponent):
|
|||
# If we still have a running job, then it has not completed and we need to tell the parent
|
||||
# manager.
|
||||
if self._current_job is not None:
|
||||
self._build_status.set_error('Build worker timed out', internal_error=True,
|
||||
requeued=self._current_job.has_retries_remaining())
|
||||
yield From(self._build_status.set_error('Build worker timed out', internal_error=True,
|
||||
requeued=self._current_job.has_retries_remaining()))
|
||||
|
||||
build_id = self._current_job.build_uuid
|
||||
logger.error('[BUILD INTERNAL ERROR: Timeout] Build ID: %s', build_id)
|
||||
|
@ -477,10 +481,3 @@ class BuildComponent(BaseComponent):
|
|||
|
||||
# Unregister the current component so that it cannot be invoked again.
|
||||
self.parent_manager.build_component_disposed(self, True)
|
||||
|
||||
@trollius.coroutine
|
||||
def _on_log_message_synchronously(self, phase, json_data):
|
||||
""" A method to synchronously update the pushing
|
||||
phase so we don't have cancelled builds trying to be pushed
|
||||
"""
|
||||
raise Return(self._on_log_message(phase, json_data))
|
||||
|
|
|
@ -1,6 +1,9 @@
|
|||
import datetime
|
||||
import logging
|
||||
|
||||
from redis import RedisError
|
||||
from trollius import From, Return, coroutine
|
||||
|
||||
from data.database import BUILD_PHASE
|
||||
from data import model
|
||||
from buildman.asyncutil import AsyncWrapper
|
||||
|
@ -16,7 +19,9 @@ class StatusHandler(object):
|
|||
self._current_phase = None
|
||||
self._current_command = None
|
||||
self._uuid = repository_build_uuid
|
||||
self._build_logs = build_logs
|
||||
self._build_logs = AsyncWrapper(build_logs)
|
||||
self._sync_build_logs = build_logs
|
||||
self._build_model = AsyncWrapper(model.build)
|
||||
|
||||
self._status = {
|
||||
'total_commands': 0,
|
||||
|
@ -28,50 +33,56 @@ class StatusHandler(object):
|
|||
# Write the initial status.
|
||||
self.__exit__(None, None, None)
|
||||
|
||||
@coroutine
|
||||
def _append_log_message(self, log_message, log_type=None, log_data=None):
|
||||
log_data = log_data or {}
|
||||
log_data['datetime'] = str(datetime.datetime.now())
|
||||
|
||||
try:
|
||||
self._build_logs.append_log_message(self._uuid, log_message, log_type, log_data)
|
||||
yield From(self._build_logs.append_log_message(self._uuid, log_message, log_type, log_data))
|
||||
except RedisError:
|
||||
logger.exception('Could not save build log for build %s: %s', self._uuid, log_message)
|
||||
|
||||
@coroutine
|
||||
def append_log(self, log_message, extra_data=None):
|
||||
if log_message is None:
|
||||
return
|
||||
|
||||
self._append_log_message(log_message, log_data=extra_data)
|
||||
yield From(self._append_log_message(log_message, log_data=extra_data))
|
||||
|
||||
@coroutine
|
||||
def set_command(self, command, extra_data=None):
|
||||
if self._current_command == command:
|
||||
return
|
||||
raise Return()
|
||||
|
||||
self._current_command = command
|
||||
self._append_log_message(command, self._build_logs.COMMAND, extra_data)
|
||||
yield From(self._append_log_message(command, self._build_logs.COMMAND, extra_data))
|
||||
|
||||
@coroutine
|
||||
def set_error(self, error_message, extra_data=None, internal_error=False, requeued=False):
|
||||
self.set_phase(BUILD_PHASE.INTERNAL_ERROR if internal_error and requeued else BUILD_PHASE.ERROR)
|
||||
error_phase = BUILD_PHASE.INTERNAL_ERROR if internal_error and requeued else BUILD_PHASE.ERROR
|
||||
yield From(self.set_phase(error_phase))
|
||||
|
||||
extra_data = extra_data or {}
|
||||
extra_data['internal_error'] = internal_error
|
||||
self._append_log_message(error_message, self._build_logs.ERROR, extra_data)
|
||||
yield From(self._append_log_message(error_message, self._build_logs.ERROR, extra_data))
|
||||
|
||||
@coroutine
|
||||
def set_phase(self, phase, extra_data=None):
|
||||
if phase == self._current_phase:
|
||||
return False
|
||||
raise Return(False)
|
||||
|
||||
self._current_phase = phase
|
||||
self._append_log_message(phase, self._build_logs.PHASE, extra_data)
|
||||
yield From(self._append_log_message(phase, self._build_logs.PHASE, extra_data))
|
||||
|
||||
# Update the repository build with the new phase
|
||||
return model.build.update_phase(self._uuid, phase)
|
||||
raise Return(self._build_model.update_phase(self._uuid, phase))
|
||||
|
||||
def __enter__(self):
|
||||
return self._status
|
||||
|
||||
def __exit__(self, exc_type, value, traceback):
|
||||
try:
|
||||
self._build_logs.set_status(self._uuid, self._status)
|
||||
self._sync_build_logs.set_status(self._uuid, self._status)
|
||||
except RedisError:
|
||||
logger.exception('Could not set status of build %s to %s', self._uuid, self._status)
|
||||
|
|
|
@ -58,7 +58,7 @@ class BaseManager(object):
|
|||
@coroutine
|
||||
def job_completed(self, build_job, job_status, build_component):
|
||||
""" Method invoked once a job_item has completed, in some manner. The job_status will be
|
||||
one of: incomplete, error, complete. Implementations of this method should call
|
||||
one of: incomplete, error, complete. Implementations of this method should call coroutine
|
||||
self.job_complete_callback with a status of Incomplete if they wish for the job to be
|
||||
automatically requeued.
|
||||
"""
|
||||
|
|
|
@ -79,7 +79,7 @@ class EnterpriseManager(BaseManager):
|
|||
|
||||
@coroutine
|
||||
def job_completed(self, build_job, job_status, build_component):
|
||||
self.job_complete_callback(build_job, job_status)
|
||||
yield From(self.job_complete_callback(build_job, job_status))
|
||||
|
||||
def build_component_disposed(self, build_component, timed_out):
|
||||
self.all_components.remove(build_component)
|
||||
|
|
|
@ -201,8 +201,8 @@ class EphemeralBuilderManager(BaseManager):
|
|||
if got_lock:
|
||||
logger.error('[BUILD INTERNAL ERROR: etcd %s] Build ID: %s. Exec name: %s. Exec ID: %s',
|
||||
etcd_result.action, build_job.build_uuid, executor_name, execution_id)
|
||||
self.job_complete_callback(build_job, BuildJobResult.INCOMPLETE, executor_name,
|
||||
update_phase=True)
|
||||
yield From(self.job_complete_callback(build_job, BuildJobResult.INCOMPLETE, executor_name,
|
||||
update_phase=True))
|
||||
|
||||
# Finally, we terminate the build execution for the job. We don't do this under a lock as
|
||||
# terminating a node is an atomic operation; better to make sure it is terminated than not.
|
||||
|
@ -585,7 +585,7 @@ class EphemeralBuilderManager(BaseManager):
|
|||
# to ask for the phase to be updated as well.
|
||||
build_info = self._build_uuid_to_info.get(build_job.build_uuid, None)
|
||||
executor_name = build_info.executor_name if build_info else None
|
||||
self.job_complete_callback(build_job, job_status, executor_name, update_phase=False)
|
||||
yield From(self.job_complete_callback(build_job, job_status, executor_name, update_phase=False))
|
||||
|
||||
# Kill the ephemeral builder.
|
||||
yield From(self.kill_builder_executor(build_job.build_uuid))
|
||||
|
|
|
@ -140,6 +140,7 @@ class BuilderServer(object):
|
|||
self._queue.extend_processing(build_job.job_item, seconds_from_now=JOB_TIMEOUT_SECONDS,
|
||||
minimum_extension=MINIMUM_JOB_EXTENSION)
|
||||
|
||||
@trollius.coroutine
|
||||
def _job_complete(self, build_job, job_status, executor_name=None, update_phase=False):
|
||||
if job_status == BuildJobResult.INCOMPLETE:
|
||||
logger.warning('[BUILD INCOMPLETE: job complete] Build ID: %s. No retry restore.',
|
||||
|
@ -150,7 +151,7 @@ class BuilderServer(object):
|
|||
|
||||
if update_phase:
|
||||
status_handler = StatusHandler(self._build_logs, build_job.repo_build.uuid)
|
||||
status_handler.set_phase(RESULT_PHASES[job_status])
|
||||
yield From(status_handler.set_phase(RESULT_PHASES[job_status]))
|
||||
|
||||
self._job_count = self._job_count - 1
|
||||
|
||||
|
@ -200,7 +201,7 @@ class BuilderServer(object):
|
|||
if schedule_success:
|
||||
logger.debug('Marking build %s as scheduled', build_job.repo_build.uuid)
|
||||
status_handler = StatusHandler(self._build_logs, build_job.repo_build.uuid)
|
||||
status_handler.set_phase(database.BUILD_PHASE.BUILD_SCHEDULED)
|
||||
yield From(status_handler.set_phase(database.BUILD_PHASE.BUILD_SCHEDULED))
|
||||
|
||||
self._job_count = self._job_count + 1
|
||||
logger.debug('Build job %s scheduled. Running: %s', build_job.repo_build.uuid,
|
||||
|
|
|
@ -13,6 +13,7 @@ from buildman.manager.ephemeral import (EphemeralBuilderManager, EtcdAction,
|
|||
ETCD_MAX_WATCH_TIMEOUT)
|
||||
from buildman.component.buildcomponent import BuildComponent
|
||||
from buildman.server import BuildJobResult
|
||||
from buildman.asyncutil import AsyncWrapper
|
||||
from util.metrics.metricqueue import duration_collector_async
|
||||
from app import metric_queue
|
||||
|
||||
|
@ -134,7 +135,7 @@ class TestEphemeralLifecycle(EphemeralBuilderTestCase):
|
|||
self.register_component_callback = Mock()
|
||||
self.unregister_component_callback = Mock()
|
||||
self.job_heartbeat_callback = Mock()
|
||||
self.job_complete_callback = Mock()
|
||||
self.job_complete_callback = AsyncWrapper(Mock())
|
||||
|
||||
self.manager = EphemeralBuilderManager(
|
||||
self.register_component_callback,
|
||||
|
@ -389,8 +390,9 @@ class TestEphemeralLifecycle(EphemeralBuilderTestCase):
|
|||
|
||||
# Ensure the job was marked as incomplete, with an update_phase to True (so the DB record and
|
||||
# logs are updated as well)
|
||||
self.job_complete_callback.assert_called_once_with(ANY, BuildJobResult.INCOMPLETE,
|
||||
'MockExecutor', update_phase=True)
|
||||
yield From(self.job_complete_callback.assert_called_once_with(ANY, BuildJobResult.INCOMPLETE,
|
||||
'MockExecutor',
|
||||
update_phase=True))
|
||||
|
||||
@async_test
|
||||
def test_change_worker(self):
|
||||
|
@ -473,7 +475,10 @@ class TestEphemeral(EphemeralBuilderTestCase):
|
|||
|
||||
unregister_component_callback = Mock()
|
||||
job_heartbeat_callback = Mock()
|
||||
job_complete_callback = Mock()
|
||||
|
||||
@coroutine
|
||||
def job_complete_callback(*args, **kwargs):
|
||||
raise Return()
|
||||
|
||||
self.manager = EphemeralBuilderManager(
|
||||
self._register_component,
|
||||
|
|
Reference in a new issue