diff --git a/util/saas/cloudwatch.py b/util/saas/cloudwatch.py
index 442c9091e..c25bf7e09 100644
--- a/util/saas/cloudwatch.py
+++ b/util/saas/cloudwatch.py
@@ -1,12 +1,19 @@
 import logging
 import boto
+import time
 
-from Queue import Queue
+from Queue import Empty
 from threading import Thread
 
 
 logger = logging.getLogger(__name__)
 
+MAX_BATCH_METRICS = 100
+
+# Sleep for this much time between failed send requests.
+# This prevents hammering CloudWatch when it's not available.
+FAILED_SEND_SLEEP_SECS = 5
+
 def start_cloudwatch_sender(metrics, app):
     """
     Starts sending from metrics to a new CloudWatchSender.
@@ -16,7 +23,7 @@ def start_cloudwatch_sender(metrics, app):
         secret_key = app.config['CLOUDWATCH_AWS_SECRET_KEY']
         namespace = app.config['CLOUDWATCH_NAMESPACE']
     except KeyError:
-        logger.debug('Cloudwatch not configured')
+        logger.debug('CloudWatch not configured')
         return
 
     sender = CloudWatchSender(metrics, access_key, secret_key, namespace)
@@ -44,9 +51,44 @@ class CloudWatchSender(Thread):
         self._metrics.enable()
 
         while True:
-            put_metric_args, kwargs = self._metrics.get()
-            logger.debug('Got queued put metrics request.')
+            metrics = {
+                'name': [],
+                'value': [],
+                'unit': [],
+                'timestamp': [],
+                'dimensions': [],
+            }
+
+            metric = self._metrics.get()
+            append_metric(metrics, metric)
+
+            while len(metrics['name']) < MAX_BATCH_METRICS:
+                try:
+                    metric = self._metrics.get_nowait()
+                    append_metric(metrics, metric)
+                except Empty:
+                    break
+
             try:
-                connection.put_metric_data(self._namespace, *put_metric_args, **kwargs)
+                connection.put_metric_data(self._namespace, **metrics)
+                logger.debug('Sent %d CloudWatch metrics', len(metrics['name']))
-            except:
-                logger.exception('Failed to write to CloudWatch')
+            except Exception:
+                for i in range(len(metrics['name'])):
+                    self._metrics.put(metrics['name'][i], metrics['value'][i],
+                                      unit=metrics['unit'][i],
+                                      dimensions=metrics['dimensions'][i],
+                                      timestamp=metrics['timestamp'][i],
+                                      )
+
+                logger.exception('Failed to write to CloudWatch: %s', metrics)
+                logger.debug('Attempted to requeue %d metrics.', len(metrics['name']))
+                time.sleep(FAILED_SEND_SLEEP_SECS)
+
+def append_metric(metrics, m):
+    """ Appends one queued (name, value, kwargs) metric to the batch lists. """
+    name, value, kwargs = m
+    metrics['name'].append(name)
+    metrics['value'].append(value)
+    metrics['unit'].append(kwargs.get('unit'))
+    metrics['dimensions'].append(kwargs.get('dimensions'))
+    metrics['timestamp'].append(kwargs.get('timestamp'))
diff --git a/util/saas/metricqueue.py b/util/saas/metricqueue.py
index 42d2bf543..b99ec922f 100644
--- a/util/saas/metricqueue.py
+++ b/util/saas/metricqueue.py
@@ -1,3 +1,4 @@
+import datetime
 import logging
 import time
 
@@ -16,19 +17,25 @@ class MetricQueue(object):
     def enable(self, maxsize=10000):
         self._queue = Queue(maxsize)
 
-    def put(self, *args, **kwargs):
+    def put(self, name, value, **kwargs):
         if self._queue is None:
-            logging.debug('No metric queue: %s %s', args, kwargs)
+            logger.debug('No metric queue: %s %s %s', name, value, kwargs)
             return
 
         try:
-            self._queue.put_nowait((args, kwargs))
+            # CloudWatch timestamps are UTC; do not default to host-local time.
+            kwargs.setdefault('timestamp', datetime.datetime.utcnow())
+            kwargs.setdefault('dimensions', {})
+            self._queue.put_nowait((name, value, kwargs))
         except Full:
             logger.error('Metric queue full')
 
     def get(self):
         return self._queue.get()
 
+    def get_nowait(self):
+        return self._queue.get_nowait()
+
 def time_blueprint(bp, metric_queue):
     bp.before_request(time_before_request)
     bp.after_request(time_after_request(bp.name, metric_queue))