From 143ca8653d394391a7bfe9bf1628186db4a6670a Mon Sep 17 00:00:00 2001 From: Joseph Schorr Date: Fri, 9 Feb 2018 16:46:50 -0500 Subject: [PATCH] Fix bugs in Azure storage engine to make it actually work Tested against a real Azure account --- requirements.txt | 2 +- storage/azurestorage.py | 45 ++++++++++++++++++++++++++++------------- 2 files changed, 32 insertions(+), 15 deletions(-) diff --git a/requirements.txt b/requirements.txt index aad5d33d2..3dfb81be3 100644 --- a/requirements.txt +++ b/requirements.txt @@ -17,7 +17,7 @@ APScheduler==3.0.5 asn1crypto==0.22.0 autobahn==0.9.3.post3 azure-common==1.1.8 -azure-storage-blob==0.37.1 +azure-storage-blob==1.1.0 Babel==2.4.0 beautifulsoup4==4.5.3 bencode==1.0 diff --git a/storage/azurestorage.py b/storage/azurestorage.py index 50e77cbb7..f24af76a7 100644 --- a/storage/azurestorage.py +++ b/storage/azurestorage.py @@ -21,7 +21,7 @@ from util.registry.filelike import LimitingStream, READ_UNTIL_END logger = logging.getLogger(__name__) -_COPY_POLL_SLEEP = 1 # seconds +_COPY_POLL_SLEEP = 0.25 # seconds _MAX_COPY_POLL_COUNT = 120 # _COPY_POLL_SLEEPs => 120s _MAX_BLOCK_SIZE = 1024 * 1024 * 100 # 100MB _BLOCKS_KEY = 'blocks' @@ -56,6 +56,9 @@ class AzureStorage(BaseStorage): return os.path.join(self._storage_path, object_path).rstrip('/') + def _upload_blob_path_from_uuid(self, uuid): + return self._blob_name_from_path(self._upload_blob_name_from_uuid(uuid)) + def _upload_blob_name_from_uuid(self, uuid): return 'uploads/{0}'.format(uuid) @@ -73,7 +76,7 @@ class AzureStorage(BaseStorage): blob_url = self._blob_service.make_blob_url(self._azure_container, blob_name, sas_token=sas_token) except AzureException: - logger.exception('Exception when trying to get direct download for path %s', path) + logger.exception('Exception when trying to get direct download for path %s', object_path) raise IOError('Exception when trying to get direct download') return blob_url @@ -174,7 +177,7 @@ class AzureStorage(BaseStorage): if length == 0: return 0, storage_metadata, None - upload_blob_name = self._upload_blob_name_from_uuid(uuid) + upload_blob_path = self._upload_blob_path_from_uuid(uuid) new_metadata = copy.deepcopy(storage_metadata) total_bytes_written = 0 @@ -188,18 +191,28 @@ class AzureStorage(BaseStorage): limited = LimitingStream(in_fp, max_length, seekable=False) + # Note: Azure fails if a zero-length block is uploaded, so we read all the data here, + # and, if there is none, terminate early. + block_data = b'' + for chunk in iter(lambda: limited.read(4096), b""): + block_data += chunk + + if len(block_data) == 0: + break + block_index = len(new_metadata[_BLOCKS_KEY]) block_id = format(block_index, '05') new_metadata[_BLOCKS_KEY].append(block_id) try: - self._blob_service.put_block(self._azure_container, upload_blob_name, limited, block_id) + self._blob_service.put_block(self._azure_container, upload_blob_path, block_data, block_id, + validate_content=True) except AzureException as ae: logger.exception('Exception when trying to stream_upload_chunk block %s for %s', block_id, uuid) return total_bytes_written, new_metadata, ae - bytes_written = limited.tell() + bytes_written = len(block_data) total_bytes_written += bytes_written if bytes_written == 0 or bytes_written < max_length: break @@ -214,10 +227,11 @@ class AzureStorage(BaseStorage): Returns nothing. """ # Commit the blob's blocks. - upload_blob_name = self._upload_blob_name_from_uuid(uuid) + upload_blob_path = self._upload_blob_path_from_uuid(uuid) block_list = [BlobBlock(block_id) for block_id in storage_metadata[_BLOCKS_KEY]] + try: - self._blob_service.put_block_list(self._azure_container, upload_blob_name, block_list) + self._blob_service.put_block_list(self._azure_container, upload_blob_path, block_list) except AzureException: logger.exception('Exception when trying to put block list for path %s from upload %s', final_path, uuid) @@ -227,17 +241,18 @@ class AzureStorage(BaseStorage): if storage_metadata[_CONTENT_TYPE_KEY] is not None: content_settings = ContentSettings(content_type=storage_metadata[_CONTENT_TYPE_KEY]) try: - self._blob_service.set_blob_properties(self._azure_container, upload_blob_name, + self._blob_service.set_blob_properties(self._azure_container, upload_blob_path, content_settings=content_settings) except AzureException: logger.exception('Exception when trying to set blob properties for path %s', final_path) raise IOError('Exception when trying to set blob properties') # Copy the blob to its final location. - blob_name = self._blob_name_from_path(final_path) - copy_source_url = self.get_direct_download_url(upload_blob_name) + upload_blob_name = self._upload_blob_name_from_uuid(uuid) + copy_source_url = self.get_direct_download_url(upload_blob_name, expires_in=300) try: + blob_name = self._blob_name_from_path(final_path) copy_prop = self._blob_service.copy_blob(self._azure_container, blob_name, copy_source_url) except AzureException: @@ -248,8 +263,9 @@ class AzureStorage(BaseStorage): self._await_copy(self._azure_container, blob_name, copy_prop) # Delete the original blob. + logger.debug('Deleting chunked upload %s at path %s', uuid, upload_blob_path) try: - self._blob_service.delete_blob(self._azure_container, upload_blob_name) + self._blob_service.delete_blob(self._azure_container, upload_blob_path) except AzureException: logger.exception('Exception when trying to set delete uploaded blob %s', uuid) raise IOError('Exception when trying to delete uploaded blob') @@ -258,8 +274,9 @@ class AzureStorage(BaseStorage): """ Cancel the chunked upload and clean up any outstanding partially uploaded data. Returns nothing. """ - upload_blob_name = self._upload_blob_name_from_uuid(uuid) - self._blob_service.delete_blob(self._azure_container, upload_blob_name) + upload_blob_path = self._upload_blob_path_from_uuid(uuid) + logger.debug('Canceling chunked upload %s at path %s', uuid, upload_blob_path) + self._blob_service.delete_blob(self._azure_container, upload_blob_path) def _await_copy(self, container, blob_name, copy_prop): # Poll for copy completion. @@ -272,7 +289,7 @@ class AzureStorage(BaseStorage): return if copy_prop.status == 'failed' or copy_prop.status == 'aborted': - raise IOError('Copy of blob %s failed with status %s' % (blob_name, copy_props.status)) + raise IOError('Copy of blob %s failed with status %s' % (blob_name, copy_prop.status)) count = count + 1 if count > _MAX_COPY_POLL_COUNT: