This repository has been archived on 2020-03-24. You can view files and clone it, but cannot push or open issues or pull requests.
quay/buildman/server.py
2015-02-03 16:29:47 -05:00

182 lines
6.2 KiB
Python

import logging
import trollius
from autobahn.asyncio.wamp import RouterFactory, RouterSessionFactory
from autobahn.asyncio.websocket import WampWebSocketServerFactory
from autobahn.wamp import types
from aiowsgi import create_server as create_wsgi_server
from flask import Flask
from threading import Event
from trollius.coroutines import From
from datetime import timedelta
from buildman.jobutil.buildjob import BuildJob, BuildJobLoadException
from data import database
from data.queue import WorkQueue
from app import app
# Module-level logger, named after this module.
logger = logging.getLogger(__name__)
# Seconds the work checker sleeps between polls of the build queue.
WORK_CHECK_TIMEOUT = 10
# NOTE(review): not referenced anywhere in the visible code; possibly used by importers - confirm.
TIMEOUT_PERIOD_MINUTES = 20
# Passed as `seconds_from_now` to the queue's extend_processing() on every job heartbeat.
JOB_TIMEOUT_SECONDS = 300
# Passed as `minimum_extension` to the queue's extend_processing() on every job heartbeat.
MINIMUM_JOB_EXTENSION = timedelta(minutes=2)
# Handed to the lifecycle manager constructor; presumably the heartbeat interval in seconds.
HEARTBEAT_PERIOD_SEC = 30
class BuildJobResult(object):
    """ Build job result enum.

        Plain string constants (not an `enum.Enum`), so callers can compare a reported job
        status against them with `==`.
    """
    # The job did not finish.
    INCOMPLETE = 'incomplete'
    # The job finished.
    COMPLETE = 'complete'
    # The job ended in an error.
    ERROR = 'error'
class BuilderServer(object):
    """ Server which handles both HTTP and WAMP requests, managing the full state of the build
        controller.

        The HTTP side is a small Flask "controller" app exposing `/status`; the WAMP side is a
        websocket server backed by a router session factory to which build components are added.
        A background coroutine polls the build queue and hands jobs to the lifecycle manager.
    """

    def __init__(self, registry_hostname, queue, build_logs, user_files, lifecycle_manager_klass,
                 lifecycle_manager_config, manager_hostname):
        """ Creates the server and its lifecycle manager; nothing is started until run().

            registry_hostname: stored and copied onto every registered component.
            queue: build work queue; this class calls get(), extend_processing(), incomplete()
                   and complete() on it.
            build_logs: stored and copied onto every registered component.
            user_files: stored and copied onto every registered component.
            lifecycle_manager_klass: callable invoked with the register/unregister/heartbeat/
                                     job-complete callbacks, `manager_hostname` and
                                     HEARTBEAT_PERIOD_SEC to build the lifecycle manager.
            lifecycle_manager_config: passed to the lifecycle manager's initialize() in run().
            manager_hostname: forwarded to the lifecycle manager constructor.
        """
        self._loop = None
        self._current_status = 'starting'
        self._current_components = []
        # Number of jobs scheduled on a worker and not yet reported complete.
        self._job_count = 0
        self._session_factory = RouterSessionFactory(RouterFactory())

        self._registry_hostname = registry_hostname
        self._queue = queue
        self._build_logs = build_logs
        self._user_files = user_files

        self._lifecycle_manager = lifecycle_manager_klass(
            self._register_component,
            self._unregister_component,
            self._job_heartbeat,
            self._job_complete,
            manager_hostname,
            HEARTBEAT_PERIOD_SEC,
        )
        self._lifecycle_manager_config = lifecycle_manager_config

        # Set once a requested shutdown may proceed (no jobs left in flight).
        self._shutdown_event = Event()
        self._current_status = 'running'

        self._register_controller()

    def _register_controller(self):
        """ Builds the Flask controller app and stores it on `self._controller_app`. """
        controller_app = Flask('controller')
        server = self

        @controller_app.route('/status')
        def status():
            # Reports the server's current status string as the response body.
            return server._current_status

        self._controller_app = controller_app

    def run(self, host, websocket_port, controller_port, ssl=None):
        """ Initializes the lifecycle manager and runs the event loop until the server's
            initialization/work-checking coroutine finishes or a KeyboardInterrupt arrives.
            The event loop is always closed on the way out.

            host: interface both servers bind to.
            websocket_port: port for the WAMP websocket server.
            controller_port: port for the HTTP controller app.
            ssl: optional value passed through unchanged as the `ssl` argument of both servers.
        """
        logger.debug('Initializing the lifecycle manager')
        self._lifecycle_manager.initialize(self._lifecycle_manager_config)

        logger.debug('Initializing all members of the event loop')
        loop = trollius.get_event_loop()

        logger.debug('Starting server on port %s, with controller on port %s', websocket_port,
                     controller_port)

        try:
            loop.run_until_complete(
                self._initialize(loop, host, websocket_port, controller_port, ssl))
        except KeyboardInterrupt:
            # Ctrl-C is treated as a normal way to stop the server.
            pass
        finally:
            loop.close()

    def close(self):
        """ Requests shutdown: marks the server as shutting down, shuts down the lifecycle
            manager and blocks until no jobs remain in flight.

            The shutdown event is otherwise only set from _job_complete(), so when no job is
            running at the time of the call it is set here; without that, wait() would block
            forever because no completion callback would ever fire.
        """
        logger.debug('Requested server shutdown')
        self._current_status = 'shutting_down'
        self._lifecycle_manager.shutdown()

        if not self._job_count:
            # Nothing in flight: no _job_complete() call will arrive to release the wait below.
            self._shutdown_event.set()

        self._shutdown_event.wait()
        logger.debug('Shutting down server')

    def _register_component(self, realm, component_klass, **kwargs):
        """ Registers a component with the server. The component_klass must derive from
            BaseComponent.

            realm: WAMP realm used both for the component config and as the `realm` keyword.
            component_klass: class instantiated to create the component.
            kwargs: extra keyword arguments forwarded to the component constructor.

            Returns the newly created component.
        """
        logger.debug('Registering component with realm %s', realm)

        component = component_klass(types.ComponentConfig(realm=realm), realm=realm, **kwargs)
        component.server = self
        component.parent_manager = self._lifecycle_manager
        component.build_logs = self._build_logs
        component.user_files = self._user_files
        component.registry_hostname = self._registry_hostname

        self._current_components.append(component)
        self._session_factory.add(component)
        return component

    def _unregister_component(self, component):
        """ Removes a previously registered component from the server's component list and
            from the session factory.
        """
        logger.debug('Unregistering component with realm %s and token %s',
                     component.builder_realm, component.expected_token)

        self._current_components.remove(component)
        self._session_factory.remove(component)

    def _job_heartbeat(self, build_job):
        """ Extends the queue processing window of the given job's queue item. """
        self._queue.extend_processing(build_job.job_item, seconds_from_now=JOB_TIMEOUT_SECONDS,
                                      minimum_extension=MINIMUM_JOB_EXTENSION)

    def _job_complete(self, build_job, job_status):
        """ Records the end of a job in the queue and releases a pending shutdown once the
            last in-flight job has finished.

            build_job: the finished job; its `job_item` is the queue item to update.
            job_status: a BuildJobResult value. INCOMPLETE marks the item incomplete with
                        `restore_retry=False, retry_after=30`; any other value completes it.
        """
        if job_status == BuildJobResult.INCOMPLETE:
            self._queue.incomplete(build_job.job_item, restore_retry=False, retry_after=30)
        else:
            self._queue.complete(build_job.job_item)

        self._job_count -= 1

        if self._current_status == 'shutting_down' and not self._job_count:
            self._shutdown_event.set()

    @trollius.coroutine
    def _work_checker(self):
        """ Coroutine that, while the server status is 'running', sleeps WORK_CHECK_TIMEOUT
            seconds, then pulls at most one item from the queue and tries to schedule it
            through the lifecycle manager.
        """
        while self._current_status == 'running':
            # NOTE(review): presumably releases the DB connection for the duration of the
            # sleep - confirm against database.CloseForLongOperation.
            with database.CloseForLongOperation(app.config):
                yield From(trollius.sleep(WORK_CHECK_TIMEOUT))

            logger.debug('Checking for more work for %d active workers',
                         self._lifecycle_manager.num_workers())

            job_item = self._queue.get(processing_time=self._lifecycle_manager.setup_time())
            if job_item is None:
                logger.debug('No additional work found. Going to sleep for %s seconds',
                             WORK_CHECK_TIMEOUT)
                continue

            try:
                build_job = BuildJob(job_item)
            except BuildJobLoadException as irbe:
                # The item could not be turned into a job: log it and mark it incomplete
                # with restore_retry=False.
                logger.exception(irbe)
                self._queue.incomplete(job_item, restore_retry=False)
                continue

            logger.debug('Build job found. Checking for an avaliable worker.')
            scheduled = yield From(self._lifecycle_manager.schedule(build_job))
            if scheduled:
                self._job_count += 1
                logger.debug('Build job scheduled. Running: %s', self._job_count)
            else:
                # Put the item back immediately, restoring the retry that get() consumed.
                logger.debug('All workers are busy. Requeuing.')
                self._queue.incomplete(job_item, restore_retry=True, retry_after=0)

    @trollius.coroutine
    def _initialize(self, loop, host, websocket_port, controller_port, ssl=None):
        """ Coroutine that starts the HTTP controller and the WAMP websocket server on the
            given loop, then runs the work checker until it exits.
        """
        self._loop = loop

        # Create the WAMP server.
        transport_factory = WampWebSocketServerFactory(self._session_factory, debug_wamp=False)
        transport_factory.setProtocolOptions(failByDrop=True)

        # Initialize the controller server and the WAMP server
        create_wsgi_server(self._controller_app, loop=loop, host=host, port=controller_port,
                           ssl=ssl)
        yield From(loop.create_server(transport_factory, host, websocket_port, ssl=ssl))

        # Initialize the work queue checker.
        yield From(self._work_checker())