WIP: Get everything working except logging and job completion

This commit is contained in:
Joseph Schorr 2014-11-12 14:03:07 -05:00
parent eacf3f01d2
commit f93c0a46e8
6 changed files with 120 additions and 52 deletions

View file

@ -3,8 +3,10 @@ import logging
import json import json
import trollius import trollius
from autobahn.wamp.exception import ApplicationError
from trollius.coroutines import From from trollius.coroutines import From
from buildman.basecomponent import BaseComponent from buildman.basecomponent import BaseComponent
from buildman.buildpack import BuildPackage, BuildPackageException
HEARTBEAT_DELTA = datetime.timedelta(seconds=15) HEARTBEAT_DELTA = datetime.timedelta(seconds=15)
@ -41,28 +43,21 @@ class BuildComponent(BaseComponent):
def is_ready(self): def is_ready(self):
return self.current_phase == 'running' return self.current_phase == 'running'
def start_build(self, job_item): def start_build(self, build_job):
if not self.is_ready(): if not self.is_ready():
return False return False
self.job_item = job_item self.current_job = build_job
self._set_phase('building') self._set_phase('building')
# Parse the build job's config.
logger.debug('Parsing job JSON configuration block')
try:
job_config = json.loads(job_item.body)
except ValueError:
self._build_failure('Could not parse build job configuration')
return False
# Retrieve the job's buildpack. # Retrieve the job's buildpack.
buildpack_url = self.user_files.get_file_url(job_item.resource_key, requires_cors=False) buildpack_url = self.user_files.get_file_url(build_job.repo_build().resource_key,
logger.debug('Retreiving build package: %s' % buildpack_url) requires_cors=False)
logger.debug('Retreiving build package: %s' % buildpack_url)
buildpack = None buildpack = None
try: try:
buildpack = BuildPack.from_url(buildpack_url) buildpack = BuildPackage.from_url(buildpack_url)
except BuildPackageException as bpe: except BuildPackageException as bpe:
self._build_failure('Could not retrieve build package', bpe) self._build_failure('Could not retrieve build package', bpe)
return False return False
@ -71,8 +66,9 @@ class BuildComponent(BaseComponent):
parsed_dockerfile = None parsed_dockerfile = None
logger.debug('Parsing dockerfile') logger.debug('Parsing dockerfile')
build_config = build_job.build_config()
try: try:
parsed_dockerfile = buildpack.parse_dockerfile(job_config.get('build_subdir')) parsed_dockerfile = buildpack.parse_dockerfile(build_config.get('build_subdir'))
except BuildPackageException as bpe: except BuildPackageException as bpe:
self._build_failure('Could not find Dockerfile in build package', bpe) self._build_failure('Could not find Dockerfile in build package', bpe)
return False return False
@ -88,12 +84,12 @@ class BuildComponent(BaseComponent):
} }
# Add the pull robot information, if any. # Add the pull robot information, if any.
if job_config.get('pull_credentials') is not None: if build_config.get('pull_credentials') is not None:
base_image_information['username'] = job_config['pull_credentials'].get('username', '') base_image_information['username'] = build_config['pull_credentials'].get('username', '')
base_image_information['password'] = job_config['pull_credentials'].get('password', '') base_image_information['password'] = build_config['pull_credentials'].get('password', '')
# Retrieve the repository's full name. # Retrieve the repository's full name.
repo = job_config.repository repo = build_job.repo_build().repository
repository_name = repo.namespace_user.username + '/' + repo.name repository_name = repo.namespace_user.username + '/' + repo.name
# Parse the build queue item into build arguments. # Parse the build queue item into build arguments.
@ -111,17 +107,18 @@ class BuildComponent(BaseComponent):
# password: The password for pulling the base image (if any). # password: The password for pulling the base image (if any).
build_arguments = { build_arguments = {
'build_package': buildpack_url, 'build_package': buildpack_url,
'sub_directory': job_config.get('build_subdir', ''), 'sub_directory': build_config.get('build_subdir', ''),
'repository': repository_name, 'repository': repository_name,
'registry': self.server_hostname, 'registry': self.server_hostname,
'pull_token': job_item.access_token.code, 'pull_token': build_job.repo_build().access_token.code,
'push_token': job_item.access_token.code, 'push_token': build_job.repo_build().access_token.code,
'tag_names': job_config.get('docker_tags', ['latest']), 'tag_names': build_config.get('docker_tags', ['latest']),
'base_image': base_image_information 'base_image': base_image_information,
'cached_tag': build_job.determine_cached_tag() or ''
} }
# Invoke the build. # Invoke the build.
logger.debug('Invoking build: %s', token) logger.debug('Invoking build: %s', self.builder_realm)
logger.debug('With Arguments: %s', build_arguments) logger.debug('With Arguments: %s', build_arguments)
(self.call("io.quay.builder.build", **build_arguments) (self.call("io.quay.builder.build", **build_arguments)
@ -131,7 +128,8 @@ class BuildComponent(BaseComponent):
def _build_failure(self, error_message, exception=None): def _build_failure(self, error_message, exception=None):
# TODO: log this message # TODO: log this message
print error_kind print error_message
print exception
self._set_phase('running') self._set_phase('running')
def _build_complete(self, result): def _build_complete(self, result):
@ -204,9 +202,9 @@ class BuildComponent(BaseComponent):
def _dispose(self, timed_out=False): def _dispose(self, timed_out=False):
# If we still have a running job, then it has not completed and we need to tell the parent # If we still have a running job, then it has not completed and we need to tell the parent
# manager. # manager.
if self.job_item is not None: if self.current_job is not None:
self.parent_manager.job_completed(job_item, 'incomplete', self) self.parent_manager.job_completed(self.current_job, 'incomplete', self)
self.job_item = None self.current_job = None
# Unregister the current component so that it cannot be invoked again. # Unregister the current component so that it cannot be invoked again.
self.parent_manager.build_component_disposed(self, timed_out) self.parent_manager.build_component_disposed(self, timed_out)

59
buildman/buildjob.py Normal file
View file

@ -0,0 +1,59 @@
from data import model
import json
class BuildJobLoadException(Exception):
""" Exception raised if a build job could not be instantiated for some reason. """
pass
class BuildJob(object):
""" Represents a single in-progress build job. """
def __init__(self, job_item):
self._job_item = job_item
try:
self._job_details = json.loads(job_item.body)
except ValueError:
raise BuildJobLoadException(
'Could not parse build queue item config with ID %s' % self._job_details['build_uuid'])
try:
self._repo_build = model.get_repository_build(self._job_details['namespace'],
self._job_details['repository'],
self._job_details['build_uuid'])
except model.InvalidRepositoryBuildException:
raise BuildJobLoadException(
'Could not load repository build with ID %s' % self._job_details['build_uuid'])
try:
self._build_config = json.loads(self._repo_build.job_config)
except ValueError:
raise BuildJobLoadException(
'Could not parse repository build job config with ID %s' % self._job_details['build_uuid'])
def determine_cached_tag(self):
""" Returns the tag to pull to prime the cache or None if none. """
# TODO(jschorr): Change this to use the more complicated caching rules, once we have caching
# be a pull of things besides the constructed tags.
tags = self._build_config.get('docker_tags', ['latest'])
existing_tags = model.list_repository_tags(self._job_details['namespace'],
self._job_details['repository'])
cached_tags = set(tags) & set([tag.name for tag in existing_tags])
if cached_tags:
return cached_tags[0]
return None
def job_item(self):
""" Returns the job's queue item. """
return self._job_item
def repo_build(self):
""" Returns the repository build DB row for the job. """
return self._repo_build
def build_config(self):
""" Returns the parsed repository build config for the job. """
return self._build_config

View file

@ -1,10 +1,11 @@
import tarfile import tarfile
import requests
import os
from tempfile import TemporaryFile, mkdtemp from tempfile import TemporaryFile, mkdtemp
from zipfile import ZipFile from zipfile import ZipFile
from util.dockerfileparse import parse_dockerfile, ParsedDockerfile from util.dockerfileparse import parse_dockerfile, ParsedDockerfile
class BuildPackageException(Exception): class BuildPackageException(Exception):
""" Exception raised when retrieving or parsing a build package. """ """ Exception raised when retrieving or parsing a build package. """
pass pass
@ -15,16 +16,16 @@ class BuildPackage(object):
def __init__(self, requests_file): def __init__(self, requests_file):
self._mime_processors = { self._mime_processors = {
'application/zip': BuildPack.__prepare_zip, 'application/zip': BuildPackage.__prepare_zip,
'application/x-zip-compressed': BuildPack.__prepare_zip, 'application/x-zip-compressed': BuildPackage.__prepare_zip,
'text/plain': BuildPack.__prepare_dockerfile, 'text/plain': BuildPackage.__prepare_dockerfile,
'application/octet-stream': BuildPack.__prepare_dockerfile, 'application/octet-stream': BuildPackage.__prepare_dockerfile,
'application/x-tar': BuildPack.__prepare_tarball, 'application/x-tar': BuildPackage.__prepare_tarball,
'application/gzip': BuildPack.__prepare_tarball, 'application/gzip': BuildPackage.__prepare_tarball,
'application/x-gzip': BuildPack.__prepare_tarball, 'application/x-gzip': BuildPackage.__prepare_tarball,
} }
c_type = buildpack_resource.headers['content-type'] c_type = requests_file.headers['content-type']
c_type = c_type.split(';')[0] if ';' in c_type else c_type c_type = c_type.split(';')[0] if ';' in c_type else c_type
if c_type not in self._mime_processors: if c_type not in self._mime_processors:
@ -36,7 +37,7 @@ class BuildPackage(object):
except Exception as ex: except Exception as ex:
raise BuildPackageException(ex.message) raise BuildPackageException(ex.message)
def parse_dockerfile(self, build_subdirectory): def parse_dockerfile(self, subdirectory):
dockerfile_path = os.path.join(self._package_directory, subdirectory, 'Dockerfile') dockerfile_path = os.path.join(self._package_directory, subdirectory, 'Dockerfile')
if not os.path.exists(dockerfile_path): if not os.path.exists(dockerfile_path):
if subdirectory: if subdirectory:
@ -49,10 +50,10 @@ class BuildPackage(object):
with open(dockerfile_path, 'r') as dockerfileobj: with open(dockerfile_path, 'r') as dockerfileobj:
return parse_dockerfile(dockerfileobj.read()) return parse_dockerfile(dockerfileobj.read())
@classmethod @staticmethod
def from_url(url): def from_url(url):
buildpack_resource = requests.get(buildpack_url, stream=True) buildpack_resource = requests.get(url, stream=True)
return BuildPackage(buildpack_resource, c_type) return BuildPackage(buildpack_resource)
@staticmethod @staticmethod
def __prepare_zip(request_file): def __prepare_zip(request_file):

View file

@ -12,7 +12,7 @@ class BaseManager(object):
""" """
raise NotImplementedError raise NotImplementedError
def schedule(self, job_item): def schedule(self, build_job):
""" Schedules a queue item to be built. Returns True if the item was properly scheduled """ Schedules a queue item to be built. Returns True if the item was properly scheduled
and False if all workers are busy. and False if all workers are busy.
""" """
@ -29,7 +29,7 @@ class BaseManager(object):
""" """
raise NotImplementedError raise NotImplementedError
def job_completed(self, job_item, job_status, build_component): def job_completed(self, build_job, job_status, build_component):
""" Method invoked once a job_item has completed, in some manner. The job_status will be """ Method invoked once a job_item has completed, in some manner. The job_status will be
one of: incomplete, error, complete. If incomplete, the job should be requeued. one of: incomplete, error, complete. If incomplete, the job should be requeued.
""" """

View file

@ -43,13 +43,13 @@ class EnterpriseManager(BaseManager):
self.build_components.append(component) self.build_components.append(component)
return realm return realm
def schedule(self, job_item): def schedule(self, build_job, loop):
if self.shutting_down: if self.shutting_down:
return False return False
for component in self.build_components: for component in self.build_components:
if component.is_ready(): if component.is_ready():
component.start_build(job_item) loop.call_soon(component.start_build, build_job)
return True return True
return False return False
@ -57,8 +57,8 @@ class EnterpriseManager(BaseManager):
def shutdown(self): def shutdown(self):
self.shutting_down = True self.shutting_down = True
def job_completed(self, job_item, job_status, build_component): def job_completed(self, build_job, job_status, build_component):
self.job_complete_callback(job_item, job_status) self.job_complete_callback(build_job, job_status)
def component_disposed(self, build_component, timed_out): def component_disposed(self, build_component, timed_out):
self.build_components.remove(build_component) self.build_components.remove(build_component)

View file

@ -11,6 +11,8 @@ from flask import Flask
from threading import Event, Lock from threading import Event, Lock
from trollius.coroutines import From from trollius.coroutines import From
from buildjob import BuildJob, BuildJobLoadException
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
WORK_CHECK_TIMEOUT = 30 WORK_CHECK_TIMEOUT = 30
@ -98,11 +100,11 @@ class BuilderServer(object):
self._current_components.remove(component) self._current_components.remove(component)
self._session_factory.remove(component) self._session_factory.remove(component)
def _job_complete(self, job_item, job_status): def _job_complete(self, build_job, job_status):
if job_status == 'incomplete': if job_status == 'incomplete':
self._queue.incomplete(job_item, restore_retry=True) self._queue.incomplete(build_job.job_item(), restore_retry=True)
elif job_status == 'error': elif job_status == 'error':
self._queue.incomplete(job_item, restore_retry=False) self._queue.incomplete(build_job.job_item(), restore_retry=False)
else: else:
self._queue.complete(job) self._queue.complete(job)
@ -121,8 +123,14 @@ class BuilderServer(object):
yield From(trollius.sleep(WORK_CHECK_TIMEOUT)) yield From(trollius.sleep(WORK_CHECK_TIMEOUT))
continue continue
try:
build_job = BuildJob(job_item)
except BuildJobLoadException as irbe:
logger.exception(irbe)
self._queue.incomplete(job_item, restore_retry=False)
logger.debug('Build job found. Checking for an avaliable worker.') logger.debug('Build job found. Checking for an avaliable worker.')
if self._lifecycle_manager.schedule(job_item): if self._lifecycle_manager.schedule(build_job, self._loop):
self._job_count = self._job_count + 1 self._job_count = self._job_count + 1
logger.debug('Build job scheduled. Running: %s', self._job_count) logger.debug('Build job scheduled. Running: %s', self._job_count)
else: else:
@ -134,6 +142,8 @@ class BuilderServer(object):
@trollius.coroutine @trollius.coroutine
def _initialize(self, loop, host): def _initialize(self, loop, host):
self._loop = loop
# Create the WAMP server. # Create the WAMP server.
transport_factory = WampWebSocketServerFactory(self._session_factory, debug_wamp = False) transport_factory = WampWebSocketServerFactory(self._session_factory, debug_wamp = False)
transport_factory.setProtocolOptions(failByDrop = True) transport_factory.setProtocolOptions(failByDrop = True)