This repository has been archived on 2020-03-24. You can view files and clone it, but cannot push or open issues or pull requests.
quay/buildman/server.py

204 lines
6.9 KiB
Python
Raw Normal View History

import logging
import trollius
from autobahn.asyncio.wamp import RouterFactory, RouterSessionFactory
2014-11-18 20:45:56 +00:00
from autobahn.asyncio.websocket import WampWebSocketServerFactory
from autobahn.wamp import types
from aiowsgi import create_server as create_wsgi_server
from flask import Flask
2014-11-18 20:45:56 +00:00
from threading import Event
2015-02-13 16:21:34 +00:00
from trollius.tasks import Task
from trollius.coroutines import From
from datetime import timedelta
from buildman.jobutil.buildstatus import StatusHandler
from buildman.jobutil.buildjob import BuildJob, BuildJobLoadException
from data import database
from app import app
2015-02-13 16:21:34 +00:00
# pylint: disable=invalid-name
logger = logging.getLogger(__name__)
WORK_CHECK_TIMEOUT = 10
TIMEOUT_PERIOD_MINUTES = 20
JOB_TIMEOUT_SECONDS = 300
MINIMUM_JOB_EXTENSION = timedelta(minutes=2)
HEARTBEAT_PERIOD_SEC = 30
2015-02-13 16:21:34 +00:00
# pylint: disable=too-few-public-methods
2014-11-18 20:45:56 +00:00
class BuildJobResult(object):
""" Build job result enum """
INCOMPLETE = 'incomplete'
COMPLETE = 'complete'
ERROR = 'error'
2015-02-13 16:21:34 +00:00
# pylint: disable=too-many-instance-attributes
class BuilderServer(object):
""" Server which handles both HTTP and WAMP requests, managing the full state of the build
controller.
"""
2015-02-13 16:21:34 +00:00
# pylint: disable=too-many-arguments
def __init__(self, registry_hostname, queue, build_logs, user_files, lifecycle_manager_klass,
lifecycle_manager_config, manager_hostname):
2014-11-18 20:45:56 +00:00
self._loop = None
self._current_status = 'starting'
self._current_components = []
self._job_count = 0
2014-11-18 20:45:56 +00:00
self._session_factory = RouterSessionFactory(RouterFactory())
self._registry_hostname = registry_hostname
self._queue = queue
self._build_logs = build_logs
self._user_files = user_files
self._lifecycle_manager = lifecycle_manager_klass(
2014-11-18 20:45:56 +00:00
self._register_component,
self._unregister_component,
self._job_heartbeat,
self._job_complete,
manager_hostname,
HEARTBEAT_PERIOD_SEC,
2014-11-18 20:45:56 +00:00
)
self._lifecycle_manager_config = lifecycle_manager_config
self._shutdown_event = Event()
self._current_status = 'running'
self._register_controller()
def _register_controller(self):
controller_app = Flask('controller')
server = self
@controller_app.route('/status')
def status():
return server._current_status
self._controller_app = controller_app
def run(self, host, websocket_port, controller_port, ssl=None):
logger.debug('Initializing the lifecycle manager')
self._lifecycle_manager.initialize(self._lifecycle_manager_config)
logger.debug('Initializing all members of the event loop')
loop = trollius.get_event_loop()
logger.debug('Starting server on port %s, with controller on port %s', websocket_port,
controller_port)
2015-02-13 16:21:34 +00:00
tasks = [
Task(self._initialize(loop, host, websocket_port, controller_port, ssl)),
Task(self._metrics_updater()),
]
try:
2015-02-13 16:21:34 +00:00
loop.run_until_complete(trollius.wait(tasks))
except KeyboardInterrupt:
2014-11-18 20:45:56 +00:00
pass
finally:
2014-11-18 20:45:56 +00:00
loop.close()
def close(self):
logger.debug('Requested server shutdown')
self._current_status = 'shutting_down'
self._lifecycle_manager.shutdown()
self._shutdown_event.wait()
logger.debug('Shutting down server')
def _register_component(self, realm, component_klass, **kwargs):
""" Registers a component with the server. The component_klass must derive from
BaseComponent.
"""
logger.debug('Registering component with realm %s', realm)
2014-11-18 20:45:56 +00:00
component = component_klass(types.ComponentConfig(realm=realm), realm=realm, **kwargs)
component.server = self
component.parent_manager = self._lifecycle_manager
component.build_logs = self._build_logs
component.user_files = self._user_files
component.registry_hostname = self._registry_hostname
self._current_components.append(component)
self._session_factory.add(component)
return component
def _unregister_component(self, component):
logger.debug('Unregistering component with realm %s and token %s',
2014-11-18 20:45:56 +00:00
component.builder_realm, component.expected_token)
self._current_components.remove(component)
self._session_factory.remove(component)
def _job_heartbeat(self, build_job):
self._queue.extend_processing(build_job.job_item, seconds_from_now=JOB_TIMEOUT_SECONDS,
minimum_extension=MINIMUM_JOB_EXTENSION)
def _job_complete(self, build_job, job_status):
if job_status == BuildJobResult.INCOMPLETE:
self._queue.incomplete(build_job.job_item, restore_retry=False, retry_after=30)
else:
self._queue.complete(build_job.job_item)
self._job_count = self._job_count - 1
if self._current_status == 'shutting_down' and not self._job_count:
self._shutdown_event.set()
@trollius.coroutine
def _work_checker(self):
while self._current_status == 'running':
with database.CloseForLongOperation(app.config):
yield From(trollius.sleep(WORK_CHECK_TIMEOUT))
logger.debug('Checking for more work for %d active workers',
self._lifecycle_manager.num_workers())
job_item = self._queue.get(processing_time=self._lifecycle_manager.setup_time())
if job_item is None:
logger.debug('No additional work found. Going to sleep for %s seconds', WORK_CHECK_TIMEOUT)
continue
try:
build_job = BuildJob(job_item)
except BuildJobLoadException as irbe:
logger.exception(irbe)
self._queue.incomplete(job_item, restore_retry=False)
continue
logger.debug('Build job found. Checking for an avaliable worker.')
scheduled = yield From(self._lifecycle_manager.schedule(build_job))
if scheduled:
status_handler = StatusHandler(self._build_logs, build_job.repo_build.uuid)
status_handler.set_phase('build-scheduled')
self._job_count = self._job_count + 1
logger.debug('Build job scheduled. Running: %s', self._job_count)
else:
logger.debug('All workers are busy. Requeuing.')
self._queue.incomplete(job_item, restore_retry=True, retry_after=0)
2014-11-18 20:45:56 +00:00
2015-02-13 16:21:34 +00:00
@trollius.coroutine
def _metrics_updater(self):
while self._current_status == 'running':
yield From(trollius.sleep(30))
self._queue.update_metrics()
@trollius.coroutine
def _initialize(self, loop, host, websocket_port, controller_port, ssl=None):
self._loop = loop
2014-11-18 20:45:56 +00:00
# Create the WAMP server.
2014-11-18 21:35:03 +00:00
transport_factory = WampWebSocketServerFactory(self._session_factory, debug_wamp=False)
2014-11-18 20:45:56 +00:00
transport_factory.setProtocolOptions(failByDrop=True)
# Initialize the controller server and the WAMP server
create_wsgi_server(self._controller_app, loop=loop, host=host, port=controller_port, ssl=ssl)
yield From(loop.create_server(transport_factory, host, websocket_port, ssl=ssl))
2015-02-13 16:21:34 +00:00
# Initialize the coroutine reporting metrics
loop.create_task(self._metrics_updater())
# Initialize the work queue checker.
2014-11-18 21:34:09 +00:00
yield From(self._work_checker())