import cStringIO as StringIO
import os
import logging
import copy
import boto.s3.connection
import boto.s3.multipart
import boto.gs.connection
import boto.s3.key
import boto.gs.key
from io import BufferedIOBase
from uuid import uuid4
from collections import namedtuple
from util.registry import filelike
from storage.basestorage import BaseStorageV2, InvalidChunkException
import app
# Module-level logger, named after this module.
logger = logging.getLogger(__name__)

# Record describing one uploaded chunk: the key path it was written to, the offset the
# caller reported for it, and the number of bytes stored.
_PartUploadMetadata = namedtuple('_PartUploadMetadata', 'path offset length')

# Key in the chunked-upload storage metadata dict under which the chunk records are kept.
_CHUNKS_KEY = 'chunks'
class StreamReadKeyAsFile(BufferedIOBase):
    """Read-only file-like wrapper that delegates to an underlying key object.

    The wrapped key must provide `read(amt)`, `close(fast=...)` and a `closed` attribute;
    the open/closed state of this wrapper is always that of the key.
    """

    def __init__(self, key):
        self._key = key

    def readable(self):
        """Always True: the wrapper exists only for reading."""
        return True

    @property
    def closed(self):
        """Whether the underlying key reports itself as closed."""
        return self._key.closed

    def read(self, amt=None):
        """Read up to `amt` bytes from the key; returns None once the key is closed."""
        return None if self.closed else self._key.read(amt)

    def close(self):
        """Close the underlying key, passing fast=True."""
        self._key.close(fast=True)
class _CloudStorage(BaseStorageV2):
    """Base storage engine for boto-style bucket backends.

    Subclasses supply the boto connection class, key class, connection keyword arguments
    and per-upload parameters; this class implements the storage operations on top of
    them. The connection and bucket are created lazily, by the first operation that
    needs them. All paths handed to public methods are relative to `storage_path`.
    """

    def __init__(self, connection_class, key_class, connect_kwargs, upload_params, storage_path,
                 access_key, secret_key, bucket_name):
        """Record the connection settings; no connection is opened here."""
        super(_CloudStorage, self).__init__()

        # Bytes staged per part by _stream_write_internal, and the minimum size a non-final
        # chunk must have for complete_chunked_upload to assemble server side.
        self.automatic_chunk_size = 5 * 1024 * 1024

        self._initialized = False
        self._bucket_name = bucket_name
        self._access_key = access_key
        self._secret_key = secret_key
        self._root_path = storage_path
        self._connection_class = connection_class
        self._key_class = key_class
        self._upload_params = upload_params
        self._connect_kwargs = connect_kwargs
        self._cloud_conn = None
        self._cloud_bucket = None

    def _initialize_cloud_conn(self):
        """Open the connection and look up the bucket on first use; later calls are no-ops."""
        if not self._initialized:
            self._cloud_conn = self._connection_class(self._access_key, self._secret_key,
                                                      **self._connect_kwargs)
            self._cloud_bucket = self._cloud_conn.get_bucket(self._bucket_name)
            self._initialized = True

    def _debug_key(self, key):
        """Used for debugging only.

        Wraps `make_request` on the key's bucket connection so that the arguments of
        every request are printed before the request is made.
        """
        orig_meth = key.bucket.connection.make_request

        def new_meth(*args, **kwargs):
            # Single-argument print(...) produces the same output whether it is parsed as
            # a Python 2 print statement or as a function call.
            print('#' * 16)
            print(args)
            print(kwargs)
            print('#' * 16)
            return orig_meth(*args, **kwargs)

        key.bucket.connection.make_request = new_meth

    def _init_path(self, path=None):
        """Return `path` joined onto the storage root, without a leading '/'."""
        path = os.path.join(self._root_path, path) if path else self._root_path
        if path and path[0] == '/':
            return path[1:]
        return path

    def get_cloud_conn(self):
        """Return the connection, creating it if necessary."""
        self._initialize_cloud_conn()
        return self._cloud_conn

    def get_cloud_bucket(self):
        """Return the bucket, creating the connection if necessary."""
        # Initialize lazily, exactly like get_cloud_conn, so callers never receive None
        # just because no other operation happened to run first.
        self._initialize_cloud_conn()
        return self._cloud_bucket

    def get_content(self, path):
        """Return the full contents stored at `path`.

        Raises:
            IOError: if no key exists at `path`.
        """
        self._initialize_cloud_conn()
        path = self._init_path(path)
        key = self._key_class(self._cloud_bucket, path)
        if not key.exists():
            raise IOError('No such key: \'{0}\''.format(path))
        return key.get_contents_as_string()

    def put_content(self, path, content):
        """Store `content` at `path` and return the absolute (rooted) key path."""
        self._initialize_cloud_conn()
        path = self._init_path(path)
        key = self._key_class(self._cloud_bucket, path)
        key.set_contents_from_string(content, **self._upload_params)
        return path

    def get_supports_resumable_downloads(self):
        """This engine always reports support for resumable downloads."""
        return True

    def get_direct_download_url(self, path, expires_in=60, requires_cors=False):
        """Return a URL for `path` that expires after `expires_in` seconds.

        `requires_cors` is accepted for interface compatibility and not used here.
        """
        self._initialize_cloud_conn()
        path = self._init_path(path)
        k = self._key_class(self._cloud_bucket, path)
        return k.generate_url(expires_in)

    def get_direct_upload_url(self, path, mime_type, requires_cors=True):
        """Return a 300-second PUT URL for `path` bound to the given Content-Type.

        `requires_cors` is accepted for interface compatibility and not used here.
        """
        self._initialize_cloud_conn()
        path = self._init_path(path)
        key = self._key_class(self._cloud_bucket, path)
        return key.generate_url(300, 'PUT', headers={'Content-Type': mime_type},
                                encrypt_key=True)

    def stream_read(self, path):
        """Yield the contents of `path` in buffers of at most `self.buffer_size` bytes.

        `buffer_size` is not set in this class; presumably it comes from the base
        class -- verify.

        Raises:
            IOError: if no key exists at `path` (raised when iteration starts).
        """
        self._initialize_cloud_conn()
        path = self._init_path(path)
        key = self._key_class(self._cloud_bucket, path)
        if not key.exists():
            raise IOError('No such key: \'{0}\''.format(path))
        while True:
            buf = key.read(self.buffer_size)
            if not buf:
                break
            yield buf

    def stream_read_file(self, path):
        """Return a file-like reader over the key at `path`.

        Raises:
            IOError: if no key exists at `path`.
        """
        self._initialize_cloud_conn()
        path = self._init_path(path)
        key = self._key_class(self._cloud_bucket, path)
        if not key.exists():
            raise IOError('No such key: \'{0}\''.format(path))
        return StreamReadKeyAsFile(key)

    def __initiate_multipart_upload(self, path, content_type, content_encoding):
        """Start a multipart upload for `path` and return the upload handle."""
        # Minimum size of upload part size on S3 is 5MB
        self._initialize_cloud_conn()
        path = self._init_path(path)

        metadata = {}
        if content_type is not None:
            metadata['Content-Type'] = content_type
        if content_encoding is not None:
            metadata['Content-Encoding'] = content_encoding

        app.metric_queue.put('MultipartUploadStart', 1)
        return self._cloud_bucket.initiate_multipart_upload(path, metadata=metadata,
                                                            **self._upload_params)

    def stream_write(self, path, fp, content_type=None, content_encoding=None):
        """Write everything readable from `fp` to `path`; returns the bytes written."""
        return self._stream_write_internal(path, fp, content_type, content_encoding)

    def _stream_write_internal(self, path, fp, content_type=None, content_encoding=None,
                               cancel_on_error=True, size=filelike.READ_UNTIL_END):
        """Upload up to `size` bytes from `fp` to `path` as a multipart upload.

        Args:
            cancel_on_error: when True, an IOError aborts the whole upload and 0 is
                returned. When False, the parts uploaded before the error are kept and
                committed, and their byte count is returned.
            size: maximum number of bytes to read, or filelike.READ_UNTIL_END.

        Returns:
            The number of bytes stored at `path` (0 if nothing was stored).
        """
        mp = self.__initiate_multipart_upload(path, content_type, content_encoding)

        # The buffer is reused for every part, so only the bytes staged into it by the
        # current iteration may be read back out of it.
        buf = StringIO.StringIO()
        num_part = 1
        total_bytes_written = 0
        failed = False

        while size == filelike.READ_UNTIL_END or total_bytes_written < size:
            bytes_to_copy = self.automatic_chunk_size
            if size != filelike.READ_UNTIL_END:
                # Never ask for more bytes than the caller has indicated to copy.
                bytes_to_copy = min(bytes_to_copy, size - total_bytes_written)

            buf.seek(0)
            try:
                # Stage the bytes into the buffer for use with the multipart upload file API.
                bytes_staged = self.stream_write_to_fp(fp, buf, bytes_to_copy)
                if bytes_staged == 0:
                    break

                buf.seek(0)
                mp.upload_part_from_file(buf, num_part, size=bytes_staged)
                total_bytes_written += bytes_staged
                num_part += 1
            except IOError:
                app.metric_queue.put('MultipartUploadFailure', 1)
                if cancel_on_error:
                    mp.cancel_upload()
                    return 0

                # The caller asked to keep partial data: stop reading and commit the parts
                # that were confirmed written, instead of abandoning the upload.
                failed = True
                break

        if total_bytes_written > 0:
            if not failed:
                app.metric_queue.put('MultipartUploadSuccess', 1)
            mp.complete_upload()
        else:
            # Nothing was uploaded; abort so the initiated upload is not left dangling.
            mp.cancel_upload()

        return total_bytes_written

    def list_directory(self, path=None):
        """Yield the entries directly under `path`.

        Each yielded name is the key name with the first len(storage root) characters
        removed (none when the root is '/') and any trailing '/' dropped.

        Raises:
            OSError: if nothing exists under `path` (raised once iteration finishes).
        """
        self._initialize_cloud_conn()
        path = self._init_path(path)
        if not path.endswith('/'):
            path += '/'

        ln = 0
        if self._root_path != '/':
            ln = len(self._root_path)

        exists = False
        for key in self._cloud_bucket.list(prefix=path, delimiter='/'):
            exists = True
            name = key.name
            if name.endswith('/'):
                yield name[ln:-1]
            else:
                yield name[ln:]

        if not exists:
            # In order to be compliant with the LocalStorage API. Even though
            # S3 does not have a concept of folders.
            raise OSError('No such directory: \'{0}\''.format(path))

    def exists(self, path):
        """Return whether a key exists at `path`."""
        self._initialize_cloud_conn()
        path = self._init_path(path)
        key = self._key_class(self._cloud_bucket, path)
        return key.exists()

    def remove(self, path):
        """Delete the key at `path`, or every key under `path/` if no such key exists."""
        self._initialize_cloud_conn()
        path = self._init_path(path)
        key = self._key_class(self._cloud_bucket, path)
        if key.exists():
            # It's a file
            key.delete()
            return

        # We assume it's a directory
        if not path.endswith('/'):
            path += '/'
        for key in self._cloud_bucket.list(prefix=path):
            key.delete()

    def get_checksum(self, path):
        """Return the first 7 characters of the key's etag (surrounding characters stripped).

        Raises:
            IOError: if the bucket lookup finds no key at `path`.
        """
        self._initialize_cloud_conn()
        path = self._init_path(path)
        key = self._key_class(self._cloud_bucket, path)
        k = self._cloud_bucket.lookup(key)
        if k is None:
            raise IOError('No such key: \'{0}\''.format(path))
        return k.etag[1:-1][:7]

    def copy_to(self, destination, path):
        """Copy the object at `path` to the same relative path in `destination`."""
        self._initialize_cloud_conn()

        # First try to copy directly via boto, but only if the storages are the
        # same type, with the same access information.
        if (self.__class__ == destination.__class__ and
                self._access_key == destination._access_key and
                self._secret_key == destination._secret_key):
            logger.debug('Copying file from %s to %s via a direct boto copy', self._cloud_bucket,
                         destination._cloud_bucket)

            source_path = self._init_path(path)
            source_key = self._key_class(self._cloud_bucket, source_path)

            destination._initialize_cloud_conn()
            dest_path = destination._init_path(path)
            source_key.copy(destination._cloud_bucket, dest_path)
            return

        # Fallback to a slower, default copy.
        logger.debug('Copying file from %s to %s via a streamed copy', self._cloud_bucket,
                     destination)
        with self.stream_read_file(path) as fp:
            destination.stream_write(path, fp)

    def _rel_upload_path(self, uuid):
        """Return the storage-relative path used for the upload chunk named `uuid`."""
        return 'uploads/{0}'.format(uuid)

    def initiate_chunked_upload(self):
        """Return a fresh upload id and the initial (empty) chunk metadata."""
        self._initialize_cloud_conn()
        random_uuid = str(uuid4())
        metadata = {
            _CHUNKS_KEY: [],
        }
        return random_uuid, metadata

    def stream_upload_chunk(self, uuid, offset, length, in_fp, storage_metadata):
        """Store up to `length` bytes from `in_fp` as a new chunk.

        Returns:
            (bytes_written, new_metadata), where new_metadata is a deep copy of
            `storage_metadata` with the chunk appended if any bytes were written.
        """
        self._initialize_cloud_conn()

        # We are going to upload each chunk to a separate key
        chunk_path = self._rel_upload_path(str(uuid4()))
        bytes_written = self._stream_write_internal(chunk_path, in_fp, cancel_on_error=False,
                                                    size=length)

        new_metadata = copy.deepcopy(storage_metadata)

        # We are only going to track keys to which data was confirmed written
        if bytes_written > 0:
            new_metadata[_CHUNKS_KEY].append(
                _PartUploadMetadata(chunk_path, offset, bytes_written))

        return bytes_written, new_metadata

    def _chunk_generator(self, chunk_list):
        """Yield a length-limited reader for each chunk, in order."""
        for chunk in chunk_list:
            yield filelike.StreamSlice(self.stream_read_file(chunk.path), 0, chunk.length)

    @staticmethod
    def _chunk_list_from_metadata(storage_metadata):
        """Rebuild the list of _PartUploadMetadata records stored in `storage_metadata`."""
        return [_PartUploadMetadata(*chunk_args)
                for chunk_args in storage_metadata[_CHUNKS_KEY]]

    def _client_side_chunk_join(self, final_path, chunk_list):
        """Assemble the chunks into `final_path` and then delete them (best effort).

        Chunk paths and `final_path` are storage-relative. The direct bucket calls
        (copy_key / delete_key) do not apply the storage root themselves, so the
        paths are resolved with _init_path before being passed to them.
        """
        self._initialize_cloud_conn()

        # If there's only one chunk, just "move" (copy and delete) the key and call it a day.
        if len(chunk_list) == 1:
            abs_chunk_path = self._init_path(chunk_list[0].path)
            abs_final_path = self._init_path(final_path)

            # Let the copy raise an exception if it fails.
            self._cloud_bucket.copy_key(abs_final_path, self._bucket_name, abs_chunk_path)

            # Attempt to clean up the old chunk.
            try:
                self._cloud_bucket.delete_key(abs_chunk_path)
            except IOError:
                # We failed to delete a chunk. This sucks, but we shouldn't fail the push.
                msg = 'Failed to clean up chunk %s for move of %s'
                logger.exception(msg, abs_chunk_path, abs_final_path)
            return

        # Concatenate and write all the chunks as one key. stream_write and
        # stream_read_file resolve the storage root on their own.
        concatenated = filelike.FilelikeStreamConcat(self._chunk_generator(chunk_list))
        self.stream_write(final_path, concatenated)

        # Attempt to clean up all the chunks.
        for chunk in chunk_list:
            abs_chunk_path = self._init_path(chunk.path)
            try:
                self._cloud_bucket.delete_key(abs_chunk_path)
            except IOError:
                # We failed to delete a chunk. This sucks, but we shouldn't fail the push.
                msg = 'Failed to clean up chunk %s for reupload of %s'
                logger.exception(msg, abs_chunk_path, final_path)

    def complete_chunked_upload(self, uuid, final_path, storage_metadata):
        """Assemble the uploaded chunks into the object at `final_path`.

        Server-side assembly (multipart copy) is used when every chunk except the last
        is at least `automatic_chunk_size` bytes; otherwise the chunks are joined
        client side.

        Raises:
            IOError: re-raised, after aborting the multipart upload, if server-side
                assembly fails.
        """
        self._initialize_cloud_conn()

        chunk_list = self._chunk_list_from_metadata(storage_metadata)

        # A chunk that is too small rules out server-side assembly unless it is the last one.
        too_small = any(chunk.length < self.automatic_chunk_size for chunk in chunk_list[:-1])
        if too_small:
            # Turn all of the server side objects into a single file-like stream, and
            # pass that to stream_write to chunk and upload the final object.
            self._client_side_chunk_join(final_path, chunk_list)
            return

        logger.debug('Performing server side assembly of multi-part upload for: %s', final_path)
        mpu = None
        try:
            # Start a new multipart upload and use copy_part_from_key to set all of the chunks.
            mpu = self.__initiate_multipart_upload(final_path, content_type=None,
                                                   content_encoding=None)
            bucket_name = self.get_cloud_bucket().name
            for part_num, chunk in enumerate(chunk_list, 1):
                abs_chunk_path = self._init_path(chunk.path)
                chunk_end_offset_inclusive = chunk.length - 1
                mpu.copy_part_from_key(bucket_name, abs_chunk_path, part_num,
                                       start=0, end=chunk_end_offset_inclusive)
            mpu.complete_upload()
        except IOError:
            # Something bad happened, log it and then give up
            msg = 'Exception when attempting server-side assembly for: %s'
            logger.exception(msg, final_path)
            # mpu is still None when initiating the upload itself failed.
            if mpu is not None:
                mpu.cancel_upload()
            raise

    def cancel_chunked_upload(self, uuid, storage_metadata):
        """Delete every chunk recorded in `storage_metadata`."""
        self._initialize_cloud_conn()

        # We have to go through and delete all of the uploaded chunks
        for chunk in self._chunk_list_from_metadata(storage_metadata):
            self.remove(chunk.path)
class S3Storage(_CloudStorage):
    """_CloudStorage bound to boto's S3 connection and key classes.

    Uploads are made with `encrypt_key=True`.
    """

    def __init__(self, storage_path, s3_access_key, s3_secret_key, s3_bucket):
        upload_params = {
            'encrypt_key': True,
        }
        connect_kwargs = {}
        super(S3Storage, self).__init__(boto.s3.connection.S3Connection, boto.s3.key.Key,
                                        connect_kwargs, upload_params, storage_path,
                                        s3_access_key, s3_secret_key, s3_bucket)

    def setup(self):
        """Install the CORS configuration on the bucket.

        Two rules are installed, both for any origin with a max age of 3000 seconds:
        GET with the Authorization header, and PUT with the Content-Type, x-amz-acl
        and origin headers.
        """
        # The bucket only exists once the connection has been initialized.
        self._initialize_cloud_conn()

        # set_cors_xml needs a complete CORSConfiguration XML document, not bare values.
        self.get_cloud_bucket().set_cors_xml("""<?xml version="1.0" encoding="UTF-8"?>
<CORSConfiguration xmlns="http://s3.amazonaws.com/doc/2006-03-01/">
    <CORSRule>
        <AllowedOrigin>*</AllowedOrigin>
        <AllowedMethod>GET</AllowedMethod>
        <MaxAgeSeconds>3000</MaxAgeSeconds>
        <AllowedHeader>Authorization</AllowedHeader>
    </CORSRule>
    <CORSRule>
        <AllowedOrigin>*</AllowedOrigin>
        <AllowedMethod>PUT</AllowedMethod>
        <MaxAgeSeconds>3000</MaxAgeSeconds>
        <AllowedHeader>Content-Type</AllowedHeader>
        <AllowedHeader>x-amz-acl</AllowedHeader>
        <AllowedHeader>origin</AllowedHeader>
    </CORSRule>
</CORSConfiguration>""")
class GoogleCloudStorage(_CloudStorage):
    """_CloudStorage bound to boto's Google Cloud Storage connection and key classes."""

    def __init__(self, storage_path, access_key, secret_key, bucket_name):
        upload_params = {}
        connect_kwargs = {}
        super(GoogleCloudStorage, self).__init__(boto.gs.connection.GSConnection,
                                                 boto.gs.key.Key, connect_kwargs,
                                                 upload_params, storage_path, access_key,
                                                 secret_key, bucket_name)

    def setup(self):
        """Install the CORS configuration on the bucket.

        Allows GET and PUT from any origin, exposes the Content-Type response header,
        and uses a max age of 3000 seconds.
        """
        # The bucket only exists once the connection has been initialized.
        self._initialize_cloud_conn()

        # set_cors_xml needs a complete CorsConfig XML document, not bare values.
        self.get_cloud_bucket().set_cors_xml("""<?xml version="1.0" encoding="UTF-8"?>
<CorsConfig>
    <Cors>
        <Origins>
            <Origin>*</Origin>
        </Origins>
        <Methods>
            <Method>GET</Method>
            <Method>PUT</Method>
        </Methods>
        <ResponseHeaders>
            <ResponseHeader>Content-Type</ResponseHeader>
        </ResponseHeaders>
        <MaxAgeSec>3000</MaxAgeSec>
    </Cors>
</CorsConfig>""")

    def _stream_write_internal(self, path, fp, content_type=None, content_encoding=None,
                               cancel_on_error=True, size=filelike.READ_UNTIL_END):
        """Upload up to `size` bytes from `fp` to `path` in a single streamed request.

        Unlike the base implementation this does not use a multipart upload, and
        `cancel_on_error` is currently ignored: errors from the upload propagate.

        Returns:
            The size boto reports for the key after the upload.
        """
        self._initialize_cloud_conn()
        path = self._init_path(path)
        key = self._key_class(self._cloud_bucket, path)

        if content_type is not None:
            key.set_metadata('Content-Type', content_type)

        if content_encoding is not None:
            key.set_metadata('Content-Encoding', content_encoding)

        if size != filelike.READ_UNTIL_END:
            # Limit the stream so no more than `size` bytes are read from the caller's fp.
            fp = filelike.StreamSlice(fp, 0, size)

        # TODO figure out how to handle cancel_on_error=False
        key.set_contents_from_stream(fp)

        return key.size

    def complete_chunked_upload(self, uuid, final_path, storage_metadata):
        """Assemble the uploaded chunks into `final_path` by joining them client side."""
        self._initialize_cloud_conn()

        # Boto does not support GCS's multipart upload API because it differs from S3, so
        # we are forced to join it all locally and then reupload.
        # See https://github.com/boto/boto/issues/3355
        chunk_list = self._chunk_list_from_metadata(storage_metadata)
        self._client_side_chunk_join(final_path, chunk_list)
class RadosGWStorage(_CloudStorage):
    """_CloudStorage that talks to a RadosGW host through boto's S3 classes.

    The connection targets `hostname` and uses boto's OrdinaryCallingFormat.
    Direct URLs are withheld whenever CORS is required, and chunked uploads are
    always joined client side.
    """

    def __init__(self, hostname, is_secure, storage_path, access_key, secret_key, bucket_name):
        connect_kwargs = dict(
            host=hostname,
            is_secure=is_secure,
            calling_format=boto.s3.connection.OrdinaryCallingFormat(),
        )
        upload_params = {}
        parent = super(RadosGWStorage, self)
        parent.__init__(boto.s3.connection.S3Connection, boto.s3.key.Key, connect_kwargs,
                        upload_params, storage_path, access_key, secret_key, bucket_name)

    # TODO remove when radosgw supports cors: http://tracker.ceph.com/issues/8718#change-38624
    def get_direct_download_url(self, path, expires_in=60, requires_cors=False):
        """Return a direct download URL, or None when the caller requires CORS."""
        if not requires_cors:
            parent = super(RadosGWStorage, self)
            return parent.get_direct_download_url(path, expires_in, requires_cors)
        return None

    # TODO remove when radosgw supports cors: http://tracker.ceph.com/issues/8718#change-38624
    def get_direct_upload_url(self, path, mime_type, requires_cors=True):
        """Return a direct upload URL, or None when the caller requires CORS."""
        if not requires_cors:
            parent = super(RadosGWStorage, self)
            return parent.get_direct_upload_url(path, mime_type, requires_cors)
        return None

    def complete_chunked_upload(self, uuid, final_path, storage_metadata):
        """Assemble the uploaded chunks into `final_path` by joining them client side."""
        self._initialize_cloud_conn()

        # RadosGW does not support multipart copying from keys, so we are forced to join
        # it all locally and then reupload.
        # See https://github.com/ceph/ceph/pull/5139
        chunks = self._chunk_list_from_metadata(storage_metadata)
        self._client_side_chunk_join(final_path, chunks)