290c8abeb5
Enterprise deployments use "" (an empty string) for builder tokens. This was confusing to read in the logs, so quotes are now added around the token value to make it clearer.
357 lines
13 KiB
Python
357 lines
13 KiB
Python
import datetime
|
|
import time
|
|
import logging
|
|
import json
|
|
import trollius
|
|
import re
|
|
|
|
from autobahn.wamp.exception import ApplicationError
|
|
from trollius.coroutines import From
|
|
|
|
from buildman.basecomponent import BaseComponent
|
|
from buildman.buildpack import BuildPackage, BuildPackageException
|
|
from buildman.buildstatus import StatusHandler
|
|
from buildman.server import BuildJobResult
|
|
from buildman.workererror import WorkerError
|
|
|
|
from data.database import BUILD_PHASE
|
|
|
|
# Number of seconds the heartbeat coroutine sleeps between checks of a connected worker.
HEARTBEAT_TIMEOUT = 10

# How old the most recent worker heartbeat may be before the worker is treated as timed out.
HEARTBEAT_DELTA = datetime.timedelta(seconds=30)

# Module-level logger for the build component.
LOGGER = logging.getLogger(__name__)
|
|
|
|
class ComponentStatus(object):
    """ Enumerates the lifecycle states a build component moves through.

    Each state is a plain string constant: a component starts out joining, waits for its worker
    to report ready, runs (idle) or builds, and may end up timed out.
    """
    (JOINING,
     WAITING,
     RUNNING,
     BUILDING,
     TIMED_OUT) = ('joining', 'waiting', 'running', 'building', 'timeout')
|
|
|
|
class BuildComponent(BaseComponent):
    """ An application session component which conducts one (or more) builds.

    The component joins the realm of a single build worker and waits for the worker to call
    ``io.quay.buildworker.ready`` with the expected token. It then invokes builds on that worker,
    tracking their progress from the worker's log messages and heartbeats.
    """

    def __init__(self, config, realm=None, token=None, **kwargs):
        # Token the worker must present in its ready call. It may be an empty string (""), which
        # is why log messages below wrap it in quotes.
        self.expected_token = token
        self.builder_realm = realm

        # Not assigned by this class; presumably set by the owning build manager after
        # construction (TODO confirm against callers).
        self.parent_manager = None
        self.server_hostname = None

        self._component_status = ComponentStatus.JOINING
        self._last_heartbeat = None
        self._current_job = None
        self._build_status = None
        self._image_info = None

        BaseComponent.__init__(self, config, **kwargs)

    def onConnect(self):
        """ Joins the worker's realm as soon as the transport connects. """
        self.join(self.builder_realm)

    def onJoin(self, details):
        """ Registers the worker-facing RPC endpoints and topic subscriptions, then waits for the
            worker to report that it is ready.
        """
        LOGGER.debug('Registering methods and listeners for component %s', self.builder_realm)
        yield From(self.register(self._on_ready, u'io.quay.buildworker.ready'))
        yield From(self.register(self._ping, u'io.quay.buildworker.ping'))
        yield From(self.subscribe(self._on_heartbeat, 'io.quay.builder.heartbeat'))
        yield From(self.subscribe(self._on_log_message, 'io.quay.builder.logmessage'))

        self._set_status(ComponentStatus.WAITING)

    def is_ready(self):
        """ Determines whether a build component is ready to begin a build. """
        return self._component_status == ComponentStatus.RUNNING

    def start_build(self, build_job):
        """ Starts a build on the connected worker.

        Downloads the job's build package and parses its Dockerfile to find the base image and the
        number of commands. It then invokes ``io.quay.builder.build`` on the worker, with
        ``_build_complete`` as the completion callback. Failures while preparing the build are
        reported through ``_build_failure``, and the method returns None early.
        """
        self._current_job = build_job
        # Fetch the build record once and reuse it below instead of calling repo_build() repeatedly.
        repo_build = build_job.repo_build()
        self._build_status = StatusHandler(self.build_logs, repo_build)
        self._image_info = {}

        self._set_status(ComponentStatus.BUILDING)

        # Retrieve the job's buildpack.
        buildpack_url = self.user_files.get_file_url(repo_build.resource_key, requires_cors=False)

        LOGGER.debug('Retrieving build package: %s', buildpack_url)
        try:
            buildpack = BuildPackage.from_url(buildpack_url)
        except BuildPackageException as bpe:
            self._build_failure('Could not retrieve build package', bpe)
            return

        # Extract the base image information from the Dockerfile.
        LOGGER.debug('Parsing dockerfile')
        build_config = build_job.build_config()
        try:
            parsed_dockerfile = buildpack.parse_dockerfile(build_config.get('build_subdir'))
        except BuildPackageException as bpe:
            self._build_failure('Could not find Dockerfile in build package', bpe)
            return

        image_and_tag_tuple = parsed_dockerfile.get_image_and_tag()
        if image_and_tag_tuple is None or image_and_tag_tuple[0] is None:
            self._build_failure('Missing FROM line in Dockerfile')
            return

        base_image_information = {
            'repository': image_and_tag_tuple[0],
            'tag': image_and_tag_tuple[1],
        }

        # Extract the number of steps from the Dockerfile.
        with self._build_status as status_dict:
            status_dict['total_commands'] = len(parsed_dockerfile.commands)

        # Add the pull robot information, if any.
        pull_credentials = build_config.get('pull_credentials')
        if pull_credentials is not None:
            base_image_information['username'] = pull_credentials.get('username', '')
            base_image_information['password'] = pull_credentials.get('password', '')

        # Retrieve the repository's fully qualified name.
        repo = repo_build.repository
        repository_name = repo.namespace_user.username + '/' + repo.name

        # Parse the build queue item into build arguments.
        #  build_package: URL to the build package to download and untar/unzip.
        #  sub_directory: The location within the build package of the Dockerfile and the build context.
        #  repository: The repository for which this build is occurring.
        #  registry: The registry for which this build is occurring (e.g. 'quay.io', 'staging.quay.io').
        #  pull_token: The token to use when pulling the cache for building.
        #  push_token: The token to use to push the built image.
        #  tag_names: The name(s) of the tag(s) for the newly built image.
        #  base_image: The image name and credentials to use to conduct the base image pull.
        #   repository: The repository to pull.
        #   tag: The tag to pull.
        #   username: The username for pulling the base image (if any).
        #   password: The password for pulling the base image (if any).
        access_token = repo_build.access_token.code
        build_arguments = {
            'build_package': buildpack_url,
            'sub_directory': build_config.get('build_subdir', ''),
            'repository': repository_name,
            'registry': self.server_hostname,
            'pull_token': access_token,
            'push_token': access_token,
            'tag_names': build_config.get('docker_tags', ['latest']),
            'base_image': base_image_information,
            'cached_tag': build_job.determine_cached_tag() or '',
        }

        # Invoke the build. Credentials are masked before logging so that tokens and passwords
        # never end up in the debug log.
        LOGGER.debug('Invoking build: %s', self.builder_realm)
        LOGGER.debug('With Arguments: %s', BuildComponent._loggable_build_arguments(build_arguments))

        build_future = self.call("io.quay.builder.build", **build_arguments)
        return build_future.add_done_callback(self._build_complete)

    @staticmethod
    def _loggable_build_arguments(build_arguments):
        """ Returns a shallow copy of the build arguments with credentials masked for logging. """
        loggable = dict(build_arguments)
        for secret_key in ('pull_token', 'push_token'):
            if loggable.get(secret_key):
                loggable[secret_key] = '<redacted>'

        base_image = dict(loggable.get('base_image') or {})
        if base_image.get('password'):
            base_image['password'] = '<redacted>'
        loggable['base_image'] = base_image
        return loggable

    @staticmethod
    def _total_completion(statuses, total_images):
        """ Returns the current amount completion relative to the total completion of a build.

        :param statuses: dict mapping image id -> progress detail with 'current' and 'total' bytes.
        :param total_images: number of images expected; scales completion by how many images have
                             reported sizes so far.
        :returns: a float completion ratio, or 0.0 when there is nothing meaningful to divide by.
        """
        if not statuses or total_images <= 0:
            return 0.0

        percentage_with_sizes = float(len(statuses)) / total_images
        sent_bytes = sum(status['current'] for status in statuses.values())
        total_bytes = sum(status['total'] for status in statuses.values())

        # Progress updates can report zero total bytes; avoid a ZeroDivisionError in that case.
        if total_bytes <= 0:
            return 0.0

        return float(sent_bytes) / total_bytes * percentage_with_sizes

    @staticmethod
    def _process_pushpull_status(status_dict, current_phase, docker_data, images):
        """ Processes the status of a push or pull by updating the provided status_dict and images.

        Only the 'pushing', 'pulling' and 'priming-cache' phases are tracked. Any other phase, or
        an empty docker_data, leaves both dictionaries untouched.
        """
        if not docker_data:
            return

        num_images = 0
        if current_phase == 'pushing':
            status_completion_key = 'push_completion'
            # When pushing, expect roughly one image per Dockerfile command.
            num_images = status_dict['total_commands']
        elif current_phase == 'pulling':
            status_completion_key = 'pull_completion'
        elif current_phase == 'priming-cache':
            status_completion_key = 'cache_completion'
        else:
            return

        if 'progressDetail' in docker_data and 'id' in docker_data:
            image_id = docker_data['id']
            detail = docker_data['progressDetail']

            if isinstance(detail, dict) and 'current' in detail and 'total' in detail:
                images[image_id] = detail
                status_dict[status_completion_key] = \
                    BuildComponent._total_completion(images, max(len(images), num_images))

    def _on_log_message(self, phase, json_data):
        """ Tails log messages and updates the build status.

        :param phase: the build phase reported by the worker for this message.
        :param json_data: the raw log payload. It is parsed as JSON when possible; anything that is
                          not a JSON object is treated as carrying no structured data.
        """
        build_status = self._build_status
        if build_status is None:
            # No build is being tracked (none started yet, or the component was disposed).
            LOGGER.debug('Ignoring log message on realm %s: no active build', self.builder_realm)
            return

        # Parse any of the JSON data logged. Non-JSON lines are expected and simply carry no data.
        docker_data = {}
        if json_data:
            try:
                docker_data = json.loads(json_data)
            except ValueError:
                pass

        # Membership tests below require a mapping; ignore scalars/arrays.
        if not isinstance(docker_data, dict):
            docker_data = {}

        # Extract the current status message (if any), preferring error over status over stream.
        fully_unwrapped = ''
        for key in ('error', 'status', 'stream'):
            if key in docker_data:
                fully_unwrapped = docker_data[key]
                break

        # Determine if this is a step string.
        current_step = None
        current_status_string = str(fully_unwrapped.encode('utf-8'))

        if current_status_string and phase == BUILD_PHASE.BUILDING:
            step_increment = re.search(r'Step ([0-9]+) :', current_status_string)
            if step_increment:
                current_step = int(step_increment.group(1))

        # Parse and update the phase and the status_dict. The status dictionary contains
        # the pull/push progress, as well as the current step index.
        with build_status as status_dict:
            if build_status.set_phase(phase):
                LOGGER.debug('Build %s has entered a new phase: %s', self.builder_realm, phase)

            BuildComponent._process_pushpull_status(status_dict, phase, docker_data, self._image_info)

            # If the current message represents the beginning of a new step, then update the
            # current command index.
            if current_step is not None:
                status_dict['current_command'] = current_step

            # If the json data contains an error, then something went wrong with a push or pull.
            if 'error' in docker_data:
                build_status.set_error(docker_data['error'])

        if current_step is not None:
            build_status.set_command(current_status_string)
        elif phase == BUILD_PHASE.BUILDING:
            build_status.append_log(current_status_string)

    def _build_failure(self, error_message, exception=None):
        """ Handles and logs a failed build.

        :param error_message: message recorded on the build status and written to the log.
        :param exception: optional exception whose message is stored as 'internal_error'.
        """
        self._build_status.set_error(error_message, {
            'internal_error': exception.message if exception else None
        })

        build_id = self._current_job.repo_build().uuid
        LOGGER.warning('Build %s failed with message: %s', build_id, error_message)

        # Mark that the build has finished (in an error state)
        self._build_finished(BuildJobResult.ERROR)

    def _build_complete(self, result):
        """ Wraps up a completed build. Handles any errors and calls self._build_finished.

        :param result: the future returned by the ``io.quay.builder.build`` call. Its result()
                       raises ApplicationError when the worker reported a failure.
        """
        try:
            # Retrieve the result. This will raise an ApplicationError on any error that occurred.
            result.result()
            self._build_status.set_phase(BUILD_PHASE.COMPLETE)
            self._build_finished(BuildJobResult.COMPLETE)
        except ApplicationError as aex:
            worker_error = WorkerError(aex.error, aex.kwargs.get('base_error'))

            # Write the error to the log.
            self._build_status.set_error(worker_error.public_message(), worker_error.extra_data())

            # Internal errors mark the job INCOMPLETE; all other worker errors mark it ERROR.
            if worker_error.is_internal_error():
                self._build_finished(BuildJobResult.INCOMPLETE)
            else:
                self._build_finished(BuildJobResult.ERROR)

    def _build_finished(self, job_status):
        """ Alerts the parent that a build has completed and sets the status back to running. """
        self.parent_manager.job_completed(self._current_job, job_status, self)
        self._current_job = None

        # Set the component back to a running state.
        self._set_status(ComponentStatus.RUNNING)

    def _ping(self):
        """ Ping pong. Registered as a bound method, so it must accept ``self``. """
        return 'pong'

    def _on_ready(self, token):
        """ Handles the worker's ready call.

        Rejects the call (returning None) if the component is not waiting or the token does not
        match. Otherwise it marks the component running, starts the heartbeat loop, and returns
        True.
        """
        if self._component_status != ComponentStatus.WAITING:
            LOGGER.warning('Build component (token "%s") is already connected', self.expected_token)
            return

        if token != self.expected_token:
            LOGGER.warning('Builder token mismatch. Expected: "%s". Found: "%s"', self.expected_token, token)
            return

        self._set_status(ComponentStatus.RUNNING)

        # Start the heartbeat check and updating loop.
        loop = trollius.get_event_loop()
        loop.create_task(self._heartbeat())
        LOGGER.debug('Build worker %s is connected and ready', self.builder_realm)
        return True

    def _set_status(self, phase):
        """ Records the component's current lifecycle state (a ComponentStatus value). """
        self._component_status = phase

    def _on_heartbeat(self):
        """ Updates the last known heartbeat. """
        self._last_heartbeat = datetime.datetime.now()

    def _start_heartbeat(self, loop):
        """ Begins an async loop to keep a heartbeat going with a client. """
        trollius.set_event_loop(loop)
        loop.run_until_complete(self._heartbeat())

    @trollius.coroutine
    def _heartbeat(self):
        """ Coroutine that runs every HEARTBEAT_TIMEOUT seconds, both checking the worker's heartbeat
            and updating the heartbeat in the build status dictionary (if applicable). This allows
            the build system to catch crashes from either end.
        """
        # Until the worker's first heartbeat arrives, measure staleness from when this loop
        # started. Without this grace period, a worker that has just reported ready would be
        # timed out on the very first check.
        started_at = datetime.datetime.now()

        while True:
            # If the component is no longer running or actively building, nothing more to do.
            if self._component_status not in (ComponentStatus.RUNNING, ComponentStatus.BUILDING):
                return

            # If there is an active build, write the heartbeat to its status.
            build_status = self._build_status
            if build_status is not None:
                with build_status as status_dict:
                    status_dict['heartbeat'] = int(time.time())

            # Check the heartbeat from the worker.
            LOGGER.debug('Checking heartbeat on realm %s', self.builder_realm)
            last_heartbeat = self._last_heartbeat
            if last_heartbeat is None:
                last_heartbeat = started_at

            if last_heartbeat < datetime.datetime.now() - HEARTBEAT_DELTA:
                self._timeout()
                return

            yield From(trollius.sleep(HEARTBEAT_TIMEOUT))

    def _timeout(self):
        """ Marks the component timed out, logs it, and disposes of it. """
        self._set_status(ComponentStatus.TIMED_OUT)
        LOGGER.warning('Build component (token "%s") timed out', self.expected_token)
        self._dispose(timed_out=True)

    def _dispose(self, timed_out=False):
        """ Reports any in-flight job as INCOMPLETE and unregisters this component.

        :param timed_out: whether disposal was caused by a heartbeat timeout; if so, the timeout
                          is also recorded on the in-flight build's status.
        """
        # If we still have a running job, then it has not completed and we need to tell the parent
        # manager.
        if self._current_job is not None:
            if timed_out:
                self._build_status.set_error('Build worker timed out. Build has been requeued.')

            self.parent_manager.job_completed(self._current_job, BuildJobResult.INCOMPLETE, self)
            self._build_status = None
            self._current_job = None

        # Unregister the current component so that it cannot be invoked again.
        self.parent_manager.build_component_disposed(self, timed_out)
|