93 lines
2.7 KiB
Python
93 lines
2.7 KiB
Python
import logging
|
|
import boto
|
|
import time
|
|
|
|
from Queue import Empty
|
|
from threading import Thread
|
|
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
MAX_BATCH_METRICS = 20
|
|
|
|
# Sleep for this much time between failed send requests.
|
|
# This prevents hammering cloudwatch when it's not available.
|
|
FAILED_SEND_SLEEP_SECS = 5
|
|
|
|
def start_cloudwatch_sender(metrics, app):
|
|
"""
|
|
Starts sending from metrics to a new CloudWatchSender.
|
|
"""
|
|
access_key = app.config.get('CLOUDWATCH_AWS_ACCESS_KEY')
|
|
secret_key = app.config.get('CLOUDWATCH_AWS_SECRET_KEY')
|
|
namespace = app.config.get('CLOUDWATCH_NAMESPACE')
|
|
|
|
if not namespace:
|
|
logger.debug('CloudWatch not configured')
|
|
return
|
|
|
|
sender = CloudWatchSender(metrics, access_key, secret_key, namespace)
|
|
sender.start()
|
|
|
|
class CloudWatchSender(Thread):
|
|
"""
|
|
CloudWatchSender loops indefinitely and pulls metrics off of a queue then sends it to CloudWatch.
|
|
"""
|
|
def __init__(self, metrics, aws_access_key, aws_secret_key, namespace):
|
|
Thread.__init__(self)
|
|
self.daemon = True
|
|
|
|
self._aws_access_key = aws_access_key
|
|
self._aws_secret_key = aws_secret_key
|
|
self._metrics = metrics
|
|
self._namespace = namespace
|
|
|
|
def run(self):
|
|
try:
|
|
logger.debug('Starting CloudWatch sender process.')
|
|
connection = boto.connect_cloudwatch(self._aws_access_key, self._aws_secret_key)
|
|
except:
|
|
logger.exception('Failed to connect to CloudWatch.')
|
|
self._metrics.enable_deprecated()
|
|
|
|
while True:
|
|
metrics = {
|
|
'name': [],
|
|
'value': [],
|
|
'unit': [],
|
|
'timestamp': [],
|
|
'dimensions': [],
|
|
}
|
|
|
|
metric = self._metrics.get_deprecated()
|
|
append_metric(metrics, metric)
|
|
|
|
while len(metrics['name']) < MAX_BATCH_METRICS:
|
|
try:
|
|
metric = self._metrics.get_nowait_deprecated()
|
|
append_metric(metrics, metric)
|
|
except Empty:
|
|
break
|
|
|
|
try:
|
|
connection.put_metric_data(self._namespace, **metrics)
|
|
logger.debug('Sent %d CloudWatch metrics', len(metrics['name']))
|
|
except:
|
|
for i in range(len(metrics['name'])):
|
|
self._metrics.put_deprecated(metrics['name'][i], metrics['value'][i],
|
|
unit=metrics['unit'][i],
|
|
dimensions=metrics['dimensions'][i],
|
|
timestamp=metrics['timestamp'][i],
|
|
)
|
|
|
|
logger.exception('Failed to write to CloudWatch: %s', metrics)
|
|
logger.debug('Attempted to requeue %d metrics.', len(metrics['name']))
|
|
time.sleep(FAILED_SEND_SLEEP_SECS)
|
|
|
|
def append_metric(metrics, m):
|
|
name, value, kwargs = m
|
|
metrics['name'].append(name)
|
|
metrics['value'].append(value)
|
|
metrics['unit'].append(kwargs.get('unit'))
|
|
metrics['dimensions'].append(kwargs.get('dimensions'))
|
|
metrics['timestamp'].append(kwargs.get('timestamp'))
|