Merge pull request #2891 from coreos-inc/joseph.schorr/QS-38/refactor-storage

Small storage refactoring
This commit is contained in:
josephschorr 2017-10-30 17:40:07 -04:00 committed by GitHub
commit a6e3686c58
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
3 changed files with 39 additions and 11 deletions

View file

@ -4,6 +4,7 @@ from storage.fakestorage import FakeStorage
from storage.distributedstorage import DistributedStorage from storage.distributedstorage import DistributedStorage
from storage.swift import SwiftStorage from storage.swift import SwiftStorage
from storage.downloadproxy import DownloadProxy from storage.downloadproxy import DownloadProxy
from util.ipresolver import NoopIPResolver
STORAGE_DRIVER_CLASSES = { STORAGE_DRIVER_CLASSES = {
'LocalStorage': LocalStorage, 'LocalStorage': LocalStorage,
@ -15,13 +16,15 @@ STORAGE_DRIVER_CLASSES = {
} }
def get_storage_driver(location, metric_queue, chunk_cleanup_queue, config_provider, ip_resolver, storage_params): def get_storage_driver(location, metric_queue, chunk_cleanup_queue, config_provider, ip_resolver,
storage_params):
""" Returns a storage driver class for the given storage configuration """ Returns a storage driver class for the given storage configuration
(a pair of string name and a dict of parameters). """ (a pair of string name and a dict of parameters). """
driver = storage_params[0] driver = storage_params[0]
parameters = storage_params[1] parameters = storage_params[1]
driver_class = STORAGE_DRIVER_CLASSES.get(driver, FakeStorage) driver_class = STORAGE_DRIVER_CLASSES.get(driver, FakeStorage)
context = StorageContext(location, metric_queue, chunk_cleanup_queue, config_provider, ip_resolver) context = StorageContext(location, metric_queue, chunk_cleanup_queue, config_provider,
ip_resolver)
return driver_class(context, **parameters) return driver_class(context, **parameters)
@ -31,7 +34,7 @@ class StorageContext(object):
self.metric_queue = metric_queue self.metric_queue = metric_queue
self.chunk_cleanup_queue = chunk_cleanup_queue self.chunk_cleanup_queue = chunk_cleanup_queue
self.config_provider = config_provider self.config_provider = config_provider
self.ip_resolver = ip_resolver self.ip_resolver = ip_resolver or NoopIPResolver()
class Storage(object): class Storage(object):
@ -44,7 +47,8 @@ class Storage(object):
else: else:
self.state = None self.state = None
def init_app(self, app, metric_queue, chunk_cleanup_queue, instance_keys, config_provider, ip_resolver): def init_app(self, app, metric_queue, chunk_cleanup_queue, instance_keys, config_provider,
ip_resolver):
storages = {} storages = {}
for location, storage_params in app.config.get('DISTRIBUTED_STORAGE_CONFIG').items(): for location, storage_params in app.config.get('DISTRIBUTED_STORAGE_CONFIG').items():
storages[location] = get_storage_driver(location, metric_queue, chunk_cleanup_queue, storages[location] = get_storage_driver(location, metric_queue, chunk_cleanup_queue,

View file

@ -577,12 +577,13 @@ class RadosGWStorage(_CloudStorage):
storage_path, bucket_name, access_key, secret_key) storage_path, bucket_name, access_key, secret_key)
# TODO remove when radosgw supports cors: http://tracker.ceph.com/issues/8718#change-38624 # TODO remove when radosgw supports cors: http://tracker.ceph.com/issues/8718#change-38624
def get_direct_download_url(self, path, request_ip=None, expires_in=60, requires_cors=False, head=False): def get_direct_download_url(self, path, request_ip=None, expires_in=60, requires_cors=False,
head=False):
if requires_cors: if requires_cors:
return None return None
return super(RadosGWStorage, self).get_direct_download_url(path, request_ip, expires_in, requires_cors, return super(RadosGWStorage, self).get_direct_download_url(path, request_ip, expires_in,
head) requires_cors, head)
# TODO remove when radosgw supports cors: http://tracker.ceph.com/issues/8718#change-38624 # TODO remove when radosgw supports cors: http://tracker.ceph.com/issues/8718#change-38624
def get_direct_upload_url(self, path, mime_type, requires_cors=True): def get_direct_upload_url(self, path, mime_type, requires_cors=True):
@ -621,9 +622,9 @@ class CloudFrontedS3Storage(S3Storage):
resolved_ip_info = None resolved_ip_info = None
logger.debug('Got direct download request for path "%s" with IP "%s"', path, request_ip) logger.debug('Got direct download request for path "%s" with IP "%s"', path, request_ip)
if request_ip is not None and self._context.ip_resolver is not None: if request_ip is not None:
# Lookup the IP address in our resolution table and determine whether it is under AWS. If it is, # Lookup the IP address in our resolution table and determine whether it is under AWS.
# then return an S3 signed URL, since we are in-network. # If it is, then return an S3 signed URL, since we are in-network.
resolved_ip_info = self._context.ip_resolver.resolve_ip(request_ip) resolved_ip_info = self._context.ip_resolver.resolve_ip(request_ip)
logger.debug('Resolved IP information for IP %s: %s', request_ip, resolved_ip_info) logger.debug('Resolved IP information for IP %s: %s', request_ip, resolved_ip_info)
if resolved_ip_info and resolved_ip_info.provider == 'aws': if resolved_ip_info and resolved_ip_info.provider == 'aws':

View file

@ -2,6 +2,9 @@ import logging
import json import json
import requests import requests
from abc import ABCMeta, abstractmethod
from six import add_metaclass
from cachetools import ttl_cache, lru_cache from cachetools import ttl_cache, lru_cache
from collections import namedtuple, defaultdict from collections import namedtuple, defaultdict
from netaddr import IPNetwork, IPAddress, IPSet, AddrFormatError from netaddr import IPNetwork, IPAddress, IPSet, AddrFormatError
@ -9,6 +12,8 @@ from netaddr import IPNetwork, IPAddress, IPSet, AddrFormatError
import geoip2.database import geoip2.database
import geoip2.errors import geoip2.errors
from util.abchelpers import nooper
ResolvedLocation = namedtuple('ResolvedLocation', ['provider', 'region', 'service', 'sync_token']) ResolvedLocation = namedtuple('ResolvedLocation', ['provider', 'region', 'service', 'sync_token'])
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
@ -28,7 +33,25 @@ def update_resolver_datafiles():
f.write(response.text) f.write(response.text)
logger.debug('Successfully wrote %s', filename) logger.debug('Successfully wrote %s', filename)
class IPResolver(object):
@add_metaclass(ABCMeta)
class IPResolverInterface(object):
""" Helper class for resolving information about an IP address. """
@abstractmethod
def resolve_ip(self, ip_address):
""" Attempts to return resolved information about the specified IP Address. If such an attempt
fails, returns None.
"""
pass
@nooper
class NoopIPResolver(IPResolverInterface):
""" No-op version of the security scanner API. """
pass
class IPResolver(IPResolverInterface):
def __init__(self, app): def __init__(self, app):
self.app = app self.app = app
self.geoip_db = geoip2.database.Reader('util/ipresolver/GeoLite2-Country.mmdb') self.geoip_db = geoip2.database.Reader('util/ipresolver/GeoLite2-Country.mmdb')