import json import logging from multiprocessing import Process, Queue from mixpanel import Consumer, Mixpanel logger = logging.getLogger(__name__) class MixpanelQueingConsumer(object): def __init__(self, request_queue): self._mp_queue = request_queue def send(self, endpoint, json_message): logger.debug('Queing mixpanel request.') self._mp_queue.put(json.dumps([endpoint, json_message])) class SendToMixpanel(Process): def __init__(self, request_queue): Process.__init__(self) self._mp_queue = request_queue self._consumer = Consumer() self.daemon = True def run(self): logger.debug('Starting sender process.') while True: mp_request = self._mp_queue.get() logger.debug('Got queued mixpanel reqeust.') self._consumer.send(*json.loads(mp_request)) def init_app(app): logger.debug('Initializing mixpanel with key: %s' % app.config['MIXPANEL_KEY']) request_queue = Queue() mixpanel = Mixpanel(app.config['MIXPANEL_KEY'], MixpanelQueingConsumer(request_queue)) SendToMixpanel(request_queue).start() return mixpanel