import logging import trollius from autobahn.asyncio.wamp import RouterFactory, RouterSessionFactory from autobahn.asyncio.websocket import WampWebSocketServerFactory from autobahn.wamp import types from aiowsgi import create_server as create_wsgi_server from flask import Flask from threading import Event from trollius.tasks import Task from trollius.coroutines import From from datetime import timedelta from buildman.jobutil.buildstatus import StatusHandler from buildman.jobutil.buildjob import BuildJob, BuildJobLoadException from data import database from app import app, build_metrics logger = logging.getLogger(__name__) WORK_CHECK_TIMEOUT = 10 TIMEOUT_PERIOD_MINUTES = 20 JOB_TIMEOUT_SECONDS = 300 MINIMUM_JOB_EXTENSION = timedelta(minutes=2) HEARTBEAT_PERIOD_SEC = 30 class BuildJobResult(object): """ Build job result enum """ INCOMPLETE = 'incomplete' COMPLETE = 'complete' ERROR = 'error' class BuilderServer(object): """ Server which handles both HTTP and WAMP requests, managing the full state of the build controller. """ def __init__(self, registry_hostname, queue, build_logs, user_files, lifecycle_manager_klass, lifecycle_manager_config, manager_hostname): self._loop = None self._current_status = 'starting' self._current_components = [] self._job_count = 0 self._session_factory = RouterSessionFactory(RouterFactory()) self._registry_hostname = registry_hostname self._queue = queue self._build_logs = build_logs self._user_files = user_files self._lifecycle_manager = lifecycle_manager_klass( self._register_component, self._unregister_component, self._job_heartbeat, self._job_complete, manager_hostname, HEARTBEAT_PERIOD_SEC, ) self._lifecycle_manager_config = lifecycle_manager_config self._shutdown_event = Event() self._current_status = 'running' self._register_controller() def _register_controller(self): controller_app = Flask('controller') server = self @controller_app.route('/status') def status(): return server._current_status self._controller_app = controller_app def run(self, host, websocket_port, controller_port, ssl=None): logger.debug('Initializing the lifecycle manager') self._lifecycle_manager.initialize(self._lifecycle_manager_config) logger.debug('Initializing all members of the event loop') loop = trollius.get_event_loop() logger.debug('Starting server on port %s, with controller on port %s', websocket_port, controller_port) tasks = [ Task(self._initialize(loop, host, websocket_port, controller_port, ssl)), Task(self._queue_metrics_updater()), ] try: loop.run_until_complete(trollius.wait(tasks)) except KeyboardInterrupt: pass finally: loop.close() def close(self): logger.debug('Requested server shutdown') self._current_status = 'shutting_down' self._lifecycle_manager.shutdown() self._shutdown_event.wait() logger.debug('Shutting down server') def _register_component(self, realm, component_klass, **kwargs): """ Registers a component with the server. The component_klass must derive from BaseComponent. """ logger.debug('Registering component with realm %s', realm) component = component_klass(types.ComponentConfig(realm=realm), realm=realm, **kwargs) component.server = self component.parent_manager = self._lifecycle_manager component.build_logs = self._build_logs component.user_files = self._user_files component.registry_hostname = self._registry_hostname self._current_components.append(component) self._session_factory.add(component) return component def _unregister_component(self, component): logger.debug('Unregistering component with realm %s and token %s', component.builder_realm, component.expected_token) self._current_components.remove(component) self._session_factory.remove(component) def _job_heartbeat(self, build_job): self._queue.extend_processing(build_job.job_item, seconds_from_now=JOB_TIMEOUT_SECONDS, minimum_extension=MINIMUM_JOB_EXTENSION) def _job_complete(self, build_job, job_status): build_metrics.report_completion_status(job_status) if job_status == BuildJobResult.INCOMPLETE: self._queue.incomplete(build_job.job_item, restore_retry=False, retry_after=30) else: self._queue.complete(build_job.job_item) self._job_count = self._job_count - 1 if self._current_status == 'shutting_down' and not self._job_count: self._shutdown_event.set() @trollius.coroutine def _work_checker(self): while self._current_status == 'running': with database.CloseForLongOperation(app.config): yield From(trollius.sleep(WORK_CHECK_TIMEOUT)) logger.debug('Checking for more work for %d active workers', self._lifecycle_manager.num_workers()) job_item = self._queue.get(processing_time=self._lifecycle_manager.setup_time()) if job_item is None: logger.debug('No additional work found. Going to sleep for %s seconds', WORK_CHECK_TIMEOUT) continue try: build_job = BuildJob(job_item) except BuildJobLoadException as irbe: logger.exception(irbe) self._queue.incomplete(job_item, restore_retry=False) continue logger.debug('Build job found. Checking for an avaliable worker.') scheduled = yield From(self._lifecycle_manager.schedule(build_job)) if scheduled: status_handler = StatusHandler(self._build_logs, build_job.repo_build.uuid) status_handler.set_phase('build-scheduled') self._job_count = self._job_count + 1 logger.debug('Build job scheduled. Running: %s', self._job_count) else: logger.debug('All workers are busy. Requeuing.') self._queue.incomplete(job_item, restore_retry=True, retry_after=0) @trollius.coroutine def _queue_metrics_updater(self): while self._current_status == 'running': yield From(trollius.sleep(30)) self._queue.update_metrics() @trollius.coroutine def _initialize(self, loop, host, websocket_port, controller_port, ssl=None): self._loop = loop # Create the WAMP server. transport_factory = WampWebSocketServerFactory(self._session_factory, debug_wamp=False) transport_factory.setProtocolOptions(failByDrop=True) # Initialize the controller server and the WAMP server create_wsgi_server(self._controller_app, loop=loop, host=host, port=controller_port, ssl=ssl) yield From(loop.create_server(transport_factory, host, websocket_port, ssl=ssl)) # Initialize the work queue checker. yield From(self._work_checker())