import unittest from StringIO import StringIO from util.registry.queueprocess import QueueResult from util.registry.queuefile import QueueFile class FakeQueue(object): def __init__(self): self.items = [] def get(self, block): return self.items.pop(0) def put(self, data): self.items.append(data) class TestQueueFile(unittest.TestCase): def test_basic(self): queue = FakeQueue() queue.put(QueueResult('hello world', None)) queue.put(QueueResult('! how goes there?', None)) queue.put(QueueResult(None, None)) queuefile = QueueFile(queue) self.assertEquals(queuefile.read(), 'hello world! how goes there?') def test_chunk_reading(self): queue = FakeQueue() queue.put(QueueResult('hello world', None)) queue.put(QueueResult('! how goes there?', None)) queue.put(QueueResult(None, None)) queuefile = QueueFile(queue) data = '' while True: result = queuefile.read(size=2) if not result: break data += result self.assertEquals(data, 'hello world! how goes there?') def test_unhandled_exception(self): queue = FakeQueue() queue.put(QueueResult('hello world', None)) queue.put(QueueResult(None, IOError('some exception'))) queue.put(QueueResult('! how goes there?', None)) queue.put(QueueResult(None, None)) queuefile = QueueFile(queue) with self.assertRaises(IOError): queuefile.read(size=12) def test_handled_exception(self): queue = FakeQueue() queue.put(QueueResult('hello world', None)) queue.put(QueueResult(None, IOError('some exception'))) queue.put(QueueResult('! how goes there?', None)) queue.put(QueueResult(None, None)) ex_found = [None] def handler(ex): ex_found[0] = ex queuefile = QueueFile(queue) queuefile.add_exception_handler(handler) queuefile.read(size=12) self.assertIsNotNone(ex_found[0]) if __name__ == '__main__': unittest.main()