2014-11-11 23:23:15 +00:00
|
|
|
import logging
|
|
|
|
import trollius
|
2015-01-28 22:12:33 +00:00
|
|
|
import json
|
2014-11-11 23:23:15 +00:00
|
|
|
|
|
|
|
from autobahn.asyncio.wamp import RouterFactory, RouterSessionFactory
|
2014-11-18 20:45:56 +00:00
|
|
|
from autobahn.asyncio.websocket import WampWebSocketServerFactory
|
2014-11-11 23:23:15 +00:00
|
|
|
from autobahn.wamp import types
|
|
|
|
|
|
|
|
from aiowsgi import create_server as create_wsgi_server
|
|
|
|
from flask import Flask
|
2014-11-18 20:45:56 +00:00
|
|
|
from threading import Event
|
2014-11-11 23:23:15 +00:00
|
|
|
from trollius.coroutines import From
|
2014-12-16 18:37:40 +00:00
|
|
|
from datetime import timedelta
|
2014-11-11 23:23:15 +00:00
|
|
|
|
2015-02-18 19:13:36 +00:00
|
|
|
from buildman.enums import BuildJobResult, BuildServerStatus
|
2015-02-12 21:38:43 +00:00
|
|
|
from buildman.jobutil.buildstatus import StatusHandler
|
2014-11-25 21:14:44 +00:00
|
|
|
from buildman.jobutil.buildjob import BuildJob, BuildJobLoadException
|
2015-01-29 19:40:24 +00:00
|
|
|
from data import database
|
2015-08-11 20:39:33 +00:00
|
|
|
from app import app, metric_queue
|
2014-11-12 19:03:07 +00:00
|
|
|
|
2014-11-30 22:48:02 +00:00
|
|
|
logger = logging.getLogger(__name__)
|
2014-11-11 23:23:15 +00:00
|
|
|
|
2014-11-14 00:41:17 +00:00
|
|
|
WORK_CHECK_TIMEOUT = 10
|
2014-11-11 23:23:15 +00:00
|
|
|
TIMEOUT_PERIOD_MINUTES = 20
|
2014-11-21 19:27:06 +00:00
|
|
|
JOB_TIMEOUT_SECONDS = 300
|
|
|
|
MINIMUM_JOB_EXTENSION = timedelta(minutes=2)
|
2014-11-11 23:23:15 +00:00
|
|
|
|
2014-12-22 22:24:44 +00:00
|
|
|
HEARTBEAT_PERIOD_SEC = 30
|
2014-11-25 21:14:44 +00:00
|
|
|
|
2014-11-11 23:23:15 +00:00
|
|
|
class BuilderServer(object):
|
|
|
|
""" Server which handles both HTTP and WAMP requests, managing the full state of the build
|
|
|
|
controller.
|
|
|
|
"""
|
2014-12-16 18:41:30 +00:00
|
|
|
def __init__(self, registry_hostname, queue, build_logs, user_files, lifecycle_manager_klass,
|
2014-12-31 16:33:56 +00:00
|
|
|
lifecycle_manager_config, manager_hostname):
|
2014-11-18 20:45:56 +00:00
|
|
|
self._loop = None
|
2015-02-18 19:13:36 +00:00
|
|
|
self._current_status = BuildServerStatus.STARTING
|
2014-11-18 20:45:56 +00:00
|
|
|
self._current_components = []
|
2015-06-26 01:22:39 +00:00
|
|
|
self._realm_map = {}
|
2014-11-18 20:45:56 +00:00
|
|
|
self._job_count = 0
|
2014-11-11 23:23:15 +00:00
|
|
|
|
2014-11-18 20:45:56 +00:00
|
|
|
self._session_factory = RouterSessionFactory(RouterFactory())
|
2014-12-16 18:41:30 +00:00
|
|
|
self._registry_hostname = registry_hostname
|
2014-11-11 23:23:15 +00:00
|
|
|
self._queue = queue
|
|
|
|
self._build_logs = build_logs
|
|
|
|
self._user_files = user_files
|
|
|
|
self._lifecycle_manager = lifecycle_manager_klass(
|
2014-11-18 20:45:56 +00:00
|
|
|
self._register_component,
|
|
|
|
self._unregister_component,
|
2014-11-21 19:27:06 +00:00
|
|
|
self._job_heartbeat,
|
2014-12-16 18:41:30 +00:00
|
|
|
self._job_complete,
|
2014-12-31 16:33:56 +00:00
|
|
|
manager_hostname,
|
2014-12-22 22:24:44 +00:00
|
|
|
HEARTBEAT_PERIOD_SEC,
|
2014-11-18 20:45:56 +00:00
|
|
|
)
|
2014-12-16 18:41:30 +00:00
|
|
|
self._lifecycle_manager_config = lifecycle_manager_config
|
2014-11-11 23:23:15 +00:00
|
|
|
|
|
|
|
self._shutdown_event = Event()
|
2015-02-18 19:13:36 +00:00
|
|
|
self._current_status = BuildServerStatus.RUNNING
|
2014-11-11 23:23:15 +00:00
|
|
|
|
|
|
|
self._register_controller()
|
|
|
|
|
|
|
|
def _register_controller(self):
|
|
|
|
controller_app = Flask('controller')
|
|
|
|
server = self
|
|
|
|
|
|
|
|
@controller_app.route('/status')
|
|
|
|
def status():
|
2015-10-06 05:28:43 +00:00
|
|
|
metrics = server._queue.get_metrics()
|
2015-02-18 00:15:54 +00:00
|
|
|
(running_count, available_not_running_count, available_count) = metrics
|
2015-01-28 22:12:33 +00:00
|
|
|
|
|
|
|
workers = [component for component in server._current_components
|
|
|
|
if component.kind() == 'builder']
|
|
|
|
|
|
|
|
data = {
|
|
|
|
'status': server._current_status,
|
|
|
|
'running_local': server._job_count,
|
|
|
|
'running_total': running_count,
|
|
|
|
'workers': len(workers),
|
2015-02-18 00:15:54 +00:00
|
|
|
'job_total': available_count + running_count
|
2015-01-28 22:12:33 +00:00
|
|
|
}
|
|
|
|
|
|
|
|
return json.dumps(data)
|
2014-11-11 23:23:15 +00:00
|
|
|
|
|
|
|
self._controller_app = controller_app
|
|
|
|
|
2015-01-05 20:09:03 +00:00
|
|
|
def run(self, host, websocket_port, controller_port, ssl=None):
|
2014-11-30 22:48:02 +00:00
|
|
|
logger.debug('Initializing the lifecycle manager')
|
2014-12-16 18:41:30 +00:00
|
|
|
self._lifecycle_manager.initialize(self._lifecycle_manager_config)
|
2014-11-11 23:23:15 +00:00
|
|
|
|
2014-11-30 22:48:02 +00:00
|
|
|
logger.debug('Initializing all members of the event loop')
|
2014-11-11 23:23:15 +00:00
|
|
|
loop = trollius.get_event_loop()
|
|
|
|
|
2015-01-05 20:09:03 +00:00
|
|
|
logger.debug('Starting server on port %s, with controller on port %s', websocket_port,
|
|
|
|
controller_port)
|
2015-02-13 16:21:34 +00:00
|
|
|
|
2014-11-11 23:23:15 +00:00
|
|
|
try:
|
2015-02-25 18:55:18 +00:00
|
|
|
loop.run_until_complete(self._initialize(loop, host, websocket_port, controller_port, ssl))
|
2014-11-11 23:23:15 +00:00
|
|
|
except KeyboardInterrupt:
|
2014-11-18 20:45:56 +00:00
|
|
|
pass
|
2014-11-11 23:23:15 +00:00
|
|
|
finally:
|
2014-11-18 20:45:56 +00:00
|
|
|
loop.close()
|
2014-11-11 23:23:15 +00:00
|
|
|
|
|
|
|
def close(self):
|
2014-11-30 22:48:02 +00:00
|
|
|
logger.debug('Requested server shutdown')
|
2015-02-18 19:13:36 +00:00
|
|
|
self._current_status = BuildServerStatus.SHUTDOWN
|
2014-11-11 23:23:15 +00:00
|
|
|
self._lifecycle_manager.shutdown()
|
|
|
|
self._shutdown_event.wait()
|
2014-11-30 22:48:02 +00:00
|
|
|
logger.debug('Shutting down server')
|
2014-11-11 23:23:15 +00:00
|
|
|
|
|
|
|
def _register_component(self, realm, component_klass, **kwargs):
|
|
|
|
""" Registers a component with the server. The component_klass must derive from
|
|
|
|
BaseComponent.
|
|
|
|
"""
|
2014-11-30 22:48:02 +00:00
|
|
|
logger.debug('Registering component with realm %s', realm)
|
2015-06-26 01:22:39 +00:00
|
|
|
if realm in self._realm_map:
|
|
|
|
logger.debug('Component with realm %s already registered', realm)
|
|
|
|
return self._realm_map[realm]
|
2014-11-11 23:23:15 +00:00
|
|
|
|
2014-11-18 20:45:56 +00:00
|
|
|
component = component_klass(types.ComponentConfig(realm=realm), realm=realm, **kwargs)
|
2014-11-11 23:23:15 +00:00
|
|
|
component.server = self
|
|
|
|
component.parent_manager = self._lifecycle_manager
|
|
|
|
component.build_logs = self._build_logs
|
|
|
|
component.user_files = self._user_files
|
2014-12-16 18:41:30 +00:00
|
|
|
component.registry_hostname = self._registry_hostname
|
2014-11-11 23:23:15 +00:00
|
|
|
|
2015-06-26 01:22:39 +00:00
|
|
|
self._realm_map[realm] = component
|
2014-11-11 23:23:15 +00:00
|
|
|
self._current_components.append(component)
|
|
|
|
self._session_factory.add(component)
|
|
|
|
return component
|
|
|
|
|
|
|
|
def _unregister_component(self, component):
|
2014-11-30 22:48:02 +00:00
|
|
|
logger.debug('Unregistering component with realm %s and token %s',
|
2014-11-18 20:45:56 +00:00
|
|
|
component.builder_realm, component.expected_token)
|
2014-11-11 23:23:15 +00:00
|
|
|
|
2015-06-26 01:22:39 +00:00
|
|
|
self._realm_map.pop(component.builder_realm)
|
2014-11-11 23:23:15 +00:00
|
|
|
self._current_components.remove(component)
|
|
|
|
self._session_factory.remove(component)
|
|
|
|
|
2014-11-21 19:27:06 +00:00
|
|
|
def _job_heartbeat(self, build_job):
|
2015-01-29 23:40:41 +00:00
|
|
|
self._queue.extend_processing(build_job.job_item, seconds_from_now=JOB_TIMEOUT_SECONDS,
|
|
|
|
minimum_extension=MINIMUM_JOB_EXTENSION)
|
2014-11-21 19:27:06 +00:00
|
|
|
|
2014-11-12 19:03:07 +00:00
|
|
|
def _job_complete(self, build_job, job_status):
|
2014-11-18 20:45:56 +00:00
|
|
|
if job_status == BuildJobResult.INCOMPLETE:
|
2015-02-03 21:29:47 +00:00
|
|
|
self._queue.incomplete(build_job.job_item, restore_retry=False, retry_after=30)
|
2014-11-11 23:23:15 +00:00
|
|
|
else:
|
2015-01-29 19:50:07 +00:00
|
|
|
self._queue.complete(build_job.job_item)
|
2014-11-11 23:23:15 +00:00
|
|
|
|
|
|
|
self._job_count = self._job_count - 1
|
|
|
|
|
2015-02-18 19:13:36 +00:00
|
|
|
if self._current_status == BuildServerStatus.SHUTDOWN and not self._job_count:
|
2014-11-11 23:23:15 +00:00
|
|
|
self._shutdown_event.set()
|
|
|
|
|
2015-08-11 20:39:33 +00:00
|
|
|
report_completion_status(job_status)
|
2015-02-18 19:13:36 +00:00
|
|
|
|
2014-11-11 23:23:15 +00:00
|
|
|
@trollius.coroutine
|
|
|
|
def _work_checker(self):
|
2015-02-25 20:15:22 +00:00
|
|
|
logger.debug('Initializing work checker')
|
2015-02-18 19:13:36 +00:00
|
|
|
while self._current_status == BuildServerStatus.RUNNING:
|
2015-01-29 23:40:01 +00:00
|
|
|
with database.CloseForLongOperation(app.config):
|
|
|
|
yield From(trollius.sleep(WORK_CHECK_TIMEOUT))
|
|
|
|
|
2014-12-22 22:24:44 +00:00
|
|
|
logger.debug('Checking for more work for %d active workers',
|
|
|
|
self._lifecycle_manager.num_workers())
|
2015-01-29 19:40:24 +00:00
|
|
|
|
2016-05-24 21:42:11 +00:00
|
|
|
job_item = self._queue.get(processing_time=self._lifecycle_manager.setup_time(),
|
2016-06-03 19:42:14 +00:00
|
|
|
ordering_required=True)
|
2014-11-11 23:23:15 +00:00
|
|
|
if job_item is None:
|
2014-11-30 22:48:02 +00:00
|
|
|
logger.debug('No additional work found. Going to sleep for %s seconds', WORK_CHECK_TIMEOUT)
|
2014-11-11 23:23:15 +00:00
|
|
|
continue
|
|
|
|
|
2014-11-12 19:03:07 +00:00
|
|
|
try:
|
|
|
|
build_job = BuildJob(job_item)
|
|
|
|
except BuildJobLoadException as irbe:
|
2014-11-30 22:48:02 +00:00
|
|
|
logger.exception(irbe)
|
2014-11-12 19:03:07 +00:00
|
|
|
self._queue.incomplete(job_item, restore_retry=False)
|
2015-01-29 23:01:42 +00:00
|
|
|
continue
|
2014-11-12 19:03:07 +00:00
|
|
|
|
2015-06-10 18:18:12 +00:00
|
|
|
logger.debug('Checking for an avaliable worker for build job %s',
|
|
|
|
build_job.repo_build.uuid)
|
2015-02-25 17:09:14 +00:00
|
|
|
|
|
|
|
try:
|
2015-06-10 18:17:32 +00:00
|
|
|
schedule_success, retry_timeout = yield From(self._lifecycle_manager.schedule(build_job))
|
2015-02-25 17:09:14 +00:00
|
|
|
except:
|
2015-06-10 18:18:12 +00:00
|
|
|
logger.exception('Exception when scheduling job: %s', build_job.repo_build.uuid)
|
2015-02-25 17:19:21 +00:00
|
|
|
self._current_status = BuildServerStatus.EXCEPTION
|
|
|
|
return
|
2015-02-25 17:09:14 +00:00
|
|
|
|
2015-06-10 18:17:32 +00:00
|
|
|
if schedule_success:
|
2015-02-25 21:00:14 +00:00
|
|
|
logger.debug('Marking build %s as scheduled', build_job.repo_build.uuid)
|
2015-02-12 21:38:43 +00:00
|
|
|
status_handler = StatusHandler(self._build_logs, build_job.repo_build.uuid)
|
2015-05-14 19:37:16 +00:00
|
|
|
status_handler.set_phase(database.BUILD_PHASE.BUILD_SCHEDULED)
|
2015-02-12 21:38:43 +00:00
|
|
|
|
2014-11-11 23:23:15 +00:00
|
|
|
self._job_count = self._job_count + 1
|
2015-02-25 21:00:14 +00:00
|
|
|
logger.debug('Build job %s scheduled. Running: %s', build_job.repo_build.uuid,
|
|
|
|
self._job_count)
|
2014-11-11 23:23:15 +00:00
|
|
|
else:
|
2015-06-10 18:18:12 +00:00
|
|
|
logger.debug('All workers are busy for job %s Requeuing after %s seconds.',
|
|
|
|
build_job.repo_build.uuid, retry_timeout)
|
2015-06-10 18:17:32 +00:00
|
|
|
self._queue.incomplete(job_item, restore_retry=True, retry_after=retry_timeout)
|
2014-11-18 20:45:56 +00:00
|
|
|
|
2015-02-13 16:21:34 +00:00
|
|
|
@trollius.coroutine
|
2015-02-17 17:23:08 +00:00
|
|
|
def _queue_metrics_updater(self):
|
2015-02-25 18:55:18 +00:00
|
|
|
logger.debug('Initializing queue metrics updater')
|
2015-02-18 19:13:36 +00:00
|
|
|
while self._current_status == BuildServerStatus.RUNNING:
|
2015-02-25 17:19:21 +00:00
|
|
|
logger.debug('Writing metrics')
|
2015-02-13 16:21:34 +00:00
|
|
|
self._queue.update_metrics()
|
2014-11-11 23:23:15 +00:00
|
|
|
|
2015-02-25 20:15:22 +00:00
|
|
|
logger.debug('Metrics going to sleep for 30 seconds')
|
|
|
|
yield From(trollius.sleep(30))
|
|
|
|
|
2014-11-11 23:23:15 +00:00
|
|
|
@trollius.coroutine
|
2015-01-05 20:09:03 +00:00
|
|
|
def _initialize(self, loop, host, websocket_port, controller_port, ssl=None):
|
2014-11-12 19:03:07 +00:00
|
|
|
self._loop = loop
|
2014-11-18 20:45:56 +00:00
|
|
|
|
2014-11-11 23:23:15 +00:00
|
|
|
# Create the WAMP server.
|
2014-11-18 21:35:03 +00:00
|
|
|
transport_factory = WampWebSocketServerFactory(self._session_factory, debug_wamp=False)
|
2014-11-18 20:45:56 +00:00
|
|
|
transport_factory.setProtocolOptions(failByDrop=True)
|
2014-11-11 23:23:15 +00:00
|
|
|
|
|
|
|
# Initialize the controller server and the WAMP server
|
2015-01-05 20:09:03 +00:00
|
|
|
create_wsgi_server(self._controller_app, loop=loop, host=host, port=controller_port, ssl=ssl)
|
|
|
|
yield From(loop.create_server(transport_factory, host, websocket_port, ssl=ssl))
|
2014-11-11 23:23:15 +00:00
|
|
|
|
2015-02-25 18:55:18 +00:00
|
|
|
# Initialize the metrics updater
|
2015-02-25 21:00:46 +00:00
|
|
|
trollius.async(self._queue_metrics_updater())
|
2015-02-25 18:55:18 +00:00
|
|
|
|
2014-11-11 23:23:15 +00:00
|
|
|
# Initialize the work queue checker.
|
2014-11-18 21:34:09 +00:00
|
|
|
yield From(self._work_checker())
|
2015-08-11 20:39:33 +00:00
|
|
|
|
|
|
|
def report_completion_status(status):
|
|
|
|
if status == BuildJobResult.COMPLETE:
|
|
|
|
status_name = 'CompleteBuilds'
|
|
|
|
elif status == BuildJobResult.ERROR:
|
|
|
|
status_name = 'FailedBuilds'
|
|
|
|
elif status == BuildJobResult.INCOMPLETE:
|
|
|
|
status_name = 'IncompletedBuilds'
|
|
|
|
else:
|
|
|
|
return
|
|
|
|
|
|
|
|
metric_queue.put(status_name, 1, unit='Count')
|