Make sure we don't generate chunk sizes larger than 5 GB.
Amazon S3 does not allow chunk sizes larger than 5 GB; we currently don't handle that case at all, which is why large uploads are failing. This change ensures that if a storage engine specifies a *maximum* chunk size, we write multiple chunks no larger than that size.
parent a74e94fb67
commit bfe2646a50

2 changed files with 112 additions and 8 deletions
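
The approach in the diff below is simple recursive halving: any chunk larger than the engine's maximum is split in half until every piece fits under the cap. A minimal standalone sketch of that logic, with an illustrative Part namedtuple standing in for the real _PartUploadMetadata (names here are stand-ins, not the code from the diff itself):

from collections import namedtuple

# Illustrative stand-in for the _PartUploadMetadata tuple used in the real code.
Part = namedtuple('Part', ['path', 'offset', 'length'])

def rechunk(part, max_chunk_size):
  # Recursively halve a part until every piece is no larger than max_chunk_size.
  if max_chunk_size is None or part.length <= max_chunk_size:
    yield part
  else:
    half = part.length // 2
    for piece in rechunk(Part(part.path, part.offset, half), max_chunk_size):
      yield piece
    for piece in rechunk(Part(part.path, part.offset + half, part.length - half), max_chunk_size):
      yield piece

# Example: a 12 GB chunk under a 5 GB cap becomes four contiguous 3 GB pieces.
GB = 1024 * 1024 * 1024
assert [p.length for p in rechunk(Part('foo', 0, 12 * GB), 5 * GB)] == [3 * GB] * 4

Halving (rather than slicing off exact maximum-size pieces) means every piece produced by a split is at least half the maximum, so for S3's limits (5 GB maximum part, 5 MB minimum part) the split pieces always stay well above the minimum part size.
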
@@ -3,6 +3,8 @@ import os
 import logging
 import copy
 
+from itertools import chain
+
 from boto.exception import S3ResponseError
 import boto.s3.connection
 import boto.s3.multipart
@@ -51,7 +53,8 @@ class _CloudStorage(BaseStorageV2):
                storage_path, bucket_name, access_key=None, secret_key=None):
     super(_CloudStorage, self).__init__()
 
-    self.automatic_chunk_size = 5 * 1024 * 1024
+    self.minimum_chunk_size = 5 * 1024 * 1024
+    self.maximum_chunk_size = None
 
     self._initialized = False
     self._bucket_name = bucket_name
@@ -184,7 +187,7 @@ class _CloudStorage(BaseStorageV2):
     num_part = 1
     total_bytes_written = 0
     while size == filelike.READ_UNTIL_END or total_bytes_written < size:
-      bytes_to_copy = self.automatic_chunk_size
+      bytes_to_copy = self.minimum_chunk_size
       if size != filelike.READ_UNTIL_END:
         # We never want to ask for more bytes than our caller has indicated to copy
         bytes_to_copy = min(bytes_to_copy, size - total_bytes_written)
@@ -364,6 +367,22 @@ class _CloudStorage(BaseStorageV2):
         logger.exception('Exception trying to perform action %s', action)
         raise s3re
 
+  @staticmethod
+  def _rechunk(chunk, max_chunk_size):
+    """ Rechunks the chunk list to meet maximum chunk size restrictions for the storage engine. """
+    if max_chunk_size is None or chunk.length <= max_chunk_size:
+      yield chunk
+    else:
+      newchunk_length = chunk.length / 2
+      first_subchunk = _PartUploadMetadata(chunk.path, chunk.offset, newchunk_length)
+      second_subchunk = _PartUploadMetadata(chunk.path,
+                                            chunk.offset + newchunk_length,
+                                            chunk.length - newchunk_length)
+      for subchunk in chain(_CloudStorage._rechunk(first_subchunk, max_chunk_size),
+                            _CloudStorage._rechunk(second_subchunk, max_chunk_size)):
+        yield subchunk
+
+
   def complete_chunked_upload(self, uuid, final_path, storage_metadata, force_client_side=False):
     self._initialize_cloud_conn()
     chunk_list = self._chunk_list_from_metadata(storage_metadata)
@@ -375,7 +394,7 @@ class _CloudStorage(BaseStorageV2):
     server_side_assembly = True
     for chunk_offset, chunk in enumerate(chunk_list):
       # If the chunk is both too small, and not the last chunk, we rule out server side assembly
-      if chunk.length < self.automatic_chunk_size and (chunk_offset + 1) < len(chunk_list):
+      if chunk.length < self.minimum_chunk_size and (chunk_offset + 1) < len(chunk_list):
         server_side_assembly = False
         break
 
@@ -385,14 +404,14 @@ class _CloudStorage(BaseStorageV2):
         # Awesome, we can do this completely server side, now we have to start a new multipart
         # upload and use copy_part_from_key to set all of the chunks.
         mpu = self.__initiate_multipart_upload(final_path, content_type=None, content_encoding=None)
+        updated_chunks = chain.from_iterable([_CloudStorage._rechunk(c, self.maximum_chunk_size)
+                                              for c in chunk_list])
 
-        for chunk_offset, chunk in enumerate(chunk_list):
+        for index, chunk in enumerate(updated_chunks):
           abs_chunk_path = self._init_path(chunk.path)
-          part_num = chunk_offset + 1
-          chunk_end_offset_inclusive = chunk.length - 1
           self._perform_action_with_retry(mpu.copy_part_from_key, self.get_cloud_bucket().name,
-                                          abs_chunk_path, part_num, start=0,
-                                          end=chunk_end_offset_inclusive)
+                                          abs_chunk_path, index + 1, start=chunk.offset,
+                                          end=chunk.length + chunk.offset - 1)
 
         self._perform_action_with_retry(mpu.complete_upload)
       except IOError as ioe:
@@ -433,6 +452,8 @@ class S3Storage(_CloudStorage):
                                     access_key=s3_access_key or None,
                                     secret_key=s3_secret_key or None)
 
+    self.maximum_chunk_size = 5 * 1024 * 1024 * 1024 # 5GB.
+
   def setup(self):
     self.get_cloud_bucket().set_cors_xml("""<?xml version="1.0" encoding="UTF-8"?>
       <CORSConfiguration xmlns="http://s3.amazonaws.com/doc/2006-03-01/">

@@ -4,6 +4,7 @@ import boto
 import os
 
 from storage import S3Storage
+from storage.cloud import _CloudStorage, _PartUploadMetadata
 from storage.cloud import _CHUNKS_KEY
 from StringIO import StringIO
 
@@ -140,6 +141,88 @@ class TestCloudStorage(unittest.TestCase):
     for chunk in metadata[_CHUNKS_KEY]:
       self.assertFalse(self.engine.exists(chunk.path))
 
+  def test_large_chunks_upload(self):
+    # Make the max chunk size much smaller for testing.
+    self.engine.maximum_chunk_size = self.engine.minimum_chunk_size * 2
+
+    upload_id, metadata = self.engine.initiate_chunked_upload()
+
+    # Write a "super large" chunk, to ensure that it is broken into smaller chunks.
+    chunk_data = os.urandom(int(self.engine.maximum_chunk_size * 2.5))
+    bytes_written, new_metadata, _ = self.engine.stream_upload_chunk(upload_id, 0,
+                                                                     -1,
+                                                                     StringIO(chunk_data),
+                                                                     metadata)
+    self.assertEquals(bytes_written, len(chunk_data))
+
+    # Complete the chunked upload.
+    self.engine.complete_chunked_upload(upload_id, 'some/chunked/path', new_metadata)
+
+    # Ensure the file contents are valid.
+    self.assertEquals(len(self.engine.get_content('some/chunked/path')), len(chunk_data))
+    self.assertEquals(chunk_data, self.engine.get_content('some/chunked/path'))
+
+  def test_large_chunks_with_ragged_edge(self):
+    # Make the max chunk size much smaller for testing and force it to have a ragged edge.
+    self.engine.maximum_chunk_size = self.engine.minimum_chunk_size * 2 + 10
+
+    upload_id, metadata = self.engine.initiate_chunked_upload()
+
+    # Write a few "super large" chunks, to ensure that it is broken into smaller chunks.
+    all_data = ''
+    for _ in range(0, 2):
+      chunk_data = os.urandom(int(self.engine.maximum_chunk_size) + 20)
+      bytes_written, new_metadata, _ = self.engine.stream_upload_chunk(upload_id, 0,
+                                                                       -1,
+                                                                       StringIO(chunk_data),
+                                                                       metadata)
+      self.assertEquals(bytes_written, len(chunk_data))
+      all_data = all_data + chunk_data
+      metadata = new_metadata
+
+    # Complete the chunked upload.
+    self.engine.complete_chunked_upload(upload_id, 'some/chunked/path', new_metadata)
+
+    # Ensure the file contents are valid.
+    self.assertEquals(len(self.engine.get_content('some/chunked/path')), len(all_data))
+    self.assertEquals(all_data, self.engine.get_content('some/chunked/path'))
+
+  def assertRechunked(self, chunk, max_size, *args):
+    rechunked = list(_CloudStorage._rechunk(chunk, max_size))
+    self.assertEquals(len(rechunked), len(args), rechunked)
+    for index, chunk in enumerate(rechunked):
+      self.assertEquals(args[index], chunk)
+
+  def test_rechunking(self):
+    chunk = _PartUploadMetadata('foo', 0, 100)
+
+    self.assertRechunked(chunk, 50,
+                         _PartUploadMetadata('foo', 0, 50),
+                         _PartUploadMetadata('foo', 50, 50))
+
+    self.assertRechunked(chunk, 40,
+                         _PartUploadMetadata('foo', 0, 25),
+                         _PartUploadMetadata('foo', 25, 25),
+                         _PartUploadMetadata('foo', 50, 25),
+                         _PartUploadMetadata('foo', 75, 25))
+
+    self.assertRechunked(chunk, 51,
+                         _PartUploadMetadata('foo', 0, 50),
+                         _PartUploadMetadata('foo', 50, 50))
+
+    self.assertRechunked(chunk, 49,
+                         _PartUploadMetadata('foo', 0, 25),
+                         _PartUploadMetadata('foo', 25, 25),
+                         _PartUploadMetadata('foo', 50, 25),
+                         _PartUploadMetadata('foo', 75, 25))
+
+    self.assertRechunked(chunk, 99,
+                         _PartUploadMetadata('foo', 0, 50),
+                         _PartUploadMetadata('foo', 50, 50))
+
+    self.assertRechunked(chunk, 100,
+                         _PartUploadMetadata('foo', 0, 100))
+
 
 if __name__ == '__main__':
   unittest.main()