diff --git a/storage/swift.py b/storage/swift.py index 742e7851c..77cacd4ed 100644 --- a/storage/swift.py +++ b/storage/swift.py @@ -7,6 +7,7 @@ import copy import hmac import string import logging +from _pyio import BufferedReader from uuid import uuid4 from swiftclient.client import Connection, ClientException @@ -26,7 +27,7 @@ logger = logging.getLogger(__name__) _PartUploadMetadata = namedtuple('_PartUploadMetadata', ['path', 'offset', 'length']) _SEGMENTS_KEY = 'segments' _SEGMENT_DIRECTORY = 'segments' -_MAXIMUM_SEGMENT_SIZE = 5000000000 # 5 GB +_MAXIMUM_SEGMENT_SIZE = 200000000 # ~200 MB _DEFAULT_SWIFT_CONNECT_TIMEOUT = 5 # seconds class SwiftStorage(BaseStorage): @@ -240,6 +241,7 @@ class SwiftStorage(BaseStorage): # are finished hitting the data limit. total_bytes_written = 0 upload_error = None + read_until_end = length == filelike.READ_UNTIL_END while True: try: @@ -253,12 +255,12 @@ class SwiftStorage(BaseStorage): upload_error = ex break - if length != filelike.READ_UNTIL_END: + if not read_until_end: length = length - bytes_written offset = offset + bytes_written total_bytes_written = total_bytes_written + bytes_written - if bytes_written == 0 or length <= 0: + if bytes_written == 0 or (not read_until_end and length <= 0): return total_bytes_written, storage_metadata, upload_error return total_bytes_written, storage_metadata, upload_error @@ -277,6 +279,11 @@ class SwiftStorage(BaseStorage): limiting_fp = filelike.LimitingStream(in_fp, length) + # If retries are requested, then we need to use a buffered reader to allow for calls to + # seek() on retries from within the Swift client. + if self._retry_count > 0: + limiting_fp = BufferedReader(limiting_fp, buffer_size=length) + # Write the segment to Swift. self.stream_write(segment_path, limiting_fp, content_type) diff --git a/test/test_filelike.py b/test/test_filelike.py index 98bb02370..b3870a54f 100644 --- a/test/test_filelike.py +++ b/test/test_filelike.py @@ -88,6 +88,14 @@ class TestLimitingStream(unittest.TestCase): self.assertEquals('', stream.read(1)) self.assertEquals(stream.tell(), 3) + def test_seek_to_tell(self): + fileobj = StringIO('this is a cool test') + stream = LimitingStream(fileobj, 3) + stream.seek(stream.tell()) + + self.assertEquals('thi', stream.read(4)) + self.assertEquals(stream.tell(), 3) + class TestStreamSlice(unittest.TestCase): def test_none_read(self): diff --git a/test/test_queuefile.py b/test/test_queuefile.py index 9e5b4a194..340aac63b 100644 --- a/test/test_queuefile.py +++ b/test/test_queuefile.py @@ -1,8 +1,6 @@ import unittest import os -from StringIO import StringIO - from util.registry.queueprocess import QueueResult from util.registry.queuefile import QueueFile