Fix QueueFile to support read-to-end semantics and add some tests

This commit is contained in:
Joseph Schorr 2016-03-29 17:01:15 -04:00
parent 2c7aae10a9
commit 6251e63e0e
3 changed files with 121 additions and 16 deletions

79
test/test_queuefile.py Normal file
View file

@ -0,0 +1,79 @@
import unittest
from StringIO import StringIO
from util.registry.queueprocess import QueueResult
from util.registry.queuefile import QueueFile
class FakeQueue(object):
def __init__(self):
self.items = []
def get(self, block):
return self.items.pop(0)
def put(self, data):
self.items.append(data)
class TestQueueFile(unittest.TestCase):
def test_basic(self):
queue = FakeQueue()
queue.put(QueueResult('hello world', None))
queue.put(QueueResult('! how goes there?', None))
queue.put(QueueResult(None, None))
queuefile = QueueFile(queue)
self.assertEquals(queuefile.read(), 'hello world! how goes there?')
def test_chunk_reading(self):
queue = FakeQueue()
queue.put(QueueResult('hello world', None))
queue.put(QueueResult('! how goes there?', None))
queue.put(QueueResult(None, None))
queuefile = QueueFile(queue)
data = ''
while True:
result = queuefile.read(size=2)
if not result:
break
data += result
self.assertEquals(data, 'hello world! how goes there?')
def test_unhandled_exception(self):
queue = FakeQueue()
queue.put(QueueResult('hello world', None))
queue.put(QueueResult(None, IOError('some exception')))
queue.put(QueueResult('! how goes there?', None))
queue.put(QueueResult(None, None))
queuefile = QueueFile(queue)
with self.assertRaises(IOError):
queuefile.read(size=12)
def test_handled_exception(self):
queue = FakeQueue()
queue.put(QueueResult('hello world', None))
queue.put(QueueResult(None, IOError('some exception')))
queue.put(QueueResult('! how goes there?', None))
queue.put(QueueResult(None, None))
ex_found = [None]
def handler(ex):
ex_found[0] = ex
queuefile = QueueFile(queue)
queuefile.add_exception_handler(handler)
queuefile.read(size=12)
self.assertIsNotNone(ex_found[0])
if __name__ == '__main__':
unittest.main()

View file

@ -1,5 +1,5 @@
class QueueFile(object): class QueueFile(object):
""" Class which implements a file-like interface and reads from a blocking """ Class which implements a file-like interface and reads QueueResult's from a blocking
multiprocessing queue. multiprocessing queue.
""" """
def __init__(self, queue, name=None): def __init__(self, queue, name=None):
@ -15,40 +15,61 @@ class QueueFile(object):
def add_exception_handler(self, handler): def add_exception_handler(self, handler):
self._exception_handlers.append(handler) self._exception_handlers.append(handler)
def read(self, size=8192): def read(self, size=-1):
# If the queuefile was closed or we have finished, send back any remaining data.
if self._closed or self._done: if self._closed or self._done:
if size == -1:
buf = self._buffer
self._buffer = ''
return buf
buf = self._buffer[0:size] buf = self._buffer[0:size]
self._buffer = self._buffer[size:] self._buffer = self._buffer[size:]
return buf return buf
while len(self._buffer) < size: # Loop until we reach the requested data size (or forever if all data was requested).
while (len(self._buffer) < size) or (size == -1):
result = self._queue.get(block=True) result = self._queue.get(block=True)
if result is None:
self._done = True
break
if isinstance(result, Exception): # Check for any exceptions raised by the queue process.
if result.exception is not None:
self._closed = True self._closed = True
self.raised_exception = True self.raised_exception = True
# Fire off the exception to any registered handlers. If no handlers were registered,
# then raise the exception locally.
handled = False handled = False
for handler in self._exception_handlers: for handler in self._exception_handlers:
handler(result) handler(result.exception)
handled = True handled = True
if handled: if handled:
return return
else:
raise result.exception
raise result # Check for no further data. If the QueueProcess has finished producing data, then break
# out of the loop to return the data already acquired.
if result.data is None:
self._done = True
break
self._buffer += result # Add the data to the buffer.
self._total_size += len(result) self._buffer += result.data
self._total_size += len(result.data)
# Return the requested slice of the buffer.
if size == -1:
buf = self._buffer
self._buffer = ''
return buf
buf = self._buffer[0:size] buf = self._buffer[0:size]
self._buffer = self._buffer[size:] self._buffer = self._buffer[size:]
return buf return buf
def flush(self): def flush(self):
# Purposefully not implemented.
pass pass
def close(self): def close(self):

View file

@ -1,7 +1,8 @@
from multiprocessing import Process, Queue from multiprocessing import Process, Queue
from collections import namedtuple
import logging import logging
import multiprocessing import multiprocessing
import os
import time import time
import sys import sys
import traceback import traceback
@ -46,23 +47,27 @@ class QueueProcess(object):
args = (self._get_producer, self._queues, self._chunk_size, self._args) args = (self._get_producer, self._queues, self._chunk_size, self._args)
QueueProcess.run_process(_run, args, finished=self._finished) QueueProcess.run_process(_run, args, finished=self._finished)
QueueResult = namedtuple('QueueResult', ['data', 'exception'])
def _run(get_producer, queues, chunk_size, args): def _run(get_producer, queues, chunk_size, args):
producer = get_producer(*args) producer = get_producer(*args)
while True: while True:
try: try:
data = producer(chunk_size) or None result = QueueResult(producer(chunk_size) or None, None)
except Exception as ex: except Exception as ex:
message = '%s\n%s' % (ex.message, "".join(traceback.format_exception(*sys.exc_info()))) message = '%s\n%s' % (ex.message, "".join(traceback.format_exception(*sys.exc_info())))
data = Exception(message) result = QueueResult(None, Exception(message))
for queue in queues: for queue in queues:
try: try:
queue.put(data, block=True) queue.put(result, block=True)
except Exception as ex: except Exception as ex:
logger.exception('Exception writing to queue.') logger.exception('Exception writing to queue.')
return return
if data is None or isinstance(data, Exception): # Terminate the producer loop if the data produced is empty or an exception occurred.
if result.data is None or result.exception is not None:
break break
# Important! This allows the thread that writes the queue data to the pipe # Important! This allows the thread that writes the queue data to the pipe