Merge branch 'bagger'

Joseph Schorr 2014-12-01 12:48:59 -05:00
commit 72d613614d
36 changed files with 1123 additions and 17 deletions

Dockerfile

@ -52,6 +52,7 @@ ADD conf/init/nginx /etc/service/nginx
ADD conf/init/diffsworker /etc/service/diffsworker
ADD conf/init/notificationworker /etc/service/notificationworker
ADD conf/init/buildlogsarchiver /etc/service/buildlogsarchiver
ADD conf/init/buildmanager /etc/service/buildmanager
# Download any external libs.
RUN mkdir static/fonts static/ldn

0
buildman/__init__.py Normal file

46
buildman/builder.py Normal file

@ -0,0 +1,46 @@
import logging
import os
import features
from app import app, userfiles as user_files, build_logs, dockerfile_build_queue
from buildman.manager.enterprise import EnterpriseManager
from buildman.server import BuilderServer
from trollius import SSLContext
logger = logging.getLogger(__name__)
BUILD_MANAGERS = {
'enterprise': EnterpriseManager
}
def run_build_manager():
if not features.BUILD_SUPPORT:
logger.debug('Building is disabled. Please enable the feature flag')
return
build_manager_config = app.config.get('BUILD_MANAGER')
if build_manager_config is None:
return
logger.debug('Asking to start build manager with lifecycle "%s"', build_manager_config[0])
manager_klass = BUILD_MANAGERS.get(build_manager_config[0])
if manager_klass is None:
return
logger.debug('Starting build manager with lifecycle "%s"', build_manager_config[0])
ssl_context = None
if os.environ.get('SSL_CONFIG'):
logger.debug('Loading SSL cert and key')
ssl_context = SSLContext()
ssl_context.load_cert_chain(os.environ.get('SSL_CONFIG') + '/ssl.cert',
os.environ.get('SSL_CONFIG') + '/ssl.key')
server = BuilderServer(app.config['SERVER_HOSTNAME'], dockerfile_build_queue, build_logs,
user_files, manager_klass)
server.run('0.0.0.0', ssl=ssl_context)
if __name__ == '__main__':
logging.basicConfig(level=logging.DEBUG)
run_build_manager()
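For context, the BUILD_MANAGER config value read above is a (name, config) tuple whose first element selects an entry in BUILD_MANAGERS. A minimal sketch of that lookup, assuming the default value set in config.py later in this commit:
# Sketch of the lookup performed in run_build_manager() above.
build_manager_config = ('enterprise', {})            # default from config.py in this commit
manager_klass = BUILD_MANAGERS.get(build_manager_config[0])
# manager_klass is EnterpriseManager here; an unrecognized name yields None and
# run_build_manager() returns without starting a server.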

0
buildman/component/__init__.py Normal file

10
buildman/component/basecomponent.py Normal file

@ -0,0 +1,10 @@
from autobahn.asyncio.wamp import ApplicationSession
class BaseComponent(ApplicationSession):
""" Base class for all registered component sessions in the server. """
def __init__(self, config, **kwargs):
ApplicationSession.__init__(self, config)
self.server = None
self.parent_manager = None
self.build_logs = None
self.user_files = None

365
buildman/component/buildcomponent.py Normal file

@ -0,0 +1,365 @@
import datetime
import time
import logging
import json
import trollius
import re
from autobahn.wamp.exception import ApplicationError
from trollius.coroutines import From
from buildman.server import BuildJobResult
from buildman.component.basecomponent import BaseComponent
from buildman.jobutil.buildpack import BuildPackage, BuildPackageException
from buildman.jobutil.buildstatus import StatusHandler
from buildman.jobutil.workererror import WorkerError
from data.database import BUILD_PHASE
HEARTBEAT_DELTA = datetime.timedelta(seconds=30)
HEARTBEAT_TIMEOUT = 10
INITIAL_TIMEOUT = 25
SUPPORTED_WORKER_VERSIONS = ['0.1-beta']
logger = logging.getLogger(__name__)
class ComponentStatus(object):
""" ComponentStatus represents the possible states of a component. """
JOINING = 'joining'
WAITING = 'waiting'
RUNNING = 'running'
BUILDING = 'building'
TIMED_OUT = 'timeout'
class BuildComponent(BaseComponent):
""" An application session component which conducts one (or more) builds. """
def __init__(self, config, realm=None, token=None, **kwargs):
self.expected_token = token
self.builder_realm = realm
self.parent_manager = None
self.server_hostname = None
self._component_status = ComponentStatus.JOINING
self._last_heartbeat = None
self._current_job = None
self._build_status = None
self._image_info = None
BaseComponent.__init__(self, config, **kwargs)
def onConnect(self):
self.join(self.builder_realm)
def onJoin(self, details):
logger.debug('Registering methods and listeners for component %s', self.builder_realm)
yield From(self.register(self._on_ready, u'io.quay.buildworker.ready'))
yield From(self.register(self._ping, u'io.quay.buildworker.ping'))
yield From(self.subscribe(self._on_heartbeat, 'io.quay.builder.heartbeat'))
yield From(self.subscribe(self._on_log_message, 'io.quay.builder.logmessage'))
self._set_status(ComponentStatus.WAITING)
def is_ready(self):
""" Determines whether a build component is ready to begin a build. """
return self._component_status == ComponentStatus.RUNNING
def start_build(self, build_job):
""" Starts a build. """
self._current_job = build_job
self._build_status = StatusHandler(self.build_logs, build_job.repo_build())
self._image_info = {}
self._set_status(ComponentStatus.BUILDING)
# Retrieve the job's buildpack.
buildpack_url = self.user_files.get_file_url(build_job.repo_build().resource_key,
requires_cors=False)
logger.debug('Retrieving build package: %s', buildpack_url)
buildpack = None
try:
buildpack = BuildPackage.from_url(buildpack_url)
except BuildPackageException as bpe:
self._build_failure('Could not retrieve build package', bpe)
return
# Extract the base image information from the Dockerfile.
parsed_dockerfile = None
logger.debug('Parsing dockerfile')
build_config = build_job.build_config()
try:
parsed_dockerfile = buildpack.parse_dockerfile(build_config.get('build_subdir'))
except BuildPackageException as bpe:
self._build_failure('Could not find Dockerfile in build package', bpe)
return
image_and_tag_tuple = parsed_dockerfile.get_image_and_tag()
if image_and_tag_tuple is None or image_and_tag_tuple[0] is None:
self._build_failure('Missing FROM line in Dockerfile')
return
base_image_information = {
'repository': image_and_tag_tuple[0],
'tag': image_and_tag_tuple[1]
}
# Extract the number of steps from the Dockerfile.
with self._build_status as status_dict:
status_dict['total_commands'] = len(parsed_dockerfile.commands)
# Add the pull robot information, if any.
if build_config.get('pull_credentials') is not None:
base_image_information['username'] = build_config['pull_credentials'].get('username', '')
base_image_information['password'] = build_config['pull_credentials'].get('password', '')
# Retrieve the repository's fully qualified name.
repo = build_job.repo_build().repository
repository_name = repo.namespace_user.username + '/' + repo.name
# Parse the build queue item into build arguments.
# build_package: URL to the build package to download and untar/unzip.
# sub_directory: The location within the build package of the Dockerfile and the build context.
# repository: The repository for which this build is occurring.
# registry: The registry for which this build is occurring (e.g. 'quay.io', 'staging.quay.io').
# pull_token: The token to use when pulling the cache for building.
# push_token: The token to use to push the built image.
# tag_names: The name(s) of the tag(s) for the newly built image.
# base_image: The image name and credentials to use to conduct the base image pull.
# repository: The repository to pull.
# tag: The tag to pull.
# username: The username for pulling the base image (if any).
# password: The password for pulling the base image (if any).
build_arguments = {
'build_package': buildpack_url,
'sub_directory': build_config.get('build_subdir', ''),
'repository': repository_name,
'registry': self.server_hostname,
'pull_token': build_job.repo_build().access_token.code,
'push_token': build_job.repo_build().access_token.code,
'tag_names': build_config.get('docker_tags', ['latest']),
'base_image': base_image_information,
'cached_tag': build_job.determine_cached_tag() or ''
}
# Invoke the build.
logger.debug('Invoking build: %s', self.builder_realm)
logger.debug('With Arguments: %s', build_arguments)
return (self
.call("io.quay.builder.build", **build_arguments)
.add_done_callback(self._build_complete))
@staticmethod
def _total_completion(statuses, total_images):
""" Returns the current amount completion relative to the total completion of a build. """
percentage_with_sizes = float(len(statuses.values())) / total_images
sent_bytes = sum([status['current'] for status in statuses.values()])
total_bytes = sum([status['total'] for status in statuses.values()])
return float(sent_bytes) / total_bytes * percentage_with_sizes
@staticmethod
def _process_pushpull_status(status_dict, current_phase, docker_data, images):
""" Processes the status of a push or pull by updating the provided status_dict and images. """
if not docker_data:
return
num_images = 0
status_completion_key = ''
if current_phase == 'pushing':
status_completion_key = 'push_completion'
num_images = status_dict['total_commands']
elif current_phase == 'pulling':
status_completion_key = 'pull_completion'
elif current_phase == 'priming-cache':
status_completion_key = 'cache_completion'
else:
return
if 'progressDetail' in docker_data and 'id' in docker_data:
image_id = docker_data['id']
detail = docker_data['progressDetail']
if 'current' in detail and 'total' in detail:
images[image_id] = detail
status_dict[status_completion_key] = \
BuildComponent._total_completion(images, max(len(images), num_images))
def _on_log_message(self, phase, json_data):
""" Tails log messages and updates the build status. """
# Parse any of the JSON data logged.
docker_data = {}
if json_data:
try:
docker_data = json.loads(json_data)
except ValueError:
pass
# Extract the current status message (if any).
fully_unwrapped = ''
keys_to_extract = ['error', 'status', 'stream']
for key in keys_to_extract:
if key in docker_data:
fully_unwrapped = docker_data[key]
break
# Determine if this is a step string.
current_step = None
current_status_string = str(fully_unwrapped.encode('utf-8'))
if current_status_string and phase == BUILD_PHASE.BUILDING:
step_increment = re.search(r'Step ([0-9]+) :', current_status_string)
if step_increment:
current_step = int(step_increment.group(1))
# Parse and update the phase and the status_dict. The status dictionary contains
# the pull/push progress, as well as the current step index.
with self._build_status as status_dict:
if self._build_status.set_phase(phase):
logger.debug('Build %s has entered a new phase: %s', self.builder_realm, phase)
BuildComponent._process_pushpull_status(status_dict, phase, docker_data, self._image_info)
# If the current message represents the beginning of a new step, then update the
# current command index.
if current_step is not None:
status_dict['current_command'] = current_step
# If the json data contains an error, then something went wrong with a push or pull.
if 'error' in docker_data:
self._build_status.set_error(docker_data['error'])
if current_step is not None:
self._build_status.set_command(current_status_string)
elif phase == BUILD_PHASE.BUILDING:
self._build_status.append_log(current_status_string)
def _build_failure(self, error_message, exception=None):
""" Handles and logs a failed build. """
self._build_status.set_error(error_message, {
'internal_error': exception.message if exception else None
})
build_id = self._current_job.repo_build().uuid
logger.warning('Build %s failed with message: %s', build_id, error_message)
# Mark that the build has finished (in an error state)
self._build_finished(BuildJobResult.ERROR)
def _build_complete(self, result):
""" Wraps up a completed build. Handles any errors and calls self._build_finished. """
try:
# Retrieve the result. This will raise an ApplicationError on any error that occurred.
result.result()
self._build_status.set_phase(BUILD_PHASE.COMPLETE)
self._build_finished(BuildJobResult.COMPLETE)
except ApplicationError as aex:
worker_error = WorkerError(aex.error, aex.kwargs.get('base_error'))
# Write the error to the log.
self._build_status.set_error(worker_error.public_message(), worker_error.extra_data(),
internal_error=worker_error.is_internal_error())
# Mark the build as completed.
if worker_error.is_internal_error():
self._build_finished(BuildJobResult.INCOMPLETE)
else:
self._build_finished(BuildJobResult.ERROR)
def _build_finished(self, job_status):
""" Alerts the parent that a build has completed and sets the status back to running. """
self.parent_manager.job_completed(self._current_job, job_status, self)
self._current_job = None
# Set the component back to a running state.
self._set_status(ComponentStatus.RUNNING)
@staticmethod
def _ping():
""" Ping pong. """
return 'pong'
def _on_ready(self, token, version):
if version not in SUPPORTED_WORKER_VERSIONS:
logger.warning('Build component (token "%s") is running an out-of-date version: %s',
token, version)
return False
if self._component_status != ComponentStatus.WAITING:
logger.warning('Build component (token "%s") is already connected', self.expected_token)
return False
if token != self.expected_token:
logger.warning('Builder token mismatch. Expected: "%s". Found: "%s"', self.expected_token, token)
return False
self._set_status(ComponentStatus.RUNNING)
# Start the heartbeat check and updating loop.
loop = trollius.get_event_loop()
loop.create_task(self._heartbeat())
logger.debug('Build worker %s is connected and ready', self.builder_realm)
return True
def _set_status(self, phase):
self._component_status = phase
def _on_heartbeat(self):
""" Updates the last known heartbeat. """
self._last_heartbeat = datetime.datetime.now()
@trollius.coroutine
def _heartbeat(self):
""" Coroutine that runs every HEARTBEAT_TIMEOUT seconds, both checking the worker's heartbeat
and updating the heartbeat in the build status dictionary (if applicable). This allows
the build system to catch crashes from either end.
"""
yield From(trollius.sleep(INITIAL_TIMEOUT))
while True:
# If the component is no longer running or actively building, nothing more to do.
if (self._component_status != ComponentStatus.RUNNING and
self._component_status != ComponentStatus.BUILDING):
return
# If there is an active build, write the heartbeat to its status.
build_status = self._build_status
if build_status is not None:
with build_status as status_dict:
status_dict['heartbeat'] = int(time.time())
# Mark the build item.
current_job = self._current_job
if current_job is not None:
self.parent_manager.job_heartbeat(current_job)
# Check the heartbeat from the worker.
logger.debug('Checking heartbeat on realm %s', self.builder_realm)
if self._last_heartbeat and self._last_heartbeat < datetime.datetime.now() - HEARTBEAT_DELTA:
self._timeout()
return
yield From(trollius.sleep(HEARTBEAT_TIMEOUT))
def _timeout(self):
self._set_status(ComponentStatus.TIMED_OUT)
logger.warning('Build component with realm %s has timed out', self.builder_realm)
self._dispose(timed_out=True)
def _dispose(self, timed_out=False):
# If we still have a running job, then it has not completed and we need to tell the parent
# manager.
if self._current_job is not None:
if timed_out:
self._build_status.set_error('Build worker timed out', internal_error=True)
self.parent_manager.job_completed(self._current_job, BuildJobResult.INCOMPLETE, self)
self._build_status = None
self._current_job = None
# Unregister the current component so that it cannot be invoked again.
self.parent_manager.build_component_disposed(self, timed_out)
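As a quick check of the completion math in _total_completion, a small worked example with invented byte counts:
from buildman.component.buildcomponent import BuildComponent

# Two of four expected images are reporting progress, and 50 of their combined
# 200 bytes have been transferred: (2/4) * (50/200) = 0.125 overall completion.
statuses = {
    'layer1': {'current': 30, 'total': 100},
    'layer2': {'current': 20, 'total': 100},
}
BuildComponent._total_completion(statuses, 4)   # -> 0.125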

0
buildman/jobutil/__init__.py Normal file

60
buildman/jobutil/buildjob.py Normal file

@ -0,0 +1,60 @@
from data import model
import json
class BuildJobLoadException(Exception):
""" Exception raised if a build job could not be instantiated for some reason. """
pass
class BuildJob(object):
""" Represents a single in-progress build job. """
def __init__(self, job_item):
self._job_item = job_item
try:
self._job_details = json.loads(job_item.body)
except ValueError:
raise BuildJobLoadException(
'Could not parse build queue item config for queue item %s' % job_item.id
)
try:
self._repo_build = model.get_repository_build(self._job_details['namespace'],
self._job_details['repository'],
self._job_details['build_uuid'])
except model.InvalidRepositoryBuildException:
raise BuildJobLoadException(
'Could not load repository build with ID %s' % self._job_details['build_uuid'])
try:
self._build_config = json.loads(self._repo_build.job_config)
except ValueError:
raise BuildJobLoadException(
'Could not parse repository build job config with ID %s' % self._job_details['build_uuid']
)
def determine_cached_tag(self):
""" Returns the tag to pull to prime the cache or None if none. """
# TODO(jschorr): Change this to use the more complicated caching rules, once we have caching
# be a pull of things besides the constructed tags.
tags = self._build_config.get('docker_tags', ['latest'])
existing_tags = model.list_repository_tags(self._job_details['namespace'],
self._job_details['repository'])
cached_tags = set(tags) & set([tag.name for tag in existing_tags])
if cached_tags:
return list(cached_tags)[0]
return None
def job_item(self):
""" Returns the job's queue item. """
return self._job_item
def repo_build(self):
""" Returns the repository build DB row for the job. """
return self._repo_build
def build_config(self):
""" Returns the parsed repository build config for the job. """
return self._build_config
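For reference, BuildJob only reads the namespace, repository and build_uuid keys from the queue item body; a hypothetical payload (all values invented) would look like:
import json

example_body = json.dumps({
    'namespace': 'someorg',          # hypothetical namespace
    'repository': 'somerepo',        # hypothetical repository name
    'build_uuid': 'some-build-uuid', # hypothetical repository build UUID
})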

88
buildman/jobutil/buildpack.py Normal file

@ -0,0 +1,88 @@
import tarfile
import requests
import os
from tempfile import TemporaryFile, mkdtemp
from zipfile import ZipFile
from util.dockerfileparse import parse_dockerfile
from util.safetar import safe_extractall
class BuildPackageException(Exception):
""" Exception raised when retrieving or parsing a build package. """
pass
class BuildPackage(object):
""" Helper class for easy reading and updating of a Dockerfile build pack. """
def __init__(self, requests_file):
self._mime_processors = {
'application/zip': BuildPackage._prepare_zip,
'application/x-zip-compressed': BuildPackage._prepare_zip,
'text/plain': BuildPackage._prepare_dockerfile,
'application/octet-stream': BuildPackage._prepare_dockerfile,
'application/x-tar': BuildPackage._prepare_tarball,
'application/gzip': BuildPackage._prepare_tarball,
'application/x-gzip': BuildPackage._prepare_tarball,
}
c_type = requests_file.headers['content-type']
c_type = c_type.split(';')[0] if ';' in c_type else c_type
if c_type not in self._mime_processors:
raise BuildPackageException('Unknown build package mime type: %s' % c_type)
self._package_directory = None
try:
self._package_directory = self._mime_processors[c_type](requests_file)
except Exception as ex:
raise BuildPackageException(ex.message)
def parse_dockerfile(self, subdirectory):
dockerfile_path = os.path.join(self._package_directory, subdirectory, 'Dockerfile')
if not os.path.exists(dockerfile_path):
if subdirectory:
message = 'Build package did not contain a Dockerfile in subdirectory %s.' % subdirectory
else:
message = 'Build package did not contain a Dockerfile at the root directory.'
raise BuildPackageException(message)
with open(dockerfile_path, 'r') as dockerfileobj:
return parse_dockerfile(dockerfileobj.read())
@staticmethod
def from_url(url):
buildpack_resource = requests.get(url, stream=True)
return BuildPackage(buildpack_resource)
@staticmethod
def _prepare_zip(request_file):
build_dir = mkdtemp(prefix='docker-build-')
# Save the zip file to a temporary file and extract it into the build directory.
with TemporaryFile() as zip_file:
zip_file.write(request_file.content)
to_extract = ZipFile(zip_file)
to_extract.extractall(build_dir)
return build_dir
@staticmethod
def _prepare_dockerfile(request_file):
build_dir = mkdtemp(prefix='docker-build-')
dockerfile_path = os.path.join(build_dir, "Dockerfile")
with open(dockerfile_path, 'w') as dockerfile:
dockerfile.write(request_file.content)
return build_dir
@staticmethod
def _prepare_tarball(request_file):
build_dir = mkdtemp(prefix='docker-build-')
# Stream the tarball and safely extract its contents into the build directory.
with tarfile.open(mode='r|*', fileobj=request_file.raw) as tar_stream:
safe_extractall(tar_stream, build_dir)
return build_dir
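A short usage sketch of BuildPackage, following the same calls the build component makes above (the URL is a placeholder):
from buildman.jobutil.buildpack import BuildPackage, BuildPackageException

try:
    buildpack = BuildPackage.from_url('https://example.com/some-buildpack')  # placeholder URL
    parsed_dockerfile = buildpack.parse_dockerfile('')     # '' looks in the package root
    image_and_tag = parsed_dockerfile.get_image_and_tag()  # (repository, tag) tuple, as used above
except BuildPackageException as bpe:
    pass  # download, extraction or Dockerfile lookup failed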

52
buildman/jobutil/buildstatus.py Normal file

@ -0,0 +1,52 @@
from data.database import BUILD_PHASE
class StatusHandler(object):
""" Context wrapper for writing status to build logs. """
def __init__(self, build_logs, repository_build):
self._current_phase = None
self._repository_build = repository_build
self._uuid = repository_build.uuid
self._build_logs = build_logs
self._status = {
'total_commands': None,
'current_command': None,
'push_completion': 0.0,
'pull_completion': 0.0,
}
# Write the initial status.
self.__exit__(None, None, None)
def _append_log_message(self, log_message, log_type=None, log_data=None):
self._build_logs.append_log_message(self._uuid, log_message, log_type, log_data)
def append_log(self, log_message, extra_data=None):
self._append_log_message(log_message, log_data=extra_data)
def set_command(self, command, extra_data=None):
self._append_log_message(command, self._build_logs.COMMAND, extra_data)
def set_error(self, error_message, extra_data=None, internal_error=False):
self.set_phase(BUILD_PHASE.INTERNAL_ERROR if internal_error else BUILD_PHASE.ERROR)
extra_data = extra_data or {}
extra_data['internal_error'] = internal_error
self._append_log_message(error_message, self._build_logs.ERROR, extra_data)
def set_phase(self, phase, extra_data=None):
if phase == self._current_phase:
return False
self._current_phase = phase
self._append_log_message(phase, self._build_logs.PHASE, extra_data)
self._repository_build.phase = phase
self._repository_build.save()
return True
def __enter__(self):
return self._status
def __exit__(self, exc_type, value, traceback):
self._build_logs.set_status(self._uuid, self._status)
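StatusHandler doubles as a context manager over the status dictionary; a brief sketch of the pattern the build component uses above, assuming build_logs and repo_build objects are already in hand:
from buildman.jobutil.buildstatus import StatusHandler
from data.database import BUILD_PHASE

status = StatusHandler(build_logs, repo_build)
status.set_phase(BUILD_PHASE.BUILDING)
with status as status_dict:
    status_dict['total_commands'] = 12     # hypothetical command count
status.append_log('Step 1 : FROM ubuntu')  # free-form log line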

84
buildman/jobutil/workererror.py Normal file

@ -0,0 +1,84 @@
class WorkerError(object):
""" Helper class which represents errors raised by a build worker. """
def __init__(self, error_code, base_message=None):
self._error_code = error_code
self._base_message = base_message
self._error_handlers = {
'io.quay.builder.buildpackissue': {
'message': 'Could not load build package',
'is_internal': True
},
'io.quay.builder.cannotextractbuildpack': {
'message': 'Could not extract the contents of the build package'
},
'io.quay.builder.cannotpullforcache': {
'message': 'Could not pull cached image',
'is_internal': True
},
'io.quay.builder.cannotpullbaseimage': {
'message': 'Could not pull base image',
'show_base_error': True
},
'io.quay.builder.internalerror': {
'message': 'An internal error occurred while building. Please submit a ticket.'
},
'io.quay.builder.buildrunerror': {
'message': 'Could not start the build process',
'is_internal': True
},
'io.quay.builder.builderror': {
'message': 'A build step failed',
'show_base_error': True
},
'io.quay.builder.tagissue': {
'message': 'Could not tag built image',
'is_internal': True
},
'io.quay.builder.pushissue': {
'message': 'Could not push built image',
'show_base_error': True,
'is_internal': True
},
'io.quay.builder.dockerconnecterror': {
'message': 'Could not connect to Docker daemon',
'is_internal': True
},
'io.quay.builder.missingorinvalidargument': {
'message': 'Missing required arguments for builder',
'is_internal': True
}
}
def is_internal_error(self):
handler = self._error_handlers.get(self._error_code)
return handler.get('is_internal', False) if handler else True
def public_message(self):
handler = self._error_handlers.get(self._error_code)
if not handler:
return 'An unknown error occurred'
message = handler['message']
if handler.get('show_base_error', False) and self._base_message:
message = message + ': ' + self._base_message
return message
def extra_data(self):
if self._base_message:
return {
'base_error': self._base_message
}
return {}
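A brief illustration of how WorkerError resolves an error code, with a hypothetical base message from a worker:
from buildman.jobutil.workererror import WorkerError

error = WorkerError('io.quay.builder.builderror', 'The command returned a non-zero code: 2')
error.public_message()     # -> 'A build step failed: The command returned a non-zero code: 2'
error.is_internal_error()  # -> False; unrecognized error codes default to internal (True)
error.extra_data()         # -> {'base_error': 'The command returned a non-zero code: 2'}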

0
buildman/manager/__init__.py Normal file

49
buildman/manager/basemanager.py Normal file

@ -0,0 +1,49 @@
class BaseManager(object):
""" Base for all worker managers. """
def __init__(self, register_component, unregister_component, job_heartbeat_callback,
job_complete_callback):
self.register_component = register_component
self.unregister_component = unregister_component
self.job_heartbeat_callback = job_heartbeat_callback
self.job_complete_callback = job_complete_callback
def job_heartbeat(self, build_job):
""" Method invoked to tell the manager that a job is still running. This method will be called
every few minutes. """
self.job_heartbeat_callback(build_job)
def setup_time(self):
""" Returns the number of seconds that the build system should wait before allowing the job
to be picked up again after 'schedule' has been called.
"""
raise NotImplementedError
def shutdown(self):
""" Indicates that the build controller server is in a shutdown state and that no new jobs
should be scheduled or workers started. Existing workers should be cleaned up once their jobs
have completed.
"""
raise NotImplementedError
def schedule(self, build_job, loop):
""" Schedules a queue item to be built. Returns True if the item was properly scheduled
and False if all workers are busy.
"""
raise NotImplementedError
def initialize(self):
""" Runs any initialization code for the manager. Called once the server is in a ready state.
"""
raise NotImplementedError
def build_component_disposed(self, build_component, timed_out):
""" Method invoked whenever a build component has been disposed. The timed_out boolean indicates
whether the component's heartbeat timed out.
"""
raise NotImplementedError
def job_completed(self, build_job, job_status, build_component):
""" Method invoked once a job_item has completed, in some manner. The job_status will be
one of: incomplete, error, complete. If incomplete, the job should be requeued.
"""
raise NotImplementedError

72
buildman/manager/enterprise.py Normal file

@ -0,0 +1,72 @@
import logging
import uuid
from buildman.component.basecomponent import BaseComponent
from buildman.component.buildcomponent import BuildComponent
from buildman.manager.basemanager import BaseManager
from trollius.coroutines import From
REGISTRATION_REALM = 'registration'
logger = logging.getLogger(__name__)
class DynamicRegistrationComponent(BaseComponent):
""" Component session that handles dynamic registration of the builder components. """
def onConnect(self):
self.join(REGISTRATION_REALM)
def onJoin(self, details):
logger.debug('Registering registration method')
yield From(self.register(self._worker_register, u'io.quay.buildworker.register'))
def _worker_register(self):
realm = self.parent_manager.add_build_component()
logger.debug('Registering new build component+worker with realm %s', realm)
return realm
class EnterpriseManager(BaseManager):
""" Build manager implementation for the Enterprise Registry. """
build_components = []
shutting_down = False
def initialize(self):
# Add a component which is used by build workers for dynamic registration. Unlike
# production, build workers in enterprise are long-lived and register dynamically.
self.register_component(REGISTRATION_REALM, DynamicRegistrationComponent)
def setup_time(self):
# Builders are already registered, so the setup time should be essentially instant. We therefore
# only return a minute here.
return 60
def add_build_component(self):
""" Adds a new build component for an Enterprise Registry. """
# Generate a new unique realm ID for the build worker.
realm = str(uuid.uuid4())
component = self.register_component(realm, BuildComponent, token="")
self.build_components.append(component)
return realm
def schedule(self, build_job, loop):
""" Schedules a build for an Enterprise Registry. """
if self.shutting_down:
return False
for component in self.build_components:
if component.is_ready():
loop.call_soon(component.start_build, build_job)
return True
return False
def shutdown(self):
self.shutting_down = True
def job_completed(self, build_job, job_status, build_component):
self.job_complete_callback(build_job, job_status)
def build_component_disposed(self, build_component, timed_out):
self.build_components.remove(build_component)

177
buildman/server.py Normal file

@ -0,0 +1,177 @@
import logging
import trollius
from autobahn.asyncio.wamp import RouterFactory, RouterSessionFactory
from autobahn.asyncio.websocket import WampWebSocketServerFactory
from autobahn.wamp import types
from aiowsgi import create_server as create_wsgi_server
from flask import Flask
from threading import Event
from trollius.coroutines import From
from datetime import datetime, timedelta
from buildman.jobutil.buildjob import BuildJob, BuildJobLoadException
from data.queue import WorkQueue
logger = logging.getLogger(__name__)
WORK_CHECK_TIMEOUT = 10
TIMEOUT_PERIOD_MINUTES = 20
JOB_TIMEOUT_SECONDS = 300
MINIMUM_JOB_EXTENSION = timedelta(minutes=2)
WEBSOCKET_PORT = 8787
CONTROLLER_PORT = 8686
class BuildJobResult(object):
""" Build job result enum """
INCOMPLETE = 'incomplete'
COMPLETE = 'complete'
ERROR = 'error'
class BuilderServer(object):
""" Server which handles both HTTP and WAMP requests, managing the full state of the build
controller.
"""
def __init__(self, server_hostname, queue, build_logs, user_files, lifecycle_manager_klass):
self._loop = None
self._current_status = 'starting'
self._current_components = []
self._job_count = 0
self._session_factory = RouterSessionFactory(RouterFactory())
self._server_hostname = server_hostname
self._queue = queue
self._build_logs = build_logs
self._user_files = user_files
self._lifecycle_manager = lifecycle_manager_klass(
self._register_component,
self._unregister_component,
self._job_heartbeat,
self._job_complete
)
self._shutdown_event = Event()
self._current_status = 'running'
self._register_controller()
def _register_controller(self):
controller_app = Flask('controller')
server = self
@controller_app.route('/status')
def status():
return server._current_status
self._controller_app = controller_app
def run(self, host, ssl=None):
logger.debug('Initializing the lifecycle manager')
self._lifecycle_manager.initialize()
logger.debug('Initializing all members of the event loop')
loop = trollius.get_event_loop()
trollius.Task(self._initialize(loop, host, ssl))
logger.debug('Starting server on port %s, with controller on port %s', WEBSOCKET_PORT,
CONTROLLER_PORT)
try:
loop.run_forever()
except KeyboardInterrupt:
pass
finally:
loop.close()
def close(self):
logger.debug('Requested server shutdown')
self._current_status = 'shutting_down'
self._lifecycle_manager.shutdown()
self._shutdown_event.wait()
logger.debug('Shutting down server')
def _register_component(self, realm, component_klass, **kwargs):
""" Registers a component with the server. The component_klass must derive from
BaseComponent.
"""
logger.debug('Registering component with realm %s', realm)
component = component_klass(types.ComponentConfig(realm=realm), realm=realm, **kwargs)
component.server = self
component.parent_manager = self._lifecycle_manager
component.build_logs = self._build_logs
component.user_files = self._user_files
component.server_hostname = self._server_hostname
self._current_components.append(component)
self._session_factory.add(component)
return component
def _unregister_component(self, component):
logger.debug('Unregistering component with realm %s and token %s',
component.builder_realm, component.expected_token)
self._current_components.remove(component)
self._session_factory.remove(component)
def _job_heartbeat(self, build_job):
WorkQueue.extend_processing(build_job.job_item(), seconds_from_now=JOB_TIMEOUT_SECONDS,
retry_count=1, minimum_extension=MINIMUM_JOB_EXTENSION)
def _job_complete(self, build_job, job_status):
if job_status == BuildJobResult.INCOMPLETE:
self._queue.incomplete(build_job.job_item(), restore_retry=True, retry_after=30)
elif job_status == BuildJobResult.ERROR:
self._queue.incomplete(build_job.job_item(), restore_retry=False)
else:
self._queue.complete(build_job.job_item())
self._job_count = self._job_count - 1
if self._current_status == 'shutting_down' and not self._job_count:
self._shutdown_event.set()
# TODO(jschorr): check for work here?
@trollius.coroutine
def _work_checker(self):
while self._current_status == 'running':
logger.debug('Checking for more work')
job_item = self._queue.get(processing_time=self._lifecycle_manager.setup_time())
if job_item is None:
logger.debug('No additional work found. Going to sleep for %s seconds', WORK_CHECK_TIMEOUT)
yield From(trollius.sleep(WORK_CHECK_TIMEOUT))
continue
try:
build_job = BuildJob(job_item)
except BuildJobLoadException as irbe:
logger.exception(irbe)
self._queue.incomplete(job_item, restore_retry=False)
continue
logger.debug('Build job found. Checking for an available worker.')
if self._lifecycle_manager.schedule(build_job, self._loop):
self._job_count = self._job_count + 1
logger.debug('Build job scheduled. Running: %s', self._job_count)
else:
logger.debug('All workers are busy. Requeuing.')
self._queue.incomplete(job_item, restore_retry=True, retry_after=0)
yield From(trollius.sleep(WORK_CHECK_TIMEOUT))
@trollius.coroutine
def _initialize(self, loop, host, ssl=None):
self._loop = loop
# Create the WAMP server.
transport_factory = WampWebSocketServerFactory(self._session_factory, debug_wamp=False)
transport_factory.setProtocolOptions(failByDrop=True)
# Initialize the controller server and the WAMP server
create_wsgi_server(self._controller_app, loop=loop, host=host, port=CONTROLLER_PORT, ssl=ssl)
yield From(loop.create_server(transport_factory, host, WEBSOCKET_PORT, ssl=ssl))
# Initialize the work queue checker.
yield From(self._work_checker())


@ -23,3 +23,11 @@ upstream verbs_app_server {
upstream registry_app_server {
server unix:/tmp/gunicorn_registry.sock fail_timeout=0;
}
upstream build_manager_controller_server {
server localhost:8686;
}
upstream build_manager_websocket_server {
server localhost:8787;
}

2
conf/init/buildmanager/log/run Executable file

@ -0,0 +1,2 @@
#!/bin/sh
exec svlogd /var/log/buildmanager/

8
conf/init/buildmanager/run Executable file

@ -0,0 +1,8 @@
#! /bin/bash
echo 'Starting internal build manager'
cd /
venv/bin/python -m buildman.builder 2>&1
echo 'Internal build manager exited'


@ -60,3 +60,15 @@ location /v1/_ping {
add_header X-Docker-Registry-Standalone 0;
return 200 'true';
}
location ~ ^/b1/controller(/?)(.*) {
proxy_pass http://build_manager_controller_server/$2;
proxy_read_timeout 2000;
}
location ~ ^/b1/socket(/?)(.*) {
proxy_pass http://build_manager_websocket_server/$2;
proxy_http_version 1.1;
proxy_set_header Upgrade $http_upgrade;
proxy_set_header Connection "upgrade";
}


@ -163,6 +163,8 @@ class DefaultConfig(object):
# Feature Flag: Whether users can be renamed
FEATURE_USER_RENAME = False
BUILD_MANAGER = ('enterprise', {})
DISTRIBUTED_STORAGE_CONFIG = {
'local_eu': ['LocalStorage', {'storage_path': 'test/data/registry/eu'}],
'local_us': ['LocalStorage', {'storage_path': 'test/data/registry/us'}],

data/database.py

@ -420,6 +420,7 @@ class RepositoryTag(BaseModel):
class BUILD_PHASE(object):
""" Build phases enum """
ERROR = 'error'
INTERNAL_ERROR = 'internalerror'
UNPACKING = 'unpacking'
PULLING = 'pulling'
BUILDING = 'building'

data/queue.py

@ -127,12 +127,15 @@ class WorkQueue(object):
incomplete_item_obj.save()
self._currently_processing = False
def extend_processing(self, queue_item, seconds_from_now):
@staticmethod
def extend_processing(queue_item, seconds_from_now, retry_count=None,
minimum_extension=MINIMUM_EXTENSION):
new_expiration = datetime.utcnow() + timedelta(seconds=seconds_from_now)
# Only actually write the new expiration to the db if it moves the expiration some minimum
queue_item_obj = QueueItem.get(QueueItem.id == queue_item.id)
if new_expiration - queue_item_obj.processing_expires > MINIMUM_EXTENSION:
with self._transaction_factory(db):
queue_item_obj.processing_expires = new_expiration
queue_item_obj.save()
if new_expiration - queue_item.processing_expires > minimum_extension:
if retry_count is not None:
queue_item.retries_remaining = retry_count
queue_item.processing_expires = new_expiration
queue_item.save()
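With extend_processing now a static method, the heartbeat path in buildman/server.py above calls it directly, and the new expiration is only persisted when it moves the deadline by at least minimum_extension:
from buildman.server import JOB_TIMEOUT_SECONDS, MINIMUM_JOB_EXTENSION
from data.queue import WorkQueue

WorkQueue.extend_processing(build_job.job_item(), seconds_from_now=JOB_TIMEOUT_SECONDS,
                            retry_count=1, minimum_extension=MINIMUM_JOB_EXTENSION)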


@ -1,5 +1,7 @@
import logging
import json
import time
import datetime
from flask import request, redirect
@ -9,7 +11,7 @@ from endpoints.api import (RepositoryParamResource, parse_args, query_param, nic
ApiResource, internal_only, format_date, api, Unauthorized, NotFound)
from endpoints.common import start_build
from endpoints.trigger import BuildTrigger
from data import model
from data import model, database
from auth.auth_context import get_authenticated_user
from auth.permissions import ModifyRepositoryPermission, AdministerOrganizationPermission
from data.buildlogs import BuildStatusRetrievalError
@ -65,6 +67,13 @@ def build_status_view(build_obj, can_write=False):
status = {}
phase = 'cannot_load'
# If the status contains a heartbeat, then check to see if it has been written in the last few
# minutes. If not, then the build timed out.
if status is not None and 'heartbeat' in status and status['heartbeat']:
heartbeat = datetime.datetime.fromtimestamp(status['heartbeat'])
if datetime.datetime.now() - heartbeat > datetime.timedelta(minutes=1):
phase = database.BUILD_PHASE.INTERNAL_ERROR
logger.debug('Can write: %s job_config: %s', can_write, build_obj.job_config)
resp = {
'id': build_obj.uuid,

endpoints/trigger.py

@ -150,8 +150,10 @@ def raise_unsupported():
class GithubBuildTrigger(BuildTrigger):
@staticmethod
def _get_client(auth_token):
return Github(auth_token, client_id=github_trigger.client_id(),
client_secret=github_trigger.client_secret())
return Github(auth_token,
base_url=github_trigger.api_endpoint(),
client_id=github_trigger.client_id(),
client_secret=github_trigger.client_secret())
@classmethod
def service_name(cls):

1
local-run.sh Executable file

@ -0,0 +1 @@
gunicorn -c conf/gunicorn_local.py application:application

1
local-test.sh Executable file

@ -0,0 +1 @@
TEST=true python -m unittest discover


@ -1,3 +1,6 @@
autobahn
aiowsgi
trollius
peewee
flask
py-bcrypt


@ -17,7 +17,9 @@ SQLAlchemy==0.9.8
Werkzeug==0.9.6
git+https://github.com/DevTable/aniso8601-fake.git
git+https://github.com/DevTable/anunidecode.git
aiowsgi==0.3
alembic==0.6.7
autobahn==0.9.3-3
backports.ssl-match-hostname==3.4.0.2
beautifulsoup4==4.3.2
blinker==1.3
@ -51,6 +53,7 @@ reportlab==2.7
requests==2.4.3
six==1.8.0
stripe==1.19.1
trollius==1.0.3
tzlocal==1.1.2
websocket-client==0.21.0
wsgiref==0.1.2


@ -1 +0,0 @@
gunicorn -c conf/gunicorn_local.py application:application


@ -864,6 +864,10 @@ i.toggle-icon:hover {
background-color: red;
}
.phase-icon.internalerror {
background-color: #DFFF00;
}
.phase-icon.waiting, .phase-icon.unpacking, .phase-icon.starting, .phase-icon.initializing {
background-color: #ddd;
}
@ -876,6 +880,10 @@ i.toggle-icon:hover {
background-color: #f0ad4e;
}
.phase-icon.priming-cache {
background-color: #ddd;
}
.phase-icon.pushing {
background-color: #5cb85c;
}
@ -4872,3 +4880,11 @@ i.slack-icon {
width: 120px;
padding-right: 10px;
}
.progress.active .progress-bar {
/* Note: There is a bug in Chrome which results in high CPU usage for active progress-bars
due to their animation. This enables the GPU for the rendering, which cuts CPU usage in
half (although it is still not great)
*/
transform: translateZ(0);
}


@ -9,7 +9,6 @@
</span>
</span>
<div class="alert alert-danger" ng-if="error.message == 'HTTP code: 403' && getLocalPullInfo().isLocal">
<div ng-if="getLocalPullInfo().login">
Note: The credentials <b>{{ getLocalPullInfo().login.username }}</b> for registry <b>{{ getLocalPullInfo().login.registry }}</b> cannot


@ -1,7 +1,10 @@
<span class="trigger-description-element" ng-switch on="trigger.service">
<span ng-switch-when="github">
<i class="fa fa-github fa-lg" style="margin-right: 6px" data-title="GitHub" bs-tooltip="tooltip.title"></i>
Push to GitHub repository <a href="https://github.com/{{ trigger.config.build_source }}" target="_new">{{ trigger.config.build_source }}</a>
Push to GitHub <span ng-if="KeyService.isEnterprise('github-trigger')">Enterprise</span> repository
<a href="{{ KeyService['githubTriggerEndpoint'] }}{{ trigger.config.build_source }}" target="_new">
{{ trigger.config.build_source }}
</a>
<div style="margin-top: 4px; margin-left: 26px; font-size: 12px; color: gray;" ng-if="!short">
<div>
<span class="trigger-description-subtitle">Branches/Tags:</span>


@ -1730,14 +1730,22 @@ quayApp = angular.module('quay', quayDependencies, function($provide, cfpLoading
keyService['githubEndpoint'] = oauth['GITHUB_LOGIN_CONFIG']['GITHUB_ENDPOINT'];
keyService['githubTriggerAuthorizeUrl'] = oauth['GITHUB_LOGIN_CONFIG']['AUTHORIZE_ENDPOINT'];
keyService['githubTriggerEndpoint'] = oauth['GITHUB_TRIGGER_CONFIG']['GITHUB_ENDPOINT'];
keyService['githubTriggerAuthorizeUrl'] = oauth['GITHUB_TRIGGER_CONFIG']['AUTHORIZE_ENDPOINT'];
keyService['githubLoginScope'] = 'user:email';
keyService['googleLoginScope'] = 'openid email';
keyService.isEnterprise = function(service) {
var isGithubEnterprise = keyService['githubLoginUrl'].indexOf('https://github.com/') < 0;
return service == 'github' && isGithubEnterprise;
switch (service) {
case 'github':
return keyService['githubLoginUrl'].indexOf('https://github.com/') < 0;
case 'github-trigger':
return keyService['githubTriggerAuthorizeUrl'].indexOf('https://github.com/') < 0;
}
return false;
};
keyService.getExternalLoginUrl = function(service, action) {
@ -4747,6 +4755,11 @@ quayApp.directive('buildLogError', function () {
'entries': '=entries'
},
controller: function($scope, $element, Config) {
$scope.isInternalError = function() {
var entry = $scope.entries[$scope.entries.length - 1];
return entry && entry.data && entry.data['internal_error'];
};
$scope.getLocalPullInfo = function() {
if ($scope.entries.__localpull !== undefined) {
return $scope.entries.__localpull;
@ -4802,7 +4815,9 @@ quayApp.directive('triggerDescription', function () {
'trigger': '=trigger',
'short': '=short'
},
controller: function($scope, $element) {
controller: function($scope, $element, KeyService, TriggerService) {
$scope.KeyService = KeyService;
$scope.TriggerService = TriggerService;
}
};
return directiveDefinitionObject;
@ -5680,6 +5695,9 @@ quayApp.directive('buildMessage', function () {
case 'building':
return 'Building image from Dockerfile';
case 'priming-cache':
return 'Priming cache for build';
case 'pushing':
return 'Pushing image built from Dockerfile';
@ -5688,6 +5706,9 @@ quayApp.directive('buildMessage', function () {
case 'error':
return 'Dockerfile build failed';
case 'internalerror':
return 'An internal system error occurred while building; the build will be retried in the next few minutes.';
}
};
}
@ -5721,6 +5742,10 @@ quayApp.directive('buildProgress', function () {
return buildInfo.status.push_completion * 100;
break;
case 'priming-cache':
return buildInfo.status.cache_completion * 100;
break;
case 'complete':
return 100;
break;


@ -1331,6 +1331,7 @@ function RepoAdminCtrl($scope, Restangular, ApiService, KeyService, TriggerServi
$scope.Features = Features;
$scope.TriggerService = TriggerService;
$scope.KeyService = KeyService;
$scope.permissions = {'team': [], 'user': [], 'loading': 2};
$scope.logsShown = 0;


@ -308,7 +308,8 @@
<ul class="dropdown-menu dropdown-menu-right pull-right">
<li>
<a href="{{ TriggerService.getRedirectUrl('github', repo.namespace, repo.name) }}">
<i class="fa fa-github fa-lg"></i>GitHub - Repository Push
<i class="fa fa-github fa-lg"></i>
GitHub <span ng-if="KeyService.isEnterprise('github-trigger')">Enterprise</span> - Repository Push
</a>
</li>
</ul>


@ -52,6 +52,9 @@ class GithubOAuthConfig(OAuthConfig):
def _api_endpoint(self):
return self.config.get('API_ENDPOINT', self._get_url(self._endpoint(), '/api/v3/'))
def api_endpoint(self):
return self._api_endpoint()[0:-1]
def user_endpoint(self):
api_endpoint = self._api_endpoint()
return self._get_url(api_endpoint, 'user')