This repository was archived on 2020-03-24. You can view its files and clone it, but you cannot push, open issues, or open pull requests.
quay/test/test_queuefile.py

79 lines
1.9 KiB
Python

import unittest
from StringIO import StringIO
from util.registry.queueprocess import QueueResult
from util.registry.queuefile import QueueFile
class FakeQueue(object):
    """Minimal in-memory FIFO stand-in for a queue, for use in tests.

    Items are kept in the public ``items`` list in insertion order.
    """

    def __init__(self):
        self.items = []

    def get(self, block=True, timeout=None):
        """Remove and return the oldest queued item.

        ``block`` and ``timeout`` are accepted so the signature lines up with
        ``Queue.get(block=True, timeout=None)``; both are ignored because this
        fake never waits.

        Raises:
            IndexError: if no items are queued.
        """
        return self.items.pop(0)

    def put(self, data):
        """Append ``data`` to the end of the queue."""
        self.items.append(data)
class TestQueueFile(unittest.TestCase):
    """Tests for reading QueueResult entries through a QueueFile."""

    def test_basic(self):
        """An unsized read() yields the concatenation of all queued data."""
        queue = FakeQueue()
        queue.put(QueueResult('hello world', None))
        queue.put(QueueResult('! how goes there?', None))
        # NOTE(review): a (None, None) result presumably marks end-of-stream;
        # confirm against QueueFile.
        queue.put(QueueResult(None, None))

        queuefile = QueueFile(queue)
        self.assertEqual(queuefile.read(), 'hello world! how goes there?')

    def test_chunk_reading(self):
        """Reading two characters at a time reassembles the full data."""
        queue = FakeQueue()
        queue.put(QueueResult('hello world', None))
        queue.put(QueueResult('! how goes there?', None))
        queue.put(QueueResult(None, None))

        queuefile = QueueFile(queue)

        # Collect the chunks and join once rather than growing a string.
        chunks = []
        while True:
            result = queuefile.read(size=2)
            if not result:
                break
            chunks.append(result)

        self.assertEqual(''.join(chunks), 'hello world! how goes there?')

    def test_unhandled_exception(self):
        """With no handler registered, read() raises the queued IOError."""
        queue = FakeQueue()
        queue.put(QueueResult('hello world', None))
        queue.put(QueueResult(None, IOError('some exception')))
        queue.put(QueueResult('! how goes there?', None))
        queue.put(QueueResult(None, None))

        queuefile = QueueFile(queue)

        # 'hello world' is 11 characters, so size=12 presumably forces the
        # read past the first entry and into the exception entry.
        with self.assertRaises(IOError):
            queuefile.read(size=12)

    def test_handled_exception(self):
        """A registered exception handler is called with a non-None value."""
        queue = FakeQueue()
        queue.put(QueueResult('hello world', None))
        queue.put(QueueResult(None, IOError('some exception')))
        queue.put(QueueResult('! how goes there?', None))
        queue.put(QueueResult(None, None))

        # A one-element list lets the nested handler record what it received
        # without rebinding a name in the enclosing scope.
        ex_found = [None]

        def handler(ex):
            ex_found[0] = ex

        queuefile = QueueFile(queue)
        queuefile.add_exception_handler(handler)
        queuefile.read(size=12)

        self.assertIsNotNone(ex_found[0])
# Run the test cases in this module when the file is executed as a script.
if __name__ == '__main__':
    unittest.main()