From d488517b3602a9db1cb0dc1cd88311eacb16c0bf Mon Sep 17 00:00:00 2001
From: Joseph Schorr
Date: Fri, 3 Nov 2017 17:30:15 -0400
Subject: [PATCH] Implement storage driver for Azure Blob Storage

---
 requirements-nover.txt     |   2 +
 requirements.txt           |   2 +
 storage/__init__.py        |   2 +
 storage/azurestorage.py    | 305 +++++++++++++++++++++++++++++++++++++
 storage/test/test_azure.py | 217 ++++++++++++++++++++
 util/registry/filelike.py  |   9 +-
 6 files changed, 536 insertions(+), 1 deletion(-)
 create mode 100644 storage/azurestorage.py
 create mode 100644 storage/test/test_azure.py

diff --git a/requirements-nover.txt b/requirements-nover.txt
index 01f9e57de..c7cf76009 100644
--- a/requirements-nover.txt
+++ b/requirements-nover.txt
@@ -18,6 +18,8 @@ PyGithub
 aiowsgi
 alembic
 autobahn==0.9.3-3
+azure-common
+azure-storage-blob
 beautifulsoup4
 bencode
 bintrees
diff --git a/requirements.txt b/requirements.txt
index 588ada58e..aad5d33d2 100644
--- a/requirements.txt
+++ b/requirements.txt
@@ -16,6 +16,8 @@ appdirs==1.4.3
 APScheduler==3.0.5
 asn1crypto==0.22.0
 autobahn==0.9.3.post3
+azure-common==1.1.8
+azure-storage-blob==0.37.1
 Babel==2.4.0
 beautifulsoup4==4.5.3
 bencode==1.0
diff --git a/storage/__init__.py b/storage/__init__.py
index 60af7cf20..e48681973 100644
--- a/storage/__init__.py
+++ b/storage/__init__.py
@@ -3,6 +3,7 @@ from storage.cloud import S3Storage, GoogleCloudStorage, RadosGWStorage, CloudFr
 from storage.fakestorage import FakeStorage
 from storage.distributedstorage import DistributedStorage
 from storage.swift import SwiftStorage
+from storage.azurestorage import AzureStorage
 from storage.downloadproxy import DownloadProxy
 from util.ipresolver import NoopIPResolver
@@ -13,6 +14,7 @@ STORAGE_DRIVER_CLASSES = {
   'RadosGWStorage': RadosGWStorage,
   'SwiftStorage': SwiftStorage,
   'CloudFrontedS3Storage': CloudFrontedS3Storage,
+  'AzureStorage': AzureStorage,
 }
diff --git a/storage/azurestorage.py b/storage/azurestorage.py
new file mode 100644
index 000000000..1db2980cb
--- /dev/null
+++ b/storage/azurestorage.py
@@ -0,0 +1,305 @@
+""" Azure storage driver.
+
+    Based on: https://docs.microsoft.com/en-us/azure/storage/blobs/storage-python-how-to-use-blob-storage
+"""
+
+import logging
+import os
+import io
+import uuid
+import copy
+import time
+
+from datetime import datetime, timedelta
+
+from azure.common import AzureException
+from azure.storage.blob import BlockBlobService, ContentSettings, BlobBlock, ContainerPermissions
+from azure.storage.common.models import CorsRule
+
+from storage.basestorage import BaseStorage
+from util.registry.filelike import LimitingStream, READ_UNTIL_END
+
+logger = logging.getLogger(__name__)
+
+_COPY_POLL_SLEEP = 1  # seconds
+_MAX_COPY_POLL_COUNT = 120  # _COPY_POLL_SLEEPs => 120s
+_MAX_BLOCK_SIZE = 1024 * 1024 * 100  # 100MB
+_BLOCKS_KEY = 'blocks'
+_CONTENT_TYPE_KEY = 'content-type'
+
+
+class AzureStorage(BaseStorage):
+  def __init__(self, context, azure_container, storage_path, azure_account_name,
+               azure_account_key=None, sas_token=None, connection_string=None,
+               is_emulated=False, socket_timeout=20, request_timeout=20):
+    super(AzureStorage, self).__init__()
+    self._context = context
+    self._storage_path = storage_path.lstrip('/')
+
+    self._azure_account_name = azure_account_name
+    self._azure_account_key = azure_account_key
+    self._azure_sas_token = sas_token
+    self._azure_container = azure_container
+    self._azure_connection_string = connection_string
+    self._request_timeout = request_timeout
+
+    self._blob_service = BlockBlobService(account_name=azure_account_name,
+                                          account_key=azure_account_key,
+                                          sas_token=sas_token,
+                                          is_emulated=is_emulated,
+                                          connection_string=connection_string,
+                                          socket_timeout=socket_timeout)
+
+  def _blob_name_from_path(self, object_path):
+    return os.path.join(self._storage_path, object_path).rstrip('/')
+
+  def _upload_blob_name_from_uuid(self, uuid):
+    return 'uploads/{0}'.format(uuid)
+
+  def get_direct_download_url(self, object_path, request_ip=None, expires_in=60,
+                              requires_cors=False, head=False):
+    blob_name = self._blob_name_from_path(object_path)
+
+    try:
+      sas_token = self._blob_service.generate_blob_shared_access_signature(
+        self._azure_container,
+        blob_name,
+        ContainerPermissions.READ,
+        datetime.utcnow() + timedelta(seconds=expires_in))
+
+      blob_url = self._blob_service.make_blob_url(self._azure_container, blob_name,
+                                                  sas_token=sas_token)
+    except AzureException:
+      logger.exception('Exception when trying to get direct download for path %s', object_path)
+      raise IOError('Exception when trying to get direct download')
+
+    return blob_url
+
+  def validate(self, client):
+    super(AzureStorage, self).validate(client)
+    self._blob_service.get_container_properties(self._azure_container,
+                                                timeout=self._request_timeout)
+
+  def get_content(self, path):
+    blob_name = self._blob_name_from_path(path)
+    try:
+      blob = self._blob_service.get_blob_to_bytes(self._azure_container, blob_name)
+    except AzureException:
+      logger.exception('Exception when trying to get path %s', path)
+      raise IOError('Exception when trying to get path')
+
+    return blob.content
+
+  def put_content(self, path, content):
+    blob_name = self._blob_name_from_path(path)
+    try:
+      self._blob_service.create_blob_from_bytes(self._azure_container, blob_name, content)
+    except AzureException:
+      logger.exception('Exception when trying to put path %s', path)
+      raise IOError('Exception when trying to put path')
+
+  def stream_read(self, path):
+    with self.stream_read_file(path) as f:
+      while True:
+        buf = f.read(self.buffer_size)
+        if not buf:
+          break
+        yield buf
+
+  def stream_read_file(self, path):
+    blob_name = self._blob_name_from_path(path)
+
+    try:
+      output_stream = io.BytesIO()
+      self._blob_service.get_blob_to_stream(self._azure_container, blob_name, output_stream)
+      output_stream.seek(0)
+    except AzureException:
+      logger.exception('Exception when trying to stream_file_read path %s', path)
+      raise IOError('Exception when trying to stream_file_read path')
+
+    return output_stream
+
+  def stream_write(self, path, fp, content_type=None, content_encoding=None):
+    blob_name = self._blob_name_from_path(path)
+    content_settings = ContentSettings(
+      content_type=content_type,
+      content_encoding=content_encoding,
+    )
+
+    try:
+      self._blob_service.create_blob_from_stream(self._azure_container, blob_name, fp,
+                                                 content_settings=content_settings)
+    except AzureException:
+      logger.exception('Exception when trying to stream_write path %s', path)
+      raise IOError('Exception when trying to stream_write path')
+
+  def exists(self, path):
+    blob_name = self._blob_name_from_path(path)
+    try:
+      return self._blob_service.exists(self._azure_container, blob_name,
+                                       timeout=self._request_timeout)
+    except AzureException:
+      logger.exception('Exception when trying to check exists path %s', path)
+      raise IOError('Exception when trying to check exists path')
+
+  def remove(self, path):
+    blob_name = self._blob_name_from_path(path)
+    try:
+      self._blob_service.delete_blob(self._azure_container, blob_name)
+    except AzureException:
+      logger.exception('Exception when trying to remove path %s', path)
+      raise IOError('Exception when trying to remove path')
+
+  def get_checksum(self, path):
+    blob_name = self._blob_name_from_path(path)
+    try:
+      blob = self._blob_service.get_blob_properties(self._azure_container, blob_name)
+    except AzureException:
+      logger.exception('Exception when trying to get_checksum for path %s', path)
+      raise IOError('Exception when trying to get_checksum path')
+    return blob.properties.etag
+
+  def initiate_chunked_upload(self):
+    random_uuid = str(uuid.uuid4())
+    metadata = {
+      _BLOCKS_KEY: [],
+      _CONTENT_TYPE_KEY: None,
+    }
+    return random_uuid, metadata
+
+  def stream_upload_chunk(self, uuid, offset, length, in_fp, storage_metadata, content_type=None):
+    if length == 0:
+      return 0, storage_metadata, None
+
+    upload_blob_name = self._upload_blob_name_from_uuid(uuid)
+    new_metadata = copy.deepcopy(storage_metadata)
+
+    total_bytes_written = 0
+
+    while True:
+      current_length = length - total_bytes_written
+      max_length = (min(current_length, _MAX_BLOCK_SIZE) if length != READ_UNTIL_END
+                    else _MAX_BLOCK_SIZE)
+      if max_length <= 0:
+        break
+
+      limited = LimitingStream(in_fp, max_length, seekable=False)
+
+      block_index = len(new_metadata[_BLOCKS_KEY])
+      block_id = format(block_index, '05')
+      new_metadata[_BLOCKS_KEY].append(block_id)
+
+      try:
+        self._blob_service.put_block(self._azure_container, upload_blob_name, limited, block_id)
+      except AzureException as ae:
+        logger.exception('Exception when trying to stream_upload_chunk block %s for %s', block_id,
+                         uuid)
+        return total_bytes_written, new_metadata, ae
+
+      bytes_written = limited.tell()
+      total_bytes_written += bytes_written
+      if bytes_written == 0 or bytes_written < max_length:
+        break
+
+    if content_type is not None:
+      new_metadata[_CONTENT_TYPE_KEY] = content_type
+
+    return total_bytes_written, new_metadata, None
+
+  def complete_chunked_upload(self, uuid, final_path, storage_metadata):
+    """ Complete the chunked upload and store the final results in the path indicated.
+        Returns nothing.
+    """
+    # Commit the blob's blocks.
+    upload_blob_name = self._upload_blob_name_from_uuid(uuid)
+    block_list = [BlobBlock(block_id) for block_id in storage_metadata[_BLOCKS_KEY]]
+    try:
+      self._blob_service.put_block_list(self._azure_container, upload_blob_name, block_list)
+    except AzureException:
+      logger.exception('Exception when trying to put block list for path %s from upload %s',
+                       final_path, uuid)
+      raise IOError('Exception when trying to put block list')
+
+    # Set the content type on the blob if applicable.
+    if storage_metadata[_CONTENT_TYPE_KEY] is not None:
+      content_settings = ContentSettings(content_type=storage_metadata[_CONTENT_TYPE_KEY])
+      try:
+        self._blob_service.set_blob_properties(self._azure_container, upload_blob_name,
+                                               content_settings=content_settings)
+      except AzureException:
+        logger.exception('Exception when trying to set blob properties for path %s', final_path)
+        raise IOError('Exception when trying to set blob properties')
+
+    # Copy the blob to its final location.
+    blob_name = self._blob_name_from_path(final_path)
+    copy_source_url = self.get_direct_download_url(upload_blob_name)
+
+    try:
+      copy_prop = self._blob_service.copy_blob(self._azure_container, blob_name,
+                                               copy_source_url)
+    except AzureException:
+      logger.exception('Exception when trying to copy uploaded blob %s to path %s', uuid,
+                       final_path)
+      raise IOError('Exception when trying to copy uploaded blob')
+
+    self._await_copy(self._azure_container, blob_name, copy_prop)
+
+    # Delete the original blob.
+    try:
+      self._blob_service.delete_blob(self._azure_container, upload_blob_name)
+    except AzureException:
+      logger.exception('Exception when trying to delete uploaded blob %s', uuid)
+      raise IOError('Exception when trying to delete uploaded blob')
+
+  def cancel_chunked_upload(self, uuid, storage_metadata):
+    """ Cancel the chunked upload and clean up any outstanding partially uploaded data.
+        Returns nothing.
+    """
+    upload_blob_name = self._upload_blob_name_from_uuid(uuid)
+    self._blob_service.delete_blob(self._azure_container, upload_blob_name)
+
+  def _await_copy(self, container, blob_name, copy_prop):
+    # Poll for copy completion.
+    count = 0
+    while copy_prop.status == 'pending':
+      props = self._blob_service.get_blob_properties(container, blob_name)
+      copy_prop = props.properties.copy
+
+      if copy_prop.status == 'success':
+        return
+
+      if copy_prop.status == 'failed' or copy_prop.status == 'aborted':
+        raise IOError('Copy of blob %s failed with status %s' % (blob_name, copy_prop.status))
+
+      count = count + 1
+      if count > _MAX_COPY_POLL_COUNT:
+        raise IOError('Timed out waiting for copy to complete')
+
+      time.sleep(_COPY_POLL_SLEEP)
+
+  def copy_to(self, destination, path):
+    if self.__class__ == destination.__class__:
+      logger.debug('Starting copying file from Azure %s to Azure %s via an Azure copy',
+                   self._azure_container, destination)
+      blob_name = self._blob_name_from_path(path)
+      copy_source_url = self.get_direct_download_url(path)
+      copy_prop = self._blob_service.copy_blob(destination._azure_container, blob_name,
+                                               copy_source_url)
+      self._await_copy(destination._azure_container, blob_name, copy_prop)
+      logger.debug('Finished copying file from Azure %s to Azure %s via an Azure copy',
+                   self._azure_container, destination)
+      return
+
+    # Fallback to a slower, default copy.
+    logger.debug('Copying file from Azure container %s to %s via a streamed copy',
+                 self._azure_container, destination)
+    with self.stream_read_file(path) as fp:
+      destination.stream_write(path, fp)
+
+  def setup(self):
+    # From: https://docs.microsoft.com/en-us/rest/api/storageservices/cross-origin-resource-sharing--cors--support-for-the-azure-storage-services
+    cors = [CorsRule(allowed_origins='*', allowed_methods=['GET', 'PUT'], max_age_in_seconds=3000,
+                     exposed_headers=['x-ms-meta-*'],
+                     allowed_headers=['x-ms-meta-data*', 'x-ms-meta-target*', 'x-ms-meta-abc',
+                                      'Content-Type'])]
+
+    self._blob_service.set_blob_service_properties(cors=cors)
diff --git a/storage/test/test_azure.py b/storage/test/test_azure.py
new file mode 100644
index 000000000..5c9d40985
--- /dev/null
+++ b/storage/test/test_azure.py
@@ -0,0 +1,217 @@
+import base64
+import md5
+import pytest
+import io
+
+from contextlib import contextmanager
+from urlparse import parse_qs, urlparse
+from httmock import urlmatch, HTTMock
+from xml.dom import minidom
+
+from azure.storage.blob import BlockBlobService
+
+from storage.azurestorage import AzureStorage
+
+
+@contextmanager
+def fake_azure_storage(files=None):
+  service = BlockBlobService(is_emulated=True)
+  endpoint = service.primary_endpoint.split('/')
+  container_name = 'somecontainer'
+  files = files if files is not None else {}
+
+  container_prefix = '/' + endpoint[1] + '/' + container_name
+
+  @urlmatch(netloc=endpoint[0], path=container_prefix + '$')
+  def get_container(url, request):
+    return {'status_code': 200, 'content': '{}'}
+
+  @urlmatch(netloc=endpoint[0], path=container_prefix + '/.+')
+  def container_file(url, request):
+    filename = url.path[len(container_prefix)+1:]
+
+    if request.method == 'GET' or request.method == 'HEAD':
+      return {
+        'status_code': 200 if filename in files else 404,
+        'content': files.get(filename),
+        'headers': {
+          'ETag': 'foobar',
+        },
+      }
+
+    if request.method == 'DELETE':
+      files.pop(filename)
+      return {
+        'status_code': 201,
+        'content': '',
+        'headers': {
+          'ETag': 'foobar',
+        },
+      }
+
+    if request.method == 'PUT':
+      query_params = parse_qs(url.query)
+      if query_params.get('comp') == ['properties']:
+        return {
+          'status_code': 201,
+          'content': '{}',
+          'headers': {
+            'x-ms-request-server-encrypted': False,
+            'last-modified': 'Wed, 21 Oct 2015 07:28:00 GMT',
+          }
+        }
+
+      if query_params.get('comp') == ['block']:
+        block_id = query_params['blockid'][0]
+        files[filename] = files.get(filename) or {}
+        files[filename][block_id] = request.body
+        return {
+          'status_code': 201,
+          'content': '{}',
+          'headers': {
+            'Content-MD5': base64.b64encode(md5.new(request.body).digest()),
+            'ETag': 'foo',
+            'x-ms-request-server-encrypted': False,
+            'last-modified': 'Wed, 21 Oct 2015 07:28:00 GMT',
+          }
+        }
+
+      if query_params.get('comp') == ['blocklist']:
+        parsed = minidom.parseString(request.body)
+        latest = parsed.getElementsByTagName('Latest')
+        combined = []
+        for latest_block in latest:
+          combined.append(files[filename][latest_block.childNodes[0].data])
+
+        files[filename] = ''.join(combined)
+        return {
+          'status_code': 201,
+          'content': '{}',
+          'headers': {
+            'Content-MD5': base64.b64encode(md5.new(files[filename]).digest()),
+            'ETag': 'foo',
+            'x-ms-request-server-encrypted': False,
+            'last-modified': 'Wed, 21 Oct 2015 07:28:00 GMT',
+          }
+        }
+
+      if request.headers.get('x-ms-copy-source'):
+        copy_source = request.headers['x-ms-copy-source']
+        copy_path = urlparse(copy_source).path[len(container_prefix) + 1:]
+        files[filename] = files[copy_path]
+        return {
+          'status_code': 201,
+          'content': '{}',
+          'headers': {
+            'x-ms-request-server-encrypted': False,
+            'x-ms-copy-status': 'success',
+            'last-modified': 'Wed, 21 Oct 2015 07:28:00 GMT',
+          }
+        }
+
+      files[filename] = request.body
+
+      return {
+        'status_code': 201,
+        'content': '{}',
+        'headers': {
+          'Content-MD5': base64.b64encode(md5.new(request.body).digest()),
+          'ETag': 'foo',
+          'x-ms-request-server-encrypted': False,
+          'last-modified': 'Wed, 21 Oct 2015 07:28:00 GMT',
+        }
+      }
+
+    return {'status_code': 405, 'content': ''}
+
+  @urlmatch(netloc=endpoint[0], path='.+')
+  def catchall(url, request):
+    return {'status_code': 405, 'content': ''}
+
+  with HTTMock(get_container, container_file, catchall):
+    yield AzureStorage(None, 'somecontainer', '', 'someaccount', is_emulated=True)
+
+
+def test_validate():
+  with fake_azure_storage() as s:
+    s.validate(None)
+
+
+def test_basics():
+  with fake_azure_storage() as s:
+    s.put_content('hello', 'hello world')
+    assert s.exists('hello')
+    assert s.get_content('hello') == 'hello world'
+    assert s.get_checksum('hello')
+    assert ''.join(list(s.stream_read('hello'))) == 'hello world'
+    assert s.stream_read_file('hello').read() == 'hello world'
+
+    s.remove('hello')
+    assert not s.exists('hello')
+
+
+def test_does_not_exist():
+  with fake_azure_storage() as s:
+    assert not s.exists('hello')
+
+    with pytest.raises(IOError):
+      s.get_content('hello')
+
+    with pytest.raises(IOError):
+      s.get_checksum('hello')
+
+    with pytest.raises(IOError):
+      list(s.stream_read('hello'))
+
+    with pytest.raises(IOError):
+      s.stream_read_file('hello')
+
+
+def test_stream_write():
+  fp = io.BytesIO()
+  fp.write('hello world!')
+  fp.seek(0)
+
+  with fake_azure_storage() as s:
+    s.stream_write('hello', fp)
+
+    assert s.get_content('hello') == 'hello world!'
+
+
+@pytest.mark.parametrize('chunk_size', [
+  (1),
+  (5),
+  (10),
+])
+def test_chunked_uploading(chunk_size):
+  with fake_azure_storage() as s:
+    string_data = 'hello world!'
+    chunks = [string_data[index:index+chunk_size] for index in range(0, len(string_data), chunk_size)]
+
+    uuid, metadata = s.initiate_chunked_upload()
+    start_index = 0
+
+    for chunk in chunks:
+      fp = io.BytesIO()
+      fp.write(chunk)
+      fp.seek(0)
+
+      total_bytes_written, metadata, error = s.stream_upload_chunk(uuid, start_index, -1, fp,
+                                                                   metadata)
+      assert total_bytes_written == len(chunk)
+      assert metadata
+      assert not error
+
+      start_index += total_bytes_written
+
+    s.complete_chunked_upload(uuid, 'chunked', metadata)
+    assert s.get_content('chunked') == string_data
+
+
+def test_get_direct_download_url():
+  with fake_azure_storage() as s:
+    s.put_content('hello', 'world')
+    assert 'sig' in s.get_direct_download_url('hello')
+
+
+def test_copy_to():
+  files = {}
+
+  with fake_azure_storage(files=files) as s:
+    s.put_content('hello', 'hello world')
+    with fake_azure_storage(files=files) as s2:
+      s.copy_to(s2, 'hello')
+      assert s2.exists('hello')
diff --git a/util/registry/filelike.py b/util/registry/filelike.py
index 97555a23f..81e65ec59 100644
--- a/util/registry/filelike.py
+++ b/util/registry/filelike.py
@@ -156,5 +156,12 @@ class LimitingStream(StreamSlice):
       of bytes. All calls after that limit (if specified) will act as if the file
       has no additional data.
""" - def __init__(self, fileobj, read_limit=READ_UNTIL_END): + def __init__(self, fileobj, read_limit=READ_UNTIL_END, seekable=True): super(LimitingStream, self).__init__(fileobj, 0, read_limit) + self._seekable = seekable + + def seek(self, index, whence=WHENCE_ABSOLUTE): + if not self._seekable: + raise AttributeError + + super(LimitingStream, self).seek(index, whence)