Refactor metric collection

This change adds a generic queue onto which metrics can be pushed. A
separate module removes metrics from the queue and adds them to Cloudwatch.
Since these are now separate ideas, we can easily change the consumer from
Cloudwatch to anything else.

This change maintains near feature parity (the only change is there is now
just one queue instead of two - not a big deal).
This commit is contained in:
Matt Jibson 2015-08-11 16:39:33 -04:00
parent aee746bec6
commit cfb6e884f2
7 changed files with 46 additions and 147 deletions

10
app.py
View file

@ -26,11 +26,11 @@ from util.saas.exceptionlog import Sentry
from util.names import urn_generator
from util.config.oauth import GoogleOAuthConfig, GithubOAuthConfig, GitLabOAuthConfig
from util.security.signing import Signer
from util.saas.queuemetrics import QueueMetrics
from util.saas.cloudwatch import send_cloudwatch
from util.saas.metricqueue import MetricQueue
from util.config.provider import FileConfigProvider, TestConfigProvider
from util.config.configutil import generate_secret_key
from util.config.superusermanager import SuperUserManager
from buildman.jobutil.buildreporter import BuildMetrics
OVERRIDE_CONFIG_DIRECTORY = 'conf/stack/'
OVERRIDE_CONFIG_YAML_FILENAME = 'conf/stack/config.yaml'
@ -124,8 +124,8 @@ authentication = UserAuthentication(app, OVERRIDE_CONFIG_DIRECTORY)
userevents = UserEventsBuilderModule(app)
superusers = SuperUserManager(app)
signer = Signer(app, OVERRIDE_CONFIG_DIRECTORY)
queue_metrics = QueueMetrics(app)
build_metrics = BuildMetrics(app)
metric_queue = MetricQueue()
send_cloudwatch(metric_queue, app)
tf = app.config['DB_TRANSACTION_FACTORY']
@ -137,7 +137,7 @@ oauth_apps = [github_login, github_trigger, gitlab_trigger, google_login]
image_diff_queue = WorkQueue(app.config['DIFFS_QUEUE_NAME'], tf)
dockerfile_build_queue = WorkQueue(app.config['DOCKERFILE_BUILD_QUEUE_NAME'], tf,
reporter=queue_metrics.report)
metric_queue=metric_queue)
notification_queue = WorkQueue(app.config['NOTIFICATION_QUEUE_NAME'], tf)
database.configure(app.config)

View file

@ -1,70 +0,0 @@
from buildman.enums import BuildJobResult
from util.saas.cloudwatch import get_queue
class BuildReporter(object):
"""
Base class for reporting build statuses to a metrics service.
"""
def report_completion_status(self, status):
"""
Method to invoke the recording of build's completion status to a metric service.
"""
raise NotImplementedError
class NullReporter(BuildReporter):
"""
The /dev/null of BuildReporters.
"""
def report_completion_status(self, *args):
pass
class CloudWatchBuildReporter(BuildReporter):
"""
Implements a BuildReporter for Amazon's CloudWatch.
"""
def __init__(self, queue, namespace_name, completed_name, failed_name, incompleted_name):
self._queue = queue
self._namespace_name = namespace_name
self._completed_name = completed_name
self._failed_name = failed_name
self._incompleted_name = incompleted_name
def _send_to_queue(self, *args, **kwargs):
self._queue.put((args, kwargs))
def report_completion_status(self, status):
if status == BuildJobResult.COMPLETE:
status_name = self._completed_name
elif status == BuildJobResult.ERROR:
status_name = self._failed_name
elif status == BuildJobResult.INCOMPLETE:
status_name = self._incompleted_name
else:
return
self._send_to_queue(self._namespace_name, status_name, 1, unit='Count')
class BuildMetrics(object):
"""
BuildMetrics initializes a reporter for recording the status of build completions.
"""
def __init__(self, app=None):
self._app = app
self._reporter = NullReporter()
if app is not None:
reporter_type = app.config.get('BUILD_METRICS_TYPE', 'Null')
if reporter_type == 'CloudWatch':
namespace = app.config['BUILD_METRICS_NAMESPACE']
completed_name = app.config['BUILD_METRICS_COMPLETED_NAME']
failed_name = app.config['BUILD_METRICS_FAILED_NAME']
incompleted_name = app.config['BUILD_METRICS_INCOMPLETED_NAME']
request_queue = get_queue(app)
self._reporter = CloudWatchBuildReporter(request_queue, namespace, completed_name,
failed_name, incompleted_name)
def __getattr__(self, name):
return getattr(self._reporter, name, None)

View file

@ -16,7 +16,7 @@ from buildman.enums import BuildJobResult, BuildServerStatus
from buildman.jobutil.buildstatus import StatusHandler
from buildman.jobutil.buildjob import BuildJob, BuildJobLoadException
from data import database
from app import app, build_metrics
from app import app, metric_queue
logger = logging.getLogger(__name__)
@ -151,7 +151,7 @@ class BuilderServer(object):
if self._current_status == BuildServerStatus.SHUTDOWN and not self._job_count:
self._shutdown_event.set()
build_metrics.report_completion_status(job_status)
report_completion_status(job_status)
@trollius.coroutine
def _work_checker(self):
@ -225,3 +225,15 @@ class BuilderServer(object):
# Initialize the work queue checker.
yield From(self._work_checker())
def report_completion_status(status):
if status == BuildJobResult.COMPLETE:
status_name = 'CompleteBuilds'
elif status == BuildJobResult.ERROR:
status_name = 'FailedBuilds'
elif status == BuildJobResult.INCOMPLETE:
status_name = 'IncompletedBuilds'
else:
return
metric_queue.put(status_name, 1, unit='Count')

View file

@ -15,9 +15,9 @@ class NoopWith:
class WorkQueue(object):
def __init__(self, queue_name, transaction_factory,
canonical_name_match_list=None, reporter=None):
canonical_name_match_list=None, metric_queue=None):
self._queue_name = queue_name
self._reporter = reporter
self._metric_queue = metric_queue
self._transaction_factory = transaction_factory
self._currently_processing = False
@ -75,12 +75,14 @@ class WorkQueue(object):
return (running_count, available_not_running_count, available_count)
def update_metrics(self):
if self._reporter is None:
if self._metric_queue is None:
return
(running_count, available_not_running_count, available_count) = self.get_metrics()
self._reporter(self._currently_processing, running_count,
running_count + available_not_running_count)
self._metric_queue.put('BuildCapacityShortage', available_not_running_count, unit='Count')
building_percent = 100 if self._currently_processing else 0
self._metric_queue.put('PercentBuilding', building_percent, unit='Percent')
def has_retries_remaining(self, item_id):
""" Returns whether the queue item with the given id has any retries remaining. If the

View file

@ -7,29 +7,29 @@ from threading import Thread
logger = logging.getLogger(__name__)
def get_queue(app):
def send_cloudwatch(metrics, app):
"""
Returns a queue to a new CloudWatchSender.
Starts sending from metrics to a new CloudWatchSender.
"""
access_key = app.config['CLOUDWATCH_AWS_ACCESS_KEY']
secret_key = app.config['CLOUDWATCH_AWS_SECRET_KEY']
namespace = app.config['CLOUDWATCH_NAMESPACE']
queue = Queue()
sender = CloudWatchSender(queue, access_key, secret_key)
sender = CloudWatchSender(metrics, access_key, secret_key, namespace)
sender.start()
return queue
class CloudWatchSender(Thread):
"""
CloudWatchSender loops indefinitely and pulls metrics off of a queue then sends it to CloudWatch.
"""
def __init__(self, request_queue, aws_access_key, aws_secret_key):
def __init__(self, metrics, aws_access_key, aws_secret_key, namespace):
Thread.__init__(self)
self.daemon = True
self._aws_access_key = aws_access_key
self._aws_secret_key = aws_secret_key
self._put_metrics_queue = request_queue
self._metrics = metrics
self._namespace = namespace
def run(self):
try:
@ -39,9 +39,9 @@ class CloudWatchSender(Thread):
logger.exception('Failed to connect to CloudWatch.')
while True:
put_metric_args, kwargs = self._put_metrics_queue.get()
put_metric_args, kwargs = self._metrics.get()
logger.debug('Got queued put metrics request.')
try:
connection.put_metric_data(*put_metric_args, **kwargs)
connection.put_metric_data(self._namespace, *put_metric_args, **kwargs)
except:
logger.exception('Failed to write to CloudWatch')

11
util/saas/metricqueue.py Normal file
View file

@ -0,0 +1,11 @@
from Queue import Queue
class MetricQueue(object):
def __init__(self):
self._queue = Queue()
def put(self, *args, **kwargs):
self._queue.put((args, kwargs))
def get(self):
return self._queue.get()

View file

@ -1,56 +0,0 @@
import logging
from util.saas.cloudwatch import get_queue
logger = logging.getLogger(__name__)
class NullReporter(object):
def report(self, *args):
pass
class CloudWatchReporter(object):
""" CloudWatchReporter reports work queue metrics to CloudWatch """
def __init__(self, request_queue, namespace, need_capacity_name, build_percent_name):
self._namespace = namespace
self._need_capacity_name = need_capacity_name
self._build_percent_name = build_percent_name
self._put_metrics_queue = request_queue
def _send_to_queue(self, *args, **kwargs):
self._put_metrics_queue.put((args, kwargs))
def report(self, currently_processing, running_count, total_count):
logger.debug('Worker indicated %s running count and %s total count', running_count,
total_count)
need_capacity_count = total_count - running_count
self._send_to_queue(self._namespace, self._need_capacity_name, need_capacity_count,
unit='Count')
building_percent = 100 if currently_processing else 0
self._send_to_queue(self._namespace, self._build_percent_name, building_percent,
unit='Percent')
class QueueMetrics(object):
"""
QueueMetrics initializes a reporter for recording metrics of work queues.
"""
def __init__(self, app=None):
self._app = app
self._reporter = NullReporter()
if app is not None:
reporter_type = app.config.get('QUEUE_METRICS_TYPE', 'Null')
if reporter_type == 'CloudWatch':
namespace = app.config['QUEUE_METRICS_NAMESPACE']
req_capacity_name = app.config['QUEUE_METRICS_CAPACITY_SHORTAGE_NAME']
build_percent_name = app.config['QUEUE_METRICS_BUILD_PERCENT_NAME']
request_queue = get_queue(app)
self._reporter = CloudWatchReporter(request_queue, namespace, req_capacity_name,
build_percent_name)
def __getattr__(self, name):
return getattr(self._reporter, name, None)