From cfb6e884f26f5bf75d2177fcf738bebb155341c1 Mon Sep 17 00:00:00 2001 From: Matt Jibson Date: Tue, 11 Aug 2015 16:39:33 -0400 Subject: [PATCH] Refactor metric collection This change adds a generic queue onto which metrics can be pushed. A separate module removes metrics from the queue and adds them to CloudWatch. Since these are now separate ideas, we can easily change the consumer from CloudWatch to anything else. This change maintains near feature parity (the only change is there is now just one queue instead of two - not a big deal). --- app.py | 10 ++--- buildman/jobutil/buildreporter.py | 70 ------------------------------- buildman/server.py | 16 ++++++- data/queue.py | 12 +++--- util/saas/cloudwatch.py | 18 ++++---- util/saas/metricqueue.py | 11 +++++ util/saas/queuemetrics.py | 56 ------------------------- 7 files changed, 46 insertions(+), 147 deletions(-) delete mode 100644 buildman/jobutil/buildreporter.py create mode 100644 util/saas/metricqueue.py delete mode 100644 util/saas/queuemetrics.py diff --git a/app.py b/app.py index 4b2e71c66..4a72420b4 100644 --- a/app.py +++ b/app.py @@ -26,11 +26,11 @@ from util.saas.exceptionlog import Sentry from util.names import urn_generator from util.config.oauth import GoogleOAuthConfig, GithubOAuthConfig, GitLabOAuthConfig from util.security.signing import Signer -from util.saas.queuemetrics import QueueMetrics +from util.saas.cloudwatch import send_cloudwatch +from util.saas.metricqueue import MetricQueue from util.config.provider import FileConfigProvider, TestConfigProvider from util.config.configutil import generate_secret_key from util.config.superusermanager import SuperUserManager -from buildman.jobutil.buildreporter import BuildMetrics OVERRIDE_CONFIG_DIRECTORY = 'conf/stack/' OVERRIDE_CONFIG_YAML_FILENAME = 'conf/stack/config.yaml' @@ -124,8 +124,8 @@ authentication = UserAuthentication(app, OVERRIDE_CONFIG_DIRECTORY) userevents = UserEventsBuilderModule(app) superusers = SuperUserManager(app) signer = 
Signer(app, OVERRIDE_CONFIG_DIRECTORY) -queue_metrics = QueueMetrics(app) -build_metrics = BuildMetrics(app) +metric_queue = MetricQueue() +send_cloudwatch(metric_queue, app) tf = app.config['DB_TRANSACTION_FACTORY'] @@ -137,7 +137,7 @@ oauth_apps = [github_login, github_trigger, gitlab_trigger, google_login] image_diff_queue = WorkQueue(app.config['DIFFS_QUEUE_NAME'], tf) dockerfile_build_queue = WorkQueue(app.config['DOCKERFILE_BUILD_QUEUE_NAME'], tf, - reporter=queue_metrics.report) + metric_queue=metric_queue) notification_queue = WorkQueue(app.config['NOTIFICATION_QUEUE_NAME'], tf) database.configure(app.config) diff --git a/buildman/jobutil/buildreporter.py b/buildman/jobutil/buildreporter.py deleted file mode 100644 index 553f62ee7..000000000 --- a/buildman/jobutil/buildreporter.py +++ /dev/null @@ -1,70 +0,0 @@ -from buildman.enums import BuildJobResult -from util.saas.cloudwatch import get_queue - - -class BuildReporter(object): - """ - Base class for reporting build statuses to a metrics service. - """ - def report_completion_status(self, status): - """ - Method to invoke the recording of build's completion status to a metric service. - """ - raise NotImplementedError - - -class NullReporter(BuildReporter): - """ - The /dev/null of BuildReporters. - """ - def report_completion_status(self, *args): - pass - - -class CloudWatchBuildReporter(BuildReporter): - """ - Implements a BuildReporter for Amazon's CloudWatch. 
- """ - def __init__(self, queue, namespace_name, completed_name, failed_name, incompleted_name): - self._queue = queue - self._namespace_name = namespace_name - self._completed_name = completed_name - self._failed_name = failed_name - self._incompleted_name = incompleted_name - - def _send_to_queue(self, *args, **kwargs): - self._queue.put((args, kwargs)) - - def report_completion_status(self, status): - if status == BuildJobResult.COMPLETE: - status_name = self._completed_name - elif status == BuildJobResult.ERROR: - status_name = self._failed_name - elif status == BuildJobResult.INCOMPLETE: - status_name = self._incompleted_name - else: - return - - self._send_to_queue(self._namespace_name, status_name, 1, unit='Count') - - -class BuildMetrics(object): - """ - BuildMetrics initializes a reporter for recording the status of build completions. - """ - def __init__(self, app=None): - self._app = app - self._reporter = NullReporter() - if app is not None: - reporter_type = app.config.get('BUILD_METRICS_TYPE', 'Null') - if reporter_type == 'CloudWatch': - namespace = app.config['BUILD_METRICS_NAMESPACE'] - completed_name = app.config['BUILD_METRICS_COMPLETED_NAME'] - failed_name = app.config['BUILD_METRICS_FAILED_NAME'] - incompleted_name = app.config['BUILD_METRICS_INCOMPLETED_NAME'] - request_queue = get_queue(app) - self._reporter = CloudWatchBuildReporter(request_queue, namespace, completed_name, - failed_name, incompleted_name) - - def __getattr__(self, name): - return getattr(self._reporter, name, None) diff --git a/buildman/server.py b/buildman/server.py index 655b78495..f2a4feb92 100644 --- a/buildman/server.py +++ b/buildman/server.py @@ -16,7 +16,7 @@ from buildman.enums import BuildJobResult, BuildServerStatus from buildman.jobutil.buildstatus import StatusHandler from buildman.jobutil.buildjob import BuildJob, BuildJobLoadException from data import database -from app import app, build_metrics +from app import app, metric_queue logger = 
logging.getLogger(__name__) @@ -151,7 +151,7 @@ class BuilderServer(object): if self._current_status == BuildServerStatus.SHUTDOWN and not self._job_count: self._shutdown_event.set() - build_metrics.report_completion_status(job_status) + report_completion_status(job_status) @trollius.coroutine def _work_checker(self): @@ -225,3 +225,15 @@ class BuilderServer(object): # Initialize the work queue checker. yield From(self._work_checker()) + +def report_completion_status(status): + if status == BuildJobResult.COMPLETE: + status_name = 'CompleteBuilds' + elif status == BuildJobResult.ERROR: + status_name = 'FailedBuilds' + elif status == BuildJobResult.INCOMPLETE: + status_name = 'IncompletedBuilds' + else: + return + + metric_queue.put(status_name, 1, unit='Count') diff --git a/data/queue.py b/data/queue.py index 60632f5b1..26cbce8ad 100644 --- a/data/queue.py +++ b/data/queue.py @@ -15,9 +15,9 @@ class NoopWith: class WorkQueue(object): def __init__(self, queue_name, transaction_factory, - canonical_name_match_list=None, reporter=None): + canonical_name_match_list=None, metric_queue=None): self._queue_name = queue_name - self._reporter = reporter + self._metric_queue = metric_queue self._transaction_factory = transaction_factory self._currently_processing = False @@ -75,12 +75,14 @@ class WorkQueue(object): return (running_count, available_not_running_count, available_count) def update_metrics(self): - if self._reporter is None: + if self._metric_queue is None: return (running_count, available_not_running_count, available_count) = self.get_metrics() - self._reporter(self._currently_processing, running_count, - running_count + available_not_running_count) + self._metric_queue.put('BuildCapacityShortage', available_not_running_count, unit='Count') + + building_percent = 100 if self._currently_processing else 0 + self._metric_queue.put('PercentBuilding', building_percent, unit='Percent') def has_retries_remaining(self, item_id): """ Returns whether the queue item with 
the given id has any retries remaining. If the diff --git a/util/saas/cloudwatch.py b/util/saas/cloudwatch.py index abf71a254..16f93a558 100644 --- a/util/saas/cloudwatch.py +++ b/util/saas/cloudwatch.py @@ -7,29 +7,29 @@ from threading import Thread logger = logging.getLogger(__name__) -def get_queue(app): +def send_cloudwatch(metrics, app): """ - Returns a queue to a new CloudWatchSender. + Starts sending from metrics to a new CloudWatchSender. """ access_key = app.config['CLOUDWATCH_AWS_ACCESS_KEY'] secret_key = app.config['CLOUDWATCH_AWS_SECRET_KEY'] + namespace = app.config['CLOUDWATCH_NAMESPACE'] - queue = Queue() - sender = CloudWatchSender(queue, access_key, secret_key) + sender = CloudWatchSender(metrics, access_key, secret_key, namespace) sender.start() - return queue class CloudWatchSender(Thread): """ CloudWatchSender loops indefinitely and pulls metrics off of a queue then sends it to CloudWatch. """ - def __init__(self, request_queue, aws_access_key, aws_secret_key): + def __init__(self, metrics, aws_access_key, aws_secret_key, namespace): Thread.__init__(self) self.daemon = True self._aws_access_key = aws_access_key self._aws_secret_key = aws_secret_key - self._put_metrics_queue = request_queue + self._metrics = metrics + self._namespace = namespace def run(self): try: @@ -39,9 +39,9 @@ class CloudWatchSender(Thread): logger.exception('Failed to connect to CloudWatch.') while True: - put_metric_args, kwargs = self._put_metrics_queue.get() + put_metric_args, kwargs = self._metrics.get() logger.debug('Got queued put metrics request.') try: - connection.put_metric_data(*put_metric_args, **kwargs) + connection.put_metric_data(self._namespace, *put_metric_args, **kwargs) except: logger.exception('Failed to write to CloudWatch') diff --git a/util/saas/metricqueue.py b/util/saas/metricqueue.py new file mode 100644 index 000000000..f4ad40231 --- /dev/null +++ b/util/saas/metricqueue.py @@ -0,0 +1,11 @@ +from Queue import Queue + +class MetricQueue(object): + 
def __init__(self): + self._queue = Queue() + + def put(self, *args, **kwargs): + self._queue.put((args, kwargs)) + + def get(self): + return self._queue.get() diff --git a/util/saas/queuemetrics.py b/util/saas/queuemetrics.py deleted file mode 100644 index 62ebd082f..000000000 --- a/util/saas/queuemetrics.py +++ /dev/null @@ -1,56 +0,0 @@ -import logging - -from util.saas.cloudwatch import get_queue - - -logger = logging.getLogger(__name__) - - -class NullReporter(object): - def report(self, *args): - pass - - -class CloudWatchReporter(object): - """ CloudWatchReporter reports work queue metrics to CloudWatch """ - def __init__(self, request_queue, namespace, need_capacity_name, build_percent_name): - self._namespace = namespace - self._need_capacity_name = need_capacity_name - self._build_percent_name = build_percent_name - self._put_metrics_queue = request_queue - - def _send_to_queue(self, *args, **kwargs): - self._put_metrics_queue.put((args, kwargs)) - - def report(self, currently_processing, running_count, total_count): - logger.debug('Worker indicated %s running count and %s total count', running_count, - total_count) - - need_capacity_count = total_count - running_count - self._send_to_queue(self._namespace, self._need_capacity_name, need_capacity_count, - unit='Count') - - building_percent = 100 if currently_processing else 0 - self._send_to_queue(self._namespace, self._build_percent_name, building_percent, - unit='Percent') - -class QueueMetrics(object): - """ - QueueMetrics initializes a reporter for recording metrics of work queues. 
- """ - def __init__(self, app=None): - self._app = app - self._reporter = NullReporter() - if app is not None: - reporter_type = app.config.get('QUEUE_METRICS_TYPE', 'Null') - if reporter_type == 'CloudWatch': - namespace = app.config['QUEUE_METRICS_NAMESPACE'] - req_capacity_name = app.config['QUEUE_METRICS_CAPACITY_SHORTAGE_NAME'] - build_percent_name = app.config['QUEUE_METRICS_BUILD_PERCENT_NAME'] - - request_queue = get_queue(app) - self._reporter = CloudWatchReporter(request_queue, namespace, req_capacity_name, - build_percent_name) - - def __getattr__(self, name): - return getattr(self._reporter, name, None)