Merge pull request #2143 from jakedt/makebuildmanasyncagain

Make buildman async again
This commit is contained in:
Jimmy Zelinskie 2016-11-21 15:08:06 -05:00 committed by GitHub
commit 498d7fc15e
8 changed files with 112 additions and 93 deletions

View file

@ -1,5 +1,5 @@
from functools import partial
from trollius import get_event_loop
from trollius import get_event_loop, coroutine
class AsyncWrapper(object):
@ -25,3 +25,8 @@ class AsyncWrapper(object):
return self._loop.run_in_executor(self._executor, callable_delegate_attr)
return wrapper
@coroutine
def __call__(self, *args, **kwargs):
callable_delegate_attr = partial(self._delegate, *args, **kwargs)
return self._loop.run_in_executor(self._executor, callable_delegate_attr)

View file

@ -63,13 +63,11 @@ class BuildComponent(BaseComponent):
def onJoin(self, details):
logger.debug('Registering methods and listeners for component %s', self.builder_realm)
yield From(self.register(self._on_ready, u'io.quay.buildworker.ready'))
yield From(self.register(self._determine_cache_tag,
u'io.quay.buildworker.determinecachetag'))
yield From(self.register(self._determine_cache_tag, u'io.quay.buildworker.determinecachetag'))
yield From(self.register(self._ping, u'io.quay.buildworker.ping'))
yield From(self.register(self._on_log_message, u'io.quay.builder.logmessagesynchronously'))
yield From(self.subscribe(self._on_heartbeat, 'io.quay.builder.heartbeat'))
yield From(self.subscribe(self._on_log_message, 'io.quay.builder.logmessage'))
yield From(self.register(self._on_log_message_synchronously, 'io.quay.builder.logmessagesynchronously'))
yield From(self.subscribe(self._on_heartbeat, u'io.quay.builder.heartbeat'))
yield From(self._set_status(ComponentStatus.WAITING))
@ -98,7 +96,7 @@ class BuildComponent(BaseComponent):
try:
build_config = build_job.build_config
except BuildJobLoadException as irbe:
self._build_failure('Could not load build job information', irbe)
yield From(self._build_failure('Could not load build job information', irbe))
raise Return()
base_image_information = {}
@ -152,7 +150,7 @@ class BuildComponent(BaseComponent):
if not build_arguments['build_package'] and not build_arguments['git']:
logger.error('%s: insufficient build args: %s',
self._current_job.repo_build.uuid, build_arguments)
self._build_failure('Insufficient build arguments. No buildpack available.')
yield From(self._build_failure('Insufficient build arguments. No buildpack available.'))
raise Return()
# Invoke the build.
@ -210,6 +208,8 @@ class BuildComponent(BaseComponent):
status_dict[status_completion_key] = \
BuildComponent._total_completion(images, max(len(images), num_images))
@trollius.coroutine
def _on_log_message(self, phase, json_data):
""" Tails log messages and updates the build status. """
# Update the heartbeat.
@ -244,16 +244,17 @@ class BuildComponent(BaseComponent):
# the pull/push progress, as well as the current step index.
with self._build_status as status_dict:
try:
if self._build_status.set_phase(phase, log_data.get('status_data')):
changed_phase = yield From(self._build_status.set_phase(phase, log_data.get('status_data')))
if changed_phase:
logger.debug('Build %s has entered a new phase: %s', self.builder_realm, phase)
elif self._current_job.repo_build.phase == BUILD_PHASE.CANCELLED:
build_id = self._current_job.repo_build.uuid
logger.debug('Trying to move cancelled build into phase: %s with id: %s', phase, build_id)
return False
raise Return(False)
except InvalidRepositoryBuildException:
build_id = self._current_job.repo_build.uuid
logger.info('Build %s was not found; repo was probably deleted', build_id)
return False
raise Return(False)
BuildComponent._process_pushpull_status(status_dict, phase, log_data, self._image_info)
@ -264,13 +265,13 @@ class BuildComponent(BaseComponent):
# If the json data contains an error, then something went wrong with a push or pull.
if 'error' in log_data:
self._build_status.set_error(log_data['error'])
yield From(self._build_status.set_error(log_data['error']))
if current_step is not None:
self._build_status.set_command(current_status_string)
yield From(self._build_status.set_command(current_status_string))
elif phase == BUILD_PHASE.BUILDING:
self._build_status.append_log(current_status_string)
return True
yield From(self._build_status.append_log(current_status_string))
raise Return(True)
@trollius.coroutine
def _determine_cache_tag(self, command_comments, base_image_name, base_image_tag, base_image_id):
@ -283,18 +284,20 @@ class BuildComponent(BaseComponent):
tag_found = self._current_job.determine_cached_tag(base_image_id, command_comments)
raise Return(tag_found or '')
@trollius.coroutine
def _build_failure(self, error_message, exception=None):
""" Handles and logs a failed build. """
self._build_status.set_error(error_message, {
yield From(self._build_status.set_error(error_message, {
'internal_error': str(exception) if exception else None
})
}))
build_id = self._current_job.repo_build.uuid
logger.warning('Build %s failed with message: %s', build_id, error_message)
# Mark that the build has finished (in an error state)
trollius.async(self._build_finished(BuildJobResult.ERROR))
yield From(self._build_finished(BuildJobResult.ERROR))
@trollius.coroutine
def _build_complete(self, result):
""" Wraps up a completed build. Handles any errors and calls self._build_finished. """
build_id = self._current_job.repo_build.uuid
@ -313,12 +316,12 @@ class BuildComponent(BaseComponent):
pass
try:
self._build_status.set_phase(BUILD_PHASE.COMPLETE)
yield From(self._build_status.set_phase(BUILD_PHASE.COMPLETE))
except InvalidRepositoryBuildException:
logger.info('Build %s was not found; repo was probably deleted', build_id)
return
raise Return()
trollius.async(self._build_finished(BuildJobResult.COMPLETE))
yield From(self._build_finished(BuildJobResult.COMPLETE))
# Label the pushed manifests with the build metadata.
manifest_digests = kwargs.get('digests') or []
@ -342,9 +345,10 @@ class BuildComponent(BaseComponent):
worker_error = WorkerError(aex.error, aex.kwargs.get('base_error'))
# Write the error to the log.
self._build_status.set_error(worker_error.public_message(), worker_error.extra_data(),
internal_error=worker_error.is_internal_error(),
requeued=self._current_job.has_retries_remaining())
yield From(self._build_status.set_error(worker_error.public_message(),
worker_error.extra_data(),
internal_error=worker_error.is_internal_error(),
requeued=self._current_job.has_retries_remaining()))
# Send the notification that the build has failed.
self._current_job.send_notification('build_failure',
@ -354,10 +358,10 @@ class BuildComponent(BaseComponent):
if worker_error.is_internal_error():
logger.exception('[BUILD INTERNAL ERROR: Remote] Build ID: %s: %s', build_id,
worker_error.public_message())
trollius.async(self._build_finished(BuildJobResult.INCOMPLETE))
yield From(self._build_finished(BuildJobResult.INCOMPLETE))
else:
logger.debug('Got remote failure exception for build %s: %s', build_id, aex)
trollius.async(self._build_finished(BuildJobResult.ERROR))
yield From(self._build_finished(BuildJobResult.ERROR))
@trollius.coroutine
def _build_finished(self, job_status):
@ -430,9 +434,8 @@ class BuildComponent(BaseComponent):
raise Return()
# If there is an active build, write the heartbeat to its status.
build_status = self._build_status
if build_status is not None:
with build_status as status_dict:
if self._build_status is not None:
with self._build_status as status_dict:
status_dict['heartbeat'] = int(time.time())
# Mark the build item.
@ -466,8 +469,8 @@ class BuildComponent(BaseComponent):
# If we still have a running job, then it has not completed and we need to tell the parent
# manager.
if self._current_job is not None:
self._build_status.set_error('Build worker timed out', internal_error=True,
requeued=self._current_job.has_retries_remaining())
yield From(self._build_status.set_error('Build worker timed out', internal_error=True,
requeued=self._current_job.has_retries_remaining()))
build_id = self._current_job.build_uuid
logger.error('[BUILD INTERNAL ERROR: Timeout] Build ID: %s', build_id)
@ -478,10 +481,3 @@ class BuildComponent(BaseComponent):
# Unregister the current component so that it cannot be invoked again.
self.parent_manager.build_component_disposed(self, True)
@trollius.coroutine
def _on_log_message_synchronously(self, phase, json_data):
""" A method to synchronously update the pushing
phase so we don't have cancelled builds trying to be pushed
"""
raise Return(self._on_log_message(phase, json_data))

View file

@ -1,12 +1,17 @@
from data.database import BUILD_PHASE
from data import model
from redis import RedisError
import datetime
import logging
from redis import RedisError
from trollius import From, Return, coroutine
from data.database import BUILD_PHASE
from data import model
from buildman.asyncutil import AsyncWrapper
logger = logging.getLogger(__name__)
class StatusHandler(object):
""" Context wrapper for writing status to build logs. """
@ -14,62 +19,70 @@ class StatusHandler(object):
self._current_phase = None
self._current_command = None
self._uuid = repository_build_uuid
self._build_logs = build_logs
self._build_logs = AsyncWrapper(build_logs)
self._sync_build_logs = build_logs
self._build_model = AsyncWrapper(model.build)
self._status = {
'total_commands': 0,
'current_command': None,
'push_completion': 0.0,
'pull_completion': 0.0,
'total_commands': 0,
'current_command': None,
'push_completion': 0.0,
'pull_completion': 0.0,
}
# Write the initial status.
self.__exit__(None, None, None)
@coroutine
def _append_log_message(self, log_message, log_type=None, log_data=None):
log_data = log_data or {}
log_data['datetime'] = str(datetime.datetime.now())
try:
self._build_logs.append_log_message(self._uuid, log_message, log_type, log_data)
yield From(self._build_logs.append_log_message(self._uuid, log_message, log_type, log_data))
except RedisError:
logger.exception('Could not save build log for build %s: %s', self._uuid, log_message)
@coroutine
def append_log(self, log_message, extra_data=None):
if log_message is None:
return
self._append_log_message(log_message, log_data=extra_data)
yield From(self._append_log_message(log_message, log_data=extra_data))
@coroutine
def set_command(self, command, extra_data=None):
if self._current_command == command:
return
raise Return()
self._current_command = command
self._append_log_message(command, self._build_logs.COMMAND, extra_data)
yield From(self._append_log_message(command, self._build_logs.COMMAND, extra_data))
@coroutine
def set_error(self, error_message, extra_data=None, internal_error=False, requeued=False):
self.set_phase(BUILD_PHASE.INTERNAL_ERROR if internal_error and requeued else BUILD_PHASE.ERROR)
error_phase = BUILD_PHASE.INTERNAL_ERROR if internal_error and requeued else BUILD_PHASE.ERROR
yield From(self.set_phase(error_phase))
extra_data = extra_data or {}
extra_data['internal_error'] = internal_error
self._append_log_message(error_message, self._build_logs.ERROR, extra_data)
yield From(self._append_log_message(error_message, self._build_logs.ERROR, extra_data))
@coroutine
def set_phase(self, phase, extra_data=None):
if phase == self._current_phase:
return False
raise Return(False)
self._current_phase = phase
self._append_log_message(phase, self._build_logs.PHASE, extra_data)
yield From(self._append_log_message(phase, self._build_logs.PHASE, extra_data))
# Update the repository build with the new phase
return model.build.update_phase(self._uuid, phase)
raise Return(self._build_model.update_phase(self._uuid, phase))
def __enter__(self):
return self._status
def __exit__(self, exc_type, value, traceback):
try:
self._build_logs.set_status(self._uuid, self._status)
self._sync_build_logs.set_status(self._uuid, self._status)
except RedisError:
logger.exception('Could not set status of build %s to %s', self._uuid, self._status)

View file

@ -58,7 +58,7 @@ class BaseManager(object):
@coroutine
def job_completed(self, build_job, job_status, build_component):
""" Method invoked once a job_item has completed, in some manner. The job_status will be
one of: incomplete, error, complete. Implementations of this method should call
one of: incomplete, error, complete. Implementations of this method should call coroutine
self.job_complete_callback with a status of Incomplete if they wish for the job to be
automatically requeued.
"""

View file

@ -79,7 +79,7 @@ class EnterpriseManager(BaseManager):
@coroutine
def job_completed(self, build_job, job_status, build_component):
self.job_complete_callback(build_job, job_status)
yield From(self.job_complete_callback(build_job, job_status))
def build_component_disposed(self, build_component, timed_out):
self.all_components.remove(build_component)

View file

@ -201,8 +201,8 @@ class EphemeralBuilderManager(BaseManager):
if got_lock:
logger.error('[BUILD INTERNAL ERROR: etcd %s] Build ID: %s. Exec name: %s. Exec ID: %s',
etcd_result.action, build_job.build_uuid, executor_name, execution_id)
self.job_complete_callback(build_job, BuildJobResult.INCOMPLETE, executor_name,
update_phase=True)
yield From(self.job_complete_callback(build_job, BuildJobResult.INCOMPLETE, executor_name,
update_phase=True))
# Finally, we terminate the build execution for the job. We don't do this under a lock as
# terminating a node is an atomic operation; better to make sure it is terminated than not.
@ -585,7 +585,7 @@ class EphemeralBuilderManager(BaseManager):
# to ask for the phase to be updated as well.
build_info = self._build_uuid_to_info.get(build_job.build_uuid, None)
executor_name = build_info.executor_name if build_info else None
self.job_complete_callback(build_job, job_status, executor_name, update_phase=False)
yield From(self.job_complete_callback(build_job, job_status, executor_name, update_phase=False))
# Kill the ephemeral builder.
yield From(self.kill_builder_executor(build_job.build_uuid))

View file

@ -1,16 +1,15 @@
import logging
import trollius
import json
import trollius
from threading import Event
from datetime import timedelta
from trollius.coroutines import From
from autobahn.asyncio.wamp import RouterFactory, RouterSessionFactory
from autobahn.asyncio.websocket import WampWebSocketServerFactory
from autobahn.wamp import types
from aiowsgi import create_server as create_wsgi_server
from flask import Flask
from threading import Event
from trollius.coroutines import From
from datetime import timedelta
from buildman.enums import BuildJobResult, BuildServerStatus, RESULT_PHASES
from buildman.jobutil.buildstatus import StatusHandler
@ -46,12 +45,12 @@ class BuilderServer(object):
self._build_logs = build_logs
self._user_files = user_files
self._lifecycle_manager = lifecycle_manager_klass(
self._register_component,
self._unregister_component,
self._job_heartbeat,
self._job_complete,
manager_hostname,
HEARTBEAT_PERIOD_SEC,
self._register_component,
self._unregister_component,
self._job_heartbeat,
self._job_complete,
manager_hostname,
HEARTBEAT_PERIOD_SEC,
)
self._lifecycle_manager_config = lifecycle_manager_config
@ -141,6 +140,7 @@ class BuilderServer(object):
self._queue.extend_processing(build_job.job_item, seconds_from_now=JOB_TIMEOUT_SECONDS,
minimum_extension=MINIMUM_JOB_EXTENSION)
@trollius.coroutine
def _job_complete(self, build_job, job_status, executor_name=None, update_phase=False):
if job_status == BuildJobResult.INCOMPLETE:
logger.warning('[BUILD INCOMPLETE: job complete] Build ID: %s. No retry restore.',
@ -151,7 +151,7 @@ class BuilderServer(object):
if update_phase:
status_handler = StatusHandler(self._build_logs, build_job.repo_build.uuid)
status_handler.set_phase(RESULT_PHASES[job_status])
yield From(status_handler.set_phase(RESULT_PHASES[job_status]))
self._job_count = self._job_count - 1
@ -201,7 +201,7 @@ class BuilderServer(object):
if schedule_success:
logger.debug('Marking build %s as scheduled', build_job.repo_build.uuid)
status_handler = StatusHandler(self._build_logs, build_job.repo_build.uuid)
status_handler.set_phase(database.BUILD_PHASE.BUILD_SCHEDULED)
yield From(status_handler.set_phase(database.BUILD_PHASE.BUILD_SCHEDULED))
self._job_count = self._job_count + 1
logger.debug('Build job %s scheduled. Running: %s', build_job.repo_build.uuid,

View file

@ -13,6 +13,7 @@ from buildman.manager.ephemeral import (EphemeralBuilderManager, EtcdAction,
ETCD_MAX_WATCH_TIMEOUT)
from buildman.component.buildcomponent import BuildComponent
from buildman.server import BuildJobResult
from buildman.asyncutil import AsyncWrapper
from util.metrics.metricqueue import duration_collector_async
from app import metric_queue
@ -134,16 +135,16 @@ class TestEphemeralLifecycle(EphemeralBuilderTestCase):
self.register_component_callback = Mock()
self.unregister_component_callback = Mock()
self.job_heartbeat_callback = Mock()
self.job_complete_callback = Mock()
self.job_complete_callback = AsyncWrapper(Mock())
self.manager = EphemeralBuilderManager(
self.register_component_callback,
self.unregister_component_callback,
self.job_heartbeat_callback,
self.job_complete_callback,
'127.0.0.1',
30,
etcd_creator=self._create_mock_etcd_client,
self.register_component_callback,
self.unregister_component_callback,
self.job_heartbeat_callback,
self.job_complete_callback,
'127.0.0.1',
30,
etcd_creator=self._create_mock_etcd_client,
)
self.manager.initialize({'EXECUTOR': 'test'})
@ -389,8 +390,9 @@ class TestEphemeralLifecycle(EphemeralBuilderTestCase):
# Ensure the job was marked as incomplete, with an update_phase to True (so the DB record and
# logs are updated as well)
self.job_complete_callback.assert_called_once_with(ANY, BuildJobResult.INCOMPLETE,
'MockExecutor', update_phase=True)
yield From(self.job_complete_callback.assert_called_once_with(ANY, BuildJobResult.INCOMPLETE,
'MockExecutor',
update_phase=True))
@async_test
def test_change_worker(self):
@ -473,16 +475,19 @@ class TestEphemeral(EphemeralBuilderTestCase):
unregister_component_callback = Mock()
job_heartbeat_callback = Mock()
job_complete_callback = Mock()
@coroutine
def job_complete_callback(*args, **kwargs):
raise Return()
self.manager = EphemeralBuilderManager(
self._register_component,
unregister_component_callback,
job_heartbeat_callback,
job_complete_callback,
'127.0.0.1',
30,
etcd_creator=self._create_mock_etcd_client,
self._register_component,
unregister_component_callback,
job_heartbeat_callback,
job_complete_callback,
'127.0.0.1',
30,
etcd_creator=self._create_mock_etcd_client,
)
def tearDown(self):