872c135205
Without being static or passing a self parameter, the worker will receive a runtime WAMP error when they attempt to ping during a health check, this marks them unhealthy every single time you attempt a health check.
358 lines
13 KiB
Python
358 lines
13 KiB
Python
import datetime
|
|
import time
|
|
import logging
|
|
import json
|
|
import trollius
|
|
import re
|
|
|
|
from autobahn.wamp.exception import ApplicationError
|
|
from trollius.coroutines import From
|
|
|
|
from buildman.basecomponent import BaseComponent
|
|
from buildman.buildpack import BuildPackage, BuildPackageException
|
|
from buildman.buildstatus import StatusHandler
|
|
from buildman.server import BuildJobResult
|
|
from buildman.workererror import WorkerError
|
|
|
|
from data.database import BUILD_PHASE
|
|
|
|
HEARTBEAT_DELTA = datetime.timedelta(seconds=30)
|
|
HEARTBEAT_TIMEOUT = 10
|
|
|
|
LOGGER = logging.getLogger(__name__)
|
|
|
|
class ComponentStatus(object):
|
|
""" ComponentStatus represents the possible states of a component. """
|
|
JOINING = 'joining'
|
|
WAITING = 'waiting'
|
|
RUNNING = 'running'
|
|
BUILDING = 'building'
|
|
TIMED_OUT = 'timeout'
|
|
|
|
class BuildComponent(BaseComponent):
|
|
""" An application session component which conducts one (or more) builds. """
|
|
def __init__(self, config, realm=None, token=None, **kwargs):
|
|
self.expected_token = token
|
|
self.builder_realm = realm
|
|
|
|
self.parent_manager = None
|
|
self.server_hostname = None
|
|
|
|
self._component_status = ComponentStatus.JOINING
|
|
self._last_heartbeat = None
|
|
self._current_job = None
|
|
self._build_status = None
|
|
self._image_info = None
|
|
|
|
BaseComponent.__init__(self, config, **kwargs)
|
|
|
|
def onConnect(self):
|
|
self.join(self.builder_realm)
|
|
|
|
def onJoin(self, details):
|
|
LOGGER.debug('Registering methods and listeners for component %s', self.builder_realm)
|
|
yield From(self.register(self._on_ready, u'io.quay.buildworker.ready'))
|
|
yield From(self.register(self._ping, u'io.quay.buildworker.ping'))
|
|
yield From(self.subscribe(self._on_heartbeat, 'io.quay.builder.heartbeat'))
|
|
yield From(self.subscribe(self._on_log_message, 'io.quay.builder.logmessage'))
|
|
|
|
self._set_status(ComponentStatus.WAITING)
|
|
|
|
def is_ready(self):
|
|
""" Determines whether a build component is ready to begin a build. """
|
|
return self._component_status == ComponentStatus.RUNNING
|
|
|
|
def start_build(self, build_job):
|
|
""" Starts a build. """
|
|
self._current_job = build_job
|
|
self._build_status = StatusHandler(self.build_logs, build_job.repo_build())
|
|
self._image_info = {}
|
|
|
|
self._set_status(ComponentStatus.BUILDING)
|
|
|
|
# Retrieve the job's buildpack.
|
|
buildpack_url = self.user_files.get_file_url(build_job.repo_build().resource_key,
|
|
requires_cors=False)
|
|
|
|
LOGGER.debug('Retreiving build package: %s', buildpack_url)
|
|
buildpack = None
|
|
try:
|
|
buildpack = BuildPackage.from_url(buildpack_url)
|
|
except BuildPackageException as bpe:
|
|
self._build_failure('Could not retrieve build package', bpe)
|
|
return
|
|
|
|
# Extract the base image information from the Dockerfile.
|
|
parsed_dockerfile = None
|
|
LOGGER.debug('Parsing dockerfile')
|
|
|
|
build_config = build_job.build_config()
|
|
try:
|
|
parsed_dockerfile = buildpack.parse_dockerfile(build_config.get('build_subdir'))
|
|
except BuildPackageException as bpe:
|
|
self._build_failure('Could not find Dockerfile in build package', bpe)
|
|
return
|
|
|
|
image_and_tag_tuple = parsed_dockerfile.get_image_and_tag()
|
|
if image_and_tag_tuple is None or image_and_tag_tuple[0] is None:
|
|
self._build_failure('Missing FROM line in Dockerfile')
|
|
return
|
|
|
|
base_image_information = {
|
|
'repository': image_and_tag_tuple[0],
|
|
'tag': image_and_tag_tuple[1]
|
|
}
|
|
|
|
# Extract the number of steps from the Dockerfile.
|
|
with self._build_status as status_dict:
|
|
status_dict['total_commands'] = len(parsed_dockerfile.commands)
|
|
|
|
# Add the pull robot information, if any.
|
|
if build_config.get('pull_credentials') is not None:
|
|
base_image_information['username'] = build_config['pull_credentials'].get('username', '')
|
|
base_image_information['password'] = build_config['pull_credentials'].get('password', '')
|
|
|
|
# Retrieve the repository's fully qualified name.
|
|
repo = build_job.repo_build().repository
|
|
repository_name = repo.namespace_user.username + '/' + repo.name
|
|
|
|
# Parse the build queue item into build arguments.
|
|
# build_package: URL to the build package to download and untar/unzip.
|
|
# sub_directory: The location within the build package of the Dockerfile and the build context.
|
|
# repository: The repository for which this build is occurring.
|
|
# registry: The registry for which this build is occuring (e.g. 'quay.io', 'staging.quay.io').
|
|
# pull_token: The token to use when pulling the cache for building.
|
|
# push_token: The token to use to push the built image.
|
|
# tag_names: The name(s) of the tag(s) for the newly built image.
|
|
# base_image: The image name and credentials to use to conduct the base image pull.
|
|
# repository: The repository to pull.
|
|
# tag: The tag to pull.
|
|
# username: The username for pulling the base image (if any).
|
|
# password: The password for pulling the base image (if any).
|
|
build_arguments = {
|
|
'build_package': buildpack_url,
|
|
'sub_directory': build_config.get('build_subdir', ''),
|
|
'repository': repository_name,
|
|
'registry': self.server_hostname,
|
|
'pull_token': build_job.repo_build().access_token.code,
|
|
'push_token': build_job.repo_build().access_token.code,
|
|
'tag_names': build_config.get('docker_tags', ['latest']),
|
|
'base_image': base_image_information,
|
|
'cached_tag': build_job.determine_cached_tag() or ''
|
|
}
|
|
|
|
# Invoke the build.
|
|
LOGGER.debug('Invoking build: %s', self.builder_realm)
|
|
LOGGER.debug('With Arguments: %s', build_arguments)
|
|
|
|
return (self
|
|
.call("io.quay.builder.build", **build_arguments)
|
|
.add_done_callback(self._build_complete))
|
|
|
|
@staticmethod
|
|
def _total_completion(statuses, total_images):
|
|
""" Returns the current amount completion relative to the total completion of a build. """
|
|
percentage_with_sizes = float(len(statuses.values())) / total_images
|
|
sent_bytes = sum([status['current'] for status in statuses.values()])
|
|
total_bytes = sum([status['total'] for status in statuses.values()])
|
|
return float(sent_bytes) / total_bytes * percentage_with_sizes
|
|
|
|
@staticmethod
|
|
def _process_pushpull_status(status_dict, current_phase, docker_data, images):
|
|
""" Processes the status of a push or pull by updating the provided status_dict and images. """
|
|
if not docker_data:
|
|
return
|
|
|
|
num_images = 0
|
|
status_completion_key = ''
|
|
|
|
if current_phase == 'pushing':
|
|
status_completion_key = 'push_completion'
|
|
num_images = status_dict['total_commands']
|
|
elif current_phase == 'pulling':
|
|
status_completion_key = 'pull_completion'
|
|
elif current_phase == 'priming-cache':
|
|
status_completion_key = 'cache_completion'
|
|
else:
|
|
return
|
|
|
|
if 'progressDetail' in docker_data and 'id' in docker_data:
|
|
image_id = docker_data['id']
|
|
detail = docker_data['progressDetail']
|
|
|
|
if 'current' in detail and 'total' in detail:
|
|
images[image_id] = detail
|
|
status_dict[status_completion_key] = \
|
|
BuildComponent._total_completion(images, max(len(images), num_images))
|
|
|
|
def _on_log_message(self, phase, json_data):
|
|
""" Tails log messages and updates the build status. """
|
|
# Parse any of the JSON data logged.
|
|
docker_data = {}
|
|
if json_data:
|
|
try:
|
|
docker_data = json.loads(json_data)
|
|
except ValueError:
|
|
pass
|
|
|
|
# Extract the current status message (if any).
|
|
fully_unwrapped = ''
|
|
keys_to_extract = ['error', 'status', 'stream']
|
|
for key in keys_to_extract:
|
|
if key in docker_data:
|
|
fully_unwrapped = docker_data[key]
|
|
break
|
|
|
|
# Determine if this is a step string.
|
|
current_step = None
|
|
current_status_string = str(fully_unwrapped.encode('utf-8'))
|
|
|
|
if current_status_string and phase == BUILD_PHASE.BUILDING:
|
|
step_increment = re.search(r'Step ([0-9]+) :', current_status_string)
|
|
if step_increment:
|
|
current_step = int(step_increment.group(1))
|
|
|
|
# Parse and update the phase and the status_dict. The status dictionary contains
|
|
# the pull/push progress, as well as the current step index.
|
|
with self._build_status as status_dict:
|
|
if self._build_status.set_phase(phase):
|
|
LOGGER.debug('Build %s has entered a new phase: %s', self.builder_realm, phase)
|
|
|
|
BuildComponent._process_pushpull_status(status_dict, phase, docker_data, self._image_info)
|
|
|
|
# If the current message represents the beginning of a new step, then update the
|
|
# current command index.
|
|
if current_step is not None:
|
|
status_dict['current_command'] = current_step
|
|
|
|
# If the json data contains an error, then something went wrong with a push or pull.
|
|
if 'error' in docker_data:
|
|
self._build_status.set_error(docker_data['error'])
|
|
|
|
if current_step is not None:
|
|
self._build_status.set_command(current_status_string)
|
|
elif phase == BUILD_PHASE.BUILDING:
|
|
self._build_status.append_log(current_status_string)
|
|
|
|
|
|
def _build_failure(self, error_message, exception=None):
|
|
""" Handles and logs a failed build. """
|
|
self._build_status.set_error(error_message, {
|
|
'internal_error': exception.message if exception else None
|
|
})
|
|
|
|
build_id = self._current_job.repo_build().uuid
|
|
LOGGER.warning('Build %s failed with message: %s', build_id, error_message)
|
|
|
|
# Mark that the build has finished (in an error state)
|
|
self._build_finished(BuildJobResult.ERROR)
|
|
|
|
def _build_complete(self, result):
|
|
""" Wraps up a completed build. Handles any errors and calls self._build_finished. """
|
|
try:
|
|
# Retrieve the result. This will raise an ApplicationError on any error that occurred.
|
|
result.result()
|
|
self._build_status.set_phase(BUILD_PHASE.COMPLETE)
|
|
self._build_finished(BuildJobResult.COMPLETE)
|
|
except ApplicationError as aex:
|
|
worker_error = WorkerError(aex.error, aex.kwargs.get('base_error'))
|
|
|
|
# Write the error to the log.
|
|
self._build_status.set_error(worker_error.public_message(), worker_error.extra_data())
|
|
|
|
# Mark the build as completed.
|
|
if worker_error.is_internal_error():
|
|
self._build_finished(BuildJobResult.INCOMPLETE)
|
|
else:
|
|
self._build_finished(BuildJobResult.ERROR)
|
|
|
|
def _build_finished(self, job_status):
|
|
""" Alerts the parent that a build has completed and sets the status back to running. """
|
|
self.parent_manager.job_completed(self._current_job, job_status, self)
|
|
self._current_job = None
|
|
|
|
# Set the component back to a running state.
|
|
self._set_status(ComponentStatus.RUNNING)
|
|
|
|
@staticmethod
|
|
def _ping():
|
|
""" Ping pong. """
|
|
return 'pong'
|
|
|
|
def _on_ready(self, token):
|
|
if self._component_status != 'waiting':
|
|
LOGGER.warning('Build component (token "%s") is already connected', self.expected_token)
|
|
return
|
|
|
|
if token != self.expected_token:
|
|
LOGGER.warning('Builder token mismatch. Expected: "%s". Found: "%s"', self.expected_token, token)
|
|
return
|
|
|
|
self._set_status(ComponentStatus.RUNNING)
|
|
|
|
# Start the heartbeat check and updating loop.
|
|
loop = trollius.get_event_loop()
|
|
loop.create_task(self._heartbeat())
|
|
LOGGER.debug('Build worker %s is connected and ready', self.builder_realm)
|
|
return True
|
|
|
|
def _set_status(self, phase):
|
|
self._component_status = phase
|
|
|
|
def _on_heartbeat(self):
|
|
""" Updates the last known heartbeat. """
|
|
self._last_heartbeat = datetime.datetime.now()
|
|
|
|
def _start_heartbeat(self, loop):
|
|
""" Begins an async loop to keep a heartbeat going with a client. """
|
|
trollius.set_event_loop(loop)
|
|
loop.run_until_complete(self._heartbeat())
|
|
|
|
@trollius.coroutine
|
|
def _heartbeat(self):
|
|
""" Coroutine that runs every HEARTBEAT_TIMEOUT seconds, both checking the worker's heartbeat
|
|
and updating the heartbeat in the build status dictionary (if applicable). This allows
|
|
the build system to catch crashes from either end.
|
|
"""
|
|
while True:
|
|
# If the component is no longer running or actively building, nothing more to do.
|
|
if (self._component_status != ComponentStatus.RUNNING and
|
|
self._component_status != ComponentStatus.BUILDING):
|
|
return
|
|
|
|
# If there is an active build, write the heartbeat to its status.
|
|
build_status = self._build_status
|
|
if build_status is not None:
|
|
with build_status as status_dict:
|
|
status_dict['heartbeat'] = int(time.time())
|
|
|
|
# Check the heartbeat from the worker.
|
|
LOGGER.debug('Checking heartbeat on realm %s', self.builder_realm)
|
|
if not self._last_heartbeat:
|
|
self._timeout()
|
|
return
|
|
|
|
if self._last_heartbeat < datetime.datetime.now() - HEARTBEAT_DELTA:
|
|
self._timeout()
|
|
return
|
|
|
|
yield From(trollius.sleep(HEARTBEAT_TIMEOUT))
|
|
|
|
def _timeout(self):
|
|
self._set_status(ComponentStatus.TIMED_OUT)
|
|
LOGGER.warning('Build component (token "%s") timed out', self.expected_token)
|
|
self._dispose(timed_out=True)
|
|
|
|
def _dispose(self, timed_out=False):
|
|
# If we still have a running job, then it has not completed and we need to tell the parent
|
|
# manager.
|
|
if self._current_job is not None:
|
|
if timed_out:
|
|
self._build_status.set_error('Build worker timed out. Build has been requeued.')
|
|
|
|
self.parent_manager.job_completed(self._current_job, BuildJobResult.INCOMPLETE, self)
|
|
self._build_status = None
|
|
self._current_job = None
|
|
|
|
# Unregister the current component so that it cannot be invoked again.
|
|
self.parent_manager.build_component_disposed(self, timed_out)
|