buildman: update metrics task
This commit is contained in:
parent
f796c281d5
commit
4a4118d0e0
1 changed files with 20 additions and 3 deletions
|
@ -8,14 +8,15 @@ from autobahn.wamp import types
|
||||||
from aiowsgi import create_server as create_wsgi_server
|
from aiowsgi import create_server as create_wsgi_server
|
||||||
from flask import Flask
|
from flask import Flask
|
||||||
from threading import Event
|
from threading import Event
|
||||||
|
from trollius.tasks import Task
|
||||||
from trollius.coroutines import From
|
from trollius.coroutines import From
|
||||||
from datetime import timedelta
|
from datetime import timedelta
|
||||||
|
|
||||||
from buildman.jobutil.buildjob import BuildJob, BuildJobLoadException
|
from buildman.jobutil.buildjob import BuildJob, BuildJobLoadException
|
||||||
from data import database
|
from data import database
|
||||||
from data.queue import WorkQueue
|
|
||||||
from app import app
|
from app import app
|
||||||
|
|
||||||
|
# pylint: disable=invalid-name
|
||||||
logger = logging.getLogger(__name__)
|
logger = logging.getLogger(__name__)
|
||||||
|
|
||||||
WORK_CHECK_TIMEOUT = 10
|
WORK_CHECK_TIMEOUT = 10
|
||||||
|
@ -25,16 +26,19 @@ MINIMUM_JOB_EXTENSION = timedelta(minutes=2)
|
||||||
|
|
||||||
HEARTBEAT_PERIOD_SEC = 30
|
HEARTBEAT_PERIOD_SEC = 30
|
||||||
|
|
||||||
|
# pylint: disable=too-few-public-methods
|
||||||
class BuildJobResult(object):
|
class BuildJobResult(object):
|
||||||
""" Build job result enum """
|
""" Build job result enum """
|
||||||
INCOMPLETE = 'incomplete'
|
INCOMPLETE = 'incomplete'
|
||||||
COMPLETE = 'complete'
|
COMPLETE = 'complete'
|
||||||
ERROR = 'error'
|
ERROR = 'error'
|
||||||
|
|
||||||
|
# pylint: disable=too-many-instance-attributes
|
||||||
class BuilderServer(object):
|
class BuilderServer(object):
|
||||||
""" Server which handles both HTTP and WAMP requests, managing the full state of the build
|
""" Server which handles both HTTP and WAMP requests, managing the full state of the build
|
||||||
controller.
|
controller.
|
||||||
"""
|
"""
|
||||||
|
# pylint: disable=too-many-arguments
|
||||||
def __init__(self, registry_hostname, queue, build_logs, user_files, lifecycle_manager_klass,
|
def __init__(self, registry_hostname, queue, build_logs, user_files, lifecycle_manager_klass,
|
||||||
lifecycle_manager_config, manager_hostname):
|
lifecycle_manager_config, manager_hostname):
|
||||||
self._loop = None
|
self._loop = None
|
||||||
|
@ -81,8 +85,14 @@ class BuilderServer(object):
|
||||||
|
|
||||||
logger.debug('Starting server on port %s, with controller on port %s', websocket_port,
|
logger.debug('Starting server on port %s, with controller on port %s', websocket_port,
|
||||||
controller_port)
|
controller_port)
|
||||||
|
|
||||||
|
tasks = [
|
||||||
|
Task(self._initialize(loop, host, websocket_port, controller_port, ssl)),
|
||||||
|
Task(self._metrics_updater()),
|
||||||
|
]
|
||||||
|
|
||||||
try:
|
try:
|
||||||
loop.run_until_complete(self._initialize(loop, host, websocket_port, controller_port, ssl))
|
loop.run_until_complete(trollius.wait(tasks))
|
||||||
except KeyboardInterrupt:
|
except KeyboardInterrupt:
|
||||||
pass
|
pass
|
||||||
finally:
|
finally:
|
||||||
|
@ -164,7 +174,11 @@ class BuilderServer(object):
|
||||||
logger.debug('All workers are busy. Requeuing.')
|
logger.debug('All workers are busy. Requeuing.')
|
||||||
self._queue.incomplete(job_item, restore_retry=True, retry_after=0)
|
self._queue.incomplete(job_item, restore_retry=True, retry_after=0)
|
||||||
|
|
||||||
|
@trollius.coroutine
|
||||||
|
def _metrics_updater(self):
|
||||||
|
while self._current_status == 'running':
|
||||||
|
yield From(trollius.sleep(30))
|
||||||
|
self._queue.update_metrics()
|
||||||
|
|
||||||
@trollius.coroutine
|
@trollius.coroutine
|
||||||
def _initialize(self, loop, host, websocket_port, controller_port, ssl=None):
|
def _initialize(self, loop, host, websocket_port, controller_port, ssl=None):
|
||||||
|
@ -178,5 +192,8 @@ class BuilderServer(object):
|
||||||
create_wsgi_server(self._controller_app, loop=loop, host=host, port=controller_port, ssl=ssl)
|
create_wsgi_server(self._controller_app, loop=loop, host=host, port=controller_port, ssl=ssl)
|
||||||
yield From(loop.create_server(transport_factory, host, websocket_port, ssl=ssl))
|
yield From(loop.create_server(transport_factory, host, websocket_port, ssl=ssl))
|
||||||
|
|
||||||
|
# Initialize the coroutine reporting metrics
|
||||||
|
loop.create_task(self._metrics_updater())
|
||||||
|
|
||||||
# Initialize the work queue checker.
|
# Initialize the work queue checker.
|
||||||
yield From(self._work_checker())
|
yield From(self._work_checker())
|
||||||
|
|
Reference in a new issue