From 4a4118d0e0d2939a4ca864a5f60c32e8104fdcde Mon Sep 17 00:00:00 2001 From: Jimmy Zelinskie Date: Fri, 13 Feb 2015 11:21:34 -0500 Subject: [PATCH 01/20] buildman: update metrics task --- buildman/server.py | 23 ++++++++++++++++++++--- 1 file changed, 20 insertions(+), 3 deletions(-) diff --git a/buildman/server.py b/buildman/server.py index 7b10995b4..ea689da3f 100644 --- a/buildman/server.py +++ b/buildman/server.py @@ -8,14 +8,15 @@ from autobahn.wamp import types from aiowsgi import create_server as create_wsgi_server from flask import Flask from threading import Event +from trollius.tasks import Task from trollius.coroutines import From from datetime import timedelta from buildman.jobutil.buildjob import BuildJob, BuildJobLoadException from data import database -from data.queue import WorkQueue from app import app +# pylint: disable=invalid-name logger = logging.getLogger(__name__) WORK_CHECK_TIMEOUT = 10 @@ -25,16 +26,19 @@ MINIMUM_JOB_EXTENSION = timedelta(minutes=2) HEARTBEAT_PERIOD_SEC = 30 +# pylint: disable=too-few-public-methods class BuildJobResult(object): """ Build job result enum """ INCOMPLETE = 'incomplete' COMPLETE = 'complete' ERROR = 'error' +# pylint: disable=too-many-instance-attributes class BuilderServer(object): """ Server which handles both HTTP and WAMP requests, managing the full state of the build controller. 
""" + # pylint: disable=too-many-arguments def __init__(self, registry_hostname, queue, build_logs, user_files, lifecycle_manager_klass, lifecycle_manager_config, manager_hostname): self._loop = None @@ -81,8 +85,14 @@ class BuilderServer(object): logger.debug('Starting server on port %s, with controller on port %s', websocket_port, controller_port) + + tasks = [ + Task(self._initialize(loop, host, websocket_port, controller_port, ssl)), + Task(self._metrics_updater()), + ] + try: - loop.run_until_complete(self._initialize(loop, host, websocket_port, controller_port, ssl)) + loop.run_until_complete(trollius.wait(tasks)) except KeyboardInterrupt: pass finally: @@ -164,7 +174,11 @@ class BuilderServer(object): logger.debug('All workers are busy. Requeuing.') self._queue.incomplete(job_item, restore_retry=True, retry_after=0) - + @trollius.coroutine + def _metrics_updater(self): + while self._current_status == 'running': + yield From(trollius.sleep(30)) + self._queue.update_metrics() @trollius.coroutine def _initialize(self, loop, host, websocket_port, controller_port, ssl=None): @@ -178,5 +192,8 @@ class BuilderServer(object): create_wsgi_server(self._controller_app, loop=loop, host=host, port=controller_port, ssl=ssl) yield From(loop.create_server(transport_factory, host, websocket_port, ssl=ssl)) + # Initialize the coroutine reporting metrics + loop.create_task(self._metrics_updater()) + # Initialize the work queue checker. 
yield From(self._work_checker()) From 6a3d2695740720f5b12be649b89260c5296cb591 Mon Sep 17 00:00:00 2001 From: Jimmy Zelinskie Date: Fri, 13 Feb 2015 11:21:34 -0500 Subject: [PATCH 02/20] buildman: update metrics task --- buildman/server.py | 23 ++++++++++++++++++++--- 1 file changed, 20 insertions(+), 3 deletions(-) diff --git a/buildman/server.py b/buildman/server.py index ebbc558bb..21ce7cd89 100644 --- a/buildman/server.py +++ b/buildman/server.py @@ -8,15 +8,16 @@ from autobahn.wamp import types from aiowsgi import create_server as create_wsgi_server from flask import Flask from threading import Event +from trollius.tasks import Task from trollius.coroutines import From from datetime import timedelta from buildman.jobutil.buildstatus import StatusHandler from buildman.jobutil.buildjob import BuildJob, BuildJobLoadException from data import database -from data.queue import WorkQueue from app import app +# pylint: disable=invalid-name logger = logging.getLogger(__name__) WORK_CHECK_TIMEOUT = 10 @@ -26,16 +27,19 @@ MINIMUM_JOB_EXTENSION = timedelta(minutes=2) HEARTBEAT_PERIOD_SEC = 30 +# pylint: disable=too-few-public-methods class BuildJobResult(object): """ Build job result enum """ INCOMPLETE = 'incomplete' COMPLETE = 'complete' ERROR = 'error' +# pylint: disable=too-many-instance-attributes class BuilderServer(object): """ Server which handles both HTTP and WAMP requests, managing the full state of the build controller. 
""" + # pylint: disable=too-many-arguments def __init__(self, registry_hostname, queue, build_logs, user_files, lifecycle_manager_klass, lifecycle_manager_config, manager_hostname): self._loop = None @@ -82,8 +86,14 @@ class BuilderServer(object): logger.debug('Starting server on port %s, with controller on port %s', websocket_port, controller_port) + + tasks = [ + Task(self._initialize(loop, host, websocket_port, controller_port, ssl)), + Task(self._metrics_updater()), + ] + try: - loop.run_until_complete(self._initialize(loop, host, websocket_port, controller_port, ssl)) + loop.run_until_complete(trollius.wait(tasks)) except KeyboardInterrupt: pass finally: @@ -168,7 +178,11 @@ class BuilderServer(object): logger.debug('All workers are busy. Requeuing.') self._queue.incomplete(job_item, restore_retry=True, retry_after=0) - + @trollius.coroutine + def _metrics_updater(self): + while self._current_status == 'running': + yield From(trollius.sleep(30)) + self._queue.update_metrics() @trollius.coroutine def _initialize(self, loop, host, websocket_port, controller_port, ssl=None): @@ -182,5 +196,8 @@ class BuilderServer(object): create_wsgi_server(self._controller_app, loop=loop, host=host, port=controller_port, ssl=ssl) yield From(loop.create_server(transport_factory, host, websocket_port, ssl=ssl)) + # Initialize the coroutine reporting metrics + loop.create_task(self._metrics_updater()) + # Initialize the work queue checker. yield From(self._work_checker()) From 6a1dd376c206ca530b48cfc476fb1a82e7cc732c Mon Sep 17 00:00:00 2001 From: Jimmy Zelinskie Date: Sat, 14 Feb 2015 16:30:10 -0500 Subject: [PATCH 03/20] util: add cloudwatch package This isolates the CloudWatch sending thread to its own package where it can be shared among any other packages that want to asynchronously send metrics to CloudWatch. 
--- util/cloudwatch.py | 51 +++++++++++++++++++++++++++++++++++++++ util/queuemetrics.py | 57 +++++++------------------------------------- 2 files changed, 60 insertions(+), 48 deletions(-) create mode 100644 util/cloudwatch.py diff --git a/util/cloudwatch.py b/util/cloudwatch.py new file mode 100644 index 000000000..344433806 --- /dev/null +++ b/util/cloudwatch.py @@ -0,0 +1,51 @@ +import logging +import boto + +from Queue import Queue +from threading import Thread + + +logger = logging.getLogger(__name__) +queue = None + +def get_queue(app): + """ + Returns a queue to the CloudWatchSender. If a queue/sender do not exist, creates them. + """ + if queue is None: + global queue + access_key = app.config.get('QUEUE_METRICS_AWS_ACCESS_KEY') + secret_key = app.config.get('QUEUE_METRICS_AWS_SECRET_KEY') + queue = Queue() + sender = CloudWatchSender(queue, access_key, secret_key) + sender.start() + else: + return queue + + +class CloudWatchSender(Thread): + """ + CloudWatchSender loops indefinitely and pulls metrics off of a queue then sends it to CloudWatch. 
+ """ + def __init__(self, request_queue, aws_access_key, aws_secret_key): + Thread.__init__(self) + self.daemon = True + + self._aws_access_key = aws_access_key + self._aws_secret_key = aws_secret_key + self._put_metrics_queue = request_queue + + def run(self): + try: + logger.debug('Starting CloudWatch sender process.') + connection = boto.connect_cloudwatch(self._aws_access_key, self._aws_secret_key) + except: + logger.exception('Failed to connect to CloudWatch.') + + while True: + put_metric_args, kwargs = self._put_metrics_queue.get() + logger.debug('Got queued put metrics request.') + try: + connection.put_metric_data(*put_metric_args, **kwargs) + except: + logger.exception('Failed to write to CloudWatch') diff --git a/util/queuemetrics.py b/util/queuemetrics.py index 1bebfa3a6..d84ced129 100644 --- a/util/queuemetrics.py +++ b/util/queuemetrics.py @@ -1,8 +1,6 @@ import logging -import boto -from Queue import Queue -from threading import Thread +from util.cloudwatch import get_queue logger = logging.getLogger(__name__) @@ -13,8 +11,8 @@ class NullReporter(object): pass -class QueueingCloudWatchReporter(object): - """ QueueingCloudWatchReporter reports metrics to the "SendToCloudWatch" process """ +class CloudWatchReporter(object): + """ CloudWatchReporter reports work queue metrics to CloudWatch """ def __init__(self, request_queue, namespace, need_capacity_name, build_percent_name): self._namespace = namespace self._need_capacity_name = need_capacity_name @@ -36,35 +34,10 @@ class QueueingCloudWatchReporter(object): self._send_to_queue(self._namespace, self._build_percent_name, building_percent, unit='Percent') - -class SendToCloudWatch(Thread): - """ SendToCloudWatch loops indefinitely and pulls metrics off of a queue then sends it to - CloudWatch. 
""" - def __init__(self, request_queue, aws_access_key, aws_secret_key): - Thread.__init__(self) - self.daemon = True - - self._aws_access_key = aws_access_key - self._aws_secret_key = aws_secret_key - self._put_metrics_queue = request_queue - - def run(self): - try: - logger.debug('Starting CloudWatch sender process.') - connection = boto.connect_cloudwatch(self._aws_access_key, self._aws_secret_key) - except: - logger.exception('Failed to connect to CloudWatch.') - - while True: - put_metric_args, kwargs = self._put_metrics_queue.get() - logger.debug('Got queued put metrics request.') - try: - connection.put_metric_data(*put_metric_args, **kwargs) - except: - logger.exception('Failed to write to CloudWatch') - - class QueueMetrics(object): + """ + QueueMetrics initializes a reporter for recording metrics of work queues. + """ def __init__(self, app=None): self.app = app self.sender = None @@ -77,29 +50,17 @@ class QueueMetrics(object): analytics_type = app.config.get('QUEUE_METRICS_TYPE', 'Null') if analytics_type == 'CloudWatch': - access_key = app.config.get('QUEUE_METRICS_AWS_ACCESS_KEY') - secret_key = app.config.get('QUEUE_METRICS_AWS_SECRET_KEY') namespace = app.config.get('QUEUE_METRICS_NAMESPACE') req_capacity_name = app.config.get('QUEUE_METRICS_CAPACITY_SHORTAGE_NAME') build_percent_name = app.config.get('QUEUE_METRICS_BUILD_PERCENT_NAME') - request_queue = Queue() - reporter = QueueingCloudWatchReporter(request_queue, namespace, req_capacity_name, - build_percent_name) - self.sender = SendToCloudWatch(request_queue, access_key, secret_key) + request_queue = get_queue(app) + reporter = CloudWatchReporter(request_queue, namespace, req_capacity_name, + build_percent_name) else: reporter = NullReporter() - # register extension with app - app.extensions = getattr(app, 'extensions', {}) - app.extensions['queuemetrics'] = reporter return reporter - def run(self): - logger.debug('Asked to start CloudWatch reporter') - if self.sender is not None: - 
logger.debug('Starting CloudWatch reporter') - self.sender.start() - def __getattr__(self, name): return getattr(self.state, name, None) From 0e7418ffce7a55011c700498c6d496df2c6cb858 Mon Sep 17 00:00:00 2001 From: Jimmy Zelinskie Date: Tue, 17 Feb 2015 10:56:09 -0500 Subject: [PATCH 04/20] buildman: add BuildMetrics and BuildReporter --- app.py | 2 + buildman/jobutil/buildreporter.py | 75 +++++++++++++++++++++++++++++++ 2 files changed, 77 insertions(+) create mode 100644 buildman/jobutil/buildreporter.py diff --git a/app.py b/app.py index c3b15d7aa..78243de75 100644 --- a/app.py +++ b/app.py @@ -32,6 +32,7 @@ from util.queuemetrics import QueueMetrics from util.config.provider import FileConfigProvider, TestConfigProvider from util.config.configutil import generate_secret_key from util.config.superusermanager import SuperUserManager +from buildman.jobutil.buildreporter import BuildMetrics OVERRIDE_CONFIG_DIRECTORY = 'conf/stack/' OVERRIDE_CONFIG_YAML_FILENAME = 'conf/stack/config.yaml' @@ -118,6 +119,7 @@ userevents = UserEventsBuilderModule(app) superusers = SuperUserManager(app) signer = Signer(app, OVERRIDE_CONFIG_DIRECTORY) queue_metrics = QueueMetrics(app) +build_metrics = BuildMetrics(app) tf = app.config['DB_TRANSACTION_FACTORY'] diff --git a/buildman/jobutil/buildreporter.py b/buildman/jobutil/buildreporter.py new file mode 100644 index 000000000..0016778dc --- /dev/null +++ b/buildman/jobutil/buildreporter.py @@ -0,0 +1,75 @@ +import logging +from trollius import From + +from util.cloudwatch import get_queue + + +# pylint: disable=invalid-name +logger = logging.getLogger(__name__) + + +# pylint: disable=too-few-public-methods +class BuildReporter(object): + """ + Base class for reporting build statuses to a metrics service. + """ + def report_completion_status(self, status): + """ + Method to invoke the recording of build's completion status to a metric service. + This must _not_ block because it is assumed to run in an event loop. 
+ """ + raise NotImplementedError + + +class NullReporter(BuildReporter): + """ + The /dev/null of BuildReporters. + """ + def report_completion_status(self, *args): + pass + + +# pylint: disable=too-many-instance-attributes +class CloudWatchBuildReporter(BuildReporter): + """ + Implements a BuildReporter for Amazon's CloudWatch. + """ + # pylint: disable=too-many-arguments + def __init__(self, queue, namespace_name, completed_name, failed_name, incompleted_name): + self._queue = queue + self._namespace_name = namespace_name + self._completed_name = completed_name + self._failed_name = failed_name + self._incompleted_name = incompleted_name + + def report_completion_status(self, status): + if status == 'complete': + status_name = self._completed_name + elif status == 'error': + status_name = self._failed_name + elif status == 'incomplete': + status_name = self._incompleted_name + else: + return + + yield From(self._queue.put(self._namespace_name, status_name, 1, unit='Count')) + + +class BuildMetrics(object): + """ + BuildMetrics initializes a reporter for recording the status of build completions. 
+ """ + def __init__(self, app=None): + self._app = app + if app is not None: + reporter_type = app.config.get('BUILD_METRICS_TYPE', 'Null') + if reporter_type == 'CloudWatch': + namespace = app.config.get('BUILD_METRICS_NAMESPACE') + completed_name = app.config.get('BUILD_METRICS_COMPLETED_NAME') + failed_name = app.config.get('BUILD_METRICS_FAILED_NAME') + incompleted_name = app.config.get('BUILD_METRICS_INCOMPLETED_NAME') + request_queue = get_queue(app) + self._reporter = CloudWatchBuildReporter(request_queue, namespace, completed_name, + failed_name, incompleted_name) + else: + self._reporter = NullReporter() From 0a00453024f8e7fcbed0f8f83ef518830944a275 Mon Sep 17 00:00:00 2001 From: Jimmy Zelinskie Date: Tue, 17 Feb 2015 12:20:46 -0500 Subject: [PATCH 05/20] buildreporter: rm pylint comments --- buildman/jobutil/buildreporter.py | 4 ---- 1 file changed, 4 deletions(-) diff --git a/buildman/jobutil/buildreporter.py b/buildman/jobutil/buildreporter.py index 0016778dc..f8ccf1fd5 100644 --- a/buildman/jobutil/buildreporter.py +++ b/buildman/jobutil/buildreporter.py @@ -4,11 +4,9 @@ from trollius import From from util.cloudwatch import get_queue -# pylint: disable=invalid-name logger = logging.getLogger(__name__) -# pylint: disable=too-few-public-methods class BuildReporter(object): """ Base class for reporting build statuses to a metrics service. @@ -29,12 +27,10 @@ class NullReporter(BuildReporter): pass -# pylint: disable=too-many-instance-attributes class CloudWatchBuildReporter(BuildReporter): """ Implements a BuildReporter for Amazon's CloudWatch. 
""" - # pylint: disable=too-many-arguments def __init__(self, queue, namespace_name, completed_name, failed_name, incompleted_name): self._queue = queue self._namespace_name = namespace_name From ca0d2b17214776576b1ea5f13286a538768b0bb4 Mon Sep 17 00:00:00 2001 From: Jimmy Zelinskie Date: Tue, 17 Feb 2015 12:21:22 -0500 Subject: [PATCH 06/20] buildreporter: getattr method --- buildman/jobutil/buildreporter.py | 3 +++ 1 file changed, 3 insertions(+) diff --git a/buildman/jobutil/buildreporter.py b/buildman/jobutil/buildreporter.py index f8ccf1fd5..08e16576b 100644 --- a/buildman/jobutil/buildreporter.py +++ b/buildman/jobutil/buildreporter.py @@ -69,3 +69,6 @@ class BuildMetrics(object): failed_name, incompleted_name) else: self._reporter = NullReporter() + + def __getattr__(self, name): + return getattr(self._reporter, name, None) From ffb897dfe6c81f26854be7efcb9453c1064df7c3 Mon Sep 17 00:00:00 2001 From: Jimmy Zelinskie Date: Tue, 17 Feb 2015 12:22:23 -0500 Subject: [PATCH 07/20] buildman: add job status logging to managers --- buildman/manager/enterprise.py | 2 ++ buildman/manager/ephemeral.py | 3 +++ 2 files changed, 5 insertions(+) diff --git a/buildman/manager/enterprise.py b/buildman/manager/enterprise.py index c56830a1c..f144fd504 100644 --- a/buildman/manager/enterprise.py +++ b/buildman/manager/enterprise.py @@ -1,6 +1,7 @@ import logging import uuid +from app import build_metrics from buildman.component.basecomponent import BaseComponent from buildman.component.buildcomponent import BuildComponent from buildman.manager.basemanager import BaseManager @@ -75,6 +76,7 @@ class EnterpriseManager(BaseManager): @coroutine def job_completed(self, build_job, job_status, build_component): + build_metrics.report(job_status) self.job_complete_callback(build_job, job_status) def build_component_disposed(self, build_component, timed_out): diff --git a/buildman/manager/ephemeral.py b/buildman/manager/ephemeral.py index cfb52f8ad..123594190 100644 --- 
a/buildman/manager/ephemeral.py +++ b/buildman/manager/ephemeral.py @@ -10,6 +10,7 @@ from trollius import From, coroutine, Return, async from concurrent.futures import ThreadPoolExecutor from urllib3.exceptions import ReadTimeoutError, ProtocolError +from app import build_metrics from buildman.manager.basemanager import BaseManager from buildman.manager.executor import PopenExecutor, EC2Executor from buildman.component.buildcomponent import BuildComponent @@ -286,6 +287,8 @@ class EphemeralBuilderManager(BaseManager): job_key = self._etcd_job_key(build_job) yield From(self._etcd_client.delete(job_key)) + build_metrics.report(job_status) + self.job_complete_callback(build_job, job_status) @coroutine From 935db5c766f6ab10b08fdf5be3af6125a978a618 Mon Sep 17 00:00:00 2001 From: Jimmy Zelinskie Date: Tue, 17 Feb 2015 12:23:08 -0500 Subject: [PATCH 08/20] buildman: clarify queue metrics from job state metrics --- buildman/server.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/buildman/server.py b/buildman/server.py index 21ce7cd89..e40452e3d 100644 --- a/buildman/server.py +++ b/buildman/server.py @@ -89,7 +89,7 @@ class BuilderServer(object): tasks = [ Task(self._initialize(loop, host, websocket_port, controller_port, ssl)), - Task(self._metrics_updater()), + Task(self._queue_metrics_updater()), ] try: @@ -179,7 +179,7 @@ class BuilderServer(object): self._queue.incomplete(job_item, restore_retry=True, retry_after=0) @trollius.coroutine - def _metrics_updater(self): + def _queue_metrics_updater(self): while self._current_status == 'running': yield From(trollius.sleep(30)) self._queue.update_metrics() From ef8d320c95ed93cbc3e0e8b2a311f8375e6d74a9 Mon Sep 17 00:00:00 2001 From: Jimmy Zelinskie Date: Tue, 17 Feb 2015 13:13:12 -0500 Subject: [PATCH 09/20] cloudwatch: global before reading queue Technically, it isn't necessary to use the global keyword before reading a global value, only modifying it. 
However, in this case it leaves a pretty annoying log line. --- util/cloudwatch.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/util/cloudwatch.py b/util/cloudwatch.py index 344433806..cf7aed392 100644 --- a/util/cloudwatch.py +++ b/util/cloudwatch.py @@ -12,8 +12,8 @@ def get_queue(app): """ Returns a queue to the CloudWatchSender. If a queue/sender do not exist, creates them. """ + global queue if queue is None: - global queue access_key = app.config.get('QUEUE_METRICS_AWS_ACCESS_KEY') secret_key = app.config.get('QUEUE_METRICS_AWS_SECRET_KEY') queue = Queue() From 8df35e720e20d04c0c79231a96deea56e2eaef9d Mon Sep 17 00:00:00 2001 From: Jimmy Zelinskie Date: Tue, 17 Feb 2015 13:16:03 -0500 Subject: [PATCH 10/20] web: remove manually starting of queuemetrics --- web.py | 5 +---- 1 file changed, 1 insertion(+), 4 deletions(-) diff --git a/web.py b/web.py index 92b3d6758..7c945cc45 100644 --- a/web.py +++ b/web.py @@ -1,7 +1,7 @@ import logging import logging.config -from app import app as application, queue_metrics +from app import app as application from endpoints.api import api_bp from endpoints.web import web @@ -9,9 +9,6 @@ from endpoints.webhooks import webhooks from endpoints.realtime import realtime from endpoints.callbacks import callback -# Start the cloudwatch reporting. 
-queue_metrics.run() - application.register_blueprint(web) application.register_blueprint(callback, url_prefix='/oauth2') application.register_blueprint(api_bp, url_prefix='/api') From b8d9ef0fe9695889349b1ead1d80ecdbe7bb41de Mon Sep 17 00:00:00 2001 From: Jimmy Zelinskie Date: Tue, 17 Feb 2015 14:18:32 -0500 Subject: [PATCH 11/20] buildman: remove old create_task for queue metrics --- buildman/server.py | 3 --- 1 file changed, 3 deletions(-) diff --git a/buildman/server.py b/buildman/server.py index e40452e3d..f077f7e9f 100644 --- a/buildman/server.py +++ b/buildman/server.py @@ -196,8 +196,5 @@ class BuilderServer(object): create_wsgi_server(self._controller_app, loop=loop, host=host, port=controller_port, ssl=ssl) yield From(loop.create_server(transport_factory, host, websocket_port, ssl=ssl)) - # Initialize the coroutine reporting metrics - loop.create_task(self._metrics_updater()) - # Initialize the work queue checker. yield From(self._work_checker()) From 25fc999d508e9c91be2ac6d16fc8472569bfa5b7 Mon Sep 17 00:00:00 2001 From: Jimmy Zelinskie Date: Tue, 17 Feb 2015 15:30:09 -0500 Subject: [PATCH 12/20] buildreporter: handle app=None --- buildman/jobutil/buildreporter.py | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/buildman/jobutil/buildreporter.py b/buildman/jobutil/buildreporter.py index 08e16576b..09a74a565 100644 --- a/buildman/jobutil/buildreporter.py +++ b/buildman/jobutil/buildreporter.py @@ -57,7 +57,9 @@ class BuildMetrics(object): """ def __init__(self, app=None): self._app = app - if app is not None: + if app is None: + self._reporter = None + else: reporter_type = app.config.get('BUILD_METRICS_TYPE', 'Null') if reporter_type == 'CloudWatch': namespace = app.config.get('BUILD_METRICS_NAMESPACE') From d70c95e42e89fc652b56cb262f8a7fc177ed64d3 Mon Sep 17 00:00:00 2001 From: Jimmy Zelinskie Date: Tue, 17 Feb 2015 15:31:53 -0500 Subject: [PATCH 13/20] buildreporter: move reporting into server callback --- 
buildman/manager/enterprise.py | 2 -- buildman/manager/ephemeral.py | 3 --- buildman/server.py | 4 +++- 3 files changed, 3 insertions(+), 6 deletions(-) diff --git a/buildman/manager/enterprise.py b/buildman/manager/enterprise.py index f144fd504..c56830a1c 100644 --- a/buildman/manager/enterprise.py +++ b/buildman/manager/enterprise.py @@ -1,7 +1,6 @@ import logging import uuid -from app import build_metrics from buildman.component.basecomponent import BaseComponent from buildman.component.buildcomponent import BuildComponent from buildman.manager.basemanager import BaseManager @@ -76,7 +75,6 @@ class EnterpriseManager(BaseManager): @coroutine def job_completed(self, build_job, job_status, build_component): - build_metrics.report(job_status) self.job_complete_callback(build_job, job_status) def build_component_disposed(self, build_component, timed_out): diff --git a/buildman/manager/ephemeral.py b/buildman/manager/ephemeral.py index 123594190..cfb52f8ad 100644 --- a/buildman/manager/ephemeral.py +++ b/buildman/manager/ephemeral.py @@ -10,7 +10,6 @@ from trollius import From, coroutine, Return, async from concurrent.futures import ThreadPoolExecutor from urllib3.exceptions import ReadTimeoutError, ProtocolError -from app import build_metrics from buildman.manager.basemanager import BaseManager from buildman.manager.executor import PopenExecutor, EC2Executor from buildman.component.buildcomponent import BuildComponent @@ -287,8 +286,6 @@ class EphemeralBuilderManager(BaseManager): job_key = self._etcd_job_key(build_job) yield From(self._etcd_client.delete(job_key)) - build_metrics.report(job_status) - self.job_complete_callback(build_job, job_status) @coroutine diff --git a/buildman/server.py b/buildman/server.py index f077f7e9f..9aad39b5c 100644 --- a/buildman/server.py +++ b/buildman/server.py @@ -15,7 +15,7 @@ from datetime import timedelta from buildman.jobutil.buildstatus import StatusHandler from buildman.jobutil.buildjob import BuildJob, BuildJobLoadException 
from data import database -from app import app +from app import app, build_metrics # pylint: disable=invalid-name logger = logging.getLogger(__name__) @@ -135,6 +135,8 @@ class BuilderServer(object): minimum_extension=MINIMUM_JOB_EXTENSION) def _job_complete(self, build_job, job_status): + build_metrics.report(job_status) + if job_status == BuildJobResult.INCOMPLETE: self._queue.incomplete(build_job.job_item, restore_retry=False, retry_after=30) else: From 85edb651e2fe425d9d823ad27ae1b2e3099df8f0 Mon Sep 17 00:00:00 2001 From: Jimmy Zelinskie Date: Tue, 17 Feb 2015 15:32:25 -0500 Subject: [PATCH 14/20] buildserver: remove pylint comments --- buildman/server.py | 4 ---- 1 file changed, 4 deletions(-) diff --git a/buildman/server.py b/buildman/server.py index 9aad39b5c..900746c6e 100644 --- a/buildman/server.py +++ b/buildman/server.py @@ -17,7 +17,6 @@ from buildman.jobutil.buildjob import BuildJob, BuildJobLoadException from data import database from app import app, build_metrics -# pylint: disable=invalid-name logger = logging.getLogger(__name__) WORK_CHECK_TIMEOUT = 10 @@ -27,19 +26,16 @@ MINIMUM_JOB_EXTENSION = timedelta(minutes=2) HEARTBEAT_PERIOD_SEC = 30 -# pylint: disable=too-few-public-methods class BuildJobResult(object): """ Build job result enum """ INCOMPLETE = 'incomplete' COMPLETE = 'complete' ERROR = 'error' -# pylint: disable=too-many-instance-attributes class BuilderServer(object): """ Server which handles both HTTP and WAMP requests, managing the full state of the build controller. 
""" - # pylint: disable=too-many-arguments def __init__(self, registry_hostname, queue, build_logs, user_files, lifecycle_manager_klass, lifecycle_manager_config, manager_hostname): self._loop = None From 0e69222af163ed6c1eb48c2afefffd443dbe96ba Mon Sep 17 00:00:00 2001 From: Jimmy Zelinskie Date: Tue, 17 Feb 2015 15:32:54 -0500 Subject: [PATCH 15/20] test: add TROLLIUSDEBUG=1 to test script --- local-test.sh | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/local-test.sh b/local-test.sh index a54491969..2a85148bf 100755 --- a/local-test.sh +++ b/local-test.sh @@ -1 +1 @@ -TEST=true python -m unittest discover +TEST=true TROLLIUSDEBUG=1 python -m unittest discover From 1a71925125e076e7762494ffbc8f1a392f89b76c Mon Sep 17 00:00:00 2001 From: Jimmy Zelinskie Date: Tue, 17 Feb 2015 17:02:37 -0500 Subject: [PATCH 16/20] buildreporter: remove unused logging --- buildman/jobutil/buildreporter.py | 4 ---- 1 file changed, 4 deletions(-) diff --git a/buildman/jobutil/buildreporter.py b/buildman/jobutil/buildreporter.py index 09a74a565..0ff798eae 100644 --- a/buildman/jobutil/buildreporter.py +++ b/buildman/jobutil/buildreporter.py @@ -1,12 +1,8 @@ -import logging from trollius import From from util.cloudwatch import get_queue -logger = logging.getLogger(__name__) - - class BuildReporter(object): """ Base class for reporting build statuses to a metrics service. 
From 5790d7d8cce8c6da4c71e17257b28e35e325fc61 Mon Sep 17 00:00:00 2001 From: Jimmy Zelinskie Date: Tue, 17 Feb 2015 17:03:12 -0500 Subject: [PATCH 17/20] buildman: build_metrics call correct method --- buildman/server.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/buildman/server.py b/buildman/server.py index 900746c6e..82eb39448 100644 --- a/buildman/server.py +++ b/buildman/server.py @@ -131,7 +131,7 @@ class BuilderServer(object): minimum_extension=MINIMUM_JOB_EXTENSION) def _job_complete(self, build_job, job_status): - build_metrics.report(job_status) + build_metrics.report_completion_status(job_status) if job_status == BuildJobResult.INCOMPLETE: self._queue.incomplete(build_job.job_item, restore_retry=False, retry_after=30) From f53dea46b7ba1f5753827dc98c2b8c40f7f65524 Mon Sep 17 00:00:00 2001 From: Jimmy Zelinskie Date: Wed, 18 Feb 2015 14:13:36 -0500 Subject: [PATCH 18/20] buildman: address PR #11 comments --- buildman/enums.py | 12 +++++++++++ buildman/jobutil/buildreporter.py | 17 ++++++++------- buildman/server.py | 27 ++++++++++-------------- util/cloudwatch.py | 19 ++++++++--------- util/queuemetrics.py | 35 +++++++++++++------------------ 5 files changed, 55 insertions(+), 55 deletions(-) create mode 100644 buildman/enums.py diff --git a/buildman/enums.py b/buildman/enums.py new file mode 100644 index 000000000..3d38217fe --- /dev/null +++ b/buildman/enums.py @@ -0,0 +1,12 @@ +class BuildJobResult(object): + """ Build job result enum """ + INCOMPLETE = 'incomplete' + COMPLETE = 'complete' + ERROR = 'error' + + +class BuildServerStatus(object): + """ Build server status enum """ + STARTING = 'starting' + RUNNING = 'running' + SHUTDOWN = 'shutting_down' diff --git a/buildman/jobutil/buildreporter.py b/buildman/jobutil/buildreporter.py index 0ff798eae..b295c33ea 100644 --- a/buildman/jobutil/buildreporter.py +++ b/buildman/jobutil/buildreporter.py @@ -1,5 +1,6 @@ from trollius import From +from buildman.enums import BuildJobResult 
from util.cloudwatch import get_queue @@ -28,6 +29,9 @@ class CloudWatchBuildReporter(BuildReporter): Implements a BuildReporter for Amazon's CloudWatch. """ def __init__(self, queue, namespace_name, completed_name, failed_name, incompleted_name): + if None in (queue, namespace_name, completed_name, failed_name, incompleted_name): + raise TypeError + self._queue = queue self._namespace_name = namespace_name self._completed_name = completed_name @@ -35,11 +39,11 @@ class CloudWatchBuildReporter(BuildReporter): self._incompleted_name = incompleted_name def report_completion_status(self, status): - if status == 'complete': + if status == BuildJobResult.COMPLETE: status_name = self._completed_name - elif status == 'error': + elif status == BuildJobResult.ERROR: status_name = self._failed_name - elif status == 'incomplete': + elif status == BuildJobResult.INCOMPLETE: status_name = self._incompleted_name else: return @@ -53,9 +57,8 @@ class BuildMetrics(object): """ def __init__(self, app=None): self._app = app - if app is None: - self._reporter = None - else: + self._reporter = NullReporter() + if app is not None: reporter_type = app.config.get('BUILD_METRICS_TYPE', 'Null') if reporter_type == 'CloudWatch': namespace = app.config.get('BUILD_METRICS_NAMESPACE') @@ -65,8 +68,6 @@ class BuildMetrics(object): request_queue = get_queue(app) self._reporter = CloudWatchBuildReporter(request_queue, namespace, completed_name, failed_name, incompleted_name) - else: - self._reporter = NullReporter() def __getattr__(self, name): return getattr(self._reporter, name, None) diff --git a/buildman/server.py b/buildman/server.py index 82eb39448..8f788d6cc 100644 --- a/buildman/server.py +++ b/buildman/server.py @@ -12,6 +12,7 @@ from trollius.tasks import Task from trollius.coroutines import From from datetime import timedelta +from buildman.enums import BuildJobResult, BuildServerStatus from buildman.jobutil.buildstatus import StatusHandler from buildman.jobutil.buildjob import 
BuildJob, BuildJobLoadException from data import database @@ -26,12 +27,6 @@ MINIMUM_JOB_EXTENSION = timedelta(minutes=2) HEARTBEAT_PERIOD_SEC = 30 -class BuildJobResult(object): - """ Build job result enum """ - INCOMPLETE = 'incomplete' - COMPLETE = 'complete' - ERROR = 'error' - class BuilderServer(object): """ Server which handles both HTTP and WAMP requests, managing the full state of the build controller. @@ -39,7 +34,7 @@ class BuilderServer(object): def __init__(self, registry_hostname, queue, build_logs, user_files, lifecycle_manager_klass, lifecycle_manager_config, manager_hostname): self._loop = None - self._current_status = 'starting' + self._current_status = BuildServerStatus.STARTING self._current_components = [] self._job_count = 0 @@ -59,7 +54,7 @@ class BuilderServer(object): self._lifecycle_manager_config = lifecycle_manager_config self._shutdown_event = Event() - self._current_status = 'running' + self._current_status = BuildServerStatus.RUNNING self._register_controller() @@ -83,13 +78,13 @@ class BuilderServer(object): logger.debug('Starting server on port %s, with controller on port %s', websocket_port, controller_port) - tasks = [ + TASKS = [ Task(self._initialize(loop, host, websocket_port, controller_port, ssl)), Task(self._queue_metrics_updater()), ] try: - loop.run_until_complete(trollius.wait(tasks)) + loop.run_until_complete(trollius.wait(TASKS)) except KeyboardInterrupt: pass finally: @@ -97,7 +92,7 @@ class BuilderServer(object): def close(self): logger.debug('Requested server shutdown') - self._current_status = 'shutting_down' + self._current_status = BuildServerStatus.SHUTDOWN self._lifecycle_manager.shutdown() self._shutdown_event.wait() logger.debug('Shutting down server') @@ -131,8 +126,6 @@ class BuilderServer(object): minimum_extension=MINIMUM_JOB_EXTENSION) def _job_complete(self, build_job, job_status): - build_metrics.report_completion_status(job_status) - if job_status == BuildJobResult.INCOMPLETE: 
self._queue.incomplete(build_job.job_item, restore_retry=False, retry_after=30) else: @@ -140,12 +133,14 @@ class BuilderServer(object): self._job_count = self._job_count - 1 - if self._current_status == 'shutting_down' and not self._job_count: + if self._current_status == BuildServerStatus.SHUTDOWN and not self._job_count: self._shutdown_event.set() + build_metrics.report_completion_status(job_status) + @trollius.coroutine def _work_checker(self): - while self._current_status == 'running': + while self._current_status == BuildServerStatus.RUNNING: with database.CloseForLongOperation(app.config): yield From(trollius.sleep(WORK_CHECK_TIMEOUT)) @@ -178,7 +173,7 @@ class BuilderServer(object): @trollius.coroutine def _queue_metrics_updater(self): - while self._current_status == 'running': + while self._current_status == BuildServerStatus.RUNNING: yield From(trollius.sleep(30)) self._queue.update_metrics() diff --git a/util/cloudwatch.py b/util/cloudwatch.py index cf7aed392..eca563164 100644 --- a/util/cloudwatch.py +++ b/util/cloudwatch.py @@ -1,27 +1,26 @@ import logging import boto +import thread from Queue import Queue from threading import Thread logger = logging.getLogger(__name__) -queue = None def get_queue(app): """ Returns a queue to the CloudWatchSender. If a queue/sender do not exist, creates them. 
""" - global queue - if queue is None: - access_key = app.config.get('QUEUE_METRICS_AWS_ACCESS_KEY') - secret_key = app.config.get('QUEUE_METRICS_AWS_SECRET_KEY') - queue = Queue() - sender = CloudWatchSender(queue, access_key, secret_key) - sender.start() - else: - return queue + access_key = app.config.get('CLOUDWATCH_AWS_ACCESS_KEY') + secret_key = app.config.get('CLOUDWATCH_AWS_SECRET_KEY') + if None in (access_key, secret_key): + raise TypeError + queue = Queue() + sender = CloudWatchSender(queue, access_key, secret_key) + sender.start() + return queue class CloudWatchSender(Thread): """ diff --git a/util/queuemetrics.py b/util/queuemetrics.py index d84ced129..0f7090801 100644 --- a/util/queuemetrics.py +++ b/util/queuemetrics.py @@ -14,6 +14,9 @@ class NullReporter(object): class CloudWatchReporter(object): """ CloudWatchReporter reports work queue metrics to CloudWatch """ def __init__(self, request_queue, namespace, need_capacity_name, build_percent_name): + if None in (request_queue, namespace, need_capacity_name, build_percent_name): + raise TypeError + self._namespace = namespace self._need_capacity_name = need_capacity_name self._build_percent_name = build_percent_name @@ -39,28 +42,18 @@ class QueueMetrics(object): QueueMetrics initializes a reporter for recording metrics of work queues. 
""" def __init__(self, app=None): - self.app = app - self.sender = None + self._app = app + self._reporter = NullReporter() if app is not None: - self.state = self.init_app(app) - else: - self.state = None + reporter_type = app.config.get('QUEUE_METRICS_TYPE', 'Null') + if reporter_type == 'CloudWatch': + namespace = app.config.get('QUEUE_METRICS_NAMESPACE') + req_capacity_name = app.config.get('QUEUE_METRICS_CAPACITY_SHORTAGE_NAME') + build_percent_name = app.config.get('QUEUE_METRICS_BUILD_PERCENT_NAME') - def init_app(self, app): - analytics_type = app.config.get('QUEUE_METRICS_TYPE', 'Null') - - if analytics_type == 'CloudWatch': - namespace = app.config.get('QUEUE_METRICS_NAMESPACE') - req_capacity_name = app.config.get('QUEUE_METRICS_CAPACITY_SHORTAGE_NAME') - build_percent_name = app.config.get('QUEUE_METRICS_BUILD_PERCENT_NAME') - - request_queue = get_queue(app) - reporter = CloudWatchReporter(request_queue, namespace, req_capacity_name, - build_percent_name) - else: - reporter = NullReporter() - - return reporter + request_queue = get_queue(app) + self._reporter = CloudWatchReporter(request_queue, namespace, req_capacity_name, + build_percent_name) def __getattr__(self, name): - return getattr(self.state, name, None) + return getattr(self._reporter, name, None) From 0d38e0b00b5db1cf52f9c028bf2ce07c337738b3 Mon Sep 17 00:00:00 2001 From: Jimmy Zelinskie Date: Wed, 18 Feb 2015 16:05:36 -0500 Subject: [PATCH 19/20] metrics: use config['name'] to get metric conf --- buildman/jobutil/buildreporter.py | 11 ++++------- util/queuemetrics.py | 9 +++------ 2 files changed, 7 insertions(+), 13 deletions(-) diff --git a/buildman/jobutil/buildreporter.py b/buildman/jobutil/buildreporter.py index b295c33ea..992b7a06e 100644 --- a/buildman/jobutil/buildreporter.py +++ b/buildman/jobutil/buildreporter.py @@ -29,9 +29,6 @@ class CloudWatchBuildReporter(BuildReporter): Implements a BuildReporter for Amazon's CloudWatch. 
""" def __init__(self, queue, namespace_name, completed_name, failed_name, incompleted_name): - if None in (queue, namespace_name, completed_name, failed_name, incompleted_name): - raise TypeError - self._queue = queue self._namespace_name = namespace_name self._completed_name = completed_name @@ -61,10 +58,10 @@ class BuildMetrics(object): if app is not None: reporter_type = app.config.get('BUILD_METRICS_TYPE', 'Null') if reporter_type == 'CloudWatch': - namespace = app.config.get('BUILD_METRICS_NAMESPACE') - completed_name = app.config.get('BUILD_METRICS_COMPLETED_NAME') - failed_name = app.config.get('BUILD_METRICS_FAILED_NAME') - incompleted_name = app.config.get('BUILD_METRICS_INCOMPLETED_NAME') + namespace = app.config['BUILD_METRICS_NAMESPACE'] + completed_name = app.config['BUILD_METRICS_COMPLETED_NAME'] + failed_name = app.config['BUILD_METRICS_FAILED_NAME'] + incompleted_name = app.config['BUILD_METRICS_INCOMPLETED_NAME'] request_queue = get_queue(app) self._reporter = CloudWatchBuildReporter(request_queue, namespace, completed_name, failed_name, incompleted_name) diff --git a/util/queuemetrics.py b/util/queuemetrics.py index 0f7090801..9e0a549f4 100644 --- a/util/queuemetrics.py +++ b/util/queuemetrics.py @@ -14,9 +14,6 @@ class NullReporter(object): class CloudWatchReporter(object): """ CloudWatchReporter reports work queue metrics to CloudWatch """ def __init__(self, request_queue, namespace, need_capacity_name, build_percent_name): - if None in (request_queue, namespace, need_capacity_name, build_percent_name): - raise TypeError - self._namespace = namespace self._need_capacity_name = need_capacity_name self._build_percent_name = build_percent_name @@ -47,9 +44,9 @@ class QueueMetrics(object): if app is not None: reporter_type = app.config.get('QUEUE_METRICS_TYPE', 'Null') if reporter_type == 'CloudWatch': - namespace = app.config.get('QUEUE_METRICS_NAMESPACE') - req_capacity_name = app.config.get('QUEUE_METRICS_CAPACITY_SHORTAGE_NAME') - 
build_percent_name = app.config.get('QUEUE_METRICS_BUILD_PERCENT_NAME') + namespace = app.config['QUEUE_METRICS_NAMESPACE'] + req_capacity_name = app.config['QUEUE_METRICS_CAPACITY_SHORTAGE_NAME'] + build_percent_name = app.config['QUEUE_METRICS_BUILD_PERCENT_NAME'] request_queue = get_queue(app) self._reporter = CloudWatchReporter(request_queue, namespace, req_capacity_name, From 9ab3554226186d35f71e79b1d0b1f88487e62417 Mon Sep 17 00:00:00 2001 From: Jimmy Zelinskie Date: Wed, 18 Feb 2015 17:11:45 -0500 Subject: [PATCH 20/20] buildreporter: does not execute in a coroutine! --- buildman/jobutil/buildreporter.py | 6 ++++-- util/cloudwatch.py | 7 ++----- 2 files changed, 6 insertions(+), 7 deletions(-) diff --git a/buildman/jobutil/buildreporter.py b/buildman/jobutil/buildreporter.py index 992b7a06e..16dd0ca5b 100644 --- a/buildman/jobutil/buildreporter.py +++ b/buildman/jobutil/buildreporter.py @@ -11,7 +11,6 @@ class BuildReporter(object): def report_completion_status(self, status): """ Method to invoke the recording of build's completion status to a metric service. - This must _not_ block because it is assumed to run in an event loop. 
""" raise NotImplementedError @@ -35,6 +34,9 @@ class CloudWatchBuildReporter(BuildReporter): self._failed_name = failed_name self._incompleted_name = incompleted_name + def _send_to_queue(self, *args, **kwargs): + self._queue.put((args, kwargs)) + def report_completion_status(self, status): if status == BuildJobResult.COMPLETE: status_name = self._completed_name @@ -45,7 +47,7 @@ class CloudWatchBuildReporter(BuildReporter): else: return - yield From(self._queue.put(self._namespace_name, status_name, 1, unit='Count')) + self._send_to_queue(self._namespace_name, status_name, 1, unit='Count') class BuildMetrics(object): diff --git a/util/cloudwatch.py b/util/cloudwatch.py index eca563164..b75dadf31 100644 --- a/util/cloudwatch.py +++ b/util/cloudwatch.py @@ -1,6 +1,5 @@ import logging import boto -import thread from Queue import Queue from threading import Thread @@ -12,10 +11,8 @@ def get_queue(app): """ Returns a queue to the CloudWatchSender. If a queue/sender do not exist, creates them. """ - access_key = app.config.get('CLOUDWATCH_AWS_ACCESS_KEY') - secret_key = app.config.get('CLOUDWATCH_AWS_SECRET_KEY') - if None in (access_key, secret_key): - raise TypeError + access_key = app.config['CLOUDWATCH_AWS_ACCESS_KEY'] + secret_key = app.config['CLOUDWATCH_AWS_SECRET_KEY'] queue = Queue() sender = CloudWatchSender(queue, access_key, secret_key)