Fix bugs in Azure storage engine to make it actually work

Tested against a real Azure account
This commit is contained in:
Joseph Schorr 2018-02-09 16:46:50 -05:00
parent 846deb75fe
commit 143ca8653d
2 changed files with 32 additions and 15 deletions

View file

@ -17,7 +17,7 @@ APScheduler==3.0.5
asn1crypto==0.22.0 asn1crypto==0.22.0
autobahn==0.9.3.post3 autobahn==0.9.3.post3
azure-common==1.1.8 azure-common==1.1.8
azure-storage-blob==0.37.1 azure-storage-blob==1.1.0
Babel==2.4.0 Babel==2.4.0
beautifulsoup4==4.5.3 beautifulsoup4==4.5.3
bencode==1.0 bencode==1.0

View file

@ -21,7 +21,7 @@ from util.registry.filelike import LimitingStream, READ_UNTIL_END
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
_COPY_POLL_SLEEP = 1 # seconds _COPY_POLL_SLEEP = 0.25 # seconds
_MAX_COPY_POLL_COUNT = 120 # _COPY_POLL_SLEEPs => 120s _MAX_COPY_POLL_COUNT = 120 # _COPY_POLL_SLEEPs => 30s
_MAX_BLOCK_SIZE = 1024 * 1024 * 100 # 100MB _MAX_BLOCK_SIZE = 1024 * 1024 * 100 # 100MB
_BLOCKS_KEY = 'blocks' _BLOCKS_KEY = 'blocks'
@ -56,6 +56,9 @@ class AzureStorage(BaseStorage):
return os.path.join(self._storage_path, object_path).rstrip('/') return os.path.join(self._storage_path, object_path).rstrip('/')
def _upload_blob_path_from_uuid(self, uuid):
return self._blob_name_from_path(self._upload_blob_name_from_uuid(uuid))
def _upload_blob_name_from_uuid(self, uuid): def _upload_blob_name_from_uuid(self, uuid):
return 'uploads/{0}'.format(uuid) return 'uploads/{0}'.format(uuid)
@ -73,7 +76,7 @@ class AzureStorage(BaseStorage):
blob_url = self._blob_service.make_blob_url(self._azure_container, blob_name, blob_url = self._blob_service.make_blob_url(self._azure_container, blob_name,
sas_token=sas_token) sas_token=sas_token)
except AzureException: except AzureException:
logger.exception('Exception when trying to get direct download for path %s', path) logger.exception('Exception when trying to get direct download for path %s', object_path)
raise IOError('Exception when trying to get direct download') raise IOError('Exception when trying to get direct download')
return blob_url return blob_url
@ -174,7 +177,7 @@ class AzureStorage(BaseStorage):
if length == 0: if length == 0:
return 0, storage_metadata, None return 0, storage_metadata, None
upload_blob_name = self._upload_blob_name_from_uuid(uuid) upload_blob_path = self._upload_blob_path_from_uuid(uuid)
new_metadata = copy.deepcopy(storage_metadata) new_metadata = copy.deepcopy(storage_metadata)
total_bytes_written = 0 total_bytes_written = 0
@ -188,18 +191,28 @@ class AzureStorage(BaseStorage):
limited = LimitingStream(in_fp, max_length, seekable=False) limited = LimitingStream(in_fp, max_length, seekable=False)
# Note: Azure fails if a zero-length block is uploaded, so we read all the data here,
# and, if there is none, terminate early.
block_data = b''
for chunk in iter(lambda: limited.read(4096), b""):
block_data += chunk
if len(block_data) == 0:
break
block_index = len(new_metadata[_BLOCKS_KEY]) block_index = len(new_metadata[_BLOCKS_KEY])
block_id = format(block_index, '05') block_id = format(block_index, '05')
new_metadata[_BLOCKS_KEY].append(block_id) new_metadata[_BLOCKS_KEY].append(block_id)
try: try:
self._blob_service.put_block(self._azure_container, upload_blob_name, limited, block_id) self._blob_service.put_block(self._azure_container, upload_blob_path, block_data, block_id,
validate_content=True)
except AzureException as ae: except AzureException as ae:
logger.exception('Exception when trying to stream_upload_chunk block %s for %s', block_id, logger.exception('Exception when trying to stream_upload_chunk block %s for %s', block_id,
uuid) uuid)
return total_bytes_written, new_metadata, ae return total_bytes_written, new_metadata, ae
bytes_written = limited.tell() bytes_written = len(block_data)
total_bytes_written += bytes_written total_bytes_written += bytes_written
if bytes_written == 0 or bytes_written < max_length: if bytes_written == 0 or bytes_written < max_length:
break break
@ -214,10 +227,11 @@ class AzureStorage(BaseStorage):
Returns nothing. Returns nothing.
""" """
# Commit the blob's blocks. # Commit the blob's blocks.
upload_blob_name = self._upload_blob_name_from_uuid(uuid) upload_blob_path = self._upload_blob_path_from_uuid(uuid)
block_list = [BlobBlock(block_id) for block_id in storage_metadata[_BLOCKS_KEY]] block_list = [BlobBlock(block_id) for block_id in storage_metadata[_BLOCKS_KEY]]
try: try:
self._blob_service.put_block_list(self._azure_container, upload_blob_name, block_list) self._blob_service.put_block_list(self._azure_container, upload_blob_path, block_list)
except AzureException: except AzureException:
logger.exception('Exception when trying to put block list for path %s from upload %s', logger.exception('Exception when trying to put block list for path %s from upload %s',
final_path, uuid) final_path, uuid)
@ -227,17 +241,18 @@ class AzureStorage(BaseStorage):
if storage_metadata[_CONTENT_TYPE_KEY] is not None: if storage_metadata[_CONTENT_TYPE_KEY] is not None:
content_settings = ContentSettings(content_type=storage_metadata[_CONTENT_TYPE_KEY]) content_settings = ContentSettings(content_type=storage_metadata[_CONTENT_TYPE_KEY])
try: try:
self._blob_service.set_blob_properties(self._azure_container, upload_blob_name, self._blob_service.set_blob_properties(self._azure_container, upload_blob_path,
content_settings=content_settings) content_settings=content_settings)
except AzureException: except AzureException:
logger.exception('Exception when trying to set blob properties for path %s', final_path) logger.exception('Exception when trying to set blob properties for path %s', final_path)
raise IOError('Exception when trying to set blob properties') raise IOError('Exception when trying to set blob properties')
# Copy the blob to its final location. # Copy the blob to its final location.
blob_name = self._blob_name_from_path(final_path) upload_blob_name = self._upload_blob_name_from_uuid(uuid)
copy_source_url = self.get_direct_download_url(upload_blob_name) copy_source_url = self.get_direct_download_url(upload_blob_name, expires_in=300)
try: try:
blob_name = self._blob_name_from_path(final_path)
copy_prop = self._blob_service.copy_blob(self._azure_container, blob_name, copy_prop = self._blob_service.copy_blob(self._azure_container, blob_name,
copy_source_url) copy_source_url)
except AzureException: except AzureException:
@ -248,8 +263,9 @@ class AzureStorage(BaseStorage):
self._await_copy(self._azure_container, blob_name, copy_prop) self._await_copy(self._azure_container, blob_name, copy_prop)
# Delete the original blob. # Delete the original blob.
logger.debug('Deleting chunked upload %s at path %s', uuid, upload_blob_path)
try: try:
self._blob_service.delete_blob(self._azure_container, upload_blob_name) self._blob_service.delete_blob(self._azure_container, upload_blob_path)
except AzureException: except AzureException:
logger.exception('Exception when trying to set delete uploaded blob %s', uuid) logger.exception('Exception when trying to set delete uploaded blob %s', uuid)
raise IOError('Exception when trying to delete uploaded blob') raise IOError('Exception when trying to delete uploaded blob')
@ -258,8 +274,9 @@ class AzureStorage(BaseStorage):
""" Cancel the chunked upload and clean up any outstanding partially uploaded data. """ Cancel the chunked upload and clean up any outstanding partially uploaded data.
Returns nothing. Returns nothing.
""" """
upload_blob_name = self._upload_blob_name_from_uuid(uuid) upload_blob_path = self._upload_blob_path_from_uuid(uuid)
self._blob_service.delete_blob(self._azure_container, upload_blob_name) logger.debug('Canceling chunked upload %s at path %s', uuid, upload_blob_path)
self._blob_service.delete_blob(self._azure_container, upload_blob_path)
def _await_copy(self, container, blob_name, copy_prop): def _await_copy(self, container, blob_name, copy_prop):
# Poll for copy completion. # Poll for copy completion.
@ -272,7 +289,7 @@ class AzureStorage(BaseStorage):
return return
if copy_prop.status == 'failed' or copy_prop.status == 'aborted': if copy_prop.status == 'failed' or copy_prop.status == 'aborted':
raise IOError('Copy of blob %s failed with status %s' % (blob_name, copy_props.status)) raise IOError('Copy of blob %s failed with status %s' % (blob_name, copy_prop.status))
count = count + 1 count = count + 1
if count > _MAX_COPY_POLL_COUNT: if count > _MAX_COPY_POLL_COUNT: