commit
9b43699741
3 changed files with 52 additions and 7 deletions
|
@ -318,6 +318,34 @@ class _CloudStorage(BaseStorageV2):
|
||||||
def _chunk_list_from_metadata(storage_metadata):
|
def _chunk_list_from_metadata(storage_metadata):
|
||||||
return [_PartUploadMetadata(*chunk_args) for chunk_args in storage_metadata[_CHUNKS_KEY]]
|
return [_PartUploadMetadata(*chunk_args) for chunk_args in storage_metadata[_CHUNKS_KEY]]
|
||||||
|
|
||||||
|
def _client_side_chunk_join(self, final_path, chunk_list):
|
||||||
|
# If there's only one chunk, just "move" (copy and delete) the key and call it a day.
|
||||||
|
if len(chunk_list) == 1:
|
||||||
|
chunk_path = chunk_list[0].path
|
||||||
|
# Let the copy raise an exception if it fails.
|
||||||
|
self._cloud_bucket.copy_key(final_path, self._bucket_name, chunk_path)
|
||||||
|
|
||||||
|
# Attempt to clean up the old chunk.
|
||||||
|
try:
|
||||||
|
self._cloud_bucket.delete_key(chunk_path)
|
||||||
|
except IOError:
|
||||||
|
# We failed to delete a chunk. This sucks, but we shouldn't fail the push.
|
||||||
|
msg = 'Failed to clean up chunk %s for move of %s'
|
||||||
|
logger.exception(msg, chunk_path, final_path)
|
||||||
|
else:
|
||||||
|
# Concatenate and write all the chunks as one key.
|
||||||
|
concatenated = filelike.FilelikeStreamConcat(self._chunk_generator(chunk_list))
|
||||||
|
self.stream_write(final_path, concatenated)
|
||||||
|
|
||||||
|
# Attempt to clean up all the chunks.
|
||||||
|
for chunk in chunk_list:
|
||||||
|
try:
|
||||||
|
self._cloud_bucket.delete_key(chunk.path)
|
||||||
|
except IOError:
|
||||||
|
# We failed to delete a chunk. This sucks, but we shouldn't fail the push.
|
||||||
|
msg = 'Failed to clean up chunk %s for reupload of %s'
|
||||||
|
logger.exception(msg, chunk.path, final_path)
|
||||||
|
|
||||||
def complete_chunked_upload(self, uuid, final_path, storage_metadata):
|
def complete_chunked_upload(self, uuid, final_path, storage_metadata):
|
||||||
self._initialize_cloud_conn()
|
self._initialize_cloud_conn()
|
||||||
|
|
||||||
|
@ -342,8 +370,8 @@ class _CloudStorage(BaseStorageV2):
|
||||||
abs_chunk_path = self._init_path(chunk.path)
|
abs_chunk_path = self._init_path(chunk.path)
|
||||||
part_num = chunk_offset + 1
|
part_num = chunk_offset + 1
|
||||||
chunk_end_offset_inclusive = chunk.length - 1
|
chunk_end_offset_inclusive = chunk.length - 1
|
||||||
mpu.copy_part_from_key(self.get_cloud_bucket().name, abs_chunk_path, part_num, 0,
|
mpu.copy_part_from_key(self.get_cloud_bucket().name, abs_chunk_path, part_num,
|
||||||
chunk_end_offset_inclusive)
|
start=0, end=chunk_end_offset_inclusive)
|
||||||
mpu.complete_upload()
|
mpu.complete_upload()
|
||||||
|
|
||||||
except IOError as ioe:
|
except IOError as ioe:
|
||||||
|
@ -354,12 +382,9 @@ class _CloudStorage(BaseStorageV2):
|
||||||
raise ioe
|
raise ioe
|
||||||
|
|
||||||
else:
|
else:
|
||||||
logger.warning('Performing client side assmebly of multi-part upload for: %s', final_path)
|
|
||||||
|
|
||||||
# We are going to turn all of the server side objects into a single file-like stream, and
|
# We are going to turn all of the server side objects into a single file-like stream, and
|
||||||
# pass that to stream_write to chunk and upload the final object.
|
# pass that to stream_write to chunk and upload the final object.
|
||||||
concatenated = filelike.FilelikeStreamConcat(self._chunk_generator(chunk_list))
|
self._client_side_chunk_join(final_path, chunk_list)
|
||||||
self.stream_write(final_path, concatenated)
|
|
||||||
|
|
||||||
|
|
||||||
def cancel_chunked_upload(self, uuid, storage_metadata):
|
def cancel_chunked_upload(self, uuid, storage_metadata):
|
||||||
|
@ -472,3 +497,12 @@ class RadosGWStorage(_CloudStorage):
|
||||||
return None
|
return None
|
||||||
|
|
||||||
return super(RadosGWStorage, self).get_direct_upload_url(path, mime_type, requires_cors)
|
return super(RadosGWStorage, self).get_direct_upload_url(path, mime_type, requires_cors)
|
||||||
|
|
||||||
|
def complete_chunked_upload(self, uuid, final_path, storage_metadata):
|
||||||
|
self._initialize_cloud_conn()
|
||||||
|
|
||||||
|
# RadosGW does not support multipart copying from keys, so we are forced to join
|
||||||
|
# it all locally and then reupload.
|
||||||
|
# See https://github.com/ceph/ceph/pull/5139
|
||||||
|
chunk_list = self._chunk_list_from_metadata(storage_metadata)
|
||||||
|
self._client_side_chunk_join(final_path, chunk_list)
|
||||||
|
|
|
@ -90,6 +90,15 @@ class TestLimitingStream(unittest.TestCase):
|
||||||
|
|
||||||
|
|
||||||
class TestStreamSlice(unittest.TestCase):
|
class TestStreamSlice(unittest.TestCase):
|
||||||
|
def test_none_read(self):
|
||||||
|
class NoneReader(object):
|
||||||
|
def read(self, size=None):
|
||||||
|
return None
|
||||||
|
|
||||||
|
stream = StreamSlice(NoneReader(), 0)
|
||||||
|
self.assertEquals(None, stream.read(-1))
|
||||||
|
self.assertEquals(0, stream.tell())
|
||||||
|
|
||||||
def test_noslice(self):
|
def test_noslice(self):
|
||||||
fileobj = StringIO('this is a cool test')
|
fileobj = StringIO('this is a cool test')
|
||||||
stream = StreamSlice(fileobj, 0)
|
stream = StreamSlice(fileobj, 0)
|
||||||
|
@ -123,4 +132,4 @@ class TestStreamSlice(unittest.TestCase):
|
||||||
|
|
||||||
|
|
||||||
if __name__ == '__main__':
|
if __name__ == '__main__':
|
||||||
unittest.main()
|
unittest.main()
|
||||||
|
|
|
@ -15,6 +15,8 @@ class BaseStreamFilelike(object):
|
||||||
|
|
||||||
def read(self, size=READ_UNTIL_END):
|
def read(self, size=READ_UNTIL_END):
|
||||||
buf = self._fileobj.read(size)
|
buf = self._fileobj.read(size)
|
||||||
|
if buf is None:
|
||||||
|
return None
|
||||||
self._cursor_position += len(buf)
|
self._cursor_position += len(buf)
|
||||||
return buf
|
return buf
|
||||||
|
|
||||||
|
|
Reference in a new issue