import cStringIO as StringIO import os import logging import boto.s3.connection import boto.gs.connection import boto.s3.key import boto.gs.key from io import BufferedIOBase import app from storage.basestorage import BaseStorage logger = logging.getLogger(__name__) class StreamReadKeyAsFile(BufferedIOBase): def __init__(self, key): self._key = key def read(self, amt=None): if self.closed: return None resp = self._key.read(amt) return resp def readable(self): return True @property def closed(self): return self._key.closed def close(self): self._key.close(fast=True) class _CloudStorage(BaseStorage): def __init__(self, connection_class, key_class, connect_kwargs, upload_params, storage_path, access_key, secret_key, bucket_name): self._initialized = False self._bucket_name = bucket_name self._access_key = access_key self._secret_key = secret_key self._root_path = storage_path self._connection_class = connection_class self._key_class = key_class self._upload_params = upload_params self._connect_kwargs = connect_kwargs self._cloud_conn = None self._cloud_bucket = None def _initialize_cloud_conn(self): if not self._initialized: self._cloud_conn = self._connection_class(self._access_key, self._secret_key, **self._connect_kwargs) self._cloud_bucket = self._cloud_conn.get_bucket(self._bucket_name) self._initialized = True def _debug_key(self, key): """Used for debugging only.""" orig_meth = key.bucket.connection.make_request def new_meth(*args, **kwargs): print '#' * 16 print args print kwargs print '#' * 16 return orig_meth(*args, **kwargs) key.bucket.connection.make_request = new_meth def _init_path(self, path=None): path = os.path.join(self._root_path, path) if path else self._root_path if path and path[0] == '/': return path[1:] return path def get_cloud_conn(self): self._initialize_cloud_conn() return self._cloud_conn def get_cloud_bucket(self): return self._cloud_bucket def get_content(self, path): self._initialize_cloud_conn() path = self._init_path(path) key = self._key_class(self._cloud_bucket, path) if not key.exists(): raise IOError('No such key: \'{0}\''.format(path)) return key.get_contents_as_string() def put_content(self, path, content): self._initialize_cloud_conn() path = self._init_path(path) key = self._key_class(self._cloud_bucket, path) key.set_contents_from_string(content, **self._upload_params) return path def get_supports_resumable_downloads(self): return True def get_direct_download_url(self, path, expires_in=60, requires_cors=False): self._initialize_cloud_conn() path = self._init_path(path) k = self._key_class(self._cloud_bucket, path) return k.generate_url(expires_in) def get_direct_upload_url(self, path, mime_type, requires_cors=True): self._initialize_cloud_conn() path = self._init_path(path) key = self._key_class(self._cloud_bucket, path) url = key.generate_url(300, 'PUT', headers={'Content-Type': mime_type}, encrypt_key=True) return url def stream_read(self, path): self._initialize_cloud_conn() path = self._init_path(path) key = self._key_class(self._cloud_bucket, path) if not key.exists(): raise IOError('No such key: \'{0}\''.format(path)) while True: buf = key.read(self.buffer_size) if not buf: break yield buf def stream_read_file(self, path): self._initialize_cloud_conn() path = self._init_path(path) key = self._key_class(self._cloud_bucket, path) if not key.exists(): raise IOError('No such key: \'{0}\''.format(path)) return StreamReadKeyAsFile(key) def stream_write(self, path, fp, content_type=None, content_encoding=None): # Minimum size of upload part size on S3 is 5MB self._initialize_cloud_conn() buffer_size = 5 * 1024 * 1024 if self.buffer_size > buffer_size: buffer_size = self.buffer_size path = self._init_path(path) metadata = {} if content_type is not None: metadata['Content-Type'] = content_type if content_encoding is not None: metadata['Content-Encoding'] = content_encoding mp = self._cloud_bucket.initiate_multipart_upload(path, metadata=metadata, **self._upload_params) app.metric_queue.put('MultipartUploadStart', 1) num_part = 1 while True: try: buf = fp.read(buffer_size) if not buf: break io = StringIO.StringIO(buf) mp.upload_part_from_file(io, num_part) num_part += 1 io.close() except IOError: app.metric_queue.put('MultipartUploadFailure', 1) mp.cancel_upload() raise app.metric_queue.put('MultipartUploadSuccess', 1) mp.complete_upload() def list_directory(self, path=None): self._initialize_cloud_conn() path = self._init_path(path) if not path.endswith('/'): path += '/' ln = 0 if self._root_path != '/': ln = len(self._root_path) exists = False for key in self._cloud_bucket.list(prefix=path, delimiter='/'): exists = True name = key.name if name.endswith('/'): yield name[ln:-1] else: yield name[ln:] if exists is False: # In order to be compliant with the LocalStorage API. Even though # S3 does not have a concept of folders. raise OSError('No such directory: \'{0}\''.format(path)) def exists(self, path): self._initialize_cloud_conn() path = self._init_path(path) key = self._key_class(self._cloud_bucket, path) return key.exists() def remove(self, path): self._initialize_cloud_conn() path = self._init_path(path) key = self._key_class(self._cloud_bucket, path) if key.exists(): # It's a file key.delete() return # We assume it's a directory if not path.endswith('/'): path += '/' for key in self._cloud_bucket.list(prefix=path): key.delete() def get_checksum(self, path): self._initialize_cloud_conn() path = self._init_path(path) key = self._key_class(self._cloud_bucket, path) k = self._cloud_bucket.lookup(key) if k is None: raise IOError('No such key: \'{0}\''.format(path)) return k.etag[1:-1][:7] def copy_to(self, destination, path): # First try to copy directly via boto, but only if the storages are the # same type, with the same access information. if (self.__class__ == destination.__class__ and self._access_key == destination._access_key and self._secret_key == destination._secret_key): logger.debug('Copying file from %s to %s via a direct boto copy', self._cloud_bucket, destination._cloud_bucket) source_path = self._init_path(path) source_key = self._key_class(self._cloud_bucket, source_path) dest_path = destination._init_path(path) source_key.copy(destination._cloud_bucket, dest_path) return # Fallback to a slower, default copy. logger.debug('Copying file from %s to %s via a streamed copy', self._cloud_bucket, destination) with self.stream_read_file(path) as fp: destination.stream_write(path, fp) class S3Storage(_CloudStorage): def __init__(self, storage_path, s3_access_key, s3_secret_key, s3_bucket): upload_params = { 'encrypt_key': True, } connect_kwargs = {} super(S3Storage, self).__init__(boto.s3.connection.S3Connection, boto.s3.key.Key, connect_kwargs, upload_params, storage_path, s3_access_key, s3_secret_key, s3_bucket) def setup(self): self.get_cloud_bucket().set_cors_xml(""" * GET 3000 Authorization * PUT 3000 Content-Type x-amz-acl origin """) class GoogleCloudStorage(_CloudStorage): def __init__(self, storage_path, access_key, secret_key, bucket_name): upload_params = {} connect_kwargs = {} super(GoogleCloudStorage, self).__init__(boto.gs.connection.GSConnection, boto.gs.key.Key, connect_kwargs, upload_params, storage_path, access_key, secret_key, bucket_name) def setup(self): self.get_cloud_bucket().set_cors_xml(""" * GET PUT Content-Type 3000 """) def stream_write(self, path, fp, content_type=None, content_encoding=None): # Minimum size of upload part size on S3 is 5MB self._initialize_cloud_conn() path = self._init_path(path) key = self._key_class(self._cloud_bucket, path) if content_type is not None: key.set_metadata('Content-Type', content_type) if content_encoding is not None: key.set_metadata('Content-Encoding', content_encoding) key.set_contents_from_stream(fp) class RadosGWStorage(_CloudStorage): def __init__(self, hostname, is_secure, storage_path, access_key, secret_key, bucket_name): upload_params = {} connect_kwargs = { 'host': hostname, 'is_secure': is_secure, 'calling_format': boto.s3.connection.OrdinaryCallingFormat(), } super(RadosGWStorage, self).__init__(boto.s3.connection.S3Connection, boto.s3.key.Key, connect_kwargs, upload_params, storage_path, access_key, secret_key, bucket_name) # TODO remove when radosgw supports cors: http://tracker.ceph.com/issues/8718#change-38624 def get_direct_download_url(self, path, expires_in=60, requires_cors=False): if requires_cors: return None return super(RadosGWStorage, self).get_direct_download_url(path, expires_in, requires_cors) # TODO remove when radosgw supports cors: http://tracker.ceph.com/issues/8718#change-38624 def get_direct_upload_url(self, path, mime_type, requires_cors=True): if requires_cors: return None return super(RadosGWStorage, self).get_direct_upload_url(path, mime_type, requires_cors)