# Reconstructed from patch eacf3f01d2917ecea11eded6a7e53de6e53037fb
# "WIP: Start implementation of the build manager/controller."
# (Joseph Schorr, 2014-11-11). This code is not yet working completely.

# buildman/basecomponent.py
from autobahn.asyncio.wamp import ApplicationSession


class BaseComponent(ApplicationSession):
  """ Base class for all registered component sessions in the server. """

  # These collaborators are injected by BuilderServer._register_component
  # after the component is constructed.
  server = None
  parent_manager = None
  build_logs = None
  user_files = None

  def __init__(self, config, **kwargs):
    # Extra keyword arguments (e.g. realm/token passed by subclasses) are
    # accepted but deliberately not forwarded to ApplicationSession.
    ApplicationSession.__init__(self, config)
# buildman/buildcomponent.py
import datetime
import json
import logging

import trollius

from trollius.coroutines import From
from autobahn.wamp.exception import ApplicationError

from buildman.basecomponent import BaseComponent
from buildman.buildpack import BuildPackage, BuildPackageException

# A builder is presumed dead if no heartbeat arrives within this window.
HEARTBEAT_DELTA = datetime.timedelta(seconds=15)

logger = logging.getLogger(__name__)


class BuildComponent(BaseComponent):
  """ An application session component which conducts one (or more) builds. """

  # Injected by BuilderServer._register_component.
  server_hostname = None
  expected_token = None
  builder_realm = None
  last_heartbeat = None

  current_phase = 'joining'
  current_job = None

  def __init__(self, config, realm=None, token=None, **kwargs):
    self.expected_token = token
    self.builder_realm = realm

    # The queue item currently being built (None when idle). The original
    # never initialized this attribute, so _dispose() could raise
    # AttributeError before the first build.
    self.job_item = None

    BaseComponent.__init__(self, config, **kwargs)

  def onConnect(self):
    self.join(self.builder_realm)

  def onJoin(self, details):
    """ Registers the WAMP endpoints once the session has joined its realm. """
    logger.debug('Registering methods and listeners for component %s', self.builder_realm)
    yield From(self.register(self._on_ready, u'io.quay.buildworker.ready'))
    yield From(self.subscribe(self._on_heartbeat, 'io.quay.builder.heartbeat'))
    yield From(self.subscribe(self._on_log_message, 'io.quay.builder.logmessage'))

    self._set_phase('waiting')

  def is_ready(self):
    """ Whether this component has a connected worker free to take a job. """
    return self.current_phase == 'running'

  def start_build(self, job_item):
    """ Starts a build of the given queue item on the connected worker.
        Returns True if the build was invoked, False otherwise. """
    if not self.is_ready():
      return False

    self.job_item = job_item
    self._set_phase('building')

    # Parse the build job's config.
    logger.debug('Parsing job JSON configuration block')
    try:
      job_config = json.loads(job_item.body)
    except ValueError:
      self._build_failure('Could not parse build job configuration')
      return False

    # Retrieve the job's buildpack. (Original referenced the undefined name
    # `BuildPack`; the class is BuildPackage in buildman/buildpack.py.)
    buildpack_url = self.user_files.get_file_url(job_item.resource_key, requires_cors=False)
    logger.debug('Retrieving build package: %s', buildpack_url)

    try:
      buildpack = BuildPackage.from_url(buildpack_url)
    except BuildPackageException as bpe:
      self._build_failure('Could not retrieve build package', bpe)
      return False

    # Extract the base image information from the Dockerfile.
    logger.debug('Parsing dockerfile')
    try:
      parsed_dockerfile = buildpack.parse_dockerfile(job_config.get('build_subdir'))
    except BuildPackageException as bpe:
      self._build_failure('Could not find Dockerfile in build package', bpe)
      return False

    image_and_tag_tuple = parsed_dockerfile.get_image_and_tag()
    if image_and_tag_tuple is None or image_and_tag_tuple[0] is None:
      self._build_failure('Missing FROM line in Dockerfile')
      return False

    base_image_information = {
      'repository': image_and_tag_tuple[0],
      'tag': image_and_tag_tuple[1]
    }

    # Add the pull robot information, if any.
    if job_config.get('pull_credentials') is not None:
      base_image_information['username'] = job_config['pull_credentials'].get('username', '')
      base_image_information['password'] = job_config['pull_credentials'].get('password', '')

    # Retrieve the repository's full name.
    # NOTE(review): job_config is a plain dict (json.loads), so the original
    # attribute access `job_config.repository` could never work. Assuming the
    # repository model hangs off the queue item -- TODO confirm against the
    # queue producer.
    repo = job_item.repository
    repository_name = repo.namespace_user.username + '/' + repo.name

    # Parse the build queue item into build arguments.
    # build_package: URL to the build package to download and untar/unzip.
    # sub_directory: The location within the build package of the Dockerfile and the build context.
    # repository: The repository for which this build is occurring.
    # registry: The registry for which this build is occuring. Example: 'quay.io', 'staging.quay.io'
    # pull_token: The token to use when pulling the cache for building.
    # push_token: The token to use to push the built image.
    # tag_names: The name(s) of the tag(s) for the newly built image.
    # base_image: The image name and credentials to use to conduct the base image pull.
    #   repository: The repository to pull.
    #   tag: The tag to pull.
    #   username: The username for pulling the base image (if any).
    #   password: The password for pulling the base image (if any).
    build_arguments = {
      'build_package': buildpack_url,
      'sub_directory': job_config.get('build_subdir', ''),
      'repository': repository_name,
      'registry': self.server_hostname,
      'pull_token': job_item.access_token.code,
      'push_token': job_item.access_token.code,
      'tag_names': job_config.get('docker_tags', ['latest']),
      'base_image': base_image_information
    }

    # Invoke the build. (Original logged the undefined name `token`.)
    logger.debug('Invoking build: %s', self.expected_token)
    logger.debug('With Arguments: %s', build_arguments)

    (self.call("io.quay.builder.build", **build_arguments)
       .add_done_callback(self._build_complete))

    return True

  def _build_failure(self, error_message, exception=None):
    """ Records a build failure and returns the component to the ready pool. """
    # TODO: write this to the build logs; the original printed the undefined
    # name `error_kind` here.
    logger.error('Build failure: %s (exception: %s)', error_message, exception)
    self._set_phase('running')

  def _build_complete(self, result):
    """ Done-callback for the remote build invocation. """
    try:
      status = result.result()
      # TODO: log the success to the build logs.
      logger.debug('Build completed with status: %s', status)
    except ApplicationError as ae:
      # TODO: log the error to the build logs.
      logger.error('Build failed with error: %s', ae.error)
    finally:
      self._set_phase('running')

  def _on_ready(self, token):
    if self.current_phase != 'waiting':
      logger.warning('Build component with token %s is already connected', self.expected_token)
      return

    if token != self.expected_token:
      logger.warning('Builder token mismatch. Expected: %s. Found: %s', self.expected_token, token)
      return

    self._set_phase('running')

    # Start the heartbeat check.
    loop = trollius.get_event_loop()
    loop.create_task(self._check_heartbeat(loop))
    logger.debug('Build worker %s is connected and ready', self.builder_realm)
    return True

  def _on_log_message(self, status, message):
    # TODO: write the message to the build logs. (Renamed the second parameter
    # from `json`, which shadowed the json module.)
    logger.debug('Worker log message (%s): %s', status, message)

  def _set_phase(self, phase):
    self.current_phase = phase

  def _on_heartbeat(self):
    self.last_heartbeat = datetime.datetime.now()

  @trollius.coroutine
  def _check_heartbeat(self, loop):
    """ Coroutine that times the component out when heartbeats stop arriving. """
    while True:
      # Original used `or`, which is always true; the intent is to stop
      # checking once the component is neither running nor building.
      if self.current_phase not in ('running', 'building'):
        return

      logger.debug('Checking heartbeat on realm %s and build %s',
                   self.builder_realm, self.expected_token)

      if not self.last_heartbeat:
        self._timeout()
        return

      if self.last_heartbeat < datetime.datetime.now() - HEARTBEAT_DELTA:
        self._timeout()
        return

      yield From(trollius.sleep(5))

  def _timeout(self):
    self._set_phase('timeout')
    logger.warning('Build component %s timed out', self.expected_token)
    self._dispose(timed_out=True)

  def _dispose(self, timed_out=False):
    # If we still have a running job, then it has not completed and we need to tell the parent
    # manager. (Original passed the undefined bare name `job_item` here.)
    if self.job_item is not None:
      self.parent_manager.job_completed(self.job_item, 'incomplete', self)
      self.job_item = None

    # Unregister the current component so that it cannot be invoked again.
    self.parent_manager.build_component_disposed(self, timed_out)
""" + pass + + +class BuildPackage(object): + """ Helper class for easy reading and updating of a Dockerfile build pack. """ + + def __init__(self, requests_file): + self._mime_processors = { + 'application/zip': BuildPack.__prepare_zip, + 'application/x-zip-compressed': BuildPack.__prepare_zip, + 'text/plain': BuildPack.__prepare_dockerfile, + 'application/octet-stream': BuildPack.__prepare_dockerfile, + 'application/x-tar': BuildPack.__prepare_tarball, + 'application/gzip': BuildPack.__prepare_tarball, + 'application/x-gzip': BuildPack.__prepare_tarball, + } + + c_type = buildpack_resource.headers['content-type'] + c_type = c_type.split(';')[0] if ';' in c_type else c_type + + if c_type not in self._mime_processors: + raise BuildPackageException('Unknown build package mime type: %s' % c_type) + + self._package_directory = None + try: + self._package_directory = self._mime_processors[c_type](requests_file) + except Exception as ex: + raise BuildPackageException(ex.message) + + def parse_dockerfile(self, build_subdirectory): + dockerfile_path = os.path.join(self._package_directory, subdirectory, 'Dockerfile') + if not os.path.exists(dockerfile_path): + if subdirectory: + message = 'Build package did not contain a Dockerfile at sub directory %s.' % subdirectory + else: + message = 'Build package did not contain a Dockerfile at the root directory.' 
+ + raise BuildPackageException(message) + + with open(dockerfile_path, 'r') as dockerfileobj: + return parse_dockerfile(dockerfileobj.read()) + + @classmethod + def from_url(url): + buildpack_resource = requests.get(buildpack_url, stream=True) + return BuildPackage(buildpack_resource, c_type) + + @staticmethod + def __prepare_zip(request_file): + build_dir = mkdtemp(prefix='docker-build-') + + # Save the zip file to temp somewhere + with TemporaryFile() as zip_file: + zip_file.write(request_file.content) + to_extract = ZipFile(zip_file) + to_extract.extractall(build_dir) + + return build_dir + + @staticmethod + def __prepare_dockerfile(request_file): + build_dir = mkdtemp(prefix='docker-build-') + dockerfile_path = os.path.join(build_dir, "Dockerfile") + with open(dockerfile_path, 'w') as dockerfile: + dockerfile.write(request_file.content) + + return build_dir + + @staticmethod + def __prepare_tarball(request_file): + build_dir = mkdtemp(prefix='docker-build-') + + # Save the zip file to temp somewhere + with tarfile.open(mode='r|*', fileobj=request_file.raw) as tar_stream: + safe_extractall(tar_stream, build_dir) + + return build_dir \ No newline at end of file diff --git a/buildman/enterprise_builder.py b/buildman/enterprise_builder.py new file mode 100644 index 000000000..62b67a14c --- /dev/null +++ b/buildman/enterprise_builder.py @@ -0,0 +1,20 @@ +import argparse +import logging + +from app import app, userfiles as user_files, build_logs, dockerfile_build_queue + +from buildman.manager.enterprise import EnterpriseManager +from buildman.server import BuilderServer + +logger = logging.getLogger(__name__) + +if __name__ == '__main__': + logging.basicConfig(level=logging.DEBUG) + + parser = argparse.ArgumentParser() + parser.add_argument("--host", type = str, default = "127.0.0.1", help = 'Host IP.') + args = parser.parse_args() + + server = BuilderServer(app.config['SERVER_HOSTNAME'], dockerfile_build_queue, build_logs, + user_files, EnterpriseManager) + 
server.run(args.host) diff --git a/buildman/manager/__init__.py b/buildman/manager/__init__.py new file mode 100644 index 000000000..e69de29bb diff --git a/buildman/manager/basemanager.py b/buildman/manager/basemanager.py new file mode 100644 index 000000000..1a5d101f7 --- /dev/null +++ b/buildman/manager/basemanager.py @@ -0,0 +1,36 @@ +class BaseManager(object): + """ Base for all worker managers. """ + def __init__(self, register_component, unregister_component, job_complete_callback): + self.register_component = register_component + self.unregister_component = unregister_component + self.job_complete_callback = job_complete_callback + + def shutdown(self): + """ Indicates that the build controller server is in a shutdown state and that no new jobs + or workers should be performed. Existing workers should be cleaned up once their jobs + have completed + """ + raise NotImplementedError + + def schedule(self, job_item): + """ Schedules a queue item to be built. Returns True if the item was properly scheduled + and False if all workers are busy. + """ + raise NotImplementedError + + def initialize(self): + """ Runs any initialization code for the manager. Called once the server is in a ready state. + """ + raise NotImplementedError + + def build_component_disposed(self, build_component, timed_out): + """ Method invoked whenever a build component has been disposed. The timed_out boolean indicates + whether the component's heartbeat timed out. + """ + raise NotImplementedError + + def job_completed(self, job_item, job_status, build_component): + """ Method invoked once a job_item has completed, in some manner. The job_status will be + one of: incomplete, error, complete. If incomplete, the job should be requeued. 
+ """ + raise NotImplementedError \ No newline at end of file diff --git a/buildman/manager/enterprise.py b/buildman/manager/enterprise.py new file mode 100644 index 000000000..59a3ac12d --- /dev/null +++ b/buildman/manager/enterprise.py @@ -0,0 +1,65 @@ +import logging +import uuid + +from buildman.manager.basemanager import BaseManager +from buildman.basecomponent import BaseComponent +from buildman.buildcomponent import BuildComponent + +from trollius.coroutines import From + +REGISTRATION_REALM = 'registration' +logger = logging.getLogger(__name__) + +class DynamicRegistrationComponent(BaseComponent): + """ Component session that handles dynamic registration of the builder components. """ + + def onConnect(self): + self.join(REGISTRATION_REALM) + + def onJoin(self, details): + logger.debug('Registering registration method') + yield From(self.register(self._worker_register, u'io.quay.buildworker.register')) + + def _worker_register(self): + realm = self.parent_manager.add_build_component() + logger.debug('Registering new build component+worker with realm %s', realm) + return realm + + +class EnterpriseManager(BaseManager): + """ Build manager implementation for the Enterprise Registry. """ + build_components = [] + shutting_down = False + + def initialize(self): + # Add a component which is used by build workers for dynamic registration. Unlike + # production, build workers in enterprise are long-lived and register dynamically. + self.register_component(REGISTRATION_REALM, DynamicRegistrationComponent) + + def add_build_component(self): + # Generate a new unique realm ID for the build worker. 
+ realm = str(uuid.uuid4()) + component = self.register_component(realm, BuildComponent, token="") + self.build_components.append(component) + return realm + + def schedule(self, job_item): + if self.shutting_down: + return False + + for component in self.build_components: + if component.is_ready(): + component.start_build(job_item) + return True + + return False + + def shutdown(self): + self.shutting_down = True + + def job_completed(self, job_item, job_status, build_component): + self.job_complete_callback(job_item, job_status) + + def component_disposed(self, build_component, timed_out): + self.build_components.remove(build_component) + diff --git a/buildman/server.py b/buildman/server.py new file mode 100644 index 000000000..abeb83a14 --- /dev/null +++ b/buildman/server.py @@ -0,0 +1,146 @@ +import logging +import trollius + +from autobahn.asyncio.wamp import RouterFactory, RouterSessionFactory +from autobahn.asyncio.websocket import WampWebSocketServerFactory, WampWebSocketServerProtocol +from autobahn.wamp import types +from autobahn.wamp.exception import ApplicationError + +from aiowsgi import create_server as create_wsgi_server +from flask import Flask +from threading import Event, Lock +from trollius.coroutines import From + +logger = logging.getLogger(__name__) + +WORK_CHECK_TIMEOUT = 30 +TIMEOUT_PERIOD_MINUTES = 20 +RESERVATION_SECONDS = (TIMEOUT_PERIOD_MINUTES + 5) * 60 + +class BuilderServer(object): + """ Server which handles both HTTP and WAMP requests, managing the full state of the build + controller. 
+ """ + _loop = None + _current_status = 'starting' + _current_components = [] + _job_count = 0 + + def __init__(self, server_hostname, queue, build_logs, user_files, lifecycle_manager_klass): + self._session_factory = RouterSessionFactory(RouterFactory()) + + self._server_hostname = server_hostname + self._queue = queue + self._build_logs = build_logs + self._user_files = user_files + self._lifecycle_manager = lifecycle_manager_klass( + self._register_component, self._unregister_component, self._job_complete) + + self._shutdown_event = Event() + self._current_status = 'running' + + self._register_controller() + + def _register_controller(self): + controller_app = Flask('controller') + server = self + + @controller_app.route('/status') + def status(): + return server._current_status + + self._controller_app = controller_app + + def run(self, host): + logging.debug('Initializing the lifecycle manager') + self._lifecycle_manager.initialize() + + logging.debug('Initializing all members of the event loop') + loop = trollius.get_event_loop() + trollius.Task(self._initialize(loop, host)) + + logging.debug('Starting server on port 8080, with controller on port 8181') + try: + loop.run_forever() + except KeyboardInterrupt: + pass + finally: + loop.close() + + def close(self): + logging.debug('Requested server shutdown') + self._current_status = 'shutting_down' + self._lifecycle_manager.shutdown() + self._shutdown_event.wait() + logging.debug('Shutting down server') + + def _register_component(self, realm, component_klass, **kwargs): + """ Registers a component with the server. The component_klass must derive from + BaseComponent. 
+ """ + logging.debug('Registering component with realm %s', realm) + + component = component_klass(types.ComponentConfig(realm = realm), realm=realm, **kwargs) + component.server = self + component.parent_manager = self._lifecycle_manager + component.build_logs = self._build_logs + component.user_files = self._user_files + component.server_hostname = self._server_hostname + + self._current_components.append(component) + self._session_factory.add(component) + return component + + def _unregister_component(self, component): + logging.debug('Unregistering component with realm %s and token %s', + component.builder_realm, component.expected_token) + + self._current_components.remove(component) + self._session_factory.remove(component) + + def _job_complete(self, job_item, job_status): + if job_status == 'incomplete': + self._queue.incomplete(job_item, restore_retry=True) + elif job_status == 'error': + self._queue.incomplete(job_item, restore_retry=False) + else: + self._queue.complete(job) + + self._job_count = self._job_count - 1 + + if self._current_status == 'shutting_down' and not self._job_count: + self._shutdown_event.set() + + @trollius.coroutine + def _work_checker(self): + while self._current_status == 'running': + logger.debug('Checking for more work') + job_item = self._queue.get(processing_time=RESERVATION_SECONDS) + if job_item is None: + logger.debug('No additional work found. Going to sleep for %s seconds', WORK_CHECK_TIMEOUT) + yield From(trollius.sleep(WORK_CHECK_TIMEOUT)) + continue + + logger.debug('Build job found. Checking for an avaliable worker.') + if self._lifecycle_manager.schedule(job_item): + self._job_count = self._job_count + 1 + logger.debug('Build job scheduled. Running: %s', self._job_count) + else: + logger.debug('All workers are busy. 
Requeuing.') + self._queue.incomplete(job_item, restore_retry=True) + + yield From(trollius.sleep(WORK_CHECK_TIMEOUT)) + + + @trollius.coroutine + def _initialize(self, loop, host): + # Create the WAMP server. + transport_factory = WampWebSocketServerFactory(self._session_factory, debug_wamp = False) + transport_factory.setProtocolOptions(failByDrop = True) + + # Initialize the controller server and the WAMP server + create_wsgi_server(self._controller_app, loop=loop, host=host, port=8181) + yield From(loop.create_server(transport_factory, host, 8080)) + + # Initialize the work queue checker. + yield self._work_checker()