Lint BuildManager
This commit is contained in:
parent
043a30ee96
commit
6df6f28edf
9 changed files with 187 additions and 173 deletions
|
@ -10,7 +10,7 @@ from trollius.coroutines import From
|
||||||
from buildman.basecomponent import BaseComponent
|
from buildman.basecomponent import BaseComponent
|
||||||
from buildman.buildpack import BuildPackage, BuildPackageException
|
from buildman.buildpack import BuildPackage, BuildPackageException
|
||||||
from buildman.buildstatus import StatusHandler
|
from buildman.buildstatus import StatusHandler
|
||||||
from buildman.server import BUILD_JOB_RESULT
|
from buildman.server import BuildJobResult
|
||||||
from buildman.workererror import WorkerError
|
from buildman.workererror import WorkerError
|
||||||
|
|
||||||
from data.database import BUILD_PHASE
|
from data.database import BUILD_PHASE
|
||||||
|
@ -18,9 +18,10 @@ from data.database import BUILD_PHASE
|
||||||
HEARTBEAT_DELTA = datetime.timedelta(seconds=30)
|
HEARTBEAT_DELTA = datetime.timedelta(seconds=30)
|
||||||
HEARTBEAT_TIMEOUT = 10
|
HEARTBEAT_TIMEOUT = 10
|
||||||
|
|
||||||
logger = logging.getLogger(__name__)
|
LOGGER = logging.getLogger(__name__)
|
||||||
|
|
||||||
class COMPONENT_STATUS(object):
|
class ComponentStatus(object):
|
||||||
|
""" ComponentStatus represents the possible states of a component. """
|
||||||
JOINING = 'joining'
|
JOINING = 'joining'
|
||||||
WAITING = 'waiting'
|
WAITING = 'waiting'
|
||||||
RUNNING = 'running'
|
RUNNING = 'running'
|
||||||
|
@ -34,7 +35,7 @@ class BuildComponent(BaseComponent):
|
||||||
expected_token = None
|
expected_token = None
|
||||||
builder_realm = None
|
builder_realm = None
|
||||||
|
|
||||||
_component_status = COMPONENT_STATUS.JOINING
|
_component_status = ComponentStatus.JOINING
|
||||||
_last_heartbeat = None
|
_last_heartbeat = None
|
||||||
_current_job = None
|
_current_job = None
|
||||||
_build_status = None
|
_build_status = None
|
||||||
|
@ -50,29 +51,31 @@ class BuildComponent(BaseComponent):
|
||||||
self.join(self.builder_realm)
|
self.join(self.builder_realm)
|
||||||
|
|
||||||
def onJoin(self, details):
|
def onJoin(self, details):
|
||||||
logger.debug('Registering methods and listeners for component %s' % self.builder_realm)
|
LOGGER.debug('Registering methods and listeners for component %s', self.builder_realm)
|
||||||
yield From(self.register(self._on_ready, u'io.quay.buildworker.ready'))
|
yield From(self.register(self._on_ready, u'io.quay.buildworker.ready'))
|
||||||
yield From(self.register(self._ping, u'io.quay.buildworker.ping'))
|
yield From(self.register(self._ping, u'io.quay.buildworker.ping'))
|
||||||
yield From(self.subscribe(self._on_heartbeat, 'io.quay.builder.heartbeat'))
|
yield From(self.subscribe(self._on_heartbeat, 'io.quay.builder.heartbeat'))
|
||||||
yield From(self.subscribe(self._on_log_message, 'io.quay.builder.logmessage'))
|
yield From(self.subscribe(self._on_log_message, 'io.quay.builder.logmessage'))
|
||||||
|
|
||||||
self._set_status(COMPONENT_STATUS.WAITING)
|
self._set_status(ComponentStatus.WAITING)
|
||||||
|
|
||||||
def is_ready(self):
|
def is_ready(self):
|
||||||
return self._component_status == COMPONENT_STATUS.RUNNING
|
""" Determines whether a build component is ready to begin a build. """
|
||||||
|
return self._component_status == ComponentStatus.RUNNING
|
||||||
|
|
||||||
def start_build(self, build_job):
|
def start_build(self, build_job):
|
||||||
|
""" Starts a build. """
|
||||||
self._current_job = build_job
|
self._current_job = build_job
|
||||||
self._build_status = StatusHandler(self.build_logs, build_job.repo_build())
|
self._build_status = StatusHandler(self.build_logs, build_job.repo_build())
|
||||||
self._image_info = {}
|
self._image_info = {}
|
||||||
|
|
||||||
self._set_status(COMPONENT_STATUS.BUILDING)
|
self._set_status(ComponentStatus.BUILDING)
|
||||||
|
|
||||||
# Retrieve the job's buildpack.
|
# Retrieve the job's buildpack.
|
||||||
buildpack_url = self.user_files.get_file_url(build_job.repo_build().resource_key,
|
buildpack_url = self.user_files.get_file_url(build_job.repo_build().resource_key,
|
||||||
requires_cors=False)
|
requires_cors=False)
|
||||||
|
|
||||||
logger.debug('Retreiving build package: %s' % buildpack_url)
|
LOGGER.debug('Retreiving build package: %s', buildpack_url)
|
||||||
buildpack = None
|
buildpack = None
|
||||||
try:
|
try:
|
||||||
buildpack = BuildPackage.from_url(buildpack_url)
|
buildpack = BuildPackage.from_url(buildpack_url)
|
||||||
|
@ -82,7 +85,7 @@ class BuildComponent(BaseComponent):
|
||||||
|
|
||||||
# Extract the base image information from the Dockerfile.
|
# Extract the base image information from the Dockerfile.
|
||||||
parsed_dockerfile = None
|
parsed_dockerfile = None
|
||||||
logger.debug('Parsing dockerfile')
|
LOGGER.debug('Parsing dockerfile')
|
||||||
|
|
||||||
build_config = build_job.build_config()
|
build_config = build_job.build_config()
|
||||||
try:
|
try:
|
||||||
|
@ -118,7 +121,7 @@ class BuildComponent(BaseComponent):
|
||||||
# build_package: URL to the build package to download and untar/unzip.
|
# build_package: URL to the build package to download and untar/unzip.
|
||||||
# sub_directory: The location within the build package of the Dockerfile and the build context.
|
# sub_directory: The location within the build package of the Dockerfile and the build context.
|
||||||
# repository: The repository for which this build is occurring.
|
# repository: The repository for which this build is occurring.
|
||||||
# registry: The registry for which this build is occuring. Example: 'quay.io', 'staging.quay.io'
|
# registry: The registry for which this build is occuring (e.g. 'quay.io', 'staging.quay.io').
|
||||||
# pull_token: The token to use when pulling the cache for building.
|
# pull_token: The token to use when pulling the cache for building.
|
||||||
# push_token: The token to use to push the built image.
|
# push_token: The token to use to push the built image.
|
||||||
# tag_names: The name(s) of the tag(s) for the newly built image.
|
# tag_names: The name(s) of the tag(s) for the newly built image.
|
||||||
|
@ -140,21 +143,22 @@ class BuildComponent(BaseComponent):
|
||||||
}
|
}
|
||||||
|
|
||||||
# Invoke the build.
|
# Invoke the build.
|
||||||
logger.debug('Invoking build: %s', self.builder_realm)
|
LOGGER.debug('Invoking build: %s', self.builder_realm)
|
||||||
logger.debug('With Arguments: %s', build_arguments)
|
LOGGER.debug('With Arguments: %s', build_arguments)
|
||||||
|
|
||||||
return (self.call("io.quay.builder.build", **build_arguments)
|
return (self.call("io.quay.builder.build", **build_arguments)
|
||||||
.add_done_callback(self._build_complete))
|
.add_done_callback(self._build_complete))
|
||||||
|
|
||||||
@staticmethod
|
@staticmethod
|
||||||
def __total_completion(statuses, total_images):
|
def _total_completion(statuses, total_images):
|
||||||
|
""" Returns the current amount completion relative to the total completion of a build. """
|
||||||
percentage_with_sizes = float(len(statuses.values())) / total_images
|
percentage_with_sizes = float(len(statuses.values())) / total_images
|
||||||
sent_bytes = sum([status['current'] for status in statuses.values()])
|
sent_bytes = sum([status['current'] for status in statuses.values()])
|
||||||
total_bytes = sum([status['total'] for status in statuses.values()])
|
total_bytes = sum([status['total'] for status in statuses.values()])
|
||||||
return float(sent_bytes) / total_bytes * percentage_with_sizes
|
return float(sent_bytes) / total_bytes * percentage_with_sizes
|
||||||
|
|
||||||
@staticmethod
|
@staticmethod
|
||||||
def __process_pushpull_status(status_dict, current_phase, docker_data, images):
|
def _process_pushpull_status(status_dict, current_phase, docker_data, images):
|
||||||
if not docker_data:
|
if not docker_data:
|
||||||
return
|
return
|
||||||
|
|
||||||
|
@ -178,7 +182,7 @@ class BuildComponent(BaseComponent):
|
||||||
if 'current' in detail and 'total' in detail:
|
if 'current' in detail and 'total' in detail:
|
||||||
images[image_id] = detail
|
images[image_id] = detail
|
||||||
status_dict[status_completion_key] = \
|
status_dict[status_completion_key] = \
|
||||||
BuildComponent.__total_completion(images, max(len(images), num_images))
|
BuildComponent._total_completion(images, max(len(images), num_images))
|
||||||
|
|
||||||
def _on_log_message(self, phase, json_data):
|
def _on_log_message(self, phase, json_data):
|
||||||
# Parse any of the JSON data logged.
|
# Parse any of the JSON data logged.
|
||||||
|
@ -210,7 +214,7 @@ class BuildComponent(BaseComponent):
|
||||||
# the pull/push progress, as well as the current step index.
|
# the pull/push progress, as well as the current step index.
|
||||||
with self._build_status as status_dict:
|
with self._build_status as status_dict:
|
||||||
self._build_status.set_phase(phase)
|
self._build_status.set_phase(phase)
|
||||||
BuildComponent.__process_pushpull_status(status_dict, phase, docker_data, self._image_info)
|
BuildComponent._process_pushpull_status(status_dict, phase, docker_data, self._image_info)
|
||||||
|
|
||||||
# If the current message represents the beginning of a new step, then update the
|
# If the current message represents the beginning of a new step, then update the
|
||||||
# current command index.
|
# current command index.
|
||||||
|
@ -228,91 +232,97 @@ class BuildComponent(BaseComponent):
|
||||||
|
|
||||||
|
|
||||||
def _build_failure(self, error_message, exception=None):
|
def _build_failure(self, error_message, exception=None):
|
||||||
|
""" Handles and logs a failed build. """
|
||||||
self._build_status.set_error(error_message, {
|
self._build_status.set_error(error_message, {
|
||||||
'internal_error': exception.message if exception else None
|
'internal_error': exception.message if exception else None
|
||||||
})
|
})
|
||||||
|
|
||||||
build_id = self._current_job.repo_build().uuid
|
build_id = self._current_job.repo_build().uuid
|
||||||
logger.warning('Build %s failed with message: %s', build_id, self._error_message)
|
LOGGER.warning('Build %s failed with message: %s', build_id, error_message)
|
||||||
|
|
||||||
# Mark that the build has finished (in an error state)
|
# Mark that the build has finished (in an error state)
|
||||||
self._build_finished(BUILD_JOB_RESULT.ERROR)
|
self._build_finished(BuildJobResult.ERROR)
|
||||||
|
|
||||||
def _build_complete(self, result):
|
def _build_complete(self, result):
|
||||||
|
""" Wraps up a completed build. Handles any errors and calls self._build_finished. """
|
||||||
try:
|
try:
|
||||||
# Retrieve the result. This will raise an ApplicationError on any error that occurred.
|
# Retrieve the result. This will raise an ApplicationError on any error that occurred.
|
||||||
result.result()
|
result.result()
|
||||||
self._build_status.set_phase(BUILD_PHASE.COMPLETE)
|
self._build_status.set_phase(BUILD_PHASE.COMPLETE)
|
||||||
self._build_finished(BUILD_JOB_RESULT.COMPLETE)
|
self._build_finished(BuildJobResult.COMPLETE)
|
||||||
except ApplicationError as ae:
|
except ApplicationError as aex:
|
||||||
worker_error = WorkerError(ae.error, ae.kwargs.get('base_error'))
|
worker_error = WorkerError(aex.error, aex.kwargs.get('base_error'))
|
||||||
|
|
||||||
# Write the error to the log.
|
# Write the error to the log.
|
||||||
self._build_status.set_error(worker_error.public_message(), worker_error.extra_data())
|
self._build_status.set_error(worker_error.public_message(), worker_error.extra_data())
|
||||||
|
|
||||||
# Mark the build as completed.
|
# Mark the build as completed.
|
||||||
if worker_error.is_internal_error():
|
if worker_error.is_internal_error():
|
||||||
self._build_finished(BUILD_JOB_RESULT.INCOMPLETE)
|
self._build_finished(BuildJobResult.INCOMPLETE)
|
||||||
else:
|
else:
|
||||||
self._build_finished(BUILD_JOB_RESULT.ERROR)
|
self._build_finished(BuildJobResult.ERROR)
|
||||||
|
|
||||||
def _build_finished(self, job_status):
|
def _build_finished(self, job_status):
|
||||||
|
""" Alerts the parent that a build has completed and sets the status back to running. """
|
||||||
self.parent_manager.job_completed(self._current_job, job_status, self)
|
self.parent_manager.job_completed(self._current_job, job_status, self)
|
||||||
self._current_job = None
|
self._current_job = None
|
||||||
|
|
||||||
# Set the component back to a running state.
|
# Set the component back to a running state.
|
||||||
self._set_status(COMPONENT_STATUS.RUNNING)
|
self._set_status(ComponentStatus.RUNNING)
|
||||||
|
|
||||||
def _ping(self):
|
def _ping():
|
||||||
|
""" Ping pong. """
|
||||||
return 'pong'
|
return 'pong'
|
||||||
|
|
||||||
def _on_ready(self, token):
|
def _on_ready(self, token):
|
||||||
if self._component_status != 'waiting':
|
if self._component_status != 'waiting':
|
||||||
logger.warning('Build component with token %s is already connected', self.expected_token)
|
LOGGER.warning('Build component with token %s is already connected', self.expected_token)
|
||||||
return
|
return
|
||||||
|
|
||||||
if token != self.expected_token:
|
if token != self.expected_token:
|
||||||
logger.warning('Builder token mismatch. Expected: %s. Found: %s', self.expected_token, token)
|
LOGGER.warning('Builder token mismatch. Expected: %s. Found: %s', self.expected_token, token)
|
||||||
return
|
return
|
||||||
|
|
||||||
self._set_status(COMPONENT_STATUS.RUNNING)
|
self._set_status(ComponentStatus.RUNNING)
|
||||||
|
|
||||||
# Start the heartbeat check and updating loop.
|
# Start the heartbeat check and updating loop.
|
||||||
loop = trollius.get_event_loop()
|
loop = trollius.get_event_loop()
|
||||||
loop.create_task(self._heartbeat(loop))
|
loop.create_task(self._heartbeat())
|
||||||
logger.debug('Build worker %s is connected and ready' % self.builder_realm)
|
LOGGER.debug('Build worker %s is connected and ready', self.builder_realm)
|
||||||
return True
|
return True
|
||||||
|
|
||||||
def _set_status(self, phase):
|
def _set_status(self, phase):
|
||||||
self._component_status = phase
|
self._component_status = phase
|
||||||
|
|
||||||
def _on_heartbeat(self):
|
def _on_heartbeat(self):
|
||||||
|
""" Updates the last known heartbeat. """
|
||||||
self._last_heartbeat = datetime.datetime.now()
|
self._last_heartbeat = datetime.datetime.now()
|
||||||
|
|
||||||
def _start_heartbeat(self, loop):
|
def _start_heartbeat(self, loop):
|
||||||
|
""" Begins an async loop to keep a heartbeat going with a client. """
|
||||||
trollius.set_event_loop(loop)
|
trollius.set_event_loop(loop)
|
||||||
loop.run_until_complete(self._heartbeat())
|
loop.run_until_complete(self._heartbeat())
|
||||||
|
|
||||||
@trollius.coroutine
|
@trollius.coroutine
|
||||||
def _heartbeat(self, loop):
|
def _heartbeat(self):
|
||||||
""" Coroutine that runs every HEARTBEAT_TIMEOUT seconds, both checking the worker's heartbeat
|
""" Coroutine that runs every HEARTBEAT_TIMEOUT seconds, both checking the worker's heartbeat
|
||||||
and updating the heartbeat in the build status dictionary (if applicable). This allows
|
and updating the heartbeat in the build status dictionary (if applicable). This allows
|
||||||
the build system to catch crashes from either end.
|
the build system to catch crashes from either end.
|
||||||
"""
|
"""
|
||||||
while True:
|
while True:
|
||||||
# If the component is no longer running or actively building, nothing more to do.
|
# If the component is no longer running or actively building, nothing more to do.
|
||||||
if (self._component_status != COMPONENT_STATUS.RUNNING and
|
if (self._component_status != ComponentStatus.RUNNING and
|
||||||
self._component_status != COMPONENT_STATUS.BUILDING):
|
self._component_status != ComponentStatus.BUILDING):
|
||||||
return
|
return
|
||||||
|
|
||||||
# If there is an active build, write the heartbeat to its status.
|
# If there is an active build, write the heartbeat to its status.
|
||||||
build_status = self._build_status
|
build_status = self._build_status
|
||||||
if build_status is not None:
|
if build_status is not None:
|
||||||
with build_status as status_dict:
|
with build_status as status_dict:
|
||||||
status_dict['heartbeat'] = int(time.time())
|
status_dict['heartbeat'] = int(datetime.time())
|
||||||
|
|
||||||
# Check the heartbeat from the worker.
|
# Check the heartbeat from the worker.
|
||||||
logger.debug('Checking heartbeat on realm %s', self.builder_realm)
|
LOGGER.debug('Checking heartbeat on realm %s', self.builder_realm)
|
||||||
if not self._last_heartbeat:
|
if not self._last_heartbeat:
|
||||||
self._timeout()
|
self._timeout()
|
||||||
return
|
return
|
||||||
|
@ -324,8 +334,8 @@ class BuildComponent(BaseComponent):
|
||||||
yield From(trollius.sleep(HEARTBEAT_TIMEOUT))
|
yield From(trollius.sleep(HEARTBEAT_TIMEOUT))
|
||||||
|
|
||||||
def _timeout(self):
|
def _timeout(self):
|
||||||
self._set_status(COMPONENT_STATUS.TIMED_OUT)
|
self._set_status(ComponentStatus.TIMED_OUT)
|
||||||
logger.warning('Build component %s timed out', self.expected_token)
|
LOGGER.warning('Build component %s timed out', self.expected_token)
|
||||||
self._dispose(timed_out=True)
|
self._dispose(timed_out=True)
|
||||||
|
|
||||||
def _dispose(self, timed_out=False):
|
def _dispose(self, timed_out=False):
|
||||||
|
@ -335,7 +345,7 @@ class BuildComponent(BaseComponent):
|
||||||
if timed_out:
|
if timed_out:
|
||||||
self._build_status.set_error('Build worker timed out. Build has been requeued')
|
self._build_status.set_error('Build worker timed out. Build has been requeued')
|
||||||
|
|
||||||
self.parent_manager.job_completed(self._current_job, BUILD_JOB_RESULT.INCOMPLETE, self)
|
self.parent_manager.job_completed(self._current_job, BuildJobResult.INCOMPLETE, self)
|
||||||
self._build_status = None
|
self._build_status = None
|
||||||
self._current_job = None
|
self._current_job = None
|
||||||
|
|
||||||
|
|
|
@ -8,7 +8,6 @@ class BuildJobLoadException(Exception):
|
||||||
|
|
||||||
class BuildJob(object):
|
class BuildJob(object):
|
||||||
""" Represents a single in-progress build job. """
|
""" Represents a single in-progress build job. """
|
||||||
|
|
||||||
def __init__(self, job_item):
|
def __init__(self, job_item):
|
||||||
self._job_item = job_item
|
self._job_item = job_item
|
||||||
|
|
||||||
|
@ -16,7 +15,8 @@ class BuildJob(object):
|
||||||
self._job_details = json.loads(job_item.body)
|
self._job_details = json.loads(job_item.body)
|
||||||
except ValueError:
|
except ValueError:
|
||||||
raise BuildJobLoadException(
|
raise BuildJobLoadException(
|
||||||
'Could not parse build queue item config with ID %s' % self._job_details['build_uuid'])
|
'Could not parse build queue item config with ID %s' % self._job_details['build_uuid']
|
||||||
|
)
|
||||||
|
|
||||||
try:
|
try:
|
||||||
self._repo_build = model.get_repository_build(self._job_details['namespace'],
|
self._repo_build = model.get_repository_build(self._job_details['namespace'],
|
||||||
|
@ -30,7 +30,8 @@ class BuildJob(object):
|
||||||
self._build_config = json.loads(self._repo_build.job_config)
|
self._build_config = json.loads(self._repo_build.job_config)
|
||||||
except ValueError:
|
except ValueError:
|
||||||
raise BuildJobLoadException(
|
raise BuildJobLoadException(
|
||||||
'Could not parse repository build job config with ID %s' % self._job_details['build_uuid'])
|
'Could not parse repository build job config with ID %s' % self._job_details['build_uuid']
|
||||||
|
)
|
||||||
|
|
||||||
def determine_cached_tag(self):
|
def determine_cached_tag(self):
|
||||||
""" Returns the tag to pull to prime the cache or None if none. """
|
""" Returns the tag to pull to prime the cache or None if none. """
|
||||||
|
|
|
@ -4,7 +4,8 @@ import os
|
||||||
|
|
||||||
from tempfile import TemporaryFile, mkdtemp
|
from tempfile import TemporaryFile, mkdtemp
|
||||||
from zipfile import ZipFile
|
from zipfile import ZipFile
|
||||||
from util.dockerfileparse import parse_dockerfile, ParsedDockerfile
|
from util.dockerfileparse import parse_dockerfile
|
||||||
|
from util.safetar import safe_extractall
|
||||||
|
|
||||||
class BuildPackageException(Exception):
|
class BuildPackageException(Exception):
|
||||||
""" Exception raised when retrieving or parsing a build package. """
|
""" Exception raised when retrieving or parsing a build package. """
|
||||||
|
|
|
@ -1,4 +1,3 @@
|
||||||
from functools import partial
|
|
||||||
from data.database import BUILD_PHASE
|
from data.database import BUILD_PHASE
|
||||||
|
|
||||||
class StatusHandler(object):
|
class StatusHandler(object):
|
||||||
|
|
|
@ -6,7 +6,7 @@ from app import app, userfiles as user_files, build_logs, dockerfile_build_queue
|
||||||
from buildman.manager.enterprise import EnterpriseManager
|
from buildman.manager.enterprise import EnterpriseManager
|
||||||
from buildman.server import BuilderServer
|
from buildman.server import BuilderServer
|
||||||
|
|
||||||
logger = logging.getLogger(__name__)
|
LOGGER = logging.getLogger(__name__)
|
||||||
|
|
||||||
if __name__ == '__main__':
|
if __name__ == '__main__':
|
||||||
logging.basicConfig(level=logging.DEBUG)
|
logging.basicConfig(level=logging.DEBUG)
|
||||||
|
|
|
@ -8,7 +8,7 @@ from buildman.buildcomponent import BuildComponent
|
||||||
from trollius.coroutines import From
|
from trollius.coroutines import From
|
||||||
|
|
||||||
REGISTRATION_REALM = 'registration'
|
REGISTRATION_REALM = 'registration'
|
||||||
logger = logging.getLogger(__name__)
|
LOGGER = logging.getLogger(__name__)
|
||||||
|
|
||||||
class DynamicRegistrationComponent(BaseComponent):
|
class DynamicRegistrationComponent(BaseComponent):
|
||||||
""" Component session that handles dynamic registration of the builder components. """
|
""" Component session that handles dynamic registration of the builder components. """
|
||||||
|
@ -17,12 +17,12 @@ class DynamicRegistrationComponent(BaseComponent):
|
||||||
self.join(REGISTRATION_REALM)
|
self.join(REGISTRATION_REALM)
|
||||||
|
|
||||||
def onJoin(self, details):
|
def onJoin(self, details):
|
||||||
logger.debug('Registering registration method')
|
LOGGER.debug('Registering registration method')
|
||||||
yield From(self.register(self._worker_register, u'io.quay.buildworker.register'))
|
yield From(self.register(self._worker_register, u'io.quay.buildworker.register'))
|
||||||
|
|
||||||
def _worker_register(self):
|
def _worker_register(self):
|
||||||
realm = self.parent_manager.add_build_component()
|
realm = self.parent_manager.add_build_component()
|
||||||
logger.debug('Registering new build component+worker with realm %s', realm)
|
LOGGER.debug('Registering new build component+worker with realm %s', realm)
|
||||||
return realm
|
return realm
|
||||||
|
|
||||||
|
|
||||||
|
@ -37,6 +37,7 @@ class EnterpriseManager(BaseManager):
|
||||||
self.register_component(REGISTRATION_REALM, DynamicRegistrationComponent)
|
self.register_component(REGISTRATION_REALM, DynamicRegistrationComponent)
|
||||||
|
|
||||||
def add_build_component(self):
|
def add_build_component(self):
|
||||||
|
""" Adds a new build component for an Enterprise Registry. """
|
||||||
# Generate a new unique realm ID for the build worker.
|
# Generate a new unique realm ID for the build worker.
|
||||||
realm = str(uuid.uuid4())
|
realm = str(uuid.uuid4())
|
||||||
component = self.register_component(realm, BuildComponent, token="")
|
component = self.register_component(realm, BuildComponent, token="")
|
||||||
|
@ -44,6 +45,7 @@ class EnterpriseManager(BaseManager):
|
||||||
return realm
|
return realm
|
||||||
|
|
||||||
def schedule(self, build_job, loop):
|
def schedule(self, build_job, loop):
|
||||||
|
""" Schedules a build for an Enterprise Registry. """
|
||||||
if self.shutting_down:
|
if self.shutting_down:
|
||||||
return False
|
return False
|
||||||
|
|
||||||
|
|
|
@ -2,24 +2,23 @@ import logging
|
||||||
import trollius
|
import trollius
|
||||||
|
|
||||||
from autobahn.asyncio.wamp import RouterFactory, RouterSessionFactory
|
from autobahn.asyncio.wamp import RouterFactory, RouterSessionFactory
|
||||||
from autobahn.asyncio.websocket import WampWebSocketServerFactory, WampWebSocketServerProtocol
|
from autobahn.asyncio.websocket import WampWebSocketServerFactory
|
||||||
from autobahn.wamp import types
|
from autobahn.wamp import types
|
||||||
from autobahn.wamp.exception import ApplicationError
|
|
||||||
|
|
||||||
from aiowsgi import create_server as create_wsgi_server
|
from aiowsgi import create_server as create_wsgi_server
|
||||||
from flask import Flask
|
from flask import Flask
|
||||||
from threading import Event, Lock
|
from threading import Event
|
||||||
from trollius.coroutines import From
|
from trollius.coroutines import From
|
||||||
|
|
||||||
from buildjob import BuildJob, BuildJobLoadException
|
from buildman.buildjob import BuildJob, BuildJobLoadException
|
||||||
|
|
||||||
logger = logging.getLogger(__name__)
|
LOGGER = logging.getLogger(__name__)
|
||||||
|
|
||||||
WORK_CHECK_TIMEOUT = 10
|
WORK_CHECK_TIMEOUT = 10
|
||||||
TIMEOUT_PERIOD_MINUTES = 20
|
TIMEOUT_PERIOD_MINUTES = 20
|
||||||
RESERVATION_SECONDS = (TIMEOUT_PERIOD_MINUTES + 5) * 60
|
RESERVATION_SECONDS = (TIMEOUT_PERIOD_MINUTES + 5) * 60
|
||||||
|
|
||||||
class BUILD_JOB_RESULT(object):
|
class BuildJobResult(object):
|
||||||
""" Build job result enum """
|
""" Build job result enum """
|
||||||
INCOMPLETE = 'incomplete'
|
INCOMPLETE = 'incomplete'
|
||||||
COMPLETE = 'complete'
|
COMPLETE = 'complete'
|
||||||
|
@ -29,20 +28,22 @@ class BuilderServer(object):
|
||||||
""" Server which handles both HTTP and WAMP requests, managing the full state of the build
|
""" Server which handles both HTTP and WAMP requests, managing the full state of the build
|
||||||
controller.
|
controller.
|
||||||
"""
|
"""
|
||||||
_loop = None
|
|
||||||
_current_status = 'starting'
|
|
||||||
_current_components = []
|
|
||||||
_job_count = 0
|
|
||||||
|
|
||||||
def __init__(self, server_hostname, queue, build_logs, user_files, lifecycle_manager_klass):
|
def __init__(self, server_hostname, queue, build_logs, user_files, lifecycle_manager_klass):
|
||||||
self._session_factory = RouterSessionFactory(RouterFactory())
|
self._loop = None
|
||||||
|
self._current_status = 'starting'
|
||||||
|
self._current_components = []
|
||||||
|
self._job_count = 0
|
||||||
|
|
||||||
|
self._session_factory = RouterSessionFactory(RouterFactory())
|
||||||
self._server_hostname = server_hostname
|
self._server_hostname = server_hostname
|
||||||
self._queue = queue
|
self._queue = queue
|
||||||
self._build_logs = build_logs
|
self._build_logs = build_logs
|
||||||
self._user_files = user_files
|
self._user_files = user_files
|
||||||
self._lifecycle_manager = lifecycle_manager_klass(
|
self._lifecycle_manager = lifecycle_manager_klass(
|
||||||
self._register_component, self._unregister_component, self._job_complete)
|
self._register_component,
|
||||||
|
self._unregister_component,
|
||||||
|
self._job_complete
|
||||||
|
)
|
||||||
|
|
||||||
self._shutdown_event = Event()
|
self._shutdown_event = Event()
|
||||||
self._current_status = 'running'
|
self._current_status = 'running'
|
||||||
|
@ -107,9 +108,9 @@ class BuilderServer(object):
|
||||||
self._session_factory.remove(component)
|
self._session_factory.remove(component)
|
||||||
|
|
||||||
def _job_complete(self, build_job, job_status):
|
def _job_complete(self, build_job, job_status):
|
||||||
if job_status == BUILD_JOB_RESULT.INCOMPLETE:
|
if job_status == BuildJobResult.INCOMPLETE:
|
||||||
self._queue.incomplete(build_job.job_item(), restore_retry=True, retry_after=30)
|
self._queue.incomplete(build_job.job_item(), restore_retry=True, retry_after=30)
|
||||||
elif job_status == BUILD_JOB_RESULT.ERROR:
|
elif job_status == BuildJobResult.ERROR:
|
||||||
self._queue.incomplete(build_job.job_item(), restore_retry=False)
|
self._queue.incomplete(build_job.job_item(), restore_retry=False)
|
||||||
else:
|
else:
|
||||||
self._queue.complete(build_job.job_item())
|
self._queue.complete(build_job.job_item())
|
||||||
|
@ -119,30 +120,30 @@ class BuilderServer(object):
|
||||||
if self._current_status == 'shutting_down' and not self._job_count:
|
if self._current_status == 'shutting_down' and not self._job_count:
|
||||||
self._shutdown_event.set()
|
self._shutdown_event.set()
|
||||||
|
|
||||||
# TODO: check for work here?
|
# TODO:(jschorr) check for work here?
|
||||||
|
|
||||||
@trollius.coroutine
|
@trollius.coroutine
|
||||||
def _work_checker(self):
|
def _work_checker(self):
|
||||||
while self._current_status == 'running':
|
while self._current_status == 'running':
|
||||||
logger.debug('Checking for more work')
|
LOGGER.debug('Checking for more work')
|
||||||
job_item = self._queue.get(processing_time=RESERVATION_SECONDS)
|
job_item = self._queue.get(processing_time=RESERVATION_SECONDS)
|
||||||
if job_item is None:
|
if job_item is None:
|
||||||
logger.debug('No additional work found. Going to sleep for %s seconds', WORK_CHECK_TIMEOUT)
|
LOGGER.debug('No additional work found. Going to sleep for %s seconds', WORK_CHECK_TIMEOUT)
|
||||||
yield From(trollius.sleep(WORK_CHECK_TIMEOUT))
|
yield From(trollius.sleep(WORK_CHECK_TIMEOUT))
|
||||||
continue
|
continue
|
||||||
|
|
||||||
try:
|
try:
|
||||||
build_job = BuildJob(job_item)
|
build_job = BuildJob(job_item)
|
||||||
except BuildJobLoadException as irbe:
|
except BuildJobLoadException as irbe:
|
||||||
logger.exception(irbe)
|
LOGGER.exception(irbe)
|
||||||
self._queue.incomplete(job_item, restore_retry=False)
|
self._queue.incomplete(job_item, restore_retry=False)
|
||||||
|
|
||||||
logger.debug('Build job found. Checking for an avaliable worker.')
|
LOGGER.debug('Build job found. Checking for an avaliable worker.')
|
||||||
if self._lifecycle_manager.schedule(build_job, self._loop):
|
if self._lifecycle_manager.schedule(build_job, self._loop):
|
||||||
self._job_count = self._job_count + 1
|
self._job_count = self._job_count + 1
|
||||||
logger.debug('Build job scheduled. Running: %s', self._job_count)
|
LOGGER.debug('Build job scheduled. Running: %s', self._job_count)
|
||||||
else:
|
else:
|
||||||
logger.debug('All workers are busy. Requeuing.')
|
LOGGER.debug('All workers are busy. Requeuing.')
|
||||||
self._queue.incomplete(job_item, restore_retry=True, retry_after=0)
|
self._queue.incomplete(job_item, restore_retry=True, retry_after=0)
|
||||||
|
|
||||||
yield From(trollius.sleep(WORK_CHECK_TIMEOUT))
|
yield From(trollius.sleep(WORK_CHECK_TIMEOUT))
|
||||||
|
|
Reference in a new issue