diff --git a/test/test_queuefile.py b/test/test_queuefile.py new file mode 100644 index 000000000..15858190b --- /dev/null +++ b/test/test_queuefile.py @@ -0,0 +1,79 @@ +import unittest + +from StringIO import StringIO + +from util.registry.queueprocess import QueueResult +from util.registry.queuefile import QueueFile + +class FakeQueue(object): + def __init__(self): + self.items = [] + + def get(self, block): + return self.items.pop(0) + + def put(self, data): + self.items.append(data) + + +class TestQueueFile(unittest.TestCase): + def test_basic(self): + queue = FakeQueue() + queue.put(QueueResult('hello world', None)) + queue.put(QueueResult('! how goes there?', None)) + queue.put(QueueResult(None, None)) + + queuefile = QueueFile(queue) + self.assertEquals(queuefile.read(), 'hello world! how goes there?') + + def test_chunk_reading(self): + queue = FakeQueue() + queue.put(QueueResult('hello world', None)) + queue.put(QueueResult('! how goes there?', None)) + queue.put(QueueResult(None, None)) + + queuefile = QueueFile(queue) + data = '' + + while True: + result = queuefile.read(size=2) + if not result: + break + + data += result + + self.assertEquals(data, 'hello world! how goes there?') + + def test_unhandled_exception(self): + queue = FakeQueue() + queue.put(QueueResult('hello world', None)) + queue.put(QueueResult(None, IOError('some exception'))) + queue.put(QueueResult('! how goes there?', None)) + queue.put(QueueResult(None, None)) + + queuefile = QueueFile(queue) + + with self.assertRaises(IOError): + queuefile.read(size=12) + + def test_handled_exception(self): + queue = FakeQueue() + queue.put(QueueResult('hello world', None)) + queue.put(QueueResult(None, IOError('some exception'))) + queue.put(QueueResult('! how goes there?', None)) + queue.put(QueueResult(None, None)) + + ex_found = [None] + + def handler(ex): + ex_found[0] = ex + + queuefile = QueueFile(queue) + queuefile.add_exception_handler(handler) + queuefile.read(size=12) + + self.assertIsNotNone(ex_found[0]) + + +if __name__ == '__main__': + unittest.main() diff --git a/util/registry/queuefile.py b/util/registry/queuefile.py index 3f26ffc6b..28012a852 100644 --- a/util/registry/queuefile.py +++ b/util/registry/queuefile.py @@ -1,5 +1,5 @@ class QueueFile(object): - """ Class which implements a file-like interface and reads from a blocking + """ Class which implements a file-like interface and reads QueueResult's from a blocking multiprocessing queue. """ def __init__(self, queue, name=None): @@ -15,40 +15,61 @@ class QueueFile(object): def add_exception_handler(self, handler): self._exception_handlers.append(handler) - def read(self, size=8192): + def read(self, size=-1): + # If the queuefile was closed or we have finished, send back any remaining data. if self._closed or self._done: + if size == -1: + buf = self._buffer + self._buffer = '' + return buf + buf = self._buffer[0:size] self._buffer = self._buffer[size:] return buf - while len(self._buffer) < size: + # Loop until we reach the requested data size (or forever if all data was requested). + while (len(self._buffer) < size) or (size == -1): result = self._queue.get(block=True) - if result is None: - self._done = True - break - if isinstance(result, Exception): + # Check for any exceptions raised by the queue process. + if result.exception is not None: self._closed = True self.raised_exception = True + # Fire off the exception to any registered handlers. If no handlers were registered, + # then raise the exception locally. handled = False for handler in self._exception_handlers: - handler(result) + handler(result.exception) handled = True if handled: return + else: + raise result.exception - raise result + # Check for no further data. If the QueueProcess has finished producing data, then break + # out of the loop to return the data already acquired. + if result.data is None: + self._done = True + break - self._buffer += result - self._total_size += len(result) + # Add the data to the buffer. + self._buffer += result.data + self._total_size += len(result.data) + + # Return the requested slice of the buffer. + if size == -1: + buf = self._buffer + self._buffer = '' + return buf buf = self._buffer[0:size] self._buffer = self._buffer[size:] return buf def flush(self): + # Purposefully not implemented. pass def close(self): diff --git a/util/registry/queueprocess.py b/util/registry/queueprocess.py index ebf786796..5cf3f20d0 100644 --- a/util/registry/queueprocess.py +++ b/util/registry/queueprocess.py @@ -1,7 +1,8 @@ from multiprocessing import Process, Queue +from collections import namedtuple + import logging import multiprocessing -import os import time import sys import traceback @@ -46,23 +47,27 @@ class QueueProcess(object): args = (self._get_producer, self._queues, self._chunk_size, self._args) QueueProcess.run_process(_run, args, finished=self._finished) + +QueueResult = namedtuple('QueueResult', ['data', 'exception']) + def _run(get_producer, queues, chunk_size, args): producer = get_producer(*args) while True: try: - data = producer(chunk_size) or None + result = QueueResult(producer(chunk_size) or None, None) except Exception as ex: message = '%s\n%s' % (ex.message, "".join(traceback.format_exception(*sys.exc_info()))) - data = Exception(message) + result = QueueResult(None, Exception(message)) for queue in queues: try: - queue.put(data, block=True) + queue.put(result, block=True) except Exception as ex: logger.exception('Exception writing to queue.') return - if data is None or isinstance(data, Exception): + # Terminate the producer loop if the data produced is empty or an exception occurred. + if result.data is None or result.exception is not None: break # Important! This allows the thread that writes the queue data to the pipe