Migrate queued metric from processes to threads
This commit is contained in:
parent
07bb9be603
commit
33088f742a
2 changed files with 15 additions and 12 deletions
|
@ -1,7 +1,8 @@
|
||||||
import json
|
import json
|
||||||
import logging
|
import logging
|
||||||
|
|
||||||
from multiprocessing import Process, Queue
|
from Queue import Queue
|
||||||
|
from threading import Thread
|
||||||
from mixpanel import Consumer, Mixpanel
|
from mixpanel import Consumer, Mixpanel
|
||||||
|
|
||||||
|
|
||||||
|
@ -17,24 +18,23 @@ class MixpanelQueingConsumer(object):
|
||||||
self._mp_queue.put(json.dumps([endpoint, json_message]))
|
self._mp_queue.put(json.dumps([endpoint, json_message]))
|
||||||
|
|
||||||
|
|
||||||
class SendToMixpanel(Process):
|
class SendToMixpanel(Thread):
|
||||||
def __init__(self, request_queue):
|
def __init__(self, request_queue):
|
||||||
Process.__init__(self)
|
Thread.__init__(self)
|
||||||
|
self.daemon = True
|
||||||
|
|
||||||
self._mp_queue = request_queue
|
self._mp_queue = request_queue
|
||||||
self._consumer = Consumer()
|
self._consumer = Consumer()
|
||||||
self.daemon = True
|
|
||||||
|
|
||||||
def run(self):
|
def run(self):
|
||||||
logger.debug('Starting mixpanel sender process.')
|
logger.debug('Starting mixpanel sender process.')
|
||||||
while True:
|
while True:
|
||||||
mp_request = self._mp_queue.get()
|
mp_request = self._mp_queue.get()
|
||||||
logger.debug('Got queued mixpanel reqeust.')
|
logger.debug('Got queued mixpanel request.')
|
||||||
try:
|
try:
|
||||||
self._consumer.send(*json.loads(mp_request))
|
self._consumer.send(*json.loads(mp_request))
|
||||||
except:
|
except:
|
||||||
# Make sure we don't crash if Mixpanel request fails.
|
logger.exception('Failed to send Mixpanel request.')
|
||||||
pass
|
|
||||||
|
|
||||||
|
|
||||||
class FakeMixpanel(object):
|
class FakeMixpanel(object):
|
||||||
|
|
|
@ -1,7 +1,9 @@
|
||||||
import logging
|
import logging
|
||||||
import boto
|
import boto
|
||||||
|
|
||||||
from multiprocessing import Process, Queue
|
from Queue import Queue
|
||||||
|
from threading import Thread
|
||||||
|
|
||||||
|
|
||||||
logger = logging.getLogger(__name__)
|
logger = logging.getLogger(__name__)
|
||||||
|
|
||||||
|
@ -35,15 +37,16 @@ class QueueingCloudWatchReporter(object):
|
||||||
unit='Percent')
|
unit='Percent')
|
||||||
|
|
||||||
|
|
||||||
class SendToCloudWatch(Process):
|
class SendToCloudWatch(Thread):
|
||||||
""" SendToCloudWatch loops indefinitely and pulls metrics off of a queue then sends it to
|
""" SendToCloudWatch loops indefinitely and pulls metrics off of a queue then sends it to
|
||||||
CloudWatch. """
|
CloudWatch. """
|
||||||
def __init__(self, request_queue, aws_access_key, aws_secret_key):
|
def __init__(self, request_queue, aws_access_key, aws_secret_key):
|
||||||
Process.__init__(self)
|
Thread.__init__(self)
|
||||||
|
self.daemon = True
|
||||||
|
|
||||||
self._aws_access_key = aws_access_key
|
self._aws_access_key = aws_access_key
|
||||||
self._aws_secret_key = aws_secret_key
|
self._aws_secret_key = aws_secret_key
|
||||||
self._put_metrics_queue = request_queue
|
self._put_metrics_queue = request_queue
|
||||||
self.daemon = True
|
|
||||||
|
|
||||||
def run(self):
|
def run(self):
|
||||||
logger.debug('Starting CloudWatch sender process.')
|
logger.debug('Starting CloudWatch sender process.')
|
||||||
|
@ -54,7 +57,7 @@ class SendToCloudWatch(Process):
|
||||||
try:
|
try:
|
||||||
connection.put_metric_data(*put_metric_args, **kwargs)
|
connection.put_metric_data(*put_metric_args, **kwargs)
|
||||||
except:
|
except:
|
||||||
logger.exception('Writing to CloudWatch failed')
|
logger.exception('Failed to write to CloudWatch')
|
||||||
|
|
||||||
|
|
||||||
class QueueMetrics(object):
|
class QueueMetrics(object):
|
||||||
|
|
Reference in a new issue