Merge pull request #2902 from coreos-inc/joseph.schorr/QS-51/azure-blob-store

Add support for Azure Blob Storage
This commit is contained in:
josephschorr 2018-02-07 11:34:29 -05:00 committed by GitHub
commit 846deb75fe
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
8 changed files with 549 additions and 1 deletions

View file

@ -18,6 +18,8 @@ PyGithub
aiowsgi
alembic
autobahn==0.9.3-3
azure-common
azure-storage-blob
beautifulsoup4
bencode
bintrees

View file

@ -16,6 +16,8 @@ appdirs==1.4.3
APScheduler==3.0.5
asn1crypto==0.22.0
autobahn==0.9.3.post3
azure-common==1.1.8
azure-storage-blob==0.37.1
Babel==2.4.0
beautifulsoup4==4.5.3
bencode==1.0

View file

@ -297,6 +297,7 @@
<select class="form-control" ng-class="storageConfigError[$index].engine ? 'ng-invalid' : ''" ng-model="sc.data[0]">
<option value="LocalStorage">Locally mounted directory</option>
<option value="S3Storage">Amazon S3</option>
<option value="AzureStorage">Azure Blob Storage</option>
<option value="GoogleCloudStorage">Google Cloud Storage</option>
<option value="RadosGWStorage">Ceph Object Gateway (RADOS)</option>
<option value="SwiftStorage">OpenStack Storage (Swift)</option>

View file

@ -108,6 +108,14 @@ angular.module("core-config-setup", ['angularFileUpload'])
{'name': 'port', 'title': 'S3 Port (optional)', 'placeholder': '443', 'kind': 'text', 'pattern': '^[0-9]+$', 'optional': true}
],
'AzureStorage': [
{'name': 'azure_container', 'title': 'Azure Storage Container', 'placeholder': 'container', 'kind': 'text'},
{'name': 'storage_path', 'title': 'Storage Directory', 'placeholder': '/path/inside/container', 'kind': 'text'},
{'name': 'azure_account_name', 'title': 'Azure Account Name', 'placeholder': 'accountnamehere', 'kind': 'text'},
{'name': 'azure_account_key', 'title': 'Azure Account Key', 'placeholder': 'accountkeyhere', 'kind': 'text', 'optional': true},
{'name': 'sas_token', 'title': 'Azure SAS Token', 'placeholder': 'sastokenhere', 'kind': 'text', 'optional': true},
],
'GoogleCloudStorage': [
{'name': 'access_key', 'title': 'Cloud Access Key', 'placeholder': 'accesskeyhere', 'kind': 'text'},
{'name': 'secret_key', 'title': 'Cloud Secret Key', 'placeholder': 'secretkeyhere', 'kind': 'text'},

View file

@ -3,6 +3,7 @@ from storage.cloud import S3Storage, GoogleCloudStorage, RadosGWStorage, CloudFr
from storage.fakestorage import FakeStorage
from storage.distributedstorage import DistributedStorage
from storage.swift import SwiftStorage
from storage.azurestorage import AzureStorage
from storage.downloadproxy import DownloadProxy
from util.ipresolver import NoopIPResolver
@ -13,6 +14,7 @@ STORAGE_DRIVER_CLASSES = {
'RadosGWStorage': RadosGWStorage,
'SwiftStorage': SwiftStorage,
'CloudFrontedS3Storage': CloudFrontedS3Storage,
'AzureStorage': AzureStorage,
}

309
storage/azurestorage.py Normal file
View file

@ -0,0 +1,309 @@
""" Azure storage driver.
Based on: https://docs.microsoft.com/en-us/azure/storage/blobs/storage-python-how-to-use-blob-storage
"""
import logging
import os
import io
import uuid
import copy
import time
from datetime import datetime, timedelta
from azure.common import AzureException
from azure.storage.blob import BlockBlobService, ContentSettings, BlobBlock, ContainerPermissions
from azure.storage.common.models import CorsRule
from storage.basestorage import BaseStorage
from util.registry.filelike import LimitingStream, READ_UNTIL_END
logger = logging.getLogger(__name__)

# Seconds slept between polls of an in-progress server-side blob copy.
_COPY_POLL_SLEEP = 1 # seconds
# Maximum number of polls before a copy is considered timed out (=> 120s total).
_MAX_COPY_POLL_COUNT = 120 # _COPY_POLL_SLEEPs => 120s
# Maximum size of a single staged block in a chunked (block) upload.
_MAX_BLOCK_SIZE = 1024 * 1024 * 100 # 100MB

# Keys used in the chunked-upload storage_metadata dict.
_BLOCKS_KEY = 'blocks'
_CONTENT_TYPE_KEY = 'content-type'
class AzureStorage(BaseStorage):
  """ Storage driver for Microsoft Azure Blob Storage.

      Based on: https://docs.microsoft.com/en-us/azure/storage/blobs/storage-python-how-to-use-blob-storage
  """
  def __init__(self, context, azure_container, storage_path, azure_account_name,
               azure_account_key=None, sas_token=None, connection_string=None,
               is_emulated=False, socket_timeout=20, request_timeout=20):
    super(AzureStorage, self).__init__()
    self._context = context
    self._storage_path = storage_path.lstrip('/')

    # BUGFIX: this previously stored azure_account_key under _azure_account_name.
    self._azure_account_name = azure_account_name
    self._azure_account_key = azure_account_key
    self._azure_sas_token = sas_token
    self._azure_container = azure_container
    self._azure_connection_string = connection_string
    self._request_timeout = request_timeout

    self._blob_service = BlockBlobService(account_name=azure_account_name,
                                          account_key=azure_account_key,
                                          sas_token=sas_token,
                                          is_emulated=is_emulated,
                                          connection_string=connection_string,
                                          socket_timeout=socket_timeout)

  def _blob_name_from_path(self, object_path):
    """ Returns the full blob name (prefixed by the configured storage path) for
        the given object path. Raises if the path attempts directory traversal.
    """
    if '..' in object_path:
      raise Exception('Relative paths are not allowed; found %s' % object_path)

    return os.path.join(self._storage_path, object_path).rstrip('/')

  def _upload_blob_name_from_uuid(self, uuid):
    # Blobs for in-progress chunked uploads live under a dedicated prefix.
    return 'uploads/{0}'.format(uuid)

  def get_direct_download_url(self, object_path, request_ip=None, expires_in=60,
                              requires_cors=False, head=False):
    """ Returns a SAS-signed URL for directly downloading the blob at object_path.
        NOTE: request_ip, requires_cors and head are currently unused by this driver.
        Raises IOError if the signature or URL could not be generated.
    """
    blob_name = self._blob_name_from_path(object_path)
    try:
      sas_token = self._blob_service.generate_blob_shared_access_signature(
        self._azure_container,
        blob_name,
        ContainerPermissions.READ,
        datetime.utcnow() + timedelta(seconds=expires_in))

      blob_url = self._blob_service.make_blob_url(self._azure_container, blob_name,
                                                  sas_token=sas_token)
    except AzureException:
      # BUGFIX: previously referenced the undefined name `path` here, which raised a
      # NameError instead of logging and re-raising as IOError.
      logger.exception('Exception when trying to get direct download for path %s', object_path)
      raise IOError('Exception when trying to get direct download')

    return blob_url

  def validate(self, client):
    """ Validates connectivity by fetching the configured container's properties. """
    super(AzureStorage, self).validate(client)
    self._blob_service.get_container_properties(self._azure_container,
                                                timeout=self._request_timeout)

  def get_content(self, path):
    """ Returns the full contents of the blob at the given path as bytes. """
    blob_name = self._blob_name_from_path(path)
    try:
      blob = self._blob_service.get_blob_to_bytes(self._azure_container, blob_name)
    except AzureException:
      logger.exception('Exception when trying to get path %s', path)
      raise IOError('Exception when trying to get path')

    return blob.content

  def put_content(self, path, content):
    """ Writes the given bytes as the blob at the given path, overwriting any
        existing blob.
    """
    blob_name = self._blob_name_from_path(path)
    try:
      self._blob_service.create_blob_from_bytes(self._azure_container, blob_name, content)
    except AzureException:
      logger.exception('Exception when trying to put path %s', path)
      raise IOError('Exception when trying to put path')

  def stream_read(self, path):
    """ Yields the blob's contents in buffer_size-d chunks. """
    with self.stream_read_file(path) as f:
      while True:
        buf = f.read(self.buffer_size)
        if not buf:
          break
        yield buf

  def stream_read_file(self, path):
    """ Returns a file-like object over the blob's contents.
        NOTE: the entire blob is buffered into memory before being returned.
    """
    blob_name = self._blob_name_from_path(path)
    try:
      output_stream = io.BytesIO()
      self._blob_service.get_blob_to_stream(self._azure_container, blob_name, output_stream)
      output_stream.seek(0)
    except AzureException:
      logger.exception('Exception when trying to stream_file_read path %s', path)
      raise IOError('Exception when trying to stream_file_read path')

    return output_stream

  def stream_write(self, path, fp, content_type=None, content_encoding=None):
    """ Writes the entire contents of the file-like object fp to the blob at path,
        optionally recording content type/encoding on the blob.
    """
    blob_name = self._blob_name_from_path(path)
    content_settings = ContentSettings(
      content_type=content_type,
      content_encoding=content_encoding,
    )

    try:
      self._blob_service.create_blob_from_stream(self._azure_container, blob_name, fp,
                                                 content_settings=content_settings)
    except AzureException:
      logger.exception('Exception when trying to stream_write path %s', path)
      raise IOError('Exception when trying to stream_write path')

  def exists(self, path):
    """ Returns True if a blob exists at the given path. """
    blob_name = self._blob_name_from_path(path)
    try:
      return self._blob_service.exists(self._azure_container, blob_name,
                                       timeout=self._request_timeout)
    except AzureException:
      logger.exception('Exception when trying to check exists path %s', path)
      raise IOError('Exception when trying to check exists path')

  def remove(self, path):
    """ Deletes the blob at the given path. """
    blob_name = self._blob_name_from_path(path)
    try:
      self._blob_service.delete_blob(self._azure_container, blob_name)
    except AzureException:
      logger.exception('Exception when trying to remove path %s', path)
      raise IOError('Exception when trying to remove path')

  def get_checksum(self, path):
    """ Returns the blob's ETag as its checksum. """
    blob_name = self._blob_name_from_path(path)
    try:
      blob = self._blob_service.get_blob_properties(self._azure_container, blob_name)
    except AzureException:
      logger.exception('Exception when trying to get_checksum for path %s', path)
      raise IOError('Exception when trying to get_checksum path')

    return blob.properties.etag

  def initiate_chunked_upload(self):
    """ Starts a new chunked upload, returning its id and initial metadata. """
    random_uuid = str(uuid.uuid4())
    metadata = {
      _BLOCKS_KEY: [],
      _CONTENT_TYPE_KEY: None,
    }
    return random_uuid, metadata

  def stream_upload_chunk(self, uuid, offset, length, in_fp, storage_metadata, content_type=None):
    """ Uploads a chunk of data from in_fp as one or more uncommitted blocks of the
        upload blob identified by uuid. Returns (bytes_written, new_metadata, error);
        on Azure errors the exception is returned rather than raised, so callers can
        resume or cancel.
    """
    if length == 0:
      return 0, storage_metadata, None

    upload_blob_name = self._upload_blob_name_from_uuid(uuid)
    new_metadata = copy.deepcopy(storage_metadata)

    total_bytes_written = 0

    while True:
      # Cap each staged block at _MAX_BLOCK_SIZE; when length is READ_UNTIL_END we
      # keep staging max-size blocks until the source stream is exhausted.
      current_length = length - total_bytes_written
      max_length = (min(current_length, _MAX_BLOCK_SIZE) if length != READ_UNTIL_END
                    else _MAX_BLOCK_SIZE)
      if max_length <= 0:
        break

      limited = LimitingStream(in_fp, max_length, seekable=False)

      # Block ids within a blob must all be the same length; a zero-padded index works.
      block_index = len(new_metadata[_BLOCKS_KEY])
      block_id = format(block_index, '05')
      new_metadata[_BLOCKS_KEY].append(block_id)

      try:
        self._blob_service.put_block(self._azure_container, upload_blob_name, limited, block_id)
      except AzureException as ae:
        logger.exception('Exception when trying to stream_upload_chunk block %s for %s', block_id,
                         uuid)
        return total_bytes_written, new_metadata, ae

      bytes_written = limited.tell()
      total_bytes_written += bytes_written
      if bytes_written == 0 or bytes_written < max_length:
        # NOTE(review): if bytes_written == 0 the (empty) block id was still appended
        # to the metadata above — confirm the commit in complete_chunked_upload
        # tolerates it.
        break

    if content_type is not None:
      new_metadata[_CONTENT_TYPE_KEY] = content_type

    return total_bytes_written, new_metadata, None

  def complete_chunked_upload(self, uuid, final_path, storage_metadata):
    """ Complete the chunked upload and store the final results in the path indicated.
        Returns nothing.
    """
    # Commit the blob's blocks.
    upload_blob_name = self._upload_blob_name_from_uuid(uuid)
    block_list = [BlobBlock(block_id) for block_id in storage_metadata[_BLOCKS_KEY]]
    try:
      self._blob_service.put_block_list(self._azure_container, upload_blob_name, block_list)
    except AzureException:
      logger.exception('Exception when trying to put block list for path %s from upload %s',
                       final_path, uuid)
      raise IOError('Exception when trying to put block list')

    # Set the content type on the blob if applicable.
    if storage_metadata[_CONTENT_TYPE_KEY] is not None:
      content_settings = ContentSettings(content_type=storage_metadata[_CONTENT_TYPE_KEY])
      try:
        self._blob_service.set_blob_properties(self._azure_container, upload_blob_name,
                                               content_settings=content_settings)
      except AzureException:
        logger.exception('Exception when trying to set blob properties for path %s', final_path)
        raise IOError('Exception when trying to set blob properties')

    # Copy the blob to its final location.
    # NOTE(review): get_direct_download_url prefixes its argument with _storage_path,
    # while the upload blob was written unprefixed; with a non-empty storage_path the
    # copy source URL may not match the staged blob — verify against deployments.
    blob_name = self._blob_name_from_path(final_path)
    copy_source_url = self.get_direct_download_url(upload_blob_name)
    try:
      copy_prop = self._blob_service.copy_blob(self._azure_container, blob_name,
                                               copy_source_url)
    except AzureException:
      logger.exception('Exception when trying to copy uploaded blob %s to path %s', uuid,
                       final_path)
      raise IOError('Exception when trying to copy uploaded blob')

    self._await_copy(self._azure_container, blob_name, copy_prop)

    # Delete the original blob.
    try:
      self._blob_service.delete_blob(self._azure_container, upload_blob_name)
    except AzureException:
      logger.exception('Exception when trying to delete uploaded blob %s', uuid)
      raise IOError('Exception when trying to delete uploaded blob')

  def cancel_chunked_upload(self, uuid, storage_metadata):
    """ Cancel the chunked upload and clean up any outstanding partially uploaded data.
        Returns nothing.
    """
    upload_blob_name = self._upload_blob_name_from_uuid(uuid)
    self._blob_service.delete_blob(self._azure_container, upload_blob_name)

  def _await_copy(self, container, blob_name, copy_prop):
    """ Polls the destination blob until the server-side copy described by copy_prop
        finishes. Raises IOError on failure, abort, or timeout.
    """
    count = 0
    while copy_prop.status == 'pending':
      props = self._blob_service.get_blob_properties(container, blob_name)
      copy_prop = props.properties.copy

      if copy_prop.status == 'success':
        return

      if copy_prop.status == 'failed' or copy_prop.status == 'aborted':
        # BUGFIX: previously referenced the undefined name `copy_props` here,
        # raising a NameError instead of the intended IOError.
        raise IOError('Copy of blob %s failed with status %s' % (blob_name, copy_prop.status))

      count = count + 1
      if count > _MAX_COPY_POLL_COUNT:
        raise IOError('Timed out waiting for copy to complete')

      time.sleep(_COPY_POLL_SLEEP)

  def copy_to(self, destination, path):
    """ Copies the blob at path into the destination engine. Uses an Azure
        server-side copy when the destination is also an AzureStorage engine;
        otherwise falls back to a streamed read/write.
    """
    if (self.__class__ == destination.__class__):
      logger.debug('Starting copying file from Azure %s to Azure %s via an Azure copy',
                   self._azure_container, destination)

      blob_name = self._blob_name_from_path(path)
      copy_source_url = self.get_direct_download_url(path)
      copy_prop = self._blob_service.copy_blob(destination._azure_container, blob_name,
                                               copy_source_url)
      self._await_copy(destination._azure_container, blob_name, copy_prop)

      logger.debug('Finished copying file from Azure %s to Azure %s via an Azure copy',
                   self._azure_container, destination)
      return

    # Fallback to a slower, default copy.
    logger.debug('Copying file from Azure container %s to %s via a streamed copy',
                 self._azure_container, destination)
    with self.stream_read_file(path) as fp:
      destination.stream_write(path, fp)

  def setup(self):
    """ Configures CORS on the blob service so browsers can upload/download directly.
        From: https://docs.microsoft.com/en-us/rest/api/storageservices/cross-origin-resource-sharing--cors--support-for-the-azure-storage-services
    """
    cors = [CorsRule(allowed_origins='*', allowed_methods=['GET', 'PUT'], max_age_in_seconds=3000,
                     exposed_headers=['x-ms-meta-*'],
                     allowed_headers=['x-ms-meta-data*', 'x-ms-meta-target*', 'x-ms-meta-abc',
                                      'Content-Type'])]

    self._blob_service.set_blob_service_properties(cors=cors)

217
storage/test/test_azure.py Normal file
View file

@ -0,0 +1,217 @@
import base64
import md5
import pytest
import io
from contextlib import contextmanager
from urlparse import parse_qs, urlparse
from httmock import urlmatch, HTTMock
from xml.dom import minidom
from azure.storage.blob import BlockBlobService
from storage.azurestorage import AzureStorage
@contextmanager
def fake_azure_storage(files=None):
  """ Yields an AzureStorage engine wired (via HTTMock) to an in-memory fake of the
      Azure Blob REST endpoint.

      `files` maps blob names to their contents; passing the same dict to two
      invocations simulates two engines sharing one storage account.
  """
  service = BlockBlobService(is_emulated=True)
  endpoint = service.primary_endpoint.split('/')
  container_name = 'somecontainer'
  files = files if files is not None else {}

  container_prefix = '/' + endpoint[1] + '/' + container_name

  # Container-level requests (e.g. get_container_properties from validate()).
  @urlmatch(netloc=endpoint[0], path=container_prefix + '$')
  def get_container(url, request):
    return {'status_code': 200, 'content': '{}'}

  # All blob-level requests under the container.
  @urlmatch(netloc=endpoint[0], path=container_prefix + '/.+')
  def container_file(url, request):
    filename = url.path[len(container_prefix)+1:]

    if request.method == 'GET' or request.method == 'HEAD':
      return {
        'status_code': 200 if filename in files else 404,
        'content': files.get(filename),
        'headers': {
          'ETag': 'foobar',
        },
      }

    if request.method == 'DELETE':
      # NOTE(review): pops without a default, so deleting a missing blob raises KeyError.
      files.pop(filename)
      return {
        'status_code': 201,
        'content': '',
        'headers': {
          'ETag': 'foobar',
        },
      }

    if request.method == 'PUT':
      query_params = parse_qs(url.query)

      # set_blob_properties (comp=properties).
      if query_params.get('comp') == ['properties']:
        return {
          'status_code': 201,
          'content': '{}',
          'headers': {
            'x-ms-request-server-encrypted': False,
            'last-modified': 'Wed, 21 Oct 2015 07:28:00 GMT',
          }
        }

      # put_block (comp=block): stage an uncommitted block keyed by its block id.
      if query_params.get('comp') == ['block']:
        block_id = query_params['blockid'][0]
        files[filename] = files.get(filename) or {}
        files[filename][block_id] = request.body
        return {
          'status_code': 201,
          'content': '{}',
          'headers': {
            'Content-MD5': base64.b64encode(md5.new(request.body).digest()),
            'ETag': 'foo',
            'x-ms-request-server-encrypted': False,
            'last-modified': 'Wed, 21 Oct 2015 07:28:00 GMT',
          }
        }

      # put_block_list (comp=blocklist): commit the staged blocks, in the XML-listed
      # order, into a single contents string.
      if query_params.get('comp') == ['blocklist']:
        parsed = minidom.parseString(request.body)
        latest = parsed.getElementsByTagName('Latest')
        combined = []
        for latest_block in latest:
          combined.append(files[filename][latest_block.childNodes[0].data])

        files[filename] = ''.join(combined)
        return {
          'status_code': 201,
          'content': '{}',
          'headers': {
            'Content-MD5': base64.b64encode(md5.new(files[filename]).digest()),
            'ETag': 'foo',
            'x-ms-request-server-encrypted': False,
            'last-modified': 'Wed, 21 Oct 2015 07:28:00 GMT',
          }
        }

      # copy_blob: server-side copy driven by the x-ms-copy-source header; reports
      # immediate success via x-ms-copy-status.
      if request.headers.get('x-ms-copy-source'):
        copy_source = request.headers['x-ms-copy-source']
        copy_path = urlparse(copy_source).path[len(container_prefix) + 1:]
        files[filename] = files[copy_path]
        return {
          'status_code': 201,
          'content': '{}',
          'headers': {
            'x-ms-request-server-encrypted': False,
            'x-ms-copy-status': 'success',
            'last-modified': 'Wed, 21 Oct 2015 07:28:00 GMT',
          }
        }

      # Plain blob upload (create_blob_from_*).
      files[filename] = request.body
      return {
        'status_code': 201,
        'content': '{}',
        'headers': {
          'Content-MD5': base64.b64encode(md5.new(request.body).digest()),
          'ETag': 'foo',
          'x-ms-request-server-encrypted': False,
          'last-modified': 'Wed, 21 Oct 2015 07:28:00 GMT',
        }
      }

    return {'status_code': 405, 'content': ''}

  # Any other request against the emulated endpoint is rejected.
  @urlmatch(netloc=endpoint[0], path='.+')
  def catchall(url, request):
    return {'status_code': 405, 'content': ''}

  with HTTMock(get_container, container_file, catchall):
    yield AzureStorage(None, 'somecontainer', '', 'someaccount', is_emulated=True)
def test_validate():
  """ validate() should succeed against a reachable fake container. """
  with fake_azure_storage() as storage:
    storage.validate(None)
def test_basics():
  """ Round-trips a blob through put/get/checksum/stream/remove. """
  with fake_azure_storage() as storage:
    storage.put_content('hello', 'hello world')
    assert storage.exists('hello')
    assert storage.get_content('hello') == 'hello world'
    assert storage.get_checksum('hello')

    chunks = list(storage.stream_read('hello'))
    assert ''.join(chunks) == 'hello world'
    assert storage.stream_read_file('hello').read() == 'hello world'

    storage.remove('hello')
    assert not storage.exists('hello')
def test_does_not_exist():
  """ Every read accessor should raise IOError for a missing blob. """
  with fake_azure_storage() as storage:
    assert not storage.exists('hello')

    with pytest.raises(IOError):
      storage.get_content('hello')

    with pytest.raises(IOError):
      storage.get_checksum('hello')

    with pytest.raises(IOError):
      list(storage.stream_read('hello'))

    with pytest.raises(IOError):
      storage.stream_read_file('hello')
def test_stream_write():
  """ stream_write should persist the full contents of the source file object. """
  source = io.BytesIO('hello world!')

  with fake_azure_storage() as storage:
    storage.stream_write('hello', source)
    assert storage.get_content('hello') == 'hello world!'
@pytest.mark.parametrize('chunk_size', [
  (1),
  (5),
  (10),
])
def test_chunked_uploading(chunk_size):
  """ Uploads a payload in fixed-size chunks and verifies the committed blob. """
  with fake_azure_storage() as storage:
    payload = 'hello world!'
    pieces = [payload[start:start + chunk_size]
              for start in range(0, len(payload), chunk_size)]

    upload_id, metadata = storage.initiate_chunked_upload()
    offset = 0

    for piece in pieces:
      piece_file = io.BytesIO(piece)
      written, metadata, error = storage.stream_upload_chunk(upload_id, offset, -1, piece_file,
                                                             metadata)
      assert written == len(piece)
      assert metadata
      assert not error
      offset += written

    storage.complete_chunked_upload(upload_id, 'chunked', metadata)
    assert storage.get_content('chunked') == payload
def test_get_direct_download_url():
  """ Direct download URLs should carry a SAS signature ('sig') parameter. """
  with fake_azure_storage() as storage:
    storage.put_content('hello', 'world')
    download_url = storage.get_direct_download_url('hello')
    assert 'sig' in download_url
def test_copy_to():
  """ copy_to between two Azure engines should perform a server-side blob copy. """
  shared_files = {}

  with fake_azure_storage(files=shared_files) as source:
    source.put_content('hello', 'hello world')

    with fake_azure_storage(files=shared_files) as destination:
      source.copy_to(destination, 'hello')
      assert destination.exists('hello')

View file

@ -156,5 +156,12 @@ class LimitingStream(StreamSlice):
of bytes. All calls after that limit (if specified) will act as if the file has no additional
data.
"""
def __init__(self, fileobj, read_limit=READ_UNTIL_END):
  def __init__(self, fileobj, read_limit=READ_UNTIL_END, seekable=True):
    # seekable=False makes seek() raise AttributeError, letting callers hand a
    # LimitingStream to APIs that would otherwise try to rewind it.
    super(LimitingStream, self).__init__(fileobj, 0, read_limit)
    self._seekable = seekable
def seek(self, index, whence=WHENCE_ABSOLUTE):
if not self._seekable:
raise AttributeError
super(LimitingStream, self).seek(index, whence)