This repository has been archived on 2020-03-24. You can view files and clone it, but cannot push or open issues or pull requests.
quay/buildman/server.py
2014-11-13 19:41:17 -05:00

164 lines
5.5 KiB
Python

import logging
import trollius
from autobahn.asyncio.wamp import RouterFactory, RouterSessionFactory
from autobahn.asyncio.websocket import WampWebSocketServerFactory, WampWebSocketServerProtocol
from autobahn.wamp import types
from autobahn.wamp.exception import ApplicationError
from aiowsgi import create_server as create_wsgi_server
from flask import Flask
from threading import Event, Lock
from trollius.coroutines import From
from buildjob import BuildJob, BuildJobLoadException
# Module-level logger, named after this module.
logger = logging.getLogger(__name__)
# Seconds the work checker sleeps between polls of the build queue.
WORK_CHECK_TIMEOUT = 10
# NOTE(review): only used below to derive RESERVATION_SECONDS; presumably the
# maximum duration of a build, in minutes -- confirm against the builders.
TIMEOUT_PERIOD_MINUTES = 20
# Passed as `processing_time` when pulling a job from the queue: the timeout
# period plus a five minute margin, expressed in seconds.
RESERVATION_SECONDS = (TIMEOUT_PERIOD_MINUTES + 5) * 60
class BUILD_JOB_RESULT(object):
    """ Outcome values reported for a build job when it finishes.

        These are plain string constants (not an `enum.Enum`), so they compare equal to the
        raw strings they hold.
    """
    # The job finished successfully; the queue item is marked complete.
    COMPLETE = 'complete'

    # The job failed; the queue item is marked incomplete without restoring its retry.
    ERROR = 'error'

    # The job did not finish; the queue item is marked incomplete with its retry restored.
    INCOMPLETE = 'incomplete'
class BuilderServer(object):
    """ Server which handles both HTTP and WAMP requests, managing the full state of the build
        controller.

        The HTTP side is a small Flask "controller" app exposing `/status` on port 8181; the
        WAMP side is a websocket router on port 8080. A background coroutine polls the build
        queue and hands jobs to the lifecycle manager.
    """
    # Event loop the server runs on; assigned in _initialize().
    _loop = None

    # One of 'starting', 'running' or 'shutting_down'.
    _current_status = 'starting'

    # Number of jobs that have been scheduled and have not yet reported completion.
    _job_count = 0

    def __init__(self, server_hostname, queue, build_logs, user_files, lifecycle_manager_klass):
        """ Creates the server.

            server_hostname, build_logs and user_files are handed to every registered
            component. queue is the build work queue (must provide get/complete/incomplete).
            lifecycle_manager_klass is called with the register-component,
            unregister-component and job-complete callbacks to build the lifecycle manager.
        """
        # Must be a per-instance list: a class-level list would be shared (and mutated) by
        # every BuilderServer. Created first so the callbacks below can already use it.
        self._current_components = []

        self._session_factory = RouterSessionFactory(RouterFactory())
        self._server_hostname = server_hostname
        self._queue = queue
        self._build_logs = build_logs
        self._user_files = user_files
        self._lifecycle_manager = lifecycle_manager_klass(
            self._register_component, self._unregister_component, self._job_complete)

        # Set once a requested shutdown has no more running jobs to wait for.
        self._shutdown_event = Event()
        self._current_status = 'running'

        self._register_controller()

    def _register_controller(self):
        """ Builds the Flask controller app, whose `/status` route returns the current
            server status string.
        """
        controller_app = Flask('controller')
        server = self

        @controller_app.route('/status')
        def status():
            return server._current_status

        self._controller_app = controller_app

    def run(self, host):
        """ Initializes the lifecycle manager and runs the event loop until interrupted.

            host is the interface both the controller and the WAMP server bind to. This call
            blocks; on KeyboardInterrupt (or any other exit) the loop is closed.
        """
        logger.debug('Initializing the lifecycle manager')
        self._lifecycle_manager.initialize()

        logger.debug('Initializing all members of the event loop')
        loop = trollius.get_event_loop()
        trollius.Task(self._initialize(loop, host))

        logger.debug('Starting server on port 8080, with controller on port 8181')
        try:
            loop.run_forever()
        except KeyboardInterrupt:
            pass
        finally:
            loop.close()

    def close(self):
        """ Requests a shutdown and blocks until all running jobs have reported completion. """
        logger.debug('Requested server shutdown')
        self._current_status = 'shutting_down'
        self._lifecycle_manager.shutdown()

        # The event is otherwise only set from _job_complete(); with no jobs in flight
        # nothing would ever set it and the wait below would block forever.
        if not self._job_count:
            self._shutdown_event.set()

        self._shutdown_event.wait()
        logger.debug('Shutting down server')

    def _register_component(self, realm, component_klass, **kwargs):
        """ Registers a component with the server. The component_klass must derive from
            BaseComponent.

            Extra keyword arguments are forwarded to component_klass. Returns the newly
            created component.
        """
        logger.debug('Registering component with realm %s', realm)

        component = component_klass(types.ComponentConfig(realm=realm), realm=realm, **kwargs)
        component.server = self
        component.parent_manager = self._lifecycle_manager
        component.build_logs = self._build_logs
        component.user_files = self._user_files
        component.server_hostname = self._server_hostname

        self._current_components.append(component)
        self._session_factory.add(component)
        return component

    def _unregister_component(self, component):
        """ Removes a previously registered component from the server and the session
            factory.
        """
        logger.debug('Unregistering component with realm %s and token %s',
                     component.builder_realm, component.expected_token)

        self._current_components.remove(component)
        self._session_factory.remove(component)

    def _job_complete(self, build_job, job_status):
        """ Records the outcome of a build job on the queue and updates the running count.

            job_status is one of the BUILD_JOB_RESULT values. If a shutdown was requested and
            this was the last running job, the shutdown event is set.
        """
        if job_status == BUILD_JOB_RESULT.INCOMPLETE:
            self._queue.incomplete(build_job.job_item(), restore_retry=True, retry_after=30)
        elif job_status == BUILD_JOB_RESULT.ERROR:
            self._queue.incomplete(build_job.job_item(), restore_retry=False)
        else:
            self._queue.complete(build_job.job_item())

        self._job_count = self._job_count - 1

        if self._current_status == 'shutting_down' and not self._job_count:
            self._shutdown_event.set()

        # TODO: check for work here?

    @trollius.coroutine
    def _work_checker(self):
        """ Coroutine which polls the queue for build jobs while the server is running and
            asks the lifecycle manager to schedule each one.
        """
        while self._current_status == 'running':
            logger.debug('Checking for more work')
            job_item = self._queue.get(processing_time=RESERVATION_SECONDS)
            if job_item is None:
                logger.debug('No additional work found. Going to sleep for %s seconds',
                             WORK_CHECK_TIMEOUT)
                yield From(trollius.sleep(WORK_CHECK_TIMEOUT))
                continue

            try:
                build_job = BuildJob(job_item)
            except BuildJobLoadException as irbe:
                # The item could not be loaded as a job: give it back to the queue and do
                # NOT fall through to scheduling, where build_job would be unbound or still
                # refer to the job from a previous iteration.
                logger.exception(irbe)
                self._queue.incomplete(job_item, restore_retry=False)
            else:
                logger.debug('Build job found. Checking for an avaliable worker.')
                if self._lifecycle_manager.schedule(build_job, self._loop):
                    self._job_count = self._job_count + 1
                    logger.debug('Build job scheduled. Running: %s', self._job_count)
                else:
                    logger.debug('All workers are busy. Requeuing.')
                    self._queue.incomplete(job_item, restore_retry=True, retry_after=0)

            # Pause before polling the queue again, whatever happened to this item.
            yield From(trollius.sleep(WORK_CHECK_TIMEOUT))

    @trollius.coroutine
    def _initialize(self, loop, host):
        """ Coroutine which starts the controller (port 8181) and WAMP (port 8080) servers on
            the given loop and host, then runs the work checker.
        """
        self._loop = loop

        # Create the WAMP server.
        transport_factory = WampWebSocketServerFactory(self._session_factory, debug_wamp=False)
        transport_factory.setProtocolOptions(failByDrop=True)

        # Initialize the controller server and the WAMP server
        create_wsgi_server(self._controller_app, loop=loop, host=host, port=8181)
        yield From(loop.create_server(transport_factory, host, 8080))

        # Initialize the work queue checker. Wrapped in From() like every other yield in
        # this class.
        yield From(self._work_checker())