Add a test for swift path computation
This commit is contained in:
parent
c6d7eba98d
commit
909e7d45b7
7 changed files with 74 additions and 27 deletions
4
app.py
4
app.py
|
@ -131,7 +131,8 @@ Principal(app, use_sessions=False)
|
||||||
avatar = Avatar(app)
|
avatar = Avatar(app)
|
||||||
login_manager = LoginManager(app)
|
login_manager = LoginManager(app)
|
||||||
mail = Mail(app)
|
mail = Mail(app)
|
||||||
storage = Storage(app)
|
metric_queue = MetricQueue()
|
||||||
|
storage = Storage(app, metric_queue)
|
||||||
userfiles = Userfiles(app, storage)
|
userfiles = Userfiles(app, storage)
|
||||||
log_archive = LogArchive(app, storage)
|
log_archive = LogArchive(app, storage)
|
||||||
analytics = Analytics(app)
|
analytics = Analytics(app)
|
||||||
|
@ -142,7 +143,6 @@ authentication = UserAuthentication(app, OVERRIDE_CONFIG_DIRECTORY)
|
||||||
userevents = UserEventsBuilderModule(app)
|
userevents = UserEventsBuilderModule(app)
|
||||||
superusers = SuperUserManager(app)
|
superusers = SuperUserManager(app)
|
||||||
signer = Signer(app, OVERRIDE_CONFIG_DIRECTORY)
|
signer = Signer(app, OVERRIDE_CONFIG_DIRECTORY)
|
||||||
metric_queue = MetricQueue()
|
|
||||||
start_cloudwatch_sender(metric_queue, app)
|
start_cloudwatch_sender(metric_queue, app)
|
||||||
|
|
||||||
tf = app.config['DB_TRANSACTION_FACTORY']
|
tf = app.config['DB_TRANSACTION_FACTORY']
|
||||||
|
|
|
@ -13,27 +13,27 @@ STORAGE_DRIVER_CLASSES = {
|
||||||
'SwiftStorage': SwiftStorage,
|
'SwiftStorage': SwiftStorage,
|
||||||
}
|
}
|
||||||
|
|
||||||
def get_storage_driver(storage_params):
|
def get_storage_driver(metric_queue, storage_params):
|
||||||
""" Returns a storage driver class for the given storage configuration
|
""" Returns a storage driver class for the given storage configuration
|
||||||
(a pair of string name and a dict of parameters). """
|
(a pair of string name and a dict of parameters). """
|
||||||
driver = storage_params[0]
|
driver = storage_params[0]
|
||||||
parameters = storage_params[1]
|
parameters = storage_params[1]
|
||||||
driver_class = STORAGE_DRIVER_CLASSES.get(driver, FakeStorage)
|
driver_class = STORAGE_DRIVER_CLASSES.get(driver, FakeStorage)
|
||||||
return driver_class(**parameters)
|
return driver_class(metric_queue, **parameters)
|
||||||
|
|
||||||
|
|
||||||
class Storage(object):
|
class Storage(object):
|
||||||
def __init__(self, app=None):
|
def __init__(self, app=None, metric_queue=None):
|
||||||
self.app = app
|
self.app = app
|
||||||
if app is not None:
|
if app is not None and metric_queue is not None:
|
||||||
self.state = self.init_app(app)
|
self.state = self.init_app(app, metric_queue)
|
||||||
else:
|
else:
|
||||||
self.state = None
|
self.state = None
|
||||||
|
|
||||||
def init_app(self, app):
|
def init_app(self, app, metric_queue):
|
||||||
storages = {}
|
storages = {}
|
||||||
for location, storage_params in app.config.get('DISTRIBUTED_STORAGE_CONFIG').items():
|
for location, storage_params in app.config.get('DISTRIBUTED_STORAGE_CONFIG').items():
|
||||||
storages[location] = get_storage_driver(storage_params)
|
storages[location] = get_storage_driver(metric_queue, storage_params)
|
||||||
|
|
||||||
preference = app.config.get('DISTRIBUTED_STORAGE_PREFERENCE', None)
|
preference = app.config.get('DISTRIBUTED_STORAGE_PREFERENCE', None)
|
||||||
if not preference:
|
if not preference:
|
||||||
|
|
|
@ -15,7 +15,6 @@ from collections import namedtuple
|
||||||
|
|
||||||
from util.registry import filelike
|
from util.registry import filelike
|
||||||
from storage.basestorage import BaseStorageV2, InvalidChunkException
|
from storage.basestorage import BaseStorageV2, InvalidChunkException
|
||||||
import app
|
|
||||||
|
|
||||||
|
|
||||||
logger = logging.getLogger(__name__)
|
logger = logging.getLogger(__name__)
|
||||||
|
@ -48,8 +47,8 @@ class StreamReadKeyAsFile(BufferedIOBase):
|
||||||
|
|
||||||
|
|
||||||
class _CloudStorage(BaseStorageV2):
|
class _CloudStorage(BaseStorageV2):
|
||||||
def __init__(self, connection_class, key_class, connect_kwargs, upload_params, storage_path,
|
def __init__(self, metric_queue, connection_class, key_class, connect_kwargs, upload_params,
|
||||||
access_key, secret_key, bucket_name):
|
storage_path, access_key, secret_key, bucket_name):
|
||||||
super(_CloudStorage, self).__init__()
|
super(_CloudStorage, self).__init__()
|
||||||
|
|
||||||
self.automatic_chunk_size = 5 * 1024 * 1024
|
self.automatic_chunk_size = 5 * 1024 * 1024
|
||||||
|
@ -65,6 +64,7 @@ class _CloudStorage(BaseStorageV2):
|
||||||
self._connect_kwargs = connect_kwargs
|
self._connect_kwargs = connect_kwargs
|
||||||
self._cloud_conn = None
|
self._cloud_conn = None
|
||||||
self._cloud_bucket = None
|
self._cloud_bucket = None
|
||||||
|
self._metric_queue = metric_queue
|
||||||
|
|
||||||
def _initialize_cloud_conn(self):
|
def _initialize_cloud_conn(self):
|
||||||
if not self._initialized:
|
if not self._initialized:
|
||||||
|
@ -161,7 +161,7 @@ class _CloudStorage(BaseStorageV2):
|
||||||
if content_encoding is not None:
|
if content_encoding is not None:
|
||||||
metadata['Content-Encoding'] = content_encoding
|
metadata['Content-Encoding'] = content_encoding
|
||||||
|
|
||||||
app.metric_queue.put('MultipartUploadStart', 1)
|
self._metric_queue.put('MultipartUploadStart', 1)
|
||||||
return self._cloud_bucket.initiate_multipart_upload(path, metadata=metadata,
|
return self._cloud_bucket.initiate_multipart_upload(path, metadata=metadata,
|
||||||
**self._upload_params)
|
**self._upload_params)
|
||||||
|
|
||||||
|
@ -198,7 +198,7 @@ class _CloudStorage(BaseStorageV2):
|
||||||
except IOError as ex:
|
except IOError as ex:
|
||||||
logger.warn('stream write error: %s', ex)
|
logger.warn('stream write error: %s', ex)
|
||||||
error = ex
|
error = ex
|
||||||
app.metric_queue.put('MultipartUploadFailure', 1)
|
self._metric_queue.put('MultipartUploadFailure', 1)
|
||||||
if cancel_on_error:
|
if cancel_on_error:
|
||||||
mp.cancel_upload()
|
mp.cancel_upload()
|
||||||
return 0, error
|
return 0, error
|
||||||
|
@ -206,7 +206,7 @@ class _CloudStorage(BaseStorageV2):
|
||||||
break
|
break
|
||||||
|
|
||||||
if total_bytes_written > 0:
|
if total_bytes_written > 0:
|
||||||
app.metric_queue.put('MultipartUploadSuccess', 1)
|
self._metric_queue.put('MultipartUploadSuccess', 1)
|
||||||
mp.complete_upload()
|
mp.complete_upload()
|
||||||
return total_bytes_written, error
|
return total_bytes_written, error
|
||||||
|
|
||||||
|
@ -380,7 +380,8 @@ class _CloudStorage(BaseStorageV2):
|
||||||
|
|
||||||
|
|
||||||
class S3Storage(_CloudStorage):
|
class S3Storage(_CloudStorage):
|
||||||
def __init__(self, storage_path, s3_access_key, s3_secret_key, s3_bucket, host=None):
|
def __init__(self, metric_queue, storage_path, s3_access_key, s3_secret_key, s3_bucket,
|
||||||
|
host=None):
|
||||||
upload_params = {
|
upload_params = {
|
||||||
'encrypt_key': True,
|
'encrypt_key': True,
|
||||||
}
|
}
|
||||||
|
@ -390,7 +391,7 @@ class S3Storage(_CloudStorage):
|
||||||
raise ValueError('host name must not start with http:// or https://')
|
raise ValueError('host name must not start with http:// or https://')
|
||||||
|
|
||||||
connect_kwargs['host'] = host
|
connect_kwargs['host'] = host
|
||||||
super(S3Storage, self).__init__(boto.s3.connection.S3Connection, boto.s3.key.Key,
|
super(S3Storage, self).__init__(metric_queue, boto.s3.connection.S3Connection, boto.s3.key.Key,
|
||||||
connect_kwargs, upload_params, storage_path, s3_access_key,
|
connect_kwargs, upload_params, storage_path, s3_access_key,
|
||||||
s3_secret_key, s3_bucket)
|
s3_secret_key, s3_bucket)
|
||||||
|
|
||||||
|
@ -414,12 +415,12 @@ class S3Storage(_CloudStorage):
|
||||||
</CORSConfiguration>""")
|
</CORSConfiguration>""")
|
||||||
|
|
||||||
class GoogleCloudStorage(_CloudStorage):
|
class GoogleCloudStorage(_CloudStorage):
|
||||||
def __init__(self, storage_path, access_key, secret_key, bucket_name):
|
def __init__(self, metric_queue, storage_path, access_key, secret_key, bucket_name):
|
||||||
upload_params = {}
|
upload_params = {}
|
||||||
connect_kwargs = {}
|
connect_kwargs = {}
|
||||||
super(GoogleCloudStorage, self).__init__(boto.gs.connection.GSConnection, boto.gs.key.Key,
|
super(GoogleCloudStorage, self).__init__(metric_queue, boto.gs.connection.GSConnection,
|
||||||
connect_kwargs, upload_params, storage_path,
|
boto.gs.key.Key, connect_kwargs, upload_params,
|
||||||
access_key, secret_key, bucket_name)
|
storage_path, access_key, secret_key, bucket_name)
|
||||||
|
|
||||||
def setup(self):
|
def setup(self):
|
||||||
self.get_cloud_bucket().set_cors_xml("""<?xml version="1.0" encoding="UTF-8"?>
|
self.get_cloud_bucket().set_cors_xml("""<?xml version="1.0" encoding="UTF-8"?>
|
||||||
|
@ -474,16 +475,17 @@ class GoogleCloudStorage(_CloudStorage):
|
||||||
|
|
||||||
|
|
||||||
class RadosGWStorage(_CloudStorage):
|
class RadosGWStorage(_CloudStorage):
|
||||||
def __init__(self, hostname, is_secure, storage_path, access_key, secret_key, bucket_name):
|
def __init__(self, metric_queue, hostname, is_secure, storage_path, access_key, secret_key,
|
||||||
|
bucket_name):
|
||||||
upload_params = {}
|
upload_params = {}
|
||||||
connect_kwargs = {
|
connect_kwargs = {
|
||||||
'host': hostname,
|
'host': hostname,
|
||||||
'is_secure': is_secure,
|
'is_secure': is_secure,
|
||||||
'calling_format': boto.s3.connection.OrdinaryCallingFormat(),
|
'calling_format': boto.s3.connection.OrdinaryCallingFormat(),
|
||||||
}
|
}
|
||||||
super(RadosGWStorage, self).__init__(boto.s3.connection.S3Connection, boto.s3.key.Key,
|
super(RadosGWStorage, self).__init__(metric_queue, boto.s3.connection.S3Connection,
|
||||||
connect_kwargs, upload_params, storage_path, access_key,
|
boto.s3.key.Key, connect_kwargs, upload_params,
|
||||||
secret_key, bucket_name)
|
storage_path, access_key, secret_key, bucket_name)
|
||||||
|
|
||||||
# TODO remove when radosgw supports cors: http://tracker.ceph.com/issues/8718#change-38624
|
# TODO remove when radosgw supports cors: http://tracker.ceph.com/issues/8718#change-38624
|
||||||
def get_direct_download_url(self, path, expires_in=60, requires_cors=False):
|
def get_direct_download_url(self, path, expires_in=60, requires_cors=False):
|
||||||
|
|
|
@ -9,6 +9,9 @@ from storage.basestorage import BaseStorageV2
|
||||||
_FAKE_STORAGE_MAP = defaultdict(StringIO.StringIO)
|
_FAKE_STORAGE_MAP = defaultdict(StringIO.StringIO)
|
||||||
|
|
||||||
class FakeStorage(BaseStorageV2):
|
class FakeStorage(BaseStorageV2):
|
||||||
|
def __init__(self, metric_queue):
|
||||||
|
super(FakeStorage, self).__init__()
|
||||||
|
|
||||||
def _init_path(self, path=None, create=False):
|
def _init_path(self, path=None, create=False):
|
||||||
return path
|
return path
|
||||||
|
|
||||||
|
|
|
@ -14,7 +14,7 @@ logger = logging.getLogger(__name__)
|
||||||
|
|
||||||
|
|
||||||
class LocalStorage(BaseStorageV2):
|
class LocalStorage(BaseStorageV2):
|
||||||
def __init__(self, storage_path):
|
def __init__(self, metric_queue, storage_path):
|
||||||
super(LocalStorage, self).__init__()
|
super(LocalStorage, self).__init__()
|
||||||
self._root_path = storage_path
|
self._root_path = storage_path
|
||||||
|
|
||||||
|
|
|
@ -29,7 +29,7 @@ _SEGMENT_DIRECTORY = 'segments'
|
||||||
_MAXIMUM_SEGMENT_SIZE = 5000000000 # 5 GB
|
_MAXIMUM_SEGMENT_SIZE = 5000000000 # 5 GB
|
||||||
|
|
||||||
class SwiftStorage(BaseStorage):
|
class SwiftStorage(BaseStorage):
|
||||||
def __init__(self, swift_container, storage_path, auth_url, swift_user,
|
def __init__(self, metric_queue, swift_container, storage_path, auth_url, swift_user,
|
||||||
swift_password, auth_version=None, os_options=None, ca_cert_path=None,
|
swift_password, auth_version=None, os_options=None, ca_cert_path=None,
|
||||||
temp_url_key=None, simple_path_concat=False):
|
temp_url_key=None, simple_path_concat=False):
|
||||||
super(SwiftStorage, self).__init__()
|
super(SwiftStorage, self).__init__()
|
||||||
|
|
42
test/test_swift.py
Normal file
42
test/test_swift.py
Normal file
|
@ -0,0 +1,42 @@
|
||||||
|
import unittest
|
||||||
|
|
||||||
|
from storage.swift import SwiftStorage
|
||||||
|
from mock import MagicMock
|
||||||
|
|
||||||
|
|
||||||
|
class TestSwiftStorage(SwiftStorage):
|
||||||
|
def __init__(self, *args, **kwargs):
|
||||||
|
super(TestSwiftStorage, self).__init__(*args, **kwargs)
|
||||||
|
self._connection = MagicMock()
|
||||||
|
|
||||||
|
def _get_connection(self):
|
||||||
|
return self._connection
|
||||||
|
|
||||||
|
|
||||||
|
class SwiftTests(unittest.TestCase):
|
||||||
|
base_args = {
|
||||||
|
'metric_queue': None,
|
||||||
|
'swift_container': 'container-name',
|
||||||
|
'storage_path': '/basepath',
|
||||||
|
'auth_url': 'https://auth.com',
|
||||||
|
'swift_user': 'root',
|
||||||
|
'swift_password': 'password',
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
def test_fixed_path_concat(self):
|
||||||
|
swift = TestSwiftStorage(**self.base_args)
|
||||||
|
swift.exists('object/path')
|
||||||
|
swift._get_connection().head_object.assert_called_with('container-name', 'basepath/object/path')
|
||||||
|
|
||||||
|
|
||||||
|
def test_simple_path_concat(self):
|
||||||
|
simple_concat_args = dict(self.base_args)
|
||||||
|
simple_concat_args['simple_path_concat'] = True
|
||||||
|
swift = TestSwiftStorage(**simple_concat_args)
|
||||||
|
swift.exists('object/path')
|
||||||
|
swift._get_connection().head_object.assert_called_with('container-name', 'basepathobject/path')
|
||||||
|
|
||||||
|
|
||||||
|
if __name__ == '__main__':
|
||||||
|
unittest.main()
|
Reference in a new issue