98 lines
3.4 KiB
Python
98 lines
3.4 KiB
Python
import logging
|
|
import boto
|
|
|
|
from multiprocessing import Process, Queue
|
|
|
|
# Module-level logger used by the reporter, sender process and QueueMetrics code in this file.
logger = logging.getLogger(__name__)
|
|
|
|
|
|
class NullReporter(object):
    """Reporter that silently discards every metric it is given.

    QueueMetrics.init_app installs this reporter whenever the configured
    QUEUE_METRICS_TYPE is anything other than 'CloudWatch'.
    """

    def report(self, *args, **kwargs):
        """Accept and ignore any metric report.

        Keyword arguments are accepted as well as positional ones so that a
        call written against QueueingCloudWatchReporter.report (which has
        named parameters) also works against this no-op implementation.
        """
        pass
|
|
|
|
|
|
class QueueingCloudWatchReporter(object):
    """ Reporter that pushes metric requests onto a queue for the "SendToCloudWatch" process.

    Every request is enqueued as an ``(args, kwargs)`` tuple.
    """

    def __init__(self, request_queue, namespace, need_capacity_name, build_percent_name):
        # Queue the metric requests are written to.
        self._put_metrics_queue = request_queue
        # Namespace and metric names passed along with every request.
        self._namespace = namespace
        self._need_capacity_name = need_capacity_name
        self._build_percent_name = build_percent_name

    def _send_to_queue(self, *args, **kwargs):
        """ Enqueue one request as a single ``(args, kwargs)`` tuple. """
        request = (args, kwargs)
        self._put_metrics_queue.put(request)

    def report(self, currently_processing, running_count, total_count):
        """ Enqueue two metrics: the capacity shortage and the build percentage.

        The shortage is ``total_count - running_count`` (unit 'Count'); the
        percentage is 100 when ``currently_processing`` is truthy, else 0
        (unit 'Percent').
        """
        logger.debug('Worker indicated %s running count and %s total count', running_count,
                     total_count)

        shortage = total_count - running_count
        self._send_to_queue(self._namespace, self._need_capacity_name, shortage, unit='Count')

        if currently_processing:
            percent_building = 100
        else:
            percent_building = 0
        self._send_to_queue(self._namespace, self._build_percent_name, percent_building,
                            unit='Percent')
|
|
|
|
|
|
class SendToCloudWatch(Process):
    """ SendToCloudWatch loops indefinitely and pulls metrics off of a queue then sends it to
    CloudWatch.

    Each queue item is expected to be an ``(args, kwargs)`` tuple; it is
    unpacked and forwarded to the connection's ``put_metric_data``.
    """

    def __init__(self, request_queue, aws_access_key, aws_secret_key):
        """ Store the queue and AWS credentials; the connection is opened later, in run(). """
        Process.__init__(self)
        self._aws_access_key = aws_access_key
        self._aws_secret_key = aws_secret_key
        self._put_metrics_queue = request_queue
        # Mark the process as daemonic (see the multiprocessing docs for the
        # exact shutdown semantics); must be set before start() is called.
        self.daemon = True

    def run(self):
        """ Process body: connect once, then forward queued requests forever. """
        logger.debug('Starting CloudWatch sender process.')
        connection = boto.connect_cloudwatch(self._aws_access_key, self._aws_secret_key)
        while True:
            # Blocks until a request is available.
            put_metric_args, kwargs = self._put_metrics_queue.get()
            logger.debug('Got queued put metrics request.')
            try:
                connection.put_metric_data(*put_metric_args, **kwargs)
            except Exception:
                # Best-effort delivery: log the failure and keep serving the
                # queue. Only Exception is caught so SystemExit and
                # KeyboardInterrupt can still stop the loop; a bare
                # ``except:`` would swallow them too.
                logger.exception('Writing to CloudWatch failed')
|
|
|
|
|
|
class QueueMetrics(object):
    """ App extension that selects a metrics reporter and proxies attribute access to it.

    Attributes that are not found on this object are looked up on the
    reporter stored in ``self.state``; names the reporter lacks resolve to
    ``None``.

    NOTE: only ``__init__`` assigns ``self.state``. Calling ``init_app``
    directly returns the reporter but does not store it on this object.
    """

    def __init__(self, app=None):
        """ Optionally bind to ``app`` immediately; with no app, ``state`` stays ``None``. """
        self.app = app
        self.sender = None
        if app is not None:
            self.state = self.init_app(app)
        else:
            self.state = None

    def init_app(self, app):
        """ Build the reporter selected by ``app.config`` and register it on ``app``.

        When ``QUEUE_METRICS_TYPE`` is ``'CloudWatch'`` a queue-backed reporter
        is created together with a (not yet started) SendToCloudWatch process
        stored in ``self.sender``; any other value yields a NullReporter.

        Returns the reporter, which is also stored in
        ``app.extensions['queuemetrics']``.
        """
        analytics_type = app.config.get('QUEUE_METRICS_TYPE', 'Null')

        if analytics_type == 'CloudWatch':
            access_key = app.config.get('QUEUE_METRICS_AWS_ACCESS_KEY')
            secret_key = app.config.get('QUEUE_METRICS_AWS_SECRET_KEY')
            namespace = app.config.get('QUEUE_METRICS_NAMESPACE')
            req_capacity_name = app.config.get('QUEUE_METRICS_CAPACITY_SHORTAGE_NAME')
            build_percent_name = app.config.get('QUEUE_METRICS_BUILD_PERCENT_NAME')

            # The reporter and the sender process share this queue.
            request_queue = Queue()
            reporter = QueueingCloudWatchReporter(request_queue, namespace, req_capacity_name,
                                                  build_percent_name)
            self.sender = SendToCloudWatch(request_queue, access_key, secret_key)
        else:
            reporter = NullReporter()

        # register extension with app
        app.extensions = getattr(app, 'extensions', {})
        app.extensions['queuemetrics'] = reporter
        return reporter

    def run(self):
        """ Start the sender process, if init_app created one; otherwise do nothing. """
        logger.debug('Asked to start CloudWatch reporter')
        if self.sender is not None:
            logger.debug('Starting CloudWatch reporter')
            self.sender.start()

    def __getattr__(self, name):
        """ Proxy unknown attributes to the reporter in ``state``; missing names give ``None``.

        Raises AttributeError for missing dunder names so that protocol probes
        (copy, pickle, ...) are not answered with ``None``.
        """
        if name.startswith('__') and name.endswith('__'):
            raise AttributeError(name)
        # Read ``state`` from the instance dict directly: ``self.state`` would
        # re-enter __getattr__ and recurse forever whenever the attribute has
        # not been assigned yet (e.g. an instance made via __new__ or copy).
        state = self.__dict__.get('state')
        return getattr(state, name, None)
|