buildman: address PR #11 comments
This commit is contained in:
parent
5790d7d8cc
commit
f53dea46b7
5 changed files with 55 additions and 55 deletions
12
buildman/enums.py
Normal file
12
buildman/enums.py
Normal file
|
@ -0,0 +1,12 @@
|
||||||
|
class BuildJobResult(object):
|
||||||
|
""" Build job result enum """
|
||||||
|
INCOMPLETE = 'incomplete'
|
||||||
|
COMPLETE = 'complete'
|
||||||
|
ERROR = 'error'
|
||||||
|
|
||||||
|
|
||||||
|
class BuildServerStatus(object):
|
||||||
|
""" Build server status enum """
|
||||||
|
STARTING = 'starting'
|
||||||
|
RUNNING = 'running'
|
||||||
|
SHUTDOWN = 'shutting_down'
|
|
@ -1,5 +1,6 @@
|
||||||
from trollius import From
|
from trollius import From
|
||||||
|
|
||||||
|
from buildman.enums import BuildJobResult
|
||||||
from util.cloudwatch import get_queue
|
from util.cloudwatch import get_queue
|
||||||
|
|
||||||
|
|
||||||
|
@ -28,6 +29,9 @@ class CloudWatchBuildReporter(BuildReporter):
|
||||||
Implements a BuildReporter for Amazon's CloudWatch.
|
Implements a BuildReporter for Amazon's CloudWatch.
|
||||||
"""
|
"""
|
||||||
def __init__(self, queue, namespace_name, completed_name, failed_name, incompleted_name):
|
def __init__(self, queue, namespace_name, completed_name, failed_name, incompleted_name):
|
||||||
|
if None in (queue, namespace_name, completed_name, failed_name, incompleted_name):
|
||||||
|
raise TypeError
|
||||||
|
|
||||||
self._queue = queue
|
self._queue = queue
|
||||||
self._namespace_name = namespace_name
|
self._namespace_name = namespace_name
|
||||||
self._completed_name = completed_name
|
self._completed_name = completed_name
|
||||||
|
@ -35,11 +39,11 @@ class CloudWatchBuildReporter(BuildReporter):
|
||||||
self._incompleted_name = incompleted_name
|
self._incompleted_name = incompleted_name
|
||||||
|
|
||||||
def report_completion_status(self, status):
|
def report_completion_status(self, status):
|
||||||
if status == 'complete':
|
if status == BuildJobResult.COMPLETE:
|
||||||
status_name = self._completed_name
|
status_name = self._completed_name
|
||||||
elif status == 'error':
|
elif status == BuildJobResult.ERROR:
|
||||||
status_name = self._failed_name
|
status_name = self._failed_name
|
||||||
elif status == 'incomplete':
|
elif status == BuildJobResult.INCOMPLETE:
|
||||||
status_name = self._incompleted_name
|
status_name = self._incompleted_name
|
||||||
else:
|
else:
|
||||||
return
|
return
|
||||||
|
@ -53,9 +57,8 @@ class BuildMetrics(object):
|
||||||
"""
|
"""
|
||||||
def __init__(self, app=None):
|
def __init__(self, app=None):
|
||||||
self._app = app
|
self._app = app
|
||||||
if app is None:
|
self._reporter = NullReporter()
|
||||||
self._reporter = None
|
if app is not None:
|
||||||
else:
|
|
||||||
reporter_type = app.config.get('BUILD_METRICS_TYPE', 'Null')
|
reporter_type = app.config.get('BUILD_METRICS_TYPE', 'Null')
|
||||||
if reporter_type == 'CloudWatch':
|
if reporter_type == 'CloudWatch':
|
||||||
namespace = app.config.get('BUILD_METRICS_NAMESPACE')
|
namespace = app.config.get('BUILD_METRICS_NAMESPACE')
|
||||||
|
@ -65,8 +68,6 @@ class BuildMetrics(object):
|
||||||
request_queue = get_queue(app)
|
request_queue = get_queue(app)
|
||||||
self._reporter = CloudWatchBuildReporter(request_queue, namespace, completed_name,
|
self._reporter = CloudWatchBuildReporter(request_queue, namespace, completed_name,
|
||||||
failed_name, incompleted_name)
|
failed_name, incompleted_name)
|
||||||
else:
|
|
||||||
self._reporter = NullReporter()
|
|
||||||
|
|
||||||
def __getattr__(self, name):
|
def __getattr__(self, name):
|
||||||
return getattr(self._reporter, name, None)
|
return getattr(self._reporter, name, None)
|
||||||
|
|
|
@ -12,6 +12,7 @@ from trollius.tasks import Task
|
||||||
from trollius.coroutines import From
|
from trollius.coroutines import From
|
||||||
from datetime import timedelta
|
from datetime import timedelta
|
||||||
|
|
||||||
|
from buildman.enums import BuildJobResult, BuildServerStatus
|
||||||
from buildman.jobutil.buildstatus import StatusHandler
|
from buildman.jobutil.buildstatus import StatusHandler
|
||||||
from buildman.jobutil.buildjob import BuildJob, BuildJobLoadException
|
from buildman.jobutil.buildjob import BuildJob, BuildJobLoadException
|
||||||
from data import database
|
from data import database
|
||||||
|
@ -26,12 +27,6 @@ MINIMUM_JOB_EXTENSION = timedelta(minutes=2)
|
||||||
|
|
||||||
HEARTBEAT_PERIOD_SEC = 30
|
HEARTBEAT_PERIOD_SEC = 30
|
||||||
|
|
||||||
class BuildJobResult(object):
|
|
||||||
""" Build job result enum """
|
|
||||||
INCOMPLETE = 'incomplete'
|
|
||||||
COMPLETE = 'complete'
|
|
||||||
ERROR = 'error'
|
|
||||||
|
|
||||||
class BuilderServer(object):
|
class BuilderServer(object):
|
||||||
""" Server which handles both HTTP and WAMP requests, managing the full state of the build
|
""" Server which handles both HTTP and WAMP requests, managing the full state of the build
|
||||||
controller.
|
controller.
|
||||||
|
@ -39,7 +34,7 @@ class BuilderServer(object):
|
||||||
def __init__(self, registry_hostname, queue, build_logs, user_files, lifecycle_manager_klass,
|
def __init__(self, registry_hostname, queue, build_logs, user_files, lifecycle_manager_klass,
|
||||||
lifecycle_manager_config, manager_hostname):
|
lifecycle_manager_config, manager_hostname):
|
||||||
self._loop = None
|
self._loop = None
|
||||||
self._current_status = 'starting'
|
self._current_status = BuildServerStatus.STARTING
|
||||||
self._current_components = []
|
self._current_components = []
|
||||||
self._job_count = 0
|
self._job_count = 0
|
||||||
|
|
||||||
|
@ -59,7 +54,7 @@ class BuilderServer(object):
|
||||||
self._lifecycle_manager_config = lifecycle_manager_config
|
self._lifecycle_manager_config = lifecycle_manager_config
|
||||||
|
|
||||||
self._shutdown_event = Event()
|
self._shutdown_event = Event()
|
||||||
self._current_status = 'running'
|
self._current_status = BuildServerStatus.RUNNING
|
||||||
|
|
||||||
self._register_controller()
|
self._register_controller()
|
||||||
|
|
||||||
|
@ -83,13 +78,13 @@ class BuilderServer(object):
|
||||||
logger.debug('Starting server on port %s, with controller on port %s', websocket_port,
|
logger.debug('Starting server on port %s, with controller on port %s', websocket_port,
|
||||||
controller_port)
|
controller_port)
|
||||||
|
|
||||||
tasks = [
|
TASKS = [
|
||||||
Task(self._initialize(loop, host, websocket_port, controller_port, ssl)),
|
Task(self._initialize(loop, host, websocket_port, controller_port, ssl)),
|
||||||
Task(self._queue_metrics_updater()),
|
Task(self._queue_metrics_updater()),
|
||||||
]
|
]
|
||||||
|
|
||||||
try:
|
try:
|
||||||
loop.run_until_complete(trollius.wait(tasks))
|
loop.run_until_complete(trollius.wait(TASKS))
|
||||||
except KeyboardInterrupt:
|
except KeyboardInterrupt:
|
||||||
pass
|
pass
|
||||||
finally:
|
finally:
|
||||||
|
@ -97,7 +92,7 @@ class BuilderServer(object):
|
||||||
|
|
||||||
def close(self):
|
def close(self):
|
||||||
logger.debug('Requested server shutdown')
|
logger.debug('Requested server shutdown')
|
||||||
self._current_status = 'shutting_down'
|
self._current_status = BuildServerStatus.SHUTDOWN
|
||||||
self._lifecycle_manager.shutdown()
|
self._lifecycle_manager.shutdown()
|
||||||
self._shutdown_event.wait()
|
self._shutdown_event.wait()
|
||||||
logger.debug('Shutting down server')
|
logger.debug('Shutting down server')
|
||||||
|
@ -131,8 +126,6 @@ class BuilderServer(object):
|
||||||
minimum_extension=MINIMUM_JOB_EXTENSION)
|
minimum_extension=MINIMUM_JOB_EXTENSION)
|
||||||
|
|
||||||
def _job_complete(self, build_job, job_status):
|
def _job_complete(self, build_job, job_status):
|
||||||
build_metrics.report_completion_status(job_status)
|
|
||||||
|
|
||||||
if job_status == BuildJobResult.INCOMPLETE:
|
if job_status == BuildJobResult.INCOMPLETE:
|
||||||
self._queue.incomplete(build_job.job_item, restore_retry=False, retry_after=30)
|
self._queue.incomplete(build_job.job_item, restore_retry=False, retry_after=30)
|
||||||
else:
|
else:
|
||||||
|
@ -140,12 +133,14 @@ class BuilderServer(object):
|
||||||
|
|
||||||
self._job_count = self._job_count - 1
|
self._job_count = self._job_count - 1
|
||||||
|
|
||||||
if self._current_status == 'shutting_down' and not self._job_count:
|
if self._current_status == BuildServerStatus.SHUTDOWN and not self._job_count:
|
||||||
self._shutdown_event.set()
|
self._shutdown_event.set()
|
||||||
|
|
||||||
|
build_metrics.report_completion_status(job_status)
|
||||||
|
|
||||||
@trollius.coroutine
|
@trollius.coroutine
|
||||||
def _work_checker(self):
|
def _work_checker(self):
|
||||||
while self._current_status == 'running':
|
while self._current_status == BuildServerStatus.RUNNING:
|
||||||
with database.CloseForLongOperation(app.config):
|
with database.CloseForLongOperation(app.config):
|
||||||
yield From(trollius.sleep(WORK_CHECK_TIMEOUT))
|
yield From(trollius.sleep(WORK_CHECK_TIMEOUT))
|
||||||
|
|
||||||
|
@ -178,7 +173,7 @@ class BuilderServer(object):
|
||||||
|
|
||||||
@trollius.coroutine
|
@trollius.coroutine
|
||||||
def _queue_metrics_updater(self):
|
def _queue_metrics_updater(self):
|
||||||
while self._current_status == 'running':
|
while self._current_status == BuildServerStatus.RUNNING:
|
||||||
yield From(trollius.sleep(30))
|
yield From(trollius.sleep(30))
|
||||||
self._queue.update_metrics()
|
self._queue.update_metrics()
|
||||||
|
|
||||||
|
|
|
@ -1,27 +1,26 @@
|
||||||
import logging
|
import logging
|
||||||
import boto
|
import boto
|
||||||
|
import thread
|
||||||
|
|
||||||
from Queue import Queue
|
from Queue import Queue
|
||||||
from threading import Thread
|
from threading import Thread
|
||||||
|
|
||||||
|
|
||||||
logger = logging.getLogger(__name__)
|
logger = logging.getLogger(__name__)
|
||||||
queue = None
|
|
||||||
|
|
||||||
def get_queue(app):
|
def get_queue(app):
|
||||||
"""
|
"""
|
||||||
Returns a queue to the CloudWatchSender. If a queue/sender do not exist, creates them.
|
Returns a queue to the CloudWatchSender. If a queue/sender do not exist, creates them.
|
||||||
"""
|
"""
|
||||||
global queue
|
access_key = app.config.get('CLOUDWATCH_AWS_ACCESS_KEY')
|
||||||
if queue is None:
|
secret_key = app.config.get('CLOUDWATCH_AWS_SECRET_KEY')
|
||||||
access_key = app.config.get('QUEUE_METRICS_AWS_ACCESS_KEY')
|
if None in (access_key, secret_key):
|
||||||
secret_key = app.config.get('QUEUE_METRICS_AWS_SECRET_KEY')
|
raise TypeError
|
||||||
queue = Queue()
|
|
||||||
sender = CloudWatchSender(queue, access_key, secret_key)
|
|
||||||
sender.start()
|
|
||||||
else:
|
|
||||||
return queue
|
|
||||||
|
|
||||||
|
queue = Queue()
|
||||||
|
sender = CloudWatchSender(queue, access_key, secret_key)
|
||||||
|
sender.start()
|
||||||
|
return queue
|
||||||
|
|
||||||
class CloudWatchSender(Thread):
|
class CloudWatchSender(Thread):
|
||||||
"""
|
"""
|
||||||
|
|
|
@ -14,6 +14,9 @@ class NullReporter(object):
|
||||||
class CloudWatchReporter(object):
|
class CloudWatchReporter(object):
|
||||||
""" CloudWatchReporter reports work queue metrics to CloudWatch """
|
""" CloudWatchReporter reports work queue metrics to CloudWatch """
|
||||||
def __init__(self, request_queue, namespace, need_capacity_name, build_percent_name):
|
def __init__(self, request_queue, namespace, need_capacity_name, build_percent_name):
|
||||||
|
if None in (request_queue, namespace, need_capacity_name, build_percent_name):
|
||||||
|
raise TypeError
|
||||||
|
|
||||||
self._namespace = namespace
|
self._namespace = namespace
|
||||||
self._need_capacity_name = need_capacity_name
|
self._need_capacity_name = need_capacity_name
|
||||||
self._build_percent_name = build_percent_name
|
self._build_percent_name = build_percent_name
|
||||||
|
@ -39,28 +42,18 @@ class QueueMetrics(object):
|
||||||
QueueMetrics initializes a reporter for recording metrics of work queues.
|
QueueMetrics initializes a reporter for recording metrics of work queues.
|
||||||
"""
|
"""
|
||||||
def __init__(self, app=None):
|
def __init__(self, app=None):
|
||||||
self.app = app
|
self._app = app
|
||||||
self.sender = None
|
self._reporter = NullReporter()
|
||||||
if app is not None:
|
if app is not None:
|
||||||
self.state = self.init_app(app)
|
reporter_type = app.config.get('QUEUE_METRICS_TYPE', 'Null')
|
||||||
else:
|
if reporter_type == 'CloudWatch':
|
||||||
self.state = None
|
namespace = app.config.get('QUEUE_METRICS_NAMESPACE')
|
||||||
|
req_capacity_name = app.config.get('QUEUE_METRICS_CAPACITY_SHORTAGE_NAME')
|
||||||
|
build_percent_name = app.config.get('QUEUE_METRICS_BUILD_PERCENT_NAME')
|
||||||
|
|
||||||
def init_app(self, app):
|
request_queue = get_queue(app)
|
||||||
analytics_type = app.config.get('QUEUE_METRICS_TYPE', 'Null')
|
self._reporter = CloudWatchReporter(request_queue, namespace, req_capacity_name,
|
||||||
|
build_percent_name)
|
||||||
if analytics_type == 'CloudWatch':
|
|
||||||
namespace = app.config.get('QUEUE_METRICS_NAMESPACE')
|
|
||||||
req_capacity_name = app.config.get('QUEUE_METRICS_CAPACITY_SHORTAGE_NAME')
|
|
||||||
build_percent_name = app.config.get('QUEUE_METRICS_BUILD_PERCENT_NAME')
|
|
||||||
|
|
||||||
request_queue = get_queue(app)
|
|
||||||
reporter = CloudWatchReporter(request_queue, namespace, req_capacity_name,
|
|
||||||
build_percent_name)
|
|
||||||
else:
|
|
||||||
reporter = NullReporter()
|
|
||||||
|
|
||||||
return reporter
|
|
||||||
|
|
||||||
def __getattr__(self, name):
|
def __getattr__(self, name):
|
||||||
return getattr(self.state, name, None)
|
return getattr(self._reporter, name, None)
|
||||||
|
|
Reference in a new issue