import cStringIO as StringIO
import os
import logging

import boto.s3.connection
import boto.s3.key

from storage.basestorage import BaseStorage


logger = logging.getLogger(__name__)


class StreamReadKeyAsFile(object):
    """Minimal read-only, file-like wrapper around a boto S3 key.

    Usable as a context manager; leaving the ``with`` block closes the
    underlying key with ``fast=True``.
    """

    def __init__(self, key):
        self._key = key
        self._finished = False

    def __enter__(self):
        return self

    def __exit__(self, type, value, tb):
        self._key.close(fast=True)

    def read(self, amt=None):
        """Read up to ``amt`` bytes from the key.

        Once the key has returned an empty chunk, the key is not read again
        and every later call returns an empty byte string, so callers see a
        consistent end-of-stream value.
        """
        # NOTE(review): presumably the flag exists because reading a boto key
        # again after it hit end-of-stream would re-open it -- confirm against
        # boto's Key.read.
        if self._finished:
            # b'' is the same object type as '' on Python 2.
            return b''

        resp = self._key.read(amt)
        if not resp:
            self._finished = True
        return resp


class S3Storage(BaseStorage):
    """Storage backend that keeps every path as a key in one S3 bucket.

    The connection and bucket handle are created lazily on the first
    operation, not in the constructor.
    """

    def __init__(self, storage_path, s3_access_key, s3_secret_key, s3_bucket):
        self._initialized = False
        self._bucket_name = s3_bucket
        self._access_key = s3_access_key
        self._secret_key = s3_secret_key
        self._root_path = storage_path
        self._s3_conn = None
        self._s3_bucket = None

    def _initialize_s3(self):
        """Open the S3 connection and look up the bucket, once per instance."""
        if self._initialized:
            return
        self._s3_conn = boto.s3.connection.S3Connection(self._access_key,
                                                        self._secret_key)
        self._s3_bucket = self._s3_conn.get_bucket(self._bucket_name)
        self._initialized = True

    def _debug_key(self, key):
        """Used for debugging only.

        Replaces ``make_request`` on the key's connection with a wrapper that
        prints the call arguments before delegating to the original method.
        """
        orig_meth = key.bucket.connection.make_request

        def new_meth(*args, **kwargs):
            # Single-argument print(...) produces the same output as the
            # Python 2 print statement and also parses on Python 3.
            print('#' * 16)
            print(args)
            print(kwargs)
            print('#' * 16)
            return orig_meth(*args, **kwargs)

        key.bucket.connection.make_request = new_meth

    def _init_path(self, path=None):
        """Return the key name for ``path``.

        ``path`` is joined onto the configured root with ``os.path.join``
        (a falsy ``path`` yields the root itself) and one leading ``/`` is
        stripped from the result.
        """
        path = os.path.join(self._root_path, path) if path else self._root_path
        if path and path[0] == '/':
            return path[1:]
        return path

    def get_content(self, path):
        """Return the whole content stored under ``path`` as a string.

        Raises:
            IOError: if the key does not exist.
        """
        self._initialize_s3()
        path = self._init_path(path)
        key = boto.s3.key.Key(self._s3_bucket, path)
        if not key.exists():
            raise IOError('No such key: \'{0}\''.format(path))
        return key.get_contents_as_string()

    def put_content(self, path, content):
        """Store ``content`` under ``path`` (uploaded with ``encrypt_key=True``).

        Returns:
            The key name the content was written to.
        """
        self._initialize_s3()
        path = self._init_path(path)
        key = boto.s3.key.Key(self._s3_bucket, path)
        key.set_contents_from_string(content, encrypt_key=True)
        return path

    def get_supports_resumeable_downloads(self, path):
        """Always report that resumable downloads are supported."""
        return True

    def get_direct_download_url(self, path, expires_in=60):
        """Return the URL boto generates for the key at ``path``.

        ``expires_in`` is passed straight to ``generate_url`` (presumably a
        lifetime in seconds -- confirm against boto).
        """
        self._initialize_s3()
        path = self._init_path(path)
        k = boto.s3.key.Key(self._s3_bucket, path)
        return k.generate_url(expires_in)

    def stream_read(self, path):
        """Yield the content of ``path`` in chunks of ``self.buffer_size``.

        This is a generator, so nothing (including the existence check) runs
        until iteration starts.

        Raises:
            IOError: if the key does not exist.
        """
        self._initialize_s3()
        path = self._init_path(path)
        key = boto.s3.key.Key(self._s3_bucket, path)
        if not key.exists():
            raise IOError('No such key: \'{0}\''.format(path))
        while True:
            buf = key.read(self.buffer_size)
            if not buf:
                break
            yield buf

    def stream_read_file(self, path):
        """Return a ``StreamReadKeyAsFile`` wrapping the key at ``path``.

        Raises:
            IOError: if the key does not exist.
        """
        self._initialize_s3()
        path = self._init_path(path)
        key = boto.s3.key.Key(self._s3_bucket, path)
        if not key.exists():
            raise IOError('No such key: \'{0}\''.format(path))
        return StreamReadKeyAsFile(key)

    def stream_write(self, path, fp):
        """Upload everything readable from ``fp`` to ``path`` as a multipart upload.

        ``fp`` is read in chunks of at least 5MB (or ``self.buffer_size`` when
        that is larger); each chunk becomes one part. An ``IOError`` raised
        while reading or uploading stops the loop, is logged, and the upload
        is completed with the parts sent so far.
        """
        self._initialize_s3()
        # Minimum size of upload part size on S3 is 5MB
        buffer_size = max(self.buffer_size, 5 * 1024 * 1024)
        path = self._init_path(path)
        mp = self._s3_bucket.initiate_multipart_upload(path, encrypt_key=True)
        num_part = 1
        while True:
            try:
                buf = fp.read(buffer_size)
                if not buf:
                    break
                part = StringIO.StringIO(buf)
                try:
                    mp.upload_part_from_file(part, num_part)
                finally:
                    # Release the in-memory copy even if the upload raised.
                    part.close()
                num_part += 1
            except IOError:
                # Best-effort behaviour is kept (the upload is still
                # completed below), but the truncation is no longer silent.
                logger.warning('IOError during multipart upload of %s; '
                               'completing with %d part(s)',
                               path, num_part - 1, exc_info=True)
                break
        # NOTE(review): when no part was uploaded (empty ``fp``) this still
        # completes a zero-part upload -- confirm how S3 responds to that.
        mp.complete_upload()

    def list_directory(self, path=None):
        """Yield the names of the entries directly under ``path``.

        Each yielded name is the S3 key name with its first
        ``len(self._root_path)`` characters removed (nothing is removed when
        the root is exactly ``/``) and without a trailing ``/``. This is a
        generator, so nothing runs until iteration starts.

        Raises:
            OSError: after iteration, if the listing produced no entries.
        """
        self._initialize_s3()
        path = self._init_path(path)
        if not path.endswith('/'):
            path += '/'
        ln = 0
        if self._root_path != '/':
            ln = len(self._root_path)
        exists = False
        for key in self._s3_bucket.list(prefix=path, delimiter='/'):
            exists = True
            name = key.name
            if name.endswith('/'):
                yield name[ln:-1]
            else:
                yield name[ln:]
        if not exists:
            # In order to be compliant with the LocalStorage API. Even though
            # S3 does not have a concept of folders.
            raise OSError('No such directory: \'{0}\''.format(path))

    def exists(self, path):
        """Return whether a key exists at ``path``."""
        self._initialize_s3()
        path = self._init_path(path)
        key = boto.s3.key.Key(self._s3_bucket, path)
        return key.exists()

    def remove(self, path):
        """Delete the key at ``path``, or every key under it as a prefix.

        When no key exists at exactly ``path``, the path is treated as a
        directory: all keys whose name starts with ``path + '/'`` are deleted.
        """
        self._initialize_s3()
        path = self._init_path(path)
        key = boto.s3.key.Key(self._s3_bucket, path)
        if key.exists():
            # It's a file
            key.delete()
            return
        # We assume it's a directory
        if not path.endswith('/'):
            path += '/'
        for key in self._s3_bucket.list(prefix=path):
            key.delete()