Merge branch 'master' into quark

This commit is contained in:
Joseph Schorr 2015-02-13 16:24:53 -05:00
commit fbdbc21eb1
137 changed files with 8691 additions and 2414 deletions

View file

@ -12,8 +12,11 @@ from threading import Event
from trollius.coroutines import From
from datetime import timedelta
from buildman.jobutil.buildstatus import StatusHandler
from buildman.jobutil.buildjob import BuildJob, BuildJobLoadException
from data import database
from data.queue import WorkQueue
from app import app
logger = logging.getLogger(__name__)
@ -22,8 +25,7 @@ TIMEOUT_PERIOD_MINUTES = 20
JOB_TIMEOUT_SECONDS = 300
MINIMUM_JOB_EXTENSION = timedelta(minutes=2)
WEBSOCKET_PORT = 8787
CONTROLLER_PORT = 8686
HEARTBEAT_PERIOD_SEC = 30
class BuildJobResult(object):
""" Build job result enum """
@ -35,14 +37,15 @@ class BuilderServer(object):
""" Server which handles both HTTP and WAMP requests, managing the full state of the build
controller.
"""
def __init__(self, server_hostname, queue, build_logs, user_files, lifecycle_manager_klass):
def __init__(self, registry_hostname, queue, build_logs, user_files, lifecycle_manager_klass,
lifecycle_manager_config, manager_hostname):
self._loop = None
self._current_status = 'starting'
self._current_components = []
self._job_count = 0
self._session_factory = RouterSessionFactory(RouterFactory())
self._server_hostname = server_hostname
self._registry_hostname = registry_hostname
self._queue = queue
self._build_logs = build_logs
self._user_files = user_files
@ -50,8 +53,11 @@ class BuilderServer(object):
self._register_component,
self._unregister_component,
self._job_heartbeat,
self._job_complete
self._job_complete,
manager_hostname,
HEARTBEAT_PERIOD_SEC,
)
self._lifecycle_manager_config = lifecycle_manager_config
self._shutdown_event = Event()
self._current_status = 'running'
@ -81,18 +87,17 @@ class BuilderServer(object):
self._controller_app = controller_app
def run(self, host, ssl=None):
def run(self, host, websocket_port, controller_port, ssl=None):
logger.debug('Initializing the lifecycle manager')
self._lifecycle_manager.initialize()
self._lifecycle_manager.initialize(self._lifecycle_manager_config)
logger.debug('Initializing all members of the event loop')
loop = trollius.get_event_loop()
trollius.Task(self._initialize(loop, host, ssl))
logger.debug('Starting server on port %s, with controller on port %s', WEBSOCKET_PORT,
CONTROLLER_PORT)
logger.debug('Starting server on port %s, with controller on port %s', websocket_port,
controller_port)
try:
loop.run_forever()
loop.run_until_complete(self._initialize(loop, host, websocket_port, controller_port, ssl))
except KeyboardInterrupt:
pass
finally:
@ -116,7 +121,7 @@ class BuilderServer(object):
component.parent_manager = self._lifecycle_manager
component.build_logs = self._build_logs
component.user_files = self._user_files
component.server_hostname = self._server_hostname
component.registry_hostname = self._registry_hostname
self._current_components.append(component)
self._session_factory.add(component)
@ -130,32 +135,32 @@ class BuilderServer(object):
self._session_factory.remove(component)
def _job_heartbeat(self, build_job):
WorkQueue.extend_processing(build_job.job_item(), seconds_from_now=JOB_TIMEOUT_SECONDS,
retry_count=1, minimum_extension=MINIMUM_JOB_EXTENSION)
self._queue.extend_processing(build_job.job_item, seconds_from_now=JOB_TIMEOUT_SECONDS,
minimum_extension=MINIMUM_JOB_EXTENSION)
def _job_complete(self, build_job, job_status):
if job_status == BuildJobResult.INCOMPLETE:
self._queue.incomplete(build_job.job_item(), restore_retry=True, retry_after=30)
elif job_status == BuildJobResult.ERROR:
self._queue.incomplete(build_job.job_item(), restore_retry=False)
self._queue.incomplete(build_job.job_item, restore_retry=False, retry_after=30)
else:
self._queue.complete(build_job.job_item())
self._queue.complete(build_job.job_item)
self._job_count = self._job_count - 1
if self._current_status == 'shutting_down' and not self._job_count:
self._shutdown_event.set()
# TODO(jschorr): check for work here?
@trollius.coroutine
def _work_checker(self):
while self._current_status == 'running':
logger.debug('Checking for more work for %d active workers', self._lifecycle_manager.num_workers())
with database.CloseForLongOperation(app.config):
yield From(trollius.sleep(WORK_CHECK_TIMEOUT))
logger.debug('Checking for more work for %d active workers',
self._lifecycle_manager.num_workers())
job_item = self._queue.get(processing_time=self._lifecycle_manager.setup_time())
if job_item is None:
logger.debug('No additional work found. Going to sleep for %s seconds', WORK_CHECK_TIMEOUT)
yield From(trollius.sleep(WORK_CHECK_TIMEOUT))
continue
try:
@ -163,20 +168,24 @@ class BuilderServer(object):
except BuildJobLoadException as irbe:
logger.exception(irbe)
self._queue.incomplete(job_item, restore_retry=False)
continue
logger.debug('Build job found. Checking for an avaliable worker.')
if self._lifecycle_manager.schedule(build_job, self._loop):
scheduled = yield From(self._lifecycle_manager.schedule(build_job))
if scheduled:
status_handler = StatusHandler(self._build_logs, build_job.repo_build.uuid)
status_handler.set_phase('build-scheduled')
self._job_count = self._job_count + 1
logger.debug('Build job scheduled. Running: %s', self._job_count)
else:
logger.debug('All workers are busy. Requeuing.')
self._queue.incomplete(job_item, restore_retry=True, retry_after=0)
yield From(trollius.sleep(WORK_CHECK_TIMEOUT))
@trollius.coroutine
def _initialize(self, loop, host, ssl=None):
def _initialize(self, loop, host, websocket_port, controller_port, ssl=None):
self._loop = loop
# Create the WAMP server.
@ -184,8 +193,8 @@ class BuilderServer(object):
transport_factory.setProtocolOptions(failByDrop=True)
# Initialize the controller server and the WAMP server
create_wsgi_server(self._controller_app, loop=loop, host=host, port=CONTROLLER_PORT, ssl=ssl)
yield From(loop.create_server(transport_factory, host, WEBSOCKET_PORT, ssl=ssl))
create_wsgi_server(self._controller_app, loop=loop, host=host, port=controller_port, ssl=ssl)
yield From(loop.create_server(transport_factory, host, websocket_port, ssl=ssl))
# Initialize the work queue checker.
yield From(self._work_checker())