From f0ef4347e5c49b2453d6ded74b3a3f5fff394435 Mon Sep 17 00:00:00 2001 From: Jake Moshenko Date: Fri, 18 Nov 2016 00:04:50 -0500 Subject: [PATCH] Make the redis client use AsyncWrapper and coroutines Change all log messages to be synchronous --- buildman/asyncutil.py | 7 ++- buildman/component/buildcomponent.py | 65 +++++++++++++--------------- buildman/jobutil/buildstatus.py | 33 +++++++++----- buildman/manager/basemanager.py | 2 +- buildman/manager/enterprise.py | 2 +- buildman/manager/ephemeral.py | 6 +-- buildman/server.py | 5 ++- test/test_buildman.py | 13 ++++-- 8 files changed, 76 insertions(+), 57 deletions(-) diff --git a/buildman/asyncutil.py b/buildman/asyncutil.py index 26b16e28a..47966fb46 100644 --- a/buildman/asyncutil.py +++ b/buildman/asyncutil.py @@ -1,5 +1,5 @@ from functools import partial -from trollius import get_event_loop +from trollius import get_event_loop, coroutine class AsyncWrapper(object): @@ -25,3 +25,8 @@ class AsyncWrapper(object): return self._loop.run_in_executor(self._executor, callable_delegate_attr) return wrapper + + @coroutine + def __call__(self, *args, **kwargs): + callable_delegate_attr = partial(self._delegate, *args, **kwargs) + return self._loop.run_in_executor(self._executor, callable_delegate_attr) diff --git a/buildman/component/buildcomponent.py b/buildman/component/buildcomponent.py index affd0d54e..d453949ef 100644 --- a/buildman/component/buildcomponent.py +++ b/buildman/component/buildcomponent.py @@ -63,13 +63,11 @@ class BuildComponent(BaseComponent): def onJoin(self, details): logger.debug('Registering methods and listeners for component %s', self.builder_realm) yield From(self.register(self._on_ready, u'io.quay.buildworker.ready')) - yield From(self.register(self._determine_cache_tag, - u'io.quay.buildworker.determinecachetag')) + yield From(self.register(self._determine_cache_tag, u'io.quay.buildworker.determinecachetag')) yield From(self.register(self._ping, u'io.quay.buildworker.ping')) + yield From(self.register(self._on_log_message, u'io.quay.builder.logmessagesynchronously')) - yield From(self.subscribe(self._on_heartbeat, 'io.quay.builder.heartbeat')) - yield From(self.subscribe(self._on_log_message, 'io.quay.builder.logmessage')) - yield From(self.register(self._on_log_message_synchronously, 'io.quay.builder.logmessagesynchronously')) + yield From(self.subscribe(self._on_heartbeat, u'io.quay.builder.heartbeat')) yield From(self._set_status(ComponentStatus.WAITING)) @@ -98,7 +96,7 @@ class BuildComponent(BaseComponent): try: build_config = build_job.build_config except BuildJobLoadException as irbe: - self._build_failure('Could not load build job information', irbe) + yield From(self._build_failure('Could not load build job information', irbe)) raise Return() base_image_information = {} @@ -152,7 +150,7 @@ class BuildComponent(BaseComponent): if not build_arguments['build_package'] and not build_arguments['git']: logger.error('%s: insufficient build args: %s', self._current_job.repo_build.uuid, build_arguments) - self._build_failure('Insufficient build arguments. No buildpack available.') + yield From(self._build_failure('Insufficient build arguments. No buildpack available.')) raise Return() # Invoke the build. @@ -210,6 +208,8 @@ class BuildComponent(BaseComponent): status_dict[status_completion_key] = \ BuildComponent._total_completion(images, max(len(images), num_images)) + + @trollius.coroutine def _on_log_message(self, phase, json_data): """ Tails log messages and updates the build status. """ # Update the heartbeat. @@ -244,16 +244,17 @@ class BuildComponent(BaseComponent): # the pull/push progress, as well as the current step index. with self._build_status as status_dict: try: - if self._build_status.set_phase(phase, log_data.get('status_data')): + changed_phase = yield From(self._build_status.set_phase(phase, log_data.get('status_data'))) + if changed_phase: logger.debug('Build %s has entered a new phase: %s', self.builder_realm, phase) elif self._current_job.repo_build.phase == BUILD_PHASE.CANCELLED: build_id = self._current_job.repo_build.uuid logger.debug('Trying to move cancelled build into phase: %s with id: %s', phase, build_id) - return False + raise Return(False) except InvalidRepositoryBuildException: build_id = self._current_job.repo_build.uuid logger.info('Build %s was not found; repo was probably deleted', build_id) - return False + raise Return(False) BuildComponent._process_pushpull_status(status_dict, phase, log_data, self._image_info) @@ -264,13 +265,13 @@ class BuildComponent(BaseComponent): # If the json data contains an error, then something went wrong with a push or pull. if 'error' in log_data: - self._build_status.set_error(log_data['error']) + yield From(self._build_status.set_error(log_data['error'])) if current_step is not None: - self._build_status.set_command(current_status_string) + yield From(self._build_status.set_command(current_status_string)) elif phase == BUILD_PHASE.BUILDING: - self._build_status.append_log(current_status_string) - return True + yield From(self._build_status.append_log(current_status_string)) + raise Return(True) @trollius.coroutine def _determine_cache_tag(self, command_comments, base_image_name, base_image_tag, base_image_id): @@ -283,18 +284,20 @@ class BuildComponent(BaseComponent): tag_found = self._current_job.determine_cached_tag(base_image_id, command_comments) raise Return(tag_found or '') + @trollius.coroutine def _build_failure(self, error_message, exception=None): """ Handles and logs a failed build. """ - self._build_status.set_error(error_message, { + yield From(self._build_status.set_error(error_message, { 'internal_error': str(exception) if exception else None - }) + })) build_id = self._current_job.repo_build.uuid logger.warning('Build %s failed with message: %s', build_id, error_message) # Mark that the build has finished (in an error state) - trollius.async(self._build_finished(BuildJobResult.ERROR)) + yield From(self._build_finished(BuildJobResult.ERROR)) + @trollius.coroutine def _build_complete(self, result): """ Wraps up a completed build. Handles any errors and calls self._build_finished. """ build_id = self._current_job.repo_build.uuid @@ -313,12 +316,12 @@ class BuildComponent(BaseComponent): pass try: - self._build_status.set_phase(BUILD_PHASE.COMPLETE) + yield From(self._build_status.set_phase(BUILD_PHASE.COMPLETE)) except InvalidRepositoryBuildException: logger.info('Build %s was not found; repo was probably deleted', build_id) - return + raise Return() - trollius.async(self._build_finished(BuildJobResult.COMPLETE)) + yield From(self._build_finished(BuildJobResult.COMPLETE)) # Label the pushed manifests with the build metadata. manifest_digests = kwargs.get('digests') or [] @@ -342,9 +345,10 @@ class BuildComponent(BaseComponent): worker_error = WorkerError(aex.error, aex.kwargs.get('base_error')) # Write the error to the log. - self._build_status.set_error(worker_error.public_message(), worker_error.extra_data(), - internal_error=worker_error.is_internal_error(), - requeued=self._current_job.has_retries_remaining()) + yield From(self._build_status.set_error(worker_error.public_message(), + worker_error.extra_data(), + internal_error=worker_error.is_internal_error(), + requeued=self._current_job.has_retries_remaining())) # Send the notification that the build has failed. self._current_job.send_notification('build_failure', @@ -354,10 +358,10 @@ class BuildComponent(BaseComponent): if worker_error.is_internal_error(): logger.exception('[BUILD INTERNAL ERROR: Remote] Build ID: %s: %s', build_id, worker_error.public_message()) - trollius.async(self._build_finished(BuildJobResult.INCOMPLETE)) + yield From(self._build_finished(BuildJobResult.INCOMPLETE)) else: logger.debug('Got remote failure exception for build %s: %s', build_id, aex) - trollius.async(self._build_finished(BuildJobResult.ERROR)) + yield From(self._build_finished(BuildJobResult.ERROR)) @trollius.coroutine def _build_finished(self, job_status): @@ -465,8 +469,8 @@ class BuildComponent(BaseComponent): # If we still have a running job, then it has not completed and we need to tell the parent # manager. if self._current_job is not None: - self._build_status.set_error('Build worker timed out', internal_error=True, - requeued=self._current_job.has_retries_remaining()) + yield From(self._build_status.set_error('Build worker timed out', internal_error=True, + requeued=self._current_job.has_retries_remaining())) build_id = self._current_job.build_uuid logger.error('[BUILD INTERNAL ERROR: Timeout] Build ID: %s', build_id) @@ -477,10 +481,3 @@ class BuildComponent(BaseComponent): # Unregister the current component so that it cannot be invoked again. self.parent_manager.build_component_disposed(self, True) - - @trollius.coroutine - def _on_log_message_synchronously(self, phase, json_data): - """ A method to synchronously update the pushing - phase so we don't have cancelled builds trying to be pushed - """ - raise Return(self._on_log_message(phase, json_data)) diff --git a/buildman/jobutil/buildstatus.py b/buildman/jobutil/buildstatus.py index 5dc40583f..b862c75c6 100644 --- a/buildman/jobutil/buildstatus.py +++ b/buildman/jobutil/buildstatus.py @@ -1,6 +1,9 @@ import datetime import logging +from redis import RedisError +from trollius import From, Return, coroutine + from data.database import BUILD_PHASE from data import model from buildman.asyncutil import AsyncWrapper @@ -16,7 +19,9 @@ class StatusHandler(object): self._current_phase = None self._current_command = None self._uuid = repository_build_uuid - self._build_logs = build_logs + self._build_logs = AsyncWrapper(build_logs) + self._sync_build_logs = build_logs + self._build_model = AsyncWrapper(model.build) self._status = { 'total_commands': 0, @@ -28,50 +33,56 @@ class StatusHandler(object): # Write the initial status. self.__exit__(None, None, None) + @coroutine def _append_log_message(self, log_message, log_type=None, log_data=None): log_data = log_data or {} log_data['datetime'] = str(datetime.datetime.now()) try: - self._build_logs.append_log_message(self._uuid, log_message, log_type, log_data) + yield From(self._build_logs.append_log_message(self._uuid, log_message, log_type, log_data)) except RedisError: logger.exception('Could not save build log for build %s: %s', self._uuid, log_message) + @coroutine def append_log(self, log_message, extra_data=None): if log_message is None: return - self._append_log_message(log_message, log_data=extra_data) + yield From(self._append_log_message(log_message, log_data=extra_data)) + @coroutine def set_command(self, command, extra_data=None): if self._current_command == command: - return + raise Return() self._current_command = command - self._append_log_message(command, self._build_logs.COMMAND, extra_data) + yield From(self._append_log_message(command, self._build_logs.COMMAND, extra_data)) + @coroutine def set_error(self, error_message, extra_data=None, internal_error=False, requeued=False): - self.set_phase(BUILD_PHASE.INTERNAL_ERROR if internal_error and requeued else BUILD_PHASE.ERROR) + error_phase = BUILD_PHASE.INTERNAL_ERROR if internal_error and requeued else BUILD_PHASE.ERROR + yield From(self.set_phase(error_phase)) extra_data = extra_data or {} extra_data['internal_error'] = internal_error - self._append_log_message(error_message, self._build_logs.ERROR, extra_data) + yield From(self._append_log_message(error_message, self._build_logs.ERROR, extra_data)) + @coroutine def set_phase(self, phase, extra_data=None): if phase == self._current_phase: - return False + raise Return(False) self._current_phase = phase - self._append_log_message(phase, self._build_logs.PHASE, extra_data) + yield From(self._append_log_message(phase, self._build_logs.PHASE, extra_data)) # Update the repository build with the new phase - return model.build.update_phase(self._uuid, phase) + raise Return(self._build_model.update_phase(self._uuid, phase)) def __enter__(self): return self._status def __exit__(self, exc_type, value, traceback): try: - self._build_logs.set_status(self._uuid, self._status) + self._sync_build_logs.set_status(self._uuid, self._status) except RedisError: logger.exception('Could not set status of build %s to %s', self._uuid, self._status) diff --git a/buildman/manager/basemanager.py b/buildman/manager/basemanager.py index 62514f072..23627830a 100644 --- a/buildman/manager/basemanager.py +++ b/buildman/manager/basemanager.py @@ -58,7 +58,7 @@ class BaseManager(object): @coroutine def job_completed(self, build_job, job_status, build_component): """ Method invoked once a job_item has completed, in some manner. The job_status will be - one of: incomplete, error, complete. Implementations of this method should call + one of: incomplete, error, complete. Implementations of this method should call coroutine self.job_complete_callback with a status of Incomplete if they wish for the job to be automatically requeued. """ diff --git a/buildman/manager/enterprise.py b/buildman/manager/enterprise.py index 4a17bfc57..3d32a61d0 100644 --- a/buildman/manager/enterprise.py +++ b/buildman/manager/enterprise.py @@ -79,7 +79,7 @@ class EnterpriseManager(BaseManager): @coroutine def job_completed(self, build_job, job_status, build_component): - self.job_complete_callback(build_job, job_status) + yield From(self.job_complete_callback(build_job, job_status)) def build_component_disposed(self, build_component, timed_out): self.all_components.remove(build_component) diff --git a/buildman/manager/ephemeral.py b/buildman/manager/ephemeral.py index 5071b9b07..58b72fb13 100644 --- a/buildman/manager/ephemeral.py +++ b/buildman/manager/ephemeral.py @@ -201,8 +201,8 @@ class EphemeralBuilderManager(BaseManager): if got_lock: logger.error('[BUILD INTERNAL ERROR: etcd %s] Build ID: %s. Exec name: %s. Exec ID: %s', etcd_result.action, build_job.build_uuid, executor_name, execution_id) - self.job_complete_callback(build_job, BuildJobResult.INCOMPLETE, executor_name, - update_phase=True) + yield From(self.job_complete_callback(build_job, BuildJobResult.INCOMPLETE, executor_name, + update_phase=True)) # Finally, we terminate the build execution for the job. We don't do this under a lock as # terminating a node is an atomic operation; better to make sure it is terminated than not. @@ -585,7 +585,7 @@ class EphemeralBuilderManager(BaseManager): # to ask for the phase to be updated as well. build_info = self._build_uuid_to_info.get(build_job.build_uuid, None) executor_name = build_info.executor_name if build_info else None - self.job_complete_callback(build_job, job_status, executor_name, update_phase=False) + yield From(self.job_complete_callback(build_job, job_status, executor_name, update_phase=False)) # Kill the ephemeral builder. yield From(self.kill_builder_executor(build_job.build_uuid)) diff --git a/buildman/server.py b/buildman/server.py index 0ab40b57c..2ad5be99d 100644 --- a/buildman/server.py +++ b/buildman/server.py @@ -140,6 +140,7 @@ class BuilderServer(object): self._queue.extend_processing(build_job.job_item, seconds_from_now=JOB_TIMEOUT_SECONDS, minimum_extension=MINIMUM_JOB_EXTENSION) + @trollius.coroutine def _job_complete(self, build_job, job_status, executor_name=None, update_phase=False): if job_status == BuildJobResult.INCOMPLETE: logger.warning('[BUILD INCOMPLETE: job complete] Build ID: %s. No retry restore.', @@ -150,7 +151,7 @@ class BuilderServer(object): if update_phase: status_handler = StatusHandler(self._build_logs, build_job.repo_build.uuid) - status_handler.set_phase(RESULT_PHASES[job_status]) + yield From(status_handler.set_phase(RESULT_PHASES[job_status])) self._job_count = self._job_count - 1 @@ -200,7 +201,7 @@ class BuilderServer(object): if schedule_success: logger.debug('Marking build %s as scheduled', build_job.repo_build.uuid) status_handler = StatusHandler(self._build_logs, build_job.repo_build.uuid) - status_handler.set_phase(database.BUILD_PHASE.BUILD_SCHEDULED) + yield From(status_handler.set_phase(database.BUILD_PHASE.BUILD_SCHEDULED)) self._job_count = self._job_count + 1 logger.debug('Build job %s scheduled. Running: %s', build_job.repo_build.uuid, diff --git a/test/test_buildman.py b/test/test_buildman.py index 77cc825c9..6ec466421 100644 --- a/test/test_buildman.py +++ b/test/test_buildman.py @@ -13,6 +13,7 @@ from buildman.manager.ephemeral import (EphemeralBuilderManager, EtcdAction, ETCD_MAX_WATCH_TIMEOUT) from buildman.component.buildcomponent import BuildComponent from buildman.server import BuildJobResult +from buildman.asyncutil import AsyncWrapper from util.metrics.metricqueue import duration_collector_async from app import metric_queue @@ -134,7 +135,7 @@ class TestEphemeralLifecycle(EphemeralBuilderTestCase): self.register_component_callback = Mock() self.unregister_component_callback = Mock() self.job_heartbeat_callback = Mock() - self.job_complete_callback = Mock() + self.job_complete_callback = AsyncWrapper(Mock()) self.manager = EphemeralBuilderManager( self.register_component_callback, @@ -389,8 +390,9 @@ class TestEphemeralLifecycle(EphemeralBuilderTestCase): # Ensure the job was marked as incomplete, with an update_phase to True (so the DB record and # logs are updated as well) - self.job_complete_callback.assert_called_once_with(ANY, BuildJobResult.INCOMPLETE, - 'MockExecutor', update_phase=True) + yield From(self.job_complete_callback.assert_called_once_with(ANY, BuildJobResult.INCOMPLETE, + 'MockExecutor', + update_phase=True)) @async_test def test_change_worker(self): @@ -473,7 +475,10 @@ class TestEphemeral(EphemeralBuilderTestCase): unregister_component_callback = Mock() job_heartbeat_callback = Mock() - job_complete_callback = Mock() + + @coroutine + def job_complete_callback(*args, **kwargs): + raise Return() self.manager = EphemeralBuilderManager( self._register_component,