Get the new build system working for enterprise

This commit is contained in:
Joseph Schorr 2014-11-13 19:41:17 -05:00
parent f93c0a46e8
commit 4322b5f81c
8 changed files with 319 additions and 55 deletions

View file

@ -2,11 +2,18 @@ import datetime
import logging import logging
import json import json
import trollius import trollius
import re
from autobahn.wamp.exception import ApplicationError from autobahn.wamp.exception import ApplicationError
from trollius.coroutines import From from trollius.coroutines import From
from buildman.basecomponent import BaseComponent from buildman.basecomponent import BaseComponent
from buildman.buildpack import BuildPackage, BuildPackageException from buildman.buildpack import BuildPackage, BuildPackageException
from buildman.buildstatus import StatusHandler
from buildman.server import BUILD_JOB_RESULT
from buildman.workererror import WorkerError
from data.database import BUILD_PHASE
HEARTBEAT_DELTA = datetime.timedelta(seconds=15) HEARTBEAT_DELTA = datetime.timedelta(seconds=15)
@ -18,10 +25,12 @@ class BuildComponent(BaseComponent):
server_hostname = None server_hostname = None
expected_token = None expected_token = None
builder_realm = None builder_realm = None
last_heartbeat = None
current_phase = 'joining' _last_heartbeat = None
current_job = None _component_status = 'joining'
_current_job = None
_build_status = None
_image_info = None
def __init__(self, config, realm=None, token=None, **kwargs): def __init__(self, config, realm=None, token=None, **kwargs):
self.expected_token = token self.expected_token = token
@ -38,17 +47,17 @@ class BuildComponent(BaseComponent):
yield From(self.subscribe(self._on_heartbeat, 'io.quay.builder.heartbeat')) yield From(self.subscribe(self._on_heartbeat, 'io.quay.builder.heartbeat'))
yield From(self.subscribe(self._on_log_message, 'io.quay.builder.logmessage')) yield From(self.subscribe(self._on_log_message, 'io.quay.builder.logmessage'))
self._set_phase('waiting') self._set_status('waiting')
def is_ready(self): def is_ready(self):
return self.current_phase == 'running' return self._component_status == 'running'
def start_build(self, build_job): def start_build(self, build_job):
if not self.is_ready(): self._current_job = build_job
return False self._build_status = StatusHandler(self.build_logs, build_job.repo_build())
self._image_info = {}
self.current_job = build_job self._set_status('building')
self._set_phase('building')
# Retrieve the job's buildpack. # Retrieve the job's buildpack.
buildpack_url = self.user_files.get_file_url(build_job.repo_build().resource_key, buildpack_url = self.user_files.get_file_url(build_job.repo_build().resource_key,
@ -60,7 +69,7 @@ class BuildComponent(BaseComponent):
buildpack = BuildPackage.from_url(buildpack_url) buildpack = BuildPackage.from_url(buildpack_url)
except BuildPackageException as bpe: except BuildPackageException as bpe:
self._build_failure('Could not retrieve build package', bpe) self._build_failure('Could not retrieve build package', bpe)
return False return
# Extract the base image information from the Dockerfile. # Extract the base image information from the Dockerfile.
parsed_dockerfile = None parsed_dockerfile = None
@ -71,18 +80,22 @@ class BuildComponent(BaseComponent):
parsed_dockerfile = buildpack.parse_dockerfile(build_config.get('build_subdir')) parsed_dockerfile = buildpack.parse_dockerfile(build_config.get('build_subdir'))
except BuildPackageException as bpe: except BuildPackageException as bpe:
self._build_failure('Could not find Dockerfile in build package', bpe) self._build_failure('Could not find Dockerfile in build package', bpe)
return False return
image_and_tag_tuple = parsed_dockerfile.get_image_and_tag() image_and_tag_tuple = parsed_dockerfile.get_image_and_tag()
if image_and_tag_tuple is None or image_and_tag_tuple[0] is None: if image_and_tag_tuple is None or image_and_tag_tuple[0] is None:
self._build_failure('Missing FROM line Dockerfile') self._build_failure('Missing FROM line Dockerfile')
return False return
base_image_information = { base_image_information = {
'repository': image_and_tag_tuple[0], 'repository': image_and_tag_tuple[0],
'tag': image_and_tag_tuple[1] 'tag': image_and_tag_tuple[1]
} }
# Extract the number of steps from the Dockerfile.
with self._build_status as status_dict:
status_dict['total_commands'] = len(parsed_dockerfile.commands)
# Add the pull robot information, if any. # Add the pull robot information, if any.
if build_config.get('pull_credentials') is not None: if build_config.get('pull_credentials') is not None:
base_image_information['username'] = build_config['pull_credentials'].get('username', '') base_image_information['username'] = build_config['pull_credentials'].get('username', '')
@ -109,7 +122,7 @@ class BuildComponent(BaseComponent):
'build_package': buildpack_url, 'build_package': buildpack_url,
'sub_directory': build_config.get('build_subdir', ''), 'sub_directory': build_config.get('build_subdir', ''),
'repository': repository_name, 'repository': repository_name,
'registry': self.server_hostname, 'registry': '10.0.2.2:5000' or self.server_hostname,
'pull_token': build_job.repo_build().access_token.code, 'pull_token': build_job.repo_build().access_token.code,
'push_token': build_job.repo_build().access_token.code, 'push_token': build_job.repo_build().access_token.code,
'tag_names': build_config.get('docker_tags', ['latest']), 'tag_names': build_config.get('docker_tags', ['latest']),
@ -121,31 +134,129 @@ class BuildComponent(BaseComponent):
logger.debug('Invoking build: %s', self.builder_realm) logger.debug('Invoking build: %s', self.builder_realm)
logger.debug('With Arguments: %s', build_arguments) logger.debug('With Arguments: %s', build_arguments)
(self.call("io.quay.builder.build", **build_arguments) return (self.call("io.quay.builder.build", **build_arguments)
.add_done_callback(self._build_complete)) .add_done_callback(self._build_complete))
@staticmethod
def __total_completion(statuses, total_images):
percentage_with_sizes = float(len(statuses.values()))/total_images
sent_bytes = sum([status['current'] for status in statuses.values()])
total_bytes = sum([status['total'] for status in statuses.values()])
return float(sent_bytes)/total_bytes*percentage_with_sizes
@staticmethod
def __process_pushpull_status(status_dict, current_phase, docker_data, images):
if not docker_data:
return
num_images = 0
status_completion_key = ''
if current_phase == 'pushing':
status_completion_key = 'push_completion'
num_images = status_dict['total_commands']
elif current_phase == 'pulling':
status_completion_key = 'pull_completion'
elif current_phase == 'priming-cache':
status_completion_key = 'cache_completion'
else:
return
if 'progressDetail' in docker_data and 'id' in docker_data:
image_id = docker_data['id']
detail = docker_data['progressDetail']
if 'current' in detail and 'total' in detail:
images[image_id] = detail
status_dict[status_completion_key] = \
BuildComponent.__total_completion(images, max(len(images), num_images))
def _on_log_message(self, status, json_data):
# Parse any of the JSON data logged.
docker_data = {}
if json_data:
try:
docker_data = json.loads(json_data)
except ValueError:
pass
# Extra the current status message (if any).
fully_unwrapped = ''
keys_to_extract = ['error', 'status', 'stream']
for key in keys_to_extract:
if key in docker_data:
fully_unwrapped = docker_data[key]
break
# Determine if this is a step string.
current_step = None
current_status_string = str(fully_unwrapped.encode('utf-8'))
if current_status_string and status == 'building':
step_increment = re.search(r'Step ([0-9]+) :', current_status_string)
if step_increment:
current_step = int(step_increment.group(1))
# Parse and update the phase and the status_dict. The status dictionary contains
# the pull/push progress.
with self._build_status as status_dict:
self._build_status.set_phase(status)
BuildComponent.__process_pushpull_status(status_dict, status, docker_data, self._image_info)
# If the current message is for a step, then update that index.
if current_step:
status_dict['current_command'] = current_step
# If the json data contains an error, then something went wrong with a push or pull.
if 'error' in docker_data:
self._build_status.set_error(docker_data['error'])
# If we are in the building phase, then write out the log.
if status == 'building':
if current_step:
self._build_status.set_command(current_status_string)
else:
self._build_status.append_log(current_status_string)
return True
def _build_failure(self, error_message, exception=None): def _build_failure(self, error_message, exception=None):
# TODO: log this message self._build_status.set_error(error_message, {
print error_message 'internal_error': exception.message if exception else None
print exception })
self._set_phase('running')
build_id = self._current_job.repo_build().uuid
logger.warning('Build %s failed with message: %s', build_id, self._error_message)
# Mark that the build has finished (in an error state)
self._build_finished(BUILD_JOB_RESULT.ERROR)
def _build_complete(self, result): def _build_complete(self, result):
try: try:
status = result.result() # Retrieve the result. This will raise an ApplicationError on any error that occurred.
# TODO: log the success result.result()
print status self._build_finished(BUILD_JOB_RESULT.COMPLETE)
self._build_status.set_phase(BUILD_PHASE.COMPLETE)
except ApplicationError as ae: except ApplicationError as ae:
error_kind = ae.error worker_error = WorkerError(ae.error, ae.kwargs.get('base_error'))
# TODO: log the error
print error_kind # Write the error to the log.
finally: self._build_status.set_error(worker_error.public_message(), worker_error.extra_data())
self._set_phase('running')
# Mark the build as completed.
if worker_error.is_internal_error():
self._build_finished(BUILD_JOB_RESULT.INCOMPLETE)
else:
self._build_finished(BUILD_JOB_RESULT.ERROR)
def _build_finished(self, job_status):
self.parent_manager.job_completed(self._current_job, job_status, self)
self._current_job = None
# Set the component back to a running state.
self._set_status('running')
def _on_ready(self, token): def _on_ready(self, token):
if self.current_phase != 'waiting': if self._component_status != 'waiting':
logger.warning('Build component with token %s is already connected', self.expected_token) logger.warning('Build component with token %s is already connected', self.expected_token)
return return
@ -153,7 +264,7 @@ class BuildComponent(BaseComponent):
logger.warning('Builder token mismatch. Expected: %s. Found: %s', self.expected_token, token) logger.warning('Builder token mismatch. Expected: %s. Found: %s', self.expected_token, token)
return return
self._set_phase('running') self._set_status('running')
# Start the heartbeat check. # Start the heartbeat check.
loop = trollius.get_event_loop() loop = trollius.get_event_loop()
@ -161,15 +272,11 @@ class BuildComponent(BaseComponent):
logger.debug('Build worker %s is connected and ready' % self.builder_realm) logger.debug('Build worker %s is connected and ready' % self.builder_realm)
return True return True
def _on_log_message(self, status, json): def _set_status(self, phase):
# TODO: log the message self._component_status = phase
print json
def _set_phase(self, phase):
self.current_phase = phase
def _on_heartbeat(self): def _on_heartbeat(self):
self.last_heartbeat = datetime.datetime.now() self._last_heartbeat = datetime.datetime.now()
def _start_heartbeat_check(self, loop): def _start_heartbeat_check(self, loop):
trollius.set_event_loop(loop) trollius.set_event_loop(loop)
@ -178,33 +285,35 @@ class BuildComponent(BaseComponent):
@trollius.coroutine @trollius.coroutine
def _check_heartbeat(self, loop): def _check_heartbeat(self, loop):
while True: while True:
if self.current_phase != 'running' or self.current_phase != 'building': if self._component_status != 'running' and self._component_status != 'building':
return return
logger.debug('Checking heartbeat on realm %s and build %s', logger.debug('Checking heartbeat on realm %s', self.builder_realm)
self.builder_realm, self.expected_token) if not self._last_heartbeat:
if not self.last_heartbeat:
self._timeout() self._timeout()
return return
if self.last_heartbeat < datetime.datetime.now() - HEARTBEAT_DELTA: if self._last_heartbeat < datetime.datetime.now() - HEARTBEAT_DELTA:
self._timeout() self._timeout()
return return
yield From(trollius.sleep(5)) yield From(trollius.sleep(5))
def _timeout(self): def _timeout(self):
self._set_phase('timeout') self._set_status('timeout')
logger.warning('Build component %s timed out', self.expected_token) logger.warning('Build component %s timed out', self.expected_token)
self._dispose(timed_out=True) self._dispose(timed_out=True)
def _dispose(self, timed_out=False): def _dispose(self, timed_out=False):
# If we still have a running job, then it has not completed and we need to tell the parent # If we still have a running job, then it has not completed and we need to tell the parent
# manager. # manager.
if self.current_job is not None: if self._current_job is not None:
self.parent_manager.job_completed(self.current_job, 'incomplete', self) if timed_out:
self.current_job = None self._build_status.set_error('Build worker timed out. Build has been requeued')
self.parent_manager.job_completed(self._current_job, BUILD_JOB_RESULT.INCOMPLETE, self)
self._build_status = None
self._current_job = None
# Unregister the current component so that it cannot be invoked again. # Unregister the current component so that it cannot be invoked again.
self.parent_manager.build_component_disposed(self, timed_out) self.parent_manager.build_component_disposed(self, timed_out)

View file

@ -42,7 +42,7 @@ class BuildJob(object):
cached_tags = set(tags) & set([tag.name for tag in existing_tags]) cached_tags = set(tags) & set([tag.name for tag in existing_tags])
if cached_tags: if cached_tags:
return cached_tags[0] return list(cached_tags)[0]
return None return None

49
buildman/buildstatus.py Normal file
View file

@ -0,0 +1,49 @@
from functools import partial
from data.database import BUILD_PHASE
class StatusHandler(object):
""" Context wrapper for writing status to build logs. """
def __init__(self, build_logs, repository_build):
self._current_phase = None
self._repository_build = repository_build
self._uuid = repository_build.uuid
self._build_logs = build_logs
self._status = {
'total_commands': None,
'current_command': None,
'push_completion': 0.0,
'pull_completion': 0.0,
}
# Write the initial status.
self.__exit__(None, None, None)
def _append_log_message(self, log_message, log_type=None, log_data=None):
self._build_logs.append_log_message(self._uuid, log_message, log_type, log_data)
def append_log(self, log_message, extra_data=None):
self._append_log_message(log_message, log_data=extra_data)
def set_command(self, command, extra_data=None):
self._append_log_message(command, self._build_logs.COMMAND, extra_data)
def set_error(self, error_message, extra_data=None):
self.set_phase(BUILD_PHASE.ERROR)
self._append_log_message(error_message, self._build_logs.ERROR, extra_data)
def set_phase(self, phase, extra_data=None):
if phase == self._current_phase:
return
self._current_phase = phase
self._append_log_message(phase, self._build_logs.PHASE, extra_data)
self._repository_build.phase = phase
self._repository_build.save()
def __enter__(self):
return self._status
def __exit__(self, exc_type, value, traceback):
self._build_logs.set_status(self._uuid, self._status)

View file

@ -60,6 +60,6 @@ class EnterpriseManager(BaseManager):
def job_completed(self, build_job, job_status, build_component): def job_completed(self, build_job, job_status, build_component):
self.job_complete_callback(build_job, job_status) self.job_complete_callback(build_job, job_status)
def component_disposed(self, build_component, timed_out): def build_component_disposed(self, build_component, timed_out):
self.build_components.remove(build_component) self.build_components.remove(build_component)

View file

@ -15,10 +15,16 @@ from buildjob import BuildJob, BuildJobLoadException
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
WORK_CHECK_TIMEOUT = 30 WORK_CHECK_TIMEOUT = 10
TIMEOUT_PERIOD_MINUTES = 20 TIMEOUT_PERIOD_MINUTES = 20
RESERVATION_SECONDS = (TIMEOUT_PERIOD_MINUTES + 5) * 60 RESERVATION_SECONDS = (TIMEOUT_PERIOD_MINUTES + 5) * 60
class BUILD_JOB_RESULT(object):
""" Build job result enum """
INCOMPLETE = 'incomplete'
COMPLETE = 'complete'
ERROR = 'error'
class BuilderServer(object): class BuilderServer(object):
""" Server which handles both HTTP and WAMP requests, managing the full state of the build """ Server which handles both HTTP and WAMP requests, managing the full state of the build
controller. controller.
@ -101,18 +107,20 @@ class BuilderServer(object):
self._session_factory.remove(component) self._session_factory.remove(component)
def _job_complete(self, build_job, job_status): def _job_complete(self, build_job, job_status):
if job_status == 'incomplete': if job_status == BUILD_JOB_RESULT.INCOMPLETE:
self._queue.incomplete(build_job.job_item(), restore_retry=True) self._queue.incomplete(build_job.job_item(), restore_retry=True, retry_after=30)
elif job_status == 'error': elif job_status == BUILD_JOB_RESULT.ERROR:
self._queue.incomplete(build_job.job_item(), restore_retry=False) self._queue.incomplete(build_job.job_item(), restore_retry=False)
else: else:
self._queue.complete(job) self._queue.complete(build_job.job_item())
self._job_count = self._job_count - 1 self._job_count = self._job_count - 1
if self._current_status == 'shutting_down' and not self._job_count: if self._current_status == 'shutting_down' and not self._job_count:
self._shutdown_event.set() self._shutdown_event.set()
# TODO: check for work here?
@trollius.coroutine @trollius.coroutine
def _work_checker(self): def _work_checker(self):
while self._current_status == 'running': while self._current_status == 'running':
@ -135,7 +143,7 @@ class BuilderServer(object):
logger.debug('Build job scheduled. Running: %s', self._job_count) logger.debug('Build job scheduled. Running: %s', self._job_count)
else: else:
logger.debug('All workers are busy. Requeuing.') logger.debug('All workers are busy. Requeuing.')
self._queue.incomplete(job_item, restore_retry=True) self._queue.incomplete(job_item, restore_retry=True, retry_after=0)
yield From(trollius.sleep(WORK_CHECK_TIMEOUT)) yield From(trollius.sleep(WORK_CHECK_TIMEOUT))

87
buildman/workererror.py Normal file
View file

@ -0,0 +1,87 @@
class WorkerError(object):
""" Helper class which represents errors raised by a build worker. """
def __init__(self, error_code, base_message=None):
self._error_code = error_code
self._base_message = base_message
self._error_handlers = {
'io.quay.builder.buildpackissue': {
'message': 'Could not load build package',
'is_internal': True
},
'io.quay.builder.cannotextractbuildpack': {
'message': 'Could not extract the contents of the build package'
},
'io.quay.builder.cannotpullforcache': {
'message': 'Could not pull cached image',
'is_internal': True
},
'io.quay.builder.cannotpullbaseimage': {
'message': 'Could not pull base image',
'show_base_error': True
},
'io.quay.builder.internalerror': {
'message': 'An internal error occurred while building. Please submit a ticket.'
},
'io.quay.builder.buildrunerror': {
'message': 'Could not start the build process',
'is_internal': True
},
'io.quay.builder.builderror': {
'message': 'A build step failed',
'show_base_error': True
},
'io.quay.builder.tagissue': {
'message': 'Could not tag built image',
'is_internal': True
},
'io.quay.builder.pushissue': {
'message': 'Could not push built image',
'show_base_error': True,
'is_internal': True
},
'io.quay.builder.dockerconnecterror': {
'message': 'Could not connect to Docker daemon',
'is_internal': True
},
'io.quay.builder.missingorinvalidargument': {
'message': 'Missing required arguments for builder',
'is_internal': True
}
}
def is_internal_error(self):
handler = self._error_handlers.get(self._error_code)
return handler.get('is_internal', False) if handler else True
def public_message(self):
handler = self._error_handlers.get(self._error_code)
if not handler:
return 'An unknown error occurred'
message = handler['message']
if handler.get('show_base_error', False) and self._base_message:
message = message + ': ' + self._base_message
if handler.get('is_internal', False):
message = message + '\nThe build will be retried shortly'
return message
def extra_data(self):
if self._base_message:
return {
'base_error': self._base_message
}
return {}

View file

@ -876,6 +876,10 @@ i.toggle-icon:hover {
background-color: #f0ad4e; background-color: #f0ad4e;
} }
.phase-icon.priming-cache {
background-color: #ddd;
}
.phase-icon.pushing { .phase-icon.pushing {
background-color: #5cb85c; background-color: #5cb85c;
} }

View file

@ -5678,6 +5678,9 @@ quayApp.directive('buildMessage', function () {
case 'building': case 'building':
return 'Building image from Dockerfile'; return 'Building image from Dockerfile';
case 'priming-cache':
return 'Priming cache for build';
case 'pushing': case 'pushing':
return 'Pushing image built from Dockerfile'; return 'Pushing image built from Dockerfile';
@ -5720,6 +5723,10 @@ quayApp.directive('buildProgress', function () {
return buildInfo.status.push_completion * 100; return buildInfo.status.push_completion * 100;
break; break;
case 'priming-cache':
return buildInfo.status.cache_completion * 100;
break;
case 'complete': case 'complete':
return 100; return 100;
break; break;