WIP: Start implementation of the build manager/controller. This code is not yet working completely.

This commit is contained in:
Joseph Schorr 2014-11-11 18:23:15 -05:00
parent 2ccbea95a5
commit eacf3f01d2
9 changed files with 576 additions and 0 deletions

146
buildman/server.py Normal file
View file

@ -0,0 +1,146 @@
import logging
import trollius
from autobahn.asyncio.wamp import RouterFactory, RouterSessionFactory
from autobahn.asyncio.websocket import WampWebSocketServerFactory, WampWebSocketServerProtocol
from autobahn.wamp import types
from autobahn.wamp.exception import ApplicationError
from aiowsgi import create_server as create_wsgi_server
from flask import Flask
from threading import Event, Lock
from trollius.coroutines import From
logger = logging.getLogger(__name__)
WORK_CHECK_TIMEOUT = 30
TIMEOUT_PERIOD_MINUTES = 20
RESERVATION_SECONDS = (TIMEOUT_PERIOD_MINUTES + 5) * 60
class BuilderServer(object):
""" Server which handles both HTTP and WAMP requests, managing the full state of the build
controller.
"""
_loop = None
_current_status = 'starting'
_current_components = []
_job_count = 0
def __init__(self, server_hostname, queue, build_logs, user_files, lifecycle_manager_klass):
self._session_factory = RouterSessionFactory(RouterFactory())
self._server_hostname = server_hostname
self._queue = queue
self._build_logs = build_logs
self._user_files = user_files
self._lifecycle_manager = lifecycle_manager_klass(
self._register_component, self._unregister_component, self._job_complete)
self._shutdown_event = Event()
self._current_status = 'running'
self._register_controller()
def _register_controller(self):
controller_app = Flask('controller')
server = self
@controller_app.route('/status')
def status():
return server._current_status
self._controller_app = controller_app
def run(self, host):
logging.debug('Initializing the lifecycle manager')
self._lifecycle_manager.initialize()
logging.debug('Initializing all members of the event loop')
loop = trollius.get_event_loop()
trollius.Task(self._initialize(loop, host))
logging.debug('Starting server on port 8080, with controller on port 8181')
try:
loop.run_forever()
except KeyboardInterrupt:
pass
finally:
loop.close()
def close(self):
logging.debug('Requested server shutdown')
self._current_status = 'shutting_down'
self._lifecycle_manager.shutdown()
self._shutdown_event.wait()
logging.debug('Shutting down server')
def _register_component(self, realm, component_klass, **kwargs):
""" Registers a component with the server. The component_klass must derive from
BaseComponent.
"""
logging.debug('Registering component with realm %s', realm)
component = component_klass(types.ComponentConfig(realm = realm), realm=realm, **kwargs)
component.server = self
component.parent_manager = self._lifecycle_manager
component.build_logs = self._build_logs
component.user_files = self._user_files
component.server_hostname = self._server_hostname
self._current_components.append(component)
self._session_factory.add(component)
return component
def _unregister_component(self, component):
logging.debug('Unregistering component with realm %s and token %s',
component.builder_realm, component.expected_token)
self._current_components.remove(component)
self._session_factory.remove(component)
def _job_complete(self, job_item, job_status):
if job_status == 'incomplete':
self._queue.incomplete(job_item, restore_retry=True)
elif job_status == 'error':
self._queue.incomplete(job_item, restore_retry=False)
else:
self._queue.complete(job)
self._job_count = self._job_count - 1
if self._current_status == 'shutting_down' and not self._job_count:
self._shutdown_event.set()
@trollius.coroutine
def _work_checker(self):
while self._current_status == 'running':
logger.debug('Checking for more work')
job_item = self._queue.get(processing_time=RESERVATION_SECONDS)
if job_item is None:
logger.debug('No additional work found. Going to sleep for %s seconds', WORK_CHECK_TIMEOUT)
yield From(trollius.sleep(WORK_CHECK_TIMEOUT))
continue
logger.debug('Build job found. Checking for an avaliable worker.')
if self._lifecycle_manager.schedule(job_item):
self._job_count = self._job_count + 1
logger.debug('Build job scheduled. Running: %s', self._job_count)
else:
logger.debug('All workers are busy. Requeuing.')
self._queue.incomplete(job_item, restore_retry=True)
yield From(trollius.sleep(WORK_CHECK_TIMEOUT))
@trollius.coroutine
def _initialize(self, loop, host):
# Create the WAMP server.
transport_factory = WampWebSocketServerFactory(self._session_factory, debug_wamp = False)
transport_factory.setProtocolOptions(failByDrop = True)
# Initialize the controller server and the WAMP server
create_wsgi_server(self._controller_app, loop=loop, host=host, port=8181)
yield From(loop.create_server(transport_factory, host, 8080))
# Initialize the work queue checker.
yield self._work_checker()