parent
78e8aefd45
commit
a3ebb9028d
3 changed files with 191 additions and 34 deletions
|
@ -305,7 +305,7 @@ class SwiftStorage(BaseStorage):
|
|||
self.stream_write(segment_path, limiting_fp)
|
||||
|
||||
# We are only going to track keys to which data was confirmed written.
|
||||
bytes_written = limiting_fp.byte_count_read
|
||||
bytes_written = limiting_fp.tell()
|
||||
if bytes_written > 0:
|
||||
updated_metadata[_SEGMENTS_KEY].append(_PartUploadMetadata(segment_path, offset,
|
||||
bytes_written))
|
||||
|
|
126
test/test_filelike.py
Normal file
126
test/test_filelike.py
Normal file
|
@ -0,0 +1,126 @@
|
|||
import unittest
|
||||
|
||||
from StringIO import StringIO
|
||||
from util.registry.filelike import FilelikeStreamConcat, LimitingStream, StreamSlice
|
||||
|
||||
class TestFilelikeStreamConcat(unittest.TestCase):
|
||||
def somegenerator(self):
|
||||
yield 'some'
|
||||
yield 'cool'
|
||||
yield 'file-contents'
|
||||
|
||||
def test_parts(self):
|
||||
gens = iter([StringIO(s) for s in self.somegenerator()])
|
||||
fileobj = FilelikeStreamConcat(gens)
|
||||
|
||||
self.assertEquals('so', fileobj.read(2))
|
||||
self.assertEquals('mec', fileobj.read(3))
|
||||
self.assertEquals('oolfile', fileobj.read(7))
|
||||
self.assertEquals('-contents', fileobj.read(-1))
|
||||
|
||||
def test_entire(self):
|
||||
gens = iter([StringIO(s) for s in self.somegenerator()])
|
||||
fileobj = FilelikeStreamConcat(gens)
|
||||
self.assertEquals('somecoolfile-contents', fileobj.read(-1))
|
||||
|
||||
|
||||
class TestLimitingStream(unittest.TestCase):
|
||||
def test_nolimit(self):
|
||||
fileobj = StringIO('this is a cool test')
|
||||
stream = LimitingStream(fileobj)
|
||||
self.assertEquals('this is a cool test', stream.read(-1))
|
||||
self.assertEquals(stream.tell(), len('this is a cool test'))
|
||||
|
||||
def test_simplelimit(self):
|
||||
fileobj = StringIO('this is a cool test')
|
||||
stream = LimitingStream(fileobj, 4)
|
||||
self.assertEquals('this', stream.read(-1))
|
||||
self.assertEquals(stream.tell(), 4)
|
||||
|
||||
def test_simplelimit_readdefined(self):
|
||||
fileobj = StringIO('this is a cool test')
|
||||
stream = LimitingStream(fileobj, 4)
|
||||
self.assertEquals('th', stream.read(2))
|
||||
self.assertEquals(stream.tell(), 2)
|
||||
|
||||
def test_nolimit_readdefined(self):
|
||||
fileobj = StringIO('this is a cool test')
|
||||
stream = LimitingStream(fileobj, -1)
|
||||
self.assertEquals('th', stream.read(2))
|
||||
self.assertEquals(stream.tell(), 2)
|
||||
|
||||
def test_limit_multiread(self):
|
||||
fileobj = StringIO('this is a cool test')
|
||||
stream = LimitingStream(fileobj, 7)
|
||||
self.assertEquals('this', stream.read(4))
|
||||
self.assertEquals(' is', stream.read(3))
|
||||
self.assertEquals('', stream.read(2))
|
||||
self.assertEquals(stream.tell(), 7)
|
||||
|
||||
def test_limit_multiread2(self):
|
||||
fileobj = StringIO('this is a cool test')
|
||||
stream = LimitingStream(fileobj, 7)
|
||||
self.assertEquals('this', stream.read(4))
|
||||
self.assertEquals(' is', stream.read(-1))
|
||||
self.assertEquals(stream.tell(), 7)
|
||||
|
||||
def test_seek(self):
|
||||
fileobj = StringIO('this is a cool test')
|
||||
stream = LimitingStream(fileobj)
|
||||
stream.seek(2)
|
||||
|
||||
self.assertEquals('is', stream.read(2))
|
||||
self.assertEquals(stream.tell(), 4)
|
||||
|
||||
def test_seek_withlimit(self):
|
||||
fileobj = StringIO('this is a cool test')
|
||||
stream = LimitingStream(fileobj, 3)
|
||||
stream.seek(2)
|
||||
|
||||
self.assertEquals('i', stream.read(2))
|
||||
self.assertEquals(stream.tell(), 3)
|
||||
|
||||
def test_seek_pastlimit(self):
|
||||
fileobj = StringIO('this is a cool test')
|
||||
stream = LimitingStream(fileobj, 3)
|
||||
stream.seek(4)
|
||||
|
||||
self.assertEquals('', stream.read(1))
|
||||
self.assertEquals(stream.tell(), 3)
|
||||
|
||||
|
||||
class TestStreamSlice(unittest.TestCase):
|
||||
def test_noslice(self):
|
||||
fileobj = StringIO('this is a cool test')
|
||||
stream = StreamSlice(fileobj, 0)
|
||||
self.assertEquals('this is a cool test', stream.read(-1))
|
||||
self.assertEquals(stream.tell(), len('this is a cool test'))
|
||||
|
||||
def test_startindex(self):
|
||||
fileobj = StringIO('this is a cool test')
|
||||
stream = StreamSlice(fileobj, 5)
|
||||
self.assertEquals('is a cool test', stream.read(-1))
|
||||
self.assertEquals(stream.tell(), len('is a cool test'))
|
||||
|
||||
def test_startindex_limitedread(self):
|
||||
fileobj = StringIO('this is a cool test')
|
||||
stream = StreamSlice(fileobj, 5)
|
||||
self.assertEquals('is a', stream.read(4))
|
||||
self.assertEquals(stream.tell(), 4)
|
||||
|
||||
def test_slice(self):
|
||||
fileobj = StringIO('this is a cool test')
|
||||
stream = StreamSlice(fileobj, 5, 9)
|
||||
self.assertEquals('is a', stream.read(-1))
|
||||
self.assertEquals(stream.tell(), len('is a'))
|
||||
|
||||
def test_slice_explictread(self):
|
||||
fileobj = StringIO('this is a cool test')
|
||||
stream = StreamSlice(fileobj, 5, 9)
|
||||
self.assertEquals('is', stream.read(2))
|
||||
self.assertEquals(' a', stream.read(5))
|
||||
self.assertEquals(stream.tell(), len('is a'))
|
||||
|
||||
|
||||
if __name__ == '__main__':
|
||||
unittest.main()
|
|
@ -36,12 +36,16 @@ class BaseStreamFilelike(object):
|
|||
elif whence == WHENCE_RELATIVE_END:
|
||||
raise IOError('Stream does not have a known end point')
|
||||
|
||||
bytes_forward = num_bytes_to_ff
|
||||
while num_bytes_to_ff > 0:
|
||||
buf = self._fileobj.read(num_bytes_to_ff)
|
||||
if not buf:
|
||||
raise IOError('Seek past end of file')
|
||||
num_bytes_to_ff -= len(buf)
|
||||
|
||||
self._cursor_position += bytes_forward
|
||||
return bytes_forward
|
||||
|
||||
|
||||
class SocketReader(BaseStreamFilelike):
|
||||
def __init__(self, fileobj):
|
||||
|
@ -64,52 +68,54 @@ def wrap_with_handler(in_fp, handler):
|
|||
return wrapper
|
||||
|
||||
|
||||
class FilelikeStreamConcat(BaseStreamFilelike):
|
||||
class FilelikeStreamConcat(object):
|
||||
""" A file-like object which concats all the file-like objects in the specified generator into
|
||||
a single stream.
|
||||
"""
|
||||
def __init__(self, file_generator):
|
||||
super(FilelikeStreamConcat, self).__init__(self)
|
||||
self._file_generator = file_generator
|
||||
self._current_file = file_generator.next()
|
||||
self._current_position = 0
|
||||
self._closed = False
|
||||
|
||||
def tell(self):
|
||||
return self._current_position
|
||||
|
||||
def close(self):
|
||||
self._closed = True
|
||||
|
||||
def read(self, size=READ_UNTIL_END):
|
||||
buf = self._current_file.read(size)
|
||||
if buf:
|
||||
self._cursor_position += len(buf)
|
||||
return buf
|
||||
buf = ''
|
||||
current_size = size
|
||||
|
||||
# That file was out of data, prime a new one
|
||||
self._current_file.close()
|
||||
try:
|
||||
self._current_file = self._file_generator.next()
|
||||
except StopIteration:
|
||||
return ''
|
||||
return self.read(size)
|
||||
while size == READ_UNTIL_END or len(buf) < size:
|
||||
current_buf = self._current_file.read(current_size)
|
||||
if current_buf:
|
||||
buf += current_buf
|
||||
self._current_position += len(current_buf)
|
||||
if size != READ_UNTIL_END:
|
||||
current_size -= len(current_buf)
|
||||
|
||||
else:
|
||||
# That file was out of data, prime a new one
|
||||
self._current_file.close()
|
||||
try:
|
||||
self._current_file = self._file_generator.next()
|
||||
except StopIteration:
|
||||
return buf
|
||||
|
||||
class LimitingStream(BaseStreamFilelike):
|
||||
def __init__(self, fileobj, read_limit=READ_UNTIL_END):
|
||||
super(LimitingStream, self).__init__(fileobj)
|
||||
self._read_limit = read_limit
|
||||
self.byte_count_read = 0
|
||||
|
||||
def read(self, size=READ_UNTIL_END):
|
||||
max_bytes_to_read = -1
|
||||
|
||||
# If a read limit is specified, then determine the maximum number of bytes to return.
|
||||
if self._read_limit != READ_UNTIL_END:
|
||||
if size == READ_UNTIL_END:
|
||||
size = self._read_limit
|
||||
|
||||
max_bytes_to_read = min(self._read_limit - self.byte_count_read, size)
|
||||
|
||||
byte_data_read = super(LimitingStream, self).read(max_bytes_to_read)
|
||||
self.byte_count_read = self.byte_count_read + len(byte_data_read)
|
||||
return byte_data_read
|
||||
return buf
|
||||
|
||||
|
||||
class StreamSlice(BaseStreamFilelike):
|
||||
""" A file-like object which returns a file-like object that represents a slice of the data in
|
||||
the specified file obj. All methods will act as if the slice is its own file.
|
||||
"""
|
||||
|
||||
def __init__(self, fileobj, start_offset=0, end_offset_exclusive=READ_UNTIL_END):
|
||||
super(StreamSlice, self).__init__(fileobj)
|
||||
self._end_offset_exclusive = end_offset_exclusive
|
||||
self._start_offset = start_offset
|
||||
|
||||
if start_offset > 0:
|
||||
self.seek(start_offset)
|
||||
|
@ -120,8 +126,33 @@ class StreamSlice(BaseStreamFilelike):
|
|||
return super(StreamSlice, self).read(size)
|
||||
|
||||
# Compute the max bytes to read until the end or until we reach the user requested max
|
||||
max_bytes_to_read = self._end_offset_exclusive - self.tell()
|
||||
max_bytes_to_read = self._end_offset_exclusive - super(StreamSlice, self).tell()
|
||||
if size != READ_UNTIL_END:
|
||||
max_bytes_to_read = min(max_bytes_to_read, size)
|
||||
|
||||
return super(StreamSlice, self).read(max_bytes_to_read)
|
||||
|
||||
def _file_min(self, first, second):
|
||||
if first == READ_UNTIL_END:
|
||||
return second
|
||||
|
||||
if second == READ_UNTIL_END:
|
||||
return first
|
||||
|
||||
return min(first, second)
|
||||
|
||||
def tell(self):
|
||||
return super(StreamSlice, self).tell() - self._start_offset
|
||||
|
||||
def seek(self, index, whence=WHENCE_ABSOLUTE):
|
||||
index = self._file_min(self._end_offset_exclusive, index)
|
||||
super(StreamSlice, self).seek(index, whence)
|
||||
|
||||
|
||||
class LimitingStream(StreamSlice):
|
||||
""" A file-like object which mimics the specified file stream being limited to the given number
|
||||
of bytes. All calls after that limit (if specified) will act as if the file has no additional
|
||||
data.
|
||||
"""
|
||||
def __init__(self, fileobj, read_limit=READ_UNTIL_END):
|
||||
super(LimitingStream, self).__init__(fileobj, 0, read_limit)
|
||||
|
|
Reference in a new issue