From abe43a0e07b0a1c77148dc3e9f7d574d2e600a44 Mon Sep 17 00:00:00 2001
From: Jimmy Zelinskie
Date: Wed, 30 Sep 2015 15:11:28 -0400
Subject: [PATCH 1/3] override complete_chunked_upload for RadosGW

RadosGW doesn't support server-side copying of keys into a multipart
upload, so we always have to join the chunks on the client side.
---
 storage/cloud.py | 23 +++++++++++++++++++----
 1 file changed, 19 insertions(+), 4 deletions(-)

diff --git a/storage/cloud.py b/storage/cloud.py
index 3c28480dc..dd8cd2256 100644
--- a/storage/cloud.py
+++ b/storage/cloud.py
@@ -342,8 +342,8 @@ class _CloudStorage(BaseStorageV2):
           abs_chunk_path = self._init_path(chunk.path)
           part_num = chunk_offset + 1
           chunk_end_offset_inclusive = chunk.length - 1
-          mpu.copy_part_from_key(self.get_cloud_bucket().name, abs_chunk_path, part_num, 0,
-                                 chunk_end_offset_inclusive)
+          mpu.copy_part_from_key(self.get_cloud_bucket().name, abs_chunk_path, part_num,
+                                 start=0, end=chunk_end_offset_inclusive)
         mpu.complete_upload()
 
       except IOError as ioe:
@@ -354,10 +354,9 @@ class _CloudStorage(BaseStorageV2):
         raise ioe
 
     else:
-      logger.warning('Performing client side assmebly of multi-part upload for: %s', final_path)
-
       # We are going to turn all of the server side objects into a single file-like stream, and
       # pass that to stream_write to chunk and upload the final object.
+      logger.warning('Performing client side assmebly of multi-part upload for: %s', final_path)
       concatenated = filelike.FilelikeStreamConcat(self._chunk_generator(chunk_list))
       self.stream_write(final_path, concatenated)
 
@@ -472,3 +471,19 @@ class RadosGWStorage(_CloudStorage):
       return None
 
     return super(RadosGWStorage, self).get_direct_upload_url(path, mime_type, requires_cors)
+
+  def complete_chunked_upload(self, uuid, final_path, storage_metadata):
+    self._initialize_cloud_conn()
+
+    # RadosGW does not support multipart copying from keys, so are forced to join
+    # it all locally and then reupload.
+    # See https://github.com/ceph/ceph/pull/5139
+    chunk_list = self._chunk_list_from_metadata(storage_metadata)
+    if len(chunk_list) == 1:
+      # If there's only one chunk, just "move" the key and call it a day.
+      chunk_path = chunk_list[0].path
+      self._cloud_bucket.copy_key(final_path, self._bucket_name, chunk_path)
+      self._cloud_bucket.delete_key(chunk_path)
+      return
+    concatenated = filelike.FilelikeStreamConcat(self._chunk_generator(chunk_list))
+    self.stream_write(final_path, concatenated)

From ffeb99d4eebe8f186ecc73fda15bef5c2148bf92 Mon Sep 17 00:00:00 2001
From: Jimmy Zelinskie
Date: Wed, 30 Sep 2015 16:24:04 -0400
Subject: [PATCH 2/3] BaseStreamFilelike: handle reads that return None

Fixes #555.
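Without this guard, a wrapped stream whose read() returns None raises a
TypeError inside BaseStreamFilelike.read() when the cursor position is
advanced with len(buf). Rough reproduction, mirroring the new test case
(StreamSlice is used here only because it is the simplest
BaseStreamFilelike wrapper to construct; the assumption is that any
subclass exercises the same code path):

    from util.registry.filelike import StreamSlice

    class NoneReader(object):
      def read(self, size=None):
        return None

    stream = StreamSlice(NoneReader(), 0)
    stream.read(-1)  # before: TypeError: object of type 'NoneType' has no len()
                     # after: returns None, and stream.tell() stays at 0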
---
 test/test_filelike.py     | 11 ++++++++++-
 util/registry/filelike.py |  2 ++
 2 files changed, 12 insertions(+), 1 deletion(-)

diff --git a/test/test_filelike.py b/test/test_filelike.py
index beff7d626..98bb02370 100644
--- a/test/test_filelike.py
+++ b/test/test_filelike.py
@@ -90,6 +90,15 @@ class TestLimitingStream(unittest.TestCase):
 
 
 class TestStreamSlice(unittest.TestCase):
+  def test_none_read(self):
+    class NoneReader(object):
+      def read(self, size=None):
+        return None
+
+    stream = StreamSlice(NoneReader(), 0)
+    self.assertEquals(None, stream.read(-1))
+    self.assertEquals(0, stream.tell())
+
   def test_noslice(self):
     fileobj = StringIO('this is a cool test')
     stream = StreamSlice(fileobj, 0)
@@ -123,4 +132,4 @@ class TestStreamSlice(unittest.TestCase):
 
 
 if __name__ == '__main__':
-  unittest.main()
\ No newline at end of file
+  unittest.main()
diff --git a/util/registry/filelike.py b/util/registry/filelike.py
index 0ace55adc..97555a23f 100644
--- a/util/registry/filelike.py
+++ b/util/registry/filelike.py
@@ -15,6 +15,8 @@ class BaseStreamFilelike(object):
 
   def read(self, size=READ_UNTIL_END):
     buf = self._fileobj.read(size)
+    if buf is None:
+      return None
     self._cursor_position += len(buf)
     return buf
 

From 6ed5087a3c27a7f33d59c446bde9e8636de61a0a Mon Sep 17 00:00:00 2001
From: Jimmy Zelinskie
Date: Wed, 30 Sep 2015 17:46:22 -0400
Subject: [PATCH 3/3] add client side chunk join method

---
 storage/cloud.py | 43 +++++++++++++++++++++++++++++++------------
 1 file changed, 31 insertions(+), 12 deletions(-)

diff --git a/storage/cloud.py b/storage/cloud.py
index dd8cd2256..2e90c05d1 100644
--- a/storage/cloud.py
+++ b/storage/cloud.py
@@ -318,6 +318,34 @@ class _CloudStorage(BaseStorageV2):
   def _chunk_list_from_metadata(storage_metadata):
     return [_PartUploadMetadata(*chunk_args) for chunk_args in storage_metadata[_CHUNKS_KEY]]
 
+  def _client_side_chunk_join(self, final_path, chunk_list):
+    # If there's only one chunk, just "move" (copy and delete) the key and call it a day.
+    if len(chunk_list) == 1:
+      chunk_path = chunk_list[0].path
+      # Let the copy raise an exception if it fails.
+      self._cloud_bucket.copy_key(final_path, self._bucket_name, chunk_path)
+
+      # Attempt to clean up the old chunk.
+      try:
+        self._cloud_bucket.delete_key(chunk_path)
+      except IOError:
+        # We failed to delete a chunk. This sucks, but we shouldn't fail the push.
+        msg = 'Failed to clean up chunk %s for move of %s'
+        logger.exception(msg, chunk_path, final_path)
+    else:
+      # Concatenate and write all the chunks as one key.
+      concatenated = filelike.FilelikeStreamConcat(self._chunk_generator(chunk_list))
+      self.stream_write(final_path, concatenated)
+
+      # Attempt to clean up all the chunks.
+      for chunk in chunk_list:
+        try:
+          self._cloud_bucket.delete_key(chunk.path)
+        except IOError:
+          # We failed to delete a chunk. This sucks, but we shouldn't fail the push.
+          msg = 'Failed to clean up chunk %s for reupload of %s'
+          logger.exception(msg, chunk.path, final_path)
+
   def complete_chunked_upload(self, uuid, final_path, storage_metadata):
     self._initialize_cloud_conn()
 
@@ -356,9 +384,7 @@ class _CloudStorage(BaseStorageV2):
     else:
       # We are going to turn all of the server side objects into a single file-like stream, and
       # pass that to stream_write to chunk and upload the final object.
-      logger.warning('Performing client side assmebly of multi-part upload for: %s', final_path)
-      concatenated = filelike.FilelikeStreamConcat(self._chunk_generator(chunk_list))
-      self.stream_write(final_path, concatenated)
+      self._client_side_chunk_join(final_path, chunk_list)
 
 
   def cancel_chunked_upload(self, uuid, storage_metadata):
@@ -475,15 +501,8 @@ class RadosGWStorage(_CloudStorage):
   def complete_chunked_upload(self, uuid, final_path, storage_metadata):
     self._initialize_cloud_conn()
 
-    # RadosGW does not support multipart copying from keys, so are forced to join
+    # RadosGW does not support multipart copying from keys, so we are forced to join
     # it all locally and then reupload.
     # See https://github.com/ceph/ceph/pull/5139
     chunk_list = self._chunk_list_from_metadata(storage_metadata)
-    if len(chunk_list) == 1:
-      # If there's only one chunk, just "move" the key and call it a day.
-      chunk_path = chunk_list[0].path
-      self._cloud_bucket.copy_key(final_path, self._bucket_name, chunk_path)
-      self._cloud_bucket.delete_key(chunk_path)
-      return
-    concatenated = filelike.FilelikeStreamConcat(self._chunk_generator(chunk_list))
-    self.stream_write(final_path, concatenated)
+    self._client_side_chunk_join(final_path, chunk_list)