# quay/storage/cloud.py
import cStringIO as StringIO
import os
import logging
import copy

import boto.s3.connection
import boto.s3.multipart
import boto.gs.connection
import boto.s3.key
import boto.gs.key

from io import BufferedIOBase
from uuid import uuid4
from collections import namedtuple

from util.registry import filelike
from storage.basestorage import BaseStorageV2, InvalidChunkException

import app

logger = logging.getLogger(__name__)
_PartUploadMetadata = namedtuple('_PartUploadMetadata', ['path', 'offset', 'length'])
_CHUNKS_KEY = 'chunks'


class StreamReadKeyAsFile(BufferedIOBase):
  def __init__(self, key):
    self._key = key

  def read(self, amt=None):
    if self.closed:
      return None

    resp = self._key.read(amt)
    return resp

  def readable(self):
    return True

  @property
  def closed(self):
    return self._key.closed

  def close(self):
    self._key.close(fast=True)


class _CloudStorage(BaseStorageV2):
  def __init__(self, connection_class, key_class, connect_kwargs, upload_params, storage_path,
               access_key, secret_key, bucket_name):
    super(_CloudStorage, self).__init__()

    self.automatic_chunk_size = 5 * 1024 * 1024

    self._initialized = False
    self._bucket_name = bucket_name
    self._access_key = access_key
    self._secret_key = secret_key
    self._root_path = storage_path
    self._connection_class = connection_class
    self._key_class = key_class
    self._upload_params = upload_params
    self._connect_kwargs = connect_kwargs
    self._cloud_conn = None
    self._cloud_bucket = None

  def _initialize_cloud_conn(self):
    if not self._initialized:
      self._cloud_conn = self._connection_class(self._access_key, self._secret_key,
                                                **self._connect_kwargs)
      self._cloud_bucket = self._cloud_conn.get_bucket(self._bucket_name)
      self._initialized = True

  def _debug_key(self, key):
    """Used for debugging only."""
    orig_meth = key.bucket.connection.make_request

    def new_meth(*args, **kwargs):
      print '#' * 16
      print args
      print kwargs
      print '#' * 16
      return orig_meth(*args, **kwargs)
    key.bucket.connection.make_request = new_meth

  def _init_path(self, path=None):
    path = os.path.join(self._root_path, path) if path else self._root_path
    if path and path[0] == '/':
      return path[1:]
    return path

  def get_cloud_conn(self):
    self._initialize_cloud_conn()
    return self._cloud_conn

  def get_cloud_bucket(self):
    return self._cloud_bucket

  def get_content(self, path):
    self._initialize_cloud_conn()
    path = self._init_path(path)
    key = self._key_class(self._cloud_bucket, path)
    if not key.exists():
      raise IOError('No such key: \'{0}\''.format(path))
    return key.get_contents_as_string()

  def put_content(self, path, content):
    self._initialize_cloud_conn()
    path = self._init_path(path)
    key = self._key_class(self._cloud_bucket, path)
    key.set_contents_from_string(content, **self._upload_params)
    return path

  def get_supports_resumable_downloads(self):
    return True

  def get_direct_download_url(self, path, expires_in=60, requires_cors=False):
    self._initialize_cloud_conn()
    path = self._init_path(path)
    k = self._key_class(self._cloud_bucket, path)
    return k.generate_url(expires_in)

  def get_direct_upload_url(self, path, mime_type, requires_cors=True):
    self._initialize_cloud_conn()
    path = self._init_path(path)
    key = self._key_class(self._cloud_bucket, path)
    url = key.generate_url(300, 'PUT', headers={'Content-Type': mime_type}, encrypt_key=True)
    return url

  def stream_read(self, path):
    self._initialize_cloud_conn()
    path = self._init_path(path)
    key = self._key_class(self._cloud_bucket, path)
    if not key.exists():
      raise IOError('No such key: \'{0}\''.format(path))
    while True:
      buf = key.read(self.buffer_size)
      if not buf:
        break
      yield buf

  def stream_read_file(self, path):
    self._initialize_cloud_conn()
    path = self._init_path(path)
    key = self._key_class(self._cloud_bucket, path)
    if not key.exists():
      raise IOError('No such key: \'{0}\''.format(path))
    return StreamReadKeyAsFile(key)

  def __initiate_multipart_upload(self, path, content_type, content_encoding):
    # The minimum part size for an S3 multipart upload is 5 MB.
    self._initialize_cloud_conn()
    path = self._init_path(path)

    metadata = {}
    if content_type is not None:
      metadata['Content-Type'] = content_type

    if content_encoding is not None:
      metadata['Content-Encoding'] = content_encoding

    app.metric_queue.put('MultipartUploadStart', 1)

    return self._cloud_bucket.initiate_multipart_upload(path, metadata=metadata,
                                                        **self._upload_params)

  def stream_write(self, path, fp, content_type=None, content_encoding=None):
    return self._stream_write_internal(path, fp, content_type, content_encoding)

  def _stream_write_internal(self, path, fp, content_type=None, content_encoding=None,
                             cancel_on_error=True, size=filelike.READ_UNTIL_END):
    mp = self.__initiate_multipart_upload(path, content_type, content_encoding)

    # We reuse this buffer, but we must be VERY careful to read back only the number of bytes
    # actually written into it on each pass.
    buf = StringIO.StringIO()

    num_part = 1
    total_bytes_written = 0
    while size == filelike.READ_UNTIL_END or total_bytes_written < size:
      bytes_to_copy = self.automatic_chunk_size
      if size != filelike.READ_UNTIL_END:
        # Never ask for more bytes than our caller has indicated we should copy.
        bytes_to_copy = min(bytes_to_copy, size - total_bytes_written)

      buf.seek(0)
      try:
        # Stage the bytes into the buffer for use with the multipart upload file API.
        bytes_staged = self.stream_write_to_fp(fp, buf, bytes_to_copy)
        if bytes_staged == 0:
          break

        buf.seek(0)
        mp.upload_part_from_file(buf, num_part, size=bytes_staged)
        total_bytes_written += bytes_staged
        num_part += 1
      except IOError:
        app.metric_queue.put('MultipartUploadFailure', 1)
        if cancel_on_error:
          mp.cancel_upload()
          return 0

    if total_bytes_written > 0:
      app.metric_queue.put('MultipartUploadSuccess', 1)
      mp.complete_upload()
    return total_bytes_written

  def list_directory(self, path=None):
    self._initialize_cloud_conn()
    path = self._init_path(path)
    if not path.endswith('/'):
      path += '/'
    ln = 0
    if self._root_path != '/':
      ln = len(self._root_path)
    exists = False
    for key in self._cloud_bucket.list(prefix=path, delimiter='/'):
      exists = True
      name = key.name
      if name.endswith('/'):
        yield name[ln:-1]
      else:
        yield name[ln:]
    if exists is False:
      # Raise an error to stay compliant with the LocalStorage API, even though S3 has no
      # real concept of folders.
      raise OSError('No such directory: \'{0}\''.format(path))

  def exists(self, path):
    self._initialize_cloud_conn()
    path = self._init_path(path)
    key = self._key_class(self._cloud_bucket, path)
    return key.exists()

  def remove(self, path):
    self._initialize_cloud_conn()
    path = self._init_path(path)
    key = self._key_class(self._cloud_bucket, path)
    if key.exists():
      # It's a file
      key.delete()
      return

    # We assume it's a directory
    if not path.endswith('/'):
      path += '/'

    for key in self._cloud_bucket.list(prefix=path):
      key.delete()

  def get_checksum(self, path):
    self._initialize_cloud_conn()
    path = self._init_path(path)
    key = self._key_class(self._cloud_bucket, path)
    k = self._cloud_bucket.lookup(key)
    if k is None:
      raise IOError('No such key: \'{0}\''.format(path))
    return k.etag[1:-1][:7]

  def copy_to(self, destination, path):
    self._initialize_cloud_conn()

    # First try to copy directly via boto, but only if the storages are the
    # same type, with the same access information.
    if (self.__class__ == destination.__class__ and
        self._access_key == destination._access_key and
        self._secret_key == destination._secret_key):
      logger.debug('Copying file from %s to %s via a direct boto copy', self._cloud_bucket,
                   destination._cloud_bucket)

      source_path = self._init_path(path)
      source_key = self._key_class(self._cloud_bucket, source_path)

      destination._initialize_cloud_conn()
      dest_path = destination._init_path(path)
      source_key.copy(destination._cloud_bucket, dest_path)
      return

    # Fall back to a slower, default copy.
    logger.debug('Copying file from %s to %s via a streamed copy', self._cloud_bucket,
                 destination)
    with self.stream_read_file(path) as fp:
      destination.stream_write(path, fp)

  def _rel_upload_path(self, uuid):
    return 'uploads/{0}'.format(uuid)

  def initiate_chunked_upload(self):
    self._initialize_cloud_conn()
    random_uuid = str(uuid4())

    metadata = {
      _CHUNKS_KEY: [],
    }

    return random_uuid, metadata

  def stream_upload_chunk(self, uuid, offset, length, in_fp, storage_metadata):
    self._initialize_cloud_conn()

    # We are going to upload each chunk to a separate key
    chunk_path = self._rel_upload_path(str(uuid4()))
    bytes_written = self._stream_write_internal(chunk_path, in_fp, cancel_on_error=False,
                                                size=length)

    new_metadata = copy.deepcopy(storage_metadata)

    # We are only going to track keys to which data was confirmed written
    if bytes_written > 0:
      new_metadata[_CHUNKS_KEY].append(_PartUploadMetadata(chunk_path, offset, bytes_written))

    return bytes_written, new_metadata

  def _chunk_generator(self, chunk_list):
    for chunk in chunk_list:
      yield filelike.StreamSlice(self.stream_read_file(chunk.path), 0, chunk.length)

  @staticmethod
  def _chunk_list_from_metadata(storage_metadata):
    return [_PartUploadMetadata(*chunk_args) for chunk_args in storage_metadata[_CHUNKS_KEY]]

  def _client_side_chunk_join(self, final_path, chunk_list):
    # If there's only one chunk, just "move" (copy and delete) the key and call it a day.
    if len(chunk_list) == 1:
      chunk_path = chunk_list[0].path
      # Let the copy raise an exception if it fails.
      self._cloud_bucket.copy_key(final_path, self._bucket_name, chunk_path)

      # Attempt to clean up the old chunk.
      try:
        self._cloud_bucket.delete_key(chunk_path)
      except IOError:
        # We failed to delete a chunk. This sucks, but we shouldn't fail the push.
        msg = 'Failed to clean up chunk %s for move of %s'
        logger.exception(msg, chunk_path, final_path)
    else:
      # Concatenate and write all the chunks as one key.
      concatenated = filelike.FilelikeStreamConcat(self._chunk_generator(chunk_list))
      self.stream_write(final_path, concatenated)

      # Attempt to clean up all the chunks.
      for chunk in chunk_list:
        try:
          self._cloud_bucket.delete_key(chunk.path)
        except IOError:
          # We failed to delete a chunk. This sucks, but we shouldn't fail the push.
          msg = 'Failed to clean up chunk %s for reupload of %s'
          logger.exception(msg, chunk.path, final_path)

  def complete_chunked_upload(self, uuid, final_path, storage_metadata):
    self._initialize_cloud_conn()

    # Here is where things get interesting: we are going to try to assemble this server side.
    # In order to be a candidate, all parts (after offsets have been computed) must be at
    # least 5 MB.
    server_side_assembly = True
    chunk_list = self._chunk_list_from_metadata(storage_metadata)
    for chunk_offset, chunk in enumerate(chunk_list):
      # If the chunk is both too small and not the last chunk, we rule out server side assembly.
      if chunk.length < self.automatic_chunk_size and (chunk_offset + 1) < len(chunk_list):
        server_side_assembly = False
        break

    if server_side_assembly:
      logger.debug('Performing server side assembly of multi-part upload for: %s', final_path)
      try:
        # Awesome, we can do this completely server side; now we have to start a new multipart
        # upload and use copy_part_from_key to set all of the chunks.
        mpu = self.__initiate_multipart_upload(final_path, content_type=None, content_encoding=None)

        for chunk_offset, chunk in enumerate(chunk_list):
          abs_chunk_path = self._init_path(chunk.path)
          part_num = chunk_offset + 1
          chunk_end_offset_inclusive = chunk.length - 1
          mpu.copy_part_from_key(self.get_cloud_bucket().name, abs_chunk_path, part_num,
                                 start=0, end=chunk_end_offset_inclusive)
        mpu.complete_upload()
      except IOError as ioe:
        # Something bad happened; log it and then give up.
        msg = 'Exception when attempting server-side assembly for: %s'
        logger.exception(msg, final_path)
        mpu.cancel_upload()
        raise ioe
    else:
      # We are going to turn all of the server side objects into a single file-like stream, and
      # pass that to stream_write to chunk and upload the final object.
      self._client_side_chunk_join(final_path, chunk_list)

  def cancel_chunked_upload(self, uuid, storage_metadata):
    self._initialize_cloud_conn()

    # We have to go through and delete all of the uploaded chunks
    for chunk in self._chunk_list_from_metadata(storage_metadata):
      self.remove(chunk.path)
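

# The following is an illustrative sketch (not part of the original module) of how the chunked
# upload API above is typically driven: initiate_chunked_upload() hands back an upload id and
# empty metadata, each stream_upload_chunk() call writes one chunk to its own key and records it
# in the returned metadata, and complete_chunked_upload() assembles the chunks into the final
# key. The storage instance, chunk list, and paths here are hypothetical.
def _example_chunked_upload(storage, chunk_fps, final_path):
  upload_id, metadata = storage.initiate_chunked_upload()
  offset = 0
  for chunk_fp, chunk_length in chunk_fps:
    bytes_written, metadata = storage.stream_upload_chunk(upload_id, offset, chunk_length,
                                                          chunk_fp, metadata)
    offset += bytes_written
  storage.complete_chunked_upload(upload_id, final_path, metadata)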


class S3Storage(_CloudStorage):
  def __init__(self, storage_path, s3_access_key, s3_secret_key, s3_bucket):
    upload_params = {
      'encrypt_key': True,
    }
    connect_kwargs = {}
    super(S3Storage, self).__init__(boto.s3.connection.S3Connection, boto.s3.key.Key,
                                    connect_kwargs, upload_params, storage_path, s3_access_key,
                                    s3_secret_key, s3_bucket)

  def setup(self):
    self.get_cloud_bucket().set_cors_xml("""<?xml version="1.0" encoding="UTF-8"?>
      <CORSConfiguration xmlns="http://s3.amazonaws.com/doc/2006-03-01/">
        <CORSRule>
          <AllowedOrigin>*</AllowedOrigin>
          <AllowedMethod>GET</AllowedMethod>
          <MaxAgeSeconds>3000</MaxAgeSeconds>
          <AllowedHeader>Authorization</AllowedHeader>
        </CORSRule>
        <CORSRule>
          <AllowedOrigin>*</AllowedOrigin>
          <AllowedMethod>PUT</AllowedMethod>
          <MaxAgeSeconds>3000</MaxAgeSeconds>
          <AllowedHeader>Content-Type</AllowedHeader>
          <AllowedHeader>x-amz-acl</AllowedHeader>
          <AllowedHeader>origin</AllowedHeader>
        </CORSRule>
      </CORSConfiguration>""")
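

# Illustrative only: a hypothetical way to wire up the S3 backend defined above. The path prefix,
# credentials, and bucket name are placeholders, not values from the original code.
def _example_s3_storage():
  storage = S3Storage('/registry', 'MY_ACCESS_KEY', 'MY_SECRET_KEY', 'my-quay-bucket')
  storage.put_content('hello.txt', 'hello world')
  return storage.get_content('hello.txt')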


class GoogleCloudStorage(_CloudStorage):
  def __init__(self, storage_path, access_key, secret_key, bucket_name):
    upload_params = {}
    connect_kwargs = {}
    super(GoogleCloudStorage, self).__init__(boto.gs.connection.GSConnection, boto.gs.key.Key,
                                             connect_kwargs, upload_params, storage_path,
                                             access_key, secret_key, bucket_name)

  def setup(self):
    self.get_cloud_bucket().set_cors_xml("""<?xml version="1.0" encoding="UTF-8"?>
      <CorsConfig>
        <Cors>
          <Origins>
            <Origin>*</Origin>
          </Origins>
          <Methods>
            <Method>GET</Method>
            <Method>PUT</Method>
          </Methods>
          <ResponseHeaders>
            <ResponseHeader>Content-Type</ResponseHeader>
          </ResponseHeaders>
          <MaxAgeSec>3000</MaxAgeSec>
        </Cors>
      </CorsConfig>""")

  def _stream_write_internal(self, path, fp, content_type=None, content_encoding=None,
                             cancel_on_error=True, size=filelike.READ_UNTIL_END):
    # Minimum size of upload part size on S3 is 5MB
    self._initialize_cloud_conn()
    path = self._init_path(path)
    key = self._key_class(self._cloud_bucket, path)

    if content_type is not None:
      key.set_metadata('Content-Type', content_type)

    if content_encoding is not None:
      key.set_metadata('Content-Encoding', content_encoding)

    if size != filelike.READ_UNTIL_END:
      fp = filelike.StreamSlice(fp, 0, size)

    # TODO figure out how to handle cancel_on_error=False
    key.set_contents_from_stream(fp)
    return key.size
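

# Illustrative only: a hypothetical round trip through the streaming API inherited from
# _CloudStorage, using the Google Cloud Storage backend above. Credentials, bucket, and blob
# paths are placeholders.
def _example_gcs_stream_round_trip():
  storage = GoogleCloudStorage('/registry', 'MY_GS_ACCESS_KEY', 'MY_GS_SECRET_KEY',
                               'my-gs-bucket')
  storage.stream_write('blobs/example', StringIO.StringIO('some blob data'))
  return ''.join(storage.stream_read('blobs/example'))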


class RadosGWStorage(_CloudStorage):
  def __init__(self, hostname, is_secure, storage_path, access_key, secret_key, bucket_name):
    upload_params = {}
    connect_kwargs = {
      'host': hostname,
      'is_secure': is_secure,
      'calling_format': boto.s3.connection.OrdinaryCallingFormat(),
    }
    super(RadosGWStorage, self).__init__(boto.s3.connection.S3Connection, boto.s3.key.Key,
                                         connect_kwargs, upload_params, storage_path, access_key,
                                         secret_key, bucket_name)

  # TODO remove when radosgw supports cors: http://tracker.ceph.com/issues/8718#change-38624
  def get_direct_download_url(self, path, expires_in=60, requires_cors=False):
    if requires_cors:
      return None

    return super(RadosGWStorage, self).get_direct_download_url(path, expires_in, requires_cors)

  # TODO remove when radosgw supports cors: http://tracker.ceph.com/issues/8718#change-38624
  def get_direct_upload_url(self, path, mime_type, requires_cors=True):
    if requires_cors:
      return None

    return super(RadosGWStorage, self).get_direct_upload_url(path, mime_type, requires_cors)

  def complete_chunked_upload(self, uuid, final_path, storage_metadata):
    self._initialize_cloud_conn()

    # RadosGW does not support multipart copying from keys, so we are forced to join
    # it all locally and then reupload.
    # See https://github.com/ceph/ceph/pull/5139
    chunk_list = self._chunk_list_from_metadata(storage_metadata)
    self._client_side_chunk_join(final_path, chunk_list)
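

# Illustrative only: a hypothetical RadosGW configuration. The hostname, credentials, bucket, and
# path are placeholders. Note that, per the overrides above, direct URLs are disabled whenever the
# caller requires CORS, because radosgw does not yet support it.
def _example_radosgw_storage():
  storage = RadosGWStorage('ceph.example.com', True, '/registry', 'MY_ACCESS_KEY',
                           'MY_SECRET_KEY', 'my-bucket')
  assert storage.get_direct_download_url('some/path', requires_cors=True) is None
  return storage.get_direct_download_url('some/path', requires_cors=False)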