import json
import logging
from datetime import timedelta
from threading import Event

import trollius
from autobahn.asyncio.wamp import RouterFactory, RouterSessionFactory
from autobahn.asyncio.websocket import WampWebSocketServerFactory
from autobahn.wamp import types
from aiowsgi import create_server as create_wsgi_server
from flask import Flask
from trollius.coroutines import From

from buildman.enums import BuildJobResult, BuildServerStatus, RESULT_PHASES
from buildman.jobutil.buildstatus import StatusHandler
from buildman.jobutil.buildjob import BuildJob, BuildJobLoadException
from data import database
from app import app, metric_queue


logger = logging.getLogger(__name__)

# Seconds the work checker sleeps between polls of the build queue; also used as the
# retry delay when scheduling raises.
WORK_CHECK_TIMEOUT = 10
# Not referenced in this module.
TIMEOUT_PERIOD_MINUTES = 20
# Passed as `seconds_from_now` when a job heartbeat extends a queue item.
JOB_TIMEOUT_SECONDS = 300
# Added to the lifecycle manager's setup time when claiming a queue item.
SETUP_LEEWAY_SECONDS = 30
# Passed as `minimum_extension` when a job heartbeat extends a queue item.
MINIMUM_JOB_EXTENSION = timedelta(minutes=1)
# Handed to the lifecycle manager at construction time.
HEARTBEAT_PERIOD_SEC = 30


class BuilderServer(object):
    """ Server which handles both HTTP and WAMP requests, managing the full state of the build
        controller.
    """

    def __init__(self, registry_hostname, queue, build_logs, user_files, lifecycle_manager_klass,
                 lifecycle_manager_config, manager_hostname):
        """ Stores the collaborators, builds the lifecycle manager and registers the controller app.

            The lifecycle manager is constructed with this server's register/unregister/heartbeat/
            complete callbacks, `manager_hostname` and HEARTBEAT_PERIOD_SEC. It is only initialized
            (with `lifecycle_manager_config`) once run() is called.
        """
        self._loop = None
        self._current_status = BuildServerStatus.STARTING
        self._current_components = []
        self._realm_map = {}
        self._job_count = 0

        self._session_factory = RouterSessionFactory(RouterFactory())
        self._registry_hostname = registry_hostname
        self._queue = queue
        self._build_logs = build_logs
        self._user_files = user_files
        self._lifecycle_manager = lifecycle_manager_klass(
            self._register_component,
            self._unregister_component,
            self._job_heartbeat,
            self._job_complete,
            manager_hostname,
            HEARTBEAT_PERIOD_SEC,
        )
        self._lifecycle_manager_config = lifecycle_manager_config

        self._shutdown_event = Event()
        self._current_status = BuildServerStatus.RUNNING

        self._register_controller()

    def _register_controller(self):
        """ Builds the Flask controller app exposing `/status` and stores it on the server. """
        controller_app = Flask('controller')
        server = self

        @controller_app.route('/status')
        def status():
            """ Returns a JSON document describing queue and worker state. """
            # The middle element (available-but-not-running) is not reported.
            running_count, _, available_count = server._queue.get_metrics()

            workers = [component for component in server._current_components
                       if component.kind() == 'builder']

            data = {
                'status': server._current_status,
                'running_local': server._job_count,
                'running_total': running_count,
                'workers': len(workers),
                'job_total': available_count + running_count,
            }

            return json.dumps(data)

        self._controller_app = controller_app

    def run(self, host, websocket_port, controller_port, ssl=None):
        """ Initializes the lifecycle manager and runs the event loop until the work checker exits.

            A KeyboardInterrupt is swallowed; the event loop is always closed on the way out.
        """
        logger.debug('Initializing the lifecycle manager')
        self._lifecycle_manager.initialize(self._lifecycle_manager_config)

        logger.debug('Initializing all members of the event loop')
        loop = trollius.get_event_loop()

        logger.debug('Starting server on port %s, with controller on port %s', websocket_port,
                     controller_port)

        try:
            loop.run_until_complete(self._initialize(loop, host, websocket_port, controller_port,
                                                     ssl))
        except KeyboardInterrupt:
            pass
        finally:
            loop.close()

    def close(self):
        """ Marks the server as shutting down and blocks until no locally tracked job remains. """
        logger.debug('Requested server shutdown')
        self._current_status = BuildServerStatus.SHUTDOWN
        self._lifecycle_manager.shutdown()

        # The event is otherwise only set from _job_complete(). If nothing is in flight, no
        # completion will ever arrive, so release the wait here instead of blocking forever.
        if not self._job_count:
            self._shutdown_event.set()

        self._shutdown_event.wait()
        logger.debug('Shutting down server')

    def _register_component(self, realm, component_klass, **kwargs):
        """ Registers a component with the server. The component_klass must derive from
            BaseComponent.

            If a component is already registered for `realm`, that existing component is returned
            and nothing new is created. Otherwise the new component is returned.
        """
        logger.debug('Registering component with realm %s', realm)
        if realm in self._realm_map:
            logger.debug('Component with realm %s already registered', realm)
            return self._realm_map[realm]

        component = component_klass(types.ComponentConfig(realm=realm), realm=realm, **kwargs)
        component.server = self
        component.parent_manager = self._lifecycle_manager
        component.build_logs = self._build_logs
        component.user_files = self._user_files
        component.registry_hostname = self._registry_hostname

        self._realm_map[realm] = component
        self._current_components.append(component)
        self._session_factory.add(component)
        return component

    def _unregister_component(self, component):
        """ Removes a component from the realm map, the component list and the session factory. """
        logger.debug('Unregistering component with realm %s and token %s',
                     component.builder_realm, component.expected_token)

        self._realm_map.pop(component.builder_realm)
        self._current_components.remove(component)
        self._session_factory.remove(component)

    def _job_heartbeat(self, build_job):
        """ Extends the processing window of the queue item backing `build_job`. """
        self._queue.extend_processing(build_job.job_item, seconds_from_now=JOB_TIMEOUT_SECONDS,
                                      minimum_extension=MINIMUM_JOB_EXTENSION)

    def _job_complete(self, build_job, job_status, executor_name=None, update_phase=False):
        """ Settles the queue item for a finished job and updates local bookkeeping.

            INCOMPLETE jobs are returned to the queue; every other status completes the item.
            When `update_phase` is set, the build phase is set from RESULT_PHASES[job_status].
            The local job count is decremented, and a pending close() is released once it
            reaches zero. Finally the completion metrics are reported.
        """
        if job_status == BuildJobResult.INCOMPLETE:
            self._queue.incomplete(build_job.job_item, restore_retry=False, retry_after=30)
        else:
            self._queue.complete(build_job.job_item)

        if update_phase:
            status_handler = StatusHandler(self._build_logs, build_job.repo_build.uuid)
            status_handler.set_phase(RESULT_PHASES[job_status])

        self._job_count = self._job_count - 1

        if self._current_status == BuildServerStatus.SHUTDOWN and not self._job_count:
            self._shutdown_event.set()

        _report_completion_status(build_job, job_status, executor_name)

    @trollius.coroutine
    def _work_checker(self):
        """ Polls the build queue and hands jobs to the lifecycle manager while RUNNING.

            The coroutine ends when the status leaves RUNNING, or immediately after a scheduling
            exception (which also moves the server to the EXCEPTION status).
        """
        logger.debug('Initializing work checker')
        while self._current_status == BuildServerStatus.RUNNING:
            # Only the sleep runs inside the CloseForLongOperation context.
            with database.CloseForLongOperation(app.config):
                yield From(trollius.sleep(WORK_CHECK_TIMEOUT))

            logger.debug('Checking for more work for %d active workers',
                         self._lifecycle_manager.num_workers())

            processing_time = self._lifecycle_manager.setup_time() + SETUP_LEEWAY_SECONDS
            job_item = self._queue.get(processing_time=processing_time, ordering_required=True)
            if job_item is None:
                logger.debug('No additional work found. Going to sleep for %s seconds',
                             WORK_CHECK_TIMEOUT)
                continue

            try:
                build_job = BuildJob(job_item)
            except BuildJobLoadException as irbe:
                # The item cannot be turned into a job; return it without restoring its retry.
                logger.exception(irbe)
                self._queue.incomplete(job_item, restore_retry=False)
                continue

            logger.debug('Checking for an avaliable worker for build job %s',
                         build_job.repo_build.uuid)

            try:
                schedule_success, retry_timeout = yield From(
                    self._lifecycle_manager.schedule(build_job))
            except:
                # NOTE(review): the bare except also traps KeyboardInterrupt/GeneratorExit. It is
                # kept so the claimed item is handed back to the queue however scheduling fails.
                logger.exception('Exception when scheduling job: %s', build_job.repo_build.uuid)
                self._current_status = BuildServerStatus.EXCEPTION
                self._queue.incomplete(job_item, restore_retry=True,
                                       retry_after=WORK_CHECK_TIMEOUT)
                return

            if schedule_success:
                logger.debug('Marking build %s as scheduled', build_job.repo_build.uuid)
                status_handler = StatusHandler(self._build_logs, build_job.repo_build.uuid)
                status_handler.set_phase(database.BUILD_PHASE.BUILD_SCHEDULED)

                self._job_count = self._job_count + 1
                logger.debug('Build job %s scheduled. Running: %s', build_job.repo_build.uuid,
                             self._job_count)
            else:
                logger.debug('All workers are busy for job %s Requeuing after %s seconds.',
                             build_job.repo_build.uuid, retry_timeout)
                self._queue.incomplete(job_item, restore_retry=True, retry_after=retry_timeout)

    @trollius.coroutine
    def _queue_metrics_updater(self):
        """ Asks the queue to update its metrics every 30 seconds while the server is RUNNING. """
        logger.debug('Initializing queue metrics updater')
        while self._current_status == BuildServerStatus.RUNNING:
            logger.debug('Writing metrics')
            self._queue.update_metrics()

            logger.debug('Metrics going to sleep for 30 seconds')
            yield From(trollius.sleep(30))

    @trollius.coroutine
    def _initialize(self, loop, host, websocket_port, controller_port, ssl=None):
        """ Starts the controller and WAMP servers, then runs the metrics updater and work checker.

            This coroutine finishes only when the work checker does.
        """
        self._loop = loop

        # Create the WAMP server.
        transport_factory = WampWebSocketServerFactory(self._session_factory, debug_wamp=False)
        transport_factory.setProtocolOptions(failByDrop=True)

        # Initialize the controller server and the WAMP server
        create_wsgi_server(self._controller_app, loop=loop, host=host, port=controller_port,
                           ssl=ssl)
        yield From(loop.create_server(transport_factory, host, websocket_port, ssl=ssl))

        # Initialize the metrics updater. `async` is a reserved word from Python 3.7, so the
        # scheduler is looked up by name; `ensure_future` is preferred where trollius has it.
        schedule_task = getattr(trollius, 'ensure_future', None) or getattr(trollius, 'async')
        schedule_task(self._queue_metrics_updater())

        # Initialize the work queue checker.
        yield From(self._work_checker())


def _report_completion_status(build_job, status, executor_name):
    """ Records completion metrics for a build job.

        Both counters are always incremented. The deprecated per-status metric is only emitted
        for the COMPLETE, ERROR and INCOMPLETE results; any other status returns early.
    """
    metric_queue.build_counter.Inc(labelvalues=[status])
    metric_queue.repository_build_completed.Inc(labelvalues=[build_job.namespace,
                                                             build_job.repo_name, status,
                                                             executor_name or 'executor'])

    if status == BuildJobResult.COMPLETE:
        status_name = 'CompleteBuilds'
    elif status == BuildJobResult.ERROR:
        status_name = 'FailedBuilds'
    elif status == BuildJobResult.INCOMPLETE:
        status_name = 'IncompletedBuilds'
    else:
        return

    metric_queue.put_deprecated(status_name, 1, unit='Count')