From 29f1b048a31e102fcce6e0149cda820b561062e2 Mon Sep 17 00:00:00 2001
From: Jake Moshenko
Date: Tue, 12 Aug 2014 02:06:44 -0400
Subject: [PATCH] Add support for Google Cloud Storage.

---
 endpoints/registry.py       |  3 ++
 storage/__init__.py         | 18 ++++----
 storage/{s3.py => cloud.py} | 91 ++++++++++++++++++++++++-------------
 3 files changed, 72 insertions(+), 40 deletions(-)
 rename storage/{s3.py => cloud.py} (59%)

diff --git a/endpoints/registry.py b/endpoints/registry.py
index 19eedf1ce..47d4f65ac 100644
--- a/endpoints/registry.py
+++ b/endpoints/registry.py
@@ -37,6 +37,9 @@ class SocketReader(object):
       handler(buf)
     return buf
 
+  def tell(self):
+    raise IOError('Stream is not seekable.')
+
 
 def image_is_uploading(repo_image):
   if repo_image is None:
diff --git a/storage/__init__.py b/storage/__init__.py
index 57e383f1c..6700dab0b 100644
--- a/storage/__init__.py
+++ b/storage/__init__.py
@@ -1,9 +1,16 @@
 from storage.local import LocalStorage
-from storage.s3 import S3Storage
+from storage.cloud import S3Storage, GoogleCloudStorage
 from storage.fakestorage import FakeStorage
 from storage.distributedstorage import DistributedStorage
 
 
+STORAGE_DRIVER_CLASSES = {
+  'LocalStorage': LocalStorage,
+  'S3Storage': S3Storage,
+  'GoogleCloudStorage': GoogleCloudStorage,
+}
+
+
 class Storage(object):
   def __init__(self, app=None):
     self.app = app
@@ -18,13 +25,8 @@ class Storage(object):
       driver = storage_params[0]
       parameters = storage_params[1]
 
-      if driver == 'LocalStorage':
-        storage = LocalStorage(**parameters)
-      elif driver == 'S3Storage':
-        storage = S3Storage(**parameters)
-      else:
-        storage = FakeStorage()
-
+      driver_class = STORAGE_DRIVER_CLASSES.get(driver, FakeStorage)
+      storage = driver_class(**parameters)
       storages[location] = storage
 
     preference = app.config.get('DISTRIBUTED_STORAGE_PREFERENCE', None)
diff --git a/storage/s3.py b/storage/cloud.py
similarity index 59%
rename from storage/s3.py
rename to storage/cloud.py
index 12c2373de..d64415410 100644
--- a/storage/s3.py
+++ b/storage/cloud.py
@@ -3,7 +3,9 @@ import os
 import logging
 
 import boto.s3.connection
+import boto.gs.connection
 import boto.s3.key
+import boto.gs.key
 
 from storage.basestorage import BaseStorage
 
@@ -32,22 +34,24 @@ class StreamReadKeyAsFile(object):
     return resp
 
 
-class S3Storage(BaseStorage):
-
-  def __init__(self, storage_path, s3_access_key, s3_secret_key, s3_bucket):
+class _CloudStorage(BaseStorage):
+  def __init__(self, connection_class, key_class, upload_params, storage_path, access_key,
+               secret_key, bucket_name):
     self._initialized = False
-    self._bucket_name = s3_bucket
-    self._access_key = s3_access_key
-    self._secret_key = s3_secret_key
+    self._bucket_name = bucket_name
+    self._access_key = access_key
+    self._secret_key = secret_key
     self._root_path = storage_path
-    self._s3_conn = None
-    self._s3_bucket = None
+    self._connection_class = connection_class
+    self._key_class = key_class
+    self._upload_params = upload_params
+    self._cloud_conn = None
+    self._cloud_bucket = None
 
-  def _initialize_s3(self):
+  def _initialize_cloud_conn(self):
     if not self._initialized:
-      self._s3_conn = boto.s3.connection.S3Connection(self._access_key,
-                                                      self._secret_key)
-      self._s3_bucket = self._s3_conn.get_bucket(self._bucket_name)
+      self._cloud_conn = self._connection_class(self._access_key, self._secret_key)
+      self._cloud_bucket = self._cloud_conn.get_bucket(self._bucket_name)
       self._initialized = True
 
   def _debug_key(self, key):
@@ -69,33 +73,33 @@ class S3Storage(BaseStorage):
     return path
 
   def get_content(self, path):
-    self._initialize_s3()
+    self._initialize_cloud_conn()
     path = self._init_path(path)
-    key = boto.s3.key.Key(self._s3_bucket, path)
+    key = self._key_class(self._cloud_bucket, path)
     if not key.exists():
       raise IOError('No such key: \'{0}\''.format(path))
     return key.get_contents_as_string()
 
   def put_content(self, path, content):
-    self._initialize_s3()
+    self._initialize_cloud_conn()
     path = self._init_path(path)
-    key = boto.s3.key.Key(self._s3_bucket, path)
-    key.set_contents_from_string(content, encrypt_key=True)
+    key = self._key_class(self._cloud_bucket, path)
+    key.set_contents_from_string(content, **self._upload_params)
     return path
 
   def get_supports_resumeable_downloads(self):
     return True
 
   def get_direct_download_url(self, path, expires_in=60):
-    self._initialize_s3()
+    self._initialize_cloud_conn()
     path = self._init_path(path)
-    k = boto.s3.key.Key(self._s3_bucket, path)
+    k = self._key_class(self._cloud_bucket, path)
     return k.generate_url(expires_in)
 
   def stream_read(self, path):
-    self._initialize_s3()
+    self._initialize_cloud_conn()
     path = self._init_path(path)
-    key = boto.s3.key.Key(self._s3_bucket, path)
+    key = self._key_class(self._cloud_bucket, path)
     if not key.exists():
       raise IOError('No such key: \'{0}\''.format(path))
     while True:
@@ -105,21 +109,21 @@ class S3Storage(BaseStorage):
       yield buf
 
   def stream_read_file(self, path):
-    self._initialize_s3()
+    self._initialize_cloud_conn()
     path = self._init_path(path)
-    key = boto.s3.key.Key(self._s3_bucket, path)
+    key = self._key_class(self._cloud_bucket, path)
     if not key.exists():
       raise IOError('No such key: \'{0}\''.format(path))
     return StreamReadKeyAsFile(key)
 
   def stream_write(self, path, fp):
     # Minimum size of upload part size on S3 is 5MB
-    self._initialize_s3()
+    self._initialize_cloud_conn()
     buffer_size = 5 * 1024 * 1024
     if self.buffer_size > buffer_size:
       buffer_size = self.buffer_size
     path = self._init_path(path)
-    mp = self._s3_bucket.initiate_multipart_upload(path, encrypt_key=True)
+    mp = self._cloud_bucket.initiate_multipart_upload(path, **self._upload_params)
     num_part = 1
     while True:
       try:
@@ -135,7 +139,7 @@ class S3Storage(BaseStorage):
     mp.complete_upload()
 
   def list_directory(self, path=None):
-    self._initialize_s3()
+    self._initialize_cloud_conn()
     path = self._init_path(path)
     if not path.endswith('/'):
       path += '/'
@@ -143,7 +147,7 @@ class S3Storage(BaseStorage):
     if self._root_path != '/':
       ln = len(self._root_path)
     exists = False
-    for key in self._s3_bucket.list(prefix=path, delimiter='/'):
+    for key in self._cloud_bucket.list(prefix=path, delimiter='/'):
       exists = True
       name = key.name
       if name.endswith('/'):
@@ -156,15 +160,15 @@ class S3Storage(BaseStorage):
      raise OSError('No such directory: \'{0}\''.format(path))
 
   def exists(self, path):
-    self._initialize_s3()
+    self._initialize_cloud_conn()
     path = self._init_path(path)
-    key = boto.s3.key.Key(self._s3_bucket, path)
+    key = self._key_class(self._cloud_bucket, path)
     return key.exists()
 
   def remove(self, path):
-    self._initialize_s3()
+    self._initialize_cloud_conn()
     path = self._init_path(path)
-    key = boto.s3.key.Key(self._s3_bucket, path)
+    key = self._key_class(self._cloud_bucket, path)
     if key.exists():
       # It's a file
       key.delete()
@@ -172,5 +176,28 @@ class S3Storage(BaseStorage):
       # We assume it's a directory
       if not path.endswith('/'):
         path += '/'
-      for key in self._s3_bucket.list(prefix=path):
+      for key in self._cloud_bucket.list(prefix=path):
         key.delete()
+
+
+class S3Storage(_CloudStorage):
+  def __init__(self, storage_path, s3_access_key, s3_secret_key, s3_bucket):
+    upload_params = {
+      'encrypt_key': True,
+    }
+    super(S3Storage, self).__init__(boto.s3.connection.S3Connection, boto.s3.key.Key,
+                                    upload_params, storage_path, s3_access_key, s3_secret_key,
+                                    s3_bucket)
+
+
+class GoogleCloudStorage(_CloudStorage):
+  def __init__(self, storage_path, access_key, secret_key, bucket_name):
+    super(GoogleCloudStorage, self).__init__(boto.gs.connection.GSConnection, boto.gs.key.Key, {},
+                                             storage_path, access_key, secret_key, bucket_name)
+
+  def stream_write(self, path, fp):
+    # GCS supports streaming (chunked transfer encoding) uploads, so skip the S3-style multipart upload.
+    self._initialize_cloud_conn()
+    path = self._init_path(path)
+    key = self._key_class(self._cloud_bucket, path)
+    key.set_contents_from_stream(fp)
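
Note (not part of the patch): below is a minimal sketch of how a deployment
might exercise the new GoogleCloudStorage driver through the
STORAGE_DRIVER_CLASSES table added in storage/__init__.py. The parameter names
match the GoogleCloudStorage constructor above; the credential and bucket
values are placeholders, and note that boto's GSConnection authenticates with
Google Cloud Storage interoperability (HMAC) keys rather than OAuth
credentials.

    from storage import STORAGE_DRIVER_CLASSES
    from storage.fakestorage import FakeStorage

    # Hypothetical (driver, parameters) pair, shaped like the entries the
    # storage factory reads from the app config.
    storage_params = ('GoogleCloudStorage', {
      'storage_path': '/registry',
      'access_key': 'GOOG_PLACEHOLDER_ACCESS_KEY',
      'secret_key': 'PLACEHOLDER_SECRET_KEY',
      'bucket_name': 'my-registry-bucket',
    })

    driver = storage_params[0]
    parameters = storage_params[1]

    # Same lookup the patch introduces: unknown driver names fall back to
    # FakeStorage.
    driver_class = STORAGE_DRIVER_CLASSES.get(driver, FakeStorage)
    storage = driver_class(**parameters)

    # Connections are lazy: nothing contacts GCS until a method calls
    # _initialize_cloud_conn(), e.g. the first exists() check.
    print(storage.exists('repositories/library/ubuntu/json'))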