Merge pull request #570 from coreos-inc/python-registry-v2-filelike

Add full unit tests for the file-like objects and fix them
This commit is contained in:
josephschorr 2015-09-30 15:17:24 -04:00
commit 21462388e2
3 changed files with 191 additions and 34 deletions

View file

@ -305,7 +305,7 @@ class SwiftStorage(BaseStorage):
self.stream_write(segment_path, limiting_fp)
# We are only going to track keys to which data was confirmed written.
bytes_written = limiting_fp.byte_count_read
bytes_written = limiting_fp.tell()
if bytes_written > 0:
updated_metadata[_SEGMENTS_KEY].append(_PartUploadMetadata(segment_path, offset,
bytes_written))

126
test/test_filelike.py Normal file
View file

@ -0,0 +1,126 @@
import unittest
from StringIO import StringIO
from util.registry.filelike import FilelikeStreamConcat, LimitingStream, StreamSlice
class TestFilelikeStreamConcat(unittest.TestCase):
def somegenerator(self):
yield 'some'
yield 'cool'
yield 'file-contents'
def test_parts(self):
gens = iter([StringIO(s) for s in self.somegenerator()])
fileobj = FilelikeStreamConcat(gens)
self.assertEquals('so', fileobj.read(2))
self.assertEquals('mec', fileobj.read(3))
self.assertEquals('oolfile', fileobj.read(7))
self.assertEquals('-contents', fileobj.read(-1))
def test_entire(self):
gens = iter([StringIO(s) for s in self.somegenerator()])
fileobj = FilelikeStreamConcat(gens)
self.assertEquals('somecoolfile-contents', fileobj.read(-1))
class TestLimitingStream(unittest.TestCase):
def test_nolimit(self):
fileobj = StringIO('this is a cool test')
stream = LimitingStream(fileobj)
self.assertEquals('this is a cool test', stream.read(-1))
self.assertEquals(stream.tell(), len('this is a cool test'))
def test_simplelimit(self):
fileobj = StringIO('this is a cool test')
stream = LimitingStream(fileobj, 4)
self.assertEquals('this', stream.read(-1))
self.assertEquals(stream.tell(), 4)
def test_simplelimit_readdefined(self):
fileobj = StringIO('this is a cool test')
stream = LimitingStream(fileobj, 4)
self.assertEquals('th', stream.read(2))
self.assertEquals(stream.tell(), 2)
def test_nolimit_readdefined(self):
fileobj = StringIO('this is a cool test')
stream = LimitingStream(fileobj, -1)
self.assertEquals('th', stream.read(2))
self.assertEquals(stream.tell(), 2)
def test_limit_multiread(self):
fileobj = StringIO('this is a cool test')
stream = LimitingStream(fileobj, 7)
self.assertEquals('this', stream.read(4))
self.assertEquals(' is', stream.read(3))
self.assertEquals('', stream.read(2))
self.assertEquals(stream.tell(), 7)
def test_limit_multiread2(self):
fileobj = StringIO('this is a cool test')
stream = LimitingStream(fileobj, 7)
self.assertEquals('this', stream.read(4))
self.assertEquals(' is', stream.read(-1))
self.assertEquals(stream.tell(), 7)
def test_seek(self):
fileobj = StringIO('this is a cool test')
stream = LimitingStream(fileobj)
stream.seek(2)
self.assertEquals('is', stream.read(2))
self.assertEquals(stream.tell(), 4)
def test_seek_withlimit(self):
fileobj = StringIO('this is a cool test')
stream = LimitingStream(fileobj, 3)
stream.seek(2)
self.assertEquals('i', stream.read(2))
self.assertEquals(stream.tell(), 3)
def test_seek_pastlimit(self):
fileobj = StringIO('this is a cool test')
stream = LimitingStream(fileobj, 3)
stream.seek(4)
self.assertEquals('', stream.read(1))
self.assertEquals(stream.tell(), 3)
class TestStreamSlice(unittest.TestCase):
def test_noslice(self):
fileobj = StringIO('this is a cool test')
stream = StreamSlice(fileobj, 0)
self.assertEquals('this is a cool test', stream.read(-1))
self.assertEquals(stream.tell(), len('this is a cool test'))
def test_startindex(self):
fileobj = StringIO('this is a cool test')
stream = StreamSlice(fileobj, 5)
self.assertEquals('is a cool test', stream.read(-1))
self.assertEquals(stream.tell(), len('is a cool test'))
def test_startindex_limitedread(self):
fileobj = StringIO('this is a cool test')
stream = StreamSlice(fileobj, 5)
self.assertEquals('is a', stream.read(4))
self.assertEquals(stream.tell(), 4)
def test_slice(self):
fileobj = StringIO('this is a cool test')
stream = StreamSlice(fileobj, 5, 9)
self.assertEquals('is a', stream.read(-1))
self.assertEquals(stream.tell(), len('is a'))
def test_slice_explictread(self):
fileobj = StringIO('this is a cool test')
stream = StreamSlice(fileobj, 5, 9)
self.assertEquals('is', stream.read(2))
self.assertEquals(' a', stream.read(5))
self.assertEquals(stream.tell(), len('is a'))
if __name__ == '__main__':
unittest.main()

View file

@ -36,12 +36,16 @@ class BaseStreamFilelike(object):
elif whence == WHENCE_RELATIVE_END:
raise IOError('Stream does not have a known end point')
bytes_forward = num_bytes_to_ff
while num_bytes_to_ff > 0:
buf = self._fileobj.read(num_bytes_to_ff)
if not buf:
raise IOError('Seek past end of file')
num_bytes_to_ff -= len(buf)
self._cursor_position += bytes_forward
return bytes_forward
class SocketReader(BaseStreamFilelike):
def __init__(self, fileobj):
@ -64,52 +68,54 @@ def wrap_with_handler(in_fp, handler):
return wrapper
class FilelikeStreamConcat(BaseStreamFilelike):
class FilelikeStreamConcat(object):
""" A file-like object which concats all the file-like objects in the specified generator into
a single stream.
"""
def __init__(self, file_generator):
super(FilelikeStreamConcat, self).__init__(self)
self._file_generator = file_generator
self._current_file = file_generator.next()
self._current_position = 0
self._closed = False
def tell(self):
return self._current_position
def close(self):
self._closed = True
def read(self, size=READ_UNTIL_END):
buf = self._current_file.read(size)
if buf:
self._cursor_position += len(buf)
return buf
buf = ''
current_size = size
# That file was out of data, prime a new one
self._current_file.close()
try:
self._current_file = self._file_generator.next()
except StopIteration:
return ''
return self.read(size)
while size == READ_UNTIL_END or len(buf) < size:
current_buf = self._current_file.read(current_size)
if current_buf:
buf += current_buf
self._current_position += len(current_buf)
if size != READ_UNTIL_END:
current_size -= len(current_buf)
else:
# That file was out of data, prime a new one
self._current_file.close()
try:
self._current_file = self._file_generator.next()
except StopIteration:
return buf
class LimitingStream(BaseStreamFilelike):
def __init__(self, fileobj, read_limit=READ_UNTIL_END):
super(LimitingStream, self).__init__(fileobj)
self._read_limit = read_limit
self.byte_count_read = 0
def read(self, size=READ_UNTIL_END):
max_bytes_to_read = -1
# If a read limit is specified, then determine the maximum number of bytes to return.
if self._read_limit != READ_UNTIL_END:
if size == READ_UNTIL_END:
size = self._read_limit
max_bytes_to_read = min(self._read_limit - self.byte_count_read, size)
byte_data_read = super(LimitingStream, self).read(max_bytes_to_read)
self.byte_count_read = self.byte_count_read + len(byte_data_read)
return byte_data_read
return buf
class StreamSlice(BaseStreamFilelike):
""" A file-like object which returns a file-like object that represents a slice of the data in
the specified file obj. All methods will act as if the slice is its own file.
"""
def __init__(self, fileobj, start_offset=0, end_offset_exclusive=READ_UNTIL_END):
super(StreamSlice, self).__init__(fileobj)
self._end_offset_exclusive = end_offset_exclusive
self._start_offset = start_offset
if start_offset > 0:
self.seek(start_offset)
@ -120,8 +126,33 @@ class StreamSlice(BaseStreamFilelike):
return super(StreamSlice, self).read(size)
# Compute the max bytes to read until the end or until we reach the user requested max
max_bytes_to_read = self._end_offset_exclusive - self.tell()
max_bytes_to_read = self._end_offset_exclusive - super(StreamSlice, self).tell()
if size != READ_UNTIL_END:
max_bytes_to_read = min(max_bytes_to_read, size)
return super(StreamSlice, self).read(max_bytes_to_read)
def _file_min(self, first, second):
if first == READ_UNTIL_END:
return second
if second == READ_UNTIL_END:
return first
return min(first, second)
def tell(self):
return super(StreamSlice, self).tell() - self._start_offset
def seek(self, index, whence=WHENCE_ABSOLUTE):
index = self._file_min(self._end_offset_exclusive, index)
super(StreamSlice, self).seek(index, whence)
class LimitingStream(StreamSlice):
""" A file-like object which mimics the specified file stream being limited to the given number
of bytes. All calls after that limit (if specified) will act as if the file has no additional
data.
"""
def __init__(self, fileobj, read_limit=READ_UNTIL_END):
super(LimitingStream, self).__init__(fileobj, 0, read_limit)