import logging import boto from multiprocessing import Process, Queue logger = logging.getLogger(__name__) class NullReporter(object): def report(self, *args): pass class QueueingCloudWatchReporter(object): def __init__(self, request_queue, namespace, need_capacity_name, build_percent_name): self._namespace = namespace self._need_capacity_name = need_capacity_name self._build_percent_name = build_percent_name self._put_metrics_queue = request_queue def _send_to_queue(self, *args, **kwargs): self._put_metrics_queue.put((args, kwargs)) def report(self, currently_processing, running_count, total_count): logger.debug('Worker indicated %s running count and %s total count', running_count, total_count) need_capacity_count = total_count - running_count self._send_to_queue(self._namespace, self._need_capacity_name, need_capacity_count, unit='Count') building_percent = 100 if currently_processing else 0 self._send_to_queue(self._namespace, self._build_percent_name, building_percent, unit='Percent') class SendToCloudWatch(Process): def __init__(self, request_queue, aws_access_key, aws_secret_key): Process.__init__(self) self._aws_access_key = aws_access_key self._aws_secret_key = aws_secret_key self._put_metrics_queue = request_queue self.daemon = True def run(self): logger.debug('Starting cloudwatch sender process.') connection = boto.connect_cloudwatch(self._aws_access_key, self._aws_secret_key) while True: put_metric_args, kwargs = self._put_metrics_queue.get() logger.debug('Got queued put metrics reqeust.') connection.put_metric_data(*put_metric_args, **kwargs) class QueueMetrics(object): def __init__(self, app=None): self.app = app if app is not None: self.state = self.init_app(app) else: self.state = None def init_app(self, app): analytics_type = app.config.get('QUEUE_METRICS_TYPE', 'Null') if analytics_type == 'CloudWatch': access_key = app.config.get('QUEUE_METRICS_AWS_ACCESS_KEY') secret_key = app.config.get('QUEUE_METRICS_AWS_SECRET_KEY') namespace = app.config.get('QUEUE_METRICS_NAMESPACE') req_capacity_name = app.config.get('QUEUE_METRICS_CAPACITY_SHORTAGE_NAME') build_percent_name = app.config.get('QUEUE_METRICS_BUILD_PERCENT_NAME') request_queue = Queue() reporter = QueueingCloudWatchReporter(request_queue, namespace, req_capacity_name, build_percent_name) sender = SendToCloudWatch(request_queue, access_key, secret_key) sender.start() else: reporter = NullReporter() # register extension with app app.extensions = getattr(app, 'extensions', {}) app.extensions['queuemetrics'] = reporter return reporter def __getattr__(self, name): return getattr(self.state, name, None)