from multiprocessing import Process, Queue
import logging
import multiprocessing
import os
import time
import gipc
import sys
import traceback

logger = multiprocessing.log_to_stderr()
logger.setLevel(logging.INFO)


class QueueProcess(object):
    """ Helper class which invokes a worker in a process to produce data for
        one (or more) queues.

        Typical usage: construct the instance, call `create_queue()` once per
        consumer, then call `run()` to start the producing process.
    """

    def __init__(self, get_producer, chunk_size, max_size, args):
        """ Stores the configuration for the worker.

            get_producer: callable invoked in the worker as `get_producer(*args)`;
                          it must return a callable accepting `chunk_size`.
            chunk_size:   value handed to the producer on every call.
            max_size:     divided by `chunk_size` to size each created queue.
            args:         positional arguments for `get_producer`; a falsy value
                          is treated as "no arguments".
        """
        self._get_producer = get_producer
        self._queues = []
        self._chunk_size = chunk_size
        self._max_size = max_size
        self._args = args or []

    def create_queue(self):
        """ Adds a multiprocessing queue to the list of queues. Any queues added
            will have the data produced appended.

            Returns the newly created queue.
        """
        # Floor division keeps the queue size an integer. True division would
        # hand Queue a float on Python 3, which it cannot use as a maxsize.
        queue = Queue(self._max_size // self._chunk_size)
        self._queues.append(queue)
        return queue

    @staticmethod
    def run_process(target, args):
        """ Starts `target(*args)` in a child process via gipc. """
        gipc.start_process(target=target, args=args)

    def run(self):
        """ Starts the worker process that feeds every queue created so far. """
        # Important! gipc is used here because normal multiprocessing does not work
        # correctly with gevent when we sleep.
        args = (self._get_producer, self._queues, self._chunk_size, self._args)
        QueueProcess.run_process(_run, args)


def _run(get_producer, queues, chunk_size, args):
    """ Worker loop: repeatedly calls the producer and writes each result to
        every queue.

        A falsy result from the producer is normalized to None, which is
        written to the queues and then ends the loop. If the producer raises,
        an Exception carrying the error text and traceback is written to the
        queues instead, and the loop ends as well. If writing to any queue
        fails (including the 10 second timeout), the worker returns
        immediately without writing to the remaining queues.
    """
    producer = get_producer(*args)
    while True:
        try:
            data = producer(chunk_size) or None
        except Exception as ex:
            # `message` only exists on older exception objects; fall back to
            # str(ex) so that building the report can never raise itself and
            # silently kill the worker.
            summary = getattr(ex, 'message', None) or str(ex)
            message = '%s\n%s' % (summary, traceback.format_exc())
            data = Exception(message)

        for queue in queues:
            try:
                queue.put(data, block=True, timeout=10)
            except Exception:
                # One of the listeners stopped listening.
                return

        if data is None or isinstance(data, Exception):
            break

        # Important! This allows the thread that writes the queue data to the pipe
        # to do so. Otherwise, this hangs.
        time.sleep(0)