diff --git a/storage/cloud.py b/storage/cloud.py
index 1777c1fb2..35ec743d7 100644
--- a/storage/cloud.py
+++ b/storage/cloud.py
@@ -3,6 +3,8 @@ import os
 import logging
 import copy
 
+from itertools import chain
+
 from boto.exception import S3ResponseError
 import boto.s3.connection
 import boto.s3.multipart
@@ -51,7 +53,8 @@ class _CloudStorage(BaseStorageV2):
                storage_path, bucket_name, access_key=None, secret_key=None):
     super(_CloudStorage, self).__init__()
 
-    self.automatic_chunk_size = 5 * 1024 * 1024
+    self.minimum_chunk_size = 5 * 1024 * 1024
+    self.maximum_chunk_size = None
 
     self._initialized = False
     self._bucket_name = bucket_name
@@ -184,7 +187,7 @@ class _CloudStorage(BaseStorageV2):
     num_part = 1
     total_bytes_written = 0
     while size == filelike.READ_UNTIL_END or total_bytes_written < size:
-      bytes_to_copy = self.automatic_chunk_size
+      bytes_to_copy = self.minimum_chunk_size
       if size != filelike.READ_UNTIL_END:
         # We never want to ask for more bytes than our caller has indicated to copy
         bytes_to_copy = min(bytes_to_copy, size - total_bytes_written)
@@ -364,6 +367,22 @@ class _CloudStorage(BaseStorageV2):
         logger.exception('Exception trying to perform action %s', action)
         raise s3re
 
+  @staticmethod
+  def _rechunk(chunk, max_chunk_size):
+    """ Rechunks the chunk list to meet maximum chunk size restrictions for the storage engine. """
+    if max_chunk_size is None or chunk.length <= max_chunk_size:
+      yield chunk
+    else:
+      newchunk_length = chunk.length / 2
+      first_subchunk = _PartUploadMetadata(chunk.path, chunk.offset, newchunk_length)
+      second_subchunk = _PartUploadMetadata(chunk.path,
+                                            chunk.offset + newchunk_length,
+                                            chunk.length - newchunk_length)
+      for subchunk in chain(_CloudStorage._rechunk(first_subchunk, max_chunk_size),
+                            _CloudStorage._rechunk(second_subchunk, max_chunk_size)):
+        yield subchunk
+
+
   def complete_chunked_upload(self, uuid, final_path, storage_metadata, force_client_side=False):
     self._initialize_cloud_conn()
     chunk_list = self._chunk_list_from_metadata(storage_metadata)
@@ -375,7 +394,7 @@ class _CloudStorage(BaseStorageV2):
       server_side_assembly = True
       for chunk_offset, chunk in enumerate(chunk_list):
         # If the chunk is both too small, and not the last chunk, we rule out server side assembly
-        if chunk.length < self.automatic_chunk_size and (chunk_offset + 1) < len(chunk_list):
+        if chunk.length < self.minimum_chunk_size and (chunk_offset + 1) < len(chunk_list):
           server_side_assembly = False
           break
 
@@ -385,14 +404,14 @@
         # Awesome, we can do this completely server side, now we have to start a new multipart
         # upload and use copy_part_from_key to set all of the chunks.
         mpu = self.__initiate_multipart_upload(final_path, content_type=None, content_encoding=None)
+        updated_chunks = chain.from_iterable([_CloudStorage._rechunk(c, self.maximum_chunk_size)
+                                              for c in chunk_list])
 
-        for chunk_offset, chunk in enumerate(chunk_list):
+        for index, chunk in enumerate(updated_chunks):
           abs_chunk_path = self._init_path(chunk.path)
-          part_num = chunk_offset + 1
-          chunk_end_offset_inclusive = chunk.length - 1
           self._perform_action_with_retry(mpu.copy_part_from_key, self.get_cloud_bucket().name,
-                                          abs_chunk_path, part_num, start=0,
-                                          end=chunk_end_offset_inclusive)
+                                          abs_chunk_path, index + 1, start=chunk.offset,
+                                          end=chunk.length + chunk.offset - 1)
 
         self._perform_action_with_retry(mpu.complete_upload)
       except IOError as ioe:
@@ -433,6 +452,8 @@ class S3Storage(_CloudStorage):
                                     access_key=s3_access_key or None,
                                     secret_key=s3_secret_key or None)
 
+    self.maximum_chunk_size = 5 * 1024 * 1024 * 1024  # 5GB.
+
 
   def setup(self):
     self.get_cloud_bucket().set_cors_xml("""
diff --git a/test/test_cloud_storage.py b/test/test_cloud_storage.py
index 957e56c58..549207027 100644
--- a/test/test_cloud_storage.py
+++ b/test/test_cloud_storage.py
@@ -4,6 +4,7 @@ import boto
 import os
 
 from storage import S3Storage
+from storage.cloud import _CloudStorage, _PartUploadMetadata
 from storage.cloud import _CHUNKS_KEY
 from StringIO import StringIO
@@ -140,6 +141,88 @@ class TestCloudStorage(unittest.TestCase):
     for chunk in metadata[_CHUNKS_KEY]:
       self.assertFalse(self.engine.exists(chunk.path))
 
+  def test_large_chunks_upload(self):
+    # Make the max chunk size much smaller for testing.
+    self.engine.maximum_chunk_size = self.engine.minimum_chunk_size * 2
+
+    upload_id, metadata = self.engine.initiate_chunked_upload()
+
+    # Write a "super large" chunk, to ensure that it is broken into smaller chunks.
+    chunk_data = os.urandom(int(self.engine.maximum_chunk_size * 2.5))
+    bytes_written, new_metadata, _ = self.engine.stream_upload_chunk(upload_id, 0,
+                                                                     -1,
+                                                                     StringIO(chunk_data),
+                                                                     metadata)
+    self.assertEquals(bytes_written, len(chunk_data))
+
+    # Complete the chunked upload.
+    self.engine.complete_chunked_upload(upload_id, 'some/chunked/path', new_metadata)
+
+    # Ensure the file contents are valid.
+    self.assertEquals(len(self.engine.get_content('some/chunked/path')), len(chunk_data))
+    self.assertEquals(chunk_data, self.engine.get_content('some/chunked/path'))
+
+  def test_large_chunks_with_ragged_edge(self):
+    # Make the max chunk size much smaller for testing and force it to have a ragged edge.
+    self.engine.maximum_chunk_size = self.engine.minimum_chunk_size * 2 + 10
+
+    upload_id, metadata = self.engine.initiate_chunked_upload()
+
+    # Write a few "super large" chunks, to ensure that it is broken into smaller chunks.
+    all_data = ''
+    for _ in range(0, 2):
+      chunk_data = os.urandom(int(self.engine.maximum_chunk_size) + 20)
+      bytes_written, new_metadata, _ = self.engine.stream_upload_chunk(upload_id, 0,
+                                                                       -1,
+                                                                       StringIO(chunk_data),
+                                                                       metadata)
+      self.assertEquals(bytes_written, len(chunk_data))
+      all_data = all_data + chunk_data
+      metadata = new_metadata
+
+    # Complete the chunked upload.
+    self.engine.complete_chunked_upload(upload_id, 'some/chunked/path', new_metadata)
+
+    # Ensure the file contents are valid.
+    self.assertEquals(len(self.engine.get_content('some/chunked/path')), len(all_data))
+    self.assertEquals(all_data, self.engine.get_content('some/chunked/path'))
+
+  def assertRechunked(self, chunk, max_size, *args):
+    rechunked = list(_CloudStorage._rechunk(chunk, max_size))
+    self.assertEquals(len(rechunked), len(args), rechunked)
+    for index, chunk in enumerate(rechunked):
+      self.assertEquals(args[index], chunk)
+
+  def test_rechunking(self):
+    chunk = _PartUploadMetadata('foo', 0, 100)
+
+    self.assertRechunked(chunk, 50,
+                         _PartUploadMetadata('foo', 0, 50),
+                         _PartUploadMetadata('foo', 50, 50))
+
+    self.assertRechunked(chunk, 40,
+                         _PartUploadMetadata('foo', 0, 25),
+                         _PartUploadMetadata('foo', 25, 25),
+                         _PartUploadMetadata('foo', 50, 25),
+                         _PartUploadMetadata('foo', 75, 25))
+
+    self.assertRechunked(chunk, 51,
+                         _PartUploadMetadata('foo', 0, 50),
+                         _PartUploadMetadata('foo', 50, 50))
+
+    self.assertRechunked(chunk, 49,
+                         _PartUploadMetadata('foo', 0, 25),
+                         _PartUploadMetadata('foo', 25, 25),
+                         _PartUploadMetadata('foo', 50, 25),
+                         _PartUploadMetadata('foo', 75, 25))
+
+    self.assertRechunked(chunk, 99,
+                         _PartUploadMetadata('foo', 0, 50),
+                         _PartUploadMetadata('foo', 50, 50))
+
+    self.assertRechunked(chunk, 100,
+                         _PartUploadMetadata('foo', 0, 100))
+
 
 if __name__ == '__main__':
   unittest.main()
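
Reviewer note (not part of the patch): _rechunk recursively halves any part whose length exceeds maximum_chunk_size, so every part handed to copy_part_from_key stays within the engine's limit while the offsets remain contiguous. Below is a minimal standalone sketch of that behaviour; the namedtuple stand-in for _PartUploadMetadata, the free function name rechunk, and the use of // (the Python 2 code in the patch uses /) are illustrative assumptions, not part of the change.

    from collections import namedtuple
    from itertools import chain

    _PartUploadMetadata = namedtuple('_PartUploadMetadata', ['path', 'offset', 'length'])

    def rechunk(chunk, max_chunk_size):
      # Yield the chunk unchanged if it already fits; otherwise split it in half and
      # recurse on both halves until every piece is <= max_chunk_size.
      if max_chunk_size is None or chunk.length <= max_chunk_size:
        yield chunk
      else:
        half = chunk.length // 2
        first = _PartUploadMetadata(chunk.path, chunk.offset, half)
        second = _PartUploadMetadata(chunk.path, chunk.offset + half, chunk.length - half)
        for piece in chain(rechunk(first, max_chunk_size), rechunk(second, max_chunk_size)):
          yield piece

    # Mirrors the new test_rechunking expectation: a 100-byte part with a 40-byte cap
    # splits into four 25-byte parts at offsets 0, 25, 50 and 75.
    print(list(rechunk(_PartUploadMetadata('foo', 0, 100), 40)))

A side effect of halving (as opposed to slicing fixed-size pieces off the front) is that every emitted part is at least roughly half of maximum_chunk_size, so with the 5GB cap set in S3Storage the rechunked parts remain far above S3's 5MB minimum part size for non-final multipart parts.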