Add support for Google Cloud Storage.

commit 29f1b048a3
parent b9c6c4c2f2
Author: Jake Moshenko
Date:   2014-08-12 02:06:44 -04:00

3 changed files with 72 additions and 40 deletions

@@ -37,6 +37,9 @@ class SocketReader(object):
       handler(buf)
     return buf
 
+  def tell(self):
+    raise IOError('Stream is not seekable.')
+
 
 def image_is_uploading(repo_image):
   if repo_image is None:
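The new tell() is not dead code: boto decides whether a file-like object can be rewound for upload retries by calling tell() and treating IOError as "not seekable", so SocketReader now explicitly opts in to one-pass streaming uploads. A minimal sketch of that probe (paraphrased, not boto's exact code):

    def starting_position(fp):
      # Seekable sources report a position the uploader can rewind to on retry.
      try:
        return fp.tell()
      except IOError:
        # SocketReader lands here: upload in a single streaming pass, no retries.
        return None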

@@ -1,9 +1,16 @@
 from storage.local import LocalStorage
-from storage.s3 import S3Storage
+from storage.cloud import S3Storage, GoogleCloudStorage
 from storage.fakestorage import FakeStorage
 from storage.distributedstorage import DistributedStorage
 
 
+STORAGE_DRIVER_CLASSES = {
+  'LocalStorage': LocalStorage,
+  'S3Storage': S3Storage,
+  'GoogleCloudStorage': GoogleCloudStorage,
+}
+
+
 class Storage(object):
   def __init__(self, app=None):
     self.app = app
@@ -18,13 +25,8 @@ class Storage(object):
       driver = storage_params[0]
       parameters = storage_params[1]
 
-      if driver == 'LocalStorage':
-        storage = LocalStorage(**parameters)
-      elif driver == 'S3Storage':
-        storage = S3Storage(**parameters)
-      else:
-        storage = FakeStorage()
+      driver_class = STORAGE_DRIVER_CLASSES.get(driver, FakeStorage)
+      storage = driver_class(**parameters)
 
       storages[location] = storage
 
     preference = app.config.get('DISTRIBUTED_STORAGE_PREFERENCE', None)
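Driver selection is now table-driven: each configured location supplies a [driver_name, parameters] pair, the name is looked up in STORAGE_DRIVER_CLASSES (falling back to FakeStorage), and the parameters are splatted into the driver's constructor. A hypothetical config entry wiring up the new driver; the DISTRIBUTED_STORAGE_CONFIG key name is an assumption, since only DISTRIBUTED_STORAGE_PREFERENCE appears in this diff:

    # Hypothetical sketch; parameter names match GoogleCloudStorage.__init__ below.
    DISTRIBUTED_STORAGE_CONFIG = {
      'gcs_useast': ['GoogleCloudStorage', {
        'storage_path': '/registry',
        'access_key': 'GOOG...',             # GCS interoperability access key
        'secret_key': '<secret>',
        'bucket_name': 'my-registry-bucket',
      }],
    }
    DISTRIBUTED_STORAGE_PREFERENCE = ['gcs_useast']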

@@ -3,7 +3,9 @@ import os
 import logging
 
 import boto.s3.connection
+import boto.gs.connection
 import boto.s3.key
+import boto.gs.key
 
 from storage.basestorage import BaseStorage
@@ -32,22 +34,24 @@ class StreamReadKeyAsFile(object):
     return resp
 
 
-class S3Storage(BaseStorage):
-  def __init__(self, storage_path, s3_access_key, s3_secret_key, s3_bucket):
+class _CloudStorage(BaseStorage):
+  def __init__(self, connection_class, key_class, upload_params, storage_path, access_key,
+               secret_key, bucket_name):
     self._initialized = False
-    self._bucket_name = s3_bucket
-    self._access_key = s3_access_key
-    self._secret_key = s3_secret_key
+    self._bucket_name = bucket_name
+    self._access_key = access_key
+    self._secret_key = secret_key
     self._root_path = storage_path
-    self._s3_conn = None
-    self._s3_bucket = None
+    self._connection_class = connection_class
+    self._key_class = key_class
+    self._upload_params = upload_params
+    self._cloud_conn = None
+    self._cloud_bucket = None
 
-  def _initialize_s3(self):
+  def _initialize_cloud_conn(self):
     if not self._initialized:
-      self._s3_conn = boto.s3.connection.S3Connection(self._access_key,
-                                                      self._secret_key)
-      self._s3_bucket = self._s3_conn.get_bucket(self._bucket_name)
+      self._cloud_conn = self._connection_class(self._access_key, self._secret_key)
+      self._cloud_bucket = self._cloud_conn.get_bucket(self._bucket_name)
       self._initialized = True
 
   def _debug_key(self, key):
@@ -69,33 +73,33 @@ class S3Storage(BaseStorage):
     return path
 
   def get_content(self, path):
-    self._initialize_s3()
+    self._initialize_cloud_conn()
     path = self._init_path(path)
-    key = boto.s3.key.Key(self._s3_bucket, path)
+    key = self._key_class(self._cloud_bucket, path)
     if not key.exists():
       raise IOError('No such key: \'{0}\''.format(path))
     return key.get_contents_as_string()
 
   def put_content(self, path, content):
-    self._initialize_s3()
+    self._initialize_cloud_conn()
     path = self._init_path(path)
-    key = boto.s3.key.Key(self._s3_bucket, path)
-    key.set_contents_from_string(content, encrypt_key=True)
+    key = self._key_class(self._cloud_bucket, path)
+    key.set_contents_from_string(content, **self._upload_params)
     return path
 
   def get_supports_resumeable_downloads(self):
     return True
 
   def get_direct_download_url(self, path, expires_in=60):
-    self._initialize_s3()
+    self._initialize_cloud_conn()
     path = self._init_path(path)
-    k = boto.s3.key.Key(self._s3_bucket, path)
+    k = self._key_class(self._cloud_bucket, path)
     return k.generate_url(expires_in)
 
   def stream_read(self, path):
-    self._initialize_s3()
+    self._initialize_cloud_conn()
     path = self._init_path(path)
-    key = boto.s3.key.Key(self._s3_bucket, path)
+    key = self._key_class(self._cloud_bucket, path)
     if not key.exists():
       raise IOError('No such key: \'{0}\''.format(path))
     while True:
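Note that get_direct_download_url needs no per-provider code: boto's generate_url produces a time-limited signed URL for S3 and GS keys alike. A hypothetical call against an initialized driver:

    # 'store' is an S3Storage or GoogleCloudStorage instance; the path is illustrative.
    url = store.get_direct_download_url('repositories/foo/layer', expires_in=600)
    # 'url' is a signed URL that stops working after ten minutes.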
@@ -105,21 +109,21 @@ class S3Storage(BaseStorage):
       yield buf
 
   def stream_read_file(self, path):
-    self._initialize_s3()
+    self._initialize_cloud_conn()
     path = self._init_path(path)
-    key = boto.s3.key.Key(self._s3_bucket, path)
+    key = self._key_class(self._cloud_bucket, path)
     if not key.exists():
       raise IOError('No such key: \'{0}\''.format(path))
     return StreamReadKeyAsFile(key)
 
   def stream_write(self, path, fp):
     # The minimum size of an upload part on S3 is 5 MB.
-    self._initialize_s3()
+    self._initialize_cloud_conn()
     buffer_size = 5 * 1024 * 1024
     if self.buffer_size > buffer_size:
       buffer_size = self.buffer_size
     path = self._init_path(path)
-    mp = self._s3_bucket.initiate_multipart_upload(path, encrypt_key=True)
+    mp = self._cloud_bucket.initiate_multipart_upload(path, **self._upload_params)
     num_part = 1
     while True:
       try:
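The hunk above is truncated mid-loop; for readers unfamiliar with boto multipart uploads, this is a condensed sketch of the pattern (simplified, error handling omitted; not the exact code in this file):

    import StringIO  # Python 2, as in this era of the codebase

    def multipart_write(bucket, path, fp, part_size=5 * 1024 * 1024):
      mp = bucket.initiate_multipart_upload(path, encrypt_key=True)
      num_part = 1
      while True:
        buf = fp.read(part_size)  # every part except the last must be >= 5 MB
        if not buf:
          break
        mp.upload_part_from_file(StringIO.StringIO(buf), num_part)
        num_part += 1
      mp.complete_upload()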
@@ -135,7 +139,7 @@ class S3Storage(BaseStorage):
     mp.complete_upload()
 
   def list_directory(self, path=None):
-    self._initialize_s3()
+    self._initialize_cloud_conn()
     path = self._init_path(path)
     if not path.endswith('/'):
       path += '/'
@@ -143,7 +147,7 @@ class S3Storage(BaseStorage):
     if self._root_path != '/':
       ln = len(self._root_path)
     exists = False
-    for key in self._s3_bucket.list(prefix=path, delimiter='/'):
+    for key in self._cloud_bucket.list(prefix=path, delimiter='/'):
       exists = True
       name = key.name
       if name.endswith('/'):
@@ -156,15 +160,15 @@ class S3Storage(BaseStorage):
       raise OSError('No such directory: \'{0}\''.format(path))
 
   def exists(self, path):
-    self._initialize_s3()
+    self._initialize_cloud_conn()
     path = self._init_path(path)
-    key = boto.s3.key.Key(self._s3_bucket, path)
+    key = self._key_class(self._cloud_bucket, path)
     return key.exists()
 
   def remove(self, path):
-    self._initialize_s3()
+    self._initialize_cloud_conn()
     path = self._init_path(path)
-    key = boto.s3.key.Key(self._s3_bucket, path)
+    key = self._key_class(self._cloud_bucket, path)
     if key.exists():
       # It's a file
       key.delete()
@@ -172,5 +176,28 @@ class S3Storage(BaseStorage):
     # We assume it's a directory
     if not path.endswith('/'):
       path += '/'
-    for key in self._s3_bucket.list(prefix=path):
+    for key in self._cloud_bucket.list(prefix=path):
       key.delete()
+
+
+class S3Storage(_CloudStorage):
+  def __init__(self, storage_path, s3_access_key, s3_secret_key, s3_bucket):
+    upload_params = {
+      'encrypt_key': True,
+    }
+    super(S3Storage, self).__init__(boto.s3.connection.S3Connection, boto.s3.key.Key,
+                                    upload_params, storage_path, s3_access_key, s3_secret_key,
+                                    s3_bucket)
+
+
+class GoogleCloudStorage(_CloudStorage):
+  def __init__(self, storage_path, access_key, secret_key, bucket_name):
+    super(GoogleCloudStorage, self).__init__(boto.gs.connection.GSConnection, boto.gs.key.Key,
+                                             {}, storage_path, access_key, secret_key,
+                                             bucket_name)
+
+  def stream_write(self, path, fp):
+    # GCS has no S3-style multipart upload; stream the source straight
+    # through boto with chunked transfer encoding instead.
+    self._initialize_cloud_conn()
+    path = self._init_path(path)
+    key = self._key_class(self._cloud_bucket, path)
+    key.set_contents_from_stream(fp)
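GoogleCloudStorage overrides stream_write because boto exposes no multipart API for GS; set_contents_from_stream uploads with chunked transfer encoding instead, which is what makes the non-seekable SocketReader (and its IOError-raising tell()) work here. A minimal usage sketch, with placeholder credentials and bucket:

    # Usage sketch; bucket, credentials, and path are placeholders.
    store = GoogleCloudStorage(storage_path='/registry',
                               access_key='GOOG...',
                               secret_key='<secret>',
                               bucket_name='my-registry-bucket')
    store.put_content('repositories/foo/json', '{}')
    assert store.exists('repositories/foo/json')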