Merge pull request #1328 from coreos-inc/queuefilefix

Fix QueueFile to support read-to-end semantics and add some tests
This commit is contained in:
josephschorr 2016-04-08 18:07:06 -04:00
commit affb600423
3 changed files with 121 additions and 16 deletions

79
test/test_queuefile.py Normal file
View file

@ -0,0 +1,79 @@
import unittest
from StringIO import StringIO
from util.registry.queueprocess import QueueResult
from util.registry.queuefile import QueueFile
class FakeQueue(object):
def __init__(self):
self.items = []
def get(self, block):
return self.items.pop(0)
def put(self, data):
self.items.append(data)
class TestQueueFile(unittest.TestCase):
def test_basic(self):
queue = FakeQueue()
queue.put(QueueResult('hello world', None))
queue.put(QueueResult('! how goes there?', None))
queue.put(QueueResult(None, None))
queuefile = QueueFile(queue)
self.assertEquals(queuefile.read(), 'hello world! how goes there?')
def test_chunk_reading(self):
queue = FakeQueue()
queue.put(QueueResult('hello world', None))
queue.put(QueueResult('! how goes there?', None))
queue.put(QueueResult(None, None))
queuefile = QueueFile(queue)
data = ''
while True:
result = queuefile.read(size=2)
if not result:
break
data += result
self.assertEquals(data, 'hello world! how goes there?')
def test_unhandled_exception(self):
queue = FakeQueue()
queue.put(QueueResult('hello world', None))
queue.put(QueueResult(None, IOError('some exception')))
queue.put(QueueResult('! how goes there?', None))
queue.put(QueueResult(None, None))
queuefile = QueueFile(queue)
with self.assertRaises(IOError):
queuefile.read(size=12)
def test_handled_exception(self):
queue = FakeQueue()
queue.put(QueueResult('hello world', None))
queue.put(QueueResult(None, IOError('some exception')))
queue.put(QueueResult('! how goes there?', None))
queue.put(QueueResult(None, None))
ex_found = [None]
def handler(ex):
ex_found[0] = ex
queuefile = QueueFile(queue)
queuefile.add_exception_handler(handler)
queuefile.read(size=12)
self.assertIsNotNone(ex_found[0])
if __name__ == '__main__':
unittest.main()

View file

@ -1,5 +1,5 @@
class QueueFile(object):
""" Class which implements a file-like interface and reads from a blocking
""" Class which implements a file-like interface and reads QueueResult's from a blocking
multiprocessing queue.
"""
def __init__(self, queue, name=None):
@ -15,40 +15,61 @@ class QueueFile(object):
def add_exception_handler(self, handler):
self._exception_handlers.append(handler)
def read(self, size=8192):
def read(self, size=-1):
# If the queuefile was closed or we have finished, send back any remaining data.
if self._closed or self._done:
if size == -1:
buf = self._buffer
self._buffer = ''
return buf
buf = self._buffer[0:size]
self._buffer = self._buffer[size:]
return buf
while len(self._buffer) < size:
# Loop until we reach the requested data size (or forever if all data was requested).
while (len(self._buffer) < size) or (size == -1):
result = self._queue.get(block=True)
if result is None:
self._done = True
break
if isinstance(result, Exception):
# Check for any exceptions raised by the queue process.
if result.exception is not None:
self._closed = True
self.raised_exception = True
# Fire off the exception to any registered handlers. If no handlers were registered,
# then raise the exception locally.
handled = False
for handler in self._exception_handlers:
handler(result)
handler(result.exception)
handled = True
if handled:
return
else:
raise result.exception
raise result
# Check for no further data. If the QueueProcess has finished producing data, then break
# out of the loop to return the data already acquired.
if result.data is None:
self._done = True
break
self._buffer += result
self._total_size += len(result)
# Add the data to the buffer.
self._buffer += result.data
self._total_size += len(result.data)
# Return the requested slice of the buffer.
if size == -1:
buf = self._buffer
self._buffer = ''
return buf
buf = self._buffer[0:size]
self._buffer = self._buffer[size:]
return buf
def flush(self):
# Purposefully not implemented.
pass
def close(self):

View file

@ -1,7 +1,8 @@
from multiprocessing import Process, Queue
from collections import namedtuple
import logging
import multiprocessing
import os
import time
import sys
import traceback
@ -46,23 +47,27 @@ class QueueProcess(object):
args = (self._get_producer, self._queues, self._chunk_size, self._args)
QueueProcess.run_process(_run, args, finished=self._finished)
QueueResult = namedtuple('QueueResult', ['data', 'exception'])
def _run(get_producer, queues, chunk_size, args):
producer = get_producer(*args)
while True:
try:
data = producer(chunk_size) or None
result = QueueResult(producer(chunk_size) or None, None)
except Exception as ex:
message = '%s\n%s' % (ex.message, "".join(traceback.format_exception(*sys.exc_info())))
data = Exception(message)
result = QueueResult(None, Exception(message))
for queue in queues:
try:
queue.put(data, block=True)
queue.put(result, block=True)
except Exception as ex:
logger.exception('Exception writing to queue.')
return
if data is None or isinstance(data, Exception):
# Terminate the producer loop if the data produced is empty or an exception occurred.
if result.data is None or result.exception is not None:
break
# Important! This allows the thread that writes the queue data to the pipe