Merge branch 'ephemeral'

Jake Moshenko 2015-01-29 12:46:50 -05:00
commit f881c6c6e6
26 changed files with 1045 additions and 1010 deletions

View file

@ -9,14 +9,8 @@ version = 1
[[container]]
name = "quay"
Dockerfile = "Dockerfile.web"
Dockerfile = "Dockerfile"
project = "quay"
tags = ["git:short"]
[[container]]
name = "builder"
Dockerfile = "Dockerfile.buildworker"
project = "builder"
tags = ["git:short"]
# vim:ft=toml

View file

@ -1,16 +1,12 @@
# vim:ft=dockerfile
###############################
# BEGIN COMMON SECTION
###############################
FROM phusion/baseimage:0.9.15
FROM phusion/baseimage:0.9.16
ENV DEBIAN_FRONTEND noninteractive
ENV HOME /root
# Install the dependencies.
RUN apt-get update # 11DEC2014
RUN apt-get update # 29JAN2015
# New ubuntu packages should be added as their own apt-get install lines below the existing install commands
RUN apt-get install -y git python-virtualenv python-dev libjpeg8 libjpeg62 libjpeg62-dev libevent-2.0.5 libevent-dev gdebi-core g++ libmagic1 phantomjs nodejs npm libldap-2.4-2 libldap2-dev libsasl2-modules libsasl2-dev libpq5 libpq-dev libfreetype6-dev libffi-dev
@ -20,15 +16,6 @@ ADD requirements.txt requirements.txt
RUN virtualenv --distribute venv
RUN venv/bin/pip install -r requirements.txt
RUN apt-get remove -y --auto-remove python-dev g++ libjpeg62-dev libevent-dev libldap2-dev libsasl2-dev libpq-dev libffi-dev
###############################
# END COMMON SECTION
###############################
# Remove SSH.
RUN rm -rf /etc/service/sshd /etc/my_init.d/00_regen_ssh_host_keys.sh
# Install the binary dependencies
ADD binary_dependencies binary_dependencies
RUN gdebi --n binary_dependencies/*.deb
@ -41,6 +28,10 @@ RUN npm install -g grunt-cli
ADD grunt grunt
RUN cd grunt && npm install
RUN apt-get remove -y --auto-remove python-dev g++ libjpeg62-dev libevent-dev libldap2-dev libsasl2-dev libpq-dev libffi-dev
RUN apt-get autoremove -y
RUN apt-get clean && rm -rf /var/lib/apt/lists/* /tmp/* /var/tmp/*
# Add all of the files!
ADD . .
@ -65,14 +56,9 @@ ADD conf/init/buildmanager /etc/service/buildmanager
RUN mkdir static/fonts static/ldn
RUN venv/bin/python -m external_libraries
RUN apt-get autoremove -y
RUN apt-get clean && rm -rf /var/lib/apt/lists/* /tmp/* /var/tmp/*
# Run the tests
RUN TEST=true venv/bin/python -m unittest discover
VOLUME ["/conf/stack", "/var/log", "/datastorage", "/tmp"]
VOLUME ["/conf/stack", "/var/log", "/datastorage", "/tmp", "/conf/etcd"]
EXPOSE 443 8443 80
CMD ["/sbin/my_init"]

View file

@ -1,49 +0,0 @@
# vim:ft=dockerfile
###############################
# BEGIN COMMON SECTION
###############################
FROM phusion/baseimage:0.9.15
ENV DEBIAN_FRONTEND noninteractive
ENV HOME /root
# Install the dependencies.
RUN apt-get update # 11DEC2014
# New ubuntu packages should be added as their own apt-get install lines below the existing install commands
RUN apt-get install -y git python-virtualenv python-dev libjpeg8 libjpeg62 libjpeg62-dev libevent-2.0.5 libevent-dev gdebi-core g++ libmagic1 phantomjs nodejs npm libldap-2.4-2 libldap2-dev libsasl2-modules libsasl2-dev libpq5 libpq-dev libfreetype6-dev libffi-dev
# Build the python dependencies
ADD requirements.txt requirements.txt
RUN virtualenv --distribute venv
RUN venv/bin/pip install -r requirements.txt
RUN apt-get remove -y --auto-remove python-dev g++ libjpeg62-dev libevent-dev libldap2-dev libsasl2-dev libpq-dev libffi-dev
###############################
# END COMMON SECTION
###############################
RUN apt-get install -y lxc aufs-tools
RUN usermod -v 100000-200000 -w 100000-200000 root
ADD binary_dependencies/builder binary_dependencies/builder
RUN gdebi --n binary_dependencies/builder/*.deb
ADD . .
ADD conf/init/svlogd_config /svlogd_config
ADD conf/init/preplogsdir.sh /etc/my_init.d/
ADD conf/init/tutumdocker /etc/service/tutumdocker
ADD conf/init/dockerfilebuild /etc/service/dockerfilebuild
RUN apt-get remove -y --auto-remove nodejs npm git phantomjs
RUN apt-get autoremove -y
RUN apt-get clean && rm -rf /var/lib/apt/lists/* /tmp/* /var/tmp/*
VOLUME ["/var/lib/docker", "/var/lib/lxc", "/conf/stack", "/var/log"]
CMD ["/sbin/my_init"]

buildman/asyncutil.py Normal file, 27 lines
View file

@ -0,0 +1,27 @@
from functools import partial, wraps
from trollius import get_event_loop
class AsyncWrapper(object):
""" Wrapper class which will transform a syncronous library to one that can be used with
trollius coroutines.
"""
def __init__(self, delegate, loop=None, executor=None):
self._loop = loop if loop is not None else get_event_loop()
self._delegate = delegate
self._executor = executor
def __getattr__(self, attrib):
delegate_attr = getattr(self._delegate, attrib)
if not callable(delegate_attr):
return delegate_attr
def wrapper(*args, **kwargs):
""" Wraps the delegate_attr with primitives that will transform sync calls to ones shelled
out to a thread pool.
"""
callable_delegate_attr = partial(delegate_attr, *args, **kwargs)
return self._loop.run_in_executor(self._executor, callable_delegate_attr)
return wrapper
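A minimal usage sketch for the new AsyncWrapper (the etcd client, host, and key below are illustrative assumptions, not part of this commit; the wrapper itself is the buildman/asyncutil.py file added above):

import etcd
from trollius import coroutine, From, Return, get_event_loop

from buildman.asyncutil import AsyncWrapper

# Wrap the blocking etcd client; each call is shelled out to the loop's default executor.
async_client = AsyncWrapper(etcd.Client(host='127.0.0.1', port=2379))

@coroutine
def read_key(key):
    # etcd.Client.read() is synchronous; the wrapper returns a future we can yield on.
    result = yield From(async_client.read(key))
    raise Return(result.value)

if __name__ == '__main__':
    print(get_event_loop().run_until_complete(read_key('some/key')))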

View file

@ -6,6 +6,7 @@ import time
from app import app, userfiles as user_files, build_logs, dockerfile_build_queue
from buildman.manager.enterprise import EnterpriseManager
from buildman.manager.ephemeral import EphemeralBuilderManager
from buildman.server import BuilderServer
from trollius import SSLContext
@ -13,11 +14,17 @@ from trollius import SSLContext
logger = logging.getLogger(__name__)
BUILD_MANAGERS = {
'enterprise': EnterpriseManager
'enterprise': EnterpriseManager,
'ephemeral': EphemeralBuilderManager,
}
EXTERNALLY_MANAGED = 'external'
DEFAULT_WEBSOCKET_PORT = 8787
DEFAULT_CONTROLLER_PORT = 8686
LOG_FORMAT = "%(asctime)s [%(process)d] [%(levelname)s] [%(name)s] %(message)s"
def run_build_manager():
if not features.BUILD_SUPPORT:
logger.debug('Building is disabled. Please enable the feature flag')
@ -39,6 +46,19 @@ def run_build_manager():
if manager_klass is None:
return
manager_hostname = os.environ.get('BUILDMAN_HOSTNAME',
app.config.get('BUILDMAN_HOSTNAME',
app.config['SERVER_HOSTNAME']))
websocket_port = int(os.environ.get('BUILDMAN_WEBSOCKET_PORT',
app.config.get('BUILDMAN_WEBSOCKET_PORT',
DEFAULT_WEBSOCKET_PORT)))
controller_port = int(os.environ.get('BUILDMAN_CONTROLLER_PORT',
app.config.get('BUILDMAN_CONTROLLER_PORT',
DEFAULT_CONTROLLER_PORT)))
logger.debug('Will pass buildman hostname %s to builders for websocket connection',
manager_hostname)
logger.debug('Starting build manager with lifecycle "%s"', build_manager_config[0])
ssl_context = None
if os.environ.get('SSL_CONFIG'):
@ -48,9 +68,10 @@ def run_build_manager():
os.path.join(os.environ.get('SSL_CONFIG'), 'ssl.key'))
server = BuilderServer(app.config['SERVER_HOSTNAME'], dockerfile_build_queue, build_logs,
user_files, manager_klass)
server.run('0.0.0.0', ssl=ssl_context)
user_files, manager_klass, build_manager_config[1], manager_hostname)
server.run('0.0.0.0', websocket_port, controller_port, ssl=ssl_context)
if __name__ == '__main__':
logging.basicConfig(level=logging.DEBUG)
logging.basicConfig(level=logging.DEBUG, format=LOG_FORMAT)
logging.getLogger('peewee').setLevel(logging.WARN)
run_build_manager()

View file

@ -6,10 +6,10 @@ import trollius
import re
from autobahn.wamp.exception import ApplicationError
from trollius.coroutines import From
from buildman.server import BuildJobResult
from buildman.component.basecomponent import BaseComponent
from buildman.jobutil.buildjob import BuildJobLoadException
from buildman.jobutil.buildpack import BuildPackage, BuildPackageException
from buildman.jobutil.buildstatus import StatusHandler
from buildman.jobutil.workererror import WorkerError
@ -39,7 +39,7 @@ class BuildComponent(BaseComponent):
self.builder_realm = realm
self.parent_manager = None
self.server_hostname = None
self.registry_hostname = None
self._component_status = ComponentStatus.JOINING
self._last_heartbeat = None
@ -54,27 +54,28 @@ class BuildComponent(BaseComponent):
def onJoin(self, details):
logger.debug('Registering methods and listeners for component %s', self.builder_realm)
yield From(self.register(self._on_ready, u'io.quay.buildworker.ready'))
yield From(self.register(self._ping, u'io.quay.buildworker.ping'))
yield From(self.subscribe(self._on_heartbeat, 'io.quay.builder.heartbeat'))
yield From(self.subscribe(self._on_log_message, 'io.quay.builder.logmessage'))
yield trollius.From(self.register(self._on_ready, u'io.quay.buildworker.ready'))
yield trollius.From(self.register(self._ping, u'io.quay.buildworker.ping'))
yield trollius.From(self.subscribe(self._on_heartbeat, 'io.quay.builder.heartbeat'))
yield trollius.From(self.subscribe(self._on_log_message, 'io.quay.builder.logmessage'))
self._set_status(ComponentStatus.WAITING)
yield trollius.From(self._set_status(ComponentStatus.WAITING))
def is_ready(self):
""" Determines whether a build component is ready to begin a build. """
return self._component_status == ComponentStatus.RUNNING
@trollius.coroutine
def start_build(self, build_job):
""" Starts a build. """
self._current_job = build_job
self._build_status = StatusHandler(self.build_logs, build_job.repo_build())
self._build_status = StatusHandler(self.build_logs, build_job.repo_build.uuid)
self._image_info = {}
self._set_status(ComponentStatus.BUILDING)
yield trollius.From(self._set_status(ComponentStatus.BUILDING))
# Retrieve the job's buildpack.
buildpack_url = self.user_files.get_file_url(build_job.repo_build().resource_key,
buildpack_url = self.user_files.get_file_url(build_job.repo_build.resource_key,
requires_cors=False)
logger.debug('Retrieving build package: %s', buildpack_url)
@ -83,23 +84,27 @@ class BuildComponent(BaseComponent):
buildpack = BuildPackage.from_url(buildpack_url)
except BuildPackageException as bpe:
self._build_failure('Could not retrieve build package', bpe)
return
raise trollius.Return()
# Extract the base image information from the Dockerfile.
parsed_dockerfile = None
logger.debug('Parsing dockerfile')
build_config = build_job.build_config()
try:
build_config = build_job.build_config
except BuildJobLoadException as irbe:
self._build_failure('Could not load build job information', irbe)
try:
parsed_dockerfile = buildpack.parse_dockerfile(build_config.get('build_subdir'))
except BuildPackageException as bpe:
self._build_failure('Could not find Dockerfile in build package', bpe)
return
raise trollius.Return()
image_and_tag_tuple = parsed_dockerfile.get_image_and_tag()
if image_and_tag_tuple is None or image_and_tag_tuple[0] is None:
self._build_failure('Missing FROM line in Dockerfile')
return
raise trollius.Return()
base_image_information = {
'repository': image_and_tag_tuple[0],
@ -116,7 +121,7 @@ class BuildComponent(BaseComponent):
base_image_information['password'] = build_config['pull_credentials'].get('password', '')
# Retrieve the repository's fully qualified name.
repo = build_job.repo_build().repository
repo = build_job.repo_build.repository
repository_name = repo.namespace_user.username + '/' + repo.name
# Parse the build queue item into build arguments.
@ -136,9 +141,9 @@ class BuildComponent(BaseComponent):
'build_package': buildpack_url,
'sub_directory': build_config.get('build_subdir', ''),
'repository': repository_name,
'registry': self.server_hostname,
'pull_token': build_job.repo_build().access_token.code,
'push_token': build_job.repo_build().access_token.code,
'registry': self.registry_hostname,
'pull_token': build_job.repo_build.access_token.code,
'push_token': build_job.repo_build.access_token.code,
'tag_names': build_config.get('docker_tags', ['latest']),
'base_image': base_image_information,
'cached_tag': build_job.determine_cached_tag() or ''
@ -148,9 +153,7 @@ class BuildComponent(BaseComponent):
logger.debug('Invoking build: %s', self.builder_realm)
logger.debug('With Arguments: %s', build_arguments)
return (self
.call("io.quay.builder.build", **build_arguments)
.add_done_callback(self._build_complete))
self.call("io.quay.builder.build", **build_arguments).add_done_callback(self._build_complete)
@staticmethod
def _total_completion(statuses, total_images):
@ -244,11 +247,11 @@ class BuildComponent(BaseComponent):
'internal_error': exception.message if exception else None
})
build_id = self._current_job.repo_build().uuid
build_id = self._current_job.repo_build.uuid
logger.warning('Build %s failed with message: %s', build_id, error_message)
# Mark that the build has finished (in an error state)
self._build_finished(BuildJobResult.ERROR)
trollius.async(self._build_finished(BuildJobResult.ERROR))
def _build_complete(self, result):
""" Wraps up a completed build. Handles any errors and calls self._build_finished. """
@ -256,7 +259,7 @@ class BuildComponent(BaseComponent):
# Retrieve the result. This will raise an ApplicationError on any error that occurred.
result.result()
self._build_status.set_phase(BUILD_PHASE.COMPLETE)
self._build_finished(BuildJobResult.COMPLETE)
trollius.async(self._build_finished(BuildJobResult.COMPLETE))
except ApplicationError as aex:
worker_error = WorkerError(aex.error, aex.kwargs.get('base_error'))
@ -266,50 +269,58 @@ class BuildComponent(BaseComponent):
# Mark the build as completed.
if worker_error.is_internal_error():
self._build_finished(BuildJobResult.INCOMPLETE)
trollius.async(self._build_finished(BuildJobResult.INCOMPLETE))
else:
self._build_finished(BuildJobResult.ERROR)
trollius.async(self._build_finished(BuildJobResult.ERROR))
@trollius.coroutine
def _build_finished(self, job_status):
""" Alerts the parent that a build has completed and sets the status back to running. """
self.parent_manager.job_completed(self._current_job, job_status, self)
yield trollius.From(self.parent_manager.job_completed(self._current_job, job_status, self))
self._current_job = None
# Set the component back to a running state.
self._set_status(ComponentStatus.RUNNING)
yield trollius.From(self._set_status(ComponentStatus.RUNNING))
@staticmethod
def _ping():
""" Ping pong. """
return 'pong'
@trollius.coroutine
def _on_ready(self, token, version):
if not version in SUPPORTED_WORKER_VERSIONS:
logger.warning('Build component (token "%s") is running an out-of-date version: %s', version)
return False
logger.warning('Build component (token "%s") is running an out-of-date version: %s', token,
version)
raise trollius.Return(False)
if self._component_status != 'waiting':
logger.warning('Build component (token "%s") is already connected', self.expected_token)
return False
raise trollius.Return(False)
if token != self.expected_token:
logger.warning('Builder token mismatch. Expected: "%s". Found: "%s"', self.expected_token, token)
return False
logger.warning('Builder token mismatch. Expected: "%s". Found: "%s"', self.expected_token,
token)
raise trollius.Return(False)
self._set_status(ComponentStatus.RUNNING)
yield trollius.From(self._set_status(ComponentStatus.RUNNING))
# Start the heartbeat check and updating loop.
loop = trollius.get_event_loop()
loop.create_task(self._heartbeat())
logger.debug('Build worker %s is connected and ready', self.builder_realm)
return True
raise trollius.Return(True)
@trollius.coroutine
def _set_status(self, phase):
if phase == ComponentStatus.RUNNING:
yield trollius.From(self.parent_manager.build_component_ready(self))
self._component_status = phase
def _on_heartbeat(self):
""" Updates the last known heartbeat. """
self._last_heartbeat = datetime.datetime.now()
self._last_heartbeat = datetime.datetime.utcnow()
@trollius.coroutine
def _heartbeat(self):
@ -317,7 +328,7 @@ class BuildComponent(BaseComponent):
and updating the heartbeat in the build status dictionary (if applicable). This allows
the build system to catch crashes from either end.
"""
yield From(trollius.sleep(INITIAL_TIMEOUT))
yield trollius.From(trollius.sleep(INITIAL_TIMEOUT))
while True:
# If the component is no longer running or actively building, nothing more to do.
@ -331,22 +342,23 @@ class BuildComponent(BaseComponent):
with build_status as status_dict:
status_dict['heartbeat'] = int(time.time())
# Mark the build item.
current_job = self._current_job
if current_job is not None:
self.parent_manager.job_heartbeat(current_job)
yield trollius.From(self.parent_manager.job_heartbeat(current_job))
# Check the heartbeat from the worker.
logger.debug('Checking heartbeat on realm %s', self.builder_realm)
if self._last_heartbeat and self._last_heartbeat < datetime.datetime.now() - HEARTBEAT_DELTA:
self._timeout()
if (self._last_heartbeat and
self._last_heartbeat < datetime.datetime.utcnow() - HEARTBEAT_DELTA):
yield trollius.From(self._timeout())
return
yield From(trollius.sleep(HEARTBEAT_TIMEOUT))
yield trollius.From(trollius.sleep(HEARTBEAT_TIMEOUT))
@trollius.coroutine
def _timeout(self):
self._set_status(ComponentStatus.TIMED_OUT)
yield trollius.From(self._set_status(ComponentStatus.TIMED_OUT))
logger.warning('Build component with realm %s has timed out', self.builder_realm)
self._dispose(timed_out=True)

View file

@ -1,6 +1,9 @@
import json
from cachetools import lru_cache
from data import model
import json
class BuildJobLoadException(Exception):
""" Exception raised if a build job could not be instantiated for some reason. """
@ -9,50 +12,46 @@ class BuildJobLoadException(Exception):
class BuildJob(object):
""" Represents a single in-progress build job. """
def __init__(self, job_item):
self._job_item = job_item
self.job_item = job_item
try:
self._job_details = json.loads(job_item.body)
self.job_details = json.loads(job_item.body)
except ValueError:
raise BuildJobLoadException(
'Could not parse build queue item config with ID %s' % self._job_details['build_uuid']
'Could not parse build queue item config with ID %s' % self.job_details['build_uuid']
)
@lru_cache(maxsize=1)
def _load_repo_build(self):
try:
self._repo_build = model.get_repository_build(self._job_details['build_uuid'])
return model.get_repository_build(self.job_details['build_uuid'])
except model.InvalidRepositoryBuildException:
raise BuildJobLoadException(
'Could not load repository build with ID %s' % self._job_details['build_uuid'])
'Could not load repository build with ID %s' % self.job_details['build_uuid'])
@property
def repo_build(self):
return self._load_repo_build()
@property
def build_config(self):
try:
self._build_config = json.loads(self._repo_build.job_config)
return json.loads(self.repo_build.job_config)
except ValueError:
raise BuildJobLoadException(
'Could not parse repository build job config with ID %s' % self._job_details['build_uuid']
'Could not parse repository build job config with ID %s' % self.job_details['build_uuid']
)
def determine_cached_tag(self):
""" Returns the tag to pull to prime the cache or None if none. """
# TODO(jschorr): Change this to use the more complicated caching rules, once we have caching
# be a pull of things besides the constructed tags.
tags = self._build_config.get('docker_tags', ['latest'])
existing_tags = model.list_repository_tags(self._repo_build.repository.namespace_user.username,
self._repo_build.repository.name)
tags = self.build_config.get('docker_tags', ['latest'])
existing_tags = model.list_repository_tags(self.repo_build.repository.namespace_user.username,
self.repo_build.repository.name)
cached_tags = set(tags) & set([tag.name for tag in existing_tags])
if cached_tags:
return list(cached_tags)[0]
return None
def job_item(self):
""" Returns the job's queue item. """
return self._job_item
def repo_build(self):
""" Returns the repository build DB row for the job. """
return self._repo_build
def build_config(self):
""" Returns the parsed repository build config for the job. """
return self._build_config
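BuildJob's accessors change in this commit from methods (job_item(), repo_build(), build_config()) to plain attributes and cached properties; a short sketch of the new shape (the AttrDict wrapper mirrors how ephemeral.py reconstructs queue items, the values are illustrative, and the last two lookups hit the database):

import json
from buildman.jobutil.buildjob import BuildJob
from util.morecollections import AttrDict

# The queue item body is the JSON payload enqueued by the build API.
queue_item = AttrDict({
    'id': 1,
    'body': json.dumps({'build_uuid': 'deadbeef-dead-beef-dead-deadbeefdead'}),
})

job = BuildJob(queue_item)
job.job_item                    # the raw queue item (attribute, not a method)
job.job_details['build_uuid']   # parsed once from the body
repo_build = job.repo_build     # lazily loaded, lru_cache'd repository build row
config = job.build_config       # parsed from repo_build.job_config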

View file

@ -1,12 +1,12 @@
from data.database import BUILD_PHASE
from data import model
class StatusHandler(object):
""" Context wrapper for writing status to build logs. """
def __init__(self, build_logs, repository_build):
def __init__(self, build_logs, repository_build_uuid):
self._current_phase = None
self._repository_build = repository_build
self._uuid = repository_build.uuid
self._uuid = repository_build_uuid
self._build_logs = build_logs
self._status = {
@ -41,8 +41,11 @@ class StatusHandler(object):
self._current_phase = phase
self._append_log_message(phase, self._build_logs.PHASE, extra_data)
self._repository_build.phase = phase
self._repository_build.save()
# Update the repository build with the new phase
repo_build = model.get_repository_build(self._uuid)
repo_build.phase = phase
repo_build.save()
return True
def __enter__(self):

View file

@ -1,12 +1,17 @@
from trollius import coroutine
class BaseManager(object):
""" Base for all worker managers. """
def __init__(self, register_component, unregister_component, job_heartbeat_callback,
job_complete_callback):
job_complete_callback, manager_hostname, heartbeat_period_sec):
self.register_component = register_component
self.unregister_component = unregister_component
self.job_heartbeat_callback = job_heartbeat_callback
self.job_complete_callback = job_complete_callback
self.manager_hostname = manager_hostname
self.heartbeat_period_sec = heartbeat_period_sec
@coroutine
def job_heartbeat(self, build_job):
""" Method invoked to tell the manager that a job is still running. This method will be called
every few minutes. """
@ -25,26 +30,36 @@ class BaseManager(object):
"""
raise NotImplementedError
def schedule(self, build_job, loop):
@coroutine
def schedule(self, build_job):
""" Schedules a queue item to be built. Returns True if the item was properly scheduled
and False if all workers are busy.
"""
raise NotImplementedError
def initialize(self):
def initialize(self, manager_config):
""" Runs any initialization code for the manager. Called once the server is in a ready state.
"""
raise NotImplementedError
@coroutine
def build_component_ready(self, build_component):
""" Method invoked whenever a build component announces itself as ready.
"""
raise NotImplementedError
def build_component_disposed(self, build_component, timed_out):
""" Method invoked whenever a build component has been disposed. The timed_out boolean indicates
whether the component's heartbeat timed out.
"""
raise NotImplementedError
@coroutine
def job_completed(self, build_job, job_status, build_component):
""" Method invoked once a job_item has completed, in some manner. The job_status will be
one of: incomplete, error, complete. If incomplete, the job should be requeued.
one of: incomplete, error, complete. Implementations of this method should call
self.job_complete_callback with a status of Incomplete if they wish for the job to be
automatically requeued.
"""
raise NotImplementedError
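For orientation, a do-nothing lifecycle manager that satisfies the coroutine-based interface above; NoopManager is a sketch for illustration only and is not part of this commit, but the constructor arguments and callbacks follow the BaseManager signature:

from trollius import coroutine

from buildman.manager.basemanager import BaseManager

class NoopManager(BaseManager):
    def initialize(self, manager_config):
        pass

    def setup_time(self):
        # Seconds the server reserves a queue item while a worker is being prepared.
        return 60

    def shutdown(self):
        pass

    @coroutine
    def schedule(self, build_job):
        # Never accepts work, so BuilderServer requeues the item.
        return False

    @coroutine
    def build_component_ready(self, build_component):
        pass

    def build_component_disposed(self, build_component, timed_out):
        pass

    @coroutine
    def job_heartbeat(self, build_job):
        self.job_heartbeat_callback(build_job)

    @coroutine
    def job_completed(self, build_job, job_status, build_component):
        self.job_complete_callback(build_job, job_status)

    def num_workers(self):
        return 0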

View file

@ -5,7 +5,7 @@ from buildman.component.basecomponent import BaseComponent
from buildman.component.buildcomponent import BuildComponent
from buildman.manager.basemanager import BaseManager
from trollius.coroutines import From
from trollius import From, Return, coroutine
REGISTRATION_REALM = 'registration'
logger = logging.getLogger(__name__)
@ -28,10 +28,15 @@ class DynamicRegistrationComponent(BaseComponent):
class EnterpriseManager(BaseManager):
""" Build manager implementation for the Enterprise Registry. """
build_components = []
shutting_down = False
def initialize(self):
def __init__(self, *args, **kwargs):
self.ready_components = set()
self.all_components = set()
self.shutting_down = False
super(EnterpriseManager, self).__init__(*args, **kwargs)
def initialize(self, manager_config):
# Add a component which is used by build workers for dynamic registration. Unlike
# production, build workers in enterprise are long-lived and register dynamically.
self.register_component(REGISTRATION_REALM, DynamicRegistrationComponent)
@ -45,30 +50,37 @@ class EnterpriseManager(BaseManager):
""" Adds a new build component for an Enterprise Registry. """
# Generate a new unique realm ID for the build worker.
realm = str(uuid.uuid4())
component = self.register_component(realm, BuildComponent, token="")
self.build_components.append(component)
new_component = self.register_component(realm, BuildComponent, token="")
self.all_components.add(new_component)
return realm
def schedule(self, build_job, loop):
@coroutine
def schedule(self, build_job):
""" Schedules a build for an Enterprise Registry. """
if self.shutting_down:
return False
if self.shutting_down or not self.ready_components:
raise Return(False)
for component in self.build_components:
if component.is_ready():
loop.call_soon(component.start_build, build_job)
return True
component = self.ready_components.pop()
return False
yield From(component.start_build(build_job))
raise Return(True)
@coroutine
def build_component_ready(self, build_component):
self.ready_components.add(build_component)
def shutdown(self):
self.shutting_down = True
@coroutine
def job_completed(self, build_job, job_status, build_component):
self.job_complete_callback(build_job, job_status)
def build_component_disposed(self, build_component, timed_out):
self.build_components.remove(build_component)
self.all_components.remove(build_component)
if build_component in self.ready_components:
self.ready_components.remove(build_component)
def num_workers(self):
return len(self.build_components)
return len(self.all_components)

View file

@ -0,0 +1,325 @@
import logging
import etcd
import uuid
import calendar
import os.path
import json
from datetime import datetime, timedelta
from trollius import From, coroutine, Return, async
from concurrent.futures import ThreadPoolExecutor
from urllib3.exceptions import ReadTimeoutError, ProtocolError
from buildman.manager.basemanager import BaseManager
from buildman.manager.executor import PopenExecutor, EC2Executor
from buildman.component.buildcomponent import BuildComponent
from buildman.jobutil.buildjob import BuildJob
from buildman.asyncutil import AsyncWrapper
from util.morecollections import AttrDict
logger = logging.getLogger(__name__)
ETCD_BUILDER_PREFIX = 'building/'
ETCD_REALM_PREFIX = 'realm/'
ETCD_DISABLE_TIMEOUT = 0
class EtcdAction(object):
GET = 'get'
SET = 'set'
EXPIRE = 'expire'
UPDATE = 'update'
DELETE = 'delete'
CREATE = 'create'
COMPARE_AND_SWAP = 'compareAndSwap'
COMPARE_AND_DELETE = 'compareAndDelete'
class EphemeralBuilderManager(BaseManager):
""" Build manager implementation for the Enterprise Registry. """
_executors = {
'popen': PopenExecutor,
'ec2': EC2Executor,
}
_etcd_client_klass = etcd.Client
def __init__(self, *args, **kwargs):
self._shutting_down = False
self._manager_config = None
self._async_thread_executor = None
self._etcd_client = None
self._component_to_job = {}
self._job_uuid_to_component = {}
self._component_to_builder = {}
self._executor = None
# Map of etcd keys being watched to the tasks watching them
self._watch_tasks = {}
super(EphemeralBuilderManager, self).__init__(*args, **kwargs)
def _watch_etcd(self, etcd_key, change_callback, recursive=True):
watch_task_key = (etcd_key, recursive)
def callback_wrapper(changed_key_future):
if watch_task_key not in self._watch_tasks or self._watch_tasks[watch_task_key].done():
self._watch_etcd(etcd_key, change_callback)
if changed_key_future.cancelled():
# Due to lack of interest, tomorrow has been cancelled
return
try:
etcd_result = changed_key_future.result()
except (ReadTimeoutError, ProtocolError):
return
change_callback(etcd_result)
if not self._shutting_down:
watch_future = self._etcd_client.watch(etcd_key, recursive=recursive,
timeout=ETCD_DISABLE_TIMEOUT)
watch_future.add_done_callback(callback_wrapper)
logger.debug('Scheduling watch of key: %s%s', etcd_key, '/*' if recursive else '')
self._watch_tasks[watch_task_key] = async(watch_future)
def _handle_builder_expiration(self, etcd_result):
if etcd_result.action == EtcdAction.EXPIRE:
# Handle the expiration
logger.debug('Builder expired, clean up the old build node')
job_metadata = json.loads(etcd_result._prev_node.value)
if 'builder_id' in job_metadata:
logger.info('Terminating expired build node.')
async(self._executor.stop_builder(job_metadata['builder_id']))
def _handle_realm_change(self, etcd_result):
if etcd_result.action == EtcdAction.CREATE:
# We must listen on the realm created by ourselves or another worker
realm_spec = json.loads(etcd_result.value)
self._register_realm(realm_spec)
elif etcd_result.action == EtcdAction.DELETE or etcd_result.action == EtcdAction.EXPIRE:
# We must stop listening for new connections on the specified realm, if we did not get the
# connection
realm_spec = json.loads(etcd_result._prev_node.value)
build_job = BuildJob(AttrDict(realm_spec['job_queue_item']))
component = self._job_uuid_to_component.pop(build_job.job_details['build_uuid'], None)
if component is not None:
# We were not the manager which the worker connected to, remove the bookkeeping for it
logger.debug('Unregistering unused component on realm: %s', realm_spec['realm'])
del self._component_to_job[component]
del self._component_to_builder[component]
self.unregister_component(component)
else:
logger.warning('Unexpected action (%s) on realm key: %s', etcd_result.action, etcd_result.key)
def _register_realm(self, realm_spec):
logger.debug('Registering realm with manager: %s', realm_spec['realm'])
component = self.register_component(realm_spec['realm'], BuildComponent,
token=realm_spec['token'])
build_job = BuildJob(AttrDict(realm_spec['job_queue_item']))
self._component_to_job[component] = build_job
self._component_to_builder[component] = realm_spec['builder_id']
self._job_uuid_to_component[build_job.job_details['build_uuid']] = component
@coroutine
def _register_existing_realms(self):
try:
all_realms = yield From(self._etcd_client.read(ETCD_REALM_PREFIX, recursive=True))
for realm in all_realms.children:
if not realm.dir:
self._register_realm(json.loads(realm.value))
except KeyError:
# no realms have been registered yet
pass
def initialize(self, manager_config):
logger.debug('Calling initialize')
self._manager_config = manager_config
executor_klass = self._executors.get(manager_config.get('EXECUTOR', ''), PopenExecutor)
self._executor = executor_klass(manager_config.get('EXECUTOR_CONFIG', {}),
self.manager_hostname)
etcd_host = self._manager_config.get('ETCD_HOST', '127.0.0.1')
etcd_port = self._manager_config.get('ETCD_PORT', 2379)
etcd_auth = self._manager_config.get('ETCD_CERT_AND_KEY', None)
etcd_ca_cert = self._manager_config.get('ETCD_CA_CERT', None)
etcd_protocol = 'http' if etcd_auth is None else 'https'
logger.debug('Connecting to etcd on %s:%s', etcd_host, etcd_port)
worker_threads = self._manager_config.get('ETCD_WORKER_THREADS', 5)
self._async_thread_executor = ThreadPoolExecutor(worker_threads)
self._etcd_client = AsyncWrapper(self._etcd_client_klass(host=etcd_host, port=etcd_port,
cert=etcd_auth, ca_cert=etcd_ca_cert,
protocol=etcd_protocol),
executor=self._async_thread_executor)
self._watch_etcd(ETCD_BUILDER_PREFIX, self._handle_builder_expiration)
self._watch_etcd(ETCD_REALM_PREFIX, self._handle_realm_change)
# Load components for all realms currently known to the cluster
async(self._register_existing_realms())
def setup_time(self):
setup_time = self._manager_config.get('MACHINE_SETUP_TIME', 300)
return setup_time
def shutdown(self):
logger.debug('Shutting down worker.')
self._shutting_down = True
for (etcd_key, _), task in self._watch_tasks.items():
if not task.done():
logger.debug('Canceling watch task for %s', etcd_key)
task.cancel()
if self._async_thread_executor is not None:
logger.debug('Shutting down thread pool executor.')
self._async_thread_executor.shutdown()
@coroutine
def schedule(self, build_job):
build_uuid = build_job.job_details['build_uuid']
logger.debug('Calling schedule with job: %s', build_uuid)
# Check if there are worker slots available by checking the number of jobs in etcd
allowed_worker_count = self._manager_config.get('ALLOWED_WORKER_COUNT', 1)
try:
building = yield From(self._etcd_client.read(ETCD_BUILDER_PREFIX, recursive=True))
workers_alive = sum(1 for child in building.children if not child.dir)
except KeyError:
workers_alive = 0
logger.debug('Total jobs: %s', workers_alive)
if workers_alive >= allowed_worker_count:
logger.info('Too many workers alive, unable to start new worker. %s >= %s', workers_alive,
allowed_worker_count)
raise Return(False)
job_key = self._etcd_job_key(build_job)
# First try to take a lock for this job, meaning we will be responsible for its lifeline
realm = str(uuid.uuid4())
token = str(uuid.uuid4())
ttl = self.setup_time()
expiration = datetime.utcnow() + timedelta(seconds=ttl)
machine_max_expiration = self._manager_config.get('MACHINE_MAX_TIME', 7200)
max_expiration = datetime.utcnow() + timedelta(seconds=machine_max_expiration)
payload = {
'expiration': calendar.timegm(expiration.timetuple()),
'max_expiration': calendar.timegm(max_expiration.timetuple()),
}
try:
yield From(self._etcd_client.write(job_key, json.dumps(payload), prevExist=False, ttl=ttl))
except KeyError:
# The job was already taken by someone else, we are probably a retry
logger.error('Job already exists in etcd, are timeouts misconfigured or is the queue broken?')
raise Return(False)
logger.debug('Starting builder with executor: %s', self._executor)
builder_id = yield From(self._executor.start_builder(realm, token, build_uuid))
# Store the builder in etcd associated with the job id
payload['builder_id'] = builder_id
yield From(self._etcd_client.write(job_key, json.dumps(payload), prevExist=True, ttl=ttl))
# Store the realm spec which will allow any manager to accept this builder when it connects
realm_spec = json.dumps({
'realm': realm,
'token': token,
'builder_id': builder_id,
'job_queue_item': build_job.job_item,
})
try:
yield From(self._etcd_client.write(self._etcd_realm_key(realm), realm_spec, prevExist=False,
ttl=ttl))
except KeyError:
logger.error('Realm already exists in etcd. UUID collision or something is very very wrong.')
raise Return(False)
raise Return(True)
@coroutine
def build_component_ready(self, build_component):
try:
# Clean up the bookkeeping for allowing any manager to take the job
job = self._component_to_job.pop(build_component)
del self._job_uuid_to_component[job.job_details['build_uuid']]
yield From(self._etcd_client.delete(self._etcd_realm_key(build_component.builder_realm)))
logger.debug('Sending build %s to newly ready component on realm %s',
job.job_details['build_uuid'], build_component.builder_realm)
yield From(build_component.start_build(job))
except KeyError:
logger.debug('Builder is asking for more work, but work already completed')
def build_component_disposed(self, build_component, timed_out):
logger.debug('Calling build_component_disposed.')
# TODO make it so that I don't have to unregister the component if it timed out
self.unregister_component(build_component)
@coroutine
def job_completed(self, build_job, job_status, build_component):
logger.debug('Calling job_completed with status: %s', job_status)
# Kill the ephemeral builder
yield From(self._executor.stop_builder(self._component_to_builder.pop(build_component)))
# Release the lock in etcd
job_key = self._etcd_job_key(build_job)
yield From(self._etcd_client.delete(job_key))
self.job_complete_callback(build_job, job_status)
@coroutine
def job_heartbeat(self, build_job):
# Extend the deadline in etcd
job_key = self._etcd_job_key(build_job)
build_job_metadata_response = yield From(self._etcd_client.read(job_key))
build_job_metadata = json.loads(build_job_metadata_response.value)
max_expiration = datetime.utcfromtimestamp(build_job_metadata['max_expiration'])
max_expiration_remaining = max_expiration - datetime.utcnow()
max_expiration_sec = max(0, int(max_expiration_remaining.total_seconds()))
ttl = min(self.heartbeat_period_sec * 2, max_expiration_sec)
new_expiration = datetime.utcnow() + timedelta(seconds=ttl)
payload = {
'expiration': calendar.timegm(new_expiration.timetuple()),
'builder_id': build_job_metadata['builder_id'],
'max_expiration': build_job_metadata['max_expiration'],
}
yield From(self._etcd_client.write(job_key, json.dumps(payload), ttl=ttl))
self.job_heartbeat_callback(build_job)
@staticmethod
def _etcd_job_key(build_job):
""" Create a key which is used to track a job in etcd.
"""
return os.path.join(ETCD_BUILDER_PREFIX, build_job.job_details['build_uuid'])
@staticmethod
def _etcd_realm_key(realm):
""" Create a key which is used to track an incoming connection on a realm.
"""
return os.path.join(ETCD_REALM_PREFIX, realm)
def num_workers(self):
""" Return the number of workers we're managing locally.
"""
return len(self._component_to_builder)
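To summarize the coordination scheme above, the ephemeral manager keeps two kinds of keys in etcd; the layout is sketched below with illustrative values (the real keys use the build UUID and a freshly generated realm UUID):

# building/<build_uuid> -- the job lock, written with ttl=setup_time() and
# refreshed on each heartbeat with ttl = min(2 * heartbeat_period_sec,
# seconds remaining until max_expiration).
job_payload = {
    'expiration': 1422558000,      # unix time at which the current TTL lapses
    'max_expiration': 1422565200,  # hard cap derived from MACHINE_MAX_TIME (default 7200s)
    'builder_id': 'i-0abc1234',    # added once the executor has started a machine
}

# realm/<realm_uuid> -- published so that whichever manager the worker connects
# to can claim the build; deleted once a component takes the job.
realm_payload = {
    'realm': '1234-realm',
    'token': 'beef',
    'builder_id': 'i-0abc1234',
    'job_queue_item': {'id': 1, 'body': '{"build_uuid": "..."}'},
}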

View file

@ -0,0 +1,228 @@
import logging
import os
import uuid
import threading
import boto.ec2
import requests
import cachetools
from jinja2 import FileSystemLoader, Environment
from trollius import coroutine, From, Return, get_event_loop
from functools import partial
from buildman.asyncutil import AsyncWrapper
logger = logging.getLogger(__name__)
ONE_HOUR = 60*60
ENV = Environment(loader=FileSystemLoader('buildman/templates'))
TEMPLATE = ENV.get_template('cloudconfig.yaml')
class ExecutorException(Exception):
""" Exception raised when there is a problem starting or stopping a builder.
"""
pass
class BuilderExecutor(object):
def __init__(self, executor_config, manager_hostname):
self.executor_config = executor_config
self.manager_hostname = manager_hostname
""" Interface which can be plugged into the EphemeralNodeManager to provide a strategy for
starting and stopping builders.
"""
@coroutine
def start_builder(self, realm, token, build_uuid):
""" Create a builder with the specified config. Returns a unique id which can be used to manage
the builder.
"""
raise NotImplementedError
@coroutine
def stop_builder(self, builder_id):
""" Stop a builder which is currently running.
"""
raise NotImplementedError
def get_manager_websocket_url(self):
return 'ws://{0}:'
def generate_cloud_config(self, realm, token, coreos_channel, manager_hostname,
quay_username=None, quay_password=None):
if quay_username is None:
quay_username = self.executor_config['QUAY_USERNAME']
if quay_password is None:
quay_password = self.executor_config['QUAY_PASSWORD']
return TEMPLATE.render(
realm=realm,
token=token,
quay_username=quay_username,
quay_password=quay_password,
manager_hostname=manager_hostname,
coreos_channel=coreos_channel,
)
class EC2Executor(BuilderExecutor):
""" Implementation of BuilderExecutor which uses libcloud to start machines on a variety of cloud
providers.
"""
COREOS_STACK_URL = 'http://%s.release.core-os.net/amd64-usr/current/coreos_production_ami_hvm.txt'
def __init__(self, *args, **kwargs):
self._loop = get_event_loop()
super(EC2Executor, self).__init__(*args, **kwargs)
def _get_conn(self):
""" Creates an ec2 connection which can be used to manage instances.
"""
return AsyncWrapper(boto.ec2.connect_to_region(
self.executor_config['EC2_REGION'],
aws_access_key_id=self.executor_config['AWS_ACCESS_KEY'],
aws_secret_access_key=self.executor_config['AWS_SECRET_KEY'],
))
@classmethod
@cachetools.ttl_cache(ttl=ONE_HOUR)
def _get_coreos_ami(cls, ec2_region, coreos_channel):
""" Retrieve the CoreOS AMI id from the canonical listing.
"""
stack_list_string = requests.get(EC2Executor.COREOS_STACK_URL % coreos_channel).text
stack_amis = dict([stack.split('=') for stack in stack_list_string.split('|')])
return stack_amis[ec2_region]
@coroutine
def start_builder(self, realm, token, build_uuid):
region = self.executor_config['EC2_REGION']
channel = self.executor_config.get('COREOS_CHANNEL', 'stable')
get_ami_callable = partial(self._get_coreos_ami, region, channel)
coreos_ami = yield From(self._loop.run_in_executor(None, get_ami_callable))
user_data = self.generate_cloud_config(realm, token, channel, self.manager_hostname)
logger.debug('Generated cloud config: %s', user_data)
ec2_conn = self._get_conn()
ssd_root_ebs = boto.ec2.blockdevicemapping.BlockDeviceType(
size=8,
volume_type='gp2',
delete_on_termination=True,
)
block_devices = boto.ec2.blockdevicemapping.BlockDeviceMapping()
block_devices['/dev/xvda'] = ssd_root_ebs
reservation = yield From(ec2_conn.run_instances(
coreos_ami,
instance_type=self.executor_config['EC2_INSTANCE_TYPE'],
security_groups=self.executor_config['EC2_SECURITY_GROUP_IDS'],
key_name=self.executor_config.get('EC2_KEY_NAME', None),
user_data=user_data,
instance_initiated_shutdown_behavior='terminate',
block_device_map=block_devices,
))
if not reservation.instances:
raise ExecutorException('Unable to spawn builder instance.')
elif len(reservation.instances) != 1:
raise ExecutorException('EC2 started wrong number of instances!')
launched = AsyncWrapper(reservation.instances[0])
yield From(launched.add_tags({
'Name': 'Quay Ephemeral Builder',
'Realm': realm,
'Token': token,
'BuildUUID': build_uuid,
}))
raise Return(launched.id)
@coroutine
def stop_builder(self, builder_id):
ec2_conn = self._get_conn()
terminated_instances = yield From(ec2_conn.terminate_instances([builder_id]))
if builder_id not in [si.id for si in terminated_instances]:
raise ExecutorException('Unable to terminate instance: %s' % builder_id)
class PopenExecutor(BuilderExecutor):
""" Implementation of BuilderExecutor which uses Popen to fork a quay-builder process.
"""
def __init__(self, executor_config, manager_hostname):
self._jobs = {}
super(PopenExecutor, self).__init__(executor_config, manager_hostname)
""" Executor which uses Popen to fork a quay-builder process.
"""
@coroutine
def start_builder(self, realm, token, build_uuid):
# Now start a machine for this job, adding the machine id to the etcd information
logger.debug('Forking process for build')
import subprocess
builder_env = {
'TOKEN': token,
'REALM': realm,
'ENDPOINT': 'ws://localhost:8787',
'DOCKER_TLS_VERIFY': os.environ.get('DOCKER_TLS_VERIFY', ''),
'DOCKER_CERT_PATH': os.environ.get('DOCKER_CERT_PATH', ''),
'DOCKER_HOST': os.environ.get('DOCKER_HOST', ''),
}
logpipe = LogPipe(logging.INFO)
spawned = subprocess.Popen('/Users/jake/bin/quay-builder', stdout=logpipe, stderr=logpipe,
env=builder_env)
builder_id = str(uuid.uuid4())
self._jobs[builder_id] = (spawned, logpipe)
logger.debug('Builder spawned with id: %s', builder_id)
raise Return(builder_id)
@coroutine
def stop_builder(self, builder_id):
if builder_id not in self._jobs:
raise ExecutorException('Builder id not being tracked by executor.')
logger.debug('Killing builder with id: %s', builder_id)
spawned, logpipe = self._jobs[builder_id]
if spawned.poll() is None:
spawned.kill()
logpipe.close()
class LogPipe(threading.Thread):
""" Adapted from http://codereview.stackexchange.com/a/17959
"""
def __init__(self, level):
"""Setup the object with a logger and a loglevel
and start the thread
"""
threading.Thread.__init__(self)
self.daemon = False
self.level = level
self.fd_read, self.fd_write = os.pipe()
self.pipe_reader = os.fdopen(self.fd_read)
self.start()
def fileno(self):
"""Return the write file descriptor of the pipe
"""
return self.fd_write
def run(self):
"""Run the thread, logging everything.
"""
for line in iter(self.pipe_reader.readline, ''):
logging.log(self.level, line.strip('\n'))
self.pipe_reader.close()
def close(self):
"""Close the write end of the pipe.
"""
os.close(self.fd_write)
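The executor contract above is small: start_builder() must eventually produce an opaque builder id and stop_builder() must tear that builder down. A sketch of a hypothetical executor follows (LoggingExecutor is illustrative and not part of this commit); it could be selected by registering it in EphemeralBuilderManager._executors and pointing the manager config's EXECUTOR key at it, much as the tests below do with a 'test' executor:

import logging
import uuid

from trollius import coroutine, Return

from buildman.manager.executor import BuilderExecutor

logger = logging.getLogger(__name__)

class LoggingExecutor(BuilderExecutor):
    """ Starts no real machines; it only logs the manager's calls. """

    @coroutine
    def start_builder(self, realm, token, build_uuid):
        builder_id = str(uuid.uuid4())
        logger.info('Would start builder %s for build %s on realm %s',
                    builder_id, build_uuid, realm)
        raise Return(builder_id)

    @coroutine
    def stop_builder(self, builder_id):
        logger.info('Would stop builder %s', builder_id)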

View file

@ -21,8 +21,7 @@ TIMEOUT_PERIOD_MINUTES = 20
JOB_TIMEOUT_SECONDS = 300
MINIMUM_JOB_EXTENSION = timedelta(minutes=2)
WEBSOCKET_PORT = 8787
CONTROLLER_PORT = 8686
HEARTBEAT_PERIOD_SEC = 30
class BuildJobResult(object):
""" Build job result enum """
@ -34,14 +33,15 @@ class BuilderServer(object):
""" Server which handles both HTTP and WAMP requests, managing the full state of the build
controller.
"""
def __init__(self, server_hostname, queue, build_logs, user_files, lifecycle_manager_klass):
def __init__(self, registry_hostname, queue, build_logs, user_files, lifecycle_manager_klass,
lifecycle_manager_config, manager_hostname):
self._loop = None
self._current_status = 'starting'
self._current_components = []
self._job_count = 0
self._session_factory = RouterSessionFactory(RouterFactory())
self._server_hostname = server_hostname
self._registry_hostname = registry_hostname
self._queue = queue
self._build_logs = build_logs
self._user_files = user_files
@ -49,8 +49,11 @@ class BuilderServer(object):
self._register_component,
self._unregister_component,
self._job_heartbeat,
self._job_complete
self._job_complete,
manager_hostname,
HEARTBEAT_PERIOD_SEC,
)
self._lifecycle_manager_config = lifecycle_manager_config
self._shutdown_event = Event()
self._current_status = 'running'
@ -67,18 +70,17 @@ class BuilderServer(object):
self._controller_app = controller_app
def run(self, host, ssl=None):
def run(self, host, websocket_port, controller_port, ssl=None):
logger.debug('Initializing the lifecycle manager')
self._lifecycle_manager.initialize()
self._lifecycle_manager.initialize(self._lifecycle_manager_config)
logger.debug('Initializing all members of the event loop')
loop = trollius.get_event_loop()
trollius.Task(self._initialize(loop, host, ssl))
logger.debug('Starting server on port %s, with controller on port %s', WEBSOCKET_PORT,
CONTROLLER_PORT)
logger.debug('Starting server on port %s, with controller on port %s', websocket_port,
controller_port)
try:
loop.run_forever()
loop.run_until_complete(self._initialize(loop, host, websocket_port, controller_port, ssl))
except KeyboardInterrupt:
pass
finally:
@ -102,7 +104,7 @@ class BuilderServer(object):
component.parent_manager = self._lifecycle_manager
component.build_logs = self._build_logs
component.user_files = self._user_files
component.server_hostname = self._server_hostname
component.registry_hostname = self._registry_hostname
self._current_components.append(component)
self._session_factory.add(component)
@ -116,16 +118,16 @@ class BuilderServer(object):
self._session_factory.remove(component)
def _job_heartbeat(self, build_job):
WorkQueue.extend_processing(build_job.job_item(), seconds_from_now=JOB_TIMEOUT_SECONDS,
WorkQueue.extend_processing(build_job.job_item, seconds_from_now=JOB_TIMEOUT_SECONDS,
retry_count=1, minimum_extension=MINIMUM_JOB_EXTENSION)
def _job_complete(self, build_job, job_status):
if job_status == BuildJobResult.INCOMPLETE:
self._queue.incomplete(build_job.job_item(), restore_retry=True, retry_after=30)
self._queue.incomplete(build_job.job_item, restore_retry=True, retry_after=30)
elif job_status == BuildJobResult.ERROR:
self._queue.incomplete(build_job.job_item(), restore_retry=False)
self._queue.incomplete(build_job.job_item, restore_retry=False)
else:
self._queue.complete(build_job.job_item())
self._queue.complete(build_job.job_item)
self._job_count = self._job_count - 1
@ -137,7 +139,8 @@ class BuilderServer(object):
@trollius.coroutine
def _work_checker(self):
while self._current_status == 'running':
logger.debug('Checking for more work for %d active workers', self._lifecycle_manager.num_workers())
logger.debug('Checking for more work for %d active workers',
self._lifecycle_manager.num_workers())
job_item = self._queue.get(processing_time=self._lifecycle_manager.setup_time())
if job_item is None:
logger.debug('No additional work found. Going to sleep for %s seconds', WORK_CHECK_TIMEOUT)
@ -151,18 +154,16 @@ class BuilderServer(object):
self._queue.incomplete(job_item, restore_retry=False)
logger.debug('Build job found. Checking for an available worker.')
if self._lifecycle_manager.schedule(build_job, self._loop):
scheduled = yield From(self._lifecycle_manager.schedule(build_job))
if scheduled:
self._job_count = self._job_count + 1
logger.debug('Build job scheduled. Running: %s', self._job_count)
else:
logger.debug('All workers are busy. Requeuing.')
self._queue.incomplete(job_item, restore_retry=True, retry_after=0)
yield From(trollius.sleep(WORK_CHECK_TIMEOUT))
@trollius.coroutine
def _initialize(self, loop, host, ssl=None):
def _initialize(self, loop, host, websocket_port, controller_port, ssl=None):
self._loop = loop
# Create the WAMP server.
@ -170,8 +171,8 @@ class BuilderServer(object):
transport_factory.setProtocolOptions(failByDrop=True)
# Initialize the controller server and the WAMP server
create_wsgi_server(self._controller_app, loop=loop, host=host, port=CONTROLLER_PORT, ssl=ssl)
yield From(loop.create_server(transport_factory, host, WEBSOCKET_PORT, ssl=ssl))
create_wsgi_server(self._controller_app, loop=loop, host=host, port=controller_port, ssl=ssl)
yield From(loop.create_server(transport_factory, host, websocket_port, ssl=ssl))
# Initialize the work queue checker.
yield From(self._work_checker())

View file

@ -0,0 +1,31 @@
#cloud-config
write_files:
- path: /root/overrides.list
permission: '0644'
content: |
REALM={{ realm }}
TOKEN={{ token }}
SERVER=wss://{{ manager_hostname }}
coreos:
update:
reboot-strategy: off
group: {{ coreos_channel }}
units:
- name: quay-builder.service
command: start
content: |
[Unit]
Description=Quay builder container
Author=Jake Moshenko
After=docker.service
[Service]
TimeoutStartSec=600
TimeoutStopSec=2000
ExecStartPre=/usr/bin/docker login -u {{ quay_username }} -p {{ quay_password }} -e unused quay.io
ExecStart=/usr/bin/docker run --rm --net=host --name quay-builder --privileged --env-file /root/overrides.list -v /var/run/docker.sock:/var/run/docker.sock -v /usr/share/ca-certificates:/etc/ssl/certs quay.io/coreos/registry-build-worker:scratched
ExecStop=/usr/bin/docker stop quay-builder
ExecStopPost=/bin/sh -xc "/bin/sleep 120; /usr/bin/systemctl --no-block poweroff"

View file

@ -1,2 +0,0 @@
#!/bin/sh
exec svlogd /var/log/dockerfilebuild/

View file

@ -1,6 +0,0 @@
#! /bin/bash
sv start tutumdocker || exit 1
cd /
venv/bin/python -m workers.dockerfilebuild

View file

@ -1,2 +0,0 @@
#!/bin/sh
exec svlogd /var/log/tutumdocker/

View file

@ -1,96 +0,0 @@
#!/bin/bash
# First, make sure that cgroups are mounted correctly.
CGROUP=/sys/fs/cgroup
[ -d $CGROUP ] ||
mkdir $CGROUP
mountpoint -q $CGROUP ||
mount -n -t tmpfs -o uid=0,gid=0,mode=0755 cgroup $CGROUP || {
echo "Could not make a tmpfs mount. Did you use -privileged?"
exit 1
}
if [ -d /sys/kernel/security ] && ! mountpoint -q /sys/kernel/security
then
mount -t securityfs none /sys/kernel/security || {
echo "Could not mount /sys/kernel/security."
echo "AppArmor detection and -privileged mode might break."
}
fi
# Mount the cgroup hierarchies exactly as they are in the parent system.
for SUBSYS in $(cut -d: -f2 /proc/1/cgroup)
do
[ -d $CGROUP/$SUBSYS ] || mkdir $CGROUP/$SUBSYS
mountpoint -q $CGROUP/$SUBSYS ||
mount -n -t cgroup -o $SUBSYS cgroup $CGROUP/$SUBSYS
# The two following sections address a bug which manifests itself
# by a cryptic "lxc-start: no ns_cgroup option specified" when
# trying to start containers within a container.
# The bug seems to appear when the cgroup hierarchies are not
# mounted on the exact same directories in the host, and in the
# container.
# Named, control-less cgroups are mounted with "-o name=foo"
# (and appear as such under /proc/<pid>/cgroup) but are usually
# mounted on a directory named "foo" (without the "name=" prefix).
# Systemd and OpenRC (and possibly others) both create such a
# cgroup. To avoid the aforementioned bug, we symlink "foo" to
# "name=foo". This shouldn't have any adverse effect.
echo $SUBSYS | grep -q ^name= && {
NAME=$(echo $SUBSYS | sed s/^name=//)
ln -s $SUBSYS $CGROUP/$NAME
}
# Likewise, on at least one system, it has been reported that
# systemd would mount the CPU and CPU accounting controllers
# (respectively "cpu" and "cpuacct") with "-o cpuacct,cpu"
# but on a directory called "cpu,cpuacct" (note the inversion
# in the order of the groups). This tries to work around it.
[ $SUBSYS = cpuacct,cpu ] && ln -s $SUBSYS $CGROUP/cpu,cpuacct
done
# Note: as I write those lines, the LXC userland tools cannot setup
# a "sub-container" properly if the "devices" cgroup is not in its
# own hierarchy. Let's detect this and issue a warning.
grep -q :devices: /proc/1/cgroup ||
echo "WARNING: the 'devices' cgroup should be in its own hierarchy."
grep -qw devices /proc/1/cgroup ||
echo "WARNING: it looks like the 'devices' cgroup is not mounted."
# Now, close extraneous file descriptors.
pushd /proc/self/fd >/dev/null
for FD in *
do
case "$FD" in
# Keep stdin/stdout/stderr
[012])
;;
# Nuke everything else
*)
eval exec "$FD>&-"
;;
esac
done
popd >/dev/null
# If a pidfile is still around (for example after a container restart),
# delete it so that docker can start.
rm -rf /var/run/docker.pid
chmod 777 /var/lib/lxc
chmod 777 /var/lib/docker
# If we were given a PORT environment variable, start as a simple daemon;
# otherwise, spawn a shell as well
if [ "$PORT" ]
then
exec docker -d -H 0.0.0.0:$PORT
else
docker -d -D -e lxc 2>&1
fi

View file

@ -78,7 +78,8 @@ class WorkQueue(object):
def get(self, processing_time=300):
"""
Get an available item and mark it as unavailable for the default of five
minutes.
minutes. The result of this method must always be composed of simple
python objects which are JSON serializable for network portability reasons.
"""
now = datetime.utcnow()

View file

@ -72,8 +72,8 @@ def build_status_view(build_obj, can_write=False):
# minutes. If not, then the build timed out.
if phase != database.BUILD_PHASE.COMPLETE and phase != database.BUILD_PHASE.ERROR:
if status is not None and 'heartbeat' in status and status['heartbeat']:
heartbeat = datetime.datetime.fromtimestamp(status['heartbeat'])
if datetime.datetime.now() - heartbeat > datetime.timedelta(minutes=1):
heartbeat = datetime.datetime.utcfromtimestamp(status['heartbeat'])
if datetime.datetime.utcnow() - heartbeat > datetime.timedelta(minutes=1):
phase = database.BUILD_PHASE.INTERNAL_ERROR
logger.debug('Can write: %s job_config: %s', can_write, build_obj.job_config)

View file

@ -211,7 +211,7 @@ def start_build(repository, dockerfile_id, tags, build_name, subdir, manual,
dockerfile_build_queue.put([repository.namespace_user.username, repository.name], json.dumps({
'build_uuid': build_request.uuid,
'pull_credentials': model.get_pull_credentials(pull_robot_name) if pull_robot_name else None
}), retries_remaining=1)
}), retries_remaining=3)
# Add the build to the repo's log.
metadata = {

View file

@ -40,4 +40,7 @@ git+https://github.com/DevTable/aniso8601-fake.git
git+https://github.com/DevTable/anunidecode.git
git+https://github.com/DevTable/avatar-generator.git
git+https://github.com/DevTable/pygithub.git
git+https://github.com/jplana/python-etcd.git
gipc
cachetools
mock

View file

@ -22,6 +22,7 @@ backports.ssl-match-hostname==3.4.0.2
beautifulsoup4==4.3.2
blinker==1.3
boto==2.35.1
cachetools==1.0.0
docker-py==0.7.1
ecdsa==0.11
futures==2.2.0
@ -35,6 +36,7 @@ itsdangerous==0.24
jsonschema==2.4.0
marisa-trie==0.7
mixpanel-py==3.2.1
mock==1.0.1
paramiko==1.15.2
peewee==2.4.5
psycopg2==2.5.4
@ -61,3 +63,4 @@ git+https://github.com/DevTable/anunidecode.git
git+https://github.com/DevTable/avatar-generator.git
git+https://github.com/DevTable/pygithub.git
git+https://github.com/NateFerrero/oauth2lib.git
git+https://github.com/jplana/python-etcd.git

test/test_buildman.py Normal file, 233 lines
View file

@ -0,0 +1,233 @@
import unittest
import etcd
import os.path
import time
import json
from trollius import coroutine, get_event_loop, From, Future, sleep, Return
from mock import Mock
from threading import Event
from urllib3.exceptions import ReadTimeoutError
from buildman.manager.executor import BuilderExecutor
from buildman.manager.ephemeral import (EphemeralBuilderManager, ETCD_BUILDER_PREFIX,
ETCD_REALM_PREFIX, EtcdAction)
from buildman.server import BuildJobResult
from buildman.component.buildcomponent import BuildComponent
BUILD_UUID = 'deadbeef-dead-beef-dead-deadbeefdead'
REALM_ID = '1234-realm'
def async_test(f):
def wrapper(*args, **kwargs):
coro = coroutine(f)
future = coro(*args, **kwargs)
loop = get_event_loop()
loop.run_until_complete(future)
return wrapper
class TestEphemeral(unittest.TestCase):
def __init__(self, *args, **kwargs):
self.etcd_client_mock = None
self.etcd_wait_event = Event()
self.test_executor = None
super(TestEphemeral, self).__init__(*args, **kwargs)
def _create_mock_etcd_client(self, *args, **kwargs):
def hang_until_event(*args, **kwargs):
time.sleep(.01) # 10ms to simulate network latency
self.etcd_wait_event.wait()
self.etcd_client_mock = Mock(spec=etcd.Client, name='etcd.Client')
self.etcd_client_mock.watch = Mock(side_effect=hang_until_event)
return self.etcd_client_mock
def _create_completed_future(self, result=None):
def inner(*args, **kwargs):
new_future = Future()
new_future.set_result(result)
return new_future
return inner
def _create_mock_executor(self, *args, **kwargs):
self.test_executor = Mock(spec=BuilderExecutor)
self.test_executor.start_builder = Mock(side_effect=self._create_completed_future('123'))
self.test_executor.stop_builder = Mock(side_effect=self._create_completed_future())
return self.test_executor
def _create_build_job(self):
mock_job = Mock()
mock_job.job_details = {
'build_uuid': BUILD_UUID,
}
mock_job.job_item = {
'body': json.dumps(mock_job.job_details),
'id': 1,
}
return mock_job
def setUp(self):
EphemeralBuilderManager._executors['test'] = self._create_mock_executor
self.old_etcd_client_klass = EphemeralBuilderManager._etcd_client_klass
EphemeralBuilderManager._etcd_client_klass = self._create_mock_etcd_client
self.etcd_wait_event.clear()
self.register_component_callback = Mock()
self.unregister_component_callback = Mock()
self.job_heartbeat_callback = Mock()
self.job_complete_callback = Mock()
self.manager = EphemeralBuilderManager(
self.register_component_callback,
self.unregister_component_callback,
self.job_heartbeat_callback,
self.job_complete_callback,
'127.0.0.1',
30,
)
self.manager.initialize({'EXECUTOR': 'test'})
self.mock_job = self._create_build_job()
self.mock_job_key = os.path.join(ETCD_BUILDER_PREFIX, BUILD_UUID)
def tearDown(self):
self.etcd_wait_event.set()
self.manager.shutdown()
del EphemeralBuilderManager._executors['test']
EphemeralBuilderManager._etcd_client_klass = self.old_etcd_client_klass
@coroutine
def _setup_job_for_managers(self):
# Test that we are watching the realm location before anything else happens
self.etcd_client_mock.watch.assert_any_call(ETCD_REALM_PREFIX, recursive=True, timeout=0)
self.etcd_client_mock.read = Mock(side_effect=KeyError)
test_component = Mock(spec=BuildComponent)
test_component.builder_realm = REALM_ID
test_component.start_build = Mock(side_effect=self._create_completed_future())
self.register_component_callback.return_value = test_component
# Ask for a builder to be scheduled
is_scheduled = yield From(self.manager.schedule(self.mock_job))
self.assertTrue(is_scheduled)
self.etcd_client_mock.read.assert_called_once_with(ETCD_BUILDER_PREFIX, recursive=True)
self.assertEqual(self.test_executor.start_builder.call_count, 1)
self.assertEqual(self.etcd_client_mock.write.call_args_list[0][0][0], self.mock_job_key)
self.assertEqual(self.etcd_client_mock.write.call_args_list[1][0][0], self.mock_job_key)
# Right now the job is not registered with any managers because etcd has not accepted the job
self.assertEqual(self.register_component_callback.call_count, 0)
realm_created = Mock(spec=etcd.EtcdResult)
realm_created.action = EtcdAction.CREATE
realm_created.key = os.path.join(ETCD_REALM_PREFIX, REALM_ID)
realm_created.value = json.dumps({
'realm': REALM_ID,
'token': 'beef',
'builder_id': '123',
'job_queue_item': self.mock_job.job_item,
})
self.manager._handle_realm_change(realm_created)
self.assertEqual(self.register_component_callback.call_count, 1)
raise Return(test_component)
@async_test
def test_schedule_and_complete(self):
# Test that a job is properly registered with all of the managers
test_component = yield From(self._setup_job_for_managers())
# Take the job ourselves
yield From(self.manager.build_component_ready(test_component))
self.etcd_client_mock.delete.assert_called_once_with(os.path.join(ETCD_REALM_PREFIX, REALM_ID))
self.etcd_client_mock.delete.reset_mock()
# Finish the job
yield From(self.manager.job_completed(self.mock_job, BuildJobResult.COMPLETE, test_component))
self.assertEqual(self.test_executor.stop_builder.call_count, 1)
self.etcd_client_mock.delete.assert_called_once_with(self.mock_job_key)
@async_test
def test_another_manager_takes_job(self):
# Prepare a job to be taken by another manager
test_component = yield From(self._setup_job_for_managers())
realm_deleted = Mock(spec=etcd.EtcdResult)
realm_deleted.action = EtcdAction.DELETE
realm_deleted.key = os.path.join(ETCD_REALM_PREFIX, REALM_ID)
realm_deleted._prev_node = Mock(spec=etcd.EtcdResult)
realm_deleted._prev_node.value = json.dumps({
'realm': REALM_ID,
'token': 'beef',
'builder_id': '123',
'job_queue_item': self.mock_job.job_item,
})
self.manager._handle_realm_change(realm_deleted)
self.unregister_component_callback.assert_called_once_with(test_component)
@async_test
def test_expiring_worker(self):
# Test that we are watching before anything else happens
self.etcd_client_mock.watch.assert_any_call(ETCD_BUILDER_PREFIX, recursive=True, timeout=0)
# Send a signal to the callback that a worker has expired
expired_result = Mock(spec=etcd.EtcdResult)
expired_result.action = EtcdAction.EXPIRE
expired_result.key = self.mock_job_key
expired_result._prev_node = Mock(spec=etcd.EtcdResult)
expired_result._prev_node.value = json.dumps({'builder_id': '1234'})
self.manager._handle_builder_expiration(expired_result)
yield From(sleep(.01))
self.test_executor.stop_builder.assert_called_once_with('1234')
self.assertEqual(self.test_executor.stop_builder.call_count, 1)
@async_test
def test_change_worker(self):
# Send a signal to the callback that a worker key has been changed
set_result = Mock(spec=etcd.EtcdResult)
set_result.action = 'set'
set_result.key = self.mock_job_key
self.manager._handle_builder_expiration(set_result)
yield From(sleep(.01))
self.assertEqual(self.test_executor.stop_builder.call_count, 0)
@async_test
def test_heartbeat_response(self):
expiration_timestamp = time.time() + 60
builder_result = Mock(spec=etcd.EtcdResult)
builder_result.value = json.dumps({
'builder_id': '123',
'expiration': expiration_timestamp,
'max_expiration': expiration_timestamp,
})
self.etcd_client_mock.read = Mock(return_value=builder_result)
yield From(self.manager.job_heartbeat(self.mock_job))
# Wait for threads to complete
yield From(sleep(.01))
self.job_heartbeat_callback.assert_called_once_with(self.mock_job)
self.assertEqual(self.etcd_client_mock.write.call_count, 1)
self.assertEqual(self.etcd_client_mock.write.call_args_list[0][0][0], self.mock_job_key)

View file

@@ -1,704 +0,0 @@
import logging.config
if __name__ == "__main__":
logging.config.fileConfig('conf/logging.conf', disable_existing_loggers=False)
import logging
import argparse
import os
import requests
import re
import json
import shutil
import tarfile
from docker import Client
from docker.utils import kwargs_from_env
from docker.errors import APIError
from tempfile import TemporaryFile, mkdtemp
from zipfile import ZipFile
from functools import partial
from datetime import datetime, timedelta
from threading import Event
from uuid import uuid4
from collections import defaultdict
from requests.exceptions import ConnectionError
from data import model
from data.database import BUILD_PHASE
from workers.worker import Worker, WorkerUnhealthyException, JobException
from app import userfiles as user_files, build_logs, sentry, dockerfile_build_queue
from endpoints.notificationhelper import spawn_notification
from util.safetar import safe_extractall
from util.dockerfileparse import parse_dockerfile, ParsedDockerfile, serialize_dockerfile
logger = logging.getLogger(__name__)
TIMEOUT_PERIOD_MINUTES = 20
CACHE_EXPIRATION_PERIOD_HOURS = 24
NO_TAGS = ['<none>:<none>']
RESERVATION_TIME = (TIMEOUT_PERIOD_MINUTES + 5) * 60
def build_docker_args():
args = kwargs_from_env()
if 'tls' in args and os.environ.get('IGNORE_TLS_ISSUES', False):
args['tls'].verify = False
return args
def matches_system_error(status_str):
""" Returns true if the given status string matches a known system error in the
Docker builder.
"""
KNOWN_MATCHES = ['lxc-start: invalid', 'lxc-start: failed to', 'lxc-start: Permission denied']
for match in KNOWN_MATCHES:
# 10 because we might have a Unix control code at the start.
found = status_str.find(match[0:len(match) + 10])
if found >= 0 and found <= 10:
return True
return False
class StatusWrapper(object):
def __init__(self, build_uuid):
self._uuid = build_uuid
self._status = {
'total_commands': None,
'current_command': None,
'push_completion': 0.0,
'pull_completion': 0.0,
}
self.__exit__(None, None, None)
def __enter__(self):
return self._status
def __exit__(self, exc_type, value, traceback):
build_logs.set_status(self._uuid, self._status)
class _IncompleteJsonError(Exception):
def __init__(self, start_from):
self.start_from = start_from
class _StreamingJSONDecoder(json.JSONDecoder):
FLAGS = re.VERBOSE | re.MULTILINE | re.DOTALL
WHITESPACE = re.compile(r'[ \t\n\r]*', FLAGS)
def decode(self, s, _w=WHITESPACE.match):
"""Return the Python representation of ``s`` (a ``str`` or ``unicode``
instance containing a JSON document)
"""
start_from = 0
while start_from < len(s):
try:
obj, end = self.raw_decode(s[start_from:], idx=_w(s[start_from:], 0).end())
except ValueError:
raise _IncompleteJsonError(start_from)
end = _w(s[start_from:], end).end()
start_from += end
yield obj
class StreamingDockerClient(Client):
def _stream_helper(self, response):
"""Generator for data coming from a chunked-encoded HTTP response."""
content_buf = ''
for content in response.iter_content(chunk_size=256):
content_buf += content
try:
for val in json.loads(content_buf, cls=_StreamingJSONDecoder):
yield val
content_buf = ''
except _IncompleteJsonError as exc:
content_buf = content_buf[exc.start_from:]
class DockerfileBuildContext(object):
def __init__(self, build_context_dir, dockerfile_subdir, repo, tag_names,
push_token, build_uuid, cache_size_gb, pull_credentials=None):
self._build_dir = build_context_dir
self._dockerfile_subdir = dockerfile_subdir
self._repo = repo
self._tag_names = tag_names
self._push_token = push_token
self._status = StatusWrapper(build_uuid)
self._build_logger = partial(build_logs.append_log_message, build_uuid)
self._pull_credentials = pull_credentials
self._cache_size_gb = cache_size_gb
# Note: We have two different clients here because we (potentially) login
# with both, but with different credentials that we do not want shared between
# the build and push operations.
self._push_cl = StreamingDockerClient(timeout=1200, **build_docker_args())
self._build_cl = StreamingDockerClient(timeout=1200, **build_docker_args())
dockerfile_path = os.path.join(self._build_dir, dockerfile_subdir,
'Dockerfile')
if not os.path.exists(dockerfile_path):
raise RuntimeError('Build job did not contain a Dockerfile.')
# Compute the number of steps
with open(dockerfile_path, 'r') as dockerfileobj:
self._parsed_dockerfile = parse_dockerfile(dockerfileobj.read())
self.__inject_quay_repo_env(self._parsed_dockerfile, repo)
self._num_steps = len(self._parsed_dockerfile.commands)
with open(dockerfile_path, 'w') as dockerfileobj:
dockerfileobj.write(serialize_dockerfile(self._parsed_dockerfile))
logger.debug('Will build and push to repo %s with tags named: %s', self._repo,
self._tag_names)
def __enter__(self):
try:
self.__cleanup_containers()
self.__cleanup_images()
self.__prune_cache()
except APIError:
sentry.client.captureException()
message = 'Docker installation is no longer healthy.'
logger.exception(message)
raise WorkerUnhealthyException(message)
return self
def __exit__(self, exc_type, value, traceback):
shutil.rmtree(self._build_dir)
try:
self.__cleanup_containers()
except APIError:
sentry.client.captureException()
message = 'Docker installation is no longer healthy.'
logger.exception(message)
raise WorkerUnhealthyException(message)
@staticmethod
def __inject_quay_repo_env(parsed_dockerfile, quay_reponame):
env_command = {
'command': 'ENV',
'parameters': 'QUAY_REPOSITORY %s' % quay_reponame
}
for index, command in reversed(list(enumerate(parsed_dockerfile.commands))):
if command['command'] == 'FROM':
new_command_index = index + 1
logger.debug('Injecting env command at dockerfile index: %s', new_command_index)
parsed_dockerfile.commands.insert(new_command_index, env_command)
break
@staticmethod
def __total_completion(statuses, total_images):
percentage_with_sizes = float(len(statuses.values()))/total_images
sent_bytes = sum([status['current'] for status in statuses.values()])
total_bytes = sum([status['total'] for status in statuses.values()])
return float(sent_bytes)/total_bytes*percentage_with_sizes
@staticmethod
def __monitor_completion(status_stream, required_message, status_updater, status_completion_key,
num_images=0):
images = {}
for status in status_stream:
logger.debug('%s: %s', status_completion_key, status)
if 'status' in status:
status_msg = status['status']
if status_msg == required_message:
if 'progressDetail' in status and 'id' in status:
image_id = status['id']
detail = status['progressDetail']
if 'current' in detail and 'total' in detail:
images[image_id] = detail
with status_updater as status_update:
status_update[status_completion_key] = \
DockerfileBuildContext.__total_completion(images, max(len(images), num_images))
elif 'errorDetail' in status:
message = 'Error pushing image.'
if 'message' in status['errorDetail']:
message = str(status['errorDetail']['message'])
raise RuntimeError(message)
def pull(self):
image_and_tag_tuple = self._parsed_dockerfile.get_image_and_tag()
if image_and_tag_tuple is None or image_and_tag_tuple[0] is None:
self._build_logger('Missing FROM command in Dockerfile', build_logs.ERROR)
raise JobException('Missing FROM command in Dockerfile')
image_and_tag = ':'.join(image_and_tag_tuple)
# Login with the specified credentials (if any).
if self._pull_credentials:
logger.debug('Logging in with pull credentials: %s@%s',
self._pull_credentials['username'], self._pull_credentials['registry'])
self._build_logger('Pulling base image: %s' % image_and_tag, log_data={
'phasestep': 'login',
'username': self._pull_credentials['username'],
'registry': self._pull_credentials['registry']
})
self._build_cl.login(self._pull_credentials['username'], self._pull_credentials['password'],
registry=self._pull_credentials['registry'], reauth=True)
else:
self._build_logger('Pulling base image: %s' % image_and_tag, log_data={
'phasestep': 'pull',
'repo_url': image_and_tag
})
pull_status = self._build_cl.pull(image_and_tag, stream=True)
self.__monitor_completion(pull_status, 'Downloading', self._status, 'pull_completion')
def build(self, reservation_extension_method):
# Start the build itself.
logger.debug('Starting build.')
with self._status as status:
status['total_commands'] = self._num_steps
logger.debug('Building to tags named: %s', self._tag_names)
context_path = os.path.join(self._build_dir, self._dockerfile_subdir)
logger.debug('Final context path: %s exists: %s', context_path,
os.path.exists(context_path))
build_status = self._build_cl.build(path=context_path, stream=True)
current_step = 0
built_image = None
for status in build_status:
fully_unwrapped = ""
if isinstance(status, dict):
keys_to_extract = ['error', 'status', 'stream']
for key in keys_to_extract:
if key in status:
fully_unwrapped = status[key]
break
if not fully_unwrapped:
logger.debug('Status dict did not have any extractable keys and was: %s', status)
elif isinstance(status, basestring):
fully_unwrapped = status
status_str = str(fully_unwrapped.encode('utf-8'))
# Check for system errors when building.
# DISABLED: LXC is super flaky, but this is causing build nodes to spasm.
#if matches_system_error(status_str):
# raise WorkerUnhealthyException(status_str)
logger.debug('Status: %s', status_str)
step_increment = re.search(r'Step ([0-9]+) :', status_str)
if step_increment:
self._build_logger(status_str, build_logs.COMMAND)
current_step = int(step_increment.group(1))
logger.debug('Step now: %s/%s', current_step, self._num_steps)
with self._status as status_update:
status_update['current_command'] = current_step
# Tell the queue that we're making progress every time we advance a step
reservation_extension_method(RESERVATION_TIME)
continue
else:
self._build_logger(status_str)
complete = re.match(r'Successfully built ([a-z0-9]+)$', status_str)
if complete:
built_image = complete.group(1)
logger.debug('Final image ID is: %s', built_image)
continue
# Get the image count
if not built_image:
return
return built_image
def push(self, built_image):
# Login to the registry
host = re.match(r'([a-z0-9.:]+)/.+/.+$', self._repo)
if not host:
raise RuntimeError('Invalid repo name: %s' % self._repo)
for protocol in ['https', 'http']:
registry_endpoint = '%s://%s/v1/' % (protocol, host.group(1))
logger.debug('Attempting login to registry: %s', registry_endpoint)
try:
self._push_cl.login('$token', self._push_token, registry=registry_endpoint)
break
except APIError:
pass # Probably the wrong protocol
for tag in self._tag_names:
logger.debug('Tagging image %s as %s:%s', built_image, self._repo, tag)
self._push_cl.tag(built_image, self._repo, tag)
history = self._push_cl.history(built_image)
num_images = len(history)
logger.debug('Pushing to repo %s', self._repo)
resp = self._push_cl.push(self._repo, stream=True)
self.__monitor_completion(resp, 'Pushing', self._status, 'push_completion', num_images)
def __cleanup_containers(self):
# First clean up any containers that might be holding the images
for running in self._build_cl.containers(quiet=True):
logger.debug('Killing container: %s', running['Id'])
self._build_cl.kill(running['Id'])
# Next, remove all of the containers (which should all now be killed)
for container in self._build_cl.containers(all=True, quiet=True):
logger.debug('Removing container: %s', container['Id'])
self._build_cl.remove_container(container['Id'])
def __cleanup_images(self):
""" Remove tags on internal nodes, and remove images older than the expiratino time. """
ids_to_images, ids_to_children = self.__compute_image_graph()
# Untag all internal nodes, which are usually the base images
for internal_id in ids_to_children.keys():
internal = ids_to_images[internal_id]
if internal['RepoTags'] != NO_TAGS:
for tag_name in internal['RepoTags']:
self._build_cl.remove_image(tag_name)
# Make sure all of the leaves have gibberish tags, and remove those older than our expiration
leaves = set(ids_to_images.keys()) - set(ids_to_children.keys())
now = datetime.now()
for leaf_id in leaves:
leaf = ids_to_images[leaf_id]
created = datetime.fromtimestamp(leaf['Created'])
expiration = created + timedelta(hours=CACHE_EXPIRATION_PERIOD_HOURS)
if expiration > now:
# Assign a new tag as a uuid to preserve this image
new_tag = str(uuid4())
self._build_cl.tag(leaf['Id'], new_tag)
# Remove all of the existing tags
if leaf['RepoTags'] != NO_TAGS:
for tag_name in leaf['RepoTags']:
self._build_cl.remove_image(tag_name)
def __prune_cache(self):
""" Remove the oldest leaf image until the cache size is the desired size. """
logger.debug('Pruning cache to size(gb): %s', self._cache_size_gb)
while self.__compute_cache_size_gb() > self._cache_size_gb:
logger.debug('Locating the oldest image in the cache to prune.')
# Find the oldest tagged image and remove it
oldest_creation_time = datetime.max
oldest_image = None
for image in self._build_cl.images():
created = datetime.fromtimestamp(image['Created'])
if created < oldest_creation_time:
oldest_creation_time = created
oldest_image = image
logger.debug('Removing oldest image from cache: %s', oldest_image['Id'])
# Remove all tags on the oldest image
if oldest_image['RepoTags'] == NO_TAGS:
# Remove the image id directly since there are no tags
self._build_cl.remove_image(oldest_image['Id'])
else:
# Remove all tags
for tag_name in oldest_image['RepoTags']:
self._build_cl.remove_image(tag_name)
def __compute_cache_size_gb(self):
all_images = self._build_cl.images(all=True)
size_in_bytes = sum([img['Size'] for img in all_images])
size_in_gb = float(size_in_bytes)/1024/1024/1024
logger.debug('Computed cache size(gb) of: %s', size_in_gb)
return size_in_gb
def __compute_image_graph(self):
all_images = self._build_cl.images(all=True)
ids_to_images = {}
ids_to_children = defaultdict(list)
for image in all_images:
if image['ParentId'] != '':
ids_to_children[image['ParentId']].append(image)
ids_to_images[image['Id']] = image
return (ids_to_images, ids_to_children)
class DockerfileBuildWorker(Worker):
def __init__(self, cache_size_gb, *vargs, **kwargs):
super(DockerfileBuildWorker, self).__init__(*vargs, **kwargs)
self._mime_processors = {
'application/zip': DockerfileBuildWorker.__prepare_zip,
'application/x-zip-compressed': DockerfileBuildWorker.__prepare_zip,
'text/plain': DockerfileBuildWorker.__prepare_dockerfile,
'application/octet-stream': DockerfileBuildWorker.__prepare_dockerfile,
'application/x-tar': DockerfileBuildWorker.__prepare_tarball,
'application/gzip': DockerfileBuildWorker.__prepare_tarball,
'application/x-gzip': DockerfileBuildWorker.__prepare_tarball,
}
self._timeout = Event()
self._cache_size_gb = cache_size_gb
@staticmethod
def __prepare_zip(request_file):
build_dir = mkdtemp(prefix='docker-build-')
# Save the zip file to temp somewhere
with TemporaryFile() as zip_file:
zip_file.write(request_file.content)
to_extract = ZipFile(zip_file)
to_extract.extractall(build_dir)
return build_dir
@staticmethod
def __prepare_dockerfile(request_file):
build_dir = mkdtemp(prefix='docker-build-')
dockerfile_path = os.path.join(build_dir, "Dockerfile")
with open(dockerfile_path, 'w') as dockerfile:
dockerfile.write(request_file.content)
return build_dir
@staticmethod
def __prepare_tarball(request_file):
build_dir = mkdtemp(prefix='docker-build-')
# Save the zip file to temp somewhere
with tarfile.open(mode='r|*', fileobj=request_file.raw) as tar_stream:
safe_extractall(tar_stream, build_dir)
return build_dir
def watchdog(self):
logger.debug('Running build watchdog code.')
try:
docker_cl = Client(**build_docker_args())
# Iterate the running containers and kill ones that have been running more than 20 minutes
for container in docker_cl.containers():
start_time = datetime.fromtimestamp(container['Created'])
running_time = datetime.now() - start_time
if running_time > timedelta(minutes=TIMEOUT_PERIOD_MINUTES):
logger.warning('Container has been running too long: %s with command: %s',
container['Id'], container['Command'])
docker_cl.kill(container['Id'])
self._timeout.set()
except ConnectionError as exc:
logger.exception('Watchdog exception')
raise WorkerUnhealthyException(exc.message)
def process_queue_item(self, job_details):
self._timeout.clear()
# Make sure we have more information for debugging problems
sentry.client.user_context(job_details)
repository_build = model.get_repository_build(job_details['build_uuid'])
pull_credentials = job_details.get('pull_credentials', None)
job_config = json.loads(repository_build.job_config)
resource_url = user_files.get_file_url(repository_build.resource_key, requires_cors=False)
tag_names = job_config['docker_tags']
build_subdir = job_config['build_subdir']
# TODO remove the top branch when there are no more jobs with a repository config
if 'repository' in job_config:
repo = job_config['repository']
else:
repo = '%s/%s/%s' % (job_config['registry'],
repository_build.repository.namespace_user.username,
repository_build.repository.name)
access_token = repository_build.access_token.code
log_appender = partial(build_logs.append_log_message, repository_build.uuid)
# Lookup and save the version of docker being used.
try:
docker_cl = Client(**build_docker_args())
docker_version = docker_cl.version().get('Version', '')
except ConnectionError as exc:
logger.exception('Initial connection exception')
raise WorkerUnhealthyException(exc.message)
dash = docker_version.find('-')
# Strip any -tutum or whatever off of the version.
if dash > 0:
docker_version = docker_version[:dash]
log_appender('initializing', build_logs.PHASE, log_data={
'docker_version': docker_version
})
log_appender('Docker version: %s' % docker_version)
start_msg = ('Starting job with resource url: %s repo: %s' % (resource_url, repo))
logger.debug(start_msg)
docker_resource = requests.get(resource_url, stream=True)
c_type = docker_resource.headers['content-type']
if ';' in c_type:
c_type = c_type.split(';')[0]
filetype_msg = ('Request to build type: %s with repo: %s and tags: %s' %
(c_type, repo, tag_names))
logger.info(filetype_msg)
log_appender(filetype_msg)
# Spawn a notification that the build has started.
event_data = {
'build_id': repository_build.uuid,
'build_name': repository_build.display_name,
'docker_tags': tag_names,
'trigger_id': repository_build.trigger.uuid,
'trigger_kind': repository_build.trigger.service.name
}
spawn_notification(repository_build.repository, 'build_start', event_data,
subpage='build?current=%s' % repository_build.uuid,
pathargs=['build', repository_build.uuid])
# Setup a handler for spawning failure messages.
def spawn_failure(message, event_data):
event_data['error_message'] = message
spawn_notification(repository_build.repository, 'build_failure', event_data,
subpage='build?current=%s' % repository_build.uuid,
pathargs=['build', repository_build.uuid])
if c_type not in self._mime_processors:
log_appender('error', build_logs.PHASE)
repository_build.phase = BUILD_PHASE.ERROR
repository_build.save()
message = 'Unknown mime-type: %s' % c_type
log_appender(message, build_logs.ERROR)
spawn_failure(message, event_data)
raise JobException(message)
# Try to build the build directory package from the buildpack.
log_appender('unpacking', build_logs.PHASE)
repository_build.phase = BUILD_PHASE.UNPACKING
repository_build.save()
build_dir = None
try:
build_dir = self._mime_processors[c_type](docker_resource)
except Exception as ex:
cur_message = ex.message or 'Error while unpacking build package'
log_appender(cur_message, build_logs.ERROR)
spawn_failure(cur_message, event_data)
raise JobException(cur_message)
# Start the build process.
try:
with DockerfileBuildContext(build_dir, build_subdir, repo, tag_names, access_token,
repository_build.uuid, self._cache_size_gb,
pull_credentials) as build_ctxt:
log_appender('pulling', build_logs.PHASE)
repository_build.phase = BUILD_PHASE.PULLING
repository_build.save()
build_ctxt.pull()
self.extend_processing(RESERVATION_TIME)
log_appender('building', build_logs.PHASE)
repository_build.phase = BUILD_PHASE.BUILDING
repository_build.save()
built_image = build_ctxt.build(self.extend_processing)
if not built_image:
log_appender('error', build_logs.PHASE)
repository_build.phase = BUILD_PHASE.ERROR
repository_build.save()
message = 'Unable to build dockerfile.'
if self._timeout.is_set():
message = 'Build step was terminated after %s minutes.' % TIMEOUT_PERIOD_MINUTES
log_appender(message, build_logs.ERROR)
raise JobException(message)
self.extend_processing(RESERVATION_TIME)
log_appender('pushing', build_logs.PHASE)
repository_build.phase = BUILD_PHASE.PUSHING
repository_build.save()
build_ctxt.push(built_image)
log_appender('complete', build_logs.PHASE)
repository_build.phase = BUILD_PHASE.COMPLETE
repository_build.save()
# Spawn a notification that the build has completed.
spawn_notification(repository_build.repository, 'build_success', event_data,
subpage='build?current=%s' % repository_build.uuid,
pathargs=['build', repository_build.uuid])
except WorkerUnhealthyException as exc:
# Spawn a notification that the build has failed.
log_appender('Worker has become unhealthy. Will retry shortly.', build_logs.ERROR)
spawn_failure(exc.message, event_data)
# Raise the exception to the queue.
raise exc
except JobException as exc:
# Spawn a notification that the build has failed.
spawn_failure(exc.message, event_data)
# Raise the exception to the queue.
raise exc
except ConnectionError as exc:
# A connection exception means the worker has become unhealthy (Docker is down)
# so we re-raise as that exception.
logger.exception('Build connection exception')
log_appender('Docker daemon has gone away. Will retry shortly.', build_logs.ERROR)
raise WorkerUnhealthyException(exc.message)
except Exception as exc:
# Spawn a notification that the build has failed.
spawn_failure(exc.message, event_data)
# Write the error to the logs.
sentry.client.captureException()
log_appender('error', build_logs.PHASE)
logger.exception('Exception when processing request.')
repository_build.phase = BUILD_PHASE.ERROR
repository_build.save()
log_appender(str(exc), build_logs.ERROR)
# Raise the exception to the queue.
raise JobException(str(exc))
if __name__ == "__main__":
desc = 'Worker daemon to monitor dockerfile build'
parser = argparse.ArgumentParser(description=desc)
parser.add_argument('--cachegb', default=20, type=float,
help='Maximum cache size in gigabytes.')
args = parser.parse_args()
worker = DockerfileBuildWorker(args.cachegb, dockerfile_build_queue,
reservation_seconds=RESERVATION_TIME)
worker.start(start_status_server_port=8000)