This repository has been archived on 2020-03-24. You can view files and clone it, but cannot push or open issues or pull requests.
quay/storage/__init__.py
Charlton Austin 3fd8c8a60d feature(app.py): adding queue_metrics to queues
publishing queue metrics for SRE

[none]
2017-02-14 16:01:28 -05:00

66 lines
2.5 KiB
Python

from storage.local import LocalStorage
from storage.cloud import S3Storage, GoogleCloudStorage, RadosGWStorage
from storage.fakestorage import FakeStorage
from storage.distributedstorage import DistributedStorage
from storage.swift import SwiftStorage
from storage.downloadproxy import DownloadProxy
# Registry mapping a configured driver name to the storage class implementing it.
# Looked up by name when a storage driver is constructed from configuration.
STORAGE_DRIVER_CLASSES = {
    'LocalStorage': LocalStorage,
    'S3Storage': S3Storage,
    'GoogleCloudStorage': GoogleCloudStorage,
    'RadosGWStorage': RadosGWStorage,
    'SwiftStorage': SwiftStorage,
}
def get_storage_driver(location, metric_queue, chunk_cleanup_queue, storage_params):
    """ Builds and returns the storage driver instance for a single location.

    `storage_params` is read as a pair: index 0 is the driver's string name and index 1
    is a dict of keyword arguments for the driver's constructor. A name that is not a
    key of STORAGE_DRIVER_CLASSES resolves to FakeStorage. The driver is constructed
    with a StorageContext carrying the location and the two queues as its first
    argument.
    """
    driver_name, driver_kwargs = storage_params[0], storage_params[1]
    storage_cls = STORAGE_DRIVER_CLASSES.get(driver_name, FakeStorage)
    return storage_cls(
        StorageContext(location, metric_queue, chunk_cleanup_queue),
        **driver_kwargs
    )
class StorageContext(object):
    """ Simple holder for a location together with its metric and chunk cleanup queues. """

    def __init__(self, location, metric_queue, chunk_cleanup_queue):
        # Values are stored exactly as given; nothing is validated or copied.
        self.location, self.metric_queue = location, metric_queue
        self.chunk_cleanup_queue = chunk_cleanup_queue
class Storage(object):
    """ Builds the app's DistributedStorage and proxies attribute lookups to it.

    When constructed with an `app`, the storage is initialized immediately via init_app
    and kept in `self.state`; without one, `self.state` is None. Attributes not found on
    this object are looked up on `self.state`, yielding None when absent there.
    """

    def __init__(self, app=None, metric_queue=None, chunk_cleanup_queue=None, instance_keys=None):
        self.app = app
        if app is not None:
            self.state = self.init_app(app, metric_queue, chunk_cleanup_queue, instance_keys)
        else:
            self.state = None

    def init_app(self, app, metric_queue, chunk_cleanup_queue, instance_keys):
        """ Creates the DistributedStorage described by `app.config` and registers it.

        Reads DISTRIBUTED_STORAGE_CONFIG (location -> storage params) to build one driver
        per location, DISTRIBUTED_STORAGE_PREFERENCE for the location preference (all
        configured locations when unset or empty) and DISTRIBUTED_STORAGE_DEFAULT_LOCATIONS
        (empty list when unset). A DownloadProxy is created only when FEATURE_PROXY_STORAGE
        is truthy and `instance_keys` is not None.

        The result is stored under app.extensions['storage'] and returned.
        """
        storages = {}
        for location, storage_params in app.config.get('DISTRIBUTED_STORAGE_CONFIG').items():
            storages[location] = get_storage_driver(location, metric_queue, chunk_cleanup_queue,
                                                    storage_params)

        preference = app.config.get('DISTRIBUTED_STORAGE_PREFERENCE', None)
        if not preference:
            # Materialize a real list: on Python 3, dict.keys() is a live view that cannot
            # be indexed or shuffled. On Python 2 this yields the same list as keys().
            preference = list(storages)

        default_locations = app.config.get('DISTRIBUTED_STORAGE_DEFAULT_LOCATIONS') or []

        download_proxy = None
        if app.config.get('FEATURE_PROXY_STORAGE', False) and instance_keys is not None:
            download_proxy = DownloadProxy(app, instance_keys)

        d_storage = DistributedStorage(storages, preference, default_locations, download_proxy)

        # register extension with app
        app.extensions = getattr(app, 'extensions', {})
        app.extensions['storage'] = d_storage
        return d_storage

    def __getattr__(self, name):
        """ Delegates lookups of attributes missing on this object to `self.state`.

        Returns None when there is no state or the state lacks the attribute.
        """
        # __getattr__ only runs after normal lookup fails. Read `state` straight from the
        # instance dict: going through `self.state` would re-enter __getattr__ and recurse
        # without bound on an instance whose __init__ never ran (e.g. built via __new__).
        state = self.__dict__.get('state')
        return getattr(state, name, None)