import logging import trollius from autobahn.asyncio.wamp import RouterFactory, RouterSessionFactory from autobahn.asyncio.websocket import WampWebSocketServerFactory from autobahn.wamp import types from aiowsgi import create_server as create_wsgi_server from flask import Flask from threading import Event from trollius.coroutines import From from datetime import timedelta from buildman.jobutil.buildjob import BuildJob, BuildJobLoadException from data.queue import WorkQueue logger = logging.getLogger(__name__) WORK_CHECK_TIMEOUT = 10 TIMEOUT_PERIOD_MINUTES = 20 JOB_TIMEOUT_SECONDS = 300 MINIMUM_JOB_EXTENSION = timedelta(minutes=2) WEBSOCKET_PORT = 8787 CONTROLLER_PORT = 8686 HEARTBEAT_PERIOD_SEC = 30 class BuildJobResult(object): """ Build job result enum """ INCOMPLETE = 'incomplete' COMPLETE = 'complete' ERROR = 'error' class BuilderServer(object): """ Server which handles both HTTP and WAMP requests, managing the full state of the build controller. """ def __init__(self, registry_hostname, queue, build_logs, user_files, lifecycle_manager_klass, lifecycle_manager_config, manager_hostname): self._loop = None self._current_status = 'starting' self._current_components = [] self._job_count = 0 self._session_factory = RouterSessionFactory(RouterFactory()) self._registry_hostname = registry_hostname self._queue = queue self._build_logs = build_logs self._user_files = user_files self._lifecycle_manager = lifecycle_manager_klass( self._register_component, self._unregister_component, self._job_heartbeat, self._job_complete, manager_hostname, HEARTBEAT_PERIOD_SEC, ) self._lifecycle_manager_config = lifecycle_manager_config self._shutdown_event = Event() self._current_status = 'running' self._register_controller() def _register_controller(self): controller_app = Flask('controller') server = self @controller_app.route('/status') def status(): return server._current_status self._controller_app = controller_app def run(self, host, ssl=None): logger.debug('Initializing the lifecycle manager') self._lifecycle_manager.initialize(self._lifecycle_manager_config) logger.debug('Initializing all members of the event loop') loop = trollius.get_event_loop() trollius.Task(self._initialize(loop, host, ssl)) logger.debug('Starting server on port %s, with controller on port %s', WEBSOCKET_PORT, CONTROLLER_PORT) try: loop.run_forever() except KeyboardInterrupt: pass finally: loop.close() def close(self): logger.debug('Requested server shutdown') self._current_status = 'shutting_down' self._lifecycle_manager.shutdown() self._shutdown_event.wait() logger.debug('Shutting down server') def _register_component(self, realm, component_klass, **kwargs): """ Registers a component with the server. The component_klass must derive from BaseComponent. """ logger.debug('Registering component with realm %s', realm) component = component_klass(types.ComponentConfig(realm=realm), realm=realm, **kwargs) component.server = self component.parent_manager = self._lifecycle_manager component.build_logs = self._build_logs component.user_files = self._user_files component.registry_hostname = self._registry_hostname self._current_components.append(component) self._session_factory.add(component) return component def _unregister_component(self, component): logger.debug('Unregistering component with realm %s and token %s', component.builder_realm, component.expected_token) self._current_components.remove(component) self._session_factory.remove(component) def _job_heartbeat(self, build_job): WorkQueue.extend_processing(build_job.job_item, seconds_from_now=JOB_TIMEOUT_SECONDS, retry_count=1, minimum_extension=MINIMUM_JOB_EXTENSION) def _job_complete(self, build_job, job_status): if job_status == BuildJobResult.INCOMPLETE: self._queue.incomplete(build_job.job_item, restore_retry=True, retry_after=30) elif job_status == BuildJobResult.ERROR: self._queue.incomplete(build_job.job_item, restore_retry=False) else: self._queue.complete(build_job.job_item) self._job_count = self._job_count - 1 if self._current_status == 'shutting_down' and not self._job_count: self._shutdown_event.set() # TODO(jschorr): check for work here? @trollius.coroutine def _work_checker(self): while self._current_status == 'running': logger.debug('Checking for more work for %d active workers', self._lifecycle_manager.num_workers()) job_item = self._queue.get(processing_time=self._lifecycle_manager.setup_time()) if job_item is None: logger.debug('No additional work found. Going to sleep for %s seconds', WORK_CHECK_TIMEOUT) yield From(trollius.sleep(WORK_CHECK_TIMEOUT)) continue try: build_job = BuildJob(job_item) except BuildJobLoadException as irbe: logger.exception(irbe) self._queue.incomplete(job_item, restore_retry=False) logger.debug('Build job found. Checking for an avaliable worker.') scheduled = yield From(self._lifecycle_manager.schedule(build_job)) if scheduled: self._job_count = self._job_count + 1 logger.debug('Build job scheduled. Running: %s', self._job_count) else: logger.debug('All workers are busy. Requeuing.') self._queue.incomplete(job_item, restore_retry=True, retry_after=0) yield From(trollius.sleep(WORK_CHECK_TIMEOUT)) @trollius.coroutine def _initialize(self, loop, host, ssl=None): self._loop = loop # Create the WAMP server. transport_factory = WampWebSocketServerFactory(self._session_factory, debug_wamp=False) transport_factory.setProtocolOptions(failByDrop=True) # Initialize the controller server and the WAMP server create_wsgi_server(self._controller_app, loop=loop, host=host, port=CONTROLLER_PORT, ssl=ssl) yield From(loop.create_server(transport_factory, host, WEBSOCKET_PORT, ssl=ssl)) # Initialize the work queue checker. yield From(self._work_checker())