Merge pull request #3169 from quay/move-queuefile-tests
Move queuefile tests to pytest
This commit is contained in:
commit
cb5fa0cefc
2 changed files with 112 additions and 116 deletions
|
@ -1,116 +0,0 @@
|
|||
import unittest
|
||||
import os
|
||||
|
||||
from util.registry.queueprocess import QueueResult
|
||||
from util.registry.queuefile import QueueFile
|
||||
|
||||
class FakeQueue(object):
|
||||
def __init__(self):
|
||||
self.items = []
|
||||
|
||||
def get(self, block):
|
||||
return self.items.pop(0)
|
||||
|
||||
def put(self, data):
|
||||
self.items.append(data)
|
||||
|
||||
|
||||
class TestQueueFile(unittest.TestCase):
|
||||
def test_basic(self):
|
||||
queue = FakeQueue()
|
||||
queue.put(QueueResult('hello world', None))
|
||||
queue.put(QueueResult('! how goes there?', None))
|
||||
queue.put(QueueResult(None, None))
|
||||
|
||||
queuefile = QueueFile(queue)
|
||||
self.assertEquals(queuefile.read(), 'hello world! how goes there?')
|
||||
|
||||
def test_chunk_reading(self):
|
||||
queue = FakeQueue()
|
||||
queue.put(QueueResult('hello world', None))
|
||||
queue.put(QueueResult('! how goes there?', None))
|
||||
queue.put(QueueResult(None, None))
|
||||
|
||||
queuefile = QueueFile(queue)
|
||||
data = ''
|
||||
|
||||
while True:
|
||||
result = queuefile.read(size=2)
|
||||
if not result:
|
||||
break
|
||||
|
||||
data += result
|
||||
|
||||
self.assertEquals(data, 'hello world! how goes there?')
|
||||
|
||||
def test_unhandled_exception(self):
|
||||
queue = FakeQueue()
|
||||
queue.put(QueueResult('hello world', None))
|
||||
queue.put(QueueResult(None, IOError('some exception')))
|
||||
queue.put(QueueResult('! how goes there?', None))
|
||||
queue.put(QueueResult(None, None))
|
||||
|
||||
queuefile = QueueFile(queue)
|
||||
|
||||
with self.assertRaises(IOError):
|
||||
queuefile.read(size=12)
|
||||
|
||||
def test_handled_exception(self):
|
||||
queue = FakeQueue()
|
||||
queue.put(QueueResult('hello world', None))
|
||||
queue.put(QueueResult(None, IOError('some exception')))
|
||||
queue.put(QueueResult('! how goes there?', None))
|
||||
queue.put(QueueResult(None, None))
|
||||
|
||||
ex_found = [None]
|
||||
|
||||
def handler(ex):
|
||||
ex_found[0] = ex
|
||||
|
||||
queuefile = QueueFile(queue)
|
||||
queuefile.add_exception_handler(handler)
|
||||
queuefile.read(size=12)
|
||||
|
||||
self.assertIsNotNone(ex_found[0])
|
||||
|
||||
def test_binary_data(self):
|
||||
queue = FakeQueue()
|
||||
|
||||
# Generate some binary data.
|
||||
binary_data = os.urandom(1024)
|
||||
queue.put(QueueResult(binary_data, None))
|
||||
queue.put(QueueResult(None, None))
|
||||
|
||||
queuefile = QueueFile(queue)
|
||||
found_data = ''
|
||||
while True:
|
||||
current_data = queuefile.read(size=37)
|
||||
if len(current_data) == 0:
|
||||
break
|
||||
|
||||
found_data = found_data + current_data
|
||||
|
||||
self.assertEquals(found_data, binary_data)
|
||||
|
||||
def test_empty_data(self):
|
||||
queue = FakeQueue()
|
||||
|
||||
# Generate some empty binary data.
|
||||
binary_data = '\0' * 1024
|
||||
queue.put(QueueResult(binary_data, None))
|
||||
queue.put(QueueResult(None, None))
|
||||
|
||||
queuefile = QueueFile(queue)
|
||||
found_data = ''
|
||||
while True:
|
||||
current_data = queuefile.read(size=37)
|
||||
if len(current_data) == 0:
|
||||
break
|
||||
|
||||
found_data = found_data + current_data
|
||||
|
||||
self.assertEquals(found_data, binary_data)
|
||||
|
||||
|
||||
if __name__ == '__main__':
|
||||
unittest.main()
|
112
util/registry/test/test_queuefile.py
Normal file
112
util/registry/test/test_queuefile.py
Normal file
|
@ -0,0 +1,112 @@
|
|||
import os
|
||||
|
||||
import pytest
|
||||
|
||||
from util.registry.queueprocess import QueueResult
|
||||
from util.registry.queuefile import QueueFile
|
||||
|
||||
class FakeQueue(object):
|
||||
def __init__(self):
|
||||
self.items = []
|
||||
|
||||
def get(self, block):
|
||||
return self.items.pop(0)
|
||||
|
||||
def put(self, data):
|
||||
self.items.append(data)
|
||||
|
||||
|
||||
def test_basic():
|
||||
queue = FakeQueue()
|
||||
queue.put(QueueResult('hello world', None))
|
||||
queue.put(QueueResult('! how goes there?', None))
|
||||
queue.put(QueueResult(None, None))
|
||||
|
||||
queuefile = QueueFile(queue)
|
||||
assert queuefile.read() == 'hello world! how goes there?'
|
||||
|
||||
def test_chunk_reading():
|
||||
queue = FakeQueue()
|
||||
queue.put(QueueResult('hello world', None))
|
||||
queue.put(QueueResult('! how goes there?', None))
|
||||
queue.put(QueueResult(None, None))
|
||||
|
||||
queuefile = QueueFile(queue)
|
||||
data = ''
|
||||
|
||||
while True:
|
||||
result = queuefile.read(size=2)
|
||||
if not result:
|
||||
break
|
||||
|
||||
data += result
|
||||
|
||||
assert data == 'hello world! how goes there?'
|
||||
|
||||
def test_unhandled_exception():
|
||||
queue = FakeQueue()
|
||||
queue.put(QueueResult('hello world', None))
|
||||
queue.put(QueueResult(None, IOError('some exception')))
|
||||
queue.put(QueueResult('! how goes there?', None))
|
||||
queue.put(QueueResult(None, None))
|
||||
|
||||
queuefile = QueueFile(queue)
|
||||
|
||||
with pytest.raises(IOError):
|
||||
queuefile.read(size=12)
|
||||
|
||||
def test_handled_exception():
|
||||
queue = FakeQueue()
|
||||
queue.put(QueueResult('hello world', None))
|
||||
queue.put(QueueResult(None, IOError('some exception')))
|
||||
queue.put(QueueResult('! how goes there?', None))
|
||||
queue.put(QueueResult(None, None))
|
||||
|
||||
ex_found = [None]
|
||||
|
||||
def handler(ex):
|
||||
ex_found[0] = ex
|
||||
|
||||
queuefile = QueueFile(queue)
|
||||
queuefile.add_exception_handler(handler)
|
||||
queuefile.read(size=12)
|
||||
|
||||
assert ex_found[0] is not None
|
||||
|
||||
def test_binary_data():
|
||||
queue = FakeQueue()
|
||||
|
||||
# Generate some binary data.
|
||||
binary_data = os.urandom(1024)
|
||||
queue.put(QueueResult(binary_data, None))
|
||||
queue.put(QueueResult(None, None))
|
||||
|
||||
queuefile = QueueFile(queue)
|
||||
found_data = ''
|
||||
while True:
|
||||
current_data = queuefile.read(size=37)
|
||||
if len(current_data) == 0:
|
||||
break
|
||||
|
||||
found_data = found_data + current_data
|
||||
|
||||
assert found_data == binary_data
|
||||
|
||||
def test_empty_data():
|
||||
queue = FakeQueue()
|
||||
|
||||
# Generate some empty binary data.
|
||||
binary_data = '\0' * 1024
|
||||
queue.put(QueueResult(binary_data, None))
|
||||
queue.put(QueueResult(None, None))
|
||||
|
||||
queuefile = QueueFile(queue)
|
||||
found_data = ''
|
||||
while True:
|
||||
current_data = queuefile.read(size=37)
|
||||
if len(current_data) == 0:
|
||||
break
|
||||
|
||||
found_data = found_data + current_data
|
||||
|
||||
assert found_data == binary_data
|
Reference in a new issue