From f53dea46b7ba1f5753827dc98c2b8c40f7f65524 Mon Sep 17 00:00:00 2001 From: Jimmy Zelinskie Date: Wed, 18 Feb 2015 14:13:36 -0500 Subject: [PATCH] buildman: address PR #11 comments --- buildman/enums.py | 12 +++++++++++ buildman/jobutil/buildreporter.py | 17 ++++++++------- buildman/server.py | 27 ++++++++++-------------- util/cloudwatch.py | 19 ++++++++--------- util/queuemetrics.py | 35 +++++++++++++------------------ 5 files changed, 55 insertions(+), 55 deletions(-) create mode 100644 buildman/enums.py diff --git a/buildman/enums.py b/buildman/enums.py new file mode 100644 index 000000000..3d38217fe --- /dev/null +++ b/buildman/enums.py @@ -0,0 +1,12 @@ +class BuildJobResult(object): + """ Build job result enum """ + INCOMPLETE = 'incomplete' + COMPLETE = 'complete' + ERROR = 'error' + + +class BuildServerStatus(object): + """ Build server status enum """ + STARTING = 'starting' + RUNNING = 'running' + SHUTDOWN = 'shutting_down' diff --git a/buildman/jobutil/buildreporter.py b/buildman/jobutil/buildreporter.py index 0ff798eae..b295c33ea 100644 --- a/buildman/jobutil/buildreporter.py +++ b/buildman/jobutil/buildreporter.py @@ -1,5 +1,6 @@ from trollius import From +from buildman.enums import BuildJobResult from util.cloudwatch import get_queue @@ -28,6 +29,9 @@ class CloudWatchBuildReporter(BuildReporter): Implements a BuildReporter for Amazon's CloudWatch. """ def __init__(self, queue, namespace_name, completed_name, failed_name, incompleted_name): + if None in (queue, namespace_name, completed_name, failed_name, incompleted_name): + raise TypeError + self._queue = queue self._namespace_name = namespace_name self._completed_name = completed_name @@ -35,11 +39,11 @@ class CloudWatchBuildReporter(BuildReporter): self._incompleted_name = incompleted_name def report_completion_status(self, status): - if status == 'complete': + if status == BuildJobResult.COMPLETE: status_name = self._completed_name - elif status == 'error': + elif status == BuildJobResult.ERROR: status_name = self._failed_name - elif status == 'incomplete': + elif status == BuildJobResult.INCOMPLETE: status_name = self._incompleted_name else: return @@ -53,9 +57,8 @@ class BuildMetrics(object): """ def __init__(self, app=None): self._app = app - if app is None: - self._reporter = None - else: + self._reporter = NullReporter() + if app is not None: reporter_type = app.config.get('BUILD_METRICS_TYPE', 'Null') if reporter_type == 'CloudWatch': namespace = app.config.get('BUILD_METRICS_NAMESPACE') @@ -65,8 +68,6 @@ class BuildMetrics(object): request_queue = get_queue(app) self._reporter = CloudWatchBuildReporter(request_queue, namespace, completed_name, failed_name, incompleted_name) - else: - self._reporter = NullReporter() def __getattr__(self, name): return getattr(self._reporter, name, None) diff --git a/buildman/server.py b/buildman/server.py index 82eb39448..8f788d6cc 100644 --- a/buildman/server.py +++ b/buildman/server.py @@ -12,6 +12,7 @@ from trollius.tasks import Task from trollius.coroutines import From from datetime import timedelta +from buildman.enums import BuildJobResult, BuildServerStatus from buildman.jobutil.buildstatus import StatusHandler from buildman.jobutil.buildjob import BuildJob, BuildJobLoadException from data import database @@ -26,12 +27,6 @@ MINIMUM_JOB_EXTENSION = timedelta(minutes=2) HEARTBEAT_PERIOD_SEC = 30 -class BuildJobResult(object): - """ Build job result enum """ - INCOMPLETE = 'incomplete' - COMPLETE = 'complete' - ERROR = 'error' - class BuilderServer(object): """ Server which handles both HTTP and WAMP requests, managing the full state of the build controller. @@ -39,7 +34,7 @@ class BuilderServer(object): def __init__(self, registry_hostname, queue, build_logs, user_files, lifecycle_manager_klass, lifecycle_manager_config, manager_hostname): self._loop = None - self._current_status = 'starting' + self._current_status = BuildServerStatus.STARTING self._current_components = [] self._job_count = 0 @@ -59,7 +54,7 @@ class BuilderServer(object): self._lifecycle_manager_config = lifecycle_manager_config self._shutdown_event = Event() - self._current_status = 'running' + self._current_status = BuildServerStatus.RUNNING self._register_controller() @@ -83,13 +78,13 @@ class BuilderServer(object): logger.debug('Starting server on port %s, with controller on port %s', websocket_port, controller_port) - tasks = [ + TASKS = [ Task(self._initialize(loop, host, websocket_port, controller_port, ssl)), Task(self._queue_metrics_updater()), ] try: - loop.run_until_complete(trollius.wait(tasks)) + loop.run_until_complete(trollius.wait(TASKS)) except KeyboardInterrupt: pass finally: @@ -97,7 +92,7 @@ class BuilderServer(object): def close(self): logger.debug('Requested server shutdown') - self._current_status = 'shutting_down' + self._current_status = BuildServerStatus.SHUTDOWN self._lifecycle_manager.shutdown() self._shutdown_event.wait() logger.debug('Shutting down server') @@ -131,8 +126,6 @@ class BuilderServer(object): minimum_extension=MINIMUM_JOB_EXTENSION) def _job_complete(self, build_job, job_status): - build_metrics.report_completion_status(job_status) - if job_status == BuildJobResult.INCOMPLETE: self._queue.incomplete(build_job.job_item, restore_retry=False, retry_after=30) else: @@ -140,12 +133,14 @@ class BuilderServer(object): self._job_count = self._job_count - 1 - if self._current_status == 'shutting_down' and not self._job_count: + if self._current_status == BuildServerStatus.SHUTDOWN and not self._job_count: self._shutdown_event.set() + build_metrics.report_completion_status(job_status) + @trollius.coroutine def _work_checker(self): - while self._current_status == 'running': + while self._current_status == BuildServerStatus.RUNNING: with database.CloseForLongOperation(app.config): yield From(trollius.sleep(WORK_CHECK_TIMEOUT)) @@ -178,7 +173,7 @@ class BuilderServer(object): @trollius.coroutine def _queue_metrics_updater(self): - while self._current_status == 'running': + while self._current_status == BuildServerStatus.RUNNING: yield From(trollius.sleep(30)) self._queue.update_metrics() diff --git a/util/cloudwatch.py b/util/cloudwatch.py index cf7aed392..eca563164 100644 --- a/util/cloudwatch.py +++ b/util/cloudwatch.py @@ -1,27 +1,26 @@ import logging import boto +import thread from Queue import Queue from threading import Thread logger = logging.getLogger(__name__) -queue = None def get_queue(app): """ Returns a queue to the CloudWatchSender. If a queue/sender do not exist, creates them. """ - global queue - if queue is None: - access_key = app.config.get('QUEUE_METRICS_AWS_ACCESS_KEY') - secret_key = app.config.get('QUEUE_METRICS_AWS_SECRET_KEY') - queue = Queue() - sender = CloudWatchSender(queue, access_key, secret_key) - sender.start() - else: - return queue + access_key = app.config.get('CLOUDWATCH_AWS_ACCESS_KEY') + secret_key = app.config.get('CLOUDWATCH_AWS_SECRET_KEY') + if None in (access_key, secret_key): + raise TypeError + queue = Queue() + sender = CloudWatchSender(queue, access_key, secret_key) + sender.start() + return queue class CloudWatchSender(Thread): """ diff --git a/util/queuemetrics.py b/util/queuemetrics.py index d84ced129..0f7090801 100644 --- a/util/queuemetrics.py +++ b/util/queuemetrics.py @@ -14,6 +14,9 @@ class NullReporter(object): class CloudWatchReporter(object): """ CloudWatchReporter reports work queue metrics to CloudWatch """ def __init__(self, request_queue, namespace, need_capacity_name, build_percent_name): + if None in (request_queue, namespace, need_capacity_name, build_percent_name): + raise TypeError + self._namespace = namespace self._need_capacity_name = need_capacity_name self._build_percent_name = build_percent_name @@ -39,28 +42,18 @@ class QueueMetrics(object): QueueMetrics initializes a reporter for recording metrics of work queues. """ def __init__(self, app=None): - self.app = app - self.sender = None + self._app = app + self._reporter = NullReporter() if app is not None: - self.state = self.init_app(app) - else: - self.state = None + reporter_type = app.config.get('QUEUE_METRICS_TYPE', 'Null') + if reporter_type == 'CloudWatch': + namespace = app.config.get('QUEUE_METRICS_NAMESPACE') + req_capacity_name = app.config.get('QUEUE_METRICS_CAPACITY_SHORTAGE_NAME') + build_percent_name = app.config.get('QUEUE_METRICS_BUILD_PERCENT_NAME') - def init_app(self, app): - analytics_type = app.config.get('QUEUE_METRICS_TYPE', 'Null') - - if analytics_type == 'CloudWatch': - namespace = app.config.get('QUEUE_METRICS_NAMESPACE') - req_capacity_name = app.config.get('QUEUE_METRICS_CAPACITY_SHORTAGE_NAME') - build_percent_name = app.config.get('QUEUE_METRICS_BUILD_PERCENT_NAME') - - request_queue = get_queue(app) - reporter = CloudWatchReporter(request_queue, namespace, req_capacity_name, - build_percent_name) - else: - reporter = NullReporter() - - return reporter + request_queue = get_queue(app) + self._reporter = CloudWatchReporter(request_queue, namespace, req_capacity_name, + build_percent_name) def __getattr__(self, name): - return getattr(self.state, name, None) + return getattr(self._reporter, name, None)