Batch cloudwatch puts
This commit is contained in:
parent
7c3b555ee9
commit
9a7e5bb35e
2 changed files with 56 additions and 9 deletions
|
@ -1,12 +1,19 @@
|
|||
import logging
|
||||
import boto
|
||||
import time
|
||||
|
||||
from Queue import Queue
|
||||
from Queue import Empty
|
||||
from threading import Thread
|
||||
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
MAX_BATCH_METRICS = 100
|
||||
|
||||
# Sleep for this much time between failed send requests.
|
||||
# This prevents hammering cloudwatch when it's not available.
|
||||
FAILED_SEND_SLEEP_SECS = 5
|
||||
|
||||
def start_cloudwatch_sender(metrics, app):
|
||||
"""
|
||||
Starts sending from metrics to a new CloudWatchSender.
|
||||
|
@ -16,7 +23,7 @@ def start_cloudwatch_sender(metrics, app):
|
|||
secret_key = app.config['CLOUDWATCH_AWS_SECRET_KEY']
|
||||
namespace = app.config['CLOUDWATCH_NAMESPACE']
|
||||
except KeyError:
|
||||
logger.debug('Cloudwatch not configured')
|
||||
logger.debug('CloudWatch not configured')
|
||||
return
|
||||
|
||||
sender = CloudWatchSender(metrics, access_key, secret_key, namespace)
|
||||
|
@ -44,9 +51,43 @@ class CloudWatchSender(Thread):
|
|||
self._metrics.enable()
|
||||
|
||||
while True:
|
||||
put_metric_args, kwargs = self._metrics.get()
|
||||
logger.debug('Got queued put metrics request.')
|
||||
metrics = {
|
||||
'name': [],
|
||||
'value': [],
|
||||
'unit': [],
|
||||
'timestamp': [],
|
||||
'dimensions': [],
|
||||
}
|
||||
|
||||
metric = self._metrics.get()
|
||||
append_metric(metrics, metric)
|
||||
|
||||
while len(metrics['name']) < MAX_BATCH_METRICS:
|
||||
try:
|
||||
metric = self._metrics.get_nowait()
|
||||
append_metric(metrics, metric)
|
||||
except Empty:
|
||||
break
|
||||
|
||||
try:
|
||||
connection.put_metric_data(self._namespace, *put_metric_args, **kwargs)
|
||||
connection.put_metric_data(self._namespace, **metrics)
|
||||
logger.debug('Sent %d CloudWatch metrics', len(metrics['name']))
|
||||
except:
|
||||
logger.exception('Failed to write to CloudWatch')
|
||||
for i in range(len(metrics['name'])):
|
||||
self._metrics.put(metrics['name'][i], metrics['value'][i],
|
||||
unit=metrics['unit'][i],
|
||||
dimensions=metrics['dimensions'][i],
|
||||
timestamp=metrics['timestamp'][i],
|
||||
)
|
||||
|
||||
logger.exception('Failed to write to CloudWatch: %s', metrics)
|
||||
logger.debug('Attempted to requeue %d metrics.', len(metrics['name']))
|
||||
time.sleep(FAILED_SEND_SLEEP_SECS)
|
||||
|
||||
def append_metric(metrics, m):
|
||||
name, value, kwargs = m
|
||||
metrics['name'].append(name)
|
||||
metrics['value'].append(value)
|
||||
metrics['unit'].append(kwargs.get('unit'))
|
||||
metrics['dimensions'].append(kwargs.get('dimensions'))
|
||||
metrics['timestamp'].append(kwargs.get('timestamp'))
|
||||
|
|
|
@ -1,3 +1,4 @@
|
|||
import datetime
|
||||
import logging
|
||||
import time
|
||||
|
||||
|
@ -16,19 +17,24 @@ class MetricQueue(object):
|
|||
def enable(self, maxsize=10000):
|
||||
self._queue = Queue(maxsize)
|
||||
|
||||
def put(self, *args, **kwargs):
|
||||
def put(self, name, value, **kwargs):
|
||||
if self._queue is None:
|
||||
logging.debug('No metric queue: %s %s', args, kwargs)
|
||||
logging.debug('No metric queue: %s %s %s', name, value, kwargs)
|
||||
return
|
||||
|
||||
try:
|
||||
self._queue.put_nowait((args, kwargs))
|
||||
kwargs.setdefault('timestamp', datetime.datetime.now())
|
||||
kwargs.setdefault('dimensions', {})
|
||||
self._queue.put_nowait((name, value, kwargs))
|
||||
except Full:
|
||||
logger.error('Metric queue full')
|
||||
|
||||
def get(self):
|
||||
return self._queue.get()
|
||||
|
||||
def get_nowait(self):
|
||||
return self._queue.get_nowait()
|
||||
|
||||
def time_blueprint(bp, metric_queue):
|
||||
bp.before_request(time_before_request)
|
||||
bp.after_request(time_after_request(bp.name, metric_queue))
|
||||
|
|
Reference in a new issue