diff --git a/Dockerfile.web b/Dockerfile.web index 575332a3a..7d693668d 100644 --- a/Dockerfile.web +++ b/Dockerfile.web @@ -52,6 +52,7 @@ ADD conf/init/nginx /etc/service/nginx ADD conf/init/diffsworker /etc/service/diffsworker ADD conf/init/notificationworker /etc/service/notificationworker ADD conf/init/buildlogsarchiver /etc/service/buildlogsarchiver +ADD conf/init/buildmanager /etc/service/buildmanager # Download any external libs. RUN mkdir static/fonts static/ldn diff --git a/buildman/__init__.py b/buildman/__init__.py new file mode 100644 index 000000000..e69de29bb diff --git a/buildman/builder.py b/buildman/builder.py new file mode 100644 index 000000000..d6ba15ea6 --- /dev/null +++ b/buildman/builder.py @@ -0,0 +1,46 @@ +import logging +import os +import features + +from app import app, userfiles as user_files, build_logs, dockerfile_build_queue + +from buildman.manager.enterprise import EnterpriseManager +from buildman.server import BuilderServer + +from trollius import SSLContext + +logger = logging.getLogger(__name__) + +BUILD_MANAGERS = { + 'enterprise': EnterpriseManager +} + +def run_build_manager(): + if not features.BUILD_SUPPORT: + logger.debug('Building is disabled. Please enable the feature flag') + return + + build_manager_config = app.config.get('BUILD_MANAGER') + if build_manager_config is None: + return + + logger.debug('Asking to start build manager with lifecycle "%s"', build_manager_config[0]) + manager_klass = BUILD_MANAGERS.get(build_manager_config[0]) + if manager_klass is None: + return + + logger.debug('Starting build manager with lifecycle "%s"', build_manager_config[0]) + ssl_context = None + if os.environ.get('SSL_CONFIG'): + logger.debug('Loading SSL cert and key') + ssl_context = SSLContext() + ssl_context.load_cert_chain(os.environ.get('SSL_CONFIG') + '/ssl.cert', + os.environ.get('SSL_CONFIG') + '/ssl.key') + + server = BuilderServer(app.config['SERVER_HOSTNAME'], dockerfile_build_queue, build_logs, + user_files, manager_klass) + server.run('0.0.0.0', ssl=ssl_context) + +if __name__ == '__main__': + logging.basicConfig(level=logging.DEBUG) + run_build_manager() diff --git a/buildman/component/__init__.py b/buildman/component/__init__.py new file mode 100644 index 000000000..e69de29bb diff --git a/buildman/component/basecomponent.py b/buildman/component/basecomponent.py new file mode 100644 index 000000000..47781dff5 --- /dev/null +++ b/buildman/component/basecomponent.py @@ -0,0 +1,10 @@ +from autobahn.asyncio.wamp import ApplicationSession + +class BaseComponent(ApplicationSession): + """ Base class for all registered component sessions in the server. 
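    Subclasses are constructed by BuilderServer._register_component, which injects the
    server, parent_manager, build_logs and user_files references after instantiation.
    A minimal sketch of a subclass (the class name and RPC URI below are illustrative
    only; From comes from trollius.coroutines):

        class PingComponent(BaseComponent):
            def onJoin(self, details):
                # register a trivial RPC once the session has joined its realm
                yield From(self.register(lambda: 'pong', u'io.quay.example.ping'))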
""" + def __init__(self, config, **kwargs): + ApplicationSession.__init__(self, config) + self.server = None + self.parent_manager = None + self.build_logs = None + self.user_files = None diff --git a/buildman/component/buildcomponent.py b/buildman/component/buildcomponent.py new file mode 100644 index 000000000..d518d3453 --- /dev/null +++ b/buildman/component/buildcomponent.py @@ -0,0 +1,365 @@ +import datetime +import time +import logging +import json +import trollius +import re + +from autobahn.wamp.exception import ApplicationError +from trollius.coroutines import From + +from buildman.server import BuildJobResult +from buildman.component.basecomponent import BaseComponent +from buildman.jobutil.buildpack import BuildPackage, BuildPackageException +from buildman.jobutil.buildstatus import StatusHandler +from buildman.jobutil.workererror import WorkerError + +from data.database import BUILD_PHASE + +HEARTBEAT_DELTA = datetime.timedelta(seconds=30) +HEARTBEAT_TIMEOUT = 10 +INITIAL_TIMEOUT = 25 + +SUPPORTED_WORKER_VERSIONS = ['0.1-beta'] + +logger = logging.getLogger(__name__) + +class ComponentStatus(object): + """ ComponentStatus represents the possible states of a component. """ + JOINING = 'joining' + WAITING = 'waiting' + RUNNING = 'running' + BUILDING = 'building' + TIMED_OUT = 'timeout' + +class BuildComponent(BaseComponent): + """ An application session component which conducts one (or more) builds. """ + def __init__(self, config, realm=None, token=None, **kwargs): + self.expected_token = token + self.builder_realm = realm + + self.parent_manager = None + self.server_hostname = None + + self._component_status = ComponentStatus.JOINING + self._last_heartbeat = None + self._current_job = None + self._build_status = None + self._image_info = None + + BaseComponent.__init__(self, config, **kwargs) + + def onConnect(self): + self.join(self.builder_realm) + + def onJoin(self, details): + logger.debug('Registering methods and listeners for component %s', self.builder_realm) + yield From(self.register(self._on_ready, u'io.quay.buildworker.ready')) + yield From(self.register(self._ping, u'io.quay.buildworker.ping')) + yield From(self.subscribe(self._on_heartbeat, 'io.quay.builder.heartbeat')) + yield From(self.subscribe(self._on_log_message, 'io.quay.builder.logmessage')) + + self._set_status(ComponentStatus.WAITING) + + def is_ready(self): + """ Determines whether a build component is ready to begin a build. """ + return self._component_status == ComponentStatus.RUNNING + + def start_build(self, build_job): + """ Starts a build. """ + self._current_job = build_job + self._build_status = StatusHandler(self.build_logs, build_job.repo_build()) + self._image_info = {} + + self._set_status(ComponentStatus.BUILDING) + + # Retrieve the job's buildpack. + buildpack_url = self.user_files.get_file_url(build_job.repo_build().resource_key, + requires_cors=False) + + logger.debug('Retreiving build package: %s', buildpack_url) + buildpack = None + try: + buildpack = BuildPackage.from_url(buildpack_url) + except BuildPackageException as bpe: + self._build_failure('Could not retrieve build package', bpe) + return + + # Extract the base image information from the Dockerfile. 
+ parsed_dockerfile = None + logger.debug('Parsing dockerfile') + + build_config = build_job.build_config() + try: + parsed_dockerfile = buildpack.parse_dockerfile(build_config.get('build_subdir')) + except BuildPackageException as bpe: + self._build_failure('Could not find Dockerfile in build package', bpe) + return + + image_and_tag_tuple = parsed_dockerfile.get_image_and_tag() + if image_and_tag_tuple is None or image_and_tag_tuple[0] is None: + self._build_failure('Missing FROM line in Dockerfile') + return + + base_image_information = { + 'repository': image_and_tag_tuple[0], + 'tag': image_and_tag_tuple[1] + } + + # Extract the number of steps from the Dockerfile. + with self._build_status as status_dict: + status_dict['total_commands'] = len(parsed_dockerfile.commands) + + # Add the pull robot information, if any. + if build_config.get('pull_credentials') is not None: + base_image_information['username'] = build_config['pull_credentials'].get('username', '') + base_image_information['password'] = build_config['pull_credentials'].get('password', '') + + # Retrieve the repository's fully qualified name. + repo = build_job.repo_build().repository + repository_name = repo.namespace_user.username + '/' + repo.name + + # Parse the build queue item into build arguments. + # build_package: URL to the build package to download and untar/unzip. + # sub_directory: The location within the build package of the Dockerfile and the build context. + # repository: The repository for which this build is occurring. + # registry: The registry for which this build is occuring (e.g. 'quay.io', 'staging.quay.io'). + # pull_token: The token to use when pulling the cache for building. + # push_token: The token to use to push the built image. + # tag_names: The name(s) of the tag(s) for the newly built image. + # base_image: The image name and credentials to use to conduct the base image pull. + # repository: The repository to pull. + # tag: The tag to pull. + # username: The username for pulling the base image (if any). + # password: The password for pulling the base image (if any). + build_arguments = { + 'build_package': buildpack_url, + 'sub_directory': build_config.get('build_subdir', ''), + 'repository': repository_name, + 'registry': self.server_hostname, + 'pull_token': build_job.repo_build().access_token.code, + 'push_token': build_job.repo_build().access_token.code, + 'tag_names': build_config.get('docker_tags', ['latest']), + 'base_image': base_image_information, + 'cached_tag': build_job.determine_cached_tag() or '' + } + + # Invoke the build. + logger.debug('Invoking build: %s', self.builder_realm) + logger.debug('With Arguments: %s', build_arguments) + + return (self + .call("io.quay.builder.build", **build_arguments) + .add_done_callback(self._build_complete)) + + @staticmethod + def _total_completion(statuses, total_images): + """ Returns the current amount completion relative to the total completion of a build. """ + percentage_with_sizes = float(len(statuses.values())) / total_images + sent_bytes = sum([status['current'] for status in statuses.values()]) + total_bytes = sum([status['total'] for status in statuses.values()]) + return float(sent_bytes) / total_bytes * percentage_with_sizes + + @staticmethod + def _process_pushpull_status(status_dict, current_phase, docker_data, images): + """ Processes the status of a push or pull by updating the provided status_dict and images. 
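        For illustration, a pull progress event such as
            {'id': 'abc123', 'progressDetail': {'current': 512, 'total': 1024}}
        marks image 'abc123' as 50% transferred; _total_completion then scales that ratio by the
        fraction of expected images seen so far to produce the overall completion value.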
""" + if not docker_data: + return + + num_images = 0 + status_completion_key = '' + + if current_phase == 'pushing': + status_completion_key = 'push_completion' + num_images = status_dict['total_commands'] + elif current_phase == 'pulling': + status_completion_key = 'pull_completion' + elif current_phase == 'priming-cache': + status_completion_key = 'cache_completion' + else: + return + + if 'progressDetail' in docker_data and 'id' in docker_data: + image_id = docker_data['id'] + detail = docker_data['progressDetail'] + + if 'current' in detail and 'total' in detail: + images[image_id] = detail + status_dict[status_completion_key] = \ + BuildComponent._total_completion(images, max(len(images), num_images)) + + def _on_log_message(self, phase, json_data): + """ Tails log messages and updates the build status. """ + # Parse any of the JSON data logged. + docker_data = {} + if json_data: + try: + docker_data = json.loads(json_data) + except ValueError: + pass + + # Extract the current status message (if any). + fully_unwrapped = '' + keys_to_extract = ['error', 'status', 'stream'] + for key in keys_to_extract: + if key in docker_data: + fully_unwrapped = docker_data[key] + break + + # Determine if this is a step string. + current_step = None + current_status_string = str(fully_unwrapped.encode('utf-8')) + + if current_status_string and phase == BUILD_PHASE.BUILDING: + step_increment = re.search(r'Step ([0-9]+) :', current_status_string) + if step_increment: + current_step = int(step_increment.group(1)) + + # Parse and update the phase and the status_dict. The status dictionary contains + # the pull/push progress, as well as the current step index. + with self._build_status as status_dict: + if self._build_status.set_phase(phase): + logger.debug('Build %s has entered a new phase: %s', self.builder_realm, phase) + + BuildComponent._process_pushpull_status(status_dict, phase, docker_data, self._image_info) + + # If the current message represents the beginning of a new step, then update the + # current command index. + if current_step is not None: + status_dict['current_command'] = current_step + + # If the json data contains an error, then something went wrong with a push or pull. + if 'error' in docker_data: + self._build_status.set_error(docker_data['error']) + + if current_step is not None: + self._build_status.set_command(current_status_string) + elif phase == BUILD_PHASE.BUILDING: + self._build_status.append_log(current_status_string) + + + def _build_failure(self, error_message, exception=None): + """ Handles and logs a failed build. """ + self._build_status.set_error(error_message, { + 'internal_error': exception.message if exception else None + }) + + build_id = self._current_job.repo_build().uuid + logger.warning('Build %s failed with message: %s', build_id, error_message) + + # Mark that the build has finished (in an error state) + self._build_finished(BuildJobResult.ERROR) + + def _build_complete(self, result): + """ Wraps up a completed build. Handles any errors and calls self._build_finished. """ + try: + # Retrieve the result. This will raise an ApplicationError on any error that occurred. + result.result() + self._build_status.set_phase(BUILD_PHASE.COMPLETE) + self._build_finished(BuildJobResult.COMPLETE) + except ApplicationError as aex: + worker_error = WorkerError(aex.error, aex.kwargs.get('base_error')) + + # Write the error to the log. 
+      self._build_status.set_error(worker_error.public_message(), worker_error.extra_data(),
+                                   internal_error=worker_error.is_internal_error())
+
+      # Mark the build as completed.
+      if worker_error.is_internal_error():
+        self._build_finished(BuildJobResult.INCOMPLETE)
+      else:
+        self._build_finished(BuildJobResult.ERROR)
+
+  def _build_finished(self, job_status):
+    """ Alerts the parent that a build has completed and sets the status back to running. """
+    self.parent_manager.job_completed(self._current_job, job_status, self)
+    self._current_job = None
+
+    # Set the component back to a running state.
+    self._set_status(ComponentStatus.RUNNING)
+
+  @staticmethod
+  def _ping():
+    """ Ping pong. """
+    return 'pong'
+
+  def _on_ready(self, token, version):
+    if version not in SUPPORTED_WORKER_VERSIONS:
+      logger.warning('Build component (token "%s") is running an out-of-date version: %s',
+                     self.expected_token, version)
+      return False
+
+    if self._component_status != ComponentStatus.WAITING:
+      logger.warning('Build component (token "%s") is already connected', self.expected_token)
+      return False
+
+    if token != self.expected_token:
+      logger.warning('Builder token mismatch. Expected: "%s". Found: "%s"', self.expected_token, token)
+      return False
+
+    self._set_status(ComponentStatus.RUNNING)
+
+    # Start the heartbeat check and updating loop.
+    loop = trollius.get_event_loop()
+    loop.create_task(self._heartbeat())
+    logger.debug('Build worker %s is connected and ready', self.builder_realm)
+    return True
+
+  def _set_status(self, phase):
+    self._component_status = phase
+
+  def _on_heartbeat(self):
+    """ Updates the last known heartbeat. """
+    self._last_heartbeat = datetime.datetime.now()
+
+  @trollius.coroutine
+  def _heartbeat(self):
+    """ Coroutine that runs every HEARTBEAT_TIMEOUT seconds, both checking the worker's heartbeat
+        and updating the heartbeat in the build status dictionary (if applicable). This allows
+        the build system to catch crashes from either end.
+    """
+    yield From(trollius.sleep(INITIAL_TIMEOUT))
+
+    while True:
+      # If the component is no longer running or actively building, nothing more to do.
+      if (self._component_status != ComponentStatus.RUNNING and
+          self._component_status != ComponentStatus.BUILDING):
+        return
+
+      # If there is an active build, write the heartbeat to its status.
+      build_status = self._build_status
+      if build_status is not None:
+        with build_status as status_dict:
+          status_dict['heartbeat'] = int(time.time())
+
+      # Mark the build item.
+      current_job = self._current_job
+      if current_job is not None:
+        self.parent_manager.job_heartbeat(current_job)
+
+      # Check the heartbeat from the worker.
+      logger.debug('Checking heartbeat on realm %s', self.builder_realm)
+      if self._last_heartbeat and self._last_heartbeat < datetime.datetime.now() - HEARTBEAT_DELTA:
+        self._timeout()
+        return
+
+      yield From(trollius.sleep(HEARTBEAT_TIMEOUT))
+
+  def _timeout(self):
+    self._set_status(ComponentStatus.TIMED_OUT)
+    logger.warning('Build component with realm %s has timed out', self.builder_realm)
+    self._dispose(timed_out=True)
+
+  def _dispose(self, timed_out=False):
+    # If we still have a running job, then it has not completed and we need to tell the parent
+    # manager.
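    # (Reporting BuildJobResult.INCOMPLETE rather than ERROR matters here: the server requeues an
    # incomplete job with its retry count restored, so a build lost to a worker timeout is retried.)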
+ if self._current_job is not None: + if timed_out: + self._build_status.set_error('Build worker timed out', internal_error=True) + + self.parent_manager.job_completed(self._current_job, BuildJobResult.INCOMPLETE, self) + self._build_status = None + self._current_job = None + + # Unregister the current component so that it cannot be invoked again. + self.parent_manager.build_component_disposed(self, timed_out) diff --git a/buildman/jobutil/__init__.py b/buildman/jobutil/__init__.py new file mode 100644 index 000000000..e69de29bb diff --git a/buildman/jobutil/buildjob.py b/buildman/jobutil/buildjob.py new file mode 100644 index 000000000..2b9dbf35c --- /dev/null +++ b/buildman/jobutil/buildjob.py @@ -0,0 +1,60 @@ +from data import model + +import json + +class BuildJobLoadException(Exception): + """ Exception raised if a build job could not be instantiated for some reason. """ + pass + +class BuildJob(object): + """ Represents a single in-progress build job. """ + def __init__(self, job_item): + self._job_item = job_item + + try: + self._job_details = json.loads(job_item.body) + except ValueError: + raise BuildJobLoadException( + 'Could not parse build queue item config with ID %s' % self._job_details['build_uuid'] + ) + + try: + self._repo_build = model.get_repository_build(self._job_details['namespace'], + self._job_details['repository'], + self._job_details['build_uuid']) + except model.InvalidRepositoryBuildException: + raise BuildJobLoadException( + 'Could not load repository build with ID %s' % self._job_details['build_uuid']) + + try: + self._build_config = json.loads(self._repo_build.job_config) + except ValueError: + raise BuildJobLoadException( + 'Could not parse repository build job config with ID %s' % self._job_details['build_uuid'] + ) + + def determine_cached_tag(self): + """ Returns the tag to pull to prime the cache or None if none. """ + # TODO(jschorr): Change this to use the more complicated caching rules, once we have caching + # be a pull of things besides the constructed tags. + tags = self._build_config.get('docker_tags', ['latest']) + existing_tags = model.list_repository_tags(self._job_details['namespace'], + self._job_details['repository']) + + cached_tags = set(tags) & set([tag.name for tag in existing_tags]) + if cached_tags: + return list(cached_tags)[0] + + return None + + def job_item(self): + """ Returns the job's queue item. """ + return self._job_item + + def repo_build(self): + """ Returns the repository build DB row for the job. """ + return self._repo_build + + def build_config(self): + """ Returns the parsed repository build config for the job. """ + return self._build_config diff --git a/buildman/jobutil/buildpack.py b/buildman/jobutil/buildpack.py new file mode 100644 index 000000000..9892c65d3 --- /dev/null +++ b/buildman/jobutil/buildpack.py @@ -0,0 +1,88 @@ +import tarfile +import requests +import os + +from tempfile import TemporaryFile, mkdtemp +from zipfile import ZipFile +from util.dockerfileparse import parse_dockerfile +from util.safetar import safe_extractall + +class BuildPackageException(Exception): + """ Exception raised when retrieving or parsing a build package. """ + pass + + +class BuildPackage(object): + """ Helper class for easy reading and updating of a Dockerfile build pack. 
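    Typical flow (sketch; variable names are illustrative):

        pack = BuildPackage.from_url(buildpack_url)    # download and unpack based on mime type
        parsed = pack.parse_dockerfile(subdirectory)   # parse the Dockerfile found there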
""" + + def __init__(self, requests_file): + self._mime_processors = { + 'application/zip': BuildPackage._prepare_zip, + 'application/x-zip-compressed': BuildPackage._prepare_zip, + 'text/plain': BuildPackage._prepare_dockerfile, + 'application/octet-stream': BuildPackage._prepare_dockerfile, + 'application/x-tar': BuildPackage._prepare_tarball, + 'application/gzip': BuildPackage._prepare_tarball, + 'application/x-gzip': BuildPackage._prepare_tarball, + } + + c_type = requests_file.headers['content-type'] + c_type = c_type.split(';')[0] if ';' in c_type else c_type + + if c_type not in self._mime_processors: + raise BuildPackageException('Unknown build package mime type: %s' % c_type) + + self._package_directory = None + try: + self._package_directory = self._mime_processors[c_type](requests_file) + except Exception as ex: + raise BuildPackageException(ex.message) + + def parse_dockerfile(self, subdirectory): + dockerfile_path = os.path.join(self._package_directory, subdirectory, 'Dockerfile') + if not os.path.exists(dockerfile_path): + if subdirectory: + message = 'Build package did not contain a Dockerfile at sub directory %s.' % subdirectory + else: + message = 'Build package did not contain a Dockerfile at the root directory.' + + raise BuildPackageException(message) + + with open(dockerfile_path, 'r') as dockerfileobj: + return parse_dockerfile(dockerfileobj.read()) + + @staticmethod + def from_url(url): + buildpack_resource = requests.get(url, stream=True) + return BuildPackage(buildpack_resource) + + @staticmethod + def _prepare_zip(request_file): + build_dir = mkdtemp(prefix='docker-build-') + + # Save the zip file to temp somewhere + with TemporaryFile() as zip_file: + zip_file.write(request_file.content) + to_extract = ZipFile(zip_file) + to_extract.extractall(build_dir) + + return build_dir + + @staticmethod + def _prepare_dockerfile(request_file): + build_dir = mkdtemp(prefix='docker-build-') + dockerfile_path = os.path.join(build_dir, "Dockerfile") + with open(dockerfile_path, 'w') as dockerfile: + dockerfile.write(request_file.content) + + return build_dir + + @staticmethod + def _prepare_tarball(request_file): + build_dir = mkdtemp(prefix='docker-build-') + + # Save the zip file to temp somewhere + with tarfile.open(mode='r|*', fileobj=request_file.raw) as tar_stream: + safe_extractall(tar_stream, build_dir) + + return build_dir diff --git a/buildman/jobutil/buildstatus.py b/buildman/jobutil/buildstatus.py new file mode 100644 index 000000000..68b8cd5e3 --- /dev/null +++ b/buildman/jobutil/buildstatus.py @@ -0,0 +1,52 @@ +from data.database import BUILD_PHASE + +class StatusHandler(object): + """ Context wrapper for writing status to build logs. """ + + def __init__(self, build_logs, repository_build): + self._current_phase = None + self._repository_build = repository_build + self._uuid = repository_build.uuid + self._build_logs = build_logs + + self._status = { + 'total_commands': None, + 'current_command': None, + 'push_completion': 0.0, + 'pull_completion': 0.0, + } + + # Write the initial status. 
+ self.__exit__(None, None, None) + + def _append_log_message(self, log_message, log_type=None, log_data=None): + self._build_logs.append_log_message(self._uuid, log_message, log_type, log_data) + + def append_log(self, log_message, extra_data=None): + self._append_log_message(log_message, log_data=extra_data) + + def set_command(self, command, extra_data=None): + self._append_log_message(command, self._build_logs.COMMAND, extra_data) + + def set_error(self, error_message, extra_data=None, internal_error=False): + self.set_phase(BUILD_PHASE.INTERNAL_ERROR if internal_error else BUILD_PHASE.ERROR) + + extra_data = extra_data or {} + extra_data['internal_error'] = internal_error + self._append_log_message(error_message, self._build_logs.ERROR, extra_data) + + def set_phase(self, phase, extra_data=None): + if phase == self._current_phase: + return False + + self._current_phase = phase + self._append_log_message(phase, self._build_logs.PHASE, extra_data) + self._repository_build.phase = phase + self._repository_build.save() + return True + + def __enter__(self): + return self._status + + def __exit__(self, exc_type, value, traceback): + self._build_logs.set_status(self._uuid, self._status) diff --git a/buildman/jobutil/workererror.py b/buildman/jobutil/workererror.py new file mode 100644 index 000000000..8271976e4 --- /dev/null +++ b/buildman/jobutil/workererror.py @@ -0,0 +1,84 @@ +class WorkerError(object): + """ Helper class which represents errors raised by a build worker. """ + def __init__(self, error_code, base_message=None): + self._error_code = error_code + self._base_message = base_message + + self._error_handlers = { + 'io.quay.builder.buildpackissue': { + 'message': 'Could not load build package', + 'is_internal': True + }, + + 'io.quay.builder.cannotextractbuildpack': { + 'message': 'Could not extract the contents of the build package' + }, + + 'io.quay.builder.cannotpullforcache': { + 'message': 'Could not pull cached image', + 'is_internal': True + }, + + 'io.quay.builder.cannotpullbaseimage': { + 'message': 'Could not pull base image', + 'show_base_error': True + }, + + 'io.quay.builder.internalerror': { + 'message': 'An internal error occurred while building. Please submit a ticket.' 
+ }, + + 'io.quay.builder.buildrunerror': { + 'message': 'Could not start the build process', + 'is_internal': True + }, + + 'io.quay.builder.builderror': { + 'message': 'A build step failed', + 'show_base_error': True + }, + + 'io.quay.builder.tagissue': { + 'message': 'Could not tag built image', + 'is_internal': True + }, + + 'io.quay.builder.pushissue': { + 'message': 'Could not push built image', + 'show_base_error': True, + 'is_internal': True + }, + + 'io.quay.builder.dockerconnecterror': { + 'message': 'Could not connect to Docker daemon', + 'is_internal': True + }, + + 'io.quay.builder.missingorinvalidargument': { + 'message': 'Missing required arguments for builder', + 'is_internal': True + } + } + + def is_internal_error(self): + handler = self._error_handlers.get(self._error_code) + return handler.get('is_internal', False) if handler else True + + def public_message(self): + handler = self._error_handlers.get(self._error_code) + if not handler: + return 'An unknown error occurred' + + message = handler['message'] + if handler.get('show_base_error', False) and self._base_message: + message = message + ': ' + self._base_message + + return message + + def extra_data(self): + if self._base_message: + return { + 'base_error': self._base_message + } + + return {} diff --git a/buildman/manager/__init__.py b/buildman/manager/__init__.py new file mode 100644 index 000000000..e69de29bb diff --git a/buildman/manager/basemanager.py b/buildman/manager/basemanager.py new file mode 100644 index 000000000..f66054c45 --- /dev/null +++ b/buildman/manager/basemanager.py @@ -0,0 +1,49 @@ +class BaseManager(object): + """ Base for all worker managers. """ + def __init__(self, register_component, unregister_component, job_heartbeat_callback, + job_complete_callback): + self.register_component = register_component + self.unregister_component = unregister_component + self.job_heartbeat_callback = job_heartbeat_callback + self.job_complete_callback = job_complete_callback + + def job_heartbeat(self, build_job): + """ Method invoked to tell the manager that a job is still running. This method will be called + every few minutes. """ + self.job_heartbeat_callback(build_job) + + def setup_time(self): + """ Returns the number of seconds that the build system should wait before allowing the job + to be picked up again after called 'schedule'. + """ + raise NotImplementedError + + def shutdown(self): + """ Indicates that the build controller server is in a shutdown state and that no new jobs + or workers should be performed. Existing workers should be cleaned up once their jobs + have completed + """ + raise NotImplementedError + + def schedule(self, build_job, loop): + """ Schedules a queue item to be built. Returns True if the item was properly scheduled + and False if all workers are busy. + """ + raise NotImplementedError + + def initialize(self): + """ Runs any initialization code for the manager. Called once the server is in a ready state. + """ + raise NotImplementedError + + def build_component_disposed(self, build_component, timed_out): + """ Method invoked whenever a build component has been disposed. The timed_out boolean indicates + whether the component's heartbeat timed out. + """ + raise NotImplementedError + + def job_completed(self, build_job, job_status, build_component): + """ Method invoked once a job_item has completed, in some manner. The job_status will be + one of: incomplete, error, complete. If incomplete, the job should be requeued. 
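        A minimal implementation simply forwards to the server's callback, as the enterprise
        manager below does:

            def job_completed(self, build_job, job_status, build_component):
                self.job_complete_callback(build_job, job_status)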
+ """ + raise NotImplementedError diff --git a/buildman/manager/enterprise.py b/buildman/manager/enterprise.py new file mode 100644 index 000000000..824e02d53 --- /dev/null +++ b/buildman/manager/enterprise.py @@ -0,0 +1,72 @@ +import logging +import uuid + +from buildman.component.basecomponent import BaseComponent +from buildman.component.buildcomponent import BuildComponent +from buildman.manager.basemanager import BaseManager + +from trollius.coroutines import From + +REGISTRATION_REALM = 'registration' +logger = logging.getLogger(__name__) + +class DynamicRegistrationComponent(BaseComponent): + """ Component session that handles dynamic registration of the builder components. """ + + def onConnect(self): + self.join(REGISTRATION_REALM) + + def onJoin(self, details): + logger.debug('Registering registration method') + yield From(self.register(self._worker_register, u'io.quay.buildworker.register')) + + def _worker_register(self): + realm = self.parent_manager.add_build_component() + logger.debug('Registering new build component+worker with realm %s', realm) + return realm + + +class EnterpriseManager(BaseManager): + """ Build manager implementation for the Enterprise Registry. """ + build_components = [] + shutting_down = False + + def initialize(self): + # Add a component which is used by build workers for dynamic registration. Unlike + # production, build workers in enterprise are long-lived and register dynamically. + self.register_component(REGISTRATION_REALM, DynamicRegistrationComponent) + + def setup_time(self): + # Builders are already registered, so the setup time should be essentially instant. We therefore + # only return a minute here. + return 60 + + def add_build_component(self): + """ Adds a new build component for an Enterprise Registry. """ + # Generate a new unique realm ID for the build worker. + realm = str(uuid.uuid4()) + component = self.register_component(realm, BuildComponent, token="") + self.build_components.append(component) + return realm + + def schedule(self, build_job, loop): + """ Schedules a build for an Enterprise Registry. 
""" + if self.shutting_down: + return False + + for component in self.build_components: + if component.is_ready(): + loop.call_soon(component.start_build, build_job) + return True + + return False + + def shutdown(self): + self.shutting_down = True + + def job_completed(self, build_job, job_status, build_component): + self.job_complete_callback(build_job, job_status) + + def build_component_disposed(self, build_component, timed_out): + self.build_components.remove(build_component) + diff --git a/buildman/server.py b/buildman/server.py new file mode 100644 index 000000000..3863406f2 --- /dev/null +++ b/buildman/server.py @@ -0,0 +1,177 @@ +import logging +import trollius + +from autobahn.asyncio.wamp import RouterFactory, RouterSessionFactory +from autobahn.asyncio.websocket import WampWebSocketServerFactory +from autobahn.wamp import types + +from aiowsgi import create_server as create_wsgi_server +from flask import Flask +from threading import Event +from trollius.coroutines import From +from datetime import datetime, timedelta + +from buildman.jobutil.buildjob import BuildJob, BuildJobLoadException +from data.queue import WorkQueue + +logger = logging.getLogger(__name__) + +WORK_CHECK_TIMEOUT = 10 +TIMEOUT_PERIOD_MINUTES = 20 +JOB_TIMEOUT_SECONDS = 300 +MINIMUM_JOB_EXTENSION = timedelta(minutes=2) + +WEBSOCKET_PORT = 8787 +CONTROLLER_PORT = 8686 + +class BuildJobResult(object): + """ Build job result enum """ + INCOMPLETE = 'incomplete' + COMPLETE = 'complete' + ERROR = 'error' + +class BuilderServer(object): + """ Server which handles both HTTP and WAMP requests, managing the full state of the build + controller. + """ + def __init__(self, server_hostname, queue, build_logs, user_files, lifecycle_manager_klass): + self._loop = None + self._current_status = 'starting' + self._current_components = [] + self._job_count = 0 + + self._session_factory = RouterSessionFactory(RouterFactory()) + self._server_hostname = server_hostname + self._queue = queue + self._build_logs = build_logs + self._user_files = user_files + self._lifecycle_manager = lifecycle_manager_klass( + self._register_component, + self._unregister_component, + self._job_heartbeat, + self._job_complete + ) + + self._shutdown_event = Event() + self._current_status = 'running' + + self._register_controller() + + def _register_controller(self): + controller_app = Flask('controller') + server = self + + @controller_app.route('/status') + def status(): + return server._current_status + + self._controller_app = controller_app + + def run(self, host, ssl=None): + logger.debug('Initializing the lifecycle manager') + self._lifecycle_manager.initialize() + + logger.debug('Initializing all members of the event loop') + loop = trollius.get_event_loop() + trollius.Task(self._initialize(loop, host, ssl)) + + logger.debug('Starting server on port %s, with controller on port %s', WEBSOCKET_PORT, + CONTROLLER_PORT) + try: + loop.run_forever() + except KeyboardInterrupt: + pass + finally: + loop.close() + + def close(self): + logger.debug('Requested server shutdown') + self._current_status = 'shutting_down' + self._lifecycle_manager.shutdown() + self._shutdown_event.wait() + logger.debug('Shutting down server') + + def _register_component(self, realm, component_klass, **kwargs): + """ Registers a component with the server. The component_klass must derive from + BaseComponent. 
+ """ + logger.debug('Registering component with realm %s', realm) + + component = component_klass(types.ComponentConfig(realm=realm), realm=realm, **kwargs) + component.server = self + component.parent_manager = self._lifecycle_manager + component.build_logs = self._build_logs + component.user_files = self._user_files + component.server_hostname = self._server_hostname + + self._current_components.append(component) + self._session_factory.add(component) + return component + + def _unregister_component(self, component): + logger.debug('Unregistering component with realm %s and token %s', + component.builder_realm, component.expected_token) + + self._current_components.remove(component) + self._session_factory.remove(component) + + def _job_heartbeat(self, build_job): + WorkQueue.extend_processing(build_job.job_item(), seconds_from_now=JOB_TIMEOUT_SECONDS, + retry_count=1, minimum_extension=MINIMUM_JOB_EXTENSION) + + def _job_complete(self, build_job, job_status): + if job_status == BuildJobResult.INCOMPLETE: + self._queue.incomplete(build_job.job_item(), restore_retry=True, retry_after=30) + elif job_status == BuildJobResult.ERROR: + self._queue.incomplete(build_job.job_item(), restore_retry=False) + else: + self._queue.complete(build_job.job_item()) + + self._job_count = self._job_count - 1 + + if self._current_status == 'shutting_down' and not self._job_count: + self._shutdown_event.set() + + # TODO(jschorr): check for work here? + + @trollius.coroutine + def _work_checker(self): + while self._current_status == 'running': + logger.debug('Checking for more work') + job_item = self._queue.get(processing_time=self._lifecycle_manager.setup_time()) + if job_item is None: + logger.debug('No additional work found. Going to sleep for %s seconds', WORK_CHECK_TIMEOUT) + yield From(trollius.sleep(WORK_CHECK_TIMEOUT)) + continue + + try: + build_job = BuildJob(job_item) + except BuildJobLoadException as irbe: + logger.exception(irbe) + self._queue.incomplete(job_item, restore_retry=False) + + logger.debug('Build job found. Checking for an avaliable worker.') + if self._lifecycle_manager.schedule(build_job, self._loop): + self._job_count = self._job_count + 1 + logger.debug('Build job scheduled. Running: %s', self._job_count) + else: + logger.debug('All workers are busy. Requeuing.') + self._queue.incomplete(job_item, restore_retry=True, retry_after=0) + + yield From(trollius.sleep(WORK_CHECK_TIMEOUT)) + + + @trollius.coroutine + def _initialize(self, loop, host, ssl=None): + self._loop = loop + + # Create the WAMP server. + transport_factory = WampWebSocketServerFactory(self._session_factory, debug_wamp=False) + transport_factory.setProtocolOptions(failByDrop=True) + + # Initialize the controller server and the WAMP server + create_wsgi_server(self._controller_app, loop=loop, host=host, port=CONTROLLER_PORT, ssl=ssl) + yield From(loop.create_server(transport_factory, host, WEBSOCKET_PORT, ssl=ssl)) + + # Initialize the work queue checker. 
+ yield From(self._work_checker()) diff --git a/conf/http-base.conf b/conf/http-base.conf index ad3d9f178..1eb0b6170 100644 --- a/conf/http-base.conf +++ b/conf/http-base.conf @@ -23,3 +23,11 @@ upstream verbs_app_server { upstream registry_app_server { server unix:/tmp/gunicorn_registry.sock fail_timeout=0; } + +upstream build_manager_controller_server { + server localhost:8686; +} + +upstream build_manager_websocket_server { + server localhost:8787; +} \ No newline at end of file diff --git a/conf/init/buildmanager/log/run b/conf/init/buildmanager/log/run new file mode 100755 index 000000000..1dd4c3fef --- /dev/null +++ b/conf/init/buildmanager/log/run @@ -0,0 +1,2 @@ +#!/bin/sh +exec svlogd /var/log/buildmanager/ \ No newline at end of file diff --git a/conf/init/buildmanager/run b/conf/init/buildmanager/run new file mode 100755 index 000000000..d0bc6564f --- /dev/null +++ b/conf/init/buildmanager/run @@ -0,0 +1,8 @@ +#! /bin/bash + +echo 'Starting internal build manager' + +cd / +venv/bin/python -m buildman.builder 2>&1 + +echo 'Internal build manager exited' \ No newline at end of file diff --git a/conf/server-base.conf b/conf/server-base.conf index da46a5d5f..5b06b76c5 100644 --- a/conf/server-base.conf +++ b/conf/server-base.conf @@ -60,3 +60,15 @@ location /v1/_ping { add_header X-Docker-Registry-Standalone 0; return 200 'true'; } + +location ~ ^/b1/controller(/?)(.*) { + proxy_pass http://build_manager_controller_server/$2; + proxy_read_timeout 2000; +} + +location ~ ^/b1/socket(/?)(.*) { + proxy_pass http://build_manager_websocket_server/$2; + proxy_http_version 1.1; + proxy_set_header Upgrade $http_upgrade; + proxy_set_header Connection "upgrade"; +} diff --git a/config.py b/config.py index 4627dc9a2..60334ebd3 100644 --- a/config.py +++ b/config.py @@ -163,6 +163,8 @@ class DefaultConfig(object): # Feature Flag: Whether users can be renamed FEATURE_USER_RENAME = False + BUILD_MANAGER = ('enterprise', {}) + DISTRIBUTED_STORAGE_CONFIG = { 'local_eu': ['LocalStorage', {'storage_path': 'test/data/registry/eu'}], 'local_us': ['LocalStorage', {'storage_path': 'test/data/registry/us'}], diff --git a/data/database.py b/data/database.py index 2cb1a51ca..c8d7ff8ae 100644 --- a/data/database.py +++ b/data/database.py @@ -420,6 +420,7 @@ class RepositoryTag(BaseModel): class BUILD_PHASE(object): """ Build phases enum """ ERROR = 'error' + INTERNAL_ERROR = 'internalerror' UNPACKING = 'unpacking' PULLING = 'pulling' BUILDING = 'building' diff --git a/data/queue.py b/data/queue.py index 159bd56ce..aaebcc86b 100644 --- a/data/queue.py +++ b/data/queue.py @@ -127,12 +127,15 @@ class WorkQueue(object): incomplete_item_obj.save() self._currently_processing = False - def extend_processing(self, queue_item, seconds_from_now): + @staticmethod + def extend_processing(queue_item, seconds_from_now, retry_count=None, + minimum_extension=MINIMUM_EXTENSION): new_expiration = datetime.utcnow() + timedelta(seconds=seconds_from_now) # Only actually write the new expiration to the db if it moves the expiration some minimum - queue_item_obj = QueueItem.get(QueueItem.id == queue_item.id) - if new_expiration - queue_item_obj.processing_expires > MINIMUM_EXTENSION: - with self._transaction_factory(db): - queue_item_obj.processing_expires = new_expiration - queue_item_obj.save() + if new_expiration - queue_item.processing_expires > minimum_extension: + if retry_count is not None: + queue_item.retries_remaining = retry_count + + queue_item.processing_expires = new_expiration + queue_item.save() \ No newline at end 
of file diff --git a/endpoints/api/build.py b/endpoints/api/build.py index 8eb565f50..682380dd9 100644 --- a/endpoints/api/build.py +++ b/endpoints/api/build.py @@ -1,5 +1,7 @@ import logging import json +import time +import datetime from flask import request, redirect @@ -9,7 +11,7 @@ from endpoints.api import (RepositoryParamResource, parse_args, query_param, nic ApiResource, internal_only, format_date, api, Unauthorized, NotFound) from endpoints.common import start_build from endpoints.trigger import BuildTrigger -from data import model +from data import model, database from auth.auth_context import get_authenticated_user from auth.permissions import ModifyRepositoryPermission, AdministerOrganizationPermission from data.buildlogs import BuildStatusRetrievalError @@ -65,6 +67,13 @@ def build_status_view(build_obj, can_write=False): status = {} phase = 'cannot_load' + # If the status contains a heartbeat, then check to see if has been written in the last few + # minutes. If not, then the build timed out. + if status is not None and 'heartbeat' in status and status['heartbeat']: + heartbeat = datetime.datetime.fromtimestamp(status['heartbeat']) + if datetime.datetime.now() - heartbeat > datetime.timedelta(minutes=1): + phase = database.BUILD_PHASE.INTERNAL_ERROR + logger.debug('Can write: %s job_config: %s', can_write, build_obj.job_config) resp = { 'id': build_obj.uuid, diff --git a/endpoints/trigger.py b/endpoints/trigger.py index fdfde7000..3a9813428 100644 --- a/endpoints/trigger.py +++ b/endpoints/trigger.py @@ -150,8 +150,10 @@ def raise_unsupported(): class GithubBuildTrigger(BuildTrigger): @staticmethod def _get_client(auth_token): - return Github(auth_token, client_id=github_trigger.client_id(), - client_secret=github_trigger.client_secret()) + return Github(auth_token, + base_url=github_trigger.api_endpoint(), + client_id=github_trigger.client_id(), + client_secret=github_trigger.client_secret()) @classmethod def service_name(cls): diff --git a/local-run.sh b/local-run.sh new file mode 100755 index 000000000..d606624a7 --- /dev/null +++ b/local-run.sh @@ -0,0 +1 @@ +gunicorn -c conf/gunicorn_local.py application:application \ No newline at end of file diff --git a/local-test.sh b/local-test.sh new file mode 100755 index 000000000..a54491969 --- /dev/null +++ b/local-test.sh @@ -0,0 +1 @@ +TEST=true python -m unittest discover diff --git a/requirements-nover.txt b/requirements-nover.txt index 741efe889..7772b3145 100644 --- a/requirements-nover.txt +++ b/requirements-nover.txt @@ -1,3 +1,6 @@ +autobahn +aiowsgi +trollius peewee flask py-bcrypt diff --git a/requirements.txt b/requirements.txt index e8479b500..14d5cb33f 100644 --- a/requirements.txt +++ b/requirements.txt @@ -17,7 +17,9 @@ SQLAlchemy==0.9.8 Werkzeug==0.9.6 git+https://github.com/DevTable/aniso8601-fake.git git+https://github.com/DevTable/anunidecode.git +aiowsgi==0.3 alembic==0.6.7 +autobahn==0.9.3-3 backports.ssl-match-hostname==3.4.0.2 beautifulsoup4==4.3.2 blinker==1.3 @@ -51,6 +53,7 @@ reportlab==2.7 requests==2.4.3 six==1.8.0 stripe==1.19.1 +trollius==1.0.3 tzlocal==1.1.2 websocket-client==0.21.0 wsgiref==0.1.2 diff --git a/run-local.sh b/run-local.sh deleted file mode 100755 index 628873fd7..000000000 --- a/run-local.sh +++ /dev/null @@ -1 +0,0 @@ -gunicorn -c conf/gunicorn_local.py application:application \ No newline at end of file diff --git a/static/css/quay.css b/static/css/quay.css index 415628dc5..8c22856e6 100644 --- a/static/css/quay.css +++ b/static/css/quay.css @@ -864,6 +864,10 @@ 
i.toggle-icon:hover { background-color: red; } +.phase-icon.internalerror { + background-color: #DFFF00; +} + .phase-icon.waiting, .phase-icon.unpacking, .phase-icon.starting, .phase-icon.initializing { background-color: #ddd; } @@ -876,6 +880,10 @@ i.toggle-icon:hover { background-color: #f0ad4e; } +.phase-icon.priming-cache { + background-color: #ddd; +} + .phase-icon.pushing { background-color: #5cb85c; } @@ -4872,3 +4880,11 @@ i.slack-icon { width: 120px; padding-right: 10px; } + +.progress.active .progress-bar { + /* Note: There is a bug in Chrome which results in high CPU usage for active progress-bars + due to their animation. This enables the GPU for the rendering, which cuts CPU usage in + half (although it is still not great) + */ + transform: translateZ(0); +} \ No newline at end of file diff --git a/static/directives/build-log-error.html b/static/directives/build-log-error.html index 1b27fb447..c012a623f 100644 --- a/static/directives/build-log-error.html +++ b/static/directives/build-log-error.html @@ -9,7 +9,6 @@ -
Note: The credentials {{ getLocalPullInfo().login.username }} for registry {{ getLocalPullInfo().login.registry }} cannot diff --git a/static/directives/trigger-description.html b/static/directives/trigger-description.html index 91000dd1e..b39771a95 100644 --- a/static/directives/trigger-description.html +++ b/static/directives/trigger-description.html @@ -1,7 +1,10 @@ - Push to GitHub repository {{ trigger.config.build_source }} + Push to GitHub Enterprise repository + + {{ trigger.config.build_source }} +
Branches/Tags: diff --git a/static/js/app.js b/static/js/app.js index 26c341daa..45d080158 100644 --- a/static/js/app.js +++ b/static/js/app.js @@ -1730,14 +1730,22 @@ quayApp = angular.module('quay', quayDependencies, function($provide, cfpLoading keyService['githubEndpoint'] = oauth['GITHUB_LOGIN_CONFIG']['GITHUB_ENDPOINT']; - keyService['githubTriggerAuthorizeUrl'] = oauth['GITHUB_LOGIN_CONFIG']['AUTHORIZE_ENDPOINT']; + keyService['githubTriggerEndpoint'] = oauth['GITHUB_TRIGGER_CONFIG']['GITHUB_ENDPOINT']; + keyService['githubTriggerAuthorizeUrl'] = oauth['GITHUB_TRIGGER_CONFIG']['AUTHORIZE_ENDPOINT']; keyService['githubLoginScope'] = 'user:email'; keyService['googleLoginScope'] = 'openid email'; keyService.isEnterprise = function(service) { - var isGithubEnterprise = keyService['githubLoginUrl'].indexOf('https://github.com/') < 0; - return service == 'github' && isGithubEnterprise; + switch (service) { + case 'github': + return keyService['githubLoginUrl'].indexOf('https://github.com/') < 0; + + case 'github-trigger': + return keyService['githubTriggerAuthorizeUrl'].indexOf('https://github.com/') < 0; + } + + return false; }; keyService.getExternalLoginUrl = function(service, action) { @@ -4747,6 +4755,11 @@ quayApp.directive('buildLogError', function () { 'entries': '=entries' }, controller: function($scope, $element, Config) { + $scope.isInternalError = function() { + var entry = $scope.entries[$scope.entries.length - 1]; + return entry && entry.data && entry.data['internal_error']; + }; + $scope.getLocalPullInfo = function() { if ($scope.entries.__localpull !== undefined) { return $scope.entries.__localpull; @@ -4802,7 +4815,9 @@ quayApp.directive('triggerDescription', function () { 'trigger': '=trigger', 'short': '=short' }, - controller: function($scope, $element) { + controller: function($scope, $element, KeyService, TriggerService) { + $scope.KeyService = KeyService; + $scope.TriggerService = TriggerService; } }; return directiveDefinitionObject; @@ -5680,6 +5695,9 @@ quayApp.directive('buildMessage', function () { case 'building': return 'Building image from Dockerfile'; + case 'priming-cache': + return 'Priming cache for build'; + case 'pushing': return 'Pushing image built from Dockerfile'; @@ -5688,6 +5706,9 @@ quayApp.directive('buildMessage', function () { case 'error': return 'Dockerfile build failed'; + + case 'internalerror': + return 'An internal system error occurred while building; the build will be retried in the next few minutes.'; } }; } @@ -5721,6 +5742,10 @@ quayApp.directive('buildProgress', function () { return buildInfo.status.push_completion * 100; break; + case 'priming-cache': + return buildInfo.status.cache_completion * 100; + break; + case 'complete': return 100; break; diff --git a/static/js/controllers.js b/static/js/controllers.js index 4a5a20622..220412060 100644 --- a/static/js/controllers.js +++ b/static/js/controllers.js @@ -1331,6 +1331,7 @@ function RepoAdminCtrl($scope, Restangular, ApiService, KeyService, TriggerServi $scope.Features = Features; $scope.TriggerService = TriggerService; + $scope.KeyService = KeyService; $scope.permissions = {'team': [], 'user': [], 'loading': 2}; $scope.logsShown = 0; diff --git a/static/partials/repo-admin.html b/static/partials/repo-admin.html index 006ad2443..78add8642 100644 --- a/static/partials/repo-admin.html +++ b/static/partials/repo-admin.html @@ -308,7 +308,8 @@ diff --git a/util/oauth.py b/util/oauth.py index 5349e5435..e0d38d395 100644 --- a/util/oauth.py +++ b/util/oauth.py @@ -52,6 +52,9 @@ class 
GithubOAuthConfig(OAuthConfig): def _api_endpoint(self): return self.config.get('API_ENDPOINT', self._get_url(self._endpoint(), '/api/v3/')) + def api_endpoint(self): + return self._api_endpoint()[0:-1] + def user_endpoint(self): api_endpoint = self._api_endpoint() return self._get_url(api_endpoint, 'user')