RadosGW doesn't support server-side copying of keys into a multipart upload, so we always have to join the chunks on the local side.
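For orientation, here is a minimal sketch of how a caller might drive the chunked-upload API defined in this module; the gateway hostname, credentials, storage path, final key name, and the `chunks` iterable are placeholders, not part of this module:

    # Hypothetical caller of the chunked-upload API; every value below is a placeholder.
    storage = RadosGWStorage('ceph.example.com', True, '/registry', 'ACCESS_KEY', 'SECRET_KEY', 'a-bucket')

    upload_id, metadata = storage.initiate_chunked_upload()
    offset = 0
    for chunk_fp, chunk_length in chunks:  # caller-supplied (file-like object, length) pairs
      written, metadata = storage.stream_upload_chunk(upload_id, offset, chunk_length, chunk_fp, metadata)
      offset += written

    # Joins the chunks locally (see RadosGWStorage.complete_chunked_upload) unless there is a single chunk.
    storage.complete_chunked_upload(upload_id, 'final/key/path', metadata)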
import cStringIO as StringIO
import os
import logging
import copy

import boto.s3.connection
import boto.s3.multipart
import boto.gs.connection
import boto.s3.key
import boto.gs.key

from io import BufferedIOBase
from uuid import uuid4
from collections import namedtuple

from util.registry import filelike
from storage.basestorage import BaseStorageV2, InvalidChunkException
import app


logger = logging.getLogger(__name__)


_PartUploadMetadata = namedtuple('_PartUploadMetadata', ['path', 'offset', 'length'])
_CHUNKS_KEY = 'chunks'


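# Wraps a boto key in a minimal read-only, file-like interface so that callers (for example
# the streamed fallback in copy_to() below) can treat a cloud key like a local file handle.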
class StreamReadKeyAsFile(BufferedIOBase):
  def __init__(self, key):
    self._key = key

  def read(self, amt=None):
    if self.closed:
      return None

    resp = self._key.read(amt)
    return resp

  def readable(self):
    return True

  @property
  def closed(self):
    return self._key.closed

  def close(self):
    self._key.close(fast=True)


class _CloudStorage(BaseStorageV2):
  def __init__(self, connection_class, key_class, connect_kwargs, upload_params, storage_path,
               access_key, secret_key, bucket_name):
    super(_CloudStorage, self).__init__()

    self.automatic_chunk_size = 5 * 1024 * 1024

    self._initialized = False
    self._bucket_name = bucket_name
    self._access_key = access_key
    self._secret_key = secret_key
    self._root_path = storage_path
    self._connection_class = connection_class
    self._key_class = key_class
    self._upload_params = upload_params
    self._connect_kwargs = connect_kwargs
    self._cloud_conn = None
    self._cloud_bucket = None

  def _initialize_cloud_conn(self):
    if not self._initialized:
      self._cloud_conn = self._connection_class(self._access_key, self._secret_key,
                                                **self._connect_kwargs)
      self._cloud_bucket = self._cloud_conn.get_bucket(self._bucket_name)
      self._initialized = True

  def _debug_key(self, key):
    """Used for debugging only."""
    orig_meth = key.bucket.connection.make_request

    def new_meth(*args, **kwargs):
      print '#' * 16
      print args
      print kwargs
      print '#' * 16
      return orig_meth(*args, **kwargs)
    key.bucket.connection.make_request = new_meth

  def _init_path(self, path=None):
    path = os.path.join(self._root_path, path) if path else self._root_path
    if path and path[0] == '/':
      return path[1:]
    return path

  def get_cloud_conn(self):
    self._initialize_cloud_conn()
    return self._cloud_conn

  def get_cloud_bucket(self):
    return self._cloud_bucket

  def get_content(self, path):
    self._initialize_cloud_conn()
    path = self._init_path(path)
    key = self._key_class(self._cloud_bucket, path)
    if not key.exists():
      raise IOError('No such key: \'{0}\''.format(path))
    return key.get_contents_as_string()

  def put_content(self, path, content):
    self._initialize_cloud_conn()
    path = self._init_path(path)
    key = self._key_class(self._cloud_bucket, path)
    key.set_contents_from_string(content, **self._upload_params)
    return path

  def get_supports_resumable_downloads(self):
    return True

  def get_direct_download_url(self, path, expires_in=60, requires_cors=False):
    self._initialize_cloud_conn()
    path = self._init_path(path)
    k = self._key_class(self._cloud_bucket, path)
    return k.generate_url(expires_in)

  def get_direct_upload_url(self, path, mime_type, requires_cors=True):
    self._initialize_cloud_conn()
    path = self._init_path(path)
    key = self._key_class(self._cloud_bucket, path)
    url = key.generate_url(300, 'PUT', headers={'Content-Type': mime_type}, encrypt_key=True)
    return url

  def stream_read(self, path):
    self._initialize_cloud_conn()
    path = self._init_path(path)
    key = self._key_class(self._cloud_bucket, path)
    if not key.exists():
      raise IOError('No such key: \'{0}\''.format(path))
    while True:
      buf = key.read(self.buffer_size)
      if not buf:
        break
      yield buf

  def stream_read_file(self, path):
    self._initialize_cloud_conn()
    path = self._init_path(path)
    key = self._key_class(self._cloud_bucket, path)
    if not key.exists():
      raise IOError('No such key: \'{0}\''.format(path))
    return StreamReadKeyAsFile(key)

  def __initiate_multipart_upload(self, path, content_type, content_encoding):
    # Minimum size of upload part size on S3 is 5MB
    self._initialize_cloud_conn()
    path = self._init_path(path)

    metadata = {}
    if content_type is not None:
      metadata['Content-Type'] = content_type

    if content_encoding is not None:
      metadata['Content-Encoding'] = content_encoding

    app.metric_queue.put('MultipartUploadStart', 1)
    return self._cloud_bucket.initiate_multipart_upload(path, metadata=metadata,
                                                        **self._upload_params)

  def stream_write(self, path, fp, content_type=None, content_encoding=None):
    return self._stream_write_internal(path, fp, content_type, content_encoding)

  def _stream_write_internal(self, path, fp, content_type=None, content_encoding=None,
                             cancel_on_error=True, size=filelike.READ_UNTIL_END):
    mp = self.__initiate_multipart_upload(path, content_type, content_encoding)

    # We are going to reuse this but be VERY careful to only read the number of bytes written to it
    buf = StringIO.StringIO()

    num_part = 1
    total_bytes_written = 0
    while size == filelike.READ_UNTIL_END or total_bytes_written < size:
      bytes_to_copy = self.automatic_chunk_size
      if size != filelike.READ_UNTIL_END:
        # We never want to ask for more bytes than our caller has indicated to copy
        bytes_to_copy = min(bytes_to_copy, size - total_bytes_written)

      buf.seek(0)
      try:
        # Stage the bytes into the buffer for use with the multipart upload file API
        bytes_staged = self.stream_write_to_fp(fp, buf, bytes_to_copy)
        if bytes_staged == 0:
          break

        buf.seek(0)
        mp.upload_part_from_file(buf, num_part, size=bytes_staged)
        total_bytes_written += bytes_staged
        num_part += 1
      except IOError:
        app.metric_queue.put('MultipartUploadFailure', 1)
        if cancel_on_error:
          mp.cancel_upload()
          return 0

    if total_bytes_written > 0:
      app.metric_queue.put('MultipartUploadSuccess', 1)
      mp.complete_upload()
    return total_bytes_written

  def list_directory(self, path=None):
    self._initialize_cloud_conn()
    path = self._init_path(path)
    if not path.endswith('/'):
      path += '/'
    ln = 0
    if self._root_path != '/':
      ln = len(self._root_path)
    exists = False
    for key in self._cloud_bucket.list(prefix=path, delimiter='/'):
      exists = True
      name = key.name
      if name.endswith('/'):
        yield name[ln:-1]
      else:
        yield name[ln:]
    if exists is False:
      # Raise an error to stay compliant with the LocalStorage API, even though
      # S3 does not have a concept of folders.
      raise OSError('No such directory: \'{0}\''.format(path))

  def exists(self, path):
    self._initialize_cloud_conn()
    path = self._init_path(path)
    key = self._key_class(self._cloud_bucket, path)
    return key.exists()

  def remove(self, path):
    self._initialize_cloud_conn()
    path = self._init_path(path)
    key = self._key_class(self._cloud_bucket, path)
    if key.exists():
      # It's a file
      key.delete()
      return

    # We assume it's a directory
    if not path.endswith('/'):
      path += '/'
    for key in self._cloud_bucket.list(prefix=path):
      key.delete()

  def get_checksum(self, path):
    self._initialize_cloud_conn()
    path = self._init_path(path)
    key = self._key_class(self._cloud_bucket, path)
    k = self._cloud_bucket.lookup(key)
    if k is None:
      raise IOError('No such key: \'{0}\''.format(path))

    # Strip the surrounding quotes from the etag and use its first seven characters as the checksum.
    return k.etag[1:-1][:7]

  def copy_to(self, destination, path):
    self._initialize_cloud_conn()

    # First try to copy directly via boto, but only if the storages are the
    # same type, with the same access information.
    if (self.__class__ == destination.__class__ and
        self._access_key == destination._access_key and
        self._secret_key == destination._secret_key):
      logger.debug('Copying file from %s to %s via a direct boto copy', self._cloud_bucket,
                   destination._cloud_bucket)

      source_path = self._init_path(path)
      source_key = self._key_class(self._cloud_bucket, source_path)

      destination._initialize_cloud_conn()
      dest_path = destination._init_path(path)
      source_key.copy(destination._cloud_bucket, dest_path)
      return

    # Fall back to a slower, streamed copy.
    logger.debug('Copying file from %s to %s via a streamed copy', self._cloud_bucket,
                 destination)
    with self.stream_read_file(path) as fp:
      destination.stream_write(path, fp)

  def _rel_upload_path(self, uuid):
    return 'uploads/{0}'.format(uuid)

  def initiate_chunked_upload(self):
    self._initialize_cloud_conn()
    random_uuid = str(uuid4())

    metadata = {
      _CHUNKS_KEY: [],
    }

    return random_uuid, metadata

  def stream_upload_chunk(self, uuid, offset, length, in_fp, storage_metadata):
    self._initialize_cloud_conn()

    # We are going to upload each chunk to a separate key
    chunk_path = self._rel_upload_path(str(uuid4()))
    bytes_written = self._stream_write_internal(chunk_path, in_fp, cancel_on_error=False,
                                                size=length)

    new_metadata = copy.deepcopy(storage_metadata)

    # We are only going to track keys to which data was confirmed written
    if bytes_written > 0:
      new_metadata[_CHUNKS_KEY].append(_PartUploadMetadata(chunk_path, offset, bytes_written))

    return bytes_written, new_metadata

  def _chunk_generator(self, chunk_list):
    for chunk in chunk_list:
      yield filelike.StreamSlice(self.stream_read_file(chunk.path), 0, chunk.length)

  @staticmethod
  def _chunk_list_from_metadata(storage_metadata):
    return [_PartUploadMetadata(*chunk_args) for chunk_args in storage_metadata[_CHUNKS_KEY]]

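  # Server-side assembly is only possible when every chunk except the last is at least
  # automatic_chunk_size (5 MB), since S3 rejects smaller non-final multipart parts.
  # For example, chunks of 8 MB + 8 MB + 1 MB qualify, while 1 MB + 8 MB fall back to the
  # client-side streamed join in the else branch below.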
  def complete_chunked_upload(self, uuid, final_path, storage_metadata):
    self._initialize_cloud_conn()

    # Here is where things get interesting: we are going to try to assemble this server side.
    # In order to be a candidate, all parts (after offsets have been computed) must be at least 5MB.
    server_side_assembly = True
    chunk_list = self._chunk_list_from_metadata(storage_metadata)
    for chunk_offset, chunk in enumerate(chunk_list):
      # If the chunk is both too small, and not the last chunk, we rule out server side assembly
      if chunk.length < self.automatic_chunk_size and (chunk_offset + 1) < len(chunk_list):
        server_side_assembly = False
        break

    if server_side_assembly:
      logger.debug('Performing server side assembly of multi-part upload for: %s', final_path)
      mpu = None
      try:
        # Awesome, we can do this completely server side, now we have to start a new multipart
        # upload and use copy_part_from_key to set all of the chunks.
        mpu = self.__initiate_multipart_upload(final_path, content_type=None, content_encoding=None)

        for chunk_offset, chunk in enumerate(chunk_list):
          abs_chunk_path = self._init_path(chunk.path)
          part_num = chunk_offset + 1
          chunk_end_offset_inclusive = chunk.length - 1
          mpu.copy_part_from_key(self.get_cloud_bucket().name, abs_chunk_path, part_num,
                                 start=0, end=chunk_end_offset_inclusive)
        mpu.complete_upload()

      except IOError as ioe:
        # Something bad happened, log it and then give up
        msg = 'Exception when attempting server-side assembly for: %s'
        logger.exception(msg, final_path)
        # Only cancel if the multipart upload was actually started; otherwise a NameError
        # would mask the original failure.
        if mpu is not None:
          mpu.cancel_upload()
        raise ioe

    else:
      # We are going to turn all of the server side objects into a single file-like stream, and
      # pass that to stream_write to chunk and upload the final object.
      logger.warning('Performing client side assembly of multi-part upload for: %s', final_path)
      concatenated = filelike.FilelikeStreamConcat(self._chunk_generator(chunk_list))
      self.stream_write(final_path, concatenated)

  def cancel_chunked_upload(self, uuid, storage_metadata):
    self._initialize_cloud_conn()

    # We have to go through and delete all of the uploaded chunks
    for chunk in self._chunk_list_from_metadata(storage_metadata):
      self.remove(chunk.path)


class S3Storage(_CloudStorage):
  def __init__(self, storage_path, s3_access_key, s3_secret_key, s3_bucket):
    upload_params = {
      'encrypt_key': True,
    }
    connect_kwargs = {}
    super(S3Storage, self).__init__(boto.s3.connection.S3Connection, boto.s3.key.Key,
                                    connect_kwargs, upload_params, storage_path, s3_access_key,
                                    s3_secret_key, s3_bucket)

  def setup(self):
    self.get_cloud_bucket().set_cors_xml("""<?xml version="1.0" encoding="UTF-8"?>
      <CORSConfiguration xmlns="http://s3.amazonaws.com/doc/2006-03-01/">
        <CORSRule>
          <AllowedOrigin>*</AllowedOrigin>
          <AllowedMethod>GET</AllowedMethod>
          <MaxAgeSeconds>3000</MaxAgeSeconds>
          <AllowedHeader>Authorization</AllowedHeader>
        </CORSRule>
        <CORSRule>
          <AllowedOrigin>*</AllowedOrigin>
          <AllowedMethod>PUT</AllowedMethod>
          <MaxAgeSeconds>3000</MaxAgeSeconds>
          <AllowedHeader>Content-Type</AllowedHeader>
          <AllowedHeader>x-amz-acl</AllowedHeader>
          <AllowedHeader>origin</AllowedHeader>
        </CORSRule>
      </CORSConfiguration>""")

class GoogleCloudStorage(_CloudStorage):
  def __init__(self, storage_path, access_key, secret_key, bucket_name):
    upload_params = {}
    connect_kwargs = {}
    super(GoogleCloudStorage, self).__init__(boto.gs.connection.GSConnection, boto.gs.key.Key,
                                             connect_kwargs, upload_params, storage_path,
                                             access_key, secret_key, bucket_name)

  def setup(self):
    self.get_cloud_bucket().set_cors_xml("""<?xml version="1.0" encoding="UTF-8"?>
      <CorsConfig>
        <Cors>
          <Origins>
            <Origin>*</Origin>
          </Origins>
          <Methods>
            <Method>GET</Method>
            <Method>PUT</Method>
          </Methods>
          <ResponseHeaders>
            <ResponseHeader>Content-Type</ResponseHeader>
          </ResponseHeaders>
          <MaxAgeSec>3000</MaxAgeSec>
        </Cors>
      </CorsConfig>""")

  def _stream_write_internal(self, path, fp, content_type=None, content_encoding=None,
                             cancel_on_error=True, size=filelike.READ_UNTIL_END):
    # Unlike the S3 path, this does not use multipart; the stream is written directly to the key.
    self._initialize_cloud_conn()
    path = self._init_path(path)
    key = self._key_class(self._cloud_bucket, path)

    if content_type is not None:
      key.set_metadata('Content-Type', content_type)

    if content_encoding is not None:
      key.set_metadata('Content-Encoding', content_encoding)

    if size != filelike.READ_UNTIL_END:
      fp = filelike.StreamSlice(fp, 0, size)

    # TODO figure out how to handle cancel_on_error=False
    key.set_contents_from_stream(fp)

    return key.size


class RadosGWStorage(_CloudStorage):
  def __init__(self, hostname, is_secure, storage_path, access_key, secret_key, bucket_name):
    upload_params = {}
    connect_kwargs = {
      'host': hostname,
      'is_secure': is_secure,
      'calling_format': boto.s3.connection.OrdinaryCallingFormat(),
    }
    super(RadosGWStorage, self).__init__(boto.s3.connection.S3Connection, boto.s3.key.Key,
                                         connect_kwargs, upload_params, storage_path, access_key,
                                         secret_key, bucket_name)

  # TODO remove when radosgw supports cors: http://tracker.ceph.com/issues/8718#change-38624
  def get_direct_download_url(self, path, expires_in=60, requires_cors=False):
    if requires_cors:
      return None

    return super(RadosGWStorage, self).get_direct_download_url(path, expires_in, requires_cors)

  # TODO remove when radosgw supports cors: http://tracker.ceph.com/issues/8718#change-38624
  def get_direct_upload_url(self, path, mime_type, requires_cors=True):
    if requires_cors:
      return None

    return super(RadosGWStorage, self).get_direct_upload_url(path, mime_type, requires_cors)

  def complete_chunked_upload(self, uuid, final_path, storage_metadata):
    self._initialize_cloud_conn()

    # RadosGW does not support multipart copying from keys, so we are forced to join
    # it all locally and then reupload.
    # See https://github.com/ceph/ceph/pull/5139
    chunk_list = self._chunk_list_from_metadata(storage_metadata)
    if len(chunk_list) == 1:
      # If there's only one chunk, just "move" the key and call it a day.
      # The chunk was written under its initialized (absolute) path, so resolve both paths
      # before copying.
      chunk_path = self._init_path(chunk_list[0].path)
      abs_final_path = self._init_path(final_path)
      self._cloud_bucket.copy_key(abs_final_path, self._bucket_name, chunk_path)
      self._cloud_bucket.delete_key(chunk_path)
      return

    concatenated = filelike.FilelikeStreamConcat(self._chunk_generator(chunk_list))
    self.stream_write(final_path, concatenated)