diff --git a/app.py b/app.py index 4b2e71c66..4a72420b4 100644 --- a/app.py +++ b/app.py @@ -26,11 +26,11 @@ from util.saas.exceptionlog import Sentry from util.names import urn_generator from util.config.oauth import GoogleOAuthConfig, GithubOAuthConfig, GitLabOAuthConfig from util.security.signing import Signer -from util.saas.queuemetrics import QueueMetrics +from util.saas.cloudwatch import send_cloudwatch +from util.saas.metricqueue import MetricQueue from util.config.provider import FileConfigProvider, TestConfigProvider from util.config.configutil import generate_secret_key from util.config.superusermanager import SuperUserManager -from buildman.jobutil.buildreporter import BuildMetrics OVERRIDE_CONFIG_DIRECTORY = 'conf/stack/' OVERRIDE_CONFIG_YAML_FILENAME = 'conf/stack/config.yaml' @@ -124,8 +124,8 @@ authentication = UserAuthentication(app, OVERRIDE_CONFIG_DIRECTORY) userevents = UserEventsBuilderModule(app) superusers = SuperUserManager(app) signer = Signer(app, OVERRIDE_CONFIG_DIRECTORY) -queue_metrics = QueueMetrics(app) -build_metrics = BuildMetrics(app) +metric_queue = MetricQueue() +send_cloudwatch(metric_queue, app) tf = app.config['DB_TRANSACTION_FACTORY'] @@ -137,7 +137,7 @@ oauth_apps = [github_login, github_trigger, gitlab_trigger, google_login] image_diff_queue = WorkQueue(app.config['DIFFS_QUEUE_NAME'], tf) dockerfile_build_queue = WorkQueue(app.config['DOCKERFILE_BUILD_QUEUE_NAME'], tf, - reporter=queue_metrics.report) + metric_queue=metric_queue) notification_queue = WorkQueue(app.config['NOTIFICATION_QUEUE_NAME'], tf) database.configure(app.config) diff --git a/buildman/jobutil/buildreporter.py b/buildman/jobutil/buildreporter.py deleted file mode 100644 index 553f62ee7..000000000 --- a/buildman/jobutil/buildreporter.py +++ /dev/null @@ -1,70 +0,0 @@ -from buildman.enums import BuildJobResult -from util.saas.cloudwatch import get_queue - - -class BuildReporter(object): - """ - Base class for reporting build statuses to a metrics service. 
- """ - def report_completion_status(self, status): - """ - Method to invoke the recording of build's completion status to a metric service. - """ - raise NotImplementedError - - -class NullReporter(BuildReporter): - """ - The /dev/null of BuildReporters. - """ - def report_completion_status(self, *args): - pass - - -class CloudWatchBuildReporter(BuildReporter): - """ - Implements a BuildReporter for Amazon's CloudWatch. - """ - def __init__(self, queue, namespace_name, completed_name, failed_name, incompleted_name): - self._queue = queue - self._namespace_name = namespace_name - self._completed_name = completed_name - self._failed_name = failed_name - self._incompleted_name = incompleted_name - - def _send_to_queue(self, *args, **kwargs): - self._queue.put((args, kwargs)) - - def report_completion_status(self, status): - if status == BuildJobResult.COMPLETE: - status_name = self._completed_name - elif status == BuildJobResult.ERROR: - status_name = self._failed_name - elif status == BuildJobResult.INCOMPLETE: - status_name = self._incompleted_name - else: - return - - self._send_to_queue(self._namespace_name, status_name, 1, unit='Count') - - -class BuildMetrics(object): - """ - BuildMetrics initializes a reporter for recording the status of build completions. 
- """ - def __init__(self, app=None): - self._app = app - self._reporter = NullReporter() - if app is not None: - reporter_type = app.config.get('BUILD_METRICS_TYPE', 'Null') - if reporter_type == 'CloudWatch': - namespace = app.config['BUILD_METRICS_NAMESPACE'] - completed_name = app.config['BUILD_METRICS_COMPLETED_NAME'] - failed_name = app.config['BUILD_METRICS_FAILED_NAME'] - incompleted_name = app.config['BUILD_METRICS_INCOMPLETED_NAME'] - request_queue = get_queue(app) - self._reporter = CloudWatchBuildReporter(request_queue, namespace, completed_name, - failed_name, incompleted_name) - - def __getattr__(self, name): - return getattr(self._reporter, name, None) diff --git a/buildman/server.py b/buildman/server.py index 655b78495..f2a4feb92 100644 --- a/buildman/server.py +++ b/buildman/server.py @@ -16,7 +16,7 @@ from buildman.enums import BuildJobResult, BuildServerStatus from buildman.jobutil.buildstatus import StatusHandler from buildman.jobutil.buildjob import BuildJob, BuildJobLoadException from data import database -from app import app, build_metrics +from app import app, metric_queue logger = logging.getLogger(__name__) @@ -151,7 +151,7 @@ class BuilderServer(object): if self._current_status == BuildServerStatus.SHUTDOWN and not self._job_count: self._shutdown_event.set() - build_metrics.report_completion_status(job_status) + report_completion_status(job_status) @trollius.coroutine def _work_checker(self): @@ -225,3 +225,15 @@ class BuilderServer(object): # Initialize the work queue checker. 
def send_cloudwatch(metrics, app):
  """
  Starts sending from metrics to a new CloudWatchSender.

  metrics: source of queued metric calls; it is handed to CloudWatchSender, which reads
           (args, kwargs) pairs from its get() method (e.g. a MetricQueue).
  app: application object whose `config` mapping may define CLOUDWATCH_AWS_ACCESS_KEY,
       CLOUDWATCH_AWS_SECRET_KEY and CLOUDWATCH_NAMESPACE.

  Returns None. When no CLOUDWATCH_NAMESPACE is configured, no sender thread is started.
  """
  # Look the settings up with .get(): this function is invoked unconditionally while the
  # app module is imported, so a missing key must not abort startup with a KeyError.
  access_key = app.config.get('CLOUDWATCH_AWS_ACCESS_KEY')
  secret_key = app.config.get('CLOUDWATCH_AWS_SECRET_KEY')
  namespace = app.config.get('CLOUDWATCH_NAMESPACE')

  # Every metric is published under the namespace, so without one there is nothing
  # useful a sender could do.
  if not namespace:
    logging.getLogger(__name__).debug('CloudWatch is not configured; not starting a sender.')
    return

  # NOTE(review): credentials are passed through unchanged and may be None here; how the
  # sender's connection setup treats missing credentials is not visible in this patch.
  sender = CloudWatchSender(metrics, access_key, secret_key, namespace)
  sender.start()
get_queue - - -logger = logging.getLogger(__name__) - - -class NullReporter(object): - def report(self, *args): - pass - - -class CloudWatchReporter(object): - """ CloudWatchReporter reports work queue metrics to CloudWatch """ - def __init__(self, request_queue, namespace, need_capacity_name, build_percent_name): - self._namespace = namespace - self._need_capacity_name = need_capacity_name - self._build_percent_name = build_percent_name - self._put_metrics_queue = request_queue - - def _send_to_queue(self, *args, **kwargs): - self._put_metrics_queue.put((args, kwargs)) - - def report(self, currently_processing, running_count, total_count): - logger.debug('Worker indicated %s running count and %s total count', running_count, - total_count) - - need_capacity_count = total_count - running_count - self._send_to_queue(self._namespace, self._need_capacity_name, need_capacity_count, - unit='Count') - - building_percent = 100 if currently_processing else 0 - self._send_to_queue(self._namespace, self._build_percent_name, building_percent, - unit='Percent') - -class QueueMetrics(object): - """ - QueueMetrics initializes a reporter for recording metrics of work queues. - """ - def __init__(self, app=None): - self._app = app - self._reporter = NullReporter() - if app is not None: - reporter_type = app.config.get('QUEUE_METRICS_TYPE', 'Null') - if reporter_type == 'CloudWatch': - namespace = app.config['QUEUE_METRICS_NAMESPACE'] - req_capacity_name = app.config['QUEUE_METRICS_CAPACITY_SHORTAGE_NAME'] - build_percent_name = app.config['QUEUE_METRICS_BUILD_PERCENT_NAME'] - - request_queue = get_queue(app) - self._reporter = CloudWatchReporter(request_queue, namespace, req_capacity_name, - build_percent_name) - - def __getattr__(self, name): - return getattr(self._reporter, name, None)