diff --git a/app.py b/app.py index beb4c027f..7d551189c 100644 --- a/app.py +++ b/app.py @@ -131,7 +131,8 @@ Principal(app, use_sessions=False) avatar = Avatar(app) login_manager = LoginManager(app) mail = Mail(app) -storage = Storage(app) +metric_queue = MetricQueue() +storage = Storage(app, metric_queue) userfiles = Userfiles(app, storage) log_archive = LogArchive(app, storage) analytics = Analytics(app) @@ -142,7 +143,6 @@ authentication = UserAuthentication(app, OVERRIDE_CONFIG_DIRECTORY) userevents = UserEventsBuilderModule(app) superusers = SuperUserManager(app) signer = Signer(app, OVERRIDE_CONFIG_DIRECTORY) -metric_queue = MetricQueue() start_cloudwatch_sender(metric_queue, app) tf = app.config['DB_TRANSACTION_FACTORY'] diff --git a/storage/__init__.py b/storage/__init__.py index 354430603..f78a8f5e7 100644 --- a/storage/__init__.py +++ b/storage/__init__.py @@ -13,27 +13,27 @@ STORAGE_DRIVER_CLASSES = { 'SwiftStorage': SwiftStorage, } -def get_storage_driver(storage_params): +def get_storage_driver(metric_queue, storage_params): """ Returns a storage driver class for the given storage configuration (a pair of string name and a dict of parameters). """ driver = storage_params[0] parameters = storage_params[1] driver_class = STORAGE_DRIVER_CLASSES.get(driver, FakeStorage) - return driver_class(**parameters) + return driver_class(metric_queue, **parameters) class Storage(object): - def __init__(self, app=None): + def __init__(self, app=None, metric_queue=None): self.app = app - if app is not None: - self.state = self.init_app(app) + if app is not None and metric_queue is not None: + self.state = self.init_app(app, metric_queue) else: self.state = None - def init_app(self, app): + def init_app(self, app, metric_queue): storages = {} for location, storage_params in app.config.get('DISTRIBUTED_STORAGE_CONFIG').items(): - storages[location] = get_storage_driver(storage_params) + storages[location] = get_storage_driver(metric_queue, storage_params) preference = app.config.get('DISTRIBUTED_STORAGE_PREFERENCE', None) if not preference: diff --git a/storage/cloud.py b/storage/cloud.py index 2f0be6838..e6831d271 100644 --- a/storage/cloud.py +++ b/storage/cloud.py @@ -15,7 +15,6 @@ from collections import namedtuple from util.registry import filelike from storage.basestorage import BaseStorageV2, InvalidChunkException -import app logger = logging.getLogger(__name__) @@ -48,8 +47,8 @@ class StreamReadKeyAsFile(BufferedIOBase): class _CloudStorage(BaseStorageV2): - def __init__(self, connection_class, key_class, connect_kwargs, upload_params, storage_path, - access_key, secret_key, bucket_name): + def __init__(self, metric_queue, connection_class, key_class, connect_kwargs, upload_params, + storage_path, access_key, secret_key, bucket_name): super(_CloudStorage, self).__init__() self.automatic_chunk_size = 5 * 1024 * 1024 @@ -65,6 +64,7 @@ class _CloudStorage(BaseStorageV2): self._connect_kwargs = connect_kwargs self._cloud_conn = None self._cloud_bucket = None + self._metric_queue = metric_queue def _initialize_cloud_conn(self): if not self._initialized: @@ -161,7 +161,7 @@ class _CloudStorage(BaseStorageV2): if content_encoding is not None: metadata['Content-Encoding'] = content_encoding - app.metric_queue.put('MultipartUploadStart', 1) + self._metric_queue.put('MultipartUploadStart', 1) return self._cloud_bucket.initiate_multipart_upload(path, metadata=metadata, **self._upload_params) @@ -198,7 +198,7 @@ class _CloudStorage(BaseStorageV2): except IOError as ex: logger.warn('stream write error: %s', ex) error = ex - app.metric_queue.put('MultipartUploadFailure', 1) + self._metric_queue.put('MultipartUploadFailure', 1) if cancel_on_error: mp.cancel_upload() return 0, error @@ -206,7 +206,7 @@ class _CloudStorage(BaseStorageV2): break if total_bytes_written > 0: - app.metric_queue.put('MultipartUploadSuccess', 1) + self._metric_queue.put('MultipartUploadSuccess', 1) mp.complete_upload() return total_bytes_written, error @@ -380,7 +380,8 @@ class _CloudStorage(BaseStorageV2): class S3Storage(_CloudStorage): - def __init__(self, storage_path, s3_access_key, s3_secret_key, s3_bucket, host=None): + def __init__(self, metric_queue, storage_path, s3_access_key, s3_secret_key, s3_bucket, + host=None): upload_params = { 'encrypt_key': True, } @@ -390,7 +391,7 @@ class S3Storage(_CloudStorage): raise ValueError('host name must not start with http:// or https://') connect_kwargs['host'] = host - super(S3Storage, self).__init__(boto.s3.connection.S3Connection, boto.s3.key.Key, + super(S3Storage, self).__init__(metric_queue, boto.s3.connection.S3Connection, boto.s3.key.Key, connect_kwargs, upload_params, storage_path, s3_access_key, s3_secret_key, s3_bucket) @@ -414,12 +415,12 @@ class S3Storage(_CloudStorage): """) class GoogleCloudStorage(_CloudStorage): - def __init__(self, storage_path, access_key, secret_key, bucket_name): + def __init__(self, metric_queue, storage_path, access_key, secret_key, bucket_name): upload_params = {} connect_kwargs = {} - super(GoogleCloudStorage, self).__init__(boto.gs.connection.GSConnection, boto.gs.key.Key, - connect_kwargs, upload_params, storage_path, - access_key, secret_key, bucket_name) + super(GoogleCloudStorage, self).__init__(metric_queue, boto.gs.connection.GSConnection, + boto.gs.key.Key, connect_kwargs, upload_params, + storage_path, access_key, secret_key, bucket_name) def setup(self): self.get_cloud_bucket().set_cors_xml(""" @@ -474,16 +475,17 @@ class GoogleCloudStorage(_CloudStorage): class RadosGWStorage(_CloudStorage): - def __init__(self, hostname, is_secure, storage_path, access_key, secret_key, bucket_name): + def __init__(self, metric_queue, hostname, is_secure, storage_path, access_key, secret_key, + bucket_name): upload_params = {} connect_kwargs = { 'host': hostname, 'is_secure': is_secure, 'calling_format': boto.s3.connection.OrdinaryCallingFormat(), } - super(RadosGWStorage, self).__init__(boto.s3.connection.S3Connection, boto.s3.key.Key, - connect_kwargs, upload_params, storage_path, access_key, - secret_key, bucket_name) + super(RadosGWStorage, self).__init__(metric_queue, boto.s3.connection.S3Connection, + boto.s3.key.Key, connect_kwargs, upload_params, + storage_path, access_key, secret_key, bucket_name) # TODO remove when radosgw supports cors: http://tracker.ceph.com/issues/8718#change-38624 def get_direct_download_url(self, path, expires_in=60, requires_cors=False): diff --git a/storage/fakestorage.py b/storage/fakestorage.py index 12440a32c..40af6a04c 100644 --- a/storage/fakestorage.py +++ b/storage/fakestorage.py @@ -9,6 +9,9 @@ from storage.basestorage import BaseStorageV2 _FAKE_STORAGE_MAP = defaultdict(StringIO.StringIO) class FakeStorage(BaseStorageV2): + def __init__(self, metric_queue): + super(FakeStorage, self).__init__() + def _init_path(self, path=None, create=False): return path diff --git a/storage/local.py b/storage/local.py index 7f92d6652..ef515f543 100644 --- a/storage/local.py +++ b/storage/local.py @@ -14,7 +14,7 @@ logger = logging.getLogger(__name__) class LocalStorage(BaseStorageV2): - def __init__(self, storage_path): + def __init__(self, metric_queue, storage_path): super(LocalStorage, self).__init__() self._root_path = storage_path diff --git a/storage/swift.py b/storage/swift.py index 2618c6731..e1dbdfe96 100644 --- a/storage/swift.py +++ b/storage/swift.py @@ -29,7 +29,7 @@ _SEGMENT_DIRECTORY = 'segments' _MAXIMUM_SEGMENT_SIZE = 5000000000 # 5 GB class SwiftStorage(BaseStorage): - def __init__(self, swift_container, storage_path, auth_url, swift_user, + def __init__(self, metric_queue, swift_container, storage_path, auth_url, swift_user, swift_password, auth_version=None, os_options=None, ca_cert_path=None, temp_url_key=None, simple_path_concat=False): super(SwiftStorage, self).__init__() diff --git a/test/test_swift.py b/test/test_swift.py new file mode 100644 index 000000000..83c604cb5 --- /dev/null +++ b/test/test_swift.py @@ -0,0 +1,42 @@ +import unittest + +from storage.swift import SwiftStorage +from mock import MagicMock + + +class TestSwiftStorage(SwiftStorage): + def __init__(self, *args, **kwargs): + super(TestSwiftStorage, self).__init__(*args, **kwargs) + self._connection = MagicMock() + + def _get_connection(self): + return self._connection + + +class SwiftTests(unittest.TestCase): + base_args = { + 'metric_queue': None, + 'swift_container': 'container-name', + 'storage_path': '/basepath', + 'auth_url': 'https://auth.com', + 'swift_user': 'root', + 'swift_password': 'password', + } + + + def test_fixed_path_concat(self): + swift = TestSwiftStorage(**self.base_args) + swift.exists('object/path') + swift._get_connection().head_object.assert_called_with('container-name', 'basepath/object/path') + + + def test_simple_path_concat(self): + simple_concat_args = dict(self.base_args) + simple_concat_args['simple_path_concat'] = True + swift = TestSwiftStorage(**simple_concat_args) + swift.exists('object/path') + swift._get_connection().head_object.assert_called_with('container-name', 'basepathobject/path') + + +if __name__ == '__main__': + unittest.main()