util: add cloudwatch package

This isolates the CloudWatch sending thread to it's own package where it
can be shared among any other packages that want to asynchronously send
metrics to CloudWatch.
This commit is contained in:
Jimmy Zelinskie 2015-02-14 16:30:10 -05:00
parent a4908d08ef
commit 6a1dd376c2
2 changed files with 60 additions and 48 deletions

51
util/cloudwatch.py Normal file
View file

@ -0,0 +1,51 @@
import logging
import boto
from Queue import Queue
from threading import Thread
logger = logging.getLogger(__name__)
queue = None
def get_queue(app):
"""
Returns a queue to the CloudWatchSender. If a queue/sender do not exist, creates them.
"""
if queue is None:
global queue
access_key = app.config.get('QUEUE_METRICS_AWS_ACCESS_KEY')
secret_key = app.config.get('QUEUE_METRICS_AWS_SECRET_KEY')
queue = Queue()
sender = CloudWatchSender(queue, access_key, secret_key)
sender.start()
else:
return queue
class CloudWatchSender(Thread):
"""
CloudWatchSender loops indefinitely and pulls metrics off of a queue then sends it to CloudWatch.
"""
def __init__(self, request_queue, aws_access_key, aws_secret_key):
Thread.__init__(self)
self.daemon = True
self._aws_access_key = aws_access_key
self._aws_secret_key = aws_secret_key
self._put_metrics_queue = request_queue
def run(self):
try:
logger.debug('Starting CloudWatch sender process.')
connection = boto.connect_cloudwatch(self._aws_access_key, self._aws_secret_key)
except:
logger.exception('Failed to connect to CloudWatch.')
while True:
put_metric_args, kwargs = self._put_metrics_queue.get()
logger.debug('Got queued put metrics request.')
try:
connection.put_metric_data(*put_metric_args, **kwargs)
except:
logger.exception('Failed to write to CloudWatch')

View file

@ -1,8 +1,6 @@
import logging
import boto
from Queue import Queue
from threading import Thread
from util.cloudwatch import get_queue
logger = logging.getLogger(__name__)
@ -13,8 +11,8 @@ class NullReporter(object):
pass
class QueueingCloudWatchReporter(object):
""" QueueingCloudWatchReporter reports metrics to the "SendToCloudWatch" process """
class CloudWatchReporter(object):
""" CloudWatchReporter reports work queue metrics to CloudWatch """
def __init__(self, request_queue, namespace, need_capacity_name, build_percent_name):
self._namespace = namespace
self._need_capacity_name = need_capacity_name
@ -36,35 +34,10 @@ class QueueingCloudWatchReporter(object):
self._send_to_queue(self._namespace, self._build_percent_name, building_percent,
unit='Percent')
class SendToCloudWatch(Thread):
""" SendToCloudWatch loops indefinitely and pulls metrics off of a queue then sends it to
CloudWatch. """
def __init__(self, request_queue, aws_access_key, aws_secret_key):
Thread.__init__(self)
self.daemon = True
self._aws_access_key = aws_access_key
self._aws_secret_key = aws_secret_key
self._put_metrics_queue = request_queue
def run(self):
try:
logger.debug('Starting CloudWatch sender process.')
connection = boto.connect_cloudwatch(self._aws_access_key, self._aws_secret_key)
except:
logger.exception('Failed to connect to CloudWatch.')
while True:
put_metric_args, kwargs = self._put_metrics_queue.get()
logger.debug('Got queued put metrics request.')
try:
connection.put_metric_data(*put_metric_args, **kwargs)
except:
logger.exception('Failed to write to CloudWatch')
class QueueMetrics(object):
"""
QueueMetrics initializes a reporter for recording metrics of work queues.
"""
def __init__(self, app=None):
self.app = app
self.sender = None
@ -77,29 +50,17 @@ class QueueMetrics(object):
analytics_type = app.config.get('QUEUE_METRICS_TYPE', 'Null')
if analytics_type == 'CloudWatch':
access_key = app.config.get('QUEUE_METRICS_AWS_ACCESS_KEY')
secret_key = app.config.get('QUEUE_METRICS_AWS_SECRET_KEY')
namespace = app.config.get('QUEUE_METRICS_NAMESPACE')
req_capacity_name = app.config.get('QUEUE_METRICS_CAPACITY_SHORTAGE_NAME')
build_percent_name = app.config.get('QUEUE_METRICS_BUILD_PERCENT_NAME')
request_queue = Queue()
reporter = QueueingCloudWatchReporter(request_queue, namespace, req_capacity_name,
build_percent_name)
self.sender = SendToCloudWatch(request_queue, access_key, secret_key)
request_queue = get_queue(app)
reporter = CloudWatchReporter(request_queue, namespace, req_capacity_name,
build_percent_name)
else:
reporter = NullReporter()
# register extension with app
app.extensions = getattr(app, 'extensions', {})
app.extensions['queuemetrics'] = reporter
return reporter
def run(self):
logger.debug('Asked to start CloudWatch reporter')
if self.sender is not None:
logger.debug('Starting CloudWatch reporter')
self.sender.start()
def __getattr__(self, name):
return getattr(self.state, name, None)