"""Queue metrics reporting, optionally backed by AWS CloudWatch.

Reporters put metric requests on an in-process queue; a daemon thread
(``SendToCloudWatch``) pulls them off and forwards them to CloudWatch.
"""

import logging
from threading import Thread

try:
    from Queue import Queue  # Python 2
except ImportError:
    from queue import Queue  # Python 3

try:
    import boto
except ImportError:
    # boto is only used by the CloudWatch backend. The default 'Null'
    # backend never touches it, so a missing boto must not prevent this
    # module from being imported.
    boto = None


logger = logging.getLogger(__name__)


class NullReporter(object):
    """Reporter that discards every metric it is given."""

    def report(self, *args, **kwargs):
        """Accept and ignore any arguments.

        Keyword arguments are accepted so that this reporter can be called
        the same way as ``QueueingCloudWatchReporter.report``.
        """
        pass


class QueueingCloudWatchReporter(object):
    """
    QueueingCloudWatchReporter reports metrics to the "SendToCloudWatch"
    process by putting them on a shared queue.
    """

    def __init__(self, request_queue, namespace, need_capacity_name,
                 build_percent_name):
        """
        :param request_queue: queue that receives ``(args, kwargs)`` tuples.
        :param namespace: first positional value of every queued request.
        :param need_capacity_name: metric name for the capacity shortage.
        :param build_percent_name: metric name for the building percentage.
        """
        self._namespace = namespace
        self._need_capacity_name = need_capacity_name
        self._build_percent_name = build_percent_name
        self._put_metrics_queue = request_queue

    def _send_to_queue(self, *args, **kwargs):
        """Queue one request as an ``(args, kwargs)`` tuple."""
        self._put_metrics_queue.put((args, kwargs))

    def report(self, currently_processing, running_count, total_count):
        """Queue the two metrics derived from the given worker state.

        :param currently_processing: truthy if the worker is busy; reported
            as 100 percent, otherwise 0 percent.
        :param running_count: running count indicated by the worker.
        :param total_count: total count indicated by the worker.
        """
        logger.debug('Worker indicated %s running count and %s total count',
                     running_count, total_count)

        need_capacity_count = total_count - running_count
        self._send_to_queue(self._namespace, self._need_capacity_name,
                            need_capacity_count, unit='Count')

        building_percent = 100 if currently_processing else 0
        self._send_to_queue(self._namespace, self._build_percent_name,
                            building_percent, unit='Percent')


class SendToCloudWatch(Thread):
    """
    SendToCloudWatch loops indefinitely and pulls metrics off of a queue
    then sends them to CloudWatch.
    """

    def __init__(self, request_queue, aws_access_key, aws_secret_key):
        """
        :param request_queue: queue yielding ``(args, kwargs)`` tuples that
            are passed to ``put_metric_data``.
        :param aws_access_key: access key handed to boto.
        :param aws_secret_key: secret key handed to boto.
        """
        Thread.__init__(self)
        self.daemon = True

        self._aws_access_key = aws_access_key
        self._aws_secret_key = aws_secret_key
        self._put_metrics_queue = request_queue

    def _connect(self):
        """Return a CloudWatch connection, or None if none can be created.

        Failures are logged rather than raised so that the sender thread
        keeps running.
        """
        try:
            if boto is None:
                raise ImportError('boto is required for CloudWatch metrics')
            return boto.connect_cloudwatch(self._aws_access_key,
                                           self._aws_secret_key)
        except Exception:
            logger.exception('Failed to connect to CloudWatch.')
            return None

    def run(self):
        """Forward queued metric requests to CloudWatch forever.

        If the connection could not be established, it is retried whenever
        a request arrives. Requests that cannot be sent are dropped; the
        queue is always drained so it cannot grow without bound.
        """
        logger.debug('Starting CloudWatch sender process.')
        connection = self._connect()

        while True:
            put_metric_args, kwargs = self._put_metrics_queue.get()
            logger.debug('Got queued put metrics request.')

            if connection is None:
                connection = self._connect()
            if connection is None:
                logger.warning('Dropping metric; no CloudWatch connection '
                               'is available.')
                continue

            try:
                connection.put_metric_data(*put_metric_args, **kwargs)
            except Exception:
                logger.exception('Failed to write to CloudWatch')


class QueueMetrics(object):
    """Application extension that selects and exposes a metrics reporter.

    Attributes that are not found on this object are looked up on the
    active reporter, so ``QueueMetrics(app).report(...)`` calls the
    reporter's ``report``.
    """

    def __init__(self, app=None):
        """
        :param app: optional application object; when given, ``init_app``
            is called immediately.
        """
        self.app = app
        self.sender = None
        self.state = None
        if app is not None:
            self.init_app(app)

    def init_app(self, app):
        """Create the reporter selected by ``app.config`` and register it.

        Reads ``QUEUE_METRICS_TYPE`` (default ``'Null'``). For
        ``'CloudWatch'`` a queueing reporter and its sender thread are
        created (the thread is not started here; see ``run``). Any other
        value yields a ``NullReporter``.

        :param app: object with a dict-like ``config`` attribute.
        :returns: the reporter, which is also stored in
            ``app.extensions['queuemetrics']`` and in ``self.state``.
        """
        analytics_type = app.config.get('QUEUE_METRICS_TYPE', 'Null')

        if analytics_type == 'CloudWatch':
            access_key = app.config.get('QUEUE_METRICS_AWS_ACCESS_KEY')
            secret_key = app.config.get('QUEUE_METRICS_AWS_SECRET_KEY')
            namespace = app.config.get('QUEUE_METRICS_NAMESPACE')
            req_capacity_name = app.config.get(
                'QUEUE_METRICS_CAPACITY_SHORTAGE_NAME')
            build_percent_name = app.config.get(
                'QUEUE_METRICS_BUILD_PERCENT_NAME')

            request_queue = Queue()
            reporter = QueueingCloudWatchReporter(request_queue, namespace,
                                                  req_capacity_name,
                                                  build_percent_name)
            self.sender = SendToCloudWatch(request_queue, access_key,
                                           secret_key)
        else:
            reporter = NullReporter()

        # register extension with app
        app.extensions = getattr(app, 'extensions', {})
        app.extensions['queuemetrics'] = reporter

        # Store the reporter here as well, so that attribute delegation
        # works when init_app is called after construction.
        self.state = reporter
        return reporter

    def run(self):
        """Start the CloudWatch sender thread, if one was configured."""
        logger.debug('Asked to start CloudWatch reporter')
        if self.sender is not None:
            logger.debug('Starting CloudWatch reporter')
            self.sender.start()

    def __getattr__(self, name):
        """Delegate unknown attributes to the reporter; None if missing."""
        # Read 'state' straight from the instance dict: going through
        # self.state would re-enter __getattr__ and recurse forever when
        # 'state' has not been set (e.g. instance created via __new__).
        state = self.__dict__.get('state')
        return getattr(state, name, None)